crawl4ai/crawl4ai/async_webcrawler.py
Nasrin f6f7f1b551 Release v0.8.0: Crash Recovery, Prefetch Mode & Security Fixes (#1712)
* Fix: Use correct URL variable for raw HTML extraction (#1116)

- Prevents full HTML content from being passed as URL to extraction strategies
- Added unit tests to verify raw HTML and regular URL processing

Fix: Wrong URL variable used for extraction of raw html

* Fix #1181: Preserve whitespace in code blocks during HTML scraping

  The remove_empty_elements_fast() method was removing whitespace-only
  span elements inside <pre> and <code> tags, causing import statements
  like "import torch" to become "importtorch". Now skips elements inside
  code blocks where whitespace is significant.

* Refactor Pydantic model configuration to use ConfigDict for arbitrary types

* Fix EmbeddingStrategy: Uncomment response handling for the variations and clean up mock data. ref #1621

* Fix: permission issues with .cache/url_seeder and other runtime cache dirs. ref #1638

* fix: ensure BrowserConfig.to_dict serializes proxy_config

* feat: make LLM backoff configurable end-to-end

- extend LLMConfig with backoff delay/attempt/factor fields and thread them
  through LLMExtractionStrategy, LLMContentFilter, table extraction, and
  Docker API handlers
- expose the backoff parameter knobs on perform_completion_with_backoff/aperform_completion_with_backoff
  and document them in the md_v2 guides
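
A minimal sketch of what delay/attempt/factor knobs usually control, assuming a plain exponential backoff. The names `call_with_backoff`, `base_delay`, `max_attempts`, and `factor` are hypothetical; they are not the signature of `perform_completion_with_backoff` or the actual `LLMConfig` field names.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_backoff(
    fn: Callable[[], T],
    base_delay: float = 2.0,   # hypothetical: seconds to wait after the first failure
    max_attempts: int = 3,     # hypothetical: total number of tries
    factor: float = 2.0,       # hypothetical: multiplier applied to the delay after each failure
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying on any exception with exponentially growing delays."""
    delay = base_delay
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception:
            # Re-raise once the attempt budget is exhausted.
            if attempt == max_attempts:
                raise
            sleep(delay)
            delay *= factor
    raise RuntimeError("max_attempts must be at least 1")
```

Injecting `sleep` keeps the retry schedule observable in tests without real waiting.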

* reproduced AttributeError from #1642

* pass timeout parameter to docker client request

* added missing deep crawling objects to init

* generalized query in ContentRelevanceFilter to be a str or list

* import modules from enhanceable deserialization

* parameterized tests

* Fix: capture current page URL to reflect JavaScript navigation and add test for delayed redirects. ref #1268

* refactor: replace PyPDF2 with pypdf across the codebase. ref #1412

* Add browser_context_id and target_id parameters to BrowserConfig

Enable Crawl4AI to connect to pre-created CDP browser contexts, which is
essential for cloud browser services that pre-create isolated contexts.

Changes:
- Add browser_context_id and target_id parameters to BrowserConfig
- Update from_kwargs() and to_dict() methods
- Modify BrowserManager.start() to use existing context when provided
- Add _get_page_by_target_id() helper method
- Update get_page() to handle pre-existing targets
- Add test for browser_context_id functionality

This enables cloud services to:
1. Create isolated CDP contexts before Crawl4AI connects
2. Pass context/target IDs to BrowserConfig
3. Have Crawl4AI reuse existing contexts instead of creating new ones

* Add cdp_cleanup_on_close flag to prevent memory leaks in cloud/server scenarios

* Fix: add cdp_cleanup_on_close to from_kwargs

* Fix: find context by target_id for concurrent CDP connections

* Fix: use target_id to find correct page in get_page

* Fix: use CDP to find context by browserContextId for concurrent sessions

* Revert context matching attempts - Playwright cannot see CDP-created contexts

* Add create_isolated_context flag for concurrent CDP crawls

When True, forces creation of a new browser context instead of reusing
the default context. Essential for concurrent crawls on the same browser
to prevent navigation conflicts.

* Add context caching to create_isolated_context branch

Uses contexts_by_config cache (same as non-CDP mode) to reuse contexts
for multiple URLs with same config. Still creates new page per crawl
for navigation isolation. Benefits batch/deep crawls.

* Add init_scripts support to BrowserConfig for pre-page-load JS injection

This adds the ability to inject JavaScript that runs before any page loads,
useful for stealth evasions (canvas/audio fingerprinting, userAgentData).

- Add init_scripts parameter to BrowserConfig (list of JS strings)
- Apply init_scripts in setup_context() via context.add_init_script()
- Update from_kwargs() and to_dict() for serialization

* Fix CDP connection handling: support WS URLs and proper cleanup

Changes to browser_manager.py:

1. _verify_cdp_ready(): Support multiple URL formats
   - WebSocket URLs (ws://, wss://): Skip HTTP verification, Playwright handles directly
   - HTTP URLs with query params: Properly parse with urlparse to preserve query string
   - Fixes issue where naive f"{cdp_url}/json/version" broke WS URLs and query params

2. close(): Proper cleanup when cdp_cleanup_on_close=True
   - Close all sessions (pages)
   - Close all contexts
   - Call browser.close() to disconnect (doesn't terminate browser, just releases connection)
   - Wait 1 second for CDP connection to fully release
   - Stop Playwright instance to prevent memory leaks

This enables:
- Connecting to specific browsers via WS URL
- Reusing the same browser with multiple sequential connections
- No user wait needed between connections (internal 1s delay handles it)

Added tests/browser/test_cdp_cleanup_reuse.py with comprehensive tests.
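
The URL handling described for _verify_cdp_ready() can be sketched as follows. This is an illustration only, not the code in browser_manager.py; the helper name `cdp_version_url` is hypothetical. It returns None for WebSocket URLs (no HTTP check) and otherwise inserts /json/version into the path while keeping the query string.

```python
from typing import Optional
from urllib.parse import urlparse, urlunparse


def cdp_version_url(cdp_url: str) -> Optional[str]:
    """Build the /json/version URL for an HTTP CDP endpoint (hypothetical helper)."""
    parsed = urlparse(cdp_url)
    # ws:// and wss:// endpoints are handed to Playwright directly.
    if parsed.scheme in ("ws", "wss"):
        return None
    # Append to the path component only, so any query string survives.
    path = parsed.path.rstrip("/") + "/json/version"
    return urlunparse(parsed._replace(path=path))
```

With naive string concatenation, `http://localhost:9222?token=abc` would become `http://localhost:9222?token=abc/json/version`, which is the breakage the commit describes.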

* Update gitignore

* Some debugging for caching

* Add _generate_screenshot_from_html for raw: and file:// URLs

Implements the missing method that was being called but never defined.
Now raw: and file:// URLs can generate screenshots by:
1. Loading HTML into a browser page via page.set_content()
2. Taking screenshot using existing take_screenshot() method
3. Cleaning up the page afterward

This enables cached HTML to be rendered with screenshots in crawl4ai-cloud.

* Add PDF and MHTML support for raw: and file:// URLs

- Replace _generate_screenshot_from_html with _generate_media_from_html
- New method handles screenshot, PDF, and MHTML in one browser session
- Update raw: and file:// URL handlers to use new method
- Enables cached HTML to generate all media types

* Add crash recovery for deep crawl strategies

Add optional resume_state and on_state_change parameters to all deep
crawl strategies (BFS, DFS, Best-First) for cloud deployment crash
recovery.

Features:
- resume_state: Pass saved state to resume from checkpoint
- on_state_change: Async callback fired after each URL for real-time
  state persistence to external storage (Redis, DB, etc.)
- export_state(): Get last captured state manually
- Zero overhead when features are disabled (None defaults)

State includes visited URLs, pending queue/stack, depths, and
pages_crawled count. All state is JSON-serializable.
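
A hedged sketch of a persistence layer that could back on_state_change and resume_state, assuming the state is a JSON-serializable dict as stated above. The helpers `make_state_saver` and `load_state`, the file name, and the example state keys are hypothetical; the real state layout is defined by the strategies.

```python
import asyncio
import json
import os
from pathlib import Path
from typing import Any, Awaitable, Callable, Dict, Optional

State = Dict[str, Any]


def make_state_saver(path: Path) -> Callable[[State], Awaitable[None]]:
    """Return an async callback that writes each state snapshot to path."""

    async def on_state_change(state: State) -> None:
        # Write to a temp file first, then swap it in, so a crash mid-write
        # never leaves a truncated checkpoint behind.
        tmp = path.with_suffix(".tmp")
        tmp.write_text(json.dumps(state))
        os.replace(tmp, path)

    return on_state_change


def load_state(path: Path) -> Optional[State]:
    """Return the last saved snapshot, or None when starting fresh."""
    if not path.exists():
        return None
    return json.loads(path.read_text())
```

Under these assumptions, a strategy would receive `on_state_change=make_state_saver(path)` and `resume_state=load_state(path)`; a None resume state means a fresh crawl.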

* Fix: HTTP strategy raw: URL parsing truncates at # character

The AsyncHTTPCrawlerStrategy.crawl() method used urlparse() to extract
content from raw: URLs. This caused HTML with CSS color codes like #eee
to be truncated because # is treated as a URL fragment delimiter.

Before: raw:body{background:#eee} -> parsed.path = 'body{background:'
After:  raw:body{background:#eee} -> raw_content = 'body{background:#eee}'

Fix: Strip the raw: or raw:// prefix directly instead of using urlparse,
matching how the browser strategy handles it.
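
The truncation and the prefix-stripping fix can be reproduced with the standard library alone. The helper name `strip_raw_prefix` is hypothetical and only illustrates the approach described above.

```python
from urllib.parse import urlparse


def strip_raw_prefix(url: str) -> str:
    """Return the inline HTML carried by a raw: or raw:// URL (hypothetical helper)."""
    # Check the longer prefix first so "raw://" is not left with a stray "//".
    for prefix in ("raw://", "raw:"):
        if url.startswith(prefix):
            return url[len(prefix):]
    return url


url = "raw:body{background:#eee}"
# urlparse treats everything after '#' as a fragment, so the path is cut short.
truncated = urlparse(url).path
# Plain prefix stripping keeps the content intact.
intact = strip_raw_prefix(url)
```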

* Add base_url parameter to CrawlerRunConfig for raw HTML processing

When processing raw: HTML (e.g., from cache), the URL parameter is meaningless
for markdown link resolution. This adds a base_url parameter that can be set
explicitly to provide proper URL resolution context.

Changes:
- Add base_url parameter to CrawlerRunConfig.__init__
- Add base_url to CrawlerRunConfig.from_kwargs
- Update aprocess_html to use base_url for markdown generation

Usage:
  config = CrawlerRunConfig(base_url='https://example.com')
  result = await crawler.arun(url='raw:{html}', config=config)
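
Why this matters can be shown with urljoin from the standard library. The sketch below mirrors the fallback order used for markdown generation (base_url, then redirected_url, then url); `pick_base_url` is a hypothetical helper, not a library function.

```python
from typing import Optional
from urllib.parse import urljoin


def pick_base_url(base_url: Optional[str], redirected_url: Optional[str], url: str) -> str:
    """Choose the first available value to resolve relative links against."""
    return base_url or redirected_url or url


# Without base_url, a raw: URL gives relative links nothing to resolve against.
unresolved = urljoin(pick_base_url(None, None, "raw:<html></html>"), "/about")

# With an explicit base_url, the same link becomes absolute.
resolved = urljoin(pick_base_url("https://example.com", None, "raw:<html></html>"), "/about")
```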

* Add prefetch mode for two-phase deep crawling

- Add `prefetch` parameter to CrawlerRunConfig
- Add `quick_extract_links()` function for fast link extraction
- Add short-circuit in aprocess_html() for prefetch mode
- Add 42 tests (unit, integration, regression)
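
A rough sketch of what a fast, link-only pass can look like, using only the standard library. This is not the implementation of `quick_extract_links()`; the function name `extract_links_sketch` and the internal/external return shape are assumptions made for illustration.

```python
from html.parser import HTMLParser
from typing import Dict, List
from urllib.parse import urljoin, urlparse


class _LinkCollector(HTMLParser):
    """Collect absolute hrefs from <a> tags without building a DOM."""

    def __init__(self, base_url: str) -> None:
        super().__init__()
        self.base_url = base_url
        self.hrefs: List[str] = []

    def handle_starttag(self, tag, attrs):
        if tag != "a":
            return
        href = dict(attrs).get("href")
        if href:
            self.hrefs.append(urljoin(self.base_url, href))


def extract_links_sketch(html: str, base_url: str) -> Dict[str, List[str]]:
    """Split http(s) links into internal and external by host (assumed shape)."""
    collector = _LinkCollector(base_url)
    collector.feed(html)
    base_host = urlparse(base_url).netloc
    links: Dict[str, List[str]] = {"internal": [], "external": []}
    for link in collector.hrefs:
        parsed = urlparse(link)
        # Skip mailto:, javascript:, and other non-web schemes.
        if parsed.scheme not in ("http", "https"):
            continue
        key = "internal" if parsed.netloc == base_host else "external"
        links[key].append(link)
    return links
```

Skipping markdown generation and extraction is what makes the first phase of a two-phase crawl cheap; only the URLs are needed to decide what to fetch in full later.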

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* Updates on proxy rotation and proxy configuration

* Add proxy support to HTTP crawler strategy

* Add browser pipeline support for raw:/file:// URLs

- Add process_in_browser parameter to CrawlerRunConfig
- Route raw:/file:// URLs through _crawl_web() when browser operations needed
- Use page.set_content() instead of goto() for local content
- Fix cookie handling for non-HTTP URLs in browser_manager
- Auto-detect browser requirements: js_code, wait_for, screenshot, etc.
- Maintain fast path for raw:/file:// without browser params

Fixes #310

* Add smart TTL cache for sitemap URL seeder

- Add cache_ttl_hours and validate_sitemap_lastmod params to SeedingConfig
- New JSON cache format with metadata (version, created_at, lastmod, url_count)
- Cache validation by TTL expiry and sitemap lastmod comparison
- Auto-migration from old .jsonl to new .json format
- Fixes bug where incomplete cache was used indefinitely
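
The two validation rules (TTL expiry and sitemap lastmod comparison) can be sketched as below. The function `is_cache_fresh` is hypothetical, and the sketch assumes that created_at and lastmod are stored as timezone-aware ISO 8601 strings, which the commit message does not specify.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_cache_fresh(
    meta: dict,
    ttl_hours: float,
    sitemap_lastmod: Optional[str] = None,
    now: Optional[datetime] = None,
) -> bool:
    """Return True when cached sitemap URLs may still be used (assumed rules)."""
    now = now or datetime.now(timezone.utc)

    # Rule 1: the cache expires once it is older than the TTL.
    created = datetime.fromisoformat(meta["created_at"])
    if now - created > timedelta(hours=ttl_hours):
        return False

    # Rule 2: a sitemap modified after the cached lastmod invalidates the cache.
    cached_lastmod = meta.get("lastmod")
    if sitemap_lastmod and cached_lastmod:
        if datetime.fromisoformat(sitemap_lastmod) > datetime.fromisoformat(cached_lastmod):
            return False

    return True
```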

* Update URL seeder docs with smart TTL cache parameters

- Add cache_ttl_hours and validate_sitemap_lastmod to parameter table
- Document smart TTL cache validation with examples
- Add cache-related troubleshooting entries
- Update key features summary

* Add MEMORY.md to gitignore

* Docs: Add multi-sample schema generation section

Add documentation explaining how to pass multiple HTML samples
to generate_schema() for stable selectors that work across pages
with varying DOM structures.

Includes:
- Problem explanation (fragile nth-child selectors)
- Solution with code example
- Key points for multi-sample queries
- Comparison table of fragile vs stable selectors

* Fix critical RCE and LFI vulnerabilities in Docker API deployment

Security fixes for vulnerabilities reported by ProjectDiscovery:

1. Remote Code Execution via Hooks (CVE pending)
   - Remove __import__ from allowed_builtins in hook_manager.py
   - Prevents arbitrary module imports (os, subprocess, etc.)
   - Hooks now disabled by default via CRAWL4AI_HOOKS_ENABLED env var

2. Local File Inclusion via file:// URLs (CVE pending)
   - Add URL scheme validation to /execute_js, /screenshot, /pdf, /html
   - Block file://, javascript:, data: and other dangerous schemes
   - Only allow http://, https://, and raw: (where appropriate)

3. Security hardening
   - Add CRAWL4AI_HOOKS_ENABLED=false as default (opt-in for hooks)
   - Add security warning comments in config.yml
   - Add validate_url_scheme() helper for consistent validation

Testing:
   - Add unit tests (test_security_fixes.py) - 16 tests
   - Add integration tests (run_security_tests.py) for live server

Affected endpoints:
   - POST /crawl (hooks disabled by default)
   - POST /crawl/stream (hooks disabled by default)
   - POST /execute_js (URL validation added)
   - POST /screenshot (URL validation added)
   - POST /pdf (URL validation added)
   - POST /html (URL validation added)

Breaking changes:
   - Hooks require CRAWL4AI_HOOKS_ENABLED=true to function
   - file:// URLs no longer work on API endpoints (use library directly)
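
An allow-list check in the spirit of the scheme validation above might look like this. It is a sketch under assumptions: `is_allowed_url` and its `allow_raw` flag are hypothetical and do not reproduce the signature or behavior of the project's validate_url_scheme() helper.

```python
from urllib.parse import urlparse

# Allow-list rather than block-list: anything not named here is rejected.
ALLOWED_SCHEMES = {"http", "https"}


def is_allowed_url(url: str, allow_raw: bool = False) -> bool:
    """Return True only for http(s) URLs, plus raw: where explicitly permitted."""
    candidate = url.strip()
    if allow_raw and candidate.startswith("raw:"):
        return True
    scheme = urlparse(candidate).scheme.lower()
    return scheme in ALLOWED_SCHEMES
```

An allow-list rejects file://, javascript:, and data: without naming them, and also rejects schemes nobody thought to block.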

* Enhance authentication flow by implementing JWT token retrieval and adding authorization headers to API requests

* Add release notes for v0.7.9, detailing breaking changes, security fixes, new features, bug fixes, and documentation updates

* Add release notes for v0.8.0, detailing breaking changes, security fixes, new features, bug fixes, and documentation updates

Documentation for v0.8.0 release:

- SECURITY.md: Security policy and vulnerability reporting guidelines
- RELEASE_NOTES_v0.8.0.md: Comprehensive release notes
- migration/v0.8.0-upgrade-guide.md: Step-by-step migration guide
- security/GHSA-DRAFT-RCE-LFI.md: GitHub security advisory drafts
- CHANGELOG.md: Updated with v0.8.0 changes

Breaking changes documented:
- Docker API hooks disabled by default (CRAWL4AI_HOOKS_ENABLED)
- file:// URLs blocked on Docker API endpoints

Security fixes credited to Neo by ProjectDiscovery

* Add examples for deep crawl crash recovery and prefetch mode in documentation

* Release v0.8.0: The v0.8.0 Update

- Updated version to 0.8.0
- Added comprehensive demo and release notes
- Updated all documentation

* Update security researcher acknowledgment with a hyperlink for Neo by ProjectDiscovery

* Add async agenerate_schema method for schema generation

- Extract prompt building to shared _build_schema_prompt() method
- Add agenerate_schema() async version using aperform_completion_with_backoff
- Refactor generate_schema() to use shared prompt builder
- Fixes Gemini/Vertex AI compatibility in async contexts (FastAPI)

* Fix: Enable litellm.drop_params for O-series/GPT-5 model compatibility

O-series (o1, o3) and GPT-5 models only support temperature=1.
Setting litellm.drop_params=True auto-drops unsupported parameters
instead of throwing UnsupportedParamsError.

Fixes temperature=0.01 error for these models in LLM extraction.

---------

Co-authored-by: rbushria <rbushri@gmail.com>
Co-authored-by: AHMET YILMAZ <tawfik@kidocode.com>
Co-authored-by: Soham Kukreti <kukretisoham@gmail.com>
Co-authored-by: Chris Murphy <chris.murphy@klaviyo.com>
Co-authored-by: unclecode <unclecode@kidocode.com>
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-17 14:19:15 +01:00

982 lines
41 KiB
Python

from .__version__ import __version__ as crawl4ai_version
import os
import sys
import time
from pathlib import Path
from typing import Optional, List
import json
import asyncio
# from contextlib import nullcontext, asynccontextmanager
from contextlib import asynccontextmanager
from .models import (
CrawlResult,
MarkdownGenerationResult,
DispatchResult,
ScrapingResult,
CrawlResultContainer,
RunManyReturn
)
from .async_database import async_db_manager
from .chunking_strategy import * # noqa: F403
from .chunking_strategy import IdentityChunking
from .content_filter_strategy import * # noqa: F403
from .extraction_strategy import * # noqa: F403
from .extraction_strategy import NoExtractionStrategy
from .async_crawler_strategy import (
AsyncCrawlerStrategy,
AsyncPlaywrightCrawlerStrategy,
AsyncCrawlResponse,
)
from .cache_context import CacheMode, CacheContext
from .markdown_generation_strategy import (
DefaultMarkdownGenerator,
MarkdownGenerationStrategy,
)
from .deep_crawling import DeepCrawlDecorator
from .async_logger import AsyncLogger, AsyncLoggerBase
from .async_configs import BrowserConfig, CrawlerRunConfig, ProxyConfig, SeedingConfig
from .async_dispatcher import * # noqa: F403
from .async_dispatcher import BaseDispatcher, MemoryAdaptiveDispatcher, RateLimiter
from .async_url_seeder import AsyncUrlSeeder
from .utils import (
sanitize_input_encode,
InvalidCSSSelectorError,
fast_format_html,
get_error_context,
RobotsParser,
preprocess_html_for_schema,
compute_head_fingerprint,
)
from .cache_validator import CacheValidator, CacheValidationResult
class AsyncWebCrawler:
"""
Asynchronous web crawler with flexible caching capabilities.
There are two ways to use the crawler:
1. Using context manager (recommended for simple cases):
```python
async with AsyncWebCrawler() as crawler:
result = await crawler.arun(url="https://example.com")
```
2. Using explicit lifecycle management (recommended for long-running applications):
```python
crawler = AsyncWebCrawler()
await crawler.start()
# Use the crawler multiple times
result1 = await crawler.arun(url="https://example.com")
result2 = await crawler.arun(url="https://another.com")
await crawler.close()
```
Attributes:
browser_config (BrowserConfig): Configuration object for browser settings.
crawler_strategy (AsyncCrawlerStrategy): Strategy for crawling web pages.
logger (AsyncLogger): Logger instance for recording events and errors.
crawl4ai_folder (str): Directory for storing cache.
base_directory (str): Base directory for storing cache.
ready (bool): Whether the crawler is ready for use.
Methods:
start(): Start the crawler explicitly without using context manager.
close(): Close the crawler explicitly without using context manager.
arun(): Run the crawler for a single source: URL (web, local file, or raw HTML).
awarmup(): Perform warmup sequence.
arun_many(): Run the crawler for multiple sources.
aprocess_html(): Process HTML content.
Typical Usage:
async with AsyncWebCrawler() as crawler:
result = await crawler.arun(url="https://example.com")
print(result.markdown)
Using configuration:
browser_config = BrowserConfig(browser_type="chromium", headless=True)
async with AsyncWebCrawler(config=browser_config) as crawler:
crawler_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS
)
result = await crawler.arun(url="https://example.com", config=crawler_config)
print(result.markdown)
"""
_domain_last_hit = {}
def __init__(
self,
crawler_strategy: AsyncCrawlerStrategy = None,
config: BrowserConfig = None,
base_directory: str = str(
os.getenv("CRAWL4_AI_BASE_DIRECTORY", Path.home())),
thread_safe: bool = False,
logger: AsyncLoggerBase = None,
**kwargs,
):
"""
Initialize the AsyncWebCrawler.
Args:
crawler_strategy: Strategy for crawling web pages. Default AsyncPlaywrightCrawlerStrategy
config: Configuration object for browser settings. Default BrowserConfig()
base_directory: Base directory for storing cache
thread_safe: Whether to use thread-safe operations
logger: Logger instance for recording events and errors. Default AsyncLogger
**kwargs: Additional arguments for backwards compatibility
"""
# Handle browser configuration
browser_config = config or BrowserConfig()
self.browser_config = browser_config
# Initialize logger first since other components may need it
self.logger = logger or AsyncLogger(
log_file=os.path.join(base_directory, ".crawl4ai", "crawler.log"),
verbose=self.browser_config.verbose,
tag_width=10,
)
# Initialize crawler strategy
params = {k: v for k, v in kwargs.items() if k in [
"browser_config", "logger"]}
self.crawler_strategy = crawler_strategy or AsyncPlaywrightCrawlerStrategy(
browser_config=browser_config,
logger=self.logger,
**params, # Pass remaining kwargs for backwards compatibility
)
# Thread safety setup
self._lock = asyncio.Lock() if thread_safe else None
# Initialize directories
self.crawl4ai_folder = os.path.join(base_directory, ".crawl4ai")
os.makedirs(self.crawl4ai_folder, exist_ok=True)
os.makedirs(f"{self.crawl4ai_folder}/cache", exist_ok=True)
# Initialize robots parser
self.robots_parser = RobotsParser()
self.ready = False
# Decorate arun method with deep crawling capabilities
self._deep_handler = DeepCrawlDecorator(self)
self.arun = self._deep_handler(self.arun)
self.url_seeder: Optional[AsyncUrlSeeder] = None
async def start(self):
"""
Start the crawler explicitly without using context manager.
This is equivalent to using 'async with' but gives more control over the lifecycle.
Returns:
AsyncWebCrawler: The initialized crawler instance
"""
await self.crawler_strategy.__aenter__()
self.logger.info(f"Crawl4AI {crawl4ai_version}", tag="INIT")
self.ready = True
return self
async def close(self):
"""
Close the crawler explicitly without using context manager.
This should be called when you're done with the crawler if you used start().
This method will:
1. Clean up browser resources
2. Close any open pages and contexts
"""
await self.crawler_strategy.__aexit__(None, None, None)
async def __aenter__(self):
return await self.start()
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()
@asynccontextmanager
async def nullcontext(self):
"""Asynchronous null context manager, used when no lock is configured."""
yield
async def arun(
self,
url: str,
config: CrawlerRunConfig = None,
**kwargs,
) -> RunManyReturn:
"""
Runs the crawler for a single source: URL (web, local file, or raw HTML).
Migration Guide:
Old way (deprecated):
result = await crawler.arun(
url="https://example.com",
word_count_threshold=200,
screenshot=True,
...
)
New way (recommended):
config = CrawlerRunConfig(
word_count_threshold=200,
screenshot=True,
...
)
result = await crawler.arun(url="https://example.com", config=config)
Args:
url: The URL to crawl (http://, https://, file://, or raw:)
config: Configuration object controlling crawl behavior
[other parameters maintained for backwards compatibility]
Returns:
CrawlResultContainer: Container wrapping the CrawlResult of crawling and processing
"""
# Auto-start if not ready
if not self.ready:
await self.start()
config = config or CrawlerRunConfig()
if not isinstance(url, str) or not url:
raise ValueError(
"Invalid URL, make sure the URL is a non-empty string")
async with self._lock or self.nullcontext():
try:
self.logger.verbose = config.verbose
# Default to ENABLED if no cache mode specified
if config.cache_mode is None:
config.cache_mode = CacheMode.ENABLED
# Create cache context
cache_context = CacheContext(url, config.cache_mode, False)
# Initialize processing variables
async_response: AsyncCrawlResponse = None
cached_result: CrawlResult = None
screenshot_data = None
pdf_data = None
extracted_content = None
start_time = time.perf_counter()
# Try to get cached result if appropriate
if cache_context.should_read():
cached_result = await async_db_manager.aget_cached_url(url)
# Smart Cache: Validate cache freshness if enabled
if cached_result and config.check_cache_freshness:
cache_metadata = await async_db_manager.aget_cache_metadata(url)
if cache_metadata:
async with CacheValidator(timeout=config.cache_validation_timeout) as validator:
validation = await validator.validate(
url=url,
stored_etag=cache_metadata.get("etag"),
stored_last_modified=cache_metadata.get("last_modified"),
stored_head_fingerprint=cache_metadata.get("head_fingerprint"),
)
if validation.status == CacheValidationResult.FRESH:
cached_result.cache_status = "hit_validated"
self.logger.info(
message="Cache validated: {reason}",
tag="CACHE",
params={"reason": validation.reason}
)
# Update metadata if we got new values
if validation.new_etag or validation.new_last_modified:
await async_db_manager.aupdate_cache_metadata(
url=url,
etag=validation.new_etag,
last_modified=validation.new_last_modified,
head_fingerprint=validation.new_head_fingerprint,
)
elif validation.status == CacheValidationResult.ERROR:
cached_result.cache_status = "hit_fallback"
self.logger.warning(
message="Cache validation failed, using cached: {reason}",
tag="CACHE",
params={"reason": validation.reason}
)
else:
# STALE or UNKNOWN - force recrawl
self.logger.info(
message="Cache stale: {reason}",
tag="CACHE",
params={"reason": validation.reason}
)
cached_result = None
elif cached_result:
cached_result.cache_status = "hit"
if cached_result:
html = sanitize_input_encode(cached_result.html)
extracted_content = sanitize_input_encode(
cached_result.extracted_content or ""
)
extracted_content = (
None
if not extracted_content or extracted_content == "[]"
else extracted_content
)
# If a screenshot or PDF is requested but is not in the cache, reset cached_result to None to force a fresh crawl
screenshot_data = cached_result.screenshot
pdf_data = cached_result.pdf
# if config.screenshot and not screenshot or config.pdf and not pdf:
if config.screenshot and not screenshot_data:
cached_result = None
if config.pdf and not pdf_data:
cached_result = None
self.logger.url_status(
url=cache_context.display_url,
success=bool(html),
timing=time.perf_counter() - start_time,
tag="FETCH",
)
# Update proxy configuration from rotation strategy if available
if config and config.proxy_rotation_strategy:
# Handle sticky sessions - use same proxy for all requests with same session_id
if config.proxy_session_id:
next_proxy: ProxyConfig = await config.proxy_rotation_strategy.get_proxy_for_session(
config.proxy_session_id,
ttl=config.proxy_session_ttl
)
if next_proxy:
self.logger.info(
message="Using sticky proxy session: {session_id} -> {proxy}",
tag="PROXY",
params={
"session_id": config.proxy_session_id,
"proxy": next_proxy.server
}
)
config.proxy_config = next_proxy
else:
# Existing behavior: rotate on each request
next_proxy: ProxyConfig = await config.proxy_rotation_strategy.get_next_proxy()
if next_proxy:
self.logger.info(
message="Switch proxy: {proxy}",
tag="PROXY",
params={"proxy": next_proxy.server}
)
config.proxy_config = next_proxy
# Fetch fresh content if needed
if not cached_result or not html:
t1 = time.perf_counter()
if config.user_agent:
self.crawler_strategy.update_user_agent(
config.user_agent)
# Check robots.txt if enabled
if config and config.check_robots_txt:
if not await self.robots_parser.can_fetch(
url, self.browser_config.user_agent
):
return CrawlResult(
url=url,
html="",
success=False,
status_code=403,
error_message="Access denied by robots.txt",
response_headers={
"X-Robots-Status": "Blocked by robots.txt"
},
)
##############################
# Call CrawlerStrategy.crawl #
##############################
async_response = await self.crawler_strategy.crawl(
url,
config=config, # Pass the entire config object
)
html = sanitize_input_encode(async_response.html)
screenshot_data = async_response.screenshot
pdf_data = async_response.pdf_data
js_execution_result = async_response.js_execution_result
t2 = time.perf_counter()
self.logger.url_status(
url=cache_context.display_url,
success=bool(html),
timing=t2 - t1,
tag="FETCH",
)
#####################################################
# Process the HTML content via self.aprocess_html() #
#####################################################
from urllib.parse import urlparse
crawl_result: CrawlResult = await self.aprocess_html(
url=url,
html=html,
extracted_content=extracted_content,
config=config, # Pass the config object instead of individual parameters
screenshot_data=screenshot_data,
pdf_data=pdf_data,
verbose=config.verbose,
is_raw_html=url.startswith("raw:"),
redirected_url=async_response.redirected_url,
original_scheme=urlparse(url).scheme,
**kwargs,
)
crawl_result.status_code = async_response.status_code
crawl_result.redirected_url = async_response.redirected_url or url
crawl_result.response_headers = async_response.response_headers
crawl_result.downloaded_files = async_response.downloaded_files
crawl_result.js_execution_result = js_execution_result
crawl_result.mhtml = async_response.mhtml_data
crawl_result.ssl_certificate = async_response.ssl_certificate
# Add captured network and console data if available
crawl_result.network_requests = async_response.network_requests
crawl_result.console_messages = async_response.console_messages
crawl_result.success = bool(html)
crawl_result.session_id = getattr(
config, "session_id", None)
crawl_result.cache_status = "miss"
# Compute head fingerprint for cache validation
if html:
head_end = html.lower().find('</head>')
if head_end != -1:
head_html = html[:head_end + 7]
crawl_result.head_fingerprint = compute_head_fingerprint(head_html)
self.logger.url_status(
url=cache_context.display_url,
success=crawl_result.success,
timing=time.perf_counter() - start_time,
tag="COMPLETE",
)
# Update cache if appropriate
if cache_context.should_write() and not bool(cached_result):
await async_db_manager.acache_url(crawl_result)
return CrawlResultContainer(crawl_result)
else:
self.logger.url_status(
url=cache_context.display_url,
success=True,
timing=time.perf_counter() - start_time,
tag="COMPLETE"
)
cached_result.success = bool(html)
cached_result.session_id = getattr(
config, "session_id", None)
cached_result.redirected_url = cached_result.redirected_url or url
return CrawlResultContainer(cached_result)
except Exception as e:
error_context = get_error_context(sys.exc_info())
error_message = (
f"Unexpected error in _crawl_web at line {error_context['line_no']} "
f"in {error_context['function']} ({error_context['filename']}):\n"
f"Error: {str(e)}\n\n"
f"Code context:\n{error_context['code_context']}"
)
self.logger.error_status(
url=url,
error=error_message,
tag="ERROR",
)
return CrawlResultContainer(
CrawlResult(
url=url, html="", success=False, error_message=error_message
)
)
async def aprocess_html(
self,
url: str,
html: str,
extracted_content: str,
config: CrawlerRunConfig,
screenshot_data: str,
pdf_data: str,
verbose: bool,
**kwargs,
) -> CrawlResult:
"""
Process HTML content using the provided configuration.
Args:
url: The URL being processed
html: Raw HTML content
extracted_content: Previously extracted content (if any)
config: Configuration object controlling processing behavior
screenshot_data: Screenshot data (if any)
pdf_data: PDF data (if any)
verbose: Whether to enable verbose logging
**kwargs: Additional parameters for backwards compatibility
Returns:
CrawlResult: Processed result containing extracted and formatted content
"""
# === PREFETCH MODE SHORT-CIRCUIT ===
if getattr(config, 'prefetch', False):
from .utils import quick_extract_links
# Use base_url from config (for raw: URLs), redirected_url, or original url
effective_url = getattr(config, 'base_url', None) or kwargs.get('redirected_url') or url
links = quick_extract_links(html, effective_url)
return CrawlResult(
url=url,
html=html,
success=True,
links=links,
status_code=kwargs.get('status_code'),
response_headers=kwargs.get('response_headers'),
redirected_url=kwargs.get('redirected_url'),
ssl_certificate=kwargs.get('ssl_certificate'),
# All other fields default to None
)
# === END PREFETCH SHORT-CIRCUIT ===
cleaned_html = ""
try:
_url = url if not kwargs.get("is_raw_html", False) else "Raw HTML"
t1 = time.perf_counter()
# Get scraping strategy and ensure it has a logger
scraping_strategy = config.scraping_strategy
if not scraping_strategy.logger:
scraping_strategy.logger = self.logger
# Process HTML content
params = config.__dict__.copy()
params.pop("url", None)
# Add keys from kwargs that do not already exist in params
params.update({k: v for k, v in kwargs.items()
if k not in params.keys()})
################################
# Scraping Strategy Execution #
################################
result: ScrapingResult = scraping_strategy.scrap(
url, html, **params)
if result is None:
raise ValueError(
f"Process HTML, Failed to extract content from the website: {url}"
)
except InvalidCSSSelectorError as e:
raise ValueError(str(e))
except Exception as e:
raise ValueError(
f"Process HTML, Failed to extract content from the website: {url}, error: {str(e)}"
)
# Extract results - handle both dict and ScrapingResult
if isinstance(result, dict):
cleaned_html = sanitize_input_encode(
result.get("cleaned_html", ""))
media = result.get("media", {})
tables = media.pop("tables", []) if isinstance(media, dict) else []
links = result.get("links", {})
metadata = result.get("metadata", {})
else:
cleaned_html = sanitize_input_encode(result.cleaned_html)
# media = result.media.model_dump()
# tables = media.pop("tables", [])
# links = result.links.model_dump()
media = result.media.model_dump() if hasattr(result.media, 'model_dump') else result.media
tables = media.pop("tables", []) if isinstance(media, dict) else []
links = result.links.model_dump() if hasattr(result.links, 'model_dump') else result.links
metadata = result.metadata
fit_html = preprocess_html_for_schema(html_content=html, text_threshold=500, max_size=300_000)
################################
# Generate Markdown #
################################
markdown_generator: Optional[MarkdownGenerationStrategy] = (
config.markdown_generator or DefaultMarkdownGenerator()
)
# --- SELECT HTML SOURCE BASED ON CONTENT_SOURCE ---
# Get the desired source from the generator config, default to 'cleaned_html'
selected_html_source = getattr(markdown_generator, 'content_source', 'cleaned_html')
# Define the source selection logic using dict dispatch
html_source_selector = {
"raw_html": lambda: html, # The original raw HTML
"cleaned_html": lambda: cleaned_html, # The HTML after scraping strategy
"fit_html": lambda: fit_html, # The HTML after preprocessing for schema
}
markdown_input_html = cleaned_html # Default to cleaned_html
try:
# Get the appropriate lambda function, default to returning cleaned_html if key not found
source_lambda = html_source_selector.get(selected_html_source, lambda: cleaned_html)
# Execute the lambda to get the selected HTML
markdown_input_html = source_lambda()
# Log which source is being used (optional, but helpful for debugging)
# if self.logger and verbose:
# actual_source_used = selected_html_source if selected_html_source in html_source_selector else 'cleaned_html (default)'
# self.logger.debug(f"Using '{actual_source_used}' as source for Markdown generation for {url}", tag="MARKDOWN_SRC")
except Exception as e:
# Defensive fallback: the selectors only return values computed above
# (fit_html is already built by preprocess_html_for_schema), so this is not expected to trigger
if self.logger:
self.logger.warning(
f"Error getting/processing '{selected_html_source}' for markdown source: {e}. Falling back to cleaned_html.",
tag="MARKDOWN_SRC"
)
# Ensure markdown_input_html is still the default cleaned_html in case of error
markdown_input_html = cleaned_html
# --- END: HTML SOURCE SELECTION ---
# Uncomment if by default we want to use PruningContentFilter
# if not config.content_filter and not markdown_generator.content_filter:
# markdown_generator.content_filter = PruningContentFilter()
markdown_result: MarkdownGenerationResult = (
markdown_generator.generate_markdown(
input_html=markdown_input_html,
# Use explicit base_url if provided (for raw: HTML), otherwise redirected_url, then url
base_url=params.get("base_url") or params.get("redirected_url") or url
# html2text_options=kwargs.get('html2text', {})
)
)
# Log processing completion
self.logger.url_status(
url=_url,
success=True,
timing=int((time.perf_counter() - t1) * 1000) / 1000,
tag="SCRAPE"
)
# self.logger.info(
# message="{url:.50}... | Time: {timing}s",
# tag="SCRAPE",
# params={"url": _url, "timing": int((time.perf_counter() - t1) * 1000) / 1000},
# )
################################
# Structured Content Extraction #
################################
if (
not bool(extracted_content)
and config.extraction_strategy
and not isinstance(config.extraction_strategy, NoExtractionStrategy)
):
t1 = time.perf_counter()
# Choose content based on input_format
content_format = config.extraction_strategy.input_format
if content_format == "fit_markdown" and not markdown_result.fit_markdown:
self.logger.url_status(
url=_url,
success=bool(html),
timing=time.perf_counter() - t1,
tag="EXTRACT",
)
# fit_markdown was requested but is empty, so fall back to raw markdown
content_format = "markdown"
content = {
"markdown": markdown_result.raw_markdown,
"html": html,
"fit_html": fit_html,
"cleaned_html": cleaned_html,
"fit_markdown": markdown_result.fit_markdown,
}.get(content_format, markdown_result.raw_markdown)
# Use IdentityChunking for HTML input, otherwise use provided chunking strategy
chunking = (
IdentityChunking()
if content_format in ["html", "cleaned_html", "fit_html"]
else config.chunking_strategy
)
sections = chunking.chunk(content)
# extracted_content = config.extraction_strategy.run(_url, sections)
# Use async version if available for better parallelism
if hasattr(config.extraction_strategy, 'arun'):
extracted_content = await config.extraction_strategy.arun(_url, sections)
else:
# Fallback to sync version run in thread pool to avoid blocking
extracted_content = await asyncio.to_thread(
config.extraction_strategy.run, _url, sections
)
extracted_content = json.dumps(
extracted_content, indent=4, default=str, ensure_ascii=False
)
# Log extraction completion
self.logger.url_status(
url=_url,
success=bool(html),
timing=time.perf_counter() - t1,
tag="EXTRACT",
)
# Apply HTML formatting if requested
if config.prettiify:
cleaned_html = fast_format_html(cleaned_html)
# Return complete crawl result
return CrawlResult(
url=url,
html=html,
fit_html=fit_html,
cleaned_html=cleaned_html,
markdown=markdown_result,
media=media,
tables=tables,
links=links,
metadata=metadata,
screenshot=screenshot_data,
pdf=pdf_data,
extracted_content=extracted_content,
success=True,
error_message="",
)
async def arun_many(
self,
urls: List[str],
config: Optional[Union[CrawlerRunConfig, List[CrawlerRunConfig]]] = None,
dispatcher: Optional[BaseDispatcher] = None,
# Legacy parameters maintained for backwards compatibility
# word_count_threshold=MIN_WORD_THRESHOLD,
# extraction_strategy: ExtractionStrategy = None,
# chunking_strategy: ChunkingStrategy = RegexChunking(),
# content_filter: RelevantContentFilter = None,
# cache_mode: Optional[CacheMode] = None,
# bypass_cache: bool = False,
# css_selector: str = None,
# screenshot: bool = False,
# pdf: bool = False,
# user_agent: str = None,
# verbose=True,
**kwargs,
) -> RunManyReturn:
"""
Runs the crawler for multiple URLs concurrently using a configurable dispatcher strategy.
Args:
urls: List of URLs to crawl
config: Configuration object(s) controlling crawl behavior. Can be:
- Single CrawlerRunConfig: Used for all URLs
- List[CrawlerRunConfig]: Configs with url_matcher for URL-specific settings
dispatcher: The dispatcher strategy instance to use. Defaults to MemoryAdaptiveDispatcher
**kwargs: Legacy keyword arguments, accepted for backwards compatibility
Returns:
Union[List[CrawlResult], AsyncGenerator[CrawlResult, None]]:
Either a list of all results or an async generator yielding results
Examples:
# Batch processing (default)
results = await crawler.arun_many(
urls=["https://example1.com", "https://example2.com"],
config=CrawlerRunConfig(cache_mode=CacheMode.BYPASS)
)
for result in results:
print(f"Processed {result.url}: {len(result.markdown)} chars")
# Streaming results
async for result in await crawler.arun_many(
urls=["https://example1.com", "https://example2.com"],
config=CrawlerRunConfig(cache_mode=CacheMode.BYPASS, stream=True),
):
print(f"Processed {result.url}: {len(result.markdown)} chars")
"""
config = config or CrawlerRunConfig()
# if config is None:
# config = CrawlerRunConfig(
# word_count_threshold=word_count_threshold,
# extraction_strategy=extraction_strategy,
# chunking_strategy=chunking_strategy,
# content_filter=content_filter,
# cache_mode=cache_mode,
# bypass_cache=bypass_cache,
# css_selector=css_selector,
# screenshot=screenshot,
# pdf=pdf,
# verbose=verbose,
# **kwargs,
# )
if dispatcher is None:
dispatcher = MemoryAdaptiveDispatcher(
rate_limiter=RateLimiter(
base_delay=(1.0, 3.0), max_delay=60.0, max_retries=3
),
)
def transform_result(task_result):
# Attach the dispatcher's per-task metrics to the crawl result, then return the result
task_result.result.dispatch_result = DispatchResult(
task_id=task_result.task_id,
memory_usage=task_result.memory_usage,
peak_memory=task_result.peak_memory,
start_time=task_result.start_time,
end_time=task_result.end_time,
error_message=task_result.error_message,
)
return task_result.result
# Handle stream setting - use first config's stream setting if config is a list
if isinstance(config, list):
stream = config[0].stream if config else False
primary_config = config[0] if config else None
else:
stream = config.stream
primary_config = config
# Helper to release sticky session if auto_release is enabled
async def maybe_release_session():
if (primary_config and
primary_config.proxy_session_id and
primary_config.proxy_session_auto_release and
primary_config.proxy_rotation_strategy):
await primary_config.proxy_rotation_strategy.release_session(
primary_config.proxy_session_id
)
self.logger.info(
message="Auto-released proxy session: {session_id}",
tag="PROXY",
params={"session_id": primary_config.proxy_session_id}
)
if stream:
async def result_transformer():
try:
async for task_result in dispatcher.run_urls_stream(
crawler=self, urls=urls, config=config
):
yield transform_result(task_result)
finally:
# Auto-release session after streaming completes
await maybe_release_session()
return result_transformer()
else:
try:
_results = await dispatcher.run_urls(crawler=self, urls=urls, config=config)
return [transform_result(res) for res in _results]
finally:
# Auto-release session after batch completes
await maybe_release_session()
async def aseed_urls(
self,
domain_or_domains: Union[str, List[str]],
config: Optional[SeedingConfig] = None,
**kwargs
) -> Union[List[str], Dict[str, List[Union[str, Dict[str, Any]]]]]:
"""
Discovers, filters, and optionally validates URLs for a given domain(s)
using sitemaps and Common Crawl archives.
Args:
domain_or_domains: A single domain string (e.g., "iana.org") or a list of domains.
config: A SeedingConfig object to control the seeding process.
Parameters passed directly via kwargs will override those in 'config'.
**kwargs: Additional parameters (e.g., `source`, `live_check`, `extract_head`,
`pattern`, `concurrency`, `hits_per_sec`, `force_refresh`, `verbose`)
that will be used to construct or update the SeedingConfig.
Returns:
If `extract_head` is False:
- For a single domain: `List[str]` of discovered URLs.
- For multiple domains: `Dict[str, List[str]]` mapping each domain to its URLs.
If `extract_head` is True:
- For a single domain: `List[Dict[str, Any]]` where each dict contains 'url'
and 'head_data' (parsed <head> metadata).
- For multiple domains: `Dict[str, List[Dict[str, Any]]]` mapping each domain
to a list of URL data dictionaries.
Raises:
ValueError: If `domain_or_domains` is not a string or a list of strings.
Exception: Any underlying exceptions from AsyncUrlSeeder or network operations.
Example:
>>> # Discover URLs from sitemap with live check for 'example.com'
>>> result = await crawler.aseed_urls("example.com", source="sitemap", live_check=True, hits_per_sec=10)
>>> # Discover URLs from Common Crawl, extract head data for 'example.com' and 'python.org'
>>> multi_domain_result = await crawler.aseed_urls(
... ["example.com", "python.org"],
... source="cc", extract_head=True, concurrency=200, hits_per_sec=50
... )
"""
# Initialize AsyncUrlSeeder here if it hasn't been already
if not self.url_seeder:
# Pass the crawler's base_directory for seeder's cache management
# Pass the crawler's logger for consistent logging
self.url_seeder = AsyncUrlSeeder(
base_directory=self.crawl4ai_folder,
logger=self.logger
)
# Merge config object with direct kwargs, giving kwargs precedence
seeding_config = config.clone(**kwargs) if config else SeedingConfig.from_kwargs(kwargs)
# Ensure base_directory is set for the seeder's cache
seeding_config.base_directory = seeding_config.base_directory or self.crawl4ai_folder
# Ensure the seeder uses the crawler's logger (if not already set)
if not self.url_seeder.logger:
self.url_seeder.logger = self.logger
# Pass verbose setting if explicitly provided in SeedingConfig or kwargs
if seeding_config.verbose is not None:
self.url_seeder.logger.verbose = seeding_config.verbose
else: # Default to crawler's verbose setting
self.url_seeder.logger.verbose = self.logger.verbose
if isinstance(domain_or_domains, str):
self.logger.info(
message="Starting URL seeding for domain: {domain}",
tag="SEED",
params={"domain": domain_or_domains}
)
return await self.url_seeder.urls(
domain_or_domains,
seeding_config
)
elif isinstance(domain_or_domains, (list, tuple)):
self.logger.info(
message="Starting URL seeding for {count} domains",
tag="SEED",
params={"count": len(domain_or_domains)}
)
# AsyncUrlSeeder.many_urls accepts the list of domains together with the SeedingConfig.
return await self.url_seeder.many_urls(
domain_or_domains,
seeding_config
)
else:
raise ValueError("`domain_or_domains` must be a string or a list of strings.")
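

# --- Illustrative sketch (not part of crawl4ai) ---
# aseed_urls above merges an optional config object with direct keyword
# arguments and gives the keyword arguments precedence. The stand-alone sketch
# below shows one way such a merge can work. `SketchSeedingConfig`, its fields
# and default values, and `merge_seeding_config` are hypothetical names made up
# for this example; the real SeedingConfig.clone / from_kwargs may behave
# differently (for instance in how unknown keys are handled).
from dataclasses import dataclass, fields, replace
from typing import Any, Dict, Optional


@dataclass
class SketchSeedingConfig:
    source: str = "sitemap"      # assumed default, for illustration only
    live_check: bool = False     # assumed default, for illustration only
    concurrency: int = 100       # assumed default, for illustration only

    def clone(self, **kwargs: Any) -> "SketchSeedingConfig":
        # Return a copy, overriding only the fields passed as kwargs.
        return replace(self, **kwargs)

    @classmethod
    def from_kwargs(cls, kwargs: Dict[str, Any]) -> "SketchSeedingConfig":
        # Keep only keys that name a declared field; silently drop the rest.
        known = {f.name for f in fields(cls)}
        return cls(**{k: v for k, v in kwargs.items() if k in known})


def merge_seeding_config(
    config: Optional[SketchSeedingConfig], **kwargs: Any
) -> SketchSeedingConfig:
    # Same shape as the merge in aseed_urls: clone when a config is given,
    # otherwise build a fresh config from the kwargs alone.
    return config.clone(**kwargs) if config else SketchSeedingConfig.from_kwargs(kwargs)


# Usage: the caller's config object is left untouched and kwargs win on conflict.
_base = SketchSeedingConfig(source="cc", concurrency=50)
_merged = merge_seeding_config(_base, concurrency=200)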