feat(cache): introduce CacheMode and CacheContext for enhanced caching behavior

chore(requirements): add colorama dependency
refactor(config): add SHOW_DEPRECATION_WARNINGS flag and clean up code
fix(docs): update example scripts for clarity and consistency
UncleCode
2024-11-17 15:30:56 +08:00
parent 4b45b28f25
commit 3a66aa8a60
10 changed files with 979 additions and 95 deletions
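
The new cache_context module introduced by this commit is imported below, but its own hunks are not part of this file's diff. What follows is a minimal sketch of what CacheMode and CacheContext could look like, inferred purely from how they are used in arun() below (should_read(), should_write(), display_url, the URL-type flags, and _legacy_to_cache_mode()); the actual cache_context.py may differ in names, defaults, and write semantics.

    from enum import Enum

    class CacheMode(Enum):
        ENABLED = "enabled"        # read and write (default)
        DISABLED = "disabled"      # no cache interaction at all
        READ_ONLY = "read_only"    # read from cache, never write
        WRITE_ONLY = "write_only"  # write to cache, never read
        BYPASS = "bypass"          # skip reading for this call, still allow writing

    class CacheContext:
        def __init__(self, url: str, cache_mode: "CacheMode", always_bypass: bool = False):
            self.url = url
            self.cache_mode = cache_mode
            self.always_bypass = always_bypass
            self.is_web_url = url.startswith(("http://", "https://"))
            self.is_local_file = url.startswith("file://")
            self.is_raw_html = url.startswith("raw:")
            # raw HTML has no stable URL to key the cache on
            self.display_url = url if not self.is_raw_html else "Raw HTML"

        def should_read(self) -> bool:
            if self.always_bypass or not self.is_web_url:
                return False
            return self.cache_mode in (CacheMode.ENABLED, CacheMode.READ_ONLY)

        def should_write(self) -> bool:
            if not self.is_web_url:
                return False
            return self.cache_mode in (CacheMode.ENABLED, CacheMode.WRITE_ONLY, CacheMode.BYPASS)

    def _legacy_to_cache_mode(disable_cache=False, bypass_cache=False,
                              no_cache_read=False, no_cache_write=False) -> "CacheMode":
        # map the old boolean flags onto the closest single mode
        if disable_cache:
            return CacheMode.DISABLED
        if no_cache_read:
            return CacheMode.WRITE_ONLY
        if no_cache_write:
            return CacheMode.READ_ONLY
        if bypass_cache:
            return CacheMode.BYPASS
        return CacheMode.ENABLED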


@@ -1,7 +1,10 @@
import os
import time
import warnings
from enum import Enum
from colorama import init, Fore, Back, Style
from pathlib import Path
from typing import Optional
from typing import Optional, List, Union
import json
import asyncio
from .models import CrawlResult
@@ -9,8 +12,13 @@ from .async_database import async_db_manager
from .chunking_strategy import *
from .extraction_strategy import *
from .async_crawler_strategy import AsyncCrawlerStrategy, AsyncPlaywrightCrawlerStrategy, AsyncCrawlResponse
from .cache_context import CacheMode, CacheContext, _legacy_to_cache_mode
from .content_scrapping_strategy import WebScrapingStrategy
from .config import MIN_WORD_THRESHOLD, IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD
from .config import (
MIN_WORD_THRESHOLD,
IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD,
SHOW_DEPRECATION_WARNINGS # New import
)
from .utils import (
sanitize_input_encode,
InvalidCSSSelectorError,
@@ -18,19 +26,77 @@ from .utils import (
)
from .__version__ import __version__ as crawl4ai_version
class AsyncWebCrawler:
"""
Asynchronous web crawler with flexible caching capabilities.
Migration Guide (from version X.X.X):
Old way (deprecated):
crawler = AsyncWebCrawler(always_by_pass_cache=True)
result = await crawler.arun(
url="https://example.com",
bypass_cache=True,
no_cache_read=True,
no_cache_write=False
)
New way (recommended):
crawler = AsyncWebCrawler(always_bypass_cache=True)
result = await crawler.arun(
url="https://example.com",
cache_mode=CacheMode.WRITE_ONLY
)
To disable deprecation warnings:
Set SHOW_DEPRECATION_WARNINGS = False in config.py
"""
def __init__(
self,
crawler_strategy: Optional[AsyncCrawlerStrategy] = None,
always_by_pass_cache: bool = False,
always_bypass_cache: bool = False,
always_by_pass_cache: Optional[bool] = None, # Deprecated parameter
base_directory: str = str(Path.home()),
**kwargs,
):
self.crawler_strategy = crawler_strategy or AsyncPlaywrightCrawlerStrategy(
**kwargs
)
self.always_by_pass_cache = always_by_pass_cache
# self.crawl4ai_folder = os.path.join(Path.home(), ".crawl4ai")
"""
Initialize the AsyncWebCrawler.
Args:
crawler_strategy: Strategy for crawling web pages
always_bypass_cache: Whether to always bypass cache (new parameter)
always_by_pass_cache: Deprecated, use always_bypass_cache instead
base_directory: Base directory for storing cache
"""
init()
self.log_width = 10 # Width of "[COMPLETE]"
self.tag_format = lambda tag: f"[{tag}]".ljust(self.log_width, ".")
self.log_icons = {
'INIT': '', # Alternative: '▶' or '►'
'READY': '', # Alternative: '√'
'FETCH': '', # Alternative: '▼'
'SCRAPE': '', # Alternative: '♦'
'EXTRACT': '', # Alternative: '□'
'COMPLETE': '', # Alternative: '○'
'ERROR': '×'
}
self.crawler_strategy = crawler_strategy or AsyncPlaywrightCrawlerStrategy(**kwargs)
# Handle deprecated parameter
if always_by_pass_cache is not None:
if SHOW_DEPRECATION_WARNINGS:
warnings.warn(
"'always_by_pass_cache' is deprecated and will be removed in version X.X.X. "
"Use 'always_bypass_cache' instead. "
"Set SHOW_DEPRECATION_WARNINGS=False in config.py to suppress this warning.",
DeprecationWarning,
stacklevel=2
)
self.always_bypass_cache = always_by_pass_cache
else:
self.always_bypass_cache = always_bypass_cache
self.crawl4ai_folder = os.path.join(base_directory, ".crawl4ai")
os.makedirs(self.crawl4ai_folder, exist_ok=True)
os.makedirs(f"{self.crawl4ai_folder}/cache", exist_ok=True)
@@ -46,21 +112,13 @@ class AsyncWebCrawler:
await self.crawler_strategy.__aexit__(exc_type, exc_val, exc_tb)
async def awarmup(self):
# Print a message for crawl4ai and its version
"""Initialize the crawler with warm-up sequence."""
if self.verbose:
print(f"[LOG] 🚀 Crawl4AI {crawl4ai_version}")
print("[LOG] 🌤️ Warming up the AsyncWebCrawler")
# await async_db_manager.ainit_db()
# # await async_db_manager.initialize()
# await self.arun(
# url="https://google.com/",
# word_count_threshold=5,
# bypass_cache=False,
# verbose=False,
# )
print(f"{Fore.CYAN}{self.tag_format('INIT')} {self.log_icons['INIT']} Crawl4AI {crawl4ai_version}{Style.RESET_ALL}")
print(f"{Fore.CYAN}{self.tag_format('INIT')} {self.log_icons['INIT']} Warming up AsyncWebCrawler{Style.RESET_ALL}")
self.ready = True
if self.verbose:
print("[LOG] 🌞 AsyncWebCrawler is ready to crawl")
print(f"{Fore.GREEN}{self.tag_format('READY')} {self.log_icons['READY']} AsyncWebCrawler initialized{Style.RESET_ALL}")
async def arun(
self,
@@ -68,35 +126,81 @@ class AsyncWebCrawler:
word_count_threshold=MIN_WORD_THRESHOLD,
extraction_strategy: ExtractionStrategy = None,
chunking_strategy: ChunkingStrategy = RegexChunking(),
cache_mode: Optional[CacheMode] = None,
# Deprecated parameters
bypass_cache: bool = False,
disable_cache: bool = False,
no_cache_read: bool = False,
no_cache_write: bool = False,
# Other parameters
css_selector: str = None,
screenshot: bool = False,
user_agent: str = None,
verbose=True,
disable_cache: bool = False,
no_cache_read: bool = False,
no_cache_write: bool = False,
**kwargs,
) -> CrawlResult:
"""
Runs the crawler for a single source: URL (web, local file, or raw HTML).
Migration from legacy cache parameters:
Old way (deprecated):
await crawler.arun(url, bypass_cache=True, no_cache_read=True)
New way:
await crawler.arun(url, cache_mode=CacheMode.BYPASS)
Args:
url (str): The URL to crawl. Supported prefixes:
- 'http://' or 'https://': Web URL to crawl.
- 'file://': Local file path to process.
- 'raw:': Raw HTML content to process.
... [other existing parameters]
url: The URL to crawl (http://, https://, file://, or raw:)
cache_mode: Cache behavior control (recommended)
word_count_threshold: Minimum word count threshold
extraction_strategy: Strategy for content extraction
chunking_strategy: Strategy for content chunking
css_selector: CSS selector for content extraction
screenshot: Whether to capture screenshot
user_agent: Custom user agent
verbose: Enable verbose logging
Deprecated Args:
bypass_cache: Use cache_mode=CacheMode.BYPASS instead
disable_cache: Use cache_mode=CacheMode.DISABLED instead
no_cache_read: Use cache_mode=CacheMode.WRITE_ONLY instead
no_cache_write: Use cache_mode=CacheMode.READ_ONLY instead
Returns:
CrawlResult: The result of the crawling and processing.
CrawlResult: The result of crawling and processing
"""
try:
if disable_cache:
bypass_cache = True
no_cache_read = True
no_cache_write = True
# Handle deprecated parameters
if any([bypass_cache, disable_cache, no_cache_read, no_cache_write]):
if SHOW_DEPRECATION_WARNINGS:
warnings.warn(
"Cache control boolean flags are deprecated and will be removed in version X.X.X. "
"Use 'cache_mode' parameter instead. Examples:\n"
"- For bypass_cache=True, use cache_mode=CacheMode.BYPASS\n"
"- For disable_cache=True, use cache_mode=CacheMode.DISABLED\n"
"- For no_cache_read=True, use cache_mode=CacheMode.WRITE_ONLY\n"
"- For no_cache_write=True, use cache_mode=CacheMode.READ_ONLY\n"
"Set SHOW_DEPRECATION_WARNINGS=False in config.py to suppress this warning.",
DeprecationWarning,
stacklevel=2
)
# Convert legacy parameters if cache_mode not provided
if cache_mode is None:
cache_mode = _legacy_to_cache_mode(
disable_cache=disable_cache,
bypass_cache=bypass_cache,
no_cache_read=no_cache_read,
no_cache_write=no_cache_write
)
# Default to ENABLED if no cache mode specified
if cache_mode is None:
cache_mode = CacheMode.ENABLED
# Create cache context
cache_context = CacheContext(url, cache_mode, self.always_bypass_cache)
extraction_strategy = extraction_strategy or NoExtractionStrategy()
extraction_strategy.verbose = verbose
if not isinstance(extraction_strategy, ExtractionStrategy):
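
A call-site sketch for the reworked arun() signature; example.com is a placeholder, and the cache_context import path is assumed from the relative import at the top of this file:

    import asyncio
    from crawl4ai import AsyncWebCrawler
    from crawl4ai.cache_context import CacheMode  # matches "from .cache_context import CacheMode" above

    async def main():
        async with AsyncWebCrawler() as crawler:
            # Recommended: state cache behavior explicitly
            fresh = await crawler.arun("https://example.com", cache_mode=CacheMode.BYPASS)
            # Legacy flags still work but emit a DeprecationWarning and are
            # translated through _legacy_to_cache_mode() as shown above
            legacy = await crawler.arun("https://example.com", bypass_cache=True)
            print(fresh.success, legacy.success)

    asyncio.run(main())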
@@ -107,18 +211,14 @@ class AsyncWebCrawler:
word_count_threshold = max(word_count_threshold, MIN_WORD_THRESHOLD)
async_response: AsyncCrawlResponse = None
cached = None
cached_result = None
screenshot_data = None
extracted_content = None
is_web_url = url.startswith(('http://', 'https://'))
is_local_file = url.startswith("file://")
is_raw_html = url.startswith("raw:")
_url = url if not is_raw_html else "Raw HTML"
start_time = time.perf_counter()
cached_result = None
if is_web_url and (not bypass_cache or not no_cache_read) and not self.always_by_pass_cache:
# Try to get cached result if appropriate
if cache_context.should_read():
cached_result = await async_db_manager.aget_cached_url(url)
if cached_result:
@@ -129,26 +229,27 @@ class AsyncWebCrawler:
if not screenshot_data:
cached_result = None
if verbose:
print(
f"[LOG] 1⃣ ✅ Page fetched (cache) for {_url}, success: {bool(html)}, time taken: {time.perf_counter() - start_time:.2f} seconds"
)
print(f"{Fore.BLUE}{self.tag_format('FETCH')} {self.log_icons['FETCH']} Cache hit for {cache_context.display_url} | Status: {Fore.GREEN if bool(html) else Fore.RED}{bool(html)}{Style.RESET_ALL} | Time: {time.perf_counter() - start_time:.2f}s")
if not cached or not html:
# Fetch fresh content if needed
if not cached_result or not html:
t1 = time.perf_counter()
if user_agent:
self.crawler_strategy.update_user_agent(user_agent)
async_response: AsyncCrawlResponse = await self.crawler_strategy.crawl(url, screenshot=screenshot, **kwargs)
async_response: AsyncCrawlResponse = await self.crawler_strategy.crawl(
url,
screenshot=screenshot,
**kwargs
)
html = sanitize_input_encode(async_response.html)
screenshot_data = async_response.screenshot
t2 = time.perf_counter()
if verbose:
print(
f"[LOG] 1⃣ ✅ Page fetched (no-cache) for {_url}, success: {bool(html)}, time taken: {t2 - t1:.2f} seconds"
)
print(f"{Fore.BLUE}{self.tag_format('FETCH')} {self.log_icons['FETCH']} Live fetch for {cache_context.display_url} | Status: {Fore.GREEN if bool(html) else Fore.RED}{bool(html)}{Style.RESET_ALL} | Time: {t2 - t1:.2f}s")
t1 = time.perf_counter()
# Process the HTML content
crawl_result = await self.aprocess_html(
url=url,
html=html,
@@ -159,15 +260,15 @@ class AsyncWebCrawler:
css_selector=css_selector,
screenshot=screenshot_data,
verbose=verbose,
is_cached=bool(cached),
is_cached=bool(cached_result),
async_response=async_response,
bypass_cache=bypass_cache,
is_web_url = is_web_url,
is_local_file = is_local_file,
is_raw_html = is_raw_html,
is_web_url=cache_context.is_web_url,
is_local_file=cache_context.is_local_file,
is_raw_html=cache_context.is_raw_html,
**kwargs,
)
# Set response data
if async_response:
crawl_result.status_code = async_response.status_code
crawl_result.response_headers = async_response.response_headers
@@ -180,22 +281,26 @@ class AsyncWebCrawler:
crawl_result.session_id = kwargs.get("session_id", None)
if verbose:
print(
f"[LOG] 🔥 🚀 Crawling done for {_url}, success: {crawl_result.success}, time taken: {time.perf_counter() - start_time:.2f} seconds"
)
print(f"{Fore.GREEN}{self.tag_format('COMPLETE')} {self.log_icons['COMPLETE']} {cache_context.display_url} | Status: {Fore.GREEN if crawl_result.success else Fore.RED}{crawl_result.success} | {Fore.YELLOW}Total: {time.perf_counter() - start_time:.2f}s{Style.RESET_ALL}")
if not is_raw_html and not no_cache_write:
if not bool(cached_result) or kwargs.get("bypass_cache", False) or self.always_by_pass_cache:
await async_db_manager.acache_url(crawl_result)
# Update cache if appropriate
if cache_context.should_write() and not bool(cached_result):
await async_db_manager.acache_url(crawl_result)
return crawl_result
except Exception as e:
if not hasattr(e, "msg"):
e.msg = str(e)
print(f"[ERROR] 🚫 arun(): Failed to crawl {_url}, error: {e.msg}")
return CrawlResult(url=url, html="", markdown = f"[ERROR] 🚫 arun(): Failed to crawl {_url}, error: {e.msg}", success=False, error_message=e.msg)
print(f"{Fore.RED}{self.tag_format('ERROR')} {self.log_icons['ERROR']} Failed to crawl {cache_context.display_url} | {e.msg}{Style.RESET_ALL}")
return CrawlResult(
url=url,
html="",
markdown=f"[ERROR] 🚫 arun(): Failed to crawl {cache_context.display_url}, error: {e.msg}",
success=False,
error_message=e.msg
)
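
Because failures come back as a CrawlResult with success=False rather than as a raised exception, callers can branch on the result; a short sketch using field names that appear in this diff (URL is a placeholder, import path assumed as above):

    import asyncio
    from crawl4ai import AsyncWebCrawler
    from crawl4ai.cache_context import CacheMode  # assumed path

    async def main():
        async with AsyncWebCrawler() as crawler:
            result = await crawler.arun("https://example.com", cache_mode=CacheMode.ENABLED)
            if result.success:
                # status_code / response_headers are filled from the live response when available
                print(result.status_code, len(result.html or ""))
            else:
                print("crawl failed:", result.error_message)

    asyncio.run(main())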
async def arun_many(
self,
@@ -203,6 +308,8 @@ class AsyncWebCrawler:
word_count_threshold=MIN_WORD_THRESHOLD,
extraction_strategy: ExtractionStrategy = None,
chunking_strategy: ChunkingStrategy = RegexChunking(),
cache_mode: Optional[CacheMode] = None,
# Deprecated parameters
bypass_cache: bool = False,
css_selector: str = None,
screenshot: bool = False,
@@ -211,19 +318,35 @@ class AsyncWebCrawler:
**kwargs,
) -> List[CrawlResult]:
"""
Runs the crawler for multiple sources: URLs (web, local files, or raw HTML).
Runs the crawler for multiple URLs concurrently.
Migration from legacy parameters:
Old way (deprecated):
results = await crawler.arun_many(urls, bypass_cache=True)
New way:
results = await crawler.arun_many(urls, cache_mode=CacheMode.BYPASS)
Args:
urls (List[str]): A list of URLs with supported prefixes:
- 'http://' or 'https://': Web URL to crawl.
- 'file://': Local file path to process.
- 'raw:': Raw HTML content to process.
... [other existing parameters]
urls: List of URLs to crawl
cache_mode: Cache behavior control (recommended)
[other parameters same as arun()]
Returns:
List[CrawlResult]: The results of the crawling and processing.
List[CrawlResult]: Results for each URL
"""
semaphore_count = kwargs.get('semaphore_count', 5) # Adjust as needed
if bypass_cache and SHOW_DEPRECATION_WARNINGS:
warnings.warn(
"'bypass_cache' is deprecated and will be removed in version X.X.X. "
"Use 'cache_mode=CacheMode.BYPASS' instead. "
"Set SHOW_DEPRECATION_WARNINGS=False in config.py to suppress this warning.",
DeprecationWarning,
stacklevel=2
)
if cache_mode is None:
cache_mode = CacheMode.BYPASS
semaphore_count = kwargs.get('semaphore_count', 5)
semaphore = asyncio.Semaphore(semaphore_count)
async def crawl_with_semaphore(url):
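
A sketch of driving arun_many() with the new parameter; the URLs are placeholders, and semaphore_count is picked up from **kwargs with a default of 5 as shown above:

    import asyncio
    from crawl4ai import AsyncWebCrawler
    from crawl4ai.cache_context import CacheMode  # assumed path

    async def main():
        urls = ["https://example.com", "https://example.org"]  # placeholders
        async with AsyncWebCrawler() as crawler:
            results = await crawler.arun_many(
                urls,
                cache_mode=CacheMode.ENABLED,
                semaphore_count=3,  # caps concurrent crawls; defaults to 5
            )
            for item in results:
                # per-URL exceptions are returned as strings by the gather handling below
                if isinstance(item, str):
                    print("error:", item)
                else:
                    print(item.url, item.success)

    asyncio.run(main())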
@@ -233,7 +356,7 @@ class AsyncWebCrawler:
word_count_threshold=word_count_threshold,
extraction_strategy=extraction_strategy,
chunking_strategy=chunking_strategy,
bypass_cache=bypass_cache,
cache_mode=cache_mode,
css_selector=css_selector,
screenshot=screenshot,
user_agent=user_agent,
@@ -245,6 +368,7 @@ class AsyncWebCrawler:
results = await asyncio.gather(*tasks, return_exceptions=True)
return [result if not isinstance(result, Exception) else str(result) for result in results]
async def aprocess_html(
self,
url: str,
@@ -258,7 +382,6 @@ class AsyncWebCrawler:
verbose: bool,
**kwargs,
) -> CrawlResult:
t = time.perf_counter()
# Extract content from HTML
try:
_url = url if not kwargs.get("is_raw_html", False) else "Raw HTML"
@@ -293,9 +416,9 @@ class AsyncWebCrawler:
metadata = result.get("metadata", {})
if verbose:
print(
f"[LOG] 2⃣ ✅ Scraping done for {_url}, success: True, time taken: {time.perf_counter() - t1:.2f} seconds"
)
print(f"{Fore.MAGENTA}{self.tag_format('SCRAPE')} {self.log_icons['SCRAPE']} Processed {_url}{Style.RESET_ALL} | Time: {int((time.perf_counter() - t1) * 1000)}ms")
if extracted_content is None and extraction_strategy and chunking_strategy and not isinstance(extraction_strategy, NoExtractionStrategy):
t1 = time.perf_counter()
@@ -309,9 +432,9 @@ class AsyncWebCrawler:
extracted_content = extraction_strategy.run(url, sections)
extracted_content = json.dumps(extracted_content, indent=4, default=str, ensure_ascii=False)
if verbose:
print(
f"[LOG] 3⃣ ✅ Extraction done for {_url}, time taken: {time.perf_counter() - t1:.2f} seconds"
)
print(f"{Fore.YELLOW}{self.tag_format('EXTRACT')} {self.log_icons['EXTRACT']} Completed for {_url}{Style.RESET_ALL} | Time: {time.perf_counter() - t1:.2f}s{Style.RESET_ALL}")
screenshot = None if not screenshot else screenshot
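
To exercise the scraping/chunking path logged above from the public API, a hedged sketch using only names that appear in this diff (RegexChunking, css_selector, word_count_threshold); the URL and selector are placeholders:

    import asyncio
    from crawl4ai import AsyncWebCrawler
    from crawl4ai.chunking_strategy import RegexChunking  # module path matches the import at the top

    async def main():
        async with AsyncWebCrawler() as crawler:
            result = await crawler.arun(
                "https://example.com",            # placeholder URL
                css_selector="article",           # placeholder selector to narrow scraping
                chunking_strategy=RegexChunking(),
                word_count_threshold=10,          # raised to at least MIN_WORD_THRESHOLD internally
            )
            print((result.markdown or "")[:200] if result.success else result.error_message)

    asyncio.run(main())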
@@ -332,13 +455,15 @@ class AsyncWebCrawler:
)
async def aclear_cache(self):
# await async_db_manager.aclear_db()
"""Clear the cache database."""
await async_db_manager.cleanup()
async def aflush_cache(self):
"""Flush the cache database."""
await async_db_manager.aflush_db()
async def aget_cache_size(self):
"""Get the total number of cached items."""
return await async_db_manager.aget_total_count()
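
Finally, a short usage sketch of the cache-management helpers defined above:

    import asyncio
    from crawl4ai import AsyncWebCrawler

    async def main():
        async with AsyncWebCrawler() as crawler:
            print("cached items:", await crawler.aget_cache_size())
            await crawler.aclear_cache()  # clear the cache database
            await crawler.aflush_cache()  # flush the cache database

    asyncio.run(main())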