fix: streamline url status logging via single entrypoint i.e. logger.url_status

This commit is contained in:
Aravind Karnam
2025-03-20 18:59:15 +05:30
parent eedda1ae5c
commit ac2f9ae533
2 changed files with 205 additions and 68 deletions

View File

@@ -10,12 +10,17 @@ import asyncio
# from contextlib import nullcontext, asynccontextmanager
from contextlib import asynccontextmanager
from .models import CrawlResult, MarkdownGenerationResult, DispatchResult, ScrapingResult
from .models import (
CrawlResult,
MarkdownGenerationResult,
DispatchResult,
ScrapingResult,
)
from .async_database import async_db_manager
from .chunking_strategy import * # noqa: F403
from .chunking_strategy import IdentityChunking
from .content_filter_strategy import * # noqa: F403
from .extraction_strategy import * # noqa: F403
from .extraction_strategy import * # noqa: F403
from .extraction_strategy import NoExtractionStrategy
from .async_crawler_strategy import (
AsyncCrawlerStrategy,
@@ -30,7 +35,7 @@ from .markdown_generation_strategy import (
from .deep_crawling import DeepCrawlDecorator
from .async_logger import AsyncLogger, AsyncLoggerBase
from .async_configs import BrowserConfig, CrawlerRunConfig
from .async_dispatcher import * # noqa: F403
from .async_dispatcher import * # noqa: F403
from .async_dispatcher import BaseDispatcher, MemoryAdaptiveDispatcher, RateLimiter
from .utils import (
@@ -44,9 +49,10 @@ from .utils import (
from typing import Union, AsyncGenerator
CrawlResultT = TypeVar('CrawlResultT', bound=CrawlResult)
CrawlResultT = TypeVar("CrawlResultT", bound=CrawlResult)
# RunManyReturn = Union[CrawlResultT, List[CrawlResultT], AsyncGenerator[CrawlResultT, None]]
class CrawlResultContainer(Generic[CrawlResultT]):
def __init__(self, results: Union[CrawlResultT, List[CrawlResultT]]):
# Normalize to a list
@@ -68,20 +74,21 @@ class CrawlResultContainer(Generic[CrawlResultT]):
# Delegate attribute access to the first element.
if self._results:
return getattr(self._results[0], attr)
raise AttributeError(f"{self.__class__.__name__} object has no attribute '{attr}'")
raise AttributeError(
f"{self.__class__.__name__} object has no attribute '{attr}'"
)
def __repr__(self):
return f"{self.__class__.__name__}({self._results!r})"
# Redefine the union type. Now synchronous calls always return a container,
# while stream mode is handled with an AsyncGenerator.
RunManyReturn = Union[
CrawlResultContainer[CrawlResultT],
AsyncGenerator[CrawlResultT, None]
CrawlResultContainer[CrawlResultT], AsyncGenerator[CrawlResultT, None]
]
class AsyncWebCrawler:
"""
Asynchronous web crawler with flexible caching capabilities.
@@ -193,7 +200,7 @@ class AsyncWebCrawler:
# Decorate arun method with deep crawling capabilities
self._deep_handler = DeepCrawlDecorator(self)
self.arun = self._deep_handler(self.arun)
self.arun = self._deep_handler(self.arun)
async def start(self):
"""
@@ -210,26 +217,39 @@ class AsyncWebCrawler:
AsyncWebCrawler: The initialized crawler instance
"""
# Check for builtin browser if requested
if self.browser_config.browser_mode == "builtin" and not self.browser_config.cdp_url:
if (
self.browser_config.browser_mode == "builtin"
and not self.browser_config.cdp_url
):
# Import here to avoid circular imports
from .browser_profiler import BrowserProfiler
profiler = BrowserProfiler(logger=self.logger)
# Get builtin browser info or launch if needed
browser_info = profiler.get_builtin_browser_info()
if not browser_info:
self.logger.info("Builtin browser not found, launching new instance...", tag="BROWSER")
self.logger.info(
"Builtin browser not found, launching new instance...",
tag="BROWSER",
)
cdp_url = await profiler.launch_builtin_browser()
if not cdp_url:
self.logger.warning("Failed to launch builtin browser, falling back to dedicated browser", tag="BROWSER")
self.logger.warning(
"Failed to launch builtin browser, falling back to dedicated browser",
tag="BROWSER",
)
else:
self.browser_config.cdp_url = cdp_url
self.browser_config.use_managed_browser = True
else:
self.logger.info(f"Using existing builtin browser at {browser_info.get('cdp_url')}", tag="BROWSER")
self.browser_config.cdp_url = browser_info.get('cdp_url')
self.logger.info(
f"Using existing builtin browser at {browser_info.get('cdp_url')}",
tag="BROWSER",
)
self.browser_config.cdp_url = browser_info.get("cdp_url")
self.browser_config.use_managed_browser = True
await self.crawler_strategy.__aenter__()
await self.awarmup()
return self
@@ -305,7 +325,7 @@ class AsyncWebCrawler:
# Auto-start if not ready
if not self.ready:
await self.start()
config = config or CrawlerRunConfig()
if not isinstance(url, str) or not url:
raise ValueError("Invalid URL, make sure the URL is a non-empty string")
@@ -319,9 +339,7 @@ class AsyncWebCrawler:
config.cache_mode = CacheMode.ENABLED
# Create cache context
cache_context = CacheContext(
url, config.cache_mode, False
)
cache_context = CacheContext(url, config.cache_mode, False)
# Initialize processing variables
async_response: AsyncCrawlResponse = None
@@ -351,7 +369,7 @@ class AsyncWebCrawler:
# if config.screenshot and not screenshot or config.pdf and not pdf:
if config.screenshot and not screenshot_data:
cached_result = None
if config.pdf and not pdf_data:
cached_result = None
@@ -383,14 +401,18 @@ class AsyncWebCrawler:
# Check robots.txt if enabled
if config and config.check_robots_txt:
if not await self.robots_parser.can_fetch(url, self.browser_config.user_agent):
if not await self.robots_parser.can_fetch(
url, self.browser_config.user_agent
):
return CrawlResult(
url=url,
html="",
success=False,
status_code=403,
error_message="Access denied by robots.txt",
response_headers={"X-Robots-Status": "Blocked by robots.txt"}
response_headers={
"X-Robots-Status": "Blocked by robots.txt"
},
)
##############################
@@ -417,7 +439,7 @@ class AsyncWebCrawler:
###############################################################
# Process the HTML content, Call CrawlerStrategy.process_html #
###############################################################
crawl_result : CrawlResult = await self.aprocess_html(
crawl_result: CrawlResult = await self.aprocess_html(
url=url,
html=html,
extracted_content=extracted_content,
@@ -441,18 +463,11 @@ class AsyncWebCrawler:
crawl_result.success = bool(html)
crawl_result.session_id = getattr(config, "session_id", None)
self.logger.success(
message="{url:.50}... | Status: {status} | Total: {timing}",
self.logger.url_status(
url=cache_context.display_url,
success=crawl_result.success,
timing=time.perf_counter() - start_time,
tag="COMPLETE",
params={
"url": cache_context.display_url,
"status": crawl_result.success,
"timing": f"{time.perf_counter() - start_time:.2f}s",
},
colors={
"status": Fore.GREEN if crawl_result.success else Fore.RED,
"timing": Fore.YELLOW,
},
)
# Update cache if appropriate
@@ -462,17 +477,12 @@ class AsyncWebCrawler:
return CrawlResultContainer(crawl_result)
else:
self.logger.success(
message="{url:.50}... | Status: {status} | Total: {timing}",
tag="COMPLETE",
params={
"url": cache_context.display_url,
"status": True,
"timing": f"{time.perf_counter() - start_time:.2f}s",
},
colors={"status": Fore.GREEN, "timing": Fore.YELLOW},
self.logger.url_status(
url=cache_context.display_url,
success=True,
timing=time.perf_counter() - start_time,
tag="COMPLETE"
)
cached_result.success = bool(html)
cached_result.session_id = getattr(config, "session_id", None)
cached_result.redirected_url = cached_result.redirected_url or url
@@ -494,7 +504,7 @@ class AsyncWebCrawler:
tag="ERROR",
)
return CrawlResultContainer(
return CrawlResultContainer(
CrawlResult(
url=url, html="", success=False, error_message=error_message
)
@@ -539,15 +549,14 @@ class AsyncWebCrawler:
# Process HTML content
params = config.__dict__.copy()
params.pop("url", None)
params.pop("url", None)
# add keys from kwargs to params that doesn't exist in params
params.update({k: v for k, v in kwargs.items() if k not in params.keys()})
################################
# Scraping Strategy Execution #
################################
result : ScrapingResult = scraping_strategy.scrap(url, html, **params)
result: ScrapingResult = scraping_strategy.scrap(url, html, **params)
if result is None:
raise ValueError(
@@ -593,11 +602,17 @@ class AsyncWebCrawler:
)
# Log processing completion
self.logger.info(
message="{url:.50}... | Time: {timing}s",
self.logger.url_status(
url=_url,
success=True,
timing=int((time.perf_counter() - t1) * 1000) / 1000,
tag="SCRAPE",
params={"url": _url, "timing": int((time.perf_counter() - t1) * 1000) / 1000},
)
# self.logger.info(
# message="{url:.50}... | Time: {timing}s",
# tag="SCRAPE",
# params={"url": _url, "timing": int((time.perf_counter() - t1) * 1000) / 1000},
# )
################################
# Structured Content Extraction #
@@ -667,7 +682,7 @@ class AsyncWebCrawler:
async def arun_many(
self,
urls: List[str],
config: Optional[CrawlerRunConfig] = None,
config: Optional[CrawlerRunConfig] = None,
dispatcher: Optional[BaseDispatcher] = None,
# Legacy parameters maintained for backwards compatibility
# word_count_threshold=MIN_WORD_THRESHOLD,
@@ -681,8 +696,8 @@ class AsyncWebCrawler:
# pdf: bool = False,
# user_agent: str = None,
# verbose=True,
**kwargs
) -> RunManyReturn:
**kwargs,
) -> RunManyReturn:
"""
Runs the crawler for multiple URLs concurrently using a configurable dispatcher strategy.
@@ -738,28 +753,35 @@ class AsyncWebCrawler:
def transform_result(task_result):
return (
setattr(task_result.result, 'dispatch_result',
DispatchResult(
task_id=task_result.task_id,
memory_usage=task_result.memory_usage,
peak_memory=task_result.peak_memory,
start_time=task_result.start_time,
end_time=task_result.end_time,
error_message=task_result.error_message,
)
) or task_result.result
setattr(
task_result.result,
"dispatch_result",
DispatchResult(
task_id=task_result.task_id,
memory_usage=task_result.memory_usage,
peak_memory=task_result.peak_memory,
start_time=task_result.start_time,
end_time=task_result.end_time,
error_message=task_result.error_message,
),
)
or task_result.result
)
stream = config.stream
if stream:
async def result_transformer():
async for task_result in dispatcher.run_urls_stream(crawler=self, urls=urls, config=config):
async for task_result in dispatcher.run_urls_stream(
crawler=self, urls=urls, config=config
):
yield transform_result(task_result)
return result_transformer()
else:
_results = await dispatcher.run_urls(crawler=self, urls=urls, config=config)
return [transform_result(res) for res in _results]
return [transform_result(res) for res in _results]
async def aclear_cache(self):
"""Clear the cache database."""