diff --git a/crawl4ai/async_webcrawler.py b/crawl4ai/async_webcrawler.py
index 8db69333..2c17602d 100644
--- a/crawl4ai/async_webcrawler.py
+++ b/crawl4ai/async_webcrawler.py
@@ -7,6 +7,7 @@ from pathlib import Path
 from typing import Optional, List, Union
 import json
 import asyncio
+from contextlib import nullcontext
 from .models import CrawlResult, MarkdownGenerationResult
 from .async_database import async_db_manager
 from .chunking_strategy import *
@@ -67,6 +68,7 @@ class AsyncWebCrawler:
         always_bypass_cache: bool = False,
         always_by_pass_cache: Optional[bool] = None,  # Deprecated parameter
         base_directory: str = str(os.getenv("CRAWL4_AI_BASE_DIRECTORY", Path.home())),
+        thread_safe: bool = False,
         **kwargs,
     ):
         """
@@ -104,6 +106,8 @@ class AsyncWebCrawler:
         else:
             self.always_bypass_cache = always_bypass_cache
 
+        self._lock = asyncio.Lock() if thread_safe else None
+
         self.crawl4ai_folder = os.path.join(base_directory, ".crawl4ai")
         os.makedirs(self.crawl4ai_folder, exist_ok=True)
         os.makedirs(f"{self.crawl4ai_folder}/cache", exist_ok=True)
@@ -178,169 +182,170 @@ class AsyncWebCrawler:
         Returns:
             CrawlResult: The result of crawling and processing
         """
