refactor: improve error handling in DataProcessor and optimize data parsing logic

This commit is contained in:
UncleCode
2024-12-03 19:44:38 +08:00
parent 95a4f74d2a
commit e9639ad189

View File

@@ -7,6 +7,7 @@ from pathlib import Path
from typing import Optional, List, Union from typing import Optional, List, Union
import json import json
import asyncio import asyncio
from contextlib import nullcontext
from .models import CrawlResult, MarkdownGenerationResult from .models import CrawlResult, MarkdownGenerationResult
from .async_database import async_db_manager from .async_database import async_db_manager
from .chunking_strategy import * from .chunking_strategy import *
@@ -67,6 +68,7 @@ class AsyncWebCrawler:
always_bypass_cache: bool = False, always_bypass_cache: bool = False,
always_by_pass_cache: Optional[bool] = None, # Deprecated parameter always_by_pass_cache: Optional[bool] = None, # Deprecated parameter
base_directory: str = str(os.getenv("CRAWL4_AI_BASE_DIRECTORY", Path.home())), base_directory: str = str(os.getenv("CRAWL4_AI_BASE_DIRECTORY", Path.home())),
thread_safe: bool = False,
**kwargs, **kwargs,
): ):
""" """
@@ -104,6 +106,8 @@ class AsyncWebCrawler:
else: else:
self.always_bypass_cache = always_bypass_cache self.always_bypass_cache = always_bypass_cache
self._lock = asyncio.Lock() if thread_safe else None
self.crawl4ai_folder = os.path.join(base_directory, ".crawl4ai") self.crawl4ai_folder = os.path.join(base_directory, ".crawl4ai")
os.makedirs(self.crawl4ai_folder, exist_ok=True) os.makedirs(self.crawl4ai_folder, exist_ok=True)
os.makedirs(f"{self.crawl4ai_folder}/cache", exist_ok=True) os.makedirs(f"{self.crawl4ai_folder}/cache", exist_ok=True)
@@ -178,169 +182,170 @@ class AsyncWebCrawler:
Returns: Returns:
CrawlResult: The result of crawling and processing CrawlResult: The result of crawling and processing
""" """
try: async with self._lock or nullcontext():
# Handle deprecated parameters try:
if any([bypass_cache, disable_cache, no_cache_read, no_cache_write]): # Handle deprecated parameters
if kwargs.get("warning", True): if any([bypass_cache, disable_cache, no_cache_read, no_cache_write]):
warnings.warn( if kwargs.get("warning", True):
"Cache control boolean flags are deprecated and will be removed in version X.X.X. " warnings.warn(
"Use 'cache_mode' parameter instead. Examples:\n" "Cache control boolean flags are deprecated and will be removed in version X.X.X. "
"- For bypass_cache=True, use cache_mode=CacheMode.BYPASS\n" "Use 'cache_mode' parameter instead. Examples:\n"
"- For disable_cache=True, use cache_mode=CacheMode.DISABLED\n" "- For bypass_cache=True, use cache_mode=CacheMode.BYPASS\n"
"- For no_cache_read=True, use cache_mode=CacheMode.WRITE_ONLY\n" "- For disable_cache=True, use cache_mode=CacheMode.DISABLED\n"
"- For no_cache_write=True, use cache_mode=CacheMode.READ_ONLY\n" "- For no_cache_read=True, use cache_mode=CacheMode.WRITE_ONLY\n"
"Pass warning=False to suppress this warning.", "- For no_cache_write=True, use cache_mode=CacheMode.READ_ONLY\n"
DeprecationWarning, "Pass warning=False to suppress this warning.",
stacklevel=2 DeprecationWarning,
) stacklevel=2
)
# Convert legacy parameters if cache_mode not provided
if cache_mode is None:
cache_mode = _legacy_to_cache_mode(
disable_cache=disable_cache,
bypass_cache=bypass_cache,
no_cache_read=no_cache_read,
no_cache_write=no_cache_write
)
# Convert legacy parameters if cache_mode not provided # Default to ENABLED if no cache mode specified
if cache_mode is None: if cache_mode is None:
cache_mode = _legacy_to_cache_mode( cache_mode = CacheMode.ENABLED
disable_cache=disable_cache,
bypass_cache=bypass_cache, # Create cache context
no_cache_read=no_cache_read, cache_context = CacheContext(url, cache_mode, self.always_bypass_cache)
no_cache_write=no_cache_write
extraction_strategy = extraction_strategy or NoExtractionStrategy()
extraction_strategy.verbose = verbose
if not isinstance(extraction_strategy, ExtractionStrategy):
raise ValueError("Unsupported extraction strategy")
if not isinstance(chunking_strategy, ChunkingStrategy):
raise ValueError("Unsupported chunking strategy")
word_count_threshold = max(word_count_threshold, MIN_WORD_THRESHOLD)
async_response: AsyncCrawlResponse = None
cached_result = None
screenshot_data = None
extracted_content = None
start_time = time.perf_counter()
# Try to get cached result if appropriate
if cache_context.should_read():
cached_result = await async_db_manager.aget_cached_url(url)
if cached_result:
html = sanitize_input_encode(cached_result.html)
extracted_content = sanitize_input_encode(cached_result.extracted_content or "")
if screenshot:
screenshot_data = cached_result.screenshot
if not screenshot_data:
cached_result = None
# if verbose:
# print(f"{Fore.BLUE}{self.tag_format('FETCH')} {self.log_icons['FETCH']} Cache hit for {cache_context.display_url} | Status: {Fore.GREEN if bool(html) else Fore.RED}{bool(html)}{Style.RESET_ALL} | Time: {time.perf_counter() - start_time:.2f}s")
self.logger.url_status(
url=cache_context.display_url,
success=bool(html),
timing=time.perf_counter() - start_time,
tag="FETCH"
)
# Fetch fresh content if needed
if not cached_result or not html:
t1 = time.perf_counter()
if user_agent:
self.crawler_strategy.update_user_agent(user_agent)
async_response: AsyncCrawlResponse = await self.crawler_strategy.crawl(
url,
screenshot=screenshot,
**kwargs
) )
html = sanitize_input_encode(async_response.html)
# Default to ENABLED if no cache mode specified screenshot_data = async_response.screenshot
if cache_mode is None: t2 = time.perf_counter()
cache_mode = CacheMode.ENABLED self.logger.url_status(
# Create cache context
cache_context = CacheContext(url, cache_mode, self.always_bypass_cache)
extraction_strategy = extraction_strategy or NoExtractionStrategy()
extraction_strategy.verbose = verbose
if not isinstance(extraction_strategy, ExtractionStrategy):
raise ValueError("Unsupported extraction strategy")
if not isinstance(chunking_strategy, ChunkingStrategy):
raise ValueError("Unsupported chunking strategy")
word_count_threshold = max(word_count_threshold, MIN_WORD_THRESHOLD)
async_response: AsyncCrawlResponse = None
cached_result = None
screenshot_data = None
extracted_content = None
start_time = time.perf_counter()
# Try to get cached result if appropriate
if cache_context.should_read():
cached_result = await async_db_manager.aget_cached_url(url)
if cached_result:
html = sanitize_input_encode(cached_result.html)
extracted_content = sanitize_input_encode(cached_result.extracted_content or "")
if screenshot:
screenshot_data = cached_result.screenshot
if not screenshot_data:
cached_result = None
# if verbose:
# print(f"{Fore.BLUE}{self.tag_format('FETCH')} {self.log_icons['FETCH']} Cache hit for {cache_context.display_url} | Status: {Fore.GREEN if bool(html) else Fore.RED}{bool(html)}{Style.RESET_ALL} | Time: {time.perf_counter() - start_time:.2f}s")
self.logger.url_status(
url=cache_context.display_url, url=cache_context.display_url,
success=bool(html), success=bool(html),
timing=time.perf_counter() - start_time, timing=t2 - t1,
tag="FETCH" tag="FETCH"
) )
# if verbose:
# print(f"{Fore.BLUE}{self.tag_format('FETCH')} {self.log_icons['FETCH']} Live fetch for {cache_context.display_url}... | Status: {Fore.GREEN if bool(html) else Fore.RED}{bool(html)}{Style.RESET_ALL} | Time: {t2 - t1:.2f}s")
# Process the HTML content
# Fetch fresh content if needed crawl_result = await self.aprocess_html(
if not cached_result or not html: url=url,
t1 = time.perf_counter() html=html,
extracted_content=extracted_content,
word_count_threshold=word_count_threshold,
extraction_strategy=extraction_strategy,
chunking_strategy=chunking_strategy,
content_filter=content_filter,
css_selector=css_selector,
screenshot=screenshot_data,
verbose=verbose,
is_cached=bool(cached_result),
async_response=async_response,
is_web_url=cache_context.is_web_url,
is_local_file=cache_context.is_local_file,
is_raw_html=cache_context.is_raw_html,
**kwargs,
)
if user_agent: # Set response data
self.crawler_strategy.update_user_agent(user_agent) if async_response:
async_response: AsyncCrawlResponse = await self.crawler_strategy.crawl( crawl_result.status_code = async_response.status_code
url, crawl_result.response_headers = async_response.response_headers
screenshot=screenshot, crawl_result.downloaded_files = async_response.downloaded_files
**kwargs else:
) crawl_result.status_code = 200
html = sanitize_input_encode(async_response.html) crawl_result.response_headers = cached_result.response_headers if cached_result else {}
screenshot_data = async_response.screenshot
t2 = time.perf_counter() crawl_result.success = bool(html)
self.logger.url_status( crawl_result.session_id = kwargs.get("session_id", None)
url=cache_context.display_url,
success=bool(html),
timing=t2 - t1,
tag="FETCH"
)
# if verbose: # if verbose:
# print(f"{Fore.BLUE}{self.tag_format('FETCH')} {self.log_icons['FETCH']} Live fetch for {cache_context.display_url}... | Status: {Fore.GREEN if bool(html) else Fore.RED}{bool(html)}{Style.RESET_ALL} | Time: {t2 - t1:.2f}s") # print(f"{Fore.GREEN}{self.tag_format('COMPLETE')} {self.log_icons['COMPLETE']} {cache_context.display_url[:URL_LOG_SHORTEN_LENGTH]}... | Status: {Fore.GREEN if crawl_result.success else Fore.RED}{crawl_result.success} | {Fore.YELLOW}Total: {time.perf_counter() - start_time:.2f}s{Style.RESET_ALL}")
self.logger.success(
message="{url:.50}... | Status: {status} | Total: {timing}",
tag="COMPLETE",
params={
"url": cache_context.display_url,
"status": crawl_result.success,
"timing": f"{time.perf_counter() - start_time:.2f}s"
},
colors={
"status": Fore.GREEN if crawl_result.success else Fore.RED,
"timing": Fore.YELLOW
}
)
# Process the HTML content # Update cache if appropriate
crawl_result = await self.aprocess_html( if cache_context.should_write() and not bool(cached_result):
url=url, await async_db_manager.acache_url(crawl_result)
html=html,
extracted_content=extracted_content, return crawl_result
word_count_threshold=word_count_threshold,
extraction_strategy=extraction_strategy,
chunking_strategy=chunking_strategy,
content_filter=content_filter,
css_selector=css_selector,
screenshot=screenshot_data,
verbose=verbose,
is_cached=bool(cached_result),
async_response=async_response,
is_web_url=cache_context.is_web_url,
is_local_file=cache_context.is_local_file,
is_raw_html=cache_context.is_raw_html,
**kwargs,
)
# Set response data except Exception as e:
if async_response: if not hasattr(e, "msg"):
crawl_result.status_code = async_response.status_code e.msg = str(e)
crawl_result.response_headers = async_response.response_headers # print(f"{Fore.RED}{self.tag_format('ERROR')} {self.log_icons['ERROR']} Failed to crawl {cache_context.display_url[:URL_LOG_SHORTEN_LENGTH]}... | {e.msg}{Style.RESET_ALL}")
crawl_result.downloaded_files = async_response.downloaded_files
else: self.logger.error_status(
crawl_result.status_code = 200 url=cache_context.display_url,
crawl_result.response_headers = cached_result.response_headers if cached_result else {} error=create_box_message(e.msg, type = "error"),
tag="ERROR"
crawl_result.success = bool(html) )
crawl_result.session_id = kwargs.get("session_id", None) return CrawlResult(
url=url,
# if verbose: html="",
# print(f"{Fore.GREEN}{self.tag_format('COMPLETE')} {self.log_icons['COMPLETE']} {cache_context.display_url[:URL_LOG_SHORTEN_LENGTH]}... | Status: {Fore.GREEN if crawl_result.success else Fore.RED}{crawl_result.success} | {Fore.YELLOW}Total: {time.perf_counter() - start_time:.2f}s{Style.RESET_ALL}") success=False,
self.logger.success( error_message=e.msg
message="{url:.50}... | Status: {status} | Total: {timing}",
tag="COMPLETE",
params={
"url": cache_context.display_url,
"status": crawl_result.success,
"timing": f"{time.perf_counter() - start_time:.2f}s"
},
colors={
"status": Fore.GREEN if crawl_result.success else Fore.RED,
"timing": Fore.YELLOW
}
) )
# Update cache if appropriate
if cache_context.should_write() and not bool(cached_result):
await async_db_manager.acache_url(crawl_result)
return crawl_result
except Exception as e:
if not hasattr(e, "msg"):
e.msg = str(e)
# print(f"{Fore.RED}{self.tag_format('ERROR')} {self.log_icons['ERROR']} Failed to crawl {cache_context.display_url[:URL_LOG_SHORTEN_LENGTH]}... | {e.msg}{Style.RESET_ALL}")
self.logger.error_status(
url=cache_context.display_url,
error=create_box_message(e.msg, type = "error"),
tag="ERROR"
)
return CrawlResult(
url=url,
html="",
success=False,
error_message=e.msg
)
async def arun_many( async def arun_many(
self, self,
urls: List[str], urls: List[str],