Merge branch 'next' into 2025-MAR-ALPHA-1
This commit is contained in:
3
.gitignore
vendored
3
.gitignore
vendored
@@ -255,3 +255,6 @@ continue_config.json
|
||||
|
||||
.llm.env
|
||||
.private/
|
||||
|
||||
CLAUDE_MONITOR.md
|
||||
CLAUDE.md
|
||||
33
CHANGELOG.md
33
CHANGELOG.md
@@ -5,6 +5,39 @@ All notable changes to Crawl4AI will be documented in this file.
|
||||
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
||||
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||
|
||||
## Version 0.5.0.post5 (2025-03-14)
|
||||
|
||||
### Added
|
||||
|
||||
- *(crawler)* Add experimental parameters dictionary to CrawlerRunConfig to support beta features
|
||||
- *(tables)* Add comprehensive table detection and extraction functionality with scoring system
|
||||
- *(monitor)* Add real-time crawler monitoring system with memory management
|
||||
- *(content)* Add target_elements parameter for selective content extraction
|
||||
- *(browser)* Add standalone CDP browser launch capability
|
||||
- *(schema)* Add preprocess_html_for_schema utility for better HTML cleaning
|
||||
- *(api)* Add special handling for single URL requests in Docker API
|
||||
|
||||
### Changed
|
||||
|
||||
- *(filters)* Add reverse option to URLPatternFilter for inverting filter logic
|
||||
- *(browser)* Make CSP nonce headers optional via experimental config
|
||||
- *(browser)* Remove default cookie injection from page initialization
|
||||
- *(crawler)* Optimize response handling for single-URL processing
|
||||
- *(api)* Refactor crawl request handling to streamline processing
|
||||
- *(config)* Update default provider to gpt-4o
|
||||
- *(cache)* Change default cache_mode from aggressive to bypass in examples
|
||||
|
||||
### Fixed
|
||||
|
||||
- *(browser)* Clean up browser context creation code
|
||||
- *(api)* Improve code formatting in API handler
|
||||
|
||||
### Breaking Changes
|
||||
|
||||
- WebScrapingStrategy no longer returns 'scraped_html' in its output dictionary
|
||||
- Table extraction logic has been modified to better handle thead/tbody structures
|
||||
- Default cookie injection has been removed from page initialization
|
||||
|
||||
## Version 0.5.0 (2025-03-02)
|
||||
|
||||
### Added
|
||||
|
||||
@@ -33,13 +33,12 @@ from .content_filter_strategy import (
|
||||
LLMContentFilter,
|
||||
RelevantContentFilter,
|
||||
)
|
||||
from .models import CrawlResult, MarkdownGenerationResult
|
||||
from .models import CrawlResult, MarkdownGenerationResult, DisplayMode
|
||||
from .components.crawler_monitor import CrawlerMonitor
|
||||
from .async_dispatcher import (
|
||||
MemoryAdaptiveDispatcher,
|
||||
SemaphoreDispatcher,
|
||||
RateLimiter,
|
||||
CrawlerMonitor,
|
||||
DisplayMode,
|
||||
BaseDispatcher,
|
||||
)
|
||||
from .docker_client import Crawl4aiDockerClient
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import os
|
||||
from .config import (
|
||||
DEFAULT_PROVIDER,
|
||||
DEFAULT_PROVIDER_API_KEY,
|
||||
MIN_WORD_THRESHOLD,
|
||||
IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD,
|
||||
PROVIDER_MODELS,
|
||||
@@ -649,6 +650,12 @@ class CrawlerRunConfig():
|
||||
user_agent_generator_config (dict or None): Configuration for user agent generation if user_agent_mode is set.
|
||||
Default: None.
|
||||
|
||||
# Experimental Parameters
|
||||
experimental (dict): Dictionary containing experimental parameters that are in beta phase.
|
||||
This allows passing temporary features that are not yet fully integrated
|
||||
into the main parameter set.
|
||||
Default: None.
|
||||
|
||||
url: str = None # This is not a compulsory parameter
|
||||
"""
|
||||
|
||||
@@ -731,6 +738,8 @@ class CrawlerRunConfig():
|
||||
user_agent_generator_config: dict = {},
|
||||
# Deep Crawl Parameters
|
||||
deep_crawl_strategy: Optional[DeepCrawlStrategy] = None,
|
||||
# Experimental Parameters
|
||||
experimental: Dict[str, Any] = None,
|
||||
):
|
||||
# TODO: Planning to set properties dynamically based on the __init__ signature
|
||||
self.url = url
|
||||
@@ -844,6 +853,9 @@ class CrawlerRunConfig():
|
||||
|
||||
# Deep Crawl Parameters
|
||||
self.deep_crawl_strategy = deep_crawl_strategy
|
||||
|
||||
# Experimental Parameters
|
||||
self.experimental = experimental or {}
|
||||
|
||||
|
||||
def __getattr__(self, name):
|
||||
@@ -952,6 +964,8 @@ class CrawlerRunConfig():
|
||||
# Deep Crawl Parameters
|
||||
deep_crawl_strategy=kwargs.get("deep_crawl_strategy"),
|
||||
url=kwargs.get("url"),
|
||||
# Experimental Parameters
|
||||
experimental=kwargs.get("experimental"),
|
||||
)
|
||||
|
||||
# Create a funciton returns dict of the object
|
||||
@@ -1036,6 +1050,7 @@ class CrawlerRunConfig():
|
||||
"user_agent_generator_config": self.user_agent_generator_config,
|
||||
"deep_crawl_strategy": self.deep_crawl_strategy,
|
||||
"url": self.url,
|
||||
"experimental": self.experimental,
|
||||
}
|
||||
|
||||
def clone(self, **kwargs):
|
||||
@@ -1071,6 +1086,13 @@ class LLMConfig:
|
||||
provider: str = DEFAULT_PROVIDER,
|
||||
api_token: Optional[str] = None,
|
||||
base_url: Optional[str] = None,
|
||||
temprature: Optional[float] = None,
|
||||
max_tokens: Optional[int] = None,
|
||||
top_p: Optional[float] = None,
|
||||
frequency_penalty: Optional[float] = None,
|
||||
presence_penalty: Optional[float] = None,
|
||||
stop: Optional[List[str]] = None,
|
||||
n: Optional[int] = None,
|
||||
):
|
||||
"""Configuaration class for LLM provider and API token."""
|
||||
self.provider = provider
|
||||
@@ -1080,10 +1102,16 @@ class LLMConfig:
|
||||
self.api_token = os.getenv(api_token[4:])
|
||||
else:
|
||||
self.api_token = PROVIDER_MODELS.get(provider, "no-token") or os.getenv(
|
||||
"OPENAI_API_KEY"
|
||||
DEFAULT_PROVIDER_API_KEY
|
||||
)
|
||||
self.base_url = base_url
|
||||
|
||||
self.temprature = temprature
|
||||
self.max_tokens = max_tokens
|
||||
self.top_p = top_p
|
||||
self.frequency_penalty = frequency_penalty
|
||||
self.presence_penalty = presence_penalty
|
||||
self.stop = stop
|
||||
self.n = n
|
||||
|
||||
@staticmethod
|
||||
def from_kwargs(kwargs: dict) -> "LLMConfig":
|
||||
@@ -1091,13 +1119,27 @@ class LLMConfig:
|
||||
provider=kwargs.get("provider", DEFAULT_PROVIDER),
|
||||
api_token=kwargs.get("api_token"),
|
||||
base_url=kwargs.get("base_url"),
|
||||
temprature=kwargs.get("temprature"),
|
||||
max_tokens=kwargs.get("max_tokens"),
|
||||
top_p=kwargs.get("top_p"),
|
||||
frequency_penalty=kwargs.get("frequency_penalty"),
|
||||
presence_penalty=kwargs.get("presence_penalty"),
|
||||
stop=kwargs.get("stop"),
|
||||
n=kwargs.get("n")
|
||||
)
|
||||
|
||||
def to_dict(self):
|
||||
return {
|
||||
"provider": self.provider,
|
||||
"api_token": self.api_token,
|
||||
"base_url": self.base_url
|
||||
"base_url": self.base_url,
|
||||
"temprature": self.temprature,
|
||||
"max_tokens": self.max_tokens,
|
||||
"top_p": self.top_p,
|
||||
"frequency_penalty": self.frequency_penalty,
|
||||
"presence_penalty": self.presence_penalty,
|
||||
"stop": self.stop,
|
||||
"n": self.n
|
||||
}
|
||||
|
||||
def clone(self, **kwargs):
|
||||
|
||||
@@ -507,10 +507,12 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
|
||||
# Get page for session
|
||||
page, context = await self.browser_manager.get_page(crawlerRunConfig=config)
|
||||
|
||||
# await page.goto(URL)
|
||||
|
||||
# Add default cookie
|
||||
await context.add_cookies(
|
||||
[{"name": "cookiesEnabled", "value": "true", "url": url}]
|
||||
)
|
||||
# await context.add_cookies(
|
||||
# [{"name": "cookiesEnabled", "value": "true", "url": url}]
|
||||
# )
|
||||
|
||||
# Handle navigator overrides
|
||||
if config.override_navigator or config.simulate_user or config.magic:
|
||||
@@ -562,14 +564,15 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
|
||||
|
||||
try:
|
||||
# Generate a unique nonce for this request
|
||||
nonce = hashlib.sha256(os.urandom(32)).hexdigest()
|
||||
if config.experimental.get("use_csp_nonce", False):
|
||||
nonce = hashlib.sha256(os.urandom(32)).hexdigest()
|
||||
|
||||
# Add CSP headers to the request
|
||||
await page.set_extra_http_headers(
|
||||
{
|
||||
"Content-Security-Policy": f"default-src 'self'; script-src 'self' 'nonce-{nonce}' 'strict-dynamic'"
|
||||
}
|
||||
)
|
||||
# Add CSP headers to the request
|
||||
await page.set_extra_http_headers(
|
||||
{
|
||||
"Content-Security-Policy": f"default-src 'self'; script-src 'self' 'nonce-{nonce}' 'strict-dynamic'"
|
||||
}
|
||||
)
|
||||
|
||||
response = await page.goto(
|
||||
url, wait_until=config.wait_until, timeout=config.page_timeout
|
||||
|
||||
@@ -4,17 +4,15 @@ from .models import (
|
||||
CrawlResult,
|
||||
CrawlerTaskResult,
|
||||
CrawlStatus,
|
||||
DisplayMode,
|
||||
CrawlStats,
|
||||
DomainState,
|
||||
)
|
||||
|
||||
from rich.live import Live
|
||||
from rich.table import Table
|
||||
from rich.console import Console
|
||||
from rich import box
|
||||
from datetime import timedelta, datetime
|
||||
from .components.crawler_monitor import CrawlerMonitor
|
||||
|
||||
from .types import AsyncWebCrawler
|
||||
|
||||
from collections.abc import AsyncGenerator
|
||||
|
||||
import time
|
||||
import psutil
|
||||
import asyncio
|
||||
@@ -24,8 +22,6 @@ from urllib.parse import urlparse
|
||||
import random
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
from math import inf as infinity
|
||||
|
||||
|
||||
class RateLimiter:
|
||||
def __init__(
|
||||
@@ -87,201 +83,6 @@ class RateLimiter:
|
||||
return True
|
||||
|
||||
|
||||
class CrawlerMonitor:
|
||||
def __init__(
|
||||
self,
|
||||
max_visible_rows: int = 15,
|
||||
display_mode: DisplayMode = DisplayMode.DETAILED,
|
||||
):
|
||||
self.console = Console()
|
||||
self.max_visible_rows = max_visible_rows
|
||||
self.display_mode = display_mode
|
||||
self.stats: Dict[str, CrawlStats] = {}
|
||||
self.process = psutil.Process()
|
||||
self.start_time = time.time()
|
||||
self.live = Live(self._create_table(), refresh_per_second=2)
|
||||
|
||||
def start(self):
|
||||
self.live.start()
|
||||
|
||||
def stop(self):
|
||||
self.live.stop()
|
||||
|
||||
def add_task(self, task_id: str, url: str):
|
||||
self.stats[task_id] = CrawlStats(
|
||||
task_id=task_id, url=url, status=CrawlStatus.QUEUED
|
||||
)
|
||||
self.live.update(self._create_table())
|
||||
|
||||
def update_task(self, task_id: str, **kwargs):
|
||||
if task_id in self.stats:
|
||||
for key, value in kwargs.items():
|
||||
setattr(self.stats[task_id], key, value)
|
||||
self.live.update(self._create_table())
|
||||
|
||||
def _create_aggregated_table(self) -> Table:
|
||||
"""Creates a compact table showing only aggregated statistics"""
|
||||
table = Table(
|
||||
box=box.ROUNDED,
|
||||
title="Crawler Status Overview",
|
||||
title_style="bold magenta",
|
||||
header_style="bold blue",
|
||||
show_lines=True,
|
||||
)
|
||||
|
||||
# Calculate statistics
|
||||
total_tasks = len(self.stats)
|
||||
queued = sum(
|
||||
1 for stat in self.stats.values() if stat.status == CrawlStatus.QUEUED
|
||||
)
|
||||
in_progress = sum(
|
||||
1 for stat in self.stats.values() if stat.status == CrawlStatus.IN_PROGRESS
|
||||
)
|
||||
completed = sum(
|
||||
1 for stat in self.stats.values() if stat.status == CrawlStatus.COMPLETED
|
||||
)
|
||||
failed = sum(
|
||||
1 for stat in self.stats.values() if stat.status == CrawlStatus.FAILED
|
||||
)
|
||||
|
||||
# Memory statistics
|
||||
current_memory = self.process.memory_info().rss / (1024 * 1024)
|
||||
total_task_memory = sum(stat.memory_usage for stat in self.stats.values())
|
||||
peak_memory = max(
|
||||
(stat.peak_memory for stat in self.stats.values()), default=0.0
|
||||
)
|
||||
|
||||
# Duration
|
||||
duration = time.time() - self.start_time
|
||||
|
||||
# Create status row
|
||||
table.add_column("Status", style="bold cyan")
|
||||
table.add_column("Count", justify="right")
|
||||
table.add_column("Percentage", justify="right")
|
||||
|
||||
table.add_row("Total Tasks", str(total_tasks), "100%")
|
||||
table.add_row(
|
||||
"[yellow]In Queue[/yellow]",
|
||||
str(queued),
|
||||
f"{(queued / total_tasks * 100):.1f}%" if total_tasks > 0 else "0%",
|
||||
)
|
||||
table.add_row(
|
||||
"[blue]In Progress[/blue]",
|
||||
str(in_progress),
|
||||
f"{(in_progress / total_tasks * 100):.1f}%" if total_tasks > 0 else "0%",
|
||||
)
|
||||
table.add_row(
|
||||
"[green]Completed[/green]",
|
||||
str(completed),
|
||||
f"{(completed / total_tasks * 100):.1f}%" if total_tasks > 0 else "0%",
|
||||
)
|
||||
table.add_row(
|
||||
"[red]Failed[/red]",
|
||||
str(failed),
|
||||
f"{(failed / total_tasks * 100):.1f}%" if total_tasks > 0 else "0%",
|
||||
)
|
||||
|
||||
# Add memory information
|
||||
table.add_section()
|
||||
table.add_row(
|
||||
"[magenta]Current Memory[/magenta]", f"{current_memory:.1f} MB", ""
|
||||
)
|
||||
table.add_row(
|
||||
"[magenta]Total Task Memory[/magenta]", f"{total_task_memory:.1f} MB", ""
|
||||
)
|
||||
table.add_row(
|
||||
"[magenta]Peak Task Memory[/magenta]", f"{peak_memory:.1f} MB", ""
|
||||
)
|
||||
table.add_row(
|
||||
"[yellow]Runtime[/yellow]",
|
||||
str(timedelta(seconds=int(duration))),
|
||||
"",
|
||||
)
|
||||
|
||||
return table
|
||||
|
||||
def _create_detailed_table(self) -> Table:
|
||||
table = Table(
|
||||
box=box.ROUNDED,
|
||||
title="Crawler Performance Monitor",
|
||||
title_style="bold magenta",
|
||||
header_style="bold blue",
|
||||
)
|
||||
|
||||
# Add columns
|
||||
table.add_column("Task ID", style="cyan", no_wrap=True)
|
||||
table.add_column("URL", style="cyan", no_wrap=True)
|
||||
table.add_column("Status", style="bold")
|
||||
table.add_column("Memory (MB)", justify="right")
|
||||
table.add_column("Peak (MB)", justify="right")
|
||||
table.add_column("Duration", justify="right")
|
||||
table.add_column("Info", style="italic")
|
||||
|
||||
# Add summary row
|
||||
total_memory = sum(stat.memory_usage for stat in self.stats.values())
|
||||
active_count = sum(
|
||||
1 for stat in self.stats.values() if stat.status == CrawlStatus.IN_PROGRESS
|
||||
)
|
||||
completed_count = sum(
|
||||
1 for stat in self.stats.values() if stat.status == CrawlStatus.COMPLETED
|
||||
)
|
||||
failed_count = sum(
|
||||
1 for stat in self.stats.values() if stat.status == CrawlStatus.FAILED
|
||||
)
|
||||
|
||||
table.add_row(
|
||||
"[bold yellow]SUMMARY",
|
||||
f"Total: {len(self.stats)}",
|
||||
f"Active: {active_count}",
|
||||
f"{total_memory:.1f}",
|
||||
f"{self.process.memory_info().rss / (1024 * 1024):.1f}",
|
||||
str(
|
||||
timedelta(
|
||||
seconds=int(time.time() - self.start_time)
|
||||
)
|
||||
),
|
||||
f"✓{completed_count} ✗{failed_count}",
|
||||
style="bold",
|
||||
)
|
||||
|
||||
table.add_section()
|
||||
|
||||
# Add rows for each task
|
||||
visible_stats = sorted(
|
||||
self.stats.values(),
|
||||
key=lambda x: (
|
||||
x.status != CrawlStatus.IN_PROGRESS,
|
||||
x.status != CrawlStatus.QUEUED,
|
||||
x.end_time or infinity,
|
||||
),
|
||||
)[: self.max_visible_rows]
|
||||
|
||||
for stat in visible_stats:
|
||||
status_style = {
|
||||
CrawlStatus.QUEUED: "white",
|
||||
CrawlStatus.IN_PROGRESS: "yellow",
|
||||
CrawlStatus.COMPLETED: "green",
|
||||
CrawlStatus.FAILED: "red",
|
||||
}[stat.status]
|
||||
|
||||
table.add_row(
|
||||
stat.task_id[:8], # Show first 8 chars of task ID
|
||||
stat.url[:40] + "..." if len(stat.url) > 40 else stat.url,
|
||||
f"[{status_style}]{stat.status.value}[/{status_style}]",
|
||||
f"{stat.memory_usage:.1f}",
|
||||
f"{stat.peak_memory:.1f}",
|
||||
stat.duration,
|
||||
stat.error_message[:40] if stat.error_message else "",
|
||||
)
|
||||
|
||||
return table
|
||||
|
||||
def _create_table(self) -> Table:
|
||||
"""Creates the appropriate table based on display mode"""
|
||||
if self.display_mode == DisplayMode.AGGREGATED:
|
||||
return self._create_aggregated_table()
|
||||
return self._create_detailed_table()
|
||||
|
||||
|
||||
class BaseDispatcher(ABC):
|
||||
def __init__(
|
||||
@@ -309,7 +110,7 @@ class BaseDispatcher(ABC):
|
||||
async def run_urls(
|
||||
self,
|
||||
urls: List[str],
|
||||
crawler: "AsyncWebCrawler", # noqa: F821
|
||||
crawler: AsyncWebCrawler, # noqa: F821
|
||||
config: CrawlerRunConfig,
|
||||
monitor: Optional[CrawlerMonitor] = None,
|
||||
) -> List[CrawlerTaskResult]:
|
||||
@@ -320,71 +121,144 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
|
||||
def __init__(
|
||||
self,
|
||||
memory_threshold_percent: float = 90.0,
|
||||
critical_threshold_percent: float = 95.0, # New critical threshold
|
||||
recovery_threshold_percent: float = 85.0, # New recovery threshold
|
||||
check_interval: float = 1.0,
|
||||
max_session_permit: int = 20,
|
||||
memory_wait_timeout: float = 300.0, # 5 minutes default timeout
|
||||
fairness_timeout: float = 600.0, # 10 minutes before prioritizing long-waiting URLs
|
||||
rate_limiter: Optional[RateLimiter] = None,
|
||||
monitor: Optional[CrawlerMonitor] = None,
|
||||
):
|
||||
super().__init__(rate_limiter, monitor)
|
||||
self.memory_threshold_percent = memory_threshold_percent
|
||||
self.critical_threshold_percent = critical_threshold_percent
|
||||
self.recovery_threshold_percent = recovery_threshold_percent
|
||||
self.check_interval = check_interval
|
||||
self.max_session_permit = max_session_permit
|
||||
self.memory_wait_timeout = memory_wait_timeout
|
||||
self.result_queue = asyncio.Queue() # Queue for storing results
|
||||
|
||||
self.fairness_timeout = fairness_timeout
|
||||
self.result_queue = asyncio.Queue()
|
||||
self.task_queue = asyncio.PriorityQueue() # Priority queue for better management
|
||||
self.memory_pressure_mode = False # Flag to indicate when we're in memory pressure mode
|
||||
self.current_memory_percent = 0.0 # Track current memory usage
|
||||
|
||||
async def _memory_monitor_task(self):
|
||||
"""Background task to continuously monitor memory usage and update state"""
|
||||
while True:
|
||||
self.current_memory_percent = psutil.virtual_memory().percent
|
||||
|
||||
# Enter memory pressure mode if we cross the threshold
|
||||
if not self.memory_pressure_mode and self.current_memory_percent >= self.memory_threshold_percent:
|
||||
self.memory_pressure_mode = True
|
||||
if self.monitor:
|
||||
self.monitor.update_memory_status("PRESSURE")
|
||||
|
||||
# Exit memory pressure mode if we go below recovery threshold
|
||||
elif self.memory_pressure_mode and self.current_memory_percent <= self.recovery_threshold_percent:
|
||||
self.memory_pressure_mode = False
|
||||
if self.monitor:
|
||||
self.monitor.update_memory_status("NORMAL")
|
||||
|
||||
# In critical mode, we might need to take more drastic action
|
||||
if self.current_memory_percent >= self.critical_threshold_percent:
|
||||
if self.monitor:
|
||||
self.monitor.update_memory_status("CRITICAL")
|
||||
# We could implement additional memory-saving measures here
|
||||
|
||||
await asyncio.sleep(self.check_interval)
|
||||
|
||||
def _get_priority_score(self, wait_time: float, retry_count: int) -> float:
|
||||
"""Calculate priority score (lower is higher priority)
|
||||
- URLs waiting longer than fairness_timeout get higher priority
|
||||
- More retry attempts decreases priority
|
||||
"""
|
||||
if wait_time > self.fairness_timeout:
|
||||
# High priority for long-waiting URLs
|
||||
return -wait_time
|
||||
# Standard priority based on retries
|
||||
return retry_count
|
||||
|
||||
async def crawl_url(
|
||||
self,
|
||||
url: str,
|
||||
config: CrawlerRunConfig,
|
||||
task_id: str,
|
||||
retry_count: int = 0,
|
||||
) -> CrawlerTaskResult:
|
||||
start_time = time.time()
|
||||
error_message = ""
|
||||
memory_usage = peak_memory = 0.0
|
||||
|
||||
|
||||
# Get starting memory for accurate measurement
|
||||
process = psutil.Process()
|
||||
start_memory = process.memory_info().rss / (1024 * 1024)
|
||||
|
||||
try:
|
||||
if self.monitor:
|
||||
self.monitor.update_task(
|
||||
task_id, status=CrawlStatus.IN_PROGRESS, start_time=start_time
|
||||
task_id,
|
||||
status=CrawlStatus.IN_PROGRESS,
|
||||
start_time=start_time,
|
||||
retry_count=retry_count
|
||||
)
|
||||
|
||||
self.concurrent_sessions += 1
|
||||
|
||||
|
||||
if self.rate_limiter:
|
||||
await self.rate_limiter.wait_if_needed(url)
|
||||
|
||||
process = psutil.Process()
|
||||
start_memory = process.memory_info().rss / (1024 * 1024)
|
||||
|
||||
# Check if we're in critical memory state
|
||||
if self.current_memory_percent >= self.critical_threshold_percent:
|
||||
# Requeue this task with increased priority and retry count
|
||||
enqueue_time = time.time()
|
||||
priority = self._get_priority_score(enqueue_time - start_time, retry_count + 1)
|
||||
await self.task_queue.put((priority, (url, task_id, retry_count + 1, enqueue_time)))
|
||||
|
||||
# Update monitoring
|
||||
if self.monitor:
|
||||
self.monitor.update_task(
|
||||
task_id,
|
||||
status=CrawlStatus.QUEUED,
|
||||
error_message="Requeued due to critical memory pressure"
|
||||
)
|
||||
|
||||
# Return placeholder result with requeued status
|
||||
return CrawlerTaskResult(
|
||||
task_id=task_id,
|
||||
url=url,
|
||||
result=CrawlResult(
|
||||
url=url, html="", metadata={"status": "requeued"},
|
||||
success=False, error_message="Requeued due to critical memory pressure"
|
||||
),
|
||||
memory_usage=0,
|
||||
peak_memory=0,
|
||||
start_time=start_time,
|
||||
end_time=time.time(),
|
||||
error_message="Requeued due to critical memory pressure",
|
||||
retry_count=retry_count + 1
|
||||
)
|
||||
|
||||
# Execute the crawl
|
||||
result = await self.crawler.arun(url, config=config, session_id=task_id)
|
||||
|
||||
# Measure memory usage
|
||||
end_memory = process.memory_info().rss / (1024 * 1024)
|
||||
|
||||
memory_usage = peak_memory = end_memory - start_memory
|
||||
|
||||
|
||||
# Handle rate limiting
|
||||
if self.rate_limiter and result.status_code:
|
||||
if not self.rate_limiter.update_delay(url, result.status_code):
|
||||
error_message = f"Rate limit retry count exceeded for domain {urlparse(url).netloc}"
|
||||
if self.monitor:
|
||||
self.monitor.update_task(task_id, status=CrawlStatus.FAILED)
|
||||
result = CrawlerTaskResult(
|
||||
task_id=task_id,
|
||||
url=url,
|
||||
result=result,
|
||||
memory_usage=memory_usage,
|
||||
peak_memory=peak_memory,
|
||||
start_time=start_time,
|
||||
end_time=time.time(),
|
||||
error_message=error_message,
|
||||
)
|
||||
await self.result_queue.put(result)
|
||||
return result
|
||||
|
||||
|
||||
# Update status based on result
|
||||
if not result.success:
|
||||
error_message = result.error_message
|
||||
if self.monitor:
|
||||
self.monitor.update_task(task_id, status=CrawlStatus.FAILED)
|
||||
elif self.monitor:
|
||||
self.monitor.update_task(task_id, status=CrawlStatus.COMPLETED)
|
||||
|
||||
|
||||
except Exception as e:
|
||||
error_message = str(e)
|
||||
if self.monitor:
|
||||
@@ -392,7 +266,7 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
|
||||
result = CrawlResult(
|
||||
url=url, html="", metadata={}, success=False, error_message=str(e)
|
||||
)
|
||||
|
||||
|
||||
finally:
|
||||
end_time = time.time()
|
||||
if self.monitor:
|
||||
@@ -402,9 +276,10 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
|
||||
memory_usage=memory_usage,
|
||||
peak_memory=peak_memory,
|
||||
error_message=error_message,
|
||||
retry_count=retry_count
|
||||
)
|
||||
self.concurrent_sessions -= 1
|
||||
|
||||
|
||||
return CrawlerTaskResult(
|
||||
task_id=task_id,
|
||||
url=url,
|
||||
@@ -414,116 +289,240 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
|
||||
start_time=start_time,
|
||||
end_time=end_time,
|
||||
error_message=error_message,
|
||||
retry_count=retry_count
|
||||
)
|
||||
|
||||
|
||||
async def run_urls(
|
||||
self,
|
||||
urls: List[str],
|
||||
crawler: "AsyncWebCrawler", # noqa: F821
|
||||
crawler: AsyncWebCrawler,
|
||||
config: CrawlerRunConfig,
|
||||
) -> List[CrawlerTaskResult]:
|
||||
self.crawler = crawler
|
||||
|
||||
|
||||
# Start the memory monitor task
|
||||
memory_monitor = asyncio.create_task(self._memory_monitor_task())
|
||||
|
||||
if self.monitor:
|
||||
self.monitor.start()
|
||||
|
||||
|
||||
results = []
|
||||
|
||||
try:
|
||||
pending_tasks = []
|
||||
active_tasks = []
|
||||
task_queue = []
|
||||
|
||||
for url in urls:
|
||||
task_id = str(uuid.uuid4())
|
||||
if self.monitor:
|
||||
self.monitor.add_task(task_id, url)
|
||||
task_queue.append((url, task_id))
|
||||
|
||||
while task_queue or active_tasks:
|
||||
wait_start_time = time.time()
|
||||
while len(active_tasks) < self.max_session_permit and task_queue:
|
||||
if psutil.virtual_memory().percent >= self.memory_threshold_percent:
|
||||
# Check if we've exceeded the timeout
|
||||
if time.time() - wait_start_time > self.memory_wait_timeout:
|
||||
raise MemoryError(
|
||||
f"Memory usage above threshold ({self.memory_threshold_percent}%) for more than {self.memory_wait_timeout} seconds"
|
||||
)
|
||||
await asyncio.sleep(self.check_interval)
|
||||
continue
|
||||
|
||||
url, task_id = task_queue.pop(0)
|
||||
task = asyncio.create_task(self.crawl_url(url, config, task_id))
|
||||
active_tasks.append(task)
|
||||
|
||||
if not active_tasks:
|
||||
await asyncio.sleep(self.check_interval)
|
||||
continue
|
||||
|
||||
done, pending = await asyncio.wait(
|
||||
active_tasks, return_when=asyncio.FIRST_COMPLETED
|
||||
)
|
||||
|
||||
pending_tasks.extend(done)
|
||||
active_tasks = list(pending)
|
||||
|
||||
return await asyncio.gather(*pending_tasks)
|
||||
finally:
|
||||
if self.monitor:
|
||||
self.monitor.stop()
|
||||
|
||||
async def run_urls_stream(
|
||||
self,
|
||||
urls: List[str],
|
||||
crawler: "AsyncWebCrawler", # noqa: F821
|
||||
config: CrawlerRunConfig,
|
||||
) -> AsyncGenerator[CrawlerTaskResult, None]:
|
||||
self.crawler = crawler
|
||||
if self.monitor:
|
||||
self.monitor.start()
|
||||
|
||||
try:
|
||||
active_tasks = []
|
||||
task_queue = []
|
||||
completed_count = 0
|
||||
total_urls = len(urls)
|
||||
|
||||
# Initialize task queue
|
||||
for url in urls:
|
||||
task_id = str(uuid.uuid4())
|
||||
if self.monitor:
|
||||
self.monitor.add_task(task_id, url)
|
||||
task_queue.append((url, task_id))
|
||||
|
||||
while completed_count < total_urls:
|
||||
# Start new tasks if memory permits
|
||||
while len(active_tasks) < self.max_session_permit and task_queue:
|
||||
if psutil.virtual_memory().percent >= self.memory_threshold_percent:
|
||||
await asyncio.sleep(self.check_interval)
|
||||
continue
|
||||
|
||||
url, task_id = task_queue.pop(0)
|
||||
task = asyncio.create_task(self.crawl_url(url, config, task_id))
|
||||
active_tasks.append(task)
|
||||
|
||||
if not active_tasks and not task_queue:
|
||||
break
|
||||
|
||||
# Wait for any task to complete and yield results
|
||||
# Add to queue with initial priority 0, retry count 0, and current time
|
||||
await self.task_queue.put((0, (url, task_id, 0, time.time())))
|
||||
|
||||
active_tasks = []
|
||||
|
||||
# Process until both queues are empty
|
||||
while not self.task_queue.empty() or active_tasks:
|
||||
# If memory pressure is low, start new tasks
|
||||
if not self.memory_pressure_mode and len(active_tasks) < self.max_session_permit:
|
||||
try:
|
||||
# Try to get a task with timeout to avoid blocking indefinitely
|
||||
priority, (url, task_id, retry_count, enqueue_time) = await asyncio.wait_for(
|
||||
self.task_queue.get(), timeout=0.1
|
||||
)
|
||||
|
||||
# Create and start the task
|
||||
task = asyncio.create_task(
|
||||
self.crawl_url(url, config, task_id, retry_count)
|
||||
)
|
||||
active_tasks.append(task)
|
||||
|
||||
# Update waiting time in monitor
|
||||
if self.monitor:
|
||||
wait_time = time.time() - enqueue_time
|
||||
self.monitor.update_task(
|
||||
task_id,
|
||||
wait_time=wait_time,
|
||||
status=CrawlStatus.IN_PROGRESS
|
||||
)
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
# No tasks in queue, that's fine
|
||||
pass
|
||||
|
||||
# Wait for completion even if queue is starved
|
||||
if active_tasks:
|
||||
done, pending = await asyncio.wait(
|
||||
active_tasks, timeout=0.1, return_when=asyncio.FIRST_COMPLETED
|
||||
)
|
||||
|
||||
# Process completed tasks
|
||||
for completed_task in done:
|
||||
result = await completed_task
|
||||
completed_count += 1
|
||||
yield result
|
||||
results.append(result)
|
||||
|
||||
# Update active tasks list
|
||||
active_tasks = list(pending)
|
||||
else:
|
||||
await asyncio.sleep(self.check_interval)
|
||||
# If no active tasks but still waiting, sleep briefly
|
||||
await asyncio.sleep(self.check_interval / 2)
|
||||
|
||||
# Update priorities for waiting tasks if needed
|
||||
await self._update_queue_priorities()
|
||||
|
||||
return results
|
||||
|
||||
except Exception as e:
|
||||
if self.monitor:
|
||||
self.monitor.update_memory_status(f"QUEUE_ERROR: {str(e)}")
|
||||
|
||||
finally:
|
||||
# Clean up
|
||||
memory_monitor.cancel()
|
||||
if self.monitor:
|
||||
self.monitor.stop()
|
||||
|
||||
|
||||
async def _update_queue_priorities(self):
|
||||
"""Periodically update priorities of items in the queue to prevent starvation"""
|
||||
# Skip if queue is empty
|
||||
if self.task_queue.empty():
|
||||
return
|
||||
|
||||
# Use a drain-and-refill approach to update all priorities
|
||||
temp_items = []
|
||||
|
||||
# Drain the queue (with a safety timeout to prevent blocking)
|
||||
try:
|
||||
drain_start = time.time()
|
||||
while not self.task_queue.empty() and time.time() - drain_start < 5.0: # 5 second safety timeout
|
||||
try:
|
||||
# Get item from queue with timeout
|
||||
priority, (url, task_id, retry_count, enqueue_time) = await asyncio.wait_for(
|
||||
self.task_queue.get(), timeout=0.1
|
||||
)
|
||||
|
||||
# Calculate new priority based on current wait time
|
||||
current_time = time.time()
|
||||
wait_time = current_time - enqueue_time
|
||||
new_priority = self._get_priority_score(wait_time, retry_count)
|
||||
|
||||
# Store with updated priority
|
||||
temp_items.append((new_priority, (url, task_id, retry_count, enqueue_time)))
|
||||
|
||||
# Update monitoring stats for this task
|
||||
if self.monitor and task_id in self.monitor.stats:
|
||||
self.monitor.update_task(task_id, wait_time=wait_time)
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
# Queue might be empty or very slow
|
||||
break
|
||||
except Exception as e:
|
||||
# If anything goes wrong, make sure we refill the queue with what we've got
|
||||
self.monitor.update_memory_status(f"QUEUE_ERROR: {str(e)}")
|
||||
|
||||
# Calculate queue statistics
|
||||
if temp_items and self.monitor:
|
||||
total_queued = len(temp_items)
|
||||
wait_times = [item[1][3] for item in temp_items]
|
||||
highest_wait_time = time.time() - min(wait_times) if wait_times else 0
|
||||
avg_wait_time = sum(time.time() - t for t in wait_times) / len(wait_times) if wait_times else 0
|
||||
|
||||
# Update queue statistics in monitor
|
||||
self.monitor.update_queue_statistics(
|
||||
total_queued=total_queued,
|
||||
highest_wait_time=highest_wait_time,
|
||||
avg_wait_time=avg_wait_time
|
||||
)
|
||||
|
||||
# Sort by priority (lowest number = highest priority)
|
||||
temp_items.sort(key=lambda x: x[0])
|
||||
|
||||
# Refill the queue with updated priorities
|
||||
for item in temp_items:
|
||||
await self.task_queue.put(item)
|
||||
|
||||
async def run_urls_stream(
|
||||
self,
|
||||
urls: List[str],
|
||||
crawler: AsyncWebCrawler,
|
||||
config: CrawlerRunConfig,
|
||||
) -> AsyncGenerator[CrawlerTaskResult, None]:
|
||||
self.crawler = crawler
|
||||
|
||||
# Start the memory monitor task
|
||||
memory_monitor = asyncio.create_task(self._memory_monitor_task())
|
||||
|
||||
if self.monitor:
|
||||
self.monitor.start()
|
||||
|
||||
try:
|
||||
# Initialize task queue
|
||||
for url in urls:
|
||||
task_id = str(uuid.uuid4())
|
||||
if self.monitor:
|
||||
self.monitor.add_task(task_id, url)
|
||||
# Add to queue with initial priority 0, retry count 0, and current time
|
||||
await self.task_queue.put((0, (url, task_id, 0, time.time())))
|
||||
|
||||
active_tasks = []
|
||||
completed_count = 0
|
||||
total_urls = len(urls)
|
||||
|
||||
while completed_count < total_urls:
|
||||
# If memory pressure is low, start new tasks
|
||||
if not self.memory_pressure_mode and len(active_tasks) < self.max_session_permit:
|
||||
try:
|
||||
# Try to get a task with timeout
|
||||
priority, (url, task_id, retry_count, enqueue_time) = await asyncio.wait_for(
|
||||
self.task_queue.get(), timeout=0.1
|
||||
)
|
||||
|
||||
# Create and start the task
|
||||
task = asyncio.create_task(
|
||||
self.crawl_url(url, config, task_id, retry_count)
|
||||
)
|
||||
active_tasks.append(task)
|
||||
|
||||
# Update waiting time in monitor
|
||||
if self.monitor:
|
||||
wait_time = time.time() - enqueue_time
|
||||
self.monitor.update_task(
|
||||
task_id,
|
||||
wait_time=wait_time,
|
||||
status=CrawlStatus.IN_PROGRESS
|
||||
)
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
# No tasks in queue, that's fine
|
||||
pass
|
||||
|
||||
# Process completed tasks and yield results
|
||||
if active_tasks:
|
||||
done, pending = await asyncio.wait(
|
||||
active_tasks, timeout=0.1, return_when=asyncio.FIRST_COMPLETED
|
||||
)
|
||||
|
||||
for completed_task in done:
|
||||
result = await completed_task
|
||||
|
||||
# Only count as completed if it wasn't requeued
|
||||
if "requeued" not in result.error_message:
|
||||
completed_count += 1
|
||||
yield result
|
||||
|
||||
# Update active tasks list
|
||||
active_tasks = list(pending)
|
||||
else:
|
||||
# If no active tasks but still waiting, sleep briefly
|
||||
await asyncio.sleep(self.check_interval / 2)
|
||||
|
||||
# Update priorities for waiting tasks if needed
|
||||
await self._update_queue_priorities()
|
||||
|
||||
finally:
|
||||
# Clean up
|
||||
memory_monitor.cancel()
|
||||
if self.monitor:
|
||||
self.monitor.stop()
|
||||
|
||||
|
||||
class SemaphoreDispatcher(BaseDispatcher):
|
||||
def __init__(
|
||||
@@ -620,7 +619,7 @@ class SemaphoreDispatcher(BaseDispatcher):
|
||||
|
||||
async def run_urls(
|
||||
self,
|
||||
crawler: "AsyncWebCrawler", # noqa: F821
|
||||
crawler: AsyncWebCrawler, # noqa: F821
|
||||
urls: List[str],
|
||||
config: CrawlerRunConfig,
|
||||
) -> List[CrawlerTaskResult]:
|
||||
@@ -644,4 +643,4 @@ class SemaphoreDispatcher(BaseDispatcher):
|
||||
return await asyncio.gather(*tasks, return_exceptions=True)
|
||||
finally:
|
||||
if self.monitor:
|
||||
self.monitor.stop()
|
||||
self.monitor.stop()
|
||||
@@ -443,19 +443,6 @@ class BrowserManager:
|
||||
self.default_context = contexts[0]
|
||||
else:
|
||||
self.default_context = await self.create_browser_context()
|
||||
# self.default_context = await self.browser.new_context(
|
||||
# viewport={
|
||||
# "width": self.config.viewport_width,
|
||||
# "height": self.config.viewport_height,
|
||||
# },
|
||||
# storage_state=self.config.storage_state,
|
||||
# user_agent=self.config.headers.get(
|
||||
# "User-Agent", self.config.user_agent
|
||||
# ),
|
||||
# accept_downloads=self.config.accept_downloads,
|
||||
# ignore_https_errors=self.config.ignore_https_errors,
|
||||
# java_script_enabled=self.config.java_script_enabled,
|
||||
# )
|
||||
await self.setup_context(self.default_context)
|
||||
else:
|
||||
browser_args = self._build_browser_args()
|
||||
@@ -470,6 +457,7 @@ class BrowserManager:
|
||||
|
||||
self.default_context = self.browser
|
||||
|
||||
|
||||
def _build_browser_args(self) -> dict:
|
||||
"""Build browser launch arguments from config."""
|
||||
args = [
|
||||
|
||||
837
crawl4ai/components/crawler_monitor.py
Normal file
837
crawl4ai/components/crawler_monitor.py
Normal file
@@ -0,0 +1,837 @@
|
||||
import time
|
||||
import uuid
|
||||
import threading
|
||||
import psutil
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Dict, Optional, List
|
||||
import threading
|
||||
from rich.console import Console
|
||||
from rich.layout import Layout
|
||||
from rich.panel import Panel
|
||||
from rich.table import Table
|
||||
from rich.text import Text
|
||||
from rich.live import Live
|
||||
from rich import box
|
||||
from ..models import CrawlStatus
|
||||
|
||||
class TerminalUI:
|
||||
"""Terminal user interface for CrawlerMonitor using rich library."""
|
||||
|
||||
def __init__(self, refresh_rate: float = 1.0, max_width: int = 120):
|
||||
"""
|
||||
Initialize the terminal UI.
|
||||
|
||||
Args:
|
||||
refresh_rate: How often to refresh the UI (in seconds)
|
||||
max_width: Maximum width of the UI in characters
|
||||
"""
|
||||
self.console = Console(width=max_width)
|
||||
self.layout = Layout()
|
||||
self.refresh_rate = refresh_rate
|
||||
self.stop_event = threading.Event()
|
||||
self.ui_thread = None
|
||||
self.monitor = None # Will be set by CrawlerMonitor
|
||||
self.max_width = max_width
|
||||
|
||||
# Setup layout - vertical layout (top to bottom)
|
||||
self.layout.split(
|
||||
Layout(name="header", size=3),
|
||||
Layout(name="pipeline_status", size=10),
|
||||
Layout(name="task_details", ratio=1),
|
||||
Layout(name="footer", size=3) # Increased footer size to fit all content
|
||||
)
|
||||
|
||||
def start(self, monitor):
|
||||
"""Start the UI thread."""
|
||||
self.monitor = monitor
|
||||
self.stop_event.clear()
|
||||
self.ui_thread = threading.Thread(target=self._ui_loop)
|
||||
self.ui_thread.daemon = True
|
||||
self.ui_thread.start()
|
||||
|
||||
def stop(self):
|
||||
"""Stop the UI thread."""
|
||||
if self.ui_thread and self.ui_thread.is_alive():
|
||||
self.stop_event.set()
|
||||
# Only try to join if we're not in the UI thread
|
||||
# This prevents "cannot join current thread" errors
|
||||
if threading.current_thread() != self.ui_thread:
|
||||
self.ui_thread.join(timeout=5.0)
|
||||
|
||||
def _ui_loop(self):
|
||||
"""Main UI rendering loop."""
|
||||
import sys
|
||||
import select
|
||||
import termios
|
||||
import tty
|
||||
|
||||
# Setup terminal for non-blocking input
|
||||
old_settings = termios.tcgetattr(sys.stdin)
|
||||
try:
|
||||
tty.setcbreak(sys.stdin.fileno())
|
||||
|
||||
# Use Live display to render the UI
|
||||
with Live(self.layout, refresh_per_second=1/self.refresh_rate, screen=True) as live:
|
||||
self.live = live # Store the live display for updates
|
||||
|
||||
# Main UI loop
|
||||
while not self.stop_event.is_set():
|
||||
self._update_display()
|
||||
|
||||
# Check for key press (non-blocking)
|
||||
if select.select([sys.stdin], [], [], 0)[0]:
|
||||
key = sys.stdin.read(1)
|
||||
# Check for 'q' to quit
|
||||
if key == 'q':
|
||||
# Signal stop but don't call monitor.stop() from UI thread
|
||||
# as it would cause the thread to try to join itself
|
||||
self.stop_event.set()
|
||||
self.monitor.is_running = False
|
||||
break
|
||||
|
||||
time.sleep(self.refresh_rate)
|
||||
|
||||
# Just check if the monitor was stopped
|
||||
if not self.monitor.is_running:
|
||||
break
|
||||
finally:
|
||||
# Restore terminal settings
|
||||
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)
|
||||
|
||||
def _update_display(self):
|
||||
"""Update the terminal display with current statistics."""
|
||||
if not self.monitor:
|
||||
return
|
||||
|
||||
# Update crawler status panel
|
||||
self.layout["header"].update(self._create_status_panel())
|
||||
|
||||
# Update pipeline status panel and task details panel
|
||||
self.layout["pipeline_status"].update(self._create_pipeline_panel())
|
||||
self.layout["task_details"].update(self._create_task_details_panel())
|
||||
|
||||
# Update footer
|
||||
self.layout["footer"].update(self._create_footer())
|
||||
|
||||
def _create_status_panel(self) -> Panel:
|
||||
"""Create the crawler status panel."""
|
||||
summary = self.monitor.get_summary()
|
||||
|
||||
# Format memory status with icon
|
||||
memory_status = self.monitor.get_memory_status()
|
||||
memory_icon = "🟢" # Default NORMAL
|
||||
if memory_status == "PRESSURE":
|
||||
memory_icon = "🟠"
|
||||
elif memory_status == "CRITICAL":
|
||||
memory_icon = "🔴"
|
||||
|
||||
# Get current memory usage
|
||||
current_memory = psutil.Process().memory_info().rss / (1024 * 1024) # MB
|
||||
memory_percent = (current_memory / psutil.virtual_memory().total) * 100
|
||||
|
||||
# Format runtime
|
||||
runtime = self.monitor._format_time(time.time() - self.monitor.start_time if self.monitor.start_time else 0)
|
||||
|
||||
# Create the status text
|
||||
status_text = Text()
|
||||
status_text.append(f"Web Crawler Dashboard | Runtime: {runtime} | Memory: {memory_percent:.1f}% {memory_icon}\n")
|
||||
status_text.append(f"Status: {memory_status} | URLs: {summary['urls_completed']}/{summary['urls_total']} | ")
|
||||
status_text.append(f"Peak Mem: {summary['peak_memory_percent']:.1f}% at {self.monitor._format_time(summary['peak_memory_time'])}")
|
||||
|
||||
return Panel(status_text, title="Crawler Status", border_style="blue")
|
||||
|
||||
def _create_pipeline_panel(self) -> Panel:
|
||||
"""Create the pipeline status panel."""
|
||||
summary = self.monitor.get_summary()
|
||||
queue_stats = self.monitor.get_queue_stats()
|
||||
|
||||
# Create a table for status counts
|
||||
table = Table(show_header=True, box=None)
|
||||
table.add_column("Status", style="cyan")
|
||||
table.add_column("Count", justify="right")
|
||||
table.add_column("Percentage", justify="right")
|
||||
table.add_column("Stat", style="cyan")
|
||||
table.add_column("Value", justify="right")
|
||||
|
||||
# Calculate overall progress
|
||||
progress = f"{summary['urls_completed']}/{summary['urls_total']}"
|
||||
progress_percent = f"{summary['completion_percentage']:.1f}%"
|
||||
|
||||
# Add rows for each status
|
||||
table.add_row(
|
||||
"Overall Progress",
|
||||
progress,
|
||||
progress_percent,
|
||||
"Est. Completion",
|
||||
summary.get('estimated_completion_time', "N/A")
|
||||
)
|
||||
|
||||
# Add rows for each status
|
||||
status_counts = summary['status_counts']
|
||||
total = summary['urls_total'] or 1 # Avoid division by zero
|
||||
|
||||
# Status rows
|
||||
table.add_row(
|
||||
"Completed",
|
||||
str(status_counts.get(CrawlStatus.COMPLETED.name, 0)),
|
||||
f"{status_counts.get(CrawlStatus.COMPLETED.name, 0) / total * 100:.1f}%",
|
||||
"Avg. Time/URL",
|
||||
f"{summary.get('avg_task_duration', 0):.2f}s"
|
||||
)
|
||||
|
||||
table.add_row(
|
||||
"Failed",
|
||||
str(status_counts.get(CrawlStatus.FAILED.name, 0)),
|
||||
f"{status_counts.get(CrawlStatus.FAILED.name, 0) / total * 100:.1f}%",
|
||||
"Concurrent Tasks",
|
||||
str(status_counts.get(CrawlStatus.IN_PROGRESS.name, 0))
|
||||
)
|
||||
|
||||
table.add_row(
|
||||
"In Progress",
|
||||
str(status_counts.get(CrawlStatus.IN_PROGRESS.name, 0)),
|
||||
f"{status_counts.get(CrawlStatus.IN_PROGRESS.name, 0) / total * 100:.1f}%",
|
||||
"Queue Size",
|
||||
str(queue_stats['total_queued'])
|
||||
)
|
||||
|
||||
table.add_row(
|
||||
"Queued",
|
||||
str(status_counts.get(CrawlStatus.QUEUED.name, 0)),
|
||||
f"{status_counts.get(CrawlStatus.QUEUED.name, 0) / total * 100:.1f}%",
|
||||
"Max Wait Time",
|
||||
f"{queue_stats['highest_wait_time']:.1f}s"
|
||||
)
|
||||
|
||||
# Requeued is a special case as it's not a status
|
||||
requeued_count = summary.get('requeued_count', 0)
|
||||
table.add_row(
|
||||
"Requeued",
|
||||
str(requeued_count),
|
||||
f"{summary.get('requeue_rate', 0):.1f}%",
|
||||
"Avg Wait Time",
|
||||
f"{queue_stats['avg_wait_time']:.1f}s"
|
||||
)
|
||||
|
||||
# Add empty row for spacing
|
||||
table.add_row(
|
||||
"",
|
||||
"",
|
||||
"",
|
||||
"Requeue Rate",
|
||||
f"{summary.get('requeue_rate', 0):.1f}%"
|
||||
)
|
||||
|
||||
return Panel(table, title="Pipeline Status", border_style="green")
|
||||
|
||||
def _create_task_details_panel(self) -> Panel:
|
||||
"""Create the task details panel."""
|
||||
# Create a table for task details
|
||||
table = Table(show_header=True, expand=True)
|
||||
table.add_column("Task ID", style="cyan", no_wrap=True, width=10)
|
||||
table.add_column("URL", style="blue", ratio=3)
|
||||
table.add_column("Status", style="green", width=15)
|
||||
table.add_column("Memory", justify="right", width=8)
|
||||
table.add_column("Peak", justify="right", width=8)
|
||||
table.add_column("Duration", justify="right", width=10)
|
||||
|
||||
# Get all task stats
|
||||
task_stats = self.monitor.get_all_task_stats()
|
||||
|
||||
# Add summary row
|
||||
active_tasks = sum(1 for stats in task_stats.values()
|
||||
if stats['status'] == CrawlStatus.IN_PROGRESS.name)
|
||||
|
||||
total_memory = sum(stats['memory_usage'] for stats in task_stats.values())
|
||||
total_peak = sum(stats['peak_memory'] for stats in task_stats.values())
|
||||
|
||||
# Summary row with separators
|
||||
table.add_row(
|
||||
"SUMMARY",
|
||||
f"Total: {len(task_stats)}",
|
||||
f"Active: {active_tasks}",
|
||||
f"{total_memory:.1f}",
|
||||
f"{total_peak:.1f}",
|
||||
"N/A"
|
||||
)
|
||||
|
||||
# Add a separator
|
||||
table.add_row("—" * 10, "—" * 20, "—" * 10, "—" * 8, "—" * 8, "—" * 10)
|
||||
|
||||
# Status icons
|
||||
status_icons = {
|
||||
CrawlStatus.QUEUED.name: "⏳",
|
||||
CrawlStatus.IN_PROGRESS.name: "🔄",
|
||||
CrawlStatus.COMPLETED.name: "✅",
|
||||
CrawlStatus.FAILED.name: "❌"
|
||||
}
|
||||
|
||||
# Calculate how many rows we can display based on available space
|
||||
# We can display more rows now that we have a dedicated panel
|
||||
display_count = min(len(task_stats), 20) # Display up to 20 tasks
|
||||
|
||||
# Add rows for each task
|
||||
for task_id, stats in sorted(
|
||||
list(task_stats.items())[:display_count],
|
||||
# Sort: 1. IN_PROGRESS first, 2. QUEUED, 3. COMPLETED/FAILED by recency
|
||||
key=lambda x: (
|
||||
0 if x[1]['status'] == CrawlStatus.IN_PROGRESS.name else
|
||||
1 if x[1]['status'] == CrawlStatus.QUEUED.name else
|
||||
2,
|
||||
-1 * (x[1].get('end_time', 0) or 0) # Most recent first
|
||||
)
|
||||
):
|
||||
# Truncate task_id and URL for display
|
||||
short_id = task_id[:8]
|
||||
url = stats['url']
|
||||
if len(url) > 50: # Allow longer URLs in the dedicated panel
|
||||
url = url[:47] + "..."
|
||||
|
||||
# Format status with icon
|
||||
status = f"{status_icons.get(stats['status'], '?')} {stats['status']}"
|
||||
|
||||
# Add row
|
||||
table.add_row(
|
||||
short_id,
|
||||
url,
|
||||
status,
|
||||
f"{stats['memory_usage']:.1f}",
|
||||
f"{stats['peak_memory']:.1f}",
|
||||
stats['duration'] if 'duration' in stats else "0:00"
|
||||
)
|
||||
|
||||
return Panel(table, title="Task Details", border_style="yellow")
|
||||
|
||||
def _create_footer(self) -> Panel:
|
||||
"""Create the footer panel."""
|
||||
from rich.columns import Columns
|
||||
from rich.align import Align
|
||||
|
||||
memory_status = self.monitor.get_memory_status()
|
||||
memory_icon = "🟢" # Default NORMAL
|
||||
if memory_status == "PRESSURE":
|
||||
memory_icon = "🟠"
|
||||
elif memory_status == "CRITICAL":
|
||||
memory_icon = "🔴"
|
||||
|
||||
# Left section - memory status
|
||||
left_text = Text()
|
||||
left_text.append("Memory Status: ", style="bold")
|
||||
status_style = "green" if memory_status == "NORMAL" else "yellow" if memory_status == "PRESSURE" else "red bold"
|
||||
left_text.append(f"{memory_icon} {memory_status}", style=status_style)
|
||||
|
||||
# Center section - copyright
|
||||
center_text = Text("© Crawl4AI 2025 | Made by UnclecCode", style="cyan italic")
|
||||
|
||||
# Right section - quit instruction
|
||||
right_text = Text()
|
||||
right_text.append("Press ", style="bold")
|
||||
right_text.append("q", style="white on blue")
|
||||
right_text.append(" to quit", style="bold")
|
||||
|
||||
# Create columns with the three sections
|
||||
footer_content = Columns(
|
||||
[
|
||||
Align.left(left_text),
|
||||
Align.center(center_text),
|
||||
Align.right(right_text)
|
||||
],
|
||||
expand=True
|
||||
)
|
||||
|
||||
# Create a more visible footer panel
|
||||
return Panel(
|
||||
footer_content,
|
||||
border_style="white",
|
||||
padding=(0, 1) # Add padding for better visibility
|
||||
)
|
||||
|
||||
|
||||
class CrawlerMonitor:
|
||||
"""
|
||||
Comprehensive monitoring and visualization system for tracking web crawler operations in real-time.
|
||||
Provides a terminal-based dashboard that displays task statuses, memory usage, queue statistics,
|
||||
and performance metrics.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
urls_total: int = 0,
|
||||
refresh_rate: float = 1.0,
|
||||
enable_ui: bool = True,
|
||||
max_width: int = 120
|
||||
):
|
||||
"""
|
||||
Initialize the CrawlerMonitor.
|
||||
|
||||
Args:
|
||||
urls_total: Total number of URLs to be crawled
|
||||
refresh_rate: How often to refresh the UI (in seconds)
|
||||
enable_ui: Whether to display the terminal UI
|
||||
max_width: Maximum width of the UI in characters
|
||||
"""
|
||||
# Core monitoring attributes
|
||||
self.stats = {} # Task ID -> stats dict
|
||||
self.memory_status = "NORMAL"
|
||||
self.start_time = None
|
||||
self.end_time = None
|
||||
self.is_running = False
|
||||
self.queue_stats = {
|
||||
"total_queued": 0,
|
||||
"highest_wait_time": 0.0,
|
||||
"avg_wait_time": 0.0
|
||||
}
|
||||
self.urls_total = urls_total
|
||||
self.urls_completed = 0
|
||||
self.peak_memory_percent = 0.0
|
||||
self.peak_memory_time = 0.0
|
||||
|
||||
# Status counts
|
||||
self.status_counts = {
|
||||
CrawlStatus.QUEUED.name: 0,
|
||||
CrawlStatus.IN_PROGRESS.name: 0,
|
||||
CrawlStatus.COMPLETED.name: 0,
|
||||
CrawlStatus.FAILED.name: 0
|
||||
}
|
||||
|
||||
# Requeue tracking
|
||||
self.requeued_count = 0
|
||||
|
||||
# Thread-safety
|
||||
self._lock = threading.RLock()
|
||||
|
||||
# Terminal UI
|
||||
self.enable_ui = enable_ui
|
||||
self.terminal_ui = TerminalUI(
|
||||
refresh_rate=refresh_rate,
|
||||
max_width=max_width
|
||||
) if enable_ui else None
|
||||
|
||||
def start(self):
|
||||
"""
|
||||
Start the monitoring session.
|
||||
|
||||
- Initializes the start_time
|
||||
- Sets is_running to True
|
||||
- Starts the terminal UI if enabled
|
||||
"""
|
||||
with self._lock:
|
||||
self.start_time = time.time()
|
||||
self.is_running = True
|
||||
|
||||
# Start the terminal UI
|
||||
if self.enable_ui and self.terminal_ui:
|
||||
self.terminal_ui.start(self)
|
||||
|
||||
def stop(self):
|
||||
"""
|
||||
Stop the monitoring session.
|
||||
|
||||
- Records end_time
|
||||
- Sets is_running to False
|
||||
- Stops the terminal UI
|
||||
- Generates final summary statistics
|
||||
"""
|
||||
with self._lock:
|
||||
self.end_time = time.time()
|
||||
self.is_running = False
|
||||
|
||||
# Stop the terminal UI
|
||||
if self.enable_ui and self.terminal_ui:
|
||||
self.terminal_ui.stop()
|
||||
|
||||
def add_task(self, task_id: str, url: str):
|
||||
"""
|
||||
Register a new task with the monitor.
|
||||
|
||||
Args:
|
||||
task_id: Unique identifier for the task
|
||||
url: URL being crawled
|
||||
|
||||
The task is initialized with:
|
||||
- status: QUEUED
|
||||
- url: The URL to crawl
|
||||
- enqueue_time: Current time
|
||||
- memory_usage: 0
|
||||
- peak_memory: 0
|
||||
- wait_time: 0
|
||||
- retry_count: 0
|
||||
"""
|
||||
with self._lock:
|
||||
self.stats[task_id] = {
|
||||
"task_id": task_id,
|
||||
"url": url,
|
||||
"status": CrawlStatus.QUEUED.name,
|
||||
"enqueue_time": time.time(),
|
||||
"start_time": None,
|
||||
"end_time": None,
|
||||
"memory_usage": 0.0,
|
||||
"peak_memory": 0.0,
|
||||
"error_message": "",
|
||||
"wait_time": 0.0,
|
||||
"retry_count": 0,
|
||||
"duration": "0:00",
|
||||
"counted_requeue": False
|
||||
}
|
||||
|
||||
# Update status counts
|
||||
self.status_counts[CrawlStatus.QUEUED.name] += 1
|
||||
|
||||
def update_task(
|
||||
self,
|
||||
task_id: str,
|
||||
status: Optional[CrawlStatus] = None,
|
||||
start_time: Optional[float] = None,
|
||||
end_time: Optional[float] = None,
|
||||
memory_usage: Optional[float] = None,
|
||||
peak_memory: Optional[float] = None,
|
||||
error_message: Optional[str] = None,
|
||||
retry_count: Optional[int] = None,
|
||||
wait_time: Optional[float] = None
|
||||
):
|
||||
"""
|
||||
Update statistics for a specific task.
|
||||
|
||||
Args:
|
||||
task_id: Unique identifier for the task
|
||||
status: New status (QUEUED, IN_PROGRESS, COMPLETED, FAILED)
|
||||
start_time: When task execution started
|
||||
end_time: When task execution ended
|
||||
memory_usage: Current memory usage in MB
|
||||
peak_memory: Maximum memory usage in MB
|
||||
error_message: Error description if failed
|
||||
retry_count: Number of retry attempts
|
||||
wait_time: Time spent in queue
|
||||
|
||||
Updates task statistics and updates status counts.
|
||||
If status changes, decrements old status count and
|
||||
increments new status count.
|
||||
"""
|
||||
with self._lock:
|
||||
# Check if task exists
|
||||
if task_id not in self.stats:
|
||||
return
|
||||
|
||||
task_stats = self.stats[task_id]
|
||||
|
||||
# Update status counts if status is changing
|
||||
old_status = task_stats["status"]
|
||||
if status and status.name != old_status:
|
||||
self.status_counts[old_status] -= 1
|
||||
self.status_counts[status.name] += 1
|
||||
|
||||
# Track completion
|
||||
if status == CrawlStatus.COMPLETED:
|
||||
self.urls_completed += 1
|
||||
|
||||
# Track requeues
|
||||
if old_status in [CrawlStatus.COMPLETED.name, CrawlStatus.FAILED.name] and not task_stats.get("counted_requeue", False):
|
||||
self.requeued_count += 1
|
||||
task_stats["counted_requeue"] = True
|
||||
|
||||
# Update task statistics
|
||||
if status:
|
||||
task_stats["status"] = status.name
|
||||
if start_time is not None:
|
||||
task_stats["start_time"] = start_time
|
||||
if end_time is not None:
|
||||
task_stats["end_time"] = end_time
|
||||
if memory_usage is not None:
|
||||
task_stats["memory_usage"] = memory_usage
|
||||
|
||||
# Update peak memory if necessary
|
||||
current_percent = (memory_usage / psutil.virtual_memory().total) * 100
|
||||
if current_percent > self.peak_memory_percent:
|
||||
self.peak_memory_percent = current_percent
|
||||
self.peak_memory_time = time.time()
|
||||
|
||||
if peak_memory is not None:
|
||||
task_stats["peak_memory"] = peak_memory
|
||||
if error_message is not None:
|
||||
task_stats["error_message"] = error_message
|
||||
if retry_count is not None:
|
||||
task_stats["retry_count"] = retry_count
|
||||
if wait_time is not None:
|
||||
task_stats["wait_time"] = wait_time
|
||||
|
||||
# Calculate duration
|
||||
if task_stats["start_time"]:
|
||||
end = task_stats["end_time"] or time.time()
|
||||
duration = end - task_stats["start_time"]
|
||||
task_stats["duration"] = self._format_time(duration)
|
||||
|
||||
def update_memory_status(self, status: str):
|
||||
"""
|
||||
Update the current memory status.
|
||||
|
||||
Args:
|
||||
status: Memory status (NORMAL, PRESSURE, CRITICAL, or custom)
|
||||
|
||||
Also updates the UI to reflect the new status.
|
||||
"""
|
||||
with self._lock:
|
||||
self.memory_status = status
|
||||
|
||||
def update_queue_statistics(
|
||||
self,
|
||||
total_queued: int,
|
||||
highest_wait_time: float,
|
||||
avg_wait_time: float
|
||||
):
|
||||
"""
|
||||
Update statistics related to the task queue.
|
||||
|
||||
Args:
|
||||
total_queued: Number of tasks currently in queue
|
||||
highest_wait_time: Longest wait time of any queued task
|
||||
avg_wait_time: Average wait time across all queued tasks
|
||||
"""
|
||||
with self._lock:
|
||||
self.queue_stats = {
|
||||
"total_queued": total_queued,
|
||||
"highest_wait_time": highest_wait_time,
|
||||
"avg_wait_time": avg_wait_time
|
||||
}
|
||||
|
||||
def get_task_stats(self, task_id: str) -> Dict:
|
||||
"""
|
||||
Get statistics for a specific task.
|
||||
|
||||
Args:
|
||||
task_id: Unique identifier for the task
|
||||
|
||||
Returns:
|
||||
Dictionary containing all task statistics
|
||||
"""
|
||||
with self._lock:
|
||||
return self.stats.get(task_id, {}).copy()
|
||||
|
||||
def get_all_task_stats(self) -> Dict[str, Dict]:
|
||||
"""
|
||||
Get statistics for all tasks.
|
||||
|
||||
Returns:
|
||||
Dictionary mapping task_ids to their statistics
|
||||
"""
|
||||
with self._lock:
|
||||
return self.stats.copy()
|
||||
|
||||
def get_memory_status(self) -> str:
|
||||
"""
|
||||
Get the current memory status.
|
||||
|
||||
Returns:
|
||||
Current memory status string
|
||||
"""
|
||||
with self._lock:
|
||||
return self.memory_status
|
||||
|
||||
def get_queue_stats(self) -> Dict:
|
||||
"""
|
||||
Get current queue statistics.
|
||||
|
||||
Returns:
|
||||
Dictionary with queue statistics including:
|
||||
- total_queued: Number of tasks in queue
|
||||
- highest_wait_time: Longest wait time
|
||||
- avg_wait_time: Average wait time
|
||||
"""
|
||||
with self._lock:
|
||||
return self.queue_stats.copy()
|
||||
|
||||
def get_summary(self) -> Dict:
|
||||
"""
|
||||
Get a summary of all crawler statistics.
|
||||
|
||||
Returns:
|
||||
Dictionary containing:
|
||||
- runtime: Total runtime in seconds
|
||||
- urls_total: Total URLs to process
|
||||
- urls_completed: Number of completed URLs
|
||||
- completion_percentage: Percentage complete
|
||||
- status_counts: Count of tasks in each status
|
||||
- memory_status: Current memory status
|
||||
- peak_memory_percent: Highest memory usage
|
||||
- peak_memory_time: When peak memory occurred
|
||||
- avg_task_duration: Average task processing time
|
||||
- estimated_completion_time: Projected finish time
|
||||
- requeue_rate: Percentage of tasks requeued
|
||||
"""
|
||||
with self._lock:
|
||||
# Calculate runtime
|
||||
current_time = time.time()
|
||||
runtime = current_time - (self.start_time or current_time)
|
||||
|
||||
# Calculate completion percentage
|
||||
completion_percentage = 0
|
||||
if self.urls_total > 0:
|
||||
completion_percentage = (self.urls_completed / self.urls_total) * 100
|
||||
|
||||
# Calculate average task duration for completed tasks
|
||||
completed_tasks = [
|
||||
task for task in self.stats.values()
|
||||
if task["status"] == CrawlStatus.COMPLETED.name and task.get("start_time") and task.get("end_time")
|
||||
]
|
||||
|
||||
avg_task_duration = 0
|
||||
if completed_tasks:
|
||||
total_duration = sum(task["end_time"] - task["start_time"] for task in completed_tasks)
|
||||
avg_task_duration = total_duration / len(completed_tasks)
|
||||
|
||||
# Calculate requeue rate
|
||||
requeue_rate = 0
|
||||
if len(self.stats) > 0:
|
||||
requeue_rate = (self.requeued_count / len(self.stats)) * 100
|
||||
|
||||
# Calculate estimated completion time
|
||||
estimated_completion_time = "N/A"
|
||||
if avg_task_duration > 0 and self.urls_total > 0 and self.urls_completed > 0:
|
||||
remaining_tasks = self.urls_total - self.urls_completed
|
||||
estimated_seconds = remaining_tasks * avg_task_duration
|
||||
estimated_completion_time = self._format_time(estimated_seconds)
|
||||
|
||||
return {
|
||||
"runtime": runtime,
|
||||
"urls_total": self.urls_total,
|
||||
"urls_completed": self.urls_completed,
|
||||
"completion_percentage": completion_percentage,
|
||||
"status_counts": self.status_counts.copy(),
|
||||
"memory_status": self.memory_status,
|
||||
"peak_memory_percent": self.peak_memory_percent,
|
||||
"peak_memory_time": self.peak_memory_time,
|
||||
"avg_task_duration": avg_task_duration,
|
||||
"estimated_completion_time": estimated_completion_time,
|
||||
"requeue_rate": requeue_rate,
|
||||
"requeued_count": self.requeued_count
|
||||
}
|
||||
|
||||
def render(self):
|
||||
"""
|
||||
Render the terminal UI.
|
||||
|
||||
This is the main UI rendering loop that:
|
||||
1. Updates all statistics
|
||||
2. Formats the display
|
||||
3. Renders the ASCII interface
|
||||
4. Handles keyboard input
|
||||
|
||||
Note: The actual rendering is handled by the TerminalUI class
|
||||
which uses the rich library's Live display.
|
||||
"""
|
||||
if self.enable_ui and self.terminal_ui:
|
||||
# Force an update of the UI
|
||||
if hasattr(self.terminal_ui, '_update_display'):
|
||||
self.terminal_ui._update_display()
|
||||
|
||||
def _format_time(self, seconds: float) -> str:
|
||||
"""
|
||||
Format time in hours:minutes:seconds.
|
||||
|
||||
Args:
|
||||
seconds: Time in seconds
|
||||
|
||||
Returns:
|
||||
Formatted time string (e.g., "1:23:45")
|
||||
"""
|
||||
delta = timedelta(seconds=int(seconds))
|
||||
hours, remainder = divmod(delta.seconds, 3600)
|
||||
minutes, seconds = divmod(remainder, 60)
|
||||
|
||||
if hours > 0:
|
||||
return f"{hours}:{minutes:02}:{seconds:02}"
|
||||
else:
|
||||
return f"{minutes}:{seconds:02}"
|
||||
|
||||
def _calculate_estimated_completion(self) -> str:
|
||||
"""
|
||||
Calculate estimated completion time based on current progress.
|
||||
|
||||
Returns:
|
||||
Formatted time string
|
||||
"""
|
||||
summary = self.get_summary()
|
||||
return summary.get("estimated_completion_time", "N/A")
|
||||
|
||||
|
||||
# Example code for testing
|
||||
if __name__ == "__main__":
|
||||
# Initialize the monitor
|
||||
monitor = CrawlerMonitor(urls_total=100)
|
||||
|
||||
# Start monitoring
|
||||
monitor.start()
|
||||
|
||||
try:
|
||||
# Simulate some tasks
|
||||
for i in range(20):
|
||||
task_id = str(uuid.uuid4())
|
||||
url = f"https://example.com/page{i}"
|
||||
monitor.add_task(task_id, url)
|
||||
|
||||
# Simulate 20% of tasks are already running
|
||||
if i < 4:
|
||||
monitor.update_task(
|
||||
task_id=task_id,
|
||||
status=CrawlStatus.IN_PROGRESS,
|
||||
start_time=time.time() - 30, # Started 30 seconds ago
|
||||
memory_usage=10.5
|
||||
)
|
||||
|
||||
# Simulate 10% of tasks are completed
|
||||
if i >= 4 and i < 6:
|
||||
start_time = time.time() - 60
|
||||
end_time = time.time() - 15
|
||||
monitor.update_task(
|
||||
task_id=task_id,
|
||||
status=CrawlStatus.IN_PROGRESS,
|
||||
start_time=start_time,
|
||||
memory_usage=8.2
|
||||
)
|
||||
monitor.update_task(
|
||||
task_id=task_id,
|
||||
status=CrawlStatus.COMPLETED,
|
||||
end_time=end_time,
|
||||
memory_usage=0,
|
||||
peak_memory=15.7
|
||||
)
|
||||
|
||||
# Simulate 5% of tasks fail
|
||||
if i >= 6 and i < 7:
|
||||
start_time = time.time() - 45
|
||||
end_time = time.time() - 20
|
||||
monitor.update_task(
|
||||
task_id=task_id,
|
||||
status=CrawlStatus.IN_PROGRESS,
|
||||
start_time=start_time,
|
||||
memory_usage=12.3
|
||||
)
|
||||
monitor.update_task(
|
||||
task_id=task_id,
|
||||
status=CrawlStatus.FAILED,
|
||||
end_time=end_time,
|
||||
memory_usage=0,
|
||||
peak_memory=18.2,
|
||||
error_message="Connection timeout"
|
||||
)
|
||||
|
||||
# Simulate memory pressure
|
||||
monitor.update_memory_status("PRESSURE")
|
||||
|
||||
# Simulate queue statistics
|
||||
monitor.update_queue_statistics(
|
||||
total_queued=16, # 20 - 4 (in progress)
|
||||
highest_wait_time=120.5,
|
||||
avg_wait_time=60.2
|
||||
)
|
||||
|
||||
# Keep the monitor running for a demonstration
|
||||
print("Crawler Monitor is running. Press 'q' to exit.")
|
||||
while monitor.is_running:
|
||||
time.sleep(0.1)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("\nExiting crawler monitor...")
|
||||
finally:
|
||||
# Stop the monitor
|
||||
monitor.stop()
|
||||
print("Crawler monitor exited successfully.")
|
||||
@@ -4,7 +4,8 @@ from dotenv import load_dotenv
|
||||
load_dotenv() # Load environment variables from .env file
|
||||
|
||||
# Default provider, ONLY used when the extraction strategy is LLMExtractionStrategy
|
||||
DEFAULT_PROVIDER = "openai/gpt-4o-mini"
|
||||
DEFAULT_PROVIDER = "openai/gpt-4o"
|
||||
DEFAULT_PROVIDER_API_KEY = "OPENAI_API_KEY"
|
||||
MODEL_REPO_BRANCH = "new-release-0.0.2"
|
||||
# Provider-model dictionary, ONLY used when the extraction strategy is LLMExtractionStrategy
|
||||
PROVIDER_MODELS = {
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
from crawl4ai import BrowserConfig, AsyncWebCrawler, CrawlerRunConfig, CacheMode
|
||||
from crawl4ai.hub import BaseCrawler
|
||||
from crawl4ai.utils import optimize_html, get_home_folder
|
||||
from crawl4ai.utils import optimize_html, get_home_folder, preprocess_html_for_schema
|
||||
from crawl4ai.extraction_strategy import JsonCssExtractionStrategy
|
||||
from pathlib import Path
|
||||
import json
|
||||
@@ -68,7 +68,8 @@ class GoogleSearchCrawler(BaseCrawler):
|
||||
home_dir = get_home_folder() if not schema_cache_path else schema_cache_path
|
||||
os.makedirs(f"{home_dir}/schema", exist_ok=True)
|
||||
|
||||
cleaned_html = optimize_html(html, threshold=100)
|
||||
# cleaned_html = optimize_html(html, threshold=100)
|
||||
cleaned_html = preprocess_html_for_schema(html)
|
||||
|
||||
organic_schema = None
|
||||
if os.path.exists(f"{home_dir}/schema/organic_schema.json"):
|
||||
|
||||
@@ -34,7 +34,7 @@ from .model_loader import (
|
||||
calculate_batch_size
|
||||
)
|
||||
|
||||
from .types import LLMConfig
|
||||
from .types import LLMConfig, create_llm_config
|
||||
|
||||
from functools import partial
|
||||
import numpy as np
|
||||
@@ -757,8 +757,6 @@ class LLMExtractionStrategy(ExtractionStrategy):
|
||||
#######################################################
|
||||
# New extraction strategies for JSON-based extraction #
|
||||
#######################################################
|
||||
|
||||
|
||||
class JsonElementExtractionStrategy(ExtractionStrategy):
|
||||
"""
|
||||
Abstract base class for extracting structured JSON from HTML content.
|
||||
@@ -1049,7 +1047,7 @@ class JsonElementExtractionStrategy(ExtractionStrategy):
|
||||
schema_type: str = "CSS", # or XPATH
|
||||
query: str = None,
|
||||
target_json_example: str = None,
|
||||
llm_config: 'LLMConfig' = None,
|
||||
llm_config: 'LLMConfig' = create_llm_config(),
|
||||
provider: str = None,
|
||||
api_token: str = None,
|
||||
**kwargs
|
||||
@@ -1140,7 +1138,6 @@ In this scenario, use your best judgment to generate the schema. Try to maximize
|
||||
except Exception as e:
|
||||
raise Exception(f"Failed to generate schema: {str(e)}")
|
||||
|
||||
|
||||
class JsonCssExtractionStrategy(JsonElementExtractionStrategy):
|
||||
"""
|
||||
Concrete implementation of `JsonElementExtractionStrategy` using CSS selectors.
|
||||
|
||||
@@ -28,6 +28,12 @@ class CrawlerTaskResult:
|
||||
start_time: Union[datetime, float]
|
||||
end_time: Union[datetime, float]
|
||||
error_message: str = ""
|
||||
retry_count: int = 0
|
||||
wait_time: float = 0.0
|
||||
|
||||
@property
|
||||
def success(self) -> bool:
|
||||
return self.result.success
|
||||
|
||||
|
||||
class CrawlStatus(Enum):
|
||||
@@ -67,6 +73,9 @@ class CrawlStats:
|
||||
memory_usage: float = 0.0
|
||||
peak_memory: float = 0.0
|
||||
error_message: str = ""
|
||||
wait_time: float = 0.0
|
||||
retry_count: int = 0
|
||||
counted_requeue: bool = False
|
||||
|
||||
@property
|
||||
def duration(self) -> str:
|
||||
@@ -87,6 +96,7 @@ class CrawlStats:
|
||||
duration = end - start
|
||||
return str(timedelta(seconds=int(duration.total_seconds())))
|
||||
|
||||
|
||||
class DisplayMode(Enum):
|
||||
DETAILED = "DETAILED"
|
||||
AGGREGATED = "AGGREGATED"
|
||||
|
||||
@@ -178,4 +178,10 @@ if TYPE_CHECKING:
|
||||
BestFirstCrawlingStrategy as BestFirstCrawlingStrategyType,
|
||||
DFSDeepCrawlStrategy as DFSDeepCrawlStrategyType,
|
||||
DeepCrawlDecorator as DeepCrawlDecoratorType,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
|
||||
def create_llm_config(*args, **kwargs) -> 'LLMConfigType':
|
||||
from .async_configs import LLMConfig
|
||||
return LLMConfig(*args, **kwargs)
|
||||
|
||||
@@ -26,7 +26,7 @@ import cProfile
|
||||
import pstats
|
||||
from functools import wraps
|
||||
import asyncio
|
||||
|
||||
from lxml import etree, html as lhtml
|
||||
import sqlite3
|
||||
import hashlib
|
||||
|
||||
@@ -2617,3 +2617,116 @@ class HeadPeekr:
|
||||
def get_title(head_content: str):
|
||||
title_match = re.search(r'<title>(.*?)</title>', head_content, re.IGNORECASE | re.DOTALL)
|
||||
return title_match.group(1) if title_match else None
|
||||
|
||||
def preprocess_html_for_schema(html_content, text_threshold=100, attr_value_threshold=200, max_size=100000):
|
||||
"""
|
||||
Preprocess HTML to reduce size while preserving structure for schema generation.
|
||||
|
||||
Args:
|
||||
html_content (str): Raw HTML content
|
||||
text_threshold (int): Maximum length for text nodes before truncation
|
||||
attr_value_threshold (int): Maximum length for attribute values before truncation
|
||||
max_size (int): Target maximum size for output HTML
|
||||
|
||||
Returns:
|
||||
str: Preprocessed HTML content
|
||||
"""
|
||||
try:
|
||||
# Parse HTML with error recovery
|
||||
parser = etree.HTMLParser(remove_comments=True, remove_blank_text=True)
|
||||
tree = lhtml.fromstring(html_content, parser=parser)
|
||||
|
||||
# 1. Remove HEAD section (keep only BODY)
|
||||
head_elements = tree.xpath('//head')
|
||||
for head in head_elements:
|
||||
if head.getparent() is not None:
|
||||
head.getparent().remove(head)
|
||||
|
||||
# 2. Define tags to remove completely
|
||||
tags_to_remove = [
|
||||
'script', 'style', 'noscript', 'iframe', 'canvas', 'svg',
|
||||
'video', 'audio', 'source', 'track', 'map', 'area'
|
||||
]
|
||||
|
||||
# Remove unwanted elements
|
||||
for tag in tags_to_remove:
|
||||
elements = tree.xpath(f'//{tag}')
|
||||
for element in elements:
|
||||
if element.getparent() is not None:
|
||||
element.getparent().remove(element)
|
||||
|
||||
# 3. Process remaining elements to clean attributes and truncate text
|
||||
for element in tree.iter():
|
||||
# Skip if we're at the root level
|
||||
if element.getparent() is None:
|
||||
continue
|
||||
|
||||
# Clean non-essential attributes but preserve structural ones
|
||||
# attribs_to_keep = {'id', 'class', 'name', 'href', 'src', 'type', 'value', 'data-'}
|
||||
|
||||
# This is more aggressive than the previous version
|
||||
attribs_to_keep = {'id', 'class', 'name', 'type', 'value'}
|
||||
|
||||
# attributes_hates_truncate = ['id', 'class', "data-"]
|
||||
|
||||
# This means, I don't care, if an attribute is too long, truncate it, go and find a better css selector to build a schema
|
||||
attributes_hates_truncate = []
|
||||
|
||||
# Process each attribute
|
||||
for attrib in list(element.attrib.keys()):
|
||||
# Keep if it's essential or starts with data-
|
||||
if not (attrib in attribs_to_keep or attrib.startswith('data-')):
|
||||
element.attrib.pop(attrib)
|
||||
# Truncate long attribute values except for selectors
|
||||
elif attrib not in attributes_hates_truncate and len(element.attrib[attrib]) > attr_value_threshold:
|
||||
element.attrib[attrib] = element.attrib[attrib][:attr_value_threshold] + '...'
|
||||
|
||||
# Truncate text content if it's too long
|
||||
if element.text and len(element.text.strip()) > text_threshold:
|
||||
element.text = element.text.strip()[:text_threshold] + '...'
|
||||
|
||||
# Also truncate tail text if present
|
||||
if element.tail and len(element.tail.strip()) > text_threshold:
|
||||
element.tail = element.tail.strip()[:text_threshold] + '...'
|
||||
|
||||
# 4. Find repeated patterns and keep only a few examples
|
||||
# This is a simplistic approach - more sophisticated pattern detection could be implemented
|
||||
pattern_elements = {}
|
||||
for element in tree.xpath('//*[contains(@class, "")]'):
|
||||
parent = element.getparent()
|
||||
if parent is None:
|
||||
continue
|
||||
|
||||
# Create a signature based on tag and classes
|
||||
classes = element.get('class', '')
|
||||
if not classes:
|
||||
continue
|
||||
signature = f"{element.tag}.{classes}"
|
||||
|
||||
if signature in pattern_elements:
|
||||
pattern_elements[signature].append(element)
|
||||
else:
|
||||
pattern_elements[signature] = [element]
|
||||
|
||||
# Keep only 3 examples of each repeating pattern
|
||||
for signature, elements in pattern_elements.items():
|
||||
if len(elements) > 3:
|
||||
# Keep the first 2 and last elements
|
||||
for element in elements[2:-1]:
|
||||
if element.getparent() is not None:
|
||||
element.getparent().remove(element)
|
||||
|
||||
# 5. Convert back to string
|
||||
result = etree.tostring(tree, encoding='unicode', method='html')
|
||||
|
||||
# If still over the size limit, apply more aggressive truncation
|
||||
if len(result) > max_size:
|
||||
return result[:max_size] + "..."
|
||||
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
# Fallback for parsing errors
|
||||
return html_content[:max_size] if len(html_content) > max_size else html_content
|
||||
|
||||
|
||||
|
||||
@@ -554,7 +554,7 @@ async def test_stream_crawl(session, token: str):
|
||||
"https://example.com/page3",
|
||||
],
|
||||
"browser_config": {"headless": True, "viewport": {"width": 1200}},
|
||||
"crawler_config": {"stream": True, "cache_mode": "aggressive"}
|
||||
"crawler_config": {"stream": True, "cache_mode": "bypass"}
|
||||
}
|
||||
|
||||
# headers = {"Authorization": f"Bearer {token}"} # If JWT is enabled, more on this later
|
||||
|
||||
@@ -2,6 +2,7 @@ import os
|
||||
import json
|
||||
import asyncio
|
||||
from typing import List, Tuple
|
||||
from functools import partial
|
||||
|
||||
import logging
|
||||
from typing import Optional, AsyncGenerator
|
||||
@@ -391,12 +392,13 @@ async def handle_crawl_request(
|
||||
)
|
||||
|
||||
async with AsyncWebCrawler(config=browser_config) as crawler:
|
||||
results = await crawler.arun_many(
|
||||
urls=urls,
|
||||
config=crawler_config,
|
||||
dispatcher=dispatcher
|
||||
)
|
||||
|
||||
results = []
|
||||
func = getattr(crawler, "arun" if len(urls) == 1 else "arun_many")
|
||||
partial_func = partial(func,
|
||||
urls[0] if len(urls) == 1 else urls,
|
||||
config=crawler_config,
|
||||
dispatcher=dispatcher)
|
||||
results = await partial_func()
|
||||
return {
|
||||
"success": True,
|
||||
"results": [result.model_dump() for result in results]
|
||||
|
||||
79
docs/examples/arun_vs_arun_many.py
Normal file
79
docs/examples/arun_vs_arun_many.py
Normal file
@@ -0,0 +1,79 @@
|
||||
import asyncio
|
||||
import time
|
||||
from crawl4ai.async_webcrawler import AsyncWebCrawler, CacheMode
|
||||
from crawl4ai.async_configs import CrawlerRunConfig
|
||||
from crawl4ai.async_dispatcher import MemoryAdaptiveDispatcher, RateLimiter
|
||||
|
||||
VERBOSE = False
|
||||
|
||||
async def crawl_sequential(urls):
|
||||
config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS, verbose=VERBOSE)
|
||||
results = []
|
||||
start_time = time.perf_counter()
|
||||
async with AsyncWebCrawler() as crawler:
|
||||
for url in urls:
|
||||
result_container = await crawler.arun(url=url, config=config)
|
||||
results.append(result_container[0])
|
||||
total_time = time.perf_counter() - start_time
|
||||
return total_time, results
|
||||
|
||||
async def crawl_parallel_dispatcher(urls):
|
||||
config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS, verbose=VERBOSE)
|
||||
# Dispatcher with rate limiter enabled (default behavior)
|
||||
dispatcher = MemoryAdaptiveDispatcher(
|
||||
rate_limiter=RateLimiter(base_delay=(1.0, 3.0), max_delay=60.0, max_retries=3),
|
||||
max_session_permit=50,
|
||||
)
|
||||
start_time = time.perf_counter()
|
||||
async with AsyncWebCrawler() as crawler:
|
||||
result_container = await crawler.arun_many(urls=urls, config=config, dispatcher=dispatcher)
|
||||
results = []
|
||||
if isinstance(result_container, list):
|
||||
results = result_container
|
||||
else:
|
||||
async for res in result_container:
|
||||
results.append(res)
|
||||
total_time = time.perf_counter() - start_time
|
||||
return total_time, results
|
||||
|
||||
async def crawl_parallel_no_rate_limit(urls):
|
||||
config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS, verbose=VERBOSE)
|
||||
# Dispatcher with no rate limiter and a high session permit to avoid queuing
|
||||
dispatcher = MemoryAdaptiveDispatcher(
|
||||
rate_limiter=None,
|
||||
max_session_permit=len(urls) # allow all URLs concurrently
|
||||
)
|
||||
start_time = time.perf_counter()
|
||||
async with AsyncWebCrawler() as crawler:
|
||||
result_container = await crawler.arun_many(urls=urls, config=config, dispatcher=dispatcher)
|
||||
results = []
|
||||
if isinstance(result_container, list):
|
||||
results = result_container
|
||||
else:
|
||||
async for res in result_container:
|
||||
results.append(res)
|
||||
total_time = time.perf_counter() - start_time
|
||||
return total_time, results
|
||||
|
||||
async def main():
|
||||
urls = ["https://example.com"] * 100
|
||||
print(f"Crawling {len(urls)} URLs sequentially...")
|
||||
seq_time, seq_results = await crawl_sequential(urls)
|
||||
print(f"Sequential crawling took: {seq_time:.2f} seconds\n")
|
||||
|
||||
print(f"Crawling {len(urls)} URLs in parallel using arun_many with dispatcher (with rate limit)...")
|
||||
disp_time, disp_results = await crawl_parallel_dispatcher(urls)
|
||||
print(f"Parallel (dispatcher with rate limiter) took: {disp_time:.2f} seconds\n")
|
||||
|
||||
print(f"Crawling {len(urls)} URLs in parallel using dispatcher with no rate limiter...")
|
||||
no_rl_time, no_rl_results = await crawl_parallel_no_rate_limit(urls)
|
||||
print(f"Parallel (dispatcher without rate limiter) took: {no_rl_time:.2f} seconds\n")
|
||||
|
||||
print("Crawl4ai - Crawling Comparison")
|
||||
print("--------------------------------------------------------")
|
||||
print(f"Sequential crawling took: {seq_time:.2f} seconds")
|
||||
print(f"Parallel (dispatcher with rate limiter) took: {disp_time:.2f} seconds")
|
||||
print(f"Parallel (dispatcher without rate limiter) took: {no_rl_time:.2f} seconds")
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
209
docs/examples/crawler_monitor_example.py
Normal file
209
docs/examples/crawler_monitor_example.py
Normal file
@@ -0,0 +1,209 @@
|
||||
"""
|
||||
CrawlerMonitor Example
|
||||
|
||||
This example demonstrates how to use the CrawlerMonitor component
|
||||
to visualize and track web crawler operations in real-time.
|
||||
"""
|
||||
|
||||
import time
|
||||
import uuid
|
||||
import random
|
||||
import threading
|
||||
from crawl4ai.components.crawler_monitor import CrawlerMonitor
|
||||
from crawl4ai.models import CrawlStatus
|
||||
|
||||
def simulate_webcrawler_operations(monitor, num_tasks=20):
|
||||
"""
|
||||
Simulates a web crawler's operations with multiple tasks and different states.
|
||||
|
||||
Args:
|
||||
monitor: The CrawlerMonitor instance
|
||||
num_tasks: Number of tasks to simulate
|
||||
"""
|
||||
print(f"Starting simulation with {num_tasks} tasks...")
|
||||
|
||||
# Create and register all tasks first
|
||||
task_ids = []
|
||||
for i in range(num_tasks):
|
||||
task_id = str(uuid.uuid4())
|
||||
url = f"https://example.com/page{i}"
|
||||
monitor.add_task(task_id, url)
|
||||
task_ids.append((task_id, url))
|
||||
|
||||
# Small delay between task creation
|
||||
time.sleep(0.2)
|
||||
|
||||
# Process tasks with a variety of different behaviors
|
||||
threads = []
|
||||
for i, (task_id, url) in enumerate(task_ids):
|
||||
# Create a thread for each task
|
||||
thread = threading.Thread(
|
||||
target=process_task,
|
||||
args=(monitor, task_id, url, i)
|
||||
)
|
||||
thread.daemon = True
|
||||
threads.append(thread)
|
||||
|
||||
# Start threads in batches to simulate concurrent processing
|
||||
batch_size = 4 # Process 4 tasks at a time
|
||||
for i in range(0, len(threads), batch_size):
|
||||
batch = threads[i:i+batch_size]
|
||||
for thread in batch:
|
||||
thread.start()
|
||||
time.sleep(0.5) # Stagger thread start times
|
||||
|
||||
# Wait a bit before starting next batch
|
||||
time.sleep(random.uniform(1.0, 3.0))
|
||||
|
||||
# Update queue statistics
|
||||
update_queue_stats(monitor)
|
||||
|
||||
# Simulate memory pressure changes
|
||||
active_threads = [t for t in threads if t.is_alive()]
|
||||
if len(active_threads) > 8:
|
||||
monitor.update_memory_status("CRITICAL")
|
||||
elif len(active_threads) > 4:
|
||||
monitor.update_memory_status("PRESSURE")
|
||||
else:
|
||||
monitor.update_memory_status("NORMAL")
|
||||
|
||||
# Wait for all threads to complete
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
|
||||
# Final updates
|
||||
update_queue_stats(monitor)
|
||||
monitor.update_memory_status("NORMAL")
|
||||
|
||||
print("Simulation completed!")
|
||||
|
||||
def process_task(monitor, task_id, url, index):
|
||||
"""Simulate processing of a single task."""
|
||||
# Tasks start in queued state (already added)
|
||||
|
||||
# Simulate waiting in queue
|
||||
wait_time = random.uniform(0.5, 3.0)
|
||||
time.sleep(wait_time)
|
||||
|
||||
# Start processing - move to IN_PROGRESS
|
||||
monitor.update_task(
|
||||
task_id=task_id,
|
||||
status=CrawlStatus.IN_PROGRESS,
|
||||
start_time=time.time(),
|
||||
wait_time=wait_time
|
||||
)
|
||||
|
||||
# Simulate task processing with memory usage changes
|
||||
total_process_time = random.uniform(2.0, 10.0)
|
||||
step_time = total_process_time / 5 # Update in 5 steps
|
||||
|
||||
for step in range(5):
|
||||
# Simulate increasing then decreasing memory usage
|
||||
if step < 3: # First 3 steps - increasing
|
||||
memory_usage = random.uniform(5.0, 20.0) * (step + 1)
|
||||
else: # Last 2 steps - decreasing
|
||||
memory_usage = random.uniform(5.0, 20.0) * (5 - step)
|
||||
|
||||
# Update peak memory if this is higher
|
||||
peak = max(memory_usage, monitor.get_task_stats(task_id).get("peak_memory", 0))
|
||||
|
||||
monitor.update_task(
|
||||
task_id=task_id,
|
||||
memory_usage=memory_usage,
|
||||
peak_memory=peak
|
||||
)
|
||||
|
||||
time.sleep(step_time)
|
||||
|
||||
# Determine final state - 80% success, 20% failure
|
||||
if index % 5 == 0: # Every 5th task fails
|
||||
monitor.update_task(
|
||||
task_id=task_id,
|
||||
status=CrawlStatus.FAILED,
|
||||
end_time=time.time(),
|
||||
memory_usage=0.0,
|
||||
error_message="Connection timeout"
|
||||
)
|
||||
else:
|
||||
monitor.update_task(
|
||||
task_id=task_id,
|
||||
status=CrawlStatus.COMPLETED,
|
||||
end_time=time.time(),
|
||||
memory_usage=0.0
|
||||
)
|
||||
|
||||
def update_queue_stats(monitor):
|
||||
"""Update queue statistics based on current tasks."""
|
||||
task_stats = monitor.get_all_task_stats()
|
||||
|
||||
# Count queued tasks
|
||||
queued_tasks = [
|
||||
stats for stats in task_stats.values()
|
||||
if stats["status"] == CrawlStatus.QUEUED.name
|
||||
]
|
||||
|
||||
total_queued = len(queued_tasks)
|
||||
|
||||
if total_queued > 0:
|
||||
current_time = time.time()
|
||||
# Calculate wait times
|
||||
wait_times = [
|
||||
current_time - stats.get("enqueue_time", current_time)
|
||||
for stats in queued_tasks
|
||||
]
|
||||
highest_wait_time = max(wait_times) if wait_times else 0.0
|
||||
avg_wait_time = sum(wait_times) / len(wait_times) if wait_times else 0.0
|
||||
else:
|
||||
highest_wait_time = 0.0
|
||||
avg_wait_time = 0.0
|
||||
|
||||
# Update monitor
|
||||
monitor.update_queue_statistics(
|
||||
total_queued=total_queued,
|
||||
highest_wait_time=highest_wait_time,
|
||||
avg_wait_time=avg_wait_time
|
||||
)
|
||||
|
||||
def main():
|
||||
# Initialize the monitor
|
||||
monitor = CrawlerMonitor(
|
||||
urls_total=20, # Total URLs to process
|
||||
refresh_rate=0.5, # Update UI twice per second
|
||||
enable_ui=True, # Enable terminal UI
|
||||
max_width=120 # Set maximum width to 120 characters
|
||||
)
|
||||
|
||||
# Start the monitor
|
||||
monitor.start()
|
||||
|
||||
try:
|
||||
# Run simulation
|
||||
simulate_webcrawler_operations(monitor)
|
||||
|
||||
# Keep monitor running a bit to see final state
|
||||
print("Waiting to view final state...")
|
||||
time.sleep(5)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("\nExample interrupted by user")
|
||||
finally:
|
||||
# Stop the monitor
|
||||
monitor.stop()
|
||||
print("Example completed!")
|
||||
|
||||
# Print some statistics
|
||||
summary = monitor.get_summary()
|
||||
print("\nCrawler Statistics Summary:")
|
||||
print(f"Total URLs: {summary['urls_total']}")
|
||||
print(f"Completed: {summary['urls_completed']}")
|
||||
print(f"Completion percentage: {summary['completion_percentage']:.1f}%")
|
||||
print(f"Peak memory usage: {summary['peak_memory_percent']:.1f}%")
|
||||
|
||||
# Print task status counts
|
||||
status_counts = summary['status_counts']
|
||||
print("\nTask Status Counts:")
|
||||
for status, count in status_counts.items():
|
||||
print(f" {status}: {count}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -73,7 +73,7 @@ async def test_stream_crawl(session, token: str):
|
||||
# "https://news.ycombinator.com/news"
|
||||
],
|
||||
"browser_config": {"headless": True, "viewport": {"width": 1200}},
|
||||
"crawler_config": {"stream": True, "cache_mode": "aggressive"}
|
||||
"crawler_config": {"stream": True, "cache_mode": "bypass"}
|
||||
}
|
||||
headers = {"Authorization": f"Bearer {token}"}
|
||||
print(f"\nTesting Streaming Crawl: {url}")
|
||||
|
||||
@@ -42,7 +42,7 @@ dependencies = [
|
||||
"pyperclip>=1.8.2",
|
||||
"faust-cchardet>=2.1.19",
|
||||
"aiohttp>=3.11.11",
|
||||
"humanize>=4.10.0"
|
||||
"humanize>=4.10.0",
|
||||
]
|
||||
classifiers = [
|
||||
"Development Status :: 4 - Beta",
|
||||
|
||||
@@ -10,6 +10,7 @@ import asyncio
|
||||
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode
|
||||
from crawl4ai.markdown_generation_strategy import DefaultMarkdownGenerator
|
||||
from crawl4ai.extraction_strategy import JsonCssExtractionStrategy, JsonXPathExtractionStrategy
|
||||
from crawl4ai.utils import preprocess_html_for_schema, JsonXPathExtractionStrategy
|
||||
import json
|
||||
|
||||
# Test HTML - A complex job board with companies, departments, and positions
|
||||
|
||||
@@ -73,7 +73,7 @@ async def test_stream_crawl(session, token: str):
|
||||
# "https://news.ycombinator.com/news"
|
||||
],
|
||||
"browser_config": {"headless": True, "viewport": {"width": 1200}},
|
||||
"crawler_config": {"stream": True, "cache_mode": "aggressive"}
|
||||
"crawler_config": {"stream": True, "cache_mode": "bypass"}
|
||||
}
|
||||
headers = {"Authorization": f"Bearer {token}"}
|
||||
print(f"\nTesting Streaming Crawl: {url}")
|
||||
|
||||
168
tests/memory/test_crawler_monitor.py
Normal file
168
tests/memory/test_crawler_monitor.py
Normal file
@@ -0,0 +1,168 @@
|
||||
"""
|
||||
Test script for the CrawlerMonitor component.
|
||||
This script simulates a crawler with multiple tasks to demonstrate the real-time monitoring capabilities.
|
||||
"""
|
||||
|
||||
import time
|
||||
import uuid
|
||||
import random
|
||||
import threading
|
||||
import sys
|
||||
import os
|
||||
|
||||
# Add the parent directory to the path to import crawl4ai
|
||||
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../..")))
|
||||
|
||||
from crawl4ai.components.crawler_monitor import CrawlerMonitor
|
||||
from crawl4ai.models import CrawlStatus
|
||||
|
||||
def simulate_crawler_task(monitor, task_id, url, simulate_failure=False):
|
||||
"""Simulate a crawler task with different states."""
|
||||
# Task starts in the QUEUED state
|
||||
wait_time = random.uniform(0.5, 3.0)
|
||||
time.sleep(wait_time)
|
||||
|
||||
# Update to IN_PROGRESS state
|
||||
monitor.update_task(
|
||||
task_id=task_id,
|
||||
status=CrawlStatus.IN_PROGRESS,
|
||||
start_time=time.time(),
|
||||
wait_time=wait_time
|
||||
)
|
||||
|
||||
# Simulate task running
|
||||
process_time = random.uniform(1.0, 5.0)
|
||||
for i in range(int(process_time * 2)):
|
||||
# Simulate memory usage changes
|
||||
memory_usage = random.uniform(5.0, 25.0)
|
||||
monitor.update_task(
|
||||
task_id=task_id,
|
||||
memory_usage=memory_usage,
|
||||
peak_memory=max(memory_usage, monitor.get_task_stats(task_id).get("peak_memory", 0))
|
||||
)
|
||||
time.sleep(0.5)
|
||||
|
||||
# Update to COMPLETED or FAILED state
|
||||
if simulate_failure and random.random() < 0.8: # 80% chance of failure if simulate_failure is True
|
||||
monitor.update_task(
|
||||
task_id=task_id,
|
||||
status=CrawlStatus.FAILED,
|
||||
end_time=time.time(),
|
||||
error_message="Simulated failure: Connection timeout",
|
||||
memory_usage=0.0
|
||||
)
|
||||
else:
|
||||
monitor.update_task(
|
||||
task_id=task_id,
|
||||
status=CrawlStatus.COMPLETED,
|
||||
end_time=time.time(),
|
||||
memory_usage=0.0
|
||||
)
|
||||
|
||||
def update_queue_stats(monitor, num_queued_tasks):
|
||||
"""Update queue statistics periodically."""
|
||||
while monitor.is_running:
|
||||
queued_tasks = [
|
||||
task for task_id, task in monitor.get_all_task_stats().items()
|
||||
if task["status"] == CrawlStatus.QUEUED.name
|
||||
]
|
||||
|
||||
total_queued = len(queued_tasks)
|
||||
|
||||
if total_queued > 0:
|
||||
current_time = time.time()
|
||||
wait_times = [
|
||||
current_time - task.get("enqueue_time", current_time)
|
||||
for task in queued_tasks
|
||||
]
|
||||
highest_wait_time = max(wait_times) if wait_times else 0.0
|
||||
avg_wait_time = sum(wait_times) / len(wait_times) if wait_times else 0.0
|
||||
else:
|
||||
highest_wait_time = 0.0
|
||||
avg_wait_time = 0.0
|
||||
|
||||
monitor.update_queue_statistics(
|
||||
total_queued=total_queued,
|
||||
highest_wait_time=highest_wait_time,
|
||||
avg_wait_time=avg_wait_time
|
||||
)
|
||||
|
||||
# Simulate memory pressure based on number of active tasks
|
||||
active_tasks = len([
|
||||
task for task_id, task in monitor.get_all_task_stats().items()
|
||||
if task["status"] == CrawlStatus.IN_PROGRESS.name
|
||||
])
|
||||
|
||||
if active_tasks > 8:
|
||||
monitor.update_memory_status("CRITICAL")
|
||||
elif active_tasks > 4:
|
||||
monitor.update_memory_status("PRESSURE")
|
||||
else:
|
||||
monitor.update_memory_status("NORMAL")
|
||||
|
||||
time.sleep(1.0)
|
||||
|
||||
def test_crawler_monitor():
|
||||
"""Test the CrawlerMonitor with simulated crawler tasks."""
|
||||
# Total number of URLs to crawl
|
||||
total_urls = 50
|
||||
|
||||
# Initialize the monitor
|
||||
monitor = CrawlerMonitor(urls_total=total_urls, refresh_rate=0.5)
|
||||
|
||||
# Start the monitor
|
||||
monitor.start()
|
||||
|
||||
# Start thread to update queue statistics
|
||||
queue_stats_thread = threading.Thread(target=update_queue_stats, args=(monitor, total_urls))
|
||||
queue_stats_thread.daemon = True
|
||||
queue_stats_thread.start()
|
||||
|
||||
try:
|
||||
# Create task threads
|
||||
threads = []
|
||||
for i in range(total_urls):
|
||||
task_id = str(uuid.uuid4())
|
||||
url = f"https://example.com/page{i}"
|
||||
|
||||
# Add task to monitor
|
||||
monitor.add_task(task_id, url)
|
||||
|
||||
# Determine if this task should simulate failure
|
||||
simulate_failure = (i % 10 == 0) # Every 10th task
|
||||
|
||||
# Create and start thread for this task
|
||||
thread = threading.Thread(
|
||||
target=simulate_crawler_task,
|
||||
args=(monitor, task_id, url, simulate_failure)
|
||||
)
|
||||
thread.daemon = True
|
||||
threads.append(thread)
|
||||
|
||||
# Start threads with delay to simulate tasks being added over time
|
||||
batch_size = 5
|
||||
for i in range(0, len(threads), batch_size):
|
||||
batch = threads[i:i+batch_size]
|
||||
for thread in batch:
|
||||
thread.start()
|
||||
time.sleep(0.5) # Small delay between starting threads
|
||||
|
||||
# Wait a bit before starting the next batch
|
||||
time.sleep(2.0)
|
||||
|
||||
# Wait for all threads to complete
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
|
||||
# Keep monitor running a bit longer to see the final state
|
||||
time.sleep(5.0)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("\nTest interrupted by user")
|
||||
finally:
|
||||
# Stop the monitor
|
||||
monitor.stop()
|
||||
print("\nCrawler monitor test completed")
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_crawler_monitor()
|
||||
410
tests/memory/test_dispatcher_stress.py
Normal file
410
tests/memory/test_dispatcher_stress.py
Normal file
@@ -0,0 +1,410 @@
|
||||
import asyncio
|
||||
import time
|
||||
import psutil
|
||||
import logging
|
||||
import random
|
||||
from typing import List, Dict
|
||||
import uuid
|
||||
import sys
|
||||
import os
|
||||
|
||||
# Import your crawler components
|
||||
from crawl4ai.models import DisplayMode, CrawlStatus, CrawlResult
|
||||
from crawl4ai.async_configs import CrawlerRunConfig, BrowserConfig, CacheMode
|
||||
from crawl4ai import AsyncWebCrawler
|
||||
from crawl4ai import MemoryAdaptiveDispatcher, CrawlerMonitor
|
||||
|
||||
# Global configuration
|
||||
STREAM = False # Toggle between streaming and non-streaming modes
|
||||
|
||||
# Configure logging to file only (to avoid breaking the rich display)
|
||||
os.makedirs("logs", exist_ok=True)
|
||||
file_handler = logging.FileHandler("logs/memory_stress_test.log")
|
||||
file_handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s'))
|
||||
|
||||
# Root logger - only to file, not console
|
||||
root_logger = logging.getLogger()
|
||||
root_logger.setLevel(logging.INFO)
|
||||
root_logger.addHandler(file_handler)
|
||||
|
||||
# Our test logger also writes to file only
|
||||
logger = logging.getLogger("memory_stress_test")
|
||||
logger.setLevel(logging.INFO)
|
||||
logger.addHandler(file_handler)
|
||||
logger.propagate = False # Don't propagate to root logger
|
||||
|
||||
# Create a memory restrictor to simulate limited memory environment
|
||||
class MemorySimulator:
|
||||
def __init__(self, target_percent: float = 85.0, aggressive: bool = False):
|
||||
"""Simulates memory pressure by allocating memory"""
|
||||
self.target_percent = target_percent
|
||||
self.memory_blocks: List[bytearray] = []
|
||||
self.aggressive = aggressive
|
||||
|
||||
def apply_pressure(self, additional_percent: float = 0.0):
|
||||
"""Fill memory until we reach target percentage"""
|
||||
current_percent = psutil.virtual_memory().percent
|
||||
target = self.target_percent + additional_percent
|
||||
|
||||
if current_percent >= target:
|
||||
return # Already at target
|
||||
|
||||
logger.info(f"Current memory: {current_percent}%, target: {target}%")
|
||||
|
||||
# Calculate how much memory we need to allocate
|
||||
total_memory = psutil.virtual_memory().total
|
||||
target_usage = (target / 100.0) * total_memory
|
||||
current_usage = (current_percent / 100.0) * total_memory
|
||||
bytes_to_allocate = int(target_usage - current_usage)
|
||||
|
||||
if bytes_to_allocate <= 0:
|
||||
return
|
||||
|
||||
# Allocate in smaller chunks to avoid overallocation
|
||||
if self.aggressive:
|
||||
# Use larger chunks for faster allocation in aggressive mode
|
||||
chunk_size = min(bytes_to_allocate, 200 * 1024 * 1024) # 200MB chunks
|
||||
else:
|
||||
chunk_size = min(bytes_to_allocate, 50 * 1024 * 1024) # 50MB chunks
|
||||
|
||||
try:
|
||||
logger.info(f"Allocating {chunk_size / (1024 * 1024):.1f}MB to reach target memory usage")
|
||||
self.memory_blocks.append(bytearray(chunk_size))
|
||||
time.sleep(0.5) # Give system time to register the allocation
|
||||
except MemoryError:
|
||||
logger.warning("Unable to allocate more memory")
|
||||
|
||||
def release_pressure(self, percent: float = None):
|
||||
"""
|
||||
Release allocated memory
|
||||
If percent is specified, release that percentage of blocks
|
||||
"""
|
||||
if not self.memory_blocks:
|
||||
return
|
||||
|
||||
if percent is None:
|
||||
# Release all
|
||||
logger.info(f"Releasing all {len(self.memory_blocks)} memory blocks")
|
||||
self.memory_blocks.clear()
|
||||
else:
|
||||
# Release specified percentage
|
||||
blocks_to_release = int(len(self.memory_blocks) * (percent / 100.0))
|
||||
if blocks_to_release > 0:
|
||||
logger.info(f"Releasing {blocks_to_release} of {len(self.memory_blocks)} memory blocks ({percent}%)")
|
||||
self.memory_blocks = self.memory_blocks[blocks_to_release:]
|
||||
|
||||
def spike_pressure(self, duration: float = 5.0):
|
||||
"""
|
||||
Create a temporary spike in memory pressure then release
|
||||
Useful for forcing requeues
|
||||
"""
|
||||
logger.info(f"Creating memory pressure spike for {duration} seconds")
|
||||
# Save current blocks count
|
||||
initial_blocks = len(self.memory_blocks)
|
||||
|
||||
# Create spike with extra 5%
|
||||
self.apply_pressure(additional_percent=5.0)
|
||||
|
||||
# Schedule release after duration
|
||||
asyncio.create_task(self._delayed_release(duration, initial_blocks))
|
||||
|
||||
async def _delayed_release(self, delay: float, target_blocks: int):
|
||||
"""Helper for spike_pressure - releases extra blocks after delay"""
|
||||
await asyncio.sleep(delay)
|
||||
|
||||
# Remove blocks added since spike started
|
||||
if len(self.memory_blocks) > target_blocks:
|
||||
logger.info(f"Releasing memory spike ({len(self.memory_blocks) - target_blocks} blocks)")
|
||||
self.memory_blocks = self.memory_blocks[:target_blocks]
|
||||
|
||||
# Test statistics collector
|
||||
class TestResults:
|
||||
def __init__(self):
|
||||
self.start_time = time.time()
|
||||
self.completed_urls: List[str] = []
|
||||
self.failed_urls: List[str] = []
|
||||
self.requeued_count = 0
|
||||
self.memory_warnings = 0
|
||||
self.max_memory_usage = 0.0
|
||||
self.max_queue_size = 0
|
||||
self.max_wait_time = 0.0
|
||||
self.url_to_attempt: Dict[str, int] = {} # Track retries per URL
|
||||
|
||||
def log_summary(self):
|
||||
duration = time.time() - self.start_time
|
||||
logger.info("===== TEST SUMMARY =====")
|
||||
logger.info(f"Stream mode: {'ON' if STREAM else 'OFF'}")
|
||||
logger.info(f"Total duration: {duration:.1f} seconds")
|
||||
logger.info(f"Completed URLs: {len(self.completed_urls)}")
|
||||
logger.info(f"Failed URLs: {len(self.failed_urls)}")
|
||||
logger.info(f"Requeue events: {self.requeued_count}")
|
||||
logger.info(f"Memory warnings: {self.memory_warnings}")
|
||||
logger.info(f"Max memory usage: {self.max_memory_usage:.1f}%")
|
||||
logger.info(f"Max queue size: {self.max_queue_size}")
|
||||
logger.info(f"Max wait time: {self.max_wait_time:.1f} seconds")
|
||||
|
||||
# Log URLs with multiple attempts
|
||||
retried_urls = {url: count for url, count in self.url_to_attempt.items() if count > 1}
|
||||
if retried_urls:
|
||||
logger.info(f"URLs with retries: {len(retried_urls)}")
|
||||
# Log the top 5 most retried
|
||||
top_retries = sorted(retried_urls.items(), key=lambda x: x[1], reverse=True)[:5]
|
||||
for url, count in top_retries:
|
||||
logger.info(f" URL {url[-30:]} had {count} attempts")
|
||||
|
||||
# Write summary to a separate human-readable file
|
||||
with open("logs/test_summary.txt", "w") as f:
|
||||
f.write(f"Stream mode: {'ON' if STREAM else 'OFF'}\n")
|
||||
f.write(f"Total duration: {duration:.1f} seconds\n")
|
||||
f.write(f"Completed URLs: {len(self.completed_urls)}\n")
|
||||
f.write(f"Failed URLs: {len(self.failed_urls)}\n")
|
||||
f.write(f"Requeue events: {self.requeued_count}\n")
|
||||
f.write(f"Memory warnings: {self.memory_warnings}\n")
|
||||
f.write(f"Max memory usage: {self.max_memory_usage:.1f}%\n")
|
||||
f.write(f"Max queue size: {self.max_queue_size}\n")
|
||||
f.write(f"Max wait time: {self.max_wait_time:.1f} seconds\n")
|
||||
|
||||
# Custom monitor with stats tracking
|
||||
# Custom monitor that extends CrawlerMonitor with test-specific tracking
|
||||
class StressTestMonitor(CrawlerMonitor):
|
||||
def __init__(self, test_results: TestResults, **kwargs):
|
||||
# Initialize the parent CrawlerMonitor
|
||||
super().__init__(**kwargs)
|
||||
self.test_results = test_results
|
||||
|
||||
def update_memory_status(self, status: str):
|
||||
if status != self.memory_status:
|
||||
logger.info(f"Memory status changed: {self.memory_status} -> {status}")
|
||||
if "CRITICAL" in status or "PRESSURE" in status:
|
||||
self.test_results.memory_warnings += 1
|
||||
|
||||
# Track peak memory usage in test results
|
||||
current_memory = psutil.virtual_memory().percent
|
||||
self.test_results.max_memory_usage = max(self.test_results.max_memory_usage, current_memory)
|
||||
|
||||
# Call parent method to update the dashboard
|
||||
super().update_memory_status(status)
|
||||
|
||||
def update_queue_statistics(self, total_queued: int, highest_wait_time: float, avg_wait_time: float):
|
||||
# Track queue metrics in test results
|
||||
self.test_results.max_queue_size = max(self.test_results.max_queue_size, total_queued)
|
||||
self.test_results.max_wait_time = max(self.test_results.max_wait_time, highest_wait_time)
|
||||
|
||||
# Call parent method to update the dashboard
|
||||
super().update_queue_statistics(total_queued, highest_wait_time, avg_wait_time)
|
||||
|
||||
def update_task(self, task_id: str, **kwargs):
|
||||
# Track URL status changes for test results
|
||||
if task_id in self.stats:
|
||||
old_status = self.stats[task_id].status
|
||||
|
||||
# If this is a requeue event (requeued due to memory pressure)
|
||||
if 'error_message' in kwargs and 'requeued' in kwargs['error_message']:
|
||||
if not hasattr(self.stats[task_id], 'counted_requeue') or not self.stats[task_id].counted_requeue:
|
||||
self.test_results.requeued_count += 1
|
||||
self.stats[task_id].counted_requeue = True
|
||||
|
||||
# Track completion status for test results
|
||||
if 'status' in kwargs:
|
||||
new_status = kwargs['status']
|
||||
if old_status != new_status:
|
||||
if new_status == CrawlStatus.COMPLETED:
|
||||
if task_id not in self.test_results.completed_urls:
|
||||
self.test_results.completed_urls.append(task_id)
|
||||
elif new_status == CrawlStatus.FAILED:
|
||||
if task_id not in self.test_results.failed_urls:
|
||||
self.test_results.failed_urls.append(task_id)
|
||||
|
||||
# Call parent method to update the dashboard
|
||||
super().update_task(task_id, **kwargs)
|
||||
self.live.update(self._create_table())
|
||||
|
||||
# Generate test URLs - use example.com with unique paths to avoid browser caching
|
||||
def generate_test_urls(count: int) -> List[str]:
|
||||
urls = []
|
||||
for i in range(count):
|
||||
# Add random path and query parameters to create unique URLs
|
||||
path = f"/path/{uuid.uuid4()}"
|
||||
query = f"?test={i}&random={random.randint(1, 100000)}"
|
||||
urls.append(f"https://example.com{path}{query}")
|
||||
return urls
|
||||
|
||||
# Process result callback
|
||||
async def process_result(result, test_results: TestResults):
|
||||
# Track attempt counts
|
||||
if result.url not in test_results.url_to_attempt:
|
||||
test_results.url_to_attempt[result.url] = 1
|
||||
else:
|
||||
test_results.url_to_attempt[result.url] += 1
|
||||
|
||||
if "requeued" in result.error_message:
|
||||
test_results.requeued_count += 1
|
||||
logger.debug(f"Requeued due to memory pressure: {result.url}")
|
||||
elif result.success:
|
||||
test_results.completed_urls.append(result.url)
|
||||
logger.debug(f"Successfully processed: {result.url}")
|
||||
else:
|
||||
test_results.failed_urls.append(result.url)
|
||||
logger.warning(f"Failed to process: {result.url} - {result.error_message}")
|
||||
|
||||
# Process multiple results (used in non-streaming mode)
|
||||
async def process_results(results, test_results: TestResults):
|
||||
for result in results:
|
||||
await process_result(result, test_results)
|
||||
|
||||
# Main test function for extreme memory pressure simulation
|
||||
async def run_memory_stress_test(
|
||||
url_count: int = 100,
|
||||
target_memory_percent: float = 92.0, # Push to dangerous levels
|
||||
chunk_size: int = 20, # Larger chunks for more chaos
|
||||
aggressive: bool = False,
|
||||
spikes: bool = True
|
||||
):
|
||||
test_results = TestResults()
|
||||
memory_simulator = MemorySimulator(target_percent=target_memory_percent, aggressive=aggressive)
|
||||
|
||||
logger.info(f"Starting stress test with {url_count} URLs in {'STREAM' if STREAM else 'NON-STREAM'} mode")
|
||||
logger.info(f"Target memory usage: {target_memory_percent}%")
|
||||
|
||||
# First, elevate memory usage to create pressure
|
||||
logger.info("Creating initial memory pressure...")
|
||||
memory_simulator.apply_pressure()
|
||||
|
||||
# Create test URLs in chunks to simulate real-world crawling where URLs are discovered
|
||||
all_urls = generate_test_urls(url_count)
|
||||
url_chunks = [all_urls[i:i+chunk_size] for i in range(0, len(all_urls), chunk_size)]
|
||||
|
||||
# Set up the crawler components - low memory thresholds to create more requeues
|
||||
browser_config = BrowserConfig(headless=True, verbose=False)
|
||||
run_config = CrawlerRunConfig(
|
||||
cache_mode=CacheMode.BYPASS,
|
||||
verbose=False,
|
||||
stream=STREAM # Use the global STREAM variable to set mode
|
||||
)
|
||||
|
||||
# Create monitor with reference to test results
|
||||
monitor = StressTestMonitor(
|
||||
test_results=test_results,
|
||||
display_mode=DisplayMode.DETAILED,
|
||||
max_visible_rows=20,
|
||||
total_urls=url_count # Pass total URLs count
|
||||
)
|
||||
|
||||
# Create dispatcher with EXTREME settings - pure survival mode
|
||||
# These settings are designed to create a memory battleground
|
||||
dispatcher = MemoryAdaptiveDispatcher(
|
||||
memory_threshold_percent=63.0, # Start throttling at just 60% memory
|
||||
critical_threshold_percent=70.0, # Start requeuing at 70% - incredibly aggressive
|
||||
recovery_threshold_percent=55.0, # Only resume normal ops when plenty of memory available
|
||||
check_interval=0.1, # Check extremely frequently (100ms)
|
||||
max_session_permit=20 if aggressive else 10, # Double the concurrent sessions - pure chaos
|
||||
fairness_timeout=10.0, # Extremely low timeout - rapid priority changes
|
||||
monitor=monitor
|
||||
)
|
||||
|
||||
# Set up spike schedule if enabled
|
||||
if spikes:
|
||||
spike_intervals = []
|
||||
# Create 3-5 random spike times
|
||||
num_spikes = random.randint(3, 5)
|
||||
for _ in range(num_spikes):
|
||||
# Schedule spikes at random chunks
|
||||
chunk_index = random.randint(1, len(url_chunks) - 1)
|
||||
spike_intervals.append(chunk_index)
|
||||
logger.info(f"Scheduled memory spikes at chunks: {spike_intervals}")
|
||||
|
||||
try:
|
||||
async with AsyncWebCrawler(config=browser_config) as crawler:
|
||||
# Process URLs in chunks to simulate discovering URLs over time
|
||||
for chunk_index, url_chunk in enumerate(url_chunks):
|
||||
logger.info(f"Processing chunk {chunk_index+1}/{len(url_chunks)} ({len(url_chunk)} URLs)")
|
||||
|
||||
# Regular pressure increases
|
||||
if chunk_index % 2 == 0:
|
||||
logger.info("Increasing memory pressure...")
|
||||
memory_simulator.apply_pressure()
|
||||
|
||||
# Memory spike if scheduled for this chunk
|
||||
if spikes and chunk_index in spike_intervals:
|
||||
logger.info(f"⚠️ CREATING MASSIVE MEMORY SPIKE at chunk {chunk_index+1} ⚠️")
|
||||
# Create a nightmare scenario - multiple overlapping spikes
|
||||
memory_simulator.spike_pressure(duration=10.0) # 10-second spike
|
||||
|
||||
# 50% chance of double-spike (pure evil)
|
||||
if random.random() < 0.5:
|
||||
await asyncio.sleep(2.0) # Wait 2 seconds
|
||||
logger.info("💀 DOUBLE SPIKE - EXTREME MEMORY PRESSURE 💀")
|
||||
memory_simulator.spike_pressure(duration=8.0) # 8-second overlapping spike
|
||||
|
||||
if STREAM:
|
||||
# Stream mode - process results as they come in
|
||||
async for result in dispatcher.run_urls_stream(
|
||||
urls=url_chunk,
|
||||
crawler=crawler,
|
||||
config=run_config
|
||||
):
|
||||
await process_result(result, test_results)
|
||||
else:
|
||||
# Non-stream mode - get all results at once
|
||||
results = await dispatcher.run_urls(
|
||||
urls=url_chunk,
|
||||
crawler=crawler,
|
||||
config=run_config
|
||||
)
|
||||
await process_results(results, test_results)
|
||||
|
||||
# Simulate discovering more URLs while others are still processing
|
||||
await asyncio.sleep(1)
|
||||
|
||||
# RARELY release pressure - make the system fight for resources
|
||||
if chunk_index % 5 == 4: # Less frequent releases
|
||||
release_percent = random.choice([10, 15, 20]) # Smaller, inconsistent releases
|
||||
logger.info(f"Releasing {release_percent}% of memory blocks - brief respite")
|
||||
memory_simulator.release_pressure(percent=release_percent)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Test error: {str(e)}")
|
||||
raise
|
||||
finally:
|
||||
# Release memory pressure
|
||||
memory_simulator.release_pressure()
|
||||
# Log final results
|
||||
test_results.log_summary()
|
||||
|
||||
# Check for success criteria
|
||||
if len(test_results.completed_urls) + len(test_results.failed_urls) < url_count:
|
||||
logger.error(f"TEST FAILED: Not all URLs were processed. {url_count - len(test_results.completed_urls) - len(test_results.failed_urls)} URLs missing.")
|
||||
return False
|
||||
|
||||
logger.info("TEST PASSED: All URLs were processed without crashing.")
|
||||
return True
|
||||
|
||||
# Command-line entry point
|
||||
if __name__ == "__main__":
|
||||
# Parse command line arguments
|
||||
url_count = int(sys.argv[1]) if len(sys.argv) > 1 else 100
|
||||
target_memory = float(sys.argv[2]) if len(sys.argv) > 2 else 85.0
|
||||
|
||||
# Check if stream mode is specified
|
||||
if len(sys.argv) > 3:
|
||||
STREAM = sys.argv[3].lower() in ('true', 'yes', '1', 'stream')
|
||||
|
||||
# Check if aggressive mode is specified
|
||||
aggressive = False
|
||||
if len(sys.argv) > 4:
|
||||
aggressive = sys.argv[4].lower() in ('true', 'yes', '1', 'aggressive')
|
||||
|
||||
print(f"Starting test with {url_count} URLs, {target_memory}% memory target")
|
||||
print(f"Stream mode: {STREAM}, Aggressive: {aggressive}")
|
||||
print("Logs will be written to the logs directory")
|
||||
print("Live display starting now...")
|
||||
|
||||
# Run the test
|
||||
result = asyncio.run(run_memory_stress_test(
|
||||
url_count=url_count,
|
||||
target_memory_percent=target_memory,
|
||||
aggressive=aggressive
|
||||
))
|
||||
|
||||
# Exit with status code
|
||||
sys.exit(0 if result else 1)
|
||||
Reference in New Issue
Block a user