-        try:
-            # Handle deprecated parameters
-            if any([bypass_cache, disable_cache, no_cache_read, no_cache_write]):
-                if kwargs.get("warning", True):
-                    warnings.warn(
-                        "Cache control boolean flags are deprecated and will be removed in version X.X.X. "
-                        "Use 'cache_mode' parameter instead. Examples:\n"
-                        "- For bypass_cache=True, use cache_mode=CacheMode.BYPASS\n"
-                        "- For disable_cache=True, use cache_mode=CacheMode.DISABLED\n"
-                        "- For no_cache_read=True, use cache_mode=CacheMode.WRITE_ONLY\n"
-                        "- For no_cache_write=True, use cache_mode=CacheMode.READ_ONLY\n"
-                        "Pass warning=False to suppress this warning.",
-                        DeprecationWarning,
-                        stacklevel=2
-                    )
-
-            # Convert legacy parameters if cache_mode not provided
-            if cache_mode is None:
-                cache_mode = _legacy_to_cache_mode(
-                    disable_cache=disable_cache,
-                    bypass_cache=bypass_cache,
-                    no_cache_read=no_cache_read,
-                    no_cache_write=no_cache_write
-                )
-
-            # Default to ENABLED if no cache mode specified
-            if cache_mode is None:
-                cache_mode = CacheMode.ENABLED
-
-            # Create cache context
-            cache_context = CacheContext(url, cache_mode, self.always_bypass_cache)
-
-            extraction_strategy = extraction_strategy or NoExtractionStrategy()
-            extraction_strategy.verbose = verbose
-            if not isinstance(extraction_strategy, ExtractionStrategy):
-                raise ValueError("Unsupported extraction strategy")
-            if not isinstance(chunking_strategy, ChunkingStrategy):
-                raise ValueError("Unsupported chunking strategy")
-
-            word_count_threshold = max(word_count_threshold, MIN_WORD_THRESHOLD)
-
-            async_response: AsyncCrawlResponse = None
-            cached_result = None
-            screenshot_data = None
-            extracted_content = None
-
-            start_time = time.perf_counter()
-
-            # Try to get cached result if appropriate
-            if cache_context.should_read():
-                cached_result = await async_db_manager.aget_cached_url(url)
-
-            if cached_result:
-                html = sanitize_input_encode(cached_result.html)
-                extracted_content = sanitize_input_encode(cached_result.extracted_content or "")
-                if screenshot:
-                    screenshot_data = cached_result.screenshot
-                    if not screenshot_data:
-                        cached_result = None
-                # if verbose:
-                # print(f"{Fore.BLUE}{self.tag_format('FETCH')} {self.log_icons['FETCH']} Cache hit for {cache_context.display_url} | Status: {Fore.GREEN if bool(html) else Fore.RED}{bool(html)}{Style.RESET_ALL} | Time: {time.perf_counter() - start_time:.2f}s")
-                self.logger.url_status(
-                    url=cache_context.display_url,
-                    success=bool(html),
-                    timing=time.perf_counter() - start_time,
-                    tag="FETCH"
-                )
-
-            # Fetch fresh content if needed
-            if not cached_result or not html:
-                t1 = time.perf_counter()
-
-                if user_agent:
-                    self.crawler_strategy.update_user_agent(user_agent)
-                async_response: AsyncCrawlResponse = await self.crawler_strategy.crawl(
-                    url,
-                    screenshot=screenshot,
-                    **kwargs
-                )
-                html = sanitize_input_encode(async_response.html)
-                screenshot_data = async_response.screenshot
-                t2 = time.perf_counter()
-                self.logger.url_status(
-                    url=cache_context.display_url,
-                    success=bool(html),
-                    timing=t2 - t1,
-                    tag="FETCH"
-                )
-                # if verbose:
-                # print(f"{Fore.BLUE}{self.tag_format('FETCH')} {self.log_icons['FETCH']} Live fetch for {cache_context.display_url}... | Status: {Fore.GREEN if bool(html) else Fore.RED}{bool(html)}{Style.RESET_ALL} | Time: {t2 - t1:.2f}s")
-
-            # Process the HTML content
-            crawl_result = await self.aprocess_html(
-                url=url,
-                html=html,
-                extracted_content=extracted_content,
-                word_count_threshold=word_count_threshold,
-                extraction_strategy=extraction_strategy,
-                chunking_strategy=chunking_strategy,
-                content_filter=content_filter,
-                css_selector=css_selector,
-                screenshot=screenshot_data,
-                verbose=verbose,
-                is_cached=bool(cached_result),
-                async_response=async_response,
-                is_web_url=cache_context.is_web_url,
-                is_local_file=cache_context.is_local_file,
-                is_raw_html=cache_context.is_raw_html,
-                **kwargs,
-            )
-
-            # Set response data
-            if async_response:
-                crawl_result.status_code = async_response.status_code
-                crawl_result.response_headers = async_response.response_headers
-                crawl_result.downloaded_files = async_response.downloaded_files
-            else:
-                crawl_result.status_code = 200
-                crawl_result.response_headers = cached_result.response_headers if cached_result else {}
-
-            crawl_result.success = bool(html)
-            crawl_result.session_id = kwargs.get("session_id", None)
-
-            # if verbose:
-            # print(f"{Fore.GREEN}{self.tag_format('COMPLETE')} {self.log_icons['COMPLETE']} {cache_context.display_url[:URL_LOG_SHORTEN_LENGTH]}... | Status: {Fore.GREEN if crawl_result.success else Fore.RED}{crawl_result.success} | {Fore.YELLOW}Total: {time.perf_counter() - start_time:.2f}s{Style.RESET_ALL}")
-            self.logger.success(
-                message="{url:.50}... | Status: {status} | Total: {timing}",
-                tag="COMPLETE",
-                params={
-                    "url": cache_context.display_url,
-                    "status": crawl_result.success,
-                    "timing": f"{time.perf_counter() - start_time:.2f}s"
-                },
-                colors={
-                    "status": Fore.GREEN if crawl_result.success else Fore.RED,
-                    "timing": Fore.YELLOW
-                }
-            )
-
-            # Update cache if appropriate
-            if cache_context.should_write() and not bool(cached_result):
-                await async_db_manager.acache_url(crawl_result)
-
-            return crawl_result
-
-        except Exception as e:
-            if not hasattr(e, "msg"):
-                e.msg = str(e)
-            # print(f"{Fore.RED}{self.tag_format('ERROR')} {self.log_icons['ERROR']} Failed to crawl {cache_context.display_url[:URL_LOG_SHORTEN_LENGTH]}... | {e.msg}{Style.RESET_ALL}")
-
-            self.logger.error_status(
-                url=cache_context.display_url,
-                error=create_box_message(e.msg, type = "error"),
-                tag="ERROR"
-            )
-            return CrawlResult(
-                url=url,
-                html="",
-                success=False,
-                error_message=e.msg
-            )
+        async with self._lock or nullcontext():
+            try:
+                # Handle deprecated parameters
+                if any([bypass_cache, disable_cache, no_cache_read, no_cache_write]):
+                    if kwargs.get("warning", True):
+                        warnings.warn(
+                            "Cache control boolean flags are deprecated and will be removed in version X.X.X. "
+                            "Use 'cache_mode' parameter instead. Examples:\n"
+                            "- For bypass_cache=True, use cache_mode=CacheMode.BYPASS\n"
+                            "- For disable_cache=True, use cache_mode=CacheMode.DISABLED\n"
+                            "- For no_cache_read=True, use cache_mode=CacheMode.WRITE_ONLY\n"
+                            "- For no_cache_write=True, use cache_mode=CacheMode.READ_ONLY\n"
+                            "Pass warning=False to suppress this warning.",
+                            DeprecationWarning,
+                            stacklevel=2
+                        )
+
+                # Convert legacy parameters if cache_mode not provided
+                if cache_mode is None:
+                    cache_mode = _legacy_to_cache_mode(
+                        disable_cache=disable_cache,
+                        bypass_cache=bypass_cache,
+                        no_cache_read=no_cache_read,
+                        no_cache_write=no_cache_write
+                    )
+
+                # Default to ENABLED if no cache mode specified
+                if cache_mode is None:
+                    cache_mode = CacheMode.ENABLED
+
+                # Create cache context
+                cache_context = CacheContext(url, cache_mode, self.always_bypass_cache)
+
+                extraction_strategy = extraction_strategy or NoExtractionStrategy()
+                extraction_strategy.verbose = verbose
+                if not isinstance(extraction_strategy, ExtractionStrategy):
+                    raise ValueError("Unsupported extraction strategy")
+                if not isinstance(chunking_strategy, ChunkingStrategy):
+                    raise ValueError("Unsupported chunking strategy")
+
+                word_count_threshold = max(word_count_threshold, MIN_WORD_THRESHOLD)
+
+                async_response: AsyncCrawlResponse = None
+                cached_result = None
+                screenshot_data = None
+                extracted_content = None
+
+                start_time = time.perf_counter()
+
+                # Try to get cached result if appropriate
+                if cache_context.should_read():
+                    cached_result = await async_db_manager.aget_cached_url(url)
+
+                if cached_result:
+                    html = sanitize_input_encode(cached_result.html)
+                    extracted_content = sanitize_input_encode(cached_result.extracted_content or "")
+                    if screenshot:
+                        screenshot_data = cached_result.screenshot
+                        if not screenshot_data:
+                            cached_result = None
+                    # if verbose:
+                    # print(f"{Fore.BLUE}{self.tag_format('FETCH')} {self.log_icons['FETCH']} Cache hit for {cache_context.display_url} | Status: {Fore.GREEN if bool(html) else Fore.RED}{bool(html)}{Style.RESET_ALL} | Time: {time.perf_counter() - start_time:.2f}s")
+                    self.logger.url_status(
+                        url=cache_context.display_url,
+                        success=bool(html),
+                        timing=time.perf_counter() - start_time,
+                        tag="FETCH"
+                    )
+
+                # Fetch fresh content if needed
+                if not cached_result or not html:
+                    t1 = time.perf_counter()
+
+                    if user_agent:
+                        self.crawler_strategy.update_user_agent(user_agent)
+                    async_response: AsyncCrawlResponse = await self.crawler_strategy.crawl(
+                        url,
+                        screenshot=screenshot,
+                        **kwargs
+                    )
+                    html = sanitize_input_encode(async_response.html)
+                    screenshot_data = async_response.screenshot
+                    t2 = time.perf_counter()
+                    self.logger.url_status(
+                        url=cache_context.display_url,
+                        success=bool(html),
+                        timing=t2 - t1,
+                        tag="FETCH"
+                    )
+                    # if verbose:
+                    # print(f"{Fore.BLUE}{self.tag_format('FETCH')} {self.log_icons['FETCH']} Live fetch for {cache_context.display_url}... | Status: {Fore.GREEN if bool(html) else Fore.RED}{bool(html)}{Style.RESET_ALL} | Time: {t2 - t1:.2f}s")
+
+                # Process the HTML content
+                crawl_result = await self.aprocess_html(
+                    url=url,
+                    html=html,
+                    extracted_content=extracted_content,
+                    word_count_threshold=word_count_threshold,
+                    extraction_strategy=extraction_strategy,
+                    chunking_strategy=chunking_strategy,
+                    content_filter=content_filter,
+                    css_selector=css_selector,
+                    screenshot=screenshot_data,
+                    verbose=verbose,
+                    is_cached=bool(cached_result),
+                    async_response=async_response,
+                    is_web_url=cache_context.is_web_url,
+                    is_local_file=cache_context.is_local_file,
+                    is_raw_html=cache_context.is_raw_html,
+                    **kwargs,
+                )
+
+                # Set response data
+                if async_response:
+                    crawl_result.status_code = async_response.status_code
+                    crawl_result.response_headers = async_response.response_headers
+                    crawl_result.downloaded_files = async_response.downloaded_files
+                else:
+                    crawl_result.status_code = 200
+                    crawl_result.response_headers = cached_result.response_headers if cached_result else {}
+
+                crawl_result.success = bool(html)
+                crawl_result.session_id = kwargs.get("session_id", None)
+
+                # if verbose:
+                # print(f"{Fore.GREEN}{self.tag_format('COMPLETE')} {self.log_icons['COMPLETE']} {cache_context.display_url[:URL_LOG_SHORTEN_LENGTH]}... | Status: {Fore.GREEN if crawl_result.success else Fore.RED}{crawl_result.success} | {Fore.YELLOW}Total: {time.perf_counter() - start_time:.2f}s{Style.RESET_ALL}")
+                self.logger.success(
+                    message="{url:.50}... | Status: {status} | Total: {timing}",
+                    tag="COMPLETE",
+                    params={
+                        "url": cache_context.display_url,
+                        "status": crawl_result.success,
+                        "timing": f"{time.perf_counter() - start_time:.2f}s"
+                    },
+                    colors={
+                        "status": Fore.GREEN if crawl_result.success else Fore.RED,
+                        "timing": Fore.YELLOW
+                    }
+                )
+
+                # Update cache if appropriate
+                if cache_context.should_write() and not bool(cached_result):
+                    await async_db_manager.acache_url(crawl_result)
+
+                return crawl_result
+
+            except Exception as e:
+                if not hasattr(e, "msg"):
+                    e.msg = str(e)
+                # print(f"{Fore.RED}{self.tag_format('ERROR')} {self.log_icons['ERROR']} Failed to crawl {cache_context.display_url[:URL_LOG_SHORTEN_LENGTH]}... | {e.msg}{Style.RESET_ALL}")
+
+                self.logger.error_status(
+                    url=cache_context.display_url,
+                    error=create_box_message(e.msg, type = "error"),
+                    tag="ERROR"
+                )
+                return CrawlResult(
+                    url=url,
+                    html="",
+                    success=False,
+                    error_message=e.msg
+                )
 
     async def arun_many(
         self,
         urls: List[str],