feat(docker): add Docker Compose configurations for local and hub deployment; enhance GPU support checks in Dockerfile

feat(requirements): update requirements.txt to include snowballstemmer
fix(version_manager): correct version parsing to use __version__.__version__
feat(main): introduce chunking strategy and content filter in CrawlRequest model
feat(content_filter): enhance BM25 algorithm with priority tag scoring for improved content relevance
feat(logger): implement new async logger engine replacing print statements throughout library
fix(database): resolve version-related deadlock and circular lock issues in database operations
docs(docker): expand Docker deployment documentation with usage instructions for Docker Compose
This commit is contained in:
UncleCode
2024-11-18 21:00:06 +08:00
parent 152ac35bc2
commit 852729ff38
15 changed files with 952 additions and 232 deletions

View File

@@ -35,13 +35,15 @@ stealth_config = StealthConfig(
class ManagedBrowser:
def __init__(self, browser_type: str = "chromium", user_data_dir: Optional[str] = None, headless: bool = False):
def __init__(self, browser_type: str = "chromium", user_data_dir: Optional[str] = None, headless: bool = False, logger = None):
self.browser_type = browser_type
self.user_data_dir = user_data_dir
self.headless = headless
self.browser_process = None
self.temp_dir = None
self.debugging_port = 9222
self.logger = logger
self.shutting_down = False
async def start(self) -> str:
"""
@@ -76,15 +78,38 @@ class ManagedBrowser:
async def _monitor_browser_process(self):
"""Monitor the browser process for unexpected termination."""
if self.browser_process:
stdout, stderr = await asyncio.gather(
asyncio.to_thread(self.browser_process.stdout.read),
asyncio.to_thread(self.browser_process.stderr.read)
)
if self.browser_process.poll() is not None:
print(f"Browser process terminated unexpectedly with code {self.browser_process.returncode}")
print(f"STDOUT: {stdout.decode()}")
print(f"STDERR: {stderr.decode()}")
await self.cleanup()
try:
stdout, stderr = await asyncio.gather(
asyncio.to_thread(self.browser_process.stdout.read),
asyncio.to_thread(self.browser_process.stderr.read)
)
# Check shutting_down flag BEFORE logging anything
if self.browser_process.poll() is not None:
if not self.shutting_down:
self.logger.error(
message="Browser process terminated unexpectedly | Code: {code} | STDOUT: {stdout} | STDERR: {stderr}",
tag="ERROR",
params={
"code": self.browser_process.returncode,
"stdout": stdout.decode(),
"stderr": stderr.decode()
}
)
await self.cleanup()
else:
self.logger.info(
message="Browser process terminated normally | Code: {code}",
tag="INFO",
params={"code": self.browser_process.returncode}
)
except Exception as e:
if not self.shutting_down:
self.logger.error(
message="Error monitoring browser process: {error}",
tag="ERROR",
params={"error": str(e)}
)
def _get_browser_path(self) -> str:
"""Returns the browser executable path based on OS and browser type"""
@@ -134,20 +159,39 @@ class ManagedBrowser:
async def cleanup(self):
"""Cleanup browser process and temporary directory"""
# Set shutting_down flag BEFORE any termination actions
self.shutting_down = True
if self.browser_process:
try:
self.browser_process.terminate()
await asyncio.sleep(1)
# Wait for process to end gracefully
for _ in range(10): # 10 attempts, 100ms each
if self.browser_process.poll() is not None:
break
await asyncio.sleep(0.1)
# Force kill if still running
if self.browser_process.poll() is None:
self.browser_process.kill()
await asyncio.sleep(0.1) # Brief wait for kill to take effect
except Exception as e:
print(f"Error terminating browser: {e}")
self.logger.error(
message="Error terminating browser: {error}",
tag="ERROR",
params={"error": str(e)}
)
if self.temp_dir and os.path.exists(self.temp_dir):
try:
shutil.rmtree(self.temp_dir)
except Exception as e:
print(f"Error removing temporary directory: {e}")
self.logger.error(
message="Error removing temporary directory: {error}",
tag="ERROR",
params={"error": str(e)}
)
class AsyncCrawlerStrategy(ABC):
@@ -172,7 +216,8 @@ class AsyncCrawlerStrategy(ABC):
pass
class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
def __init__(self, use_cached_html=False, js_code=None, **kwargs):
def __init__(self, use_cached_html=False, js_code=None, logger = None, **kwargs):
self.logger = logger
self.use_cached_html = use_cached_html
self.user_agent = kwargs.get(
"user_agent",
@@ -231,7 +276,8 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
self.managed_browser = ManagedBrowser(
browser_type=self.browser_type,
user_data_dir=self.user_data_dir,
headless=self.headless
headless=self.headless,
logger=self.logger
)
cdp_url = await self.managed_browser.start()
self.browser = await self.playwright.chromium.connect_over_cdp(cdp_url)
@@ -282,6 +328,10 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
# Add extra args if provided
if self.extra_args:
browser_args["args"].extend(self.extra_args)
# Add downloads path if downloads are enabled
if self.accept_downloads:
browser_args["downloads_path"] = self.downloads_path
# Add proxy settings if a proxy is specified
if self.proxy:
@@ -344,6 +394,7 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
self.browser = None
if self.managed_browser:
await asyncio.sleep(0.5)
await self.managed_browser.cleanup()
self.managed_browser = None
@@ -491,9 +542,19 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
}}
""")
else:
print(f"Warning: Could not access content frame for iframe {i}")
# print(f"Warning: Could not access content frame for iframe {i}")
self.logger.warning(
message="Could not access content frame for iframe {index}",
tag="SCRAPE",
params={"index": i}
)
except Exception as e:
print(f"Error processing iframe {i}: {str(e)}")
self.logger.error(
message="Error processing iframe {index}: {error}",
tag="ERROR",
params={"index": i, "error": str(e)}
)
# print(f"Error processing iframe {i}: {str(e)}")
# Return the page object
return page
@@ -620,7 +681,8 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
context = await self.browser.new_context(
user_agent=self.user_agent,
viewport={"width": 1920, "height": 1080},
proxy={"server": self.proxy} if self.proxy else None
proxy={"server": self.proxy} if self.proxy else None,
accept_downloads=self.accept_downloads,
)
await context.set_extra_http_headers(self.headers)
@@ -917,17 +979,31 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
suggested_filename = download.suggested_filename
download_path = os.path.join(self.downloads_path, suggested_filename)
if self.verbose:
print(f"[LOG] 📥 Downloading {suggested_filename} to {download_path}")
self.logger.info(
message="Downloading {filename} to {path}",
tag="FETCH",
params={"filename": suggested_filename, "path": download_path}
)
start_time = time.perf_counter()
await download.save_as(download_path)
end_time = time.perf_counter()
self._downloaded_files.append(download_path)
if self.verbose:
print(f"[LOG] ✅ Downloaded {suggested_filename} successfully")
self.logger.success(
message="Downloaded {filename} successfully",
tag="COMPLETE",
params={"filename": suggested_filename, "path": download_path, "duration": f"{end_time - start_time:.2f}s"}
)
except Exception as e:
if self.verbose:
print(f"[ERROR] Failed to handle download: {str(e)}")
self.logger.error(
message="Failed to handle download: {error}",
tag="ERROR",
params={"error": str(e)}
)
# if self.verbose:
# print(f"[ERROR] Failed to handle download: {str(e)}")
async def crawl_many(self, urls: List[str], **kwargs) -> List[AsyncCrawlResponse]:
semaphore_count = kwargs.get('semaphore_count', 5) # Adjust as needed
@@ -1070,8 +1146,13 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
await page.evaluate(remove_overlays_js)
await page.wait_for_timeout(500) # Wait for any animations to complete
except Exception as e:
if self.verbose:
print(f"Warning: Failed to remove overlay elements: {str(e)}")
self.logger.warning(
message="Failed to remove overlay elements: {error}",
tag="SCRAPE",
params={"error": str(e)}
)
# if self.verbose:
# print(f"Warning: Failed to remove overlay elements: {str(e)}")
async def take_screenshot(self, page: Page) -> str:
"""
@@ -1089,7 +1170,12 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
return base64.b64encode(screenshot).decode('utf-8')
except Exception as e:
error_message = f"Failed to take screenshot: {str(e)}"
print(error_message)
self.logger.error(
message="Screenshot failed: {error}",
tag="ERROR",
params={"error": error_message}
)
# Generate an error image
img = Image.new('RGB', (800, 600), color='black')
@@ -1123,7 +1209,12 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
return base64.b64encode(screenshot).decode('utf-8')
except Exception as e:
error_message = f"Failed to take screenshot: {str(e)}"
print(error_message)
# print(error_message)
self.logger.error(
message="Screenshot failed: {error}",
tag="ERROR",
params={"error": error_message}
)
# Generate an error image
img = Image.new('RGB', (800, 600), color='black')

View File

@@ -12,10 +12,12 @@ import xxhash
import aiofiles
from .config import NEED_MIGRATION
from .version_manager import VersionManager
from .async_logger import AsyncLogger
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
base_directory = Path.home()
DB_PATH = os.path.join(Path.home(), ".crawl4ai")
os.makedirs(DB_PATH, exist_ok=True)
DB_PATH = os.path.join(DB_PATH, "crawl4ai.db")
@@ -28,15 +30,21 @@ class AsyncDatabaseManager:
self.max_retries = max_retries
self.connection_pool: Dict[int, aiosqlite.Connection] = {}
self.pool_lock = asyncio.Lock()
self.init_lock = asyncio.Lock()
self.connection_semaphore = asyncio.Semaphore(pool_size)
self._initialized = False
self.version_manager = VersionManager()
self.logger = AsyncLogger(
log_file=os.path.join(base_directory, ".crawl4ai", "crawler_db.log"),
verbose=False,
tag_width=10
)
async def initialize(self):
"""Initialize the database and connection pool"""
try:
logger.info("Initializing database...")
self.logger.info("Initializing database", tag="INIT")
# Ensure the database file exists
os.makedirs(os.path.dirname(self.db_path), exist_ok=True)
@@ -47,31 +55,39 @@ class AsyncDatabaseManager:
await self.ainit_db()
# Verify the table exists
async def verify_table(db):
async with aiosqlite.connect(self.db_path, timeout=30.0) as db:
async with db.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='crawled_data'"
) as cursor:
result = await cursor.fetchone()
if not result:
raise Exception("crawled_data table was not created")
await self.execute_with_retry(verify_table)
# If version changed or fresh install, run updates
if needs_update:
logger.info("New version detected, running updates...")
self.logger.info("New version detected, running updates", tag="INIT")
await self.update_db_schema()
from .migrations import run_migration # Import here to avoid circular imports
await run_migration()
self.version_manager.update_version() # Update stored version after successful migration
logger.info("Version update completed successfully")
self.logger.success("Version update completed successfully", tag="COMPLETE")
else:
logger.info("Database initialization completed successfully")
self.logger.success("Database initialization completed successfully", tag="COMPLETE")
except Exception as e:
logger.error(f"Database initialization error: {e}")
logger.info("Database will be initialized on first use")
self.logger.error(
message="Database initialization error: {error}",
tag="ERROR",
params={"error": str(e)}
)
self.logger.info(
message="Database will be initialized on first use",
tag="INIT"
)
raise
async def cleanup(self):
"""Cleanup connections when shutting down"""
@@ -84,34 +100,41 @@ class AsyncDatabaseManager:
async def get_connection(self):
"""Connection pool manager"""
if not self._initialized:
async with self.pool_lock: # Prevent multiple simultaneous initializations
if not self._initialized: # Double-check after acquiring lock
# Use an asyncio.Lock to ensure only one initialization occurs
async with self.init_lock:
if not self._initialized:
await self.initialize()
self._initialized = True
async with self.connection_semaphore:
task_id = id(asyncio.current_task())
try:
async with self.pool_lock:
if task_id not in self.connection_pool:
conn = await aiosqlite.connect(
self.db_path,
timeout=30.0
)
await conn.execute('PRAGMA journal_mode = WAL')
await conn.execute('PRAGMA busy_timeout = 5000')
self.connection_pool[task_id] = conn
yield self.connection_pool[task_id]
except Exception as e:
logger.error(f"Connection error: {e}")
raise
finally:
async with self.pool_lock:
if task_id in self.connection_pool:
await self.connection_pool[task_id].close()
del self.connection_pool[task_id]
await self.connection_semaphore.acquire()
task_id = id(asyncio.current_task())
try:
async with self.pool_lock:
if task_id not in self.connection_pool:
conn = await aiosqlite.connect(
self.db_path,
timeout=30.0
)
await conn.execute('PRAGMA journal_mode = WAL')
await conn.execute('PRAGMA busy_timeout = 5000')
self.connection_pool[task_id] = conn
yield self.connection_pool[task_id]
except Exception as e:
self.logger.error(
message="Connection error: {error}",
tag="ERROR",
force_verbose=True,
params={"error": str(e)}
)
raise
finally:
async with self.pool_lock:
if task_id in self.connection_pool:
await self.connection_pool[task_id].close()
del self.connection_pool[task_id]
self.connection_semaphore.release()
async def execute_with_retry(self, operation, *args):
@@ -124,13 +147,21 @@ class AsyncDatabaseManager:
return result
except Exception as e:
if attempt == self.max_retries - 1:
logger.error(f"Operation failed after {self.max_retries} attempts: {e}")
self.logger.error(
message="Operation failed after {retries} attempts: {error}",
tag="ERROR",
force_verbose=True,
params={
"retries": self.max_retries,
"error": str(e)
}
)
raise
await asyncio.sleep(1 * (attempt + 1)) # Exponential backoff
async def ainit_db(self):
"""Initialize database schema"""
async def _init(db):
async with aiosqlite.connect(self.db_path, timeout=30.0) as db:
await db.execute('''
CREATE TABLE IF NOT EXISTS crawled_data (
url TEXT PRIMARY KEY,
@@ -147,36 +178,37 @@ class AsyncDatabaseManager:
downloaded_files TEXT DEFAULT "{}" -- New column added
)
''')
await self.execute_with_retry(_init)
await db.commit()
async def update_db_schema(self):
"""Update database schema if needed"""
async def _check_columns(db):
async with aiosqlite.connect(self.db_path, timeout=30.0) as db:
cursor = await db.execute("PRAGMA table_info(crawled_data)")
columns = await cursor.fetchall()
return [column[1] for column in columns]
column_names = [column[1] for column in columns]
# List of new columns to add
new_columns = ['media', 'links', 'metadata', 'screenshot', 'response_headers', 'downloaded_files']
for column in new_columns:
if column not in column_names:
await self.aalter_db_add_column(column, db)
await db.commit()
column_names = await self.execute_with_retry(_check_columns)
# List of new columns to add
new_columns = ['media', 'links', 'metadata', 'screenshot', 'response_headers', 'downloaded_files']
for column in new_columns:
if column not in column_names:
await self.aalter_db_add_column(column)
async def aalter_db_add_column(self, new_column: str):
async def aalter_db_add_column(self, new_column: str, db):
"""Add new column to the database"""
async def _alter(db):
if new_column == 'response_headers':
await db.execute(f'ALTER TABLE crawled_data ADD COLUMN {new_column} TEXT DEFAULT "{{}}"')
else:
await db.execute(f'ALTER TABLE crawled_data ADD COLUMN {new_column} TEXT DEFAULT ""')
logger.info(f"Added column '{new_column}' to the database.")
if new_column == 'response_headers':
await db.execute(f'ALTER TABLE crawled_data ADD COLUMN {new_column} TEXT DEFAULT "{{}}"')
else:
await db.execute(f'ALTER TABLE crawled_data ADD COLUMN {new_column} TEXT DEFAULT ""')
self.logger.info(
message="Added column '{column}' to the database",
tag="INIT",
params={"column": new_column}
)
await self.execute_with_retry(_alter)
async def aget_cached_url(self, url: str) -> Optional[CrawlResult]:
"""Retrieve cached URL data as CrawlResult"""
@@ -235,7 +267,12 @@ class AsyncDatabaseManager:
try:
return await self.execute_with_retry(_get)
except Exception as e:
logger.error(f"Error retrieving cached URL: {e}")
self.logger.error(
message="Error retrieving cached URL: {error}",
tag="ERROR",
force_verbose=True,
params={"error": str(e)}
)
return None
async def acache_url(self, result: CrawlResult):
@@ -291,7 +328,13 @@ class AsyncDatabaseManager:
try:
await self.execute_with_retry(_cache)
except Exception as e:
logger.error(f"Error caching URL: {e}")
self.logger.error(
message="Error caching URL: {error}",
tag="ERROR",
force_verbose=True,
params={"error": str(e)}
)
async def aget_total_count(self) -> int:
"""Get total number of cached URLs"""
@@ -303,7 +346,12 @@ class AsyncDatabaseManager:
try:
return await self.execute_with_retry(_count)
except Exception as e:
logger.error(f"Error getting total count: {e}")
self.logger.error(
message="Error getting total count: {error}",
tag="ERROR",
force_verbose=True,
params={"error": str(e)}
)
return 0
async def aclear_db(self):
@@ -314,7 +362,12 @@ class AsyncDatabaseManager:
try:
await self.execute_with_retry(_clear)
except Exception as e:
logger.error(f"Error clearing database: {e}")
self.logger.error(
message="Error clearing database: {error}",
tag="ERROR",
force_verbose=True,
params={"error": str(e)}
)
async def aflush_db(self):
"""Drop the entire table"""
@@ -324,7 +377,12 @@ class AsyncDatabaseManager:
try:
await self.execute_with_retry(_flush)
except Exception as e:
logger.error(f"Error flushing database: {e}")
self.logger.error(
message="Error flushing database: {error}",
tag="ERROR",
force_verbose=True,
params={"error": str(e)}
)
async def _store_content(self, content: str, content_type: str) -> str:
@@ -352,7 +410,12 @@ class AsyncDatabaseManager:
async with aiofiles.open(file_path, 'r', encoding='utf-8') as f:
return await f.read()
except:
logger.error(f"Failed to load content: {file_path}")
self.logger.error(
message="Failed to load content: {file_path}",
tag="ERROR",
force_verbose=True,
params={"file_path": file_path}
)
return None
# Create a singleton instance

231
crawl4ai/async_logger.py Normal file
View File

@@ -0,0 +1,231 @@
from enum import Enum
from typing import Optional, Dict, Any, Union
from colorama import Fore, Back, Style, init
import time
import os
from datetime import datetime
class LogLevel(Enum):
DEBUG = 1
INFO = 2
SUCCESS = 3
WARNING = 4
ERROR = 5
class AsyncLogger:
"""
Asynchronous logger with support for colored console output and file logging.
Supports templated messages with colored components.
"""
DEFAULT_ICONS = {
'INIT': '',
'READY': '',
'FETCH': '',
'SCRAPE': '',
'EXTRACT': '',
'COMPLETE': '',
'ERROR': '×',
'DEBUG': '',
'INFO': '',
'WARNING': '',
}
DEFAULT_COLORS = {
LogLevel.DEBUG: Fore.LIGHTBLACK_EX,
LogLevel.INFO: Fore.CYAN,
LogLevel.SUCCESS: Fore.GREEN,
LogLevel.WARNING: Fore.YELLOW,
LogLevel.ERROR: Fore.RED,
}
def __init__(
self,
log_file: Optional[str] = None,
log_level: LogLevel = LogLevel.INFO,
tag_width: int = 10,
icons: Optional[Dict[str, str]] = None,
colors: Optional[Dict[LogLevel, str]] = None,
verbose: bool = True
):
"""
Initialize the logger.
Args:
log_file: Optional file path for logging
log_level: Minimum log level to display
tag_width: Width for tag formatting
icons: Custom icons for different tags
colors: Custom colors for different log levels
verbose: Whether to output to console
"""
init() # Initialize colorama
self.log_file = log_file
self.log_level = log_level
self.tag_width = tag_width
self.icons = icons or self.DEFAULT_ICONS
self.colors = colors or self.DEFAULT_COLORS
self.verbose = verbose
# Create log file directory if needed
if log_file:
os.makedirs(os.path.dirname(os.path.abspath(log_file)), exist_ok=True)
def _format_tag(self, tag: str) -> str:
"""Format a tag with consistent width."""
return f"[{tag}]".ljust(self.tag_width, ".")
def _get_icon(self, tag: str) -> str:
"""Get the icon for a tag, defaulting to info icon if not found."""
return self.icons.get(tag, self.icons['INFO'])
def _write_to_file(self, message: str):
"""Write a message to the log file if configured."""
if self.log_file:
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
with open(self.log_file, 'a', encoding='utf-8') as f:
# Strip ANSI color codes for file output
clean_message = message.replace(Fore.RESET, '').replace(Style.RESET_ALL, '')
for color in vars(Fore).values():
if isinstance(color, str):
clean_message = clean_message.replace(color, '')
f.write(f"[{timestamp}] {clean_message}\n")
def _log(
self,
level: LogLevel,
message: str,
tag: str,
params: Optional[Dict[str, Any]] = None,
colors: Optional[Dict[str, str]] = None,
base_color: Optional[str] = None,
**kwargs
):
"""
Core logging method that handles message formatting and output.
Args:
level: Log level for this message
message: Message template string
tag: Tag for the message
params: Parameters to format into the message
colors: Color overrides for specific parameters
base_color: Base color for the entire message
"""
if level.value < self.log_level.value:
return
# Format the message with parameters if provided
if params:
try:
# First format the message with raw parameters
formatted_message = message.format(**params)
# Then apply colors if specified
if colors:
for key, color in colors.items():
# Find the formatted value in the message and wrap it with color
if key in params:
value_str = str(params[key])
formatted_message = formatted_message.replace(
value_str,
f"{color}{value_str}{Style.RESET_ALL}"
)
except KeyError as e:
formatted_message = f"LOGGING ERROR: Missing parameter {e} in message template"
level = LogLevel.ERROR
else:
formatted_message = message
# Construct the full log line
color = base_color or self.colors[level]
log_line = f"{color}{self._format_tag(tag)} {self._get_icon(tag)} {formatted_message}{Style.RESET_ALL}"
# Output to console if verbose
if self.verbose or kwargs.get("force_verbose", False):
print(log_line)
# Write to file if configured
self._write_to_file(log_line)
def debug(self, message: str, tag: str = "DEBUG", **kwargs):
"""Log a debug message."""
self._log(LogLevel.DEBUG, message, tag, **kwargs)
def info(self, message: str, tag: str = "INFO", **kwargs):
"""Log an info message."""
self._log(LogLevel.INFO, message, tag, **kwargs)
def success(self, message: str, tag: str = "SUCCESS", **kwargs):
"""Log a success message."""
self._log(LogLevel.SUCCESS, message, tag, **kwargs)
def warning(self, message: str, tag: str = "WARNING", **kwargs):
"""Log a warning message."""
self._log(LogLevel.WARNING, message, tag, **kwargs)
def error(self, message: str, tag: str = "ERROR", **kwargs):
"""Log an error message."""
self._log(LogLevel.ERROR, message, tag, **kwargs)
def url_status(
self,
url: str,
success: bool,
timing: float,
tag: str = "FETCH",
url_length: int = 50
):
"""
Convenience method for logging URL fetch status.
Args:
url: The URL being processed
success: Whether the operation was successful
timing: Time taken for the operation
tag: Tag for the message
url_length: Maximum length for URL in log
"""
self._log(
level=LogLevel.SUCCESS if success else LogLevel.ERROR,
message="{url:.{url_length}}... | Status: {status} | Time: {timing:.2f}s",
tag=tag,
params={
"url": url,
"url_length": url_length,
"status": success,
"timing": timing
},
colors={
"status": Fore.GREEN if success else Fore.RED,
"timing": Fore.YELLOW
}
)
def error_status(
self,
url: str,
error: str,
tag: str = "ERROR",
url_length: int = 50
):
"""
Convenience method for logging error status.
Args:
url: The URL being processed
error: Error message
tag: Tag for the message
url_length: Maximum length for URL in log
"""
self._log(
level=LogLevel.ERROR,
message="{url:.{url_length}}... | Error: {error}",
tag=tag,
params={
"url": url,
"url_length": url_length,
"error": error
}
)

View File

@@ -15,6 +15,7 @@ from .extraction_strategy import *
from .async_crawler_strategy import AsyncCrawlerStrategy, AsyncPlaywrightCrawlerStrategy, AsyncCrawlResponse
from .cache_context import CacheMode, CacheContext, _legacy_to_cache_mode
from .content_scrapping_strategy import WebScrapingStrategy
from .async_logger import AsyncLogger
from .config import (
MIN_WORD_THRESHOLD,
@@ -74,19 +75,29 @@ class AsyncWebCrawler:
always_by_pass_cache: Deprecated, use always_bypass_cache instead
base_directory: Base directory for storing cache
"""
init()
self.log_width = 10 # Width of "[COMPLETE]"
self.tag_format = lambda tag: f"[{tag}]".ljust(self.log_width, ".")
self.log_icons = {
'INIT': '', # Alternative: '▶' or '►'
'READY': '', # Alternative: '√'
'FETCH': '', # Alternative: '▼'
'SCRAPE': '', # Alternative: '♦'
'EXTRACT': '', # Alternative: '□'
'COMPLETE': '', # Alternative: '○'
'ERROR': '×'
}
self.crawler_strategy = crawler_strategy or AsyncPlaywrightCrawlerStrategy(**kwargs)
# init()
# self.log_width = 10 # Width of "[COMPLETE]"
# self.tag_format = lambda tag: f"[{tag}]".ljust(self.log_width, ".")
# self.log_icons = {
# 'INIT': '→', # Alternative: '▶' or '►'
# 'READY': '✓', # Alternative: '√'
# 'FETCH': '↓', # Alternative: '▼'
# 'SCRAPE': '◆', # Alternative: '♦'
# 'EXTRACT': '■', # Alternative: '□'
# 'COMPLETE': '●', # Alternative: '○'
# 'ERROR': '×'
# }
self.verbose = kwargs.get("verbose", False)
self.logger = AsyncLogger(
log_file=os.path.join(base_directory, ".crawl4ai", "crawler.log"),
verbose=self.verbose,
tag_width=10
)
self.crawler_strategy = crawler_strategy or AsyncPlaywrightCrawlerStrategy(
logger = self.logger,
**kwargs
)
# Handle deprecated parameter
if always_by_pass_cache is not None:
@@ -118,12 +129,13 @@ class AsyncWebCrawler:
async def awarmup(self):
"""Initialize the crawler with warm-up sequence."""
if self.verbose:
print(f"{Fore.CYAN}{self.tag_format('INIT')} {self.log_icons['INIT']} Crawl4AI {crawl4ai_version}{Style.RESET_ALL}")
print(f"{Fore.CYAN}{self.tag_format('INIT')} {self.log_icons['INIT']} Warming up AsyncWebCrawler{Style.RESET_ALL}")
self.logger.info(f"Crawl4AI {crawl4ai_version}", tag="INIT")
# if self.verbose:
# print(f"{Fore.CYAN}{self.tag_format('INIT')} {self.log_icons['INIT']} Crawl4AI {crawl4ai_version}{Style.RESET_ALL}")
# print(f"{Fore.CYAN}{self.tag_format('INIT')} {self.log_icons['INIT']} Warming up AsyncWebCrawler{Style.RESET_ALL}")
self.ready = True
if self.verbose:
print(f"{Fore.GREEN}{self.tag_format('READY')} {self.log_icons['READY']} AsyncWebCrawler initialized{Style.RESET_ALL}")
# if self.verbose:
# print(f"{Fore.GREEN}{self.tag_format('READY')} {self.log_icons['READY']} AsyncWebCrawler initialized{Style.RESET_ALL}")
async def arun(
self,
@@ -234,8 +246,14 @@ class AsyncWebCrawler:
screenshot_data = cached_result.screenshot
if not screenshot_data:
cached_result = None
if verbose:
print(f"{Fore.BLUE}{self.tag_format('FETCH')} {self.log_icons['FETCH']} Cache hit for {cache_context.display_url} | Status: {Fore.GREEN if bool(html) else Fore.RED}{bool(html)}{Style.RESET_ALL} | Time: {time.perf_counter() - start_time:.2f}s")
# if verbose:
# print(f"{Fore.BLUE}{self.tag_format('FETCH')} {self.log_icons['FETCH']} Cache hit for {cache_context.display_url} | Status: {Fore.GREEN if bool(html) else Fore.RED}{bool(html)}{Style.RESET_ALL} | Time: {time.perf_counter() - start_time:.2f}s")
self.logger.url_status(
url=cache_context.display_url,
success=bool(html),
timing=time.perf_counter() - start_time,
tag="FETCH"
)
# Fetch fresh content if needed
@@ -252,8 +270,14 @@ class AsyncWebCrawler:
html = sanitize_input_encode(async_response.html)
screenshot_data = async_response.screenshot
t2 = time.perf_counter()
if verbose:
print(f"{Fore.BLUE}{self.tag_format('FETCH')} {self.log_icons['FETCH']} Live fetch for {cache_context.display_url}... | Status: {Fore.GREEN if bool(html) else Fore.RED}{bool(html)}{Style.RESET_ALL} | Time: {t2 - t1:.2f}s")
self.logger.url_status(
url=cache_context.display_url,
success=bool(html),
timing=t2 - t1,
tag="FETCH"
)
# if verbose:
# print(f"{Fore.BLUE}{self.tag_format('FETCH')} {self.log_icons['FETCH']} Live fetch for {cache_context.display_url}... | Status: {Fore.GREEN if bool(html) else Fore.RED}{bool(html)}{Style.RESET_ALL} | Time: {t2 - t1:.2f}s")
# Process the HTML content
crawl_result = await self.aprocess_html(
@@ -287,9 +311,21 @@ class AsyncWebCrawler:
crawl_result.success = bool(html)
crawl_result.session_id = kwargs.get("session_id", None)
if verbose:
print(f"{Fore.GREEN}{self.tag_format('COMPLETE')} {self.log_icons['COMPLETE']} {cache_context.display_url[:URL_LOG_SHORTEN_LENGTH]}... | Status: {Fore.GREEN if crawl_result.success else Fore.RED}{crawl_result.success} | {Fore.YELLOW}Total: {time.perf_counter() - start_time:.2f}s{Style.RESET_ALL}")
# if verbose:
# print(f"{Fore.GREEN}{self.tag_format('COMPLETE')} {self.log_icons['COMPLETE']} {cache_context.display_url[:URL_LOG_SHORTEN_LENGTH]}... | Status: {Fore.GREEN if crawl_result.success else Fore.RED}{crawl_result.success} | {Fore.YELLOW}Total: {time.perf_counter() - start_time:.2f}s{Style.RESET_ALL}")
self.logger.success(
message="{url:.50}... | Status: {status} | Total: {timing}",
tag="COMPLETE",
params={
"url": cache_context.display_url,
"status": crawl_result.success,
"timing": f"{time.perf_counter() - start_time:.2f}s"
},
colors={
"status": Fore.GREEN if crawl_result.success else Fore.RED,
"timing": Fore.YELLOW
}
)
# Update cache if appropriate
if cache_context.should_write() and not bool(cached_result):
@@ -300,7 +336,12 @@ class AsyncWebCrawler:
except Exception as e:
if not hasattr(e, "msg"):
e.msg = str(e)
print(f"{Fore.RED}{self.tag_format('ERROR')} {self.log_icons['ERROR']} Failed to crawl {cache_context.display_url[:URL_LOG_SHORTEN_LENGTH]}... | {e.msg}{Style.RESET_ALL}")
# print(f"{Fore.RED}{self.tag_format('ERROR')} {self.log_icons['ERROR']} Failed to crawl {cache_context.display_url[:URL_LOG_SHORTEN_LENGTH]}... | {e.msg}{Style.RESET_ALL}")
self.logger.error_status(
url=cache_context.display_url,
error=e.msg,
tag="ERROR"
)
return CrawlResult(
url=url,
html="",
@@ -362,7 +403,12 @@ class AsyncWebCrawler:
domain = urlparse(url).netloc
current_time = time.time()
print(f"{Fore.LIGHTBLACK_EX}{self.tag_format('PARALLEL')} Started task for {url[:50]}...{Style.RESET_ALL}")
# print(f"{Fore.LIGHTBLACK_EX}{self.tag_format('PARALLEL')} Started task for {url[:50]}...{Style.RESET_ALL}")
self.logger.debug(
message="Started task for {url:.50}...",
tag="PARALLEL",
params={"url": url}
)
# Get delay settings from kwargs or use defaults
mean_delay = kwargs.get('mean_delay', 0.1) # 0.5 seconds default mean delay
@@ -394,12 +440,26 @@ class AsyncWebCrawler:
)
# Print start message
print(f"{Fore.CYAN}{self.tag_format('INIT')} {self.log_icons['INIT']} Starting concurrent crawling for {len(urls)} URLs...{Style.RESET_ALL}")
# print(f"{Fore.CYAN}{self.tag_format('INIT')} {self.log_icons['INIT']} Starting concurrent crawling for {len(urls)} URLs...{Style.RESET_ALL}")
self.logger.info(
message="Starting concurrent crawling for {count} URLs...",
tag="INIT",
params={"count": len(urls)}
)
start_time = time.perf_counter()
tasks = [crawl_with_semaphore(url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
end_time = time.perf_counter()
print(f"{Fore.YELLOW}{self.tag_format('COMPLETE')} {self.log_icons['COMPLETE']} Concurrent crawling completed for {len(urls)} URLs | Total time: {end_time - start_time:.2f}s{Style.RESET_ALL}")
# print(f"{Fore.YELLOW}{self.tag_format('COMPLETE')} {self.log_icons['COMPLETE']} Concurrent crawling completed for {len(urls)} URLs | Total time: {end_time - start_time:.2f}s{Style.RESET_ALL}")
self.logger.success(
message="Concurrent crawling completed for {count} URLs | " + Fore.YELLOW + " Total time: {timing}" + Style.RESET_ALL,
tag="COMPLETE",
params={
"count": len(urls),
"timing": f"{end_time - start_time:.2f}s"
},
colors={"timing": Fore.YELLOW}
)
return [result if not isinstance(result, Exception) else str(result) for result in results]
@@ -451,9 +511,16 @@ class AsyncWebCrawler:
links = result.get("links", [])
metadata = result.get("metadata", {})
if verbose:
print(f"{Fore.MAGENTA}{self.tag_format('SCRAPE')} {self.log_icons['SCRAPE']} Processed {_url[:URL_LOG_SHORTEN_LENGTH]}...{Style.RESET_ALL} | Time: {int((time.perf_counter() - t1) * 1000)}ms")
# if verbose:
# print(f"{Fore.MAGENTA}{self.tag_format('SCRAPE')} {self.log_icons['SCRAPE']} Processed {_url[:URL_LOG_SHORTEN_LENGTH]}...{Style.RESET_ALL} | Time: {int((time.perf_counter() - t1) * 1000)}ms")
self.logger.info(
message="Processed {url:.50}... | Time: {timing}ms",
tag="SCRAPE",
params={
"url": _url,
"timing": int((time.perf_counter() - t1) * 1000)
}
)
if extracted_content is None and extraction_strategy and chunking_strategy and not isinstance(extraction_strategy, NoExtractionStrategy):
@@ -467,8 +534,17 @@ class AsyncWebCrawler:
sections = chunking_strategy.chunk(markdown)
extracted_content = extraction_strategy.run(url, sections)
extracted_content = json.dumps(extracted_content, indent=4, default=str, ensure_ascii=False)
if verbose:
print(f"{Fore.YELLOW}{self.tag_format('EXTRACT')} {self.log_icons['EXTRACT']} Completed for {_url[:URL_LOG_SHORTEN_LENGTH]}...{Style.RESET_ALL} | Time: {time.perf_counter() - t1:.2f}s{Style.RESET_ALL}")
# if verbose:
# print(f"{Fore.YELLOW}{self.tag_format('EXTRACT')} {self.log_icons['EXTRACT']} Completed for {_url[:URL_LOG_SHORTEN_LENGTH]}...{Style.RESET_ALL} | Time: {time.perf_counter() - t1:.2f}s{Style.RESET_ALL}")
self.logger.info(
message="Completed for {url:.50}... | Time: {timing}s",
tag="EXTRACT",
params={
"url": _url,
"timing": time.perf_counter() - t1
}
)

View File

@@ -8,6 +8,10 @@ from bs4 import BeautifulSoup, NavigableString, Tag
from .utils import clean_tokens
from abc import ABC, abstractmethod
from snowballstemmer import stemmer
# from nltk.stem import PorterStemmer
# ps = PorterStemmer()
class RelevantContentFilter(ABC):
def __init__(self, user_query: str = None):
self.user_query = user_query
@@ -252,7 +256,7 @@ class RelevantContentFilter(ABC):
return str(tag) # Fallback to original if anything fails
class BM25ContentFilter(RelevantContentFilter):
def __init__(self, user_query: str = None, bm25_threshold: float = 1.0):
def __init__(self, user_query: str = None, bm25_threshold: float = 1.0, language: str = 'english'):
super().__init__(user_query=user_query)
self.bm25_threshold = bm25_threshold
self.priority_tags = {
@@ -268,6 +272,7 @@ class BM25ContentFilter(RelevantContentFilter):
'pre': 1.5,
'th': 1.5, # Table headers
}
self.stemmer = stemmer(language)
def filter_content(self, html: str) -> List[str]:
"""Implements content filtering using BM25 algorithm with priority tag handling"""
@@ -282,58 +287,42 @@ class BM25ContentFilter(RelevantContentFilter):
if not candidates:
return []
# Split into priority and regular candidates
priority_candidates = []
regular_candidates = []
# Tokenize corpus
# tokenized_corpus = [chunk.lower().split() for _, chunk, _, _ in candidates]
# tokenized_query = query.lower().split()
# tokenized_corpus = [[ps.stem(word) for word in chunk.lower().split()]
# for _, chunk, _, _ in candidates]
# tokenized_query = [ps.stem(word) for word in query.lower().split()]
for index, chunk, tag_type, tag in candidates:
if tag.name in self.priority_tags:
priority_candidates.append((index, chunk, tag_type, tag))
else:
regular_candidates.append((index, chunk, tag_type, tag))
tokenized_corpus = [[self.stemmer.stemWord(word) for word in chunk.lower().split()]
for _, chunk, _, _ in candidates]
tokenized_query = [self.stemmer.stemWord(word) for word in query.lower().split()]
# Process regular content with BM25
tokenized_corpus = [chunk.lower().split() for _, chunk, _, _ in regular_candidates]
tokenized_query = query.lower().split()
# Clean from stop words and noise
tokenized_corpus = [clean_tokens(tokens) for tokens in tokenized_corpus]
tokenized_query = clean_tokens(tokenized_query)
bm25 = BM25Okapi(tokenized_corpus)
scores = bm25.get_scores(tokenized_query)
# Score and boost regular candidates
scored_candidates = [
(score * self.priority_tags.get(tag.name, 1.0), index, chunk, tag_type, tag)
for score, (index, chunk, tag_type, tag) in zip(scores, regular_candidates)
# Adjust scores with tag weights
adjusted_candidates = []
for score, (index, chunk, tag_type, tag) in zip(scores, candidates):
tag_weight = self.priority_tags.get(tag.name, 1.0)
adjusted_score = score * tag_weight
adjusted_candidates.append((adjusted_score, index, chunk, tag))
# Filter candidates by threshold
selected_candidates = [
(index, chunk, tag) for adjusted_score, index, chunk, tag in adjusted_candidates
if adjusted_score >= self.bm25_threshold
]
scored_candidates.sort(key=lambda x: x[0], reverse=True)
# Process scored candidates
selected_tags = set()
selected_candidates = []
# First add all priority candidates
for index, chunk, tag_type, tag in priority_candidates:
tag_id = id(tag)
if tag_id not in selected_tags:
selected_candidates.append((index, chunk, tag))
selected_tags.add(tag_id)
# Then add scored regular candidates that meet threshold
for score, index, chunk, tag_type, tag in scored_candidates:
if score < self.bm25_threshold:
continue
tag_id = id(tag)
if tag_id not in selected_tags:
selected_candidates.append((index, chunk, tag))
selected_tags.add(tag_id)
if not selected_candidates:
return []
# Sort by original document order
# Sort selected candidates by original document order
selected_candidates.sort(key=lambda x: x[0])
return [self.clean_element(tag) for _, _, tag in selected_candidates]
return [self.clean_element(tag) for _, _, tag in selected_candidates]

View File

@@ -149,6 +149,15 @@ class ContentScrapingStrategy(ABC):
pass
class WebScrapingStrategy(ContentScrapingStrategy):
def __init__(self, logger=None):
self.logger = logger
def _log(self, level, message, tag="SCRAPE", **kwargs):
"""Helper method to safely use logger."""
if self.logger:
log_method = getattr(self.logger, level)
log_method(message=message, tag=tag, **kwargs)
def scrap(self, url: str, html: str, **kwargs) -> Dict[str, Any]:
return self._get_content_of_website_optimized(url, html, is_async=False, **kwargs)
@@ -167,7 +176,12 @@ class WebScrapingStrategy(ContentScrapingStrategy):
try:
meta = extract_metadata("", soup)
except Exception as e:
print('Error extracting metadata:', str(e))
self._log('error',
message="Error extracting metadata: {error}",
tag="SCRAPE",
params={"error": str(e)}
)
# print('Error extracting metadata:', str(e))
meta = {}
@@ -430,9 +444,12 @@ class WebScrapingStrategy(ContentScrapingStrategy):
try:
remove_unwanted_attributes(element, IMPORTANT_ATTRS, kwargs.get('keep_data_attributes', False))
except Exception as e:
print('Error removing unwanted attributes:', str(e))
# print('Error removing unwanted attributes:', str(e))
self._log('error',
message="Error removing unwanted attributes: {error}",
tag="SCRAPE",
params={"error": str(e)}
)
# Process children
for child in list(element.children):
if isinstance(child, NavigableString) and not isinstance(child, Comment):
@@ -453,7 +470,12 @@ class WebScrapingStrategy(ContentScrapingStrategy):
return keep_element
except Exception as e:
print('Error processing element:', str(e))
# print('Error processing element:', str(e))
self._log('error',
message="Error processing element: {error}",
tag="SCRAPE",
params={"error": str(e)}
)
return False
process_element(body)
@@ -516,7 +538,10 @@ class WebScrapingStrategy(ContentScrapingStrategy):
str_body = body.encode_contents().decode('utf-8')
print(f"[LOG] 😧 Error: After processing the crawled HTML and removing irrelevant tags, nothing was left in the page. Check the markdown for further details.")
self._log('error',
message="After processing the crawled HTML and removing irrelevant tags, nothing was left in the page. Check the markdown for further details.",
tag="SCRAPE"
)
cleaned_html = str_body.replace('\n\n', '\n').replace(' ', ' ')
@@ -525,6 +550,13 @@ class WebScrapingStrategy(ContentScrapingStrategy):
h.update_params(**kwargs.get('html2text', {}))
markdown = h.handle(cleaned_html)
except Exception as e:
if not h:
h = CustomHTML2Text()
self._log('error',
message="Error converting HTML to markdown: {error}",
tag="SCRAPE",
params={"error": str(e)}
)
markdown = h.handle(sanitize_html(cleaned_html))
markdown = markdown.replace(' ```', '```')

View File

@@ -20,11 +20,11 @@ class VersionManager:
def update_version(self):
"""Update the version file to current library version"""
self.version_file.write_text(__version__)
self.version_file.write_text(__version__.__version__)
def needs_update(self):
"""Check if database needs update based on version"""
installed = self.get_installed_version()
current = version.parse(__version__)
current = version.parse(__version__.__version__)
return installed is None or installed < current