Compare commits

..

8 Commits

Author SHA1 Message Date
unclecode
1a22fb4d4f docs: rename Docker deployment to self-hosting guide with comprehensive monitoring documentation
Major documentation restructuring to emphasize self-hosting capabilities and fully document the real-time monitoring system.

Changes:
- Renamed docker-deployment.md → self-hosting.md to better reflect the value proposition
- Updated mkdocs.yml navigation to "Self-Hosting Guide"
- Completely rewrote introduction emphasizing self-hosting benefits:
  * Data privacy and ownership
  * Cost control and transparency
  * Performance and security advantages
  * Full customization capabilities

- Expanded "Metrics & Monitoring" → "Real-time Monitoring & Operations" with:
  * Monitoring Dashboard section documenting the /monitor UI
  * Complete feature breakdown (system health, requests, browsers, janitor, errors)
  * Monitor API Endpoints with all REST endpoints and examples
  * WebSocket Streaming integration guide with Python examples
  * Control Actions for manual browser management
  * Production Integration patterns (Prometheus, custom dashboards, alerting)
  * Key production metrics to track

- Enhanced summary section:
  * What users learned checklist
  * Why self-hosting matters
  * Clear next steps
  * Key resources with monitoring dashboard URL

The monitoring dashboard built 2-3 weeks ago is now fully documented and discoverable.
Users will understand they have complete operational visibility at http://localhost:11235/monitor
with real-time updates, browser pool management, and programmatic control via REST/WebSocket APIs.

This positions Crawl4AI as an enterprise-grade self-hosting solution with DevOps-level
monitoring capabilities, not just a Docker deployment.
2025-11-09 13:31:52 +08:00
unclecode
81b5312629 Update gitignore 2025-11-09 10:49:42 +08:00
unclecode
73a5a7b0f5 Update gitignore 2025-10-18 12:41:29 +08:00
unclecode
05921811b8 docs: add comprehensive technical architecture documentation
Created ARCHITECTURE.md as a complete technical reference for the
Crawl4AI Docker server, replacing the stress test pipeline document
with production-grade documentation.

Contents:
- System overview with architecture diagrams
- Core components deep-dive (server, API, utils)
- Smart browser pool implementation details
- Real-time monitoring system architecture
- WebSocket implementation and fallback strategy
- Memory management and container detection
- Production optimizations and code review fixes
- Deployment guides (local, Docker, production)
- Comprehensive troubleshooting section
- Debug tools and performance tuning
- Test suite documentation
- Architecture decision log (ADRs)

Target audience: Developers maintaining or extending the system
Goal: Enable rapid onboarding and confident modifications
2025-10-18 12:05:49 +08:00
unclecode
25507adb5b feat(monitor): implement code review fixes and real-time WebSocket monitoring
Backend Improvements (11 fixes applied):

Critical Fixes:
- Add lock protection for browser pool access in monitor stats
- Ensure async track_janitor_event across all call sites
- Improve error handling in monitor request tracking (already in place)

Important Fixes:
- Replace fire-and-forget Redis with background persistence worker
- Add time-based expiry for completed requests/errors (5min cleanup)
- Implement input validation for monitor route parameters
- Add 4s timeout to timeline updater to prevent hangs
- Add warning when killing browsers with active requests
- Implement monitor cleanup on shutdown with final persistence
- Document memory estimates with TODO for actual tracking

Frontend Enhancements:

WebSocket Real-time Updates:
- Add WebSocket endpoint at /monitor/ws for live monitoring
- Implement auto-reconnect with exponential backoff (max 5 attempts)
- Add graceful fallback to HTTP polling on WebSocket failure
- Send comprehensive updates every 2 seconds (health, requests, browsers, timeline, events)

UI/UX Improvements:
- Add live connection status indicator with pulsing animation
  - Green "Live" = WebSocket connected
  - Yellow "Connecting..." = Attempting connection
  - Blue "Polling" = Fallback to HTTP polling
  - Red "Disconnected" = Connection failed
- Restore original beautiful styling for all sections
- Improve request table layout with flex-grow for URL column
- Add browser type text labels alongside emojis
- Add flex layout to browser section header

Testing:
- Add test-websocket.py for WebSocket validation
- All 7 integration tests passing successfully

Summary: 563 additions across 6 files
2025-10-18 11:38:25 +08:00
unclecode
aba4036ab6 Add demo and test scripts for monitor dashboard activity
- Introduced a demo script (`demo_monitor_dashboard.py`) to showcase various monitoring features through simulated activity.
- Implemented a test script (`test_monitor_demo.py`) to generate dashboard activity and verify monitor health and endpoint statistics.
- Added a logo image to the static assets for branding purposes.
2025-10-17 22:43:06 +08:00
unclecode
e2af031b09 feat(monitor): add real-time monitoring dashboard with Redis persistence
Complete observability solution for production deployments with terminal-style UI.

**Backend Implementation:**
- `monitor.py`: Stats manager tracking requests, browsers, errors, timeline data
- `monitor_routes.py`: REST API endpoints for all monitor functionality
  - GET /monitor/health - System health snapshot
  - GET /monitor/requests - Active & completed requests
  - GET /monitor/browsers - Browser pool details
  - GET /monitor/endpoints/stats - Aggregated endpoint analytics
  - GET /monitor/timeline - Time-series data (memory, requests, browsers)
  - GET /monitor/logs/{janitor,errors} - Event logs
  - POST /monitor/actions/{cleanup,kill_browser,restart_browser} - Control actions
  - POST /monitor/stats/reset - Reset counters
- Redis persistence for endpoint stats (survives restart)
- Timeline tracking (5min window, 5s resolution, 60 data points)

**Frontend Dashboard** (`/dashboard`):
- **System Health Bar**: CPU%, Memory%, Network I/O, Uptime
- **Pool Status**: Live counts (permanent/hot/cold browsers + memory)
- **Live Activity Tabs**:
  - Requests: Active (realtime) + recent completed (last 100)
  - Browsers: Detailed table with actions (kill/restart)
  - Janitor: Cleanup event log with timestamps
  - Errors: Recent errors with stack traces
- **Endpoint Analytics**: Count, avg latency, success%, pool hit%
- **Resource Timeline**: SVG charts (memory/requests/browsers) with terminal aesthetics
- **Control Actions**: Force cleanup, restart permanent, reset stats
- **Auto-refresh**: 5s polling (toggleable)

**Integration:**
- Janitor events tracked (close_cold, close_hot, promote)
- Crawler pool promotion events logged
- Timeline updater background task (5s interval)
- Lifespan hooks for monitor initialization

**UI Design:**
- Terminal vibe matching Crawl4AI theme
- Dark background, cyan/pink accents, monospace font
- Neon glow effects on charts
- Responsive layout, hover interactions
- Cross-navigation: Playground ↔ Monitor

**Key Features:**
- Zero-config: Works out of the box with existing Redis
- Real-time visibility into pool efficiency
- Manual browser management (kill/restart)
- Historical data persistence
- DevOps-friendly UX

Routes:
- API: `/monitor/*` (backend endpoints)
- UI: `/dashboard` (static HTML)
2025-10-17 21:36:25 +08:00
unclecode
b97eaeea4c feat(docker): implement smart browser pool with 10x memory efficiency
Major refactoring to eliminate memory leaks and enable high-scale crawling:

- **Smart 3-Tier Browser Pool**:
  - Permanent browser (always-ready default config)
  - Hot pool (configs used 3+ times, longer TTL)
  - Cold pool (new/rare configs, short TTL)
  - Auto-promotion: cold → hot after 3 uses
  - 100% pool reuse achieved in tests

- **Container-Aware Memory Detection**:
  - Read cgroup v1/v2 memory limits (not host metrics)
  - Accurate memory pressure detection in Docker
  - Memory-based browser creation blocking

- **Adaptive Janitor**:
  - Dynamic cleanup intervals (10s/30s/60s based on memory)
  - Tiered TTLs: cold 30-300s, hot 120-600s
  - Aggressive cleanup at high memory pressure

- **Unified Pool Usage**:
  - All endpoints now use pool (/html, /screenshot, /pdf, /execute_js, /md, /llm)
  - Fixed config signature mismatch (permanent browser matches endpoints)
  - get_default_browser_config() helper for consistency

- **Configuration**:
  - Reduced idle_ttl: 1800s → 300s (30min → 5min)
  - Fixed port: 11234 → 11235 (match Gunicorn)

**Performance Results** (from stress tests):
- Memory: 10x reduction (500-700MB × N → 270MB permanent)
- Latency: 30-50x faster (<100ms pool hits vs 3-5s startup)
- Reuse: 100% for default config, 60%+ for variants
- Capacity: 100+ concurrent requests (vs ~20 before)
- Leak: 0 MB/cycle (stable across tests)

**Test Infrastructure**:
- 7-phase sequential test suite (tests/)
- Docker stats integration + log analysis
- Pool promotion verification
- Memory leak detection
- Full endpoint coverage

Fixes memory issues reported in production deployments.
2025-10-17 20:38:39 +08:00
48 changed files with 5903 additions and 7474 deletions

27
.gitignore vendored
View File

@@ -1,6 +1,13 @@
# Scripts folder (private tools)
.scripts/
# Database files
*.db
# Environment files
.env
.env.local
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
@@ -261,18 +268,28 @@ continue_config.json
CLAUDE_MONITOR.md
CLAUDE.md
.claude/
scripts/
.claude/
tests/**/test_site
tests/**/reports
tests/**/benchmark_reports
test_scripts/
docs/**/data
.codecat/
docs/apps/linkdin/debug*/
docs/apps/linkdin/samples/insights/*
docs/md_v2/marketplace/backend/uploads/
docs/md_v2/marketplace/backend/marketplace.db
scripts/
# Databse files
*.sqlite3
*.sqlite3-journal
*.db-journal
*.db-wal
*.db-shm
*.db
*.rdb
*.ldb

View File

@@ -1,73 +0,0 @@
# ✅ FIXED: Chat Mode Now Fully Functional!
## Issues Resolved:
### Issue 1: Agent wasn't responding with text ❌ → ✅ FIXED
**Problem:** After tool execution, no response text was shown
**Root Cause:** Not extracting text from `message_output_item.raw_item.content[].text`
**Fix:** Added proper extraction from content blocks
### Issue 2: Chat didn't continue after first turn ❌ → ✅ FIXED
**Problem:** Chat appeared stuck, no response to follow-up questions
**Root Cause:** Same as Issue 1 - responses weren't being displayed
**Fix:** Chat loop was always working, just needed to show the responses
---
## Working Example:
```
You: Crawl example.com and tell me the title
Agent: thinking...
🔧 Calling: quick_crawl
(url=https://example.com, output_format=markdown)
✓ completed
Agent: The title of the page at example.com is:
Example Domain
Let me know if you need more information from this site!
Tools used: quick_crawl
You: So what is it?
Agent: thinking...
Agent: The title is "Example Domain" - this is a standard placeholder...
```
---
## Test It Now:
```bash
export OPENAI_API_KEY="sk-..."
python -m crawl4ai.agent.agent_crawl --chat
```
Then try:
```
Crawl example.com and tell me the title
What else can you tell me about it?
Start a session called 'test' and navigate to example.org
Extract the markdown
Close the session
/exit
```
---
## What Works:
✅ Full streaming visibility
✅ Tool calls shown with arguments
✅ Agent responses shown
✅ Multi-turn conversations
✅ Session management
✅ All 7 tools working
**Everything is working perfectly now!** 🎉

View File

@@ -1,141 +0,0 @@
# Crawl4AI Agent - Claude SDK → OpenAI SDK Migration
**Status:** ✅ Complete
**Date:** 2025-10-17
## What Changed
### Files Created/Rewritten:
1.`crawl_tools.py` - Converted from Claude SDK `@tool` to OpenAI SDK `@function_tool`
2.`crawl_prompts.py` - Cleaned up prompt (removed Claude-specific references)
3.`agent_crawl.py` - Complete rewrite using OpenAI `Agent` + `Runner`
4.`chat_mode.py` - Rewrit with **streaming visibility** and real-time status updates
### Files Kept (No Changes):
-`browser_manager.py` - Singleton pattern is SDK-agnostic
-`terminal_ui.py` - Minor updates (added /browser command)
### Files Backed Up:
- `agent_crawl.py.old` - Original Claude SDK version
- `chat_mode.py.old` - Original Claude SDK version
## Key Improvements
### 1. **No CLI Dependency**
- ❌ OLD: Spawned `claude` CLI subprocess
- ✅ NEW: Direct OpenAI API calls
### 2. **Cleaner Tool API**
```python
# OLD (Claude SDK)
@tool("quick_crawl", "Description", {"url": str, ...})
async def quick_crawl(args: Dict[str, Any]) -> Dict[str, Any]:
return {"content": [{"type": "text", "text": json.dumps(...)}]}
# NEW (OpenAI SDK)
@function_tool
async def quick_crawl(url: str, output_format: str = "markdown", ...) -> str:
return json.dumps(...) # Direct return
```
### 3. **Simpler Execution**
```python
# OLD (Claude SDK)
async with ClaudeSDKClient(options) as client:
await client.query(message_generator())
async for message in client.receive_messages():
# Complex message handling...
# NEW (OpenAI SDK)
result = await Runner.run(agent, input=prompt, context=None)
print(result.final_output)
```
### 4. **Streaming Chat with Visibility** (MAIN FEATURE!)
The new chat mode shows:
-**"thinking..."** indicator when agent starts
-**Tool calls** with parameters: `🔧 Calling: quick_crawl (url=example.com)`
-**Tool completion**: `✓ completed`
-**Real-time text streaming** character-by-character
-**Summary** after response: Tools used, token count
-**Clear status** at every step
**Example output:**
```
You: Crawl example.com and extract the title
Agent: thinking...
🔧 Calling: quick_crawl
(url=https://example.com, output_format=markdown)
✓ completed
Agent: I've successfully crawled example.com. The title is "Example Domain"...
Tools used: quick_crawl
Tokens: input=45, output=23
```
## Installation
```bash
# Install OpenAI Agents SDK
pip install git+https://github.com/openai/openai-agents-python.git
# Set API key
export OPENAI_API_KEY="sk-..."
```
## Usage
### Chat Mode (Recommended):
```bash
python -m crawl4ai.agent.agent_crawl --chat
```
### Single-Shot Mode:
```bash
python -m crawl4ai.agent.agent_crawl "Crawl example.com"
```
### Commands in Chat:
- `/exit` - Exit chat
- `/clear` - Clear screen
- `/help` - Show help
- `/browser` - Show browser status
## Testing
Tests need to be updated (not done yet):
-`test_chat.py` - Update for OpenAI SDK
-`test_tools.py` - Update execution model
-`test_scenarios.py` - Update multi-turn tests
-`run_all_tests.py` - Update imports
## Migration Benefits
| Metric | Claude SDK | OpenAI SDK | Improvement |
|--------|------------|------------|-------------|
| **Startup Time** | ~2s (CLI spawn) | ~0.1s | **20x faster** |
| **Dependencies** | Node.js + CLI | Python only | **Simpler** |
| **Session Isolation** | Shared `~/.claude/` | Isolated | **Cleaner** |
| **Tool API** | Dict-based | Type-safe | **Better DX** |
| **Visibility** | Minimal | Full streaming | **Much better** |
| **Production Ready** | No (CLI dep) | Yes | **Production** |
## Known Issues
- OpenAI SDK upgraded to 2.4.0, conflicts with:
- `instructor` (requires <2.0.0)
- `pandasai` (requires <2)
- `shell-gpt` (requires <2.0.0)
These are acceptable conflicts if you're not using those packages.
## Next Steps
1. Test the new chat mode thoroughly
2. Update test files
3. Update documentation
4. Consider adding more streaming events (progress bars, etc.)

View File

@@ -1,172 +0,0 @@
# ✅ Crawl4AI Agent - OpenAI SDK Migration Complete
## Status: READY TO USE
All migration completed and tested successfully!
---
## What's New
### 🚀 Key Improvements:
1. **No CLI Dependency** - Direct OpenAI API calls (20x faster startup)
2. **Full Visibility** - See every tool call, argument, and status in real-time
3. **Cleaner Code** - 50% less code, type-safe tools
4. **Better UX** - Streaming responses with clear status indicators
---
## Usage
### Chat Mode (Recommended):
```bash
export OPENAI_API_KEY="sk-..."
python -m crawl4ai.agent.agent_crawl --chat
```
**What you'll see:**
```
🕷️ Crawl4AI Agent - Chat Mode
Powered by OpenAI Agents SDK
You: Crawl example.com and get the title
Agent: thinking...
🔧 Calling: quick_crawl
(url=https://example.com, output_format=markdown)
✓ completed
Agent: The title of example.com is "Example Domain"
Tools used: quick_crawl
```
### Single-Shot Mode:
```bash
python -m crawl4ai.agent.agent_crawl "Get title from example.com"
```
### Commands in Chat:
- `/exit` - Exit chat
- `/clear` - Clear screen
- `/help` - Show help
- `/browser` - Browser status
---
## Files Changed
### ✅ Created/Rewritten:
- `crawl_tools.py` - 7 tools with `@function_tool` decorator
- `crawl_prompts.py` - Clean system prompt
- `agent_crawl.py` - Simple Agent + Runner
- `chat_mode.py` - Streaming chat with full visibility
- `__init__.py` - Updated exports
### ✅ Updated:
- `terminal_ui.py` - Added /browser command
### ✅ Unchanged:
- `browser_manager.py` - Works perfectly as-is
### ❌ Removed:
- `c4ai_tools.py` (old Claude SDK tools)
- `c4ai_prompts.py` (old prompts)
- All `.old` backup files
---
## Tests Performed
**Import Tests** - All modules import correctly
**Agent Creation** - Agent created with 7 tools
**Single-Shot Mode** - Successfully crawled example.com
**Chat Mode Streaming** - Full visibility working:
- Shows "thinking..." indicator
- Shows tool calls: `🔧 Calling: quick_crawl`
- Shows arguments: `(url=https://example.com, output_format=markdown)`
- Shows completion: `✓ completed`
- Shows summary: `Tools used: quick_crawl`
---
## Chat Mode Features (YOUR MAIN REQUEST!)
### Real-Time Visibility:
1. **Thinking Indicator**
```
Agent: thinking...
```
2. **Tool Calls with Arguments**
```
🔧 Calling: quick_crawl
(url=https://example.com, output_format=markdown)
```
3. **Tool Completion**
```
✓ completed
```
4. **Agent Response (Streaming)**
```
Agent: The title is "Example Domain"...
```
5. **Summary**
```
Tools used: quick_crawl
```
You now have **complete observability** - you'll see exactly what the agent is doing at every step!
---
## Migration Stats
| Metric | Before (Claude SDK) | After (OpenAI SDK) |
|--------|---------------------|-------------------|
| Lines of code | ~400 | ~200 |
| Startup time | 2s | 0.1s |
| Dependencies | Node.js + CLI | Python only |
| Visibility | Minimal | Full streaming |
| Tool API | Dict-based | Type-safe |
| Production ready | No | Yes |
---
## Known Issues
None! Everything tested and working.
---
## Next Steps (Optional)
1. Update test files (`test_chat.py`, `test_tools.py`, `test_scenarios.py`)
2. Add more streaming events (progress bars, etc.)
3. Add session persistence
4. Add conversation history
---
## Try It Now!
```bash
cd /Users/unclecode/devs/crawl4ai
export OPENAI_API_KEY="sk-..."
python -m crawl4ai.agent.agent_crawl --chat
```
Then try:
```
Crawl example.com and extract the title
Start session 'test', navigate to example.org, and extract the markdown
Close the session
```
Enjoy your new agent with **full visibility**! 🎉

View File

@@ -1,429 +0,0 @@
# Crawl4AI Agent Technical Specification
*AI-to-AI Knowledge Transfer Document*
## Context Documents
**MUST READ FIRST:**
1. `/Users/unclecode/devs/crawl4ai/tmp/CRAWL4AI_SDK.md` - Crawl4AI complete API reference
2. `/Users/unclecode/devs/crawl4ai/tmp/cc_stream.md` - Claude SDK streaming input mode
3. `/Users/unclecode/devs/crawl4ai/tmp/CC_PYTHON_SDK.md` - Claude Code Python SDK complete reference
## Architecture Overview
**Core Principle:** Singleton browser instance + streaming chat mode + MCP tools
```
┌─────────────────────────────────────────────────────────────┐
│ Agent Entry Point │
│ agent_crawl.py (CLI: --chat | single-shot) │
└─────────────────────────────────────────────────────────────┘
┌───────────────────┼───────────────────┐
│ │ │
[Chat Mode] [Single-shot] [Browser Manager]
│ │ │
▼ ▼ ▼
ChatMode.run() CrawlAgent.run() BrowserManager
- Streaming - One prompt (Singleton)
- Interactive - Exit after │
- Commands - Uses same ▼
│ browser AsyncWebCrawler
│ │ (persistent)
└───────────────────┴────────────────┘
┌───────┴────────┐
│ │
MCP Tools Claude SDK
(Crawl4AI) (Built-in)
│ │
┌───────────┴────┐ ┌──────┴──────┐
│ │ │ │
quick_crawl session Read Edit
navigate tools Write Glob
extract_data Bash Grep
execute_js
screenshot
close_session
```
## File Structure
```
crawl4ai/agent/
├── __init__.py # Module exports
├── agent_crawl.py # Main CLI entry (190 lines)
│ ├── SessionStorage # JSONL logging to ~/.crawl4ai/agents/projects/
│ ├── CrawlAgent # Single-shot wrapper
│ └── main() # CLI parser (--chat flag)
├── browser_manager.py # Singleton pattern (70 lines)
│ └── BrowserManager # Class methods only, no instances
│ ├── get_browser() # Returns singleton AsyncWebCrawler
│ ├── reconfigure_browser()
│ ├── close_browser()
│ └── is_browser_active()
├── c4ai_tools.py # 7 MCP tools (310 lines)
│ ├── @tool decorators # Claude SDK decorator
│ ├── CRAWLER_SESSIONS # Dict[str, AsyncWebCrawler] for named sessions
│ ├── CRAWLER_SESSION_URLS # Dict[str, str] track current URL per session
│ └── CRAWL_TOOLS # List of tool functions
├── c4ai_prompts.py # System prompt (130 lines)
│ └── SYSTEM_PROMPT # Agent behavior definition
├── terminal_ui.py # Rich-based UI (120 lines)
│ └── TerminalUI # Console rendering
│ ├── show_header()
│ ├── print_markdown()
│ ├── print_code()
│ └── with_spinner()
├── chat_mode.py # Streaming chat (160 lines)
│ └── ChatMode
│ ├── message_generator() # AsyncGenerator per cc_stream.md
│ ├── _handle_command() # /exit /clear /help /browser
│ └── run() # Main chat loop
├── test_tools.py # Direct tool tests (130 lines)
├── test_chat.py # Component tests (90 lines)
└── test_scenarios.py # Multi-turn scenarios (500 lines)
├── SIMPLE_SCENARIOS
├── MEDIUM_SCENARIOS
├── COMPLEX_SCENARIOS
└── ScenarioRunner
```
## Critical Implementation Details
### 1. Browser Singleton Pattern
**Key:** ONE browser instance for ENTIRE agent session
```python
# browser_manager.py
class BrowserManager:
_crawler: Optional[AsyncWebCrawler] = None # Singleton
_config: Optional[BrowserConfig] = None
@classmethod
async def get_browser(cls, config=None) -> AsyncWebCrawler:
if cls._crawler is None:
cls._crawler = AsyncWebCrawler(config or BrowserConfig())
await cls._crawler.start() # Manual lifecycle
return cls._crawler
```
**Behavior:**
- First call: creates browser with `config` (or default)
- Subsequent calls: returns same instance, **ignores config param**
- To change config: `reconfigure_browser(new_config)` (closes old, creates new)
- Tools use: `crawler = await BrowserManager.get_browser()`
- No `async with` context manager - manual `start()` / `close()`
### 2. Tool Architecture
**Two types of browser usage:**
**A) Quick operations** (quick_crawl):
```python
@tool("quick_crawl", ...)
async def quick_crawl(args):
crawler = await BrowserManager.get_browser() # Singleton
result = await crawler.arun(url=args["url"], config=run_config)
# No close - browser stays alive
```
**B) Named sessions** (start_session, navigate, extract_data, etc.):
```python
CRAWLER_SESSIONS: Dict[str, AsyncWebCrawler] = {} # Named refs
CRAWLER_SESSION_URLS: Dict[str, str] = {} # Track current URL
@tool("start_session", ...)
async def start_session(args):
crawler = await BrowserManager.get_browser()
CRAWLER_SESSIONS[args["session_id"]] = crawler # Store ref
@tool("navigate", ...)
async def navigate(args):
crawler = CRAWLER_SESSIONS[args["session_id"]]
result = await crawler.arun(url=args["url"], ...)
CRAWLER_SESSION_URLS[args["session_id"]] = result.url # Track URL
@tool("extract_data", ...)
async def extract_data(args):
crawler = CRAWLER_SESSIONS[args["session_id"]]
current_url = CRAWLER_SESSION_URLS[args["session_id"]] # Must have URL
result = await crawler.arun(url=current_url, ...) # Re-crawl current page
@tool("close_session", ...)
async def close_session(args):
CRAWLER_SESSIONS.pop(args["session_id"]) # Remove ref
CRAWLER_SESSION_URLS.pop(args["session_id"], None)
# Browser stays alive (singleton)
```
**Important:** Named sessions are just **references** to singleton browser. Multiple sessions = same browser instance.
### 3. Markdown Handling (CRITICAL BUG FIX)
**OLD (WRONG):**
```python
result.markdown_v2.raw_markdown # DEPRECATED
```
**NEW (CORRECT):**
```python
# result.markdown can be:
# - str (simple mode)
# - MarkdownGenerationResult object (with filters)
if isinstance(result.markdown, str):
markdown_content = result.markdown
elif hasattr(result.markdown, 'raw_markdown'):
markdown_content = result.markdown.raw_markdown
```
Reference: `CRAWL4AI_SDK.md` line 614 - `markdown_v2` deprecated, use `markdown`
### 4. Chat Mode Streaming Input
**Per cc_stream.md:** Use message generator pattern
```python
# chat_mode.py
async def message_generator(self) -> AsyncGenerator[Dict[str, Any], None]:
while not self._exit_requested:
user_input = await asyncio.to_thread(self.ui.get_user_input)
if user_input.startswith('/'):
await self._handle_command(user_input)
continue
# Yield in streaming input format
yield {
"type": "user",
"message": {
"role": "user",
"content": user_input
}
}
async def run(self):
async with ClaudeSDKClient(options=self.options) as client:
await client.query(self.message_generator()) # Pass generator
async for message in client.receive_messages():
# Process streaming responses
```
**Key:** Generator keeps yielding user inputs, SDK streams responses back.
### 5. Claude SDK Integration
**Setup:**
```python
from claude_agent_sdk import tool, create_sdk_mcp_server, ClaudeSDKClient, ClaudeAgentOptions
# 1. Define tools with @tool decorator
@tool("quick_crawl", "description", {"url": str, "output_format": str})
async def quick_crawl(args: Dict[str, Any]) -> Dict[str, Any]:
return {"content": [{"type": "text", "text": json.dumps(result)}]}
# 2. Create MCP server
crawler_server = create_sdk_mcp_server(
name="crawl4ai",
version="1.0.0",
tools=[quick_crawl, start_session, ...] # List of @tool functions
)
# 3. Configure options
options = ClaudeAgentOptions(
mcp_servers={"crawler": crawler_server},
allowed_tools=[
"mcp__crawler__quick_crawl", # Format: mcp__{server}__{tool}
"mcp__crawler__start_session",
# Built-in tools:
"Read", "Write", "Edit", "Glob", "Grep", "Bash", "NotebookEdit"
],
system_prompt=SYSTEM_PROMPT,
permission_mode="acceptEdits"
)
# 4. Use client
async with ClaudeSDKClient(options=options) as client:
await client.query(prompt_or_generator)
async for message in client.receive_messages():
# Process AssistantMessage, ResultMessage, etc.
```
**Tool response format:**
```python
return {
"content": [{
"type": "text",
"text": json.dumps({"success": True, "data": "..."})
}]
}
```
## Operating Modes
### Single-Shot Mode
```bash
python -m crawl4ai.agent.agent_crawl "Crawl example.com"
```
- One prompt → execute → exit
- Uses singleton browser
- No cleanup of browser (process exit handles it)
### Chat Mode
```bash
python -m crawl4ai.agent.agent_crawl --chat
```
- Interactive loop with streaming I/O
- Commands: `/exit` `/clear` `/help` `/browser`
- Browser persists across all turns
- Cleanup on exit: `BrowserManager.close_browser()`
## Testing Architecture
**3 test levels:**
1. **Component tests** (`test_chat.py`): Non-interactive, tests individual classes
2. **Tool tests** (`test_tools.py`): Direct AsyncWebCrawler calls, validates Crawl4AI integration
3. **Scenario tests** (`test_scenarios.py`): Automated multi-turn conversations
- Injects messages programmatically
- Validates tool calls, keywords, files created
- Categories: SIMPLE (2), MEDIUM (3), COMPLEX (4)
## Dependencies
```python
# External
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode
from crawl4ai.extraction_strategy import LLMExtractionStrategy
from claude_agent_sdk import (
tool, create_sdk_mcp_server, ClaudeSDKClient, ClaudeAgentOptions,
AssistantMessage, TextBlock, ResultMessage, ToolUseBlock
)
from rich.console import Console # Already installed
from rich.markdown import Markdown
from rich.syntax import Syntax
# Stdlib
import asyncio, json, uuid, argparse
from pathlib import Path
from typing import Optional, Dict, Any, AsyncGenerator
```
## Common Pitfalls
1. **DON'T** use `async with AsyncWebCrawler()` - breaks singleton pattern
2. **DON'T** use `result.markdown_v2` - deprecated field
3. **DON'T** call `crawler.arun()` without URL in session tools - needs current_url
4. **DON'T** close browser in tools - managed by BrowserManager
5. **DON'T** use `break` in message iteration - causes asyncio issues
6. **DO** track session URLs in `CRAWLER_SESSION_URLS` for session tools
7. **DO** handle both `str` and `MarkdownGenerationResult` for `result.markdown`
8. **DO** use manual lifecycle `await crawler.start()` / `await crawler.close()`
## Session Storage
**Location:** `~/.crawl4ai/agents/projects/{sanitized_cwd}/{uuid}.jsonl`
**Format:** JSONL with events:
```json
{"timestamp": "...", "event": "session_start", "data": {...}}
{"timestamp": "...", "event": "user_message", "data": {"text": "..."}}
{"timestamp": "...", "event": "assistant_message", "data": {"turn": 1, "text": "..."}}
{"timestamp": "...", "event": "session_end", "data": {"duration_ms": 1000, ...}}
```
## CLI Options
```
--chat Interactive chat mode
--model MODEL Claude model override
--permission-mode MODE acceptEdits|bypassPermissions|default|plan
--add-dir DIR [DIR...] Additional accessible directories
--system-prompt TEXT Custom system prompt
--session-id UUID Resume/specify session
--debug Full tracebacks
```
## Performance Characteristics
- **Browser startup:** ~2-4s (once per session)
- **Quick crawl:** ~1-2s (reuses browser)
- **Session operations:** ~1-2s (same browser)
- **Chat latency:** Real-time streaming, no buffering
- **Memory:** One browser instance regardless of operations
## Extension Points
1. **New tools:** Add `@tool` function → add to `CRAWL_TOOLS` → add to `allowed_tools`
2. **New commands:** Add handler in `ChatMode._handle_command()`
3. **Custom UI:** Replace `TerminalUI` with different renderer
4. **Persistent sessions:** Serialize browser cookies/state to disk in `BrowserManager`
5. **Multi-browser:** Modify `BrowserManager` to support multiple configs (not recommended)
## Next Steps: Testing & Evaluation Pipeline
### Phase 1: Automated Testing (CURRENT)
**Objective:** Verify codebase correctness, not agent quality
**Test Execution:**
```bash
# 1. Component tests (fast, non-interactive)
python crawl4ai/agent/test_chat.py
# Expected: All components instantiate correctly
# 2. Tool integration tests (medium, requires browser)
python crawl4ai/agent/test_tools.py
# Expected: All 7 tools work with Crawl4AI
# 3. Multi-turn scenario tests (slow, comprehensive)
python crawl4ai/agent/test_scenarios.py
# Expected: 9 scenarios pass (2 simple, 3 medium, 4 complex)
# Output: test_agent_output/test_results.json
```
**Success Criteria:**
- All component tests pass
- All tool tests pass
- ≥80% scenario tests pass (7/9)
- No crashes, exceptions, or hangs
- Browser cleanup verified
**Automated Pipeline:**
```bash
# Run all tests in sequence, exit on first failure
cd /Users/unclecode/devs/crawl4ai
python crawl4ai/agent/test_chat.py && \
python crawl4ai/agent/test_tools.py && \
python crawl4ai/agent/test_scenarios.py
echo "Exit code: $?" # 0 = all passed
```
### Phase 2: Evaluation (NEXT)
**Objective:** Measure agent performance quality
**Metrics to define:**
- Task completion rate
- Tool selection accuracy
- Context retention across turns
- Planning effectiveness
- Error recovery capability
**Eval framework needed:**
- Expand scenario tests with quality scoring
- Add ground truth comparisons
- Measure token efficiency
- Track reasoning quality
**Not in scope yet** - wait for Phase 1 completion
---
**Last Updated:** 2025-01-17
**Version:** 1.0.0
**Status:** Testing Phase - Ready for automated test runs

View File

@@ -1,16 +0,0 @@
# __init__.py
"""Crawl4AI Agent - Browser automation agent powered by OpenAI Agents SDK."""
# Import only the components needed for library usage
# Don't import agent_crawl here to avoid warning when running with python -m
from .crawl_tools import CRAWL_TOOLS
from .crawl_prompts import SYSTEM_PROMPT
from .browser_manager import BrowserManager
from .terminal_ui import TerminalUI
__all__ = [
"CRAWL_TOOLS",
"SYSTEM_PROMPT",
"BrowserManager",
"TerminalUI",
]

View File

@@ -1,593 +0,0 @@
```python
# c4ai_tools.py
"""Crawl4AI tools for Claude Code SDK agent."""
import json
import asyncio
from typing import Any, Dict
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode
from crawl4ai.extraction_strategy import LLMExtractionStrategy
from claude_agent_sdk import tool
# Global session storage
CRAWLER_SESSIONS: Dict[str, AsyncWebCrawler] = {}
@tool("quick_crawl", "One-shot crawl for simple extraction. Returns markdown, HTML, or structured data.", {
"url": str,
"output_format": str, # "markdown" | "html" | "structured" | "screenshot"
"extraction_schema": str, # Optional: JSON schema for structured extraction
"js_code": str, # Optional: JavaScript to execute before extraction
"wait_for": str, # Optional: CSS selector to wait for
})
async def quick_crawl(args: Dict[str, Any]) -> Dict[str, Any]:
"""Fast single-page crawl without session management."""
crawler_config = BrowserConfig(headless=True, verbose=False)
run_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
js_code=args.get("js_code"),
wait_for=args.get("wait_for"),
)
# Add extraction strategy if structured data requested
if args.get("extraction_schema"):
run_config.extraction_strategy = LLMExtractionStrategy(
provider="openai/gpt-4o-mini",
schema=json.loads(args["extraction_schema"]),
instruction="Extract data according to the provided schema."
)
async with AsyncWebCrawler(config=crawler_config) as crawler:
result = await crawler.arun(url=args["url"], config=run_config)
if not result.success:
return {
"content": [{
"type": "text",
"text": json.dumps({"error": result.error_message, "success": False})
}]
}
output_map = {
"markdown": result.markdown_v2.raw_markdown if result.markdown_v2 else "",
"html": result.html,
"structured": result.extracted_content,
"screenshot": result.screenshot,
}
response = {
"success": True,
"url": result.url,
"data": output_map.get(args["output_format"], result.markdown_v2.raw_markdown)
}
return {"content": [{"type": "text", "text": json.dumps(response, indent=2)}]}
@tool("start_session", "Start a persistent browser session for multi-step crawling and automation.", {
"session_id": str,
"headless": bool, # Default True
})
async def start_session(args: Dict[str, Any]) -> Dict[str, Any]:
"""Initialize a persistent crawler session."""
session_id = args["session_id"]
if session_id in CRAWLER_SESSIONS:
return {"content": [{"type": "text", "text": json.dumps({
"error": f"Session {session_id} already exists",
"success": False
})}]}
crawler_config = BrowserConfig(
headless=args.get("headless", True),
verbose=False
)
crawler = AsyncWebCrawler(config=crawler_config)
await crawler.__aenter__()
CRAWLER_SESSIONS[session_id] = crawler
return {"content": [{"type": "text", "text": json.dumps({
"success": True,
"session_id": session_id,
"message": f"Browser session {session_id} started"
})}]}
@tool("navigate", "Navigate to a URL in an active session.", {
"session_id": str,
"url": str,
"wait_for": str, # Optional: CSS selector to wait for
"js_code": str, # Optional: JavaScript to execute after load
})
async def navigate(args: Dict[str, Any]) -> Dict[str, Any]:
"""Navigate to URL in session."""
session_id = args["session_id"]
if session_id not in CRAWLER_SESSIONS:
return {"content": [{"type": "text", "text": json.dumps({
"error": f"Session {session_id} not found",
"success": False
})}]}
crawler = CRAWLER_SESSIONS[session_id]
run_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
wait_for=args.get("wait_for"),
js_code=args.get("js_code"),
)
result = await crawler.arun(url=args["url"], config=run_config)
return {"content": [{"type": "text", "text": json.dumps({
"success": result.success,
"url": result.url,
"message": f"Navigated to {args['url']}"
})}]}
@tool("extract_data", "Extract data from current page in session using schema or return markdown.", {
"session_id": str,
"output_format": str, # "markdown" | "structured"
"extraction_schema": str, # Required for structured, JSON schema
"wait_for": str, # Optional: Wait for element before extraction
"js_code": str, # Optional: Execute JS before extraction
})
async def extract_data(args: Dict[str, Any]) -> Dict[str, Any]:
"""Extract data from current page."""
session_id = args["session_id"]
if session_id not in CRAWLER_SESSIONS:
return {"content": [{"type": "text", "text": json.dumps({
"error": f"Session {session_id} not found",
"success": False
})}]}
crawler = CRAWLER_SESSIONS[session_id]
run_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
wait_for=args.get("wait_for"),
js_code=args.get("js_code"),
)
if args["output_format"] == "structured" and args.get("extraction_schema"):
run_config.extraction_strategy = LLMExtractionStrategy(
provider="openai/gpt-4o-mini",
schema=json.loads(args["extraction_schema"]),
instruction="Extract data according to schema."
)
result = await crawler.arun(config=run_config)
if not result.success:
return {"content": [{"type": "text", "text": json.dumps({
"error": result.error_message,
"success": False
})}]}
data = (result.extracted_content if args["output_format"] == "structured"
else result.markdown_v2.raw_markdown if result.markdown_v2 else "")
return {"content": [{"type": "text", "text": json.dumps({
"success": True,
"data": data
}, indent=2)}]}
@tool("execute_js", "Execute JavaScript in the current page context.", {
"session_id": str,
"js_code": str,
"wait_for": str, # Optional: Wait for element after execution
})
async def execute_js(args: Dict[str, Any]) -> Dict[str, Any]:
"""Execute JavaScript in session."""
session_id = args["session_id"]
if session_id not in CRAWLER_SESSIONS:
return {"content": [{"type": "text", "text": json.dumps({
"error": f"Session {session_id} not found",
"success": False
})}]}
crawler = CRAWLER_SESSIONS[session_id]
run_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
js_code=args["js_code"],
wait_for=args.get("wait_for"),
)
result = await crawler.arun(config=run_config)
return {"content": [{"type": "text", "text": json.dumps({
"success": result.success,
"message": "JavaScript executed"
})}]}
@tool("screenshot", "Take a screenshot of the current page.", {
"session_id": str,
})
async def screenshot(args: Dict[str, Any]) -> Dict[str, Any]:
"""Capture screenshot."""
session_id = args["session_id"]
if session_id not in CRAWLER_SESSIONS:
return {"content": [{"type": "text", "text": json.dumps({
"error": f"Session {session_id} not found",
"success": False
})}]}
crawler = CRAWLER_SESSIONS[session_id]
result = await crawler.arun(config=CrawlerRunConfig(cache_mode=CacheMode.BYPASS))
return {"content": [{"type": "text", "text": json.dumps({
"success": True,
"screenshot": result.screenshot if result.success else None
})}]}
@tool("close_session", "Close and cleanup a browser session.", {
"session_id": str,
})
async def close_session(args: Dict[str, Any]) -> Dict[str, Any]:
"""Close crawler session."""
session_id = args["session_id"]
if session_id not in CRAWLER_SESSIONS:
return {"content": [{"type": "text", "text": json.dumps({
"error": f"Session {session_id} not found",
"success": False
})}]}
crawler = CRAWLER_SESSIONS.pop(session_id)
await crawler.__aexit__(None, None, None)
return {"content": [{"type": "text", "text": json.dumps({
"success": True,
"message": f"Session {session_id} closed"
})}]}
# Export all tools
CRAWL_TOOLS = [
quick_crawl,
start_session,
navigate,
extract_data,
execute_js,
screenshot,
close_session,
]
```
```python
# c4ai_prompts.py
"""System prompts for Crawl4AI agent."""
SYSTEM_PROMPT = """You are an expert web crawling and browser automation agent powered by Crawl4AI.
# Core Capabilities
You can perform sophisticated multi-step web scraping and automation tasks through two modes:
## Quick Mode (simple tasks)
- Use `quick_crawl` for single-page data extraction
- Best for: simple scrapes, getting page content, one-time extractions
## Session Mode (complex tasks)
- Use `start_session` to create persistent browser sessions
- Navigate, interact, extract data across multiple pages
- Essential for: workflows requiring JS execution, pagination, filtering, multi-step automation
# Tool Usage Patterns
## Simple Extraction
1. Use `quick_crawl` with appropriate output_format
2. Provide extraction_schema for structured data
## Multi-Step Workflow
1. `start_session` - Create browser session with unique ID
2. `navigate` - Go to target URL
3. `execute_js` - Interact with page (click buttons, scroll, fill forms)
4. `extract_data` - Get data using schema or markdown
5. Repeat steps 2-4 as needed
6. `close_session` - Clean up when done
# Critical Instructions
1. **Iteration & Validation**: When tasks require filtering or conditional logic:
- Extract data first, analyze results
- Filter/validate in your reasoning
- Make subsequent tool calls based on validation
- Continue until task criteria are met
2. **Structured Extraction**: Always use JSON schemas for structured data:
```json
{
"type": "object",
"properties": {
"field_name": {"type": "string"},
"price": {"type": "number"}
}
}
```
3. **Session Management**:
- Generate unique session IDs (e.g., "product_scrape_001")
- Always close sessions when done
- Use sessions for tasks requiring multiple page visits
4. **JavaScript Execution**:
- Use for: clicking buttons, scrolling, waiting for dynamic content
- Example: `js_code: "document.querySelector('.load-more').click()"`
- Combine with `wait_for` to ensure content loads
5. **Error Handling**:
- Check `success` field in all responses
- Retry with different strategies if extraction fails
- Report specific errors to user
6. **Data Persistence**:
- Save results using `Write` tool to JSON files
- Use descriptive filenames with timestamps
- Structure data clearly for user consumption
# Example Workflows
## Workflow 1: Filter & Crawl
Task: "Find products >$10, crawl each, extract details"
1. `quick_crawl` product listing page with schema for [name, price, url]
2. Analyze results, filter price > 10 in reasoning
3. `start_session` for detailed crawling
4. For each filtered product:
- `navigate` to product URL
- `extract_data` with detail schema
5. Aggregate results
6. `close_session`
7. `Write` results to JSON
## Workflow 2: Paginated Scraping
Task: "Scrape all items across multiple pages"
1. `start_session`
2. `navigate` to page 1
3. `extract_data` items from current page
4. Check for "next" button
5. `execute_js` to click next
6. Repeat 3-5 until no more pages
7. `close_session`
8. Save aggregated data
## Workflow 3: Dynamic Content
Task: "Scrape reviews after clicking 'Load More'"
1. `start_session`
2. `navigate` to product page
3. `execute_js` to click load more button
4. `wait_for` reviews container
5. `extract_data` all reviews
6. `close_session`
# Quality Guidelines
- **Be thorough**: Don't stop until task requirements are fully met
- **Validate data**: Check extracted data matches expected format
- **Handle edge cases**: Empty results, pagination limits, rate limiting
- **Clear reporting**: Summarize what was found, any issues encountered
- **Efficient**: Use quick_crawl when possible, sessions only when needed
# Output Format
When saving data, use clean JSON structure:
```json
{
"metadata": {
"scraped_at": "ISO timestamp",
"source_url": "...",
"total_items": 0
},
"data": [...]
}
```
Always provide a final summary of:
- Items found/processed
- Time taken
- Files created
- Any warnings/errors
Remember: You have unlimited turns to complete the task. Take your time, validate each step, and ensure quality results."""
```
```python
# agent_crawl.py
"""Crawl4AI Agent CLI - Browser automation agent powered by Claude Code SDK."""
import asyncio
import sys
import json
import uuid
from pathlib import Path
from datetime import datetime
from typing import Optional
import argparse
from claude_agent_sdk import ClaudeSDKClient, ClaudeAgentOptions, create_sdk_mcp_server
from claude_agent_sdk import AssistantMessage, TextBlock, ResultMessage
from c4ai_tools import CRAWL_TOOLS
from c4ai_prompts import SYSTEM_PROMPT
class SessionStorage:
"""Manage session storage in ~/.crawl4ai/agents/projects/"""
def __init__(self, cwd: Optional[str] = None):
self.cwd = Path(cwd) if cwd else Path.cwd()
self.base_dir = Path.home() / ".crawl4ai" / "agents" / "projects"
self.project_dir = self.base_dir / self._sanitize_path(str(self.cwd.resolve()))
self.project_dir.mkdir(parents=True, exist_ok=True)
self.session_id = str(uuid.uuid4())
self.log_file = self.project_dir / f"{self.session_id}.jsonl"
@staticmethod
def _sanitize_path(path: str) -> str:
"""Convert /Users/unclecode/devs/test to -Users-unclecode-devs-test"""
return path.replace("/", "-").replace("\\", "-")
def log(self, event_type: str, data: dict):
"""Append event to JSONL log."""
entry = {
"timestamp": datetime.utcnow().isoformat(),
"event": event_type,
"session_id": self.session_id,
"data": data
}
with open(self.log_file, "a") as f:
f.write(json.dumps(entry) + "\n")
def get_session_path(self) -> str:
"""Return path to current session log."""
return str(self.log_file)
class CrawlAgent:
"""Crawl4AI agent wrapper."""
def __init__(self, args: argparse.Namespace):
self.args = args
self.storage = SessionStorage(args.add_dir[0] if args.add_dir else None)
self.client: Optional[ClaudeSDKClient] = None
# Create MCP server with crawl tools
self.crawler_server = create_sdk_mcp_server(
name="crawl4ai",
version="1.0.0",
tools=CRAWL_TOOLS
)
# Build options
self.options = ClaudeAgentOptions(
mcp_servers={"crawler": self.crawler_server},
allowed_tools=[
"mcp__crawler__quick_crawl",
"mcp__crawler__start_session",
"mcp__crawler__navigate",
"mcp__crawler__extract_data",
"mcp__crawler__execute_js",
"mcp__crawler__screenshot",
"mcp__crawler__close_session",
"Write", "Read", "Bash"
],
system_prompt=SYSTEM_PROMPT if not args.system_prompt else args.system_prompt,
permission_mode=args.permission_mode or "acceptEdits",
cwd=args.add_dir[0] if args.add_dir else str(Path.cwd()),
model=args.model,
session_id=args.session_id or self.storage.session_id,
)
async def run(self, prompt: str):
"""Execute crawl task."""
self.storage.log("session_start", {
"prompt": prompt,
"cwd": self.options.cwd,
"model": self.options.model
})
print(f"\n🕷 Crawl4AI Agent")
print(f"📁 Session: {self.storage.session_id}")
print(f"💾 Log: {self.storage.get_session_path()}")
print(f"🎯 Task: {prompt}\n")
async with ClaudeSDKClient(options=self.options) as client:
self.client = client
await client.query(prompt)
turn = 0
async for message in client.receive_messages():
turn += 1
if isinstance(message, AssistantMessage):
for block in message.content:
if isinstance(block, TextBlock):
print(f"\n💭 [{turn}] {block.text}")
self.storage.log("assistant_message", {"turn": turn, "text": block.text})
elif isinstance(message, ResultMessage):
print(f"\n✅ Completed in {message.duration_ms/1000:.2f}s")
print(f"💰 Cost: ${message.total_cost_usd:.4f}" if message.total_cost_usd else "")
print(f"🔄 Turns: {message.num_turns}")
self.storage.log("session_end", {
"duration_ms": message.duration_ms,
"cost_usd": message.total_cost_usd,
"turns": message.num_turns,
"success": not message.is_error
})
break
print(f"\n📊 Session log: {self.storage.get_session_path()}\n")
def main():
parser = argparse.ArgumentParser(
description="Crawl4AI Agent - Browser automation powered by Claude Code SDK",
formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument("prompt", nargs="?", help="Your crawling task prompt")
parser.add_argument("--system-prompt", help="Custom system prompt")
parser.add_argument("--permission-mode", choices=["acceptEdits", "bypassPermissions", "default", "plan"],
help="Permission mode for tool execution")
parser.add_argument("--model", help="Model to use (e.g., 'sonnet', 'opus')")
parser.add_argument("--add-dir", nargs="+", help="Additional directories for file access")
parser.add_argument("--session-id", help="Use specific session ID (UUID)")
parser.add_argument("-v", "--version", action="version", version="Crawl4AI Agent 1.0.0")
parser.add_argument("--debug", action="store_true", help="Enable debug mode")
args = parser.parse_args()
if not args.prompt:
parser.print_help()
print("\nExample usage:")
print(' crawl-agent "Scrape all products from example.com with price > $10"')
print(' crawl-agent --add-dir ~/projects "Find all Python files and analyze imports"')
sys.exit(1)
try:
agent = CrawlAgent(args)
asyncio.run(agent.run(args.prompt))
except KeyboardInterrupt:
print("\n\n⚠ Interrupted by user")
sys.exit(0)
except Exception as e:
print(f"\n❌ Error: {e}")
if args.debug:
raise
sys.exit(1)
if __name__ == "__main__":
main()
```
**Usage:**
```bash
# Simple scrape
python agent_crawl.py "Get all product names from example.com"
# Complex filtering
python agent_crawl.py "Find products >$10 from shop.com, crawl each, extract id/name/price"
# Multi-step automation
python agent_crawl.py "Go to amazon.com, search 'laptop', filter 4+ stars, scrape top 10"
# With options
python agent_crawl.py --add-dir ~/projects --model sonnet "Scrape competitor prices"
```
**Session logs stored at:**
`~/.crawl4ai/agents/projects/-Users-unclecode-devs-test/{uuid}.jsonl`

View File

@@ -1,126 +0,0 @@
# agent_crawl.py
"""Crawl4AI Agent CLI - Browser automation agent powered by OpenAI Agents SDK."""
import asyncio
import sys
import os
import argparse
from pathlib import Path
from agents import Agent, Runner, set_default_openai_key
from .crawl_tools import CRAWL_TOOLS
from .crawl_prompts import SYSTEM_PROMPT
from .browser_manager import BrowserManager
from .terminal_ui import TerminalUI
class CrawlAgent:
"""Crawl4AI agent wrapper using OpenAI Agents SDK."""
def __init__(self, args: argparse.Namespace):
self.args = args
self.ui = TerminalUI()
# Set API key
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
raise ValueError("OPENAI_API_KEY environment variable not set")
set_default_openai_key(api_key)
# Create agent
self.agent = Agent(
name="Crawl4AI Agent",
instructions=SYSTEM_PROMPT,
model=args.model or "gpt-4.1",
tools=CRAWL_TOOLS,
tool_use_behavior="run_llm_again", # CRITICAL: Run LLM again after tools to generate response
)
async def run_single_shot(self, prompt: str):
"""Execute a single crawl task."""
self.ui.console.print(f"\n🕷️ [bold cyan]Crawl4AI Agent[/bold cyan]")
self.ui.console.print(f"🎯 Task: {prompt}\n")
try:
result = await Runner.run(
starting_agent=self.agent,
input=prompt,
context=None,
max_turns=100, # Allow up to 100 turns for complex tasks
)
self.ui.console.print(f"\n[bold green]Result:[/bold green]")
self.ui.console.print(result.final_output)
if hasattr(result, 'usage'):
self.ui.console.print(f"\n[dim]Tokens: {result.usage}[/dim]")
except Exception as e:
self.ui.print_error(f"Error: {e}")
if self.args.debug:
raise
async def run_chat_mode(self):
"""Run interactive chat mode with streaming visibility."""
from .chat_mode import ChatMode
chat = ChatMode(self.agent, self.ui)
await chat.run()
def main():
parser = argparse.ArgumentParser(
description="Crawl4AI Agent - Browser automation powered by OpenAI Agents SDK",
formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument("prompt", nargs="?", help="Your crawling task prompt (not used in --chat mode)")
parser.add_argument("--chat", action="store_true", help="Start interactive chat mode")
parser.add_argument("--model", help="Model to use (e.g., 'gpt-4.1', 'gpt-5-nano')", default="gpt-4.1")
parser.add_argument("-v", "--version", action="version", version="Crawl4AI Agent 2.0.0")
parser.add_argument("--debug", action="store_true", help="Enable debug mode")
args = parser.parse_args()
# Chat mode - interactive
if args.chat:
try:
agent = CrawlAgent(args)
asyncio.run(agent.run_chat_mode())
except KeyboardInterrupt:
print("\n\n⚠️ Chat interrupted by user")
sys.exit(0)
except Exception as e:
print(f"\n❌ Error: {e}")
if args.debug:
raise
sys.exit(1)
return
# Single-shot mode - requires prompt
if not args.prompt:
parser.print_help()
print("\nExample usage:")
print(' # Single-shot mode:')
print(' python -m crawl4ai.agent.agent_crawl "Scrape products from example.com"')
print()
print(' # Interactive chat mode:')
print(' python -m crawl4ai.agent.agent_crawl --chat')
sys.exit(1)
try:
agent = CrawlAgent(args)
asyncio.run(agent.run_single_shot(args.prompt))
except KeyboardInterrupt:
print("\n\n⚠️ Interrupted by user")
sys.exit(0)
except Exception as e:
print(f"\n❌ Error: {e}")
if args.debug:
raise
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -1,73 +0,0 @@
"""Browser session management with singleton pattern for persistent browser instances."""
from typing import Optional
from crawl4ai import AsyncWebCrawler, BrowserConfig
class BrowserManager:
"""Singleton browser manager for persistent browser sessions across agent operations."""
_instance: Optional['BrowserManager'] = None
_crawler: Optional[AsyncWebCrawler] = None
_config: Optional[BrowserConfig] = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
@classmethod
async def get_browser(cls, config: Optional[BrowserConfig] = None) -> AsyncWebCrawler:
"""
Get or create the singleton browser instance.
Args:
config: Optional browser configuration. Only used if no browser exists yet.
To change config, use reconfigure_browser() instead.
Returns:
AsyncWebCrawler instance
"""
# Create new browser if needed
if cls._crawler is None:
# Create default config if none provided
if config is None:
config = BrowserConfig(headless=True, verbose=False)
cls._crawler = AsyncWebCrawler(config=config)
await cls._crawler.start()
cls._config = config
return cls._crawler
@classmethod
async def reconfigure_browser(cls, new_config: BrowserConfig) -> AsyncWebCrawler:
"""
Close current browser and create a new one with different configuration.
Args:
new_config: New browser configuration
Returns:
New AsyncWebCrawler instance
"""
await cls.close_browser()
return await cls.get_browser(new_config)
@classmethod
async def close_browser(cls):
"""Close the current browser instance and cleanup."""
if cls._crawler is not None:
await cls._crawler.close()
cls._crawler = None
cls._config = None
@classmethod
def is_browser_active(cls) -> bool:
"""Check if browser is currently active."""
return cls._crawler is not None
@classmethod
def get_current_config(cls) -> Optional[BrowserConfig]:
"""Get the current browser configuration."""
return cls._config

View File

@@ -1,213 +0,0 @@
# chat_mode.py
"""Interactive chat mode with streaming visibility for Crawl4AI Agent."""
import asyncio
from typing import Optional
from agents import Agent, Runner
from .terminal_ui import TerminalUI
from .browser_manager import BrowserManager
class ChatMode:
"""Interactive chat mode with real-time status updates and tool visibility."""
def __init__(self, agent: Agent, ui: TerminalUI):
self.agent = agent
self.ui = ui
self._exit_requested = False
self.conversation_history = [] # Track full conversation for context
# Generate unique session ID
import time
self.session_id = f"session_{int(time.time())}"
async def _handle_command(self, command: str) -> bool:
"""Handle special chat commands.
Returns:
True if command was /exit, False otherwise
"""
cmd = command.lower().strip()
if cmd == '/exit' or cmd == '/quit':
self._exit_requested = True
self.ui.print_info("Exiting chat mode...")
return True
elif cmd == '/clear':
self.ui.clear_screen()
self.ui.show_header(session_id=self.session_id)
return False
elif cmd == '/help':
self.ui.show_commands()
return False
elif cmd == '/browser':
# Show browser status
if BrowserManager.is_browser_active():
config = BrowserManager.get_current_config()
self.ui.print_info(f"Browser active: headless={config.headless if config else 'unknown'}")
else:
self.ui.print_info("No browser instance active")
return False
else:
self.ui.print_error(f"Unknown command: {command}")
self.ui.print_info("Available commands: /exit, /clear, /help, /browser")
return False
async def run(self):
"""Run the interactive chat loop with streaming responses and visibility."""
# Show header with session ID (tips are now inside)
self.ui.show_header(session_id=self.session_id)
try:
while not self._exit_requested:
# Get user input
try:
user_input = await asyncio.to_thread(self.ui.get_user_input)
except EOFError:
break
# Handle commands
if user_input.startswith('/'):
should_exit = await self._handle_command(user_input)
if should_exit:
break
continue
# Skip empty input
if not user_input.strip():
continue
# Add user message to conversation history
self.conversation_history.append({
"role": "user",
"content": user_input
})
# Show thinking indicator
self.ui.console.print("\n[cyan]Agent:[/cyan] [dim italic]thinking...[/dim italic]")
try:
# Run agent with streaming, passing conversation history for context
result = Runner.run_streamed(
self.agent,
input=self.conversation_history, # Pass full conversation history
context=None,
max_turns=100, # Allow up to 100 turns for complex multi-step tasks
)
# Track what we've seen
response_text = []
tools_called = []
current_tool = None
# Process streaming events
async for event in result.stream_events():
# DEBUG: Print all event types
# self.ui.console.print(f"[dim]DEBUG: event type={event.type}[/dim]")
# Agent switched
if event.type == "agent_updated_stream_event":
self.ui.console.print(f"\n[dim]→ Agent: {event.new_agent.name}[/dim]")
# Items generated (tool calls, outputs, text)
elif event.type == "run_item_stream_event":
item = event.item
# Tool call started
if item.type == "tool_call_item":
# Get tool name from raw_item
current_tool = item.raw_item.name if hasattr(item.raw_item, 'name') else "unknown"
tools_called.append(current_tool)
# Show tool name and args clearly
tool_display = current_tool
self.ui.console.print(f"\n[yellow]🔧 Calling:[/yellow] [bold]{tool_display}[/bold]")
# Show tool arguments if present
if hasattr(item.raw_item, 'arguments'):
try:
import json
args_str = item.raw_item.arguments
args = json.loads(args_str) if isinstance(args_str, str) else args_str
# Show key args only
key_args = {k: v for k, v in args.items() if k in ['url', 'session_id', 'output_format']}
if key_args:
params_str = ", ".join(f"{k}={v}" for k, v in key_args.items())
self.ui.console.print(f" [dim]({params_str})[/dim]")
except:
pass
# Tool output received
elif item.type == "tool_call_output_item":
if current_tool:
self.ui.console.print(f" [green]✓[/green] [dim]completed[/dim]")
current_tool = None
# Agent text response (multiple types)
elif item.type == "text_item":
# Clear "thinking..." line if this is first text
if not response_text:
self.ui.console.print("\r[cyan]Agent:[/cyan] ", end="")
# Stream the text
self.ui.console.print(item.text, end="")
response_text.append(item.text)
# Message output (final response)
elif item.type == "message_output_item":
# This is the final formatted response
if not response_text:
self.ui.console.print("\n[cyan]Agent:[/cyan] ", end="")
# Extract text from content blocks
if hasattr(item.raw_item, 'content') and item.raw_item.content:
for content_block in item.raw_item.content:
if hasattr(content_block, 'text'):
text = content_block.text
self.ui.console.print(text, end="")
response_text.append(text)
# Text deltas (real-time streaming)
elif event.type == "text_delta_stream_event":
# Clear "thinking..." if this is first delta
if not response_text:
self.ui.console.print("\r[cyan]Agent:[/cyan] ", end="")
# Stream character by character for responsiveness
self.ui.console.print(event.delta, end="", markup=False)
response_text.append(event.delta)
# Newline after response
self.ui.console.print()
# Show summary after response
if tools_called:
self.ui.console.print(f"\n[dim]Tools used: {', '.join(set(tools_called))}[/dim]")
# Add agent response to conversation history
if response_text:
agent_response = "".join(response_text)
self.conversation_history.append({
"role": "assistant",
"content": agent_response
})
except Exception as e:
self.ui.print_error(f"Error during agent execution: {e}")
import traceback
traceback.print_exc()
except KeyboardInterrupt:
self.ui.print_info("\n\nChat interrupted by user")
finally:
# Cleanup browser on exit
self.ui.console.print("\n[dim]Cleaning up...[/dim]")
await BrowserManager.close_browser()
self.ui.print_info("Browser closed")
self.ui.console.print("[bold green]Goodbye![/bold green]\n")

View File

@@ -1,142 +0,0 @@
# crawl_prompts.py
"""System prompts for Crawl4AI agent."""
SYSTEM_PROMPT = """You are an expert web crawling and browser automation agent powered by Crawl4AI.
# Core Capabilities
You can perform sophisticated multi-step web scraping and automation tasks through two modes:
## Quick Mode (simple tasks)
- Use `quick_crawl` for single-page data extraction
- Best for: simple scrapes, getting page content, one-time extractions
- Returns markdown or HTML content immediately
## Session Mode (complex tasks)
- Use `start_session` to create persistent browser sessions
- Navigate, interact, extract data across multiple pages
- Essential for: workflows requiring JS execution, pagination, filtering, multi-step automation
- ALWAYS close sessions with `close_session` when done
# Tool Usage Patterns
## Simple Extraction
1. Use `quick_crawl` with appropriate output_format (markdown or html)
2. Provide extraction_schema for structured data if needed
## Multi-Step Workflow
1. `start_session` - Create browser session with unique ID
2. `navigate` - Go to target URL
3. `execute_js` - Interact with page (click buttons, scroll, fill forms)
4. `extract_data` - Get data using schema or markdown
5. Repeat steps 2-4 as needed
6. `close_session` - REQUIRED - Clean up when done
# Critical Instructions
1. **Session Management - CRITICAL**:
- Generate unique session IDs (e.g., "product_scrape_001")
- ALWAYS close sessions when done using `close_session`
- Use sessions for tasks requiring multiple page visits
- Track which session you're using
2. **JavaScript Execution**:
- Use for: clicking buttons, scrolling, waiting for dynamic content
- Example: `js_code: "document.querySelector('.load-more').click()"`
- Combine with `wait_for` to ensure content loads
3. **Error Handling**:
- Check `success` field in all tool responses
- If a tool fails, analyze why and try alternative approach
- Report specific errors to user
- Don't give up - try different strategies
4. **Structured Extraction**: Use JSON schemas for structured data:
```json
{
"type": "object",
"properties": {
"field_name": {"type": "string"},
"price": {"type": "number"}
}
}
```
# Example Workflows
## Workflow 1: Simple Multi-Page Crawl
Task: "Crawl example.com and example.org, extract titles"
```
Step 1: Crawl both pages
- Use quick_crawl(url="https://example.com", output_format="markdown")
- Use quick_crawl(url="https://example.org", output_format="markdown")
- Extract titles from markdown content
Step 2: Report
- Summarize the titles found
```
## Workflow 2: Session-Based Extraction
Task: "Start session, navigate, extract, save"
```
Step 1: Create and navigate
- start_session(session_id="extract_001")
- navigate(session_id="extract_001", url="https://example.com")
Step 2: Extract content
- extract_data(session_id="extract_001", output_format="markdown")
- Report the extracted content to user
Step 3: Cleanup (REQUIRED)
- close_session(session_id="extract_001")
```
## Workflow 3: Error Recovery
Task: "Handle failed crawl gracefully"
```
Step 1: Attempt crawl
- quick_crawl(url="https://invalid-site.com")
- Check success field in response
Step 2: On failure
- Acknowledge the error to user
- Provide clear error message
- DON'T give up - suggest alternative or retry
Step 3: Continue with valid request
- quick_crawl(url="https://example.com")
- Complete the task successfully
```
## Workflow 4: Paginated Scraping
Task: "Scrape all items across multiple pages"
1. `start_session`
2. `navigate` to page 1
3. `extract_data` items from current page
4. Check for "next" button
5. `execute_js` to click next
6. Repeat 3-5 until no more pages
7. `close_session` (REQUIRED)
8. Report aggregated data
# Quality Guidelines
- **Be thorough**: Don't stop until task requirements are fully met
- **Validate data**: Check extracted data matches expected format
- **Handle edge cases**: Empty results, pagination limits, rate limiting
- **Clear reporting**: Summarize what was found, any issues encountered
- **Efficient**: Use quick_crawl when possible, sessions only when needed
- **Session cleanup**: ALWAYS close sessions you created
# Key Reminders
1. **Sessions**: Always close what you open
2. **Errors**: Handle gracefully, don't stop at first failure
3. **Validation**: Check tool responses, verify success
4. **Completion**: Confirm all steps done, report results clearly
Remember: You have unlimited turns to complete the task. Take your time, validate each step, and ensure quality results."""

View File

@@ -1,362 +0,0 @@
# crawl_tools.py
"""Crawl4AI tools for OpenAI Agents SDK."""
import json
from typing import Any, Dict, Optional
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode
from crawl4ai.extraction_strategy import LLMExtractionStrategy
from agents import function_tool
from .browser_manager import BrowserManager
# Global session storage (for named sessions only)
CRAWLER_SESSIONS: Dict[str, AsyncWebCrawler] = {}
CRAWLER_SESSION_URLS: Dict[str, str] = {} # Track current URL per session
@function_tool
async def quick_crawl(
url: str,
output_format: str = "markdown",
extraction_schema: Optional[str] = None,
js_code: Optional[str] = None,
wait_for: Optional[str] = None
) -> str:
"""One-shot crawl for simple extraction. Returns markdown, HTML, or structured data.
Args:
url: The URL to crawl
output_format: Output format - "markdown", "html", "structured", or "screenshot"
extraction_schema: Optional JSON schema for structured extraction
js_code: Optional JavaScript to execute before extraction
wait_for: Optional CSS selector to wait for
Returns:
JSON string with success status, url, and extracted data
"""
# Use singleton browser manager
crawler_config = BrowserConfig(headless=True, verbose=False)
crawler = await BrowserManager.get_browser(crawler_config)
run_config = CrawlerRunConfig(
verbose=False,
cache_mode=CacheMode.BYPASS,
js_code=js_code,
wait_for=wait_for,
)
# Add extraction strategy if structured data requested
if extraction_schema:
run_config.extraction_strategy = LLMExtractionStrategy(
provider="openai/gpt-4o-mini",
schema=json.loads(extraction_schema),
instruction="Extract data according to the provided schema."
)
result = await crawler.arun(url=url, config=run_config)
if not result.success:
return json.dumps({
"error": result.error_message,
"success": False
}, indent=2)
# Handle markdown - can be string or MarkdownGenerationResult object
markdown_content = ""
if isinstance(result.markdown, str):
markdown_content = result.markdown
elif hasattr(result.markdown, 'raw_markdown'):
markdown_content = result.markdown.raw_markdown
output_map = {
"markdown": markdown_content,
"html": result.html,
"structured": result.extracted_content,
"screenshot": result.screenshot,
}
response = {
"success": True,
"url": result.url,
"data": output_map.get(output_format, markdown_content)
}
return json.dumps(response, indent=2)
@function_tool
async def start_session(
session_id: str,
headless: bool = True
) -> str:
"""Start a named browser session for multi-step crawling and automation.
Args:
session_id: Unique identifier for the session
headless: Whether to run browser in headless mode (default True)
Returns:
JSON string with success status and session info
"""
if session_id in CRAWLER_SESSIONS:
return json.dumps({
"error": f"Session {session_id} already exists",
"success": False
}, indent=2)
# Use the singleton browser
crawler_config = BrowserConfig(
headless=headless,
verbose=False
)
crawler = await BrowserManager.get_browser(crawler_config)
# Store reference for named session
CRAWLER_SESSIONS[session_id] = crawler
return json.dumps({
"success": True,
"session_id": session_id,
"message": f"Browser session {session_id} started"
}, indent=2)
@function_tool
async def navigate(
session_id: str,
url: str,
wait_for: Optional[str] = None,
js_code: Optional[str] = None
) -> str:
"""Navigate to a URL in an active session.
Args:
session_id: The session identifier
url: The URL to navigate to
wait_for: Optional CSS selector to wait for
js_code: Optional JavaScript to execute after load
Returns:
JSON string with navigation result
"""
if session_id not in CRAWLER_SESSIONS:
return json.dumps({
"error": f"Session {session_id} not found",
"success": False
}, indent=2)
crawler = CRAWLER_SESSIONS[session_id]
run_config = CrawlerRunConfig(
verbose=False,
cache_mode=CacheMode.BYPASS,
wait_for=wait_for,
js_code=js_code,
)
result = await crawler.arun(url=url, config=run_config)
# Store current URL for this session
if result.success:
CRAWLER_SESSION_URLS[session_id] = result.url
return json.dumps({
"success": result.success,
"url": result.url,
"message": f"Navigated to {url}"
}, indent=2)
@function_tool
async def extract_data(
session_id: str,
output_format: str = "markdown",
extraction_schema: Optional[str] = None,
wait_for: Optional[str] = None,
js_code: Optional[str] = None
) -> str:
"""Extract data from current page in session using schema or return markdown.
Args:
session_id: The session identifier
output_format: "markdown" or "structured"
extraction_schema: Required for structured - JSON schema
wait_for: Optional - Wait for element before extraction
js_code: Optional - Execute JS before extraction
Returns:
JSON string with extracted data
"""
if session_id not in CRAWLER_SESSIONS:
return json.dumps({
"error": f"Session {session_id} not found",
"success": False
}, indent=2)
# Check if we have a current URL for this session
if session_id not in CRAWLER_SESSION_URLS:
return json.dumps({
"error": "No page loaded in session. Use 'navigate' first.",
"success": False
}, indent=2)
crawler = CRAWLER_SESSIONS[session_id]
current_url = CRAWLER_SESSION_URLS[session_id]
run_config = CrawlerRunConfig(
verbose=False,
cache_mode=CacheMode.BYPASS,
wait_for=wait_for,
js_code=js_code,
)
if output_format == "structured" and extraction_schema:
run_config.extraction_strategy = LLMExtractionStrategy(
provider="openai/gpt-4o-mini",
schema=json.loads(extraction_schema),
instruction="Extract data according to schema."
)
result = await crawler.arun(url=current_url, config=run_config)
if not result.success:
return json.dumps({
"error": result.error_message,
"success": False
}, indent=2)
# Handle markdown - can be string or MarkdownGenerationResult object
markdown_content = ""
if isinstance(result.markdown, str):
markdown_content = result.markdown
elif hasattr(result.markdown, 'raw_markdown'):
markdown_content = result.markdown.raw_markdown
data = (result.extracted_content if output_format == "structured"
else markdown_content)
return json.dumps({
"success": True,
"data": data
}, indent=2)
@function_tool
async def execute_js(
session_id: str,
js_code: str,
wait_for: Optional[str] = None
) -> str:
"""Execute JavaScript in the current page context.
Args:
session_id: The session identifier
js_code: JavaScript code to execute
wait_for: Optional - Wait for element after execution
Returns:
JSON string with execution result
"""
if session_id not in CRAWLER_SESSIONS:
return json.dumps({
"error": f"Session {session_id} not found",
"success": False
}, indent=2)
# Check if we have a current URL for this session
if session_id not in CRAWLER_SESSION_URLS:
return json.dumps({
"error": "No page loaded in session. Use 'navigate' first.",
"success": False
}, indent=2)
crawler = CRAWLER_SESSIONS[session_id]
current_url = CRAWLER_SESSION_URLS[session_id]
run_config = CrawlerRunConfig(
verbose=False,
cache_mode=CacheMode.BYPASS,
js_code=js_code,
wait_for=wait_for,
)
result = await crawler.arun(url=current_url, config=run_config)
return json.dumps({
"success": result.success,
"message": "JavaScript executed"
}, indent=2)
@function_tool
async def screenshot(session_id: str) -> str:
"""Take a screenshot of the current page.
Args:
session_id: The session identifier
Returns:
JSON string with screenshot data
"""
if session_id not in CRAWLER_SESSIONS:
return json.dumps({
"error": f"Session {session_id} not found",
"success": False
}, indent=2)
# Check if we have a current URL for this session
if session_id not in CRAWLER_SESSION_URLS:
return json.dumps({
"error": "No page loaded in session. Use 'navigate' first.",
"success": False
}, indent=2)
crawler = CRAWLER_SESSIONS[session_id]
current_url = CRAWLER_SESSION_URLS[session_id]
result = await crawler.arun(
url=current_url,
config=CrawlerRunConfig(verbose=False, cache_mode=CacheMode.BYPASS, screenshot=True)
)
return json.dumps({
"success": True,
"screenshot": result.screenshot if result.success else None
}, indent=2)
@function_tool
async def close_session(session_id: str) -> str:
"""Close and cleanup a named browser session.
Args:
session_id: The session identifier
Returns:
JSON string with closure confirmation
"""
if session_id not in CRAWLER_SESSIONS:
return json.dumps({
"error": f"Session {session_id} not found",
"success": False
}, indent=2)
# Remove from named sessions, but don't close the singleton browser
CRAWLER_SESSIONS.pop(session_id)
CRAWLER_SESSION_URLS.pop(session_id, None) # Remove URL tracking
return json.dumps({
"success": True,
"message": f"Session {session_id} closed"
}, indent=2)
# Export all tools
CRAWL_TOOLS = [
quick_crawl,
start_session,
navigate,
extract_data,
execute_js,
screenshot,
close_session,
]

File diff suppressed because it is too large Load Diff

View File

@@ -1,321 +0,0 @@
#!/usr/bin/env python
"""
Automated Test Suite Runner for Crawl4AI Agent
Runs all tests in sequence: Component → Tools → Scenarios
Generates comprehensive test report with timing and pass/fail metrics.
"""
import sys
import asyncio
import time
import json
from pathlib import Path
from datetime import datetime
from typing import Dict, Any, List
# Add parent to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
class TestSuiteRunner:
"""Orchestrates all test suites with reporting."""
def __init__(self, output_dir: Path):
self.output_dir = output_dir
self.output_dir.mkdir(exist_ok=True, parents=True)
self.results = {
"timestamp": datetime.now().isoformat(),
"test_suites": [],
"overall_status": "PENDING"
}
def print_banner(self, text: str, char: str = "="):
"""Print a formatted banner."""
width = 70
print(f"\n{char * width}")
print(f"{text:^{width}}")
print(f"{char * width}\n")
async def run_component_tests(self) -> Dict[str, Any]:
"""Run component tests (test_chat.py)."""
self.print_banner("TEST SUITE 1/3: COMPONENT TESTS", "=")
print("Testing: BrowserManager, TerminalUI, MCP Server, ChatMode")
print("Expected duration: ~5 seconds\n")
start_time = time.time()
suite_result = {
"name": "Component Tests",
"file": "test_chat.py",
"status": "PENDING",
"duration_seconds": 0,
"tests_run": 4,
"tests_passed": 0,
"tests_failed": 0,
"details": []
}
try:
# Import and run the test
from crawl4ai.agent import test_chat
# Capture the result
success = await test_chat.test_components()
duration = time.time() - start_time
suite_result["duration_seconds"] = duration
if success:
suite_result["status"] = "PASS"
suite_result["tests_passed"] = 4
print(f"\n✓ Component tests PASSED in {duration:.2f}s")
else:
suite_result["status"] = "FAIL"
suite_result["tests_failed"] = 4
print(f"\n✗ Component tests FAILED in {duration:.2f}s")
except Exception as e:
duration = time.time() - start_time
suite_result["status"] = "ERROR"
suite_result["error"] = str(e)
suite_result["duration_seconds"] = duration
suite_result["tests_failed"] = 4
print(f"\n✗ Component tests ERROR: {e}")
return suite_result
async def run_tool_tests(self) -> Dict[str, Any]:
"""Run tool integration tests (test_tools.py)."""
self.print_banner("TEST SUITE 2/3: TOOL INTEGRATION TESTS", "=")
print("Testing: Quick crawl, Session workflow, HTML format")
print("Expected duration: ~30 seconds (uses browser)\n")
start_time = time.time()
suite_result = {
"name": "Tool Integration Tests",
"file": "test_tools.py",
"status": "PENDING",
"duration_seconds": 0,
"tests_run": 3,
"tests_passed": 0,
"tests_failed": 0,
"details": []
}
try:
# Import and run the test
from crawl4ai.agent import test_tools
# Run the main test function
success = await test_tools.main()
duration = time.time() - start_time
suite_result["duration_seconds"] = duration
if success:
suite_result["status"] = "PASS"
suite_result["tests_passed"] = 3
print(f"\n✓ Tool tests PASSED in {duration:.2f}s")
else:
suite_result["status"] = "FAIL"
suite_result["tests_failed"] = 3
print(f"\n✗ Tool tests FAILED in {duration:.2f}s")
except Exception as e:
duration = time.time() - start_time
suite_result["status"] = "ERROR"
suite_result["error"] = str(e)
suite_result["duration_seconds"] = duration
suite_result["tests_failed"] = 3
print(f"\n✗ Tool tests ERROR: {e}")
return suite_result
async def run_scenario_tests(self) -> Dict[str, Any]:
"""Run multi-turn scenario tests (test_scenarios.py)."""
self.print_banner("TEST SUITE 3/3: MULTI-TURN SCENARIO TESTS", "=")
print("Testing: 9 scenarios (2 simple, 3 medium, 4 complex)")
print("Expected duration: ~3-5 minutes\n")
start_time = time.time()
suite_result = {
"name": "Multi-turn Scenario Tests",
"file": "test_scenarios.py",
"status": "PENDING",
"duration_seconds": 0,
"tests_run": 9,
"tests_passed": 0,
"tests_failed": 0,
"details": [],
"pass_rate_percent": 0.0
}
try:
# Import and run the test
from crawl4ai.agent import test_scenarios
# Run all scenarios
success = await test_scenarios.run_all_scenarios(self.output_dir)
duration = time.time() - start_time
suite_result["duration_seconds"] = duration
# Load detailed results from the generated file
results_file = self.output_dir / "test_results.json"
if results_file.exists():
with open(results_file) as f:
scenario_results = json.load(f)
passed = sum(1 for r in scenario_results if r["status"] == "PASS")
total = len(scenario_results)
suite_result["tests_passed"] = passed
suite_result["tests_failed"] = total - passed
suite_result["pass_rate_percent"] = (passed / total * 100) if total > 0 else 0
suite_result["details"] = scenario_results
if success:
suite_result["status"] = "PASS"
print(f"\n✓ Scenario tests PASSED ({passed}/{total}) in {duration:.2f}s")
else:
suite_result["status"] = "FAIL"
print(f"\n✗ Scenario tests FAILED ({passed}/{total}) in {duration:.2f}s")
else:
suite_result["status"] = "FAIL"
suite_result["tests_failed"] = 9
print(f"\n✗ Scenario results file not found")
except Exception as e:
duration = time.time() - start_time
suite_result["status"] = "ERROR"
suite_result["error"] = str(e)
suite_result["duration_seconds"] = duration
suite_result["tests_failed"] = 9
print(f"\n✗ Scenario tests ERROR: {e}")
import traceback
traceback.print_exc()
return suite_result
async def run_all(self) -> bool:
"""Run all test suites in sequence."""
self.print_banner("CRAWL4AI AGENT - AUTOMATED TEST SUITE", "")
print("This will run 3 test suites in sequence:")
print(" 1. Component Tests (~5s)")
print(" 2. Tool Integration Tests (~30s)")
print(" 3. Multi-turn Scenario Tests (~3-5 min)")
print(f"\nOutput directory: {self.output_dir}")
print(f"Started at: {self.results['timestamp']}\n")
overall_start = time.time()
# Run all test suites
component_result = await self.run_component_tests()
self.results["test_suites"].append(component_result)
# Only continue if components pass
if component_result["status"] != "PASS":
print("\n⚠️ Component tests failed. Stopping execution.")
print("Fix component issues before running integration tests.")
self.results["overall_status"] = "FAILED"
self._save_report()
return False
tool_result = await self.run_tool_tests()
self.results["test_suites"].append(tool_result)
# Only continue if tools pass
if tool_result["status"] != "PASS":
print("\n⚠️ Tool tests failed. Stopping execution.")
print("Fix tool integration issues before running scenarios.")
self.results["overall_status"] = "FAILED"
self._save_report()
return False
scenario_result = await self.run_scenario_tests()
self.results["test_suites"].append(scenario_result)
# Calculate overall results
overall_duration = time.time() - overall_start
self.results["total_duration_seconds"] = overall_duration
# Determine overall status
all_passed = all(s["status"] == "PASS" for s in self.results["test_suites"])
# For scenarios, we accept ≥80% pass rate
if scenario_result["status"] == "FAIL" and scenario_result.get("pass_rate_percent", 0) >= 80.0:
self.results["overall_status"] = "PASS_WITH_WARNINGS"
elif all_passed:
self.results["overall_status"] = "PASS"
else:
self.results["overall_status"] = "FAIL"
# Print final summary
self._print_summary()
self._save_report()
return self.results["overall_status"] in ["PASS", "PASS_WITH_WARNINGS"]
def _print_summary(self):
"""Print final test summary."""
self.print_banner("FINAL TEST SUMMARY", "")
for suite in self.results["test_suites"]:
status_icon = "" if suite["status"] == "PASS" else ""
duration = suite["duration_seconds"]
if "pass_rate_percent" in suite:
# Scenario tests
passed = suite["tests_passed"]
total = suite["tests_run"]
pass_rate = suite["pass_rate_percent"]
print(f"{status_icon} {suite['name']}: {passed}/{total} passed ({pass_rate:.1f}%) in {duration:.2f}s")
else:
# Component/Tool tests
passed = suite["tests_passed"]
total = suite["tests_run"]
print(f"{status_icon} {suite['name']}: {passed}/{total} passed in {duration:.2f}s")
print(f"\nTotal duration: {self.results['total_duration_seconds']:.2f}s")
print(f"Overall status: {self.results['overall_status']}")
if self.results["overall_status"] == "PASS":
print("\n🎉 ALL TESTS PASSED! Ready for evaluation phase.")
elif self.results["overall_status"] == "PASS_WITH_WARNINGS":
print("\n⚠️ Tests passed with warnings (≥80% scenario pass rate).")
print("Consider investigating failed scenarios before evaluation.")
else:
print("\n❌ TESTS FAILED. Please fix issues before proceeding to evaluation.")
def _save_report(self):
"""Save detailed test report to JSON."""
report_file = self.output_dir / "test_suite_report.json"
with open(report_file, "w") as f:
json.dump(self.results, f, indent=2)
print(f"\n📄 Detailed report saved to: {report_file}")
async def main():
"""Main entry point."""
# Set up output directory
output_dir = Path.cwd() / "test_agent_output"
# Run all tests
runner = TestSuiteRunner(output_dir)
success = await runner.run_all()
return success
if __name__ == "__main__":
try:
success = asyncio.run(main())
sys.exit(0 if success else 1)
except KeyboardInterrupt:
print("\n\n⚠️ Tests interrupted by user")
sys.exit(1)
except Exception as e:
print(f"\n\n❌ Fatal error: {e}")
import traceback
traceback.print_exc()
sys.exit(1)

View File

@@ -1,289 +0,0 @@
"""Terminal UI components using Rich for beautiful agent output."""
import readline
from rich.console import Console
from rich.markdown import Markdown
from rich.syntax import Syntax
from rich.panel import Panel
from rich.live import Live
from rich.spinner import Spinner
from rich.text import Text
from rich.prompt import Prompt
from rich.rule import Rule
# Crawl4AI Logo (>X< shape)
CRAWL4AI_LOGO = """
██ ██
▓ ██ ██ ▓
▓ ██ ▓
▓ ██ ██ ▓
██ ██
"""
VERSION = "0.1.0"
class TerminalUI:
"""Rich-based terminal interface for the Crawl4AI agent."""
def __init__(self):
self.console = Console()
self._current_text = ""
# Configure readline for command history
# History will persist in memory during session
readline.parse_and_bind('tab: complete') # Enable tab completion
readline.parse_and_bind('set editing-mode emacs') # Emacs-style editing (Ctrl+A, Ctrl+E, etc.)
# Up/Down arrows already work by default for history
def show_header(self, session_id: str = None, log_path: str = None):
"""Display agent session header - Claude Code style with vertical divider."""
import os
self.console.print()
# Get current directory
current_dir = os.getcwd()
# Build left and right columns separately to avoid padding issues
from rich.table import Table
from rich.text import Text
# Create a table with two columns
table = Table.grid(padding=(0, 2))
table.add_column(width=30, style="") # Left column
table.add_column(width=1, style="dim") # Divider
table.add_column(style="") # Right column
# Row 1: Welcome / Tips header (centered)
table.add_row(
Text("Welcome back!", style="bold white", justify="center"),
"",
Text("Tips", style="bold white")
)
# Row 2: Empty / Tip 1
table.add_row(
"",
"",
Text("• Press ", style="dim") + Text("Enter", style="cyan") + Text(" to send", style="dim")
)
# Row 3: Logo line 1 / Tip 2
table.add_row(
Text(" ██ ██", style="bold cyan"),
"",
Text("• Press ", style="dim") + Text("Option+Enter", style="cyan") + Text(" or ", style="dim") + Text("Ctrl+J", style="cyan") + Text(" for new line", style="dim")
)
# Row 4: Logo line 2 / Tip 3
table.add_row(
Text(" ▓ ██ ██ ▓", style="bold cyan"),
"",
Text("• Use ", style="dim") + Text("/exit", style="cyan") + Text(", ", style="dim") + Text("/clear", style="cyan") + Text(", ", style="dim") + Text("/help", style="cyan") + Text(", ", style="dim") + Text("/browser", style="cyan")
)
# Row 5: Logo line 3 / Empty
table.add_row(
Text(" ▓ ██ ▓", style="bold cyan"),
"",
""
)
# Row 6: Logo line 4 / Session header
table.add_row(
Text(" ▓ ██ ██ ▓", style="bold cyan"),
"",
Text("Session", style="bold white")
)
# Row 7: Logo line 5 / Session ID
session_name = os.path.basename(session_id) if session_id else "unknown"
table.add_row(
Text(" ██ ██", style="bold cyan"),
"",
Text(session_name, style="dim")
)
# Row 8: Empty
table.add_row("", "", "")
# Row 9: Version (centered)
table.add_row(
Text(f"Version {VERSION}", style="dim", justify="center"),
"",
""
)
# Row 10: Path (centered)
table.add_row(
Text(current_dir, style="dim", justify="center"),
"",
""
)
# Create panel with title
panel = Panel(
table,
title=f"[bold cyan]─── Crawl4AI Agent v{VERSION} ───[/bold cyan]",
title_align="left",
border_style="cyan",
padding=(1, 1),
expand=True
)
self.console.print(panel)
self.console.print()
def show_commands(self):
"""Display available commands."""
self.console.print("\n[dim]Commands:[/dim]")
self.console.print(" [cyan]/exit[/cyan] - Exit chat")
self.console.print(" [cyan]/clear[/cyan] - Clear screen")
self.console.print(" [cyan]/help[/cyan] - Show this help")
self.console.print(" [cyan]/browser[/cyan] - Show browser status\n")
def get_user_input(self) -> str:
"""Get user input with multi-line support and paste handling.
Usage:
- Press Enter to submit
- Press Option+Enter (or Ctrl+J) for new line
- Paste multi-line text works perfectly
"""
from prompt_toolkit import prompt
from prompt_toolkit.key_binding import KeyBindings
from prompt_toolkit.keys import Keys
from prompt_toolkit.formatted_text import HTML
# Create custom key bindings
bindings = KeyBindings()
# Enter to submit (reversed from default multiline behavior)
@bindings.add(Keys.Enter)
def _(event):
"""Submit the input when Enter is pressed."""
event.current_buffer.validate_and_handle()
# Option+Enter for newline (sends Esc+Enter when iTerm2 configured with "Esc+")
@bindings.add(Keys.Escape, Keys.Enter)
def _(event):
"""Insert newline with Option+Enter (or Esc then Enter)."""
event.current_buffer.insert_text("\n")
# Ctrl+J as alternative for newline (works everywhere)
@bindings.add(Keys.ControlJ)
def _(event):
"""Insert newline with Ctrl+J."""
event.current_buffer.insert_text("\n")
try:
# Tips are now in header, no need for extra hint
# Use prompt_toolkit with HTML formatting (no ANSI codes)
user_input = prompt(
HTML("\n<ansigreen><b>You:</b></ansigreen> "),
multiline=True,
key_bindings=bindings,
enable_open_in_editor=False,
)
return user_input.strip()
except (EOFError, KeyboardInterrupt):
raise EOFError()
def print_separator(self):
"""Print a visual separator."""
self.console.print(Rule(style="dim"))
def print_thinking(self):
"""Show thinking indicator."""
self.console.print("\n[cyan]Agent:[/cyan] [dim]thinking...[/dim]", end="")
def print_agent_text(self, text: str, stream: bool = False):
"""
Print agent response text.
Args:
text: Text to print
stream: If True, append to current streaming output
"""
if stream:
# For streaming, just print without newline
self.console.print(f"\r[cyan]Agent:[/cyan] {text}", end="")
else:
# For complete messages
self.console.print(f"\n[cyan]Agent:[/cyan] {text}")
def print_markdown(self, markdown_text: str):
"""Render markdown content."""
self.console.print()
self.console.print(Markdown(markdown_text))
def print_code(self, code: str, language: str = "python"):
"""Render code with syntax highlighting."""
self.console.print()
self.console.print(Syntax(code, language, theme="monokai", line_numbers=True))
def print_error(self, error_msg: str):
"""Display error message."""
self.console.print(f"\n[bold red]Error:[/bold red] {error_msg}")
def print_success(self, msg: str):
"""Display success message."""
self.console.print(f"\n[bold green]✓[/bold green] {msg}")
def print_info(self, msg: str):
"""Display info message."""
self.console.print(f"\n[bold blue][/bold blue] {msg}")
def clear_screen(self):
"""Clear the terminal screen."""
self.console.clear()
def print_session_summary(self, duration_s: float, turns: int, cost_usd: float = None):
"""Display session completion summary."""
self.console.print()
self.console.print(Panel(
f"[green]✅ Completed[/green]\n"
f"⏱ Duration: {duration_s:.2f}s\n"
f"🔄 Turns: {turns}\n"
+ (f"💰 Cost: ${cost_usd:.4f}" if cost_usd else ""),
border_style="green"
))
def print_tool_use(self, tool_name: str, tool_input: dict = None):
"""Indicate tool usage with parameters."""
# Shorten crawl4ai tool names for readability
display_name = tool_name.replace("mcp__crawler__", "")
if tool_input:
# Show key parameters only
params = []
if "url" in tool_input:
url = tool_input["url"]
# Truncate long URLs
if len(url) > 50:
url = url[:47] + "..."
params.append(f"[dim]url=[/dim]{url}")
if "session_id" in tool_input:
params.append(f"[dim]session=[/dim]{tool_input['session_id']}")
if "file_path" in tool_input:
params.append(f"[dim]file=[/dim]{tool_input['file_path']}")
if "output_format" in tool_input:
params.append(f"[dim]format=[/dim]{tool_input['output_format']}")
param_str = ", ".join(params) if params else ""
self.console.print(f" [yellow]🔧 {display_name}[/yellow]({param_str})")
else:
self.console.print(f" [yellow]🔧 {display_name}[/yellow]")
def with_spinner(self, text: str = "Processing..."):
"""
Context manager for showing a spinner.
Usage:
with ui.with_spinner("Crawling page..."):
# do work
"""
return self.console.status(f"[cyan]{text}[/cyan]", spinner="dots")

View File

@@ -1,114 +0,0 @@
#!/usr/bin/env python
"""Test script to verify chat mode setup (non-interactive)."""
import sys
import asyncio
from pathlib import Path
# Add parent to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from crawl4ai.agent.browser_manager import BrowserManager
from crawl4ai.agent.terminal_ui import TerminalUI
from crawl4ai.agent.chat_mode import ChatMode
from crawl4ai.agent.c4ai_tools import CRAWL_TOOLS
from crawl4ai.agent.c4ai_prompts import SYSTEM_PROMPT
from claude_agent_sdk import ClaudeAgentOptions, create_sdk_mcp_server
class MockStorage:
"""Mock storage for testing."""
def log(self, event_type: str, data: dict):
print(f"[LOG] {event_type}: {data}")
def get_session_path(self):
return "/tmp/test_session.jsonl"
async def test_components():
"""Test individual components."""
print("="*60)
print("CHAT MODE COMPONENT TESTS")
print("="*60)
# Test 1: BrowserManager
print("\n[TEST 1] BrowserManager singleton")
try:
browser1 = await BrowserManager.get_browser()
browser2 = await BrowserManager.get_browser()
assert browser1 is browser2, "Browser instances should be same (singleton)"
print("✓ BrowserManager singleton works")
await BrowserManager.close_browser()
except Exception as e:
print(f"✗ BrowserManager failed: {e}")
return False
# Test 2: TerminalUI
print("\n[TEST 2] TerminalUI rendering")
try:
ui = TerminalUI()
ui.show_header("test-123", "/tmp/test.log")
ui.print_agent_text("Hello from agent")
ui.print_markdown("# Test\nThis is **bold**")
ui.print_success("Test success message")
print("✓ TerminalUI renders correctly")
except Exception as e:
print(f"✗ TerminalUI failed: {e}")
return False
# Test 3: MCP Server Setup
print("\n[TEST 3] MCP Server with tools")
try:
crawler_server = create_sdk_mcp_server(
name="crawl4ai",
version="1.0.0",
tools=CRAWL_TOOLS
)
print(f"✓ MCP server created with {len(CRAWL_TOOLS)} tools")
except Exception as e:
print(f"✗ MCP Server failed: {e}")
return False
# Test 4: ChatMode instantiation
print("\n[TEST 4] ChatMode instantiation")
try:
options = ClaudeAgentOptions(
mcp_servers={"crawler": crawler_server},
allowed_tools=[
"mcp__crawler__quick_crawl",
"mcp__crawler__start_session",
"mcp__crawler__navigate",
"mcp__crawler__extract_data",
"mcp__crawler__execute_js",
"mcp__crawler__screenshot",
"mcp__crawler__close_session",
],
system_prompt=SYSTEM_PROMPT,
permission_mode="acceptEdits"
)
ui = TerminalUI()
storage = MockStorage()
chat = ChatMode(options, ui, storage)
print("✓ ChatMode instance created successfully")
except Exception as e:
print(f"✗ ChatMode failed: {e}")
import traceback
traceback.print_exc()
return False
print("\n" + "="*60)
print("ALL COMPONENT TESTS PASSED ✓")
print("="*60)
print("\nTo test interactive chat mode, run:")
print(" python -m crawl4ai.agent.agent_crawl --chat")
return True
if __name__ == "__main__":
success = asyncio.run(test_components())
sys.exit(0 if success else 1)

View File

@@ -1,524 +0,0 @@
#!/usr/bin/env python
"""
Automated multi-turn chat scenario tests for Crawl4AI Agent.
Tests agent's ability to handle complex conversations, maintain state,
plan and execute tasks without human interaction.
"""
import asyncio
import json
import time
from pathlib import Path
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from enum import Enum
from claude_agent_sdk import ClaudeSDKClient, ClaudeAgentOptions, create_sdk_mcp_server
from claude_agent_sdk import AssistantMessage, TextBlock, ResultMessage, ToolUseBlock
from .c4ai_tools import CRAWL_TOOLS
from .c4ai_prompts import SYSTEM_PROMPT
from .browser_manager import BrowserManager
class TurnResult(Enum):
"""Result of a single conversation turn."""
PASS = "PASS"
FAIL = "FAIL"
TIMEOUT = "TIMEOUT"
ERROR = "ERROR"
@dataclass
class TurnExpectation:
"""Expectations for a single conversation turn."""
user_message: str
expect_tools: Optional[List[str]] = None # Tools that should be called
expect_keywords: Optional[List[str]] = None # Keywords in response
expect_files_created: Optional[List[str]] = None # File patterns created
expect_success: bool = True # Should complete without error
expect_min_turns: int = 1 # Minimum agent turns to complete
timeout_seconds: int = 60
@dataclass
class Scenario:
"""A complete multi-turn conversation scenario."""
name: str
category: str # "simple", "medium", "complex"
description: str
turns: List[TurnExpectation]
cleanup_files: Optional[List[str]] = None # Files to cleanup after test
# =============================================================================
# TEST SCENARIOS - Categorized from Simple to Complex
# =============================================================================
SIMPLE_SCENARIOS = [
Scenario(
name="Single quick crawl",
category="simple",
description="Basic one-shot crawl with markdown extraction",
turns=[
TurnExpectation(
user_message="Use quick_crawl to get the title from example.com",
expect_tools=["mcp__crawler__quick_crawl"],
expect_keywords=["Example Domain", "title"],
timeout_seconds=30
)
]
),
Scenario(
name="Session lifecycle",
category="simple",
description="Start session, navigate, close - basic session management",
turns=[
TurnExpectation(
user_message="Start a session named 'simple_test'",
expect_tools=["mcp__crawler__start_session"],
expect_keywords=["session", "started"],
timeout_seconds=20
),
TurnExpectation(
user_message="Navigate to example.com",
expect_tools=["mcp__crawler__navigate"],
expect_keywords=["navigated", "example.com"],
timeout_seconds=25
),
TurnExpectation(
user_message="Close the session",
expect_tools=["mcp__crawler__close_session"],
expect_keywords=["closed"],
timeout_seconds=15
)
]
),
]
MEDIUM_SCENARIOS = [
Scenario(
name="Multi-page crawl with file output",
category="medium",
description="Crawl multiple pages and save results to file",
turns=[
TurnExpectation(
user_message="Crawl example.com and example.org, extract titles from both",
expect_tools=["mcp__crawler__quick_crawl"],
expect_min_turns=2, # Should make 2 separate crawls
timeout_seconds=45
),
TurnExpectation(
user_message="Use the Write tool to save the titles you extracted to a file called crawl_results.txt",
expect_tools=["Write"],
expect_files_created=["crawl_results.txt"],
timeout_seconds=30
)
],
cleanup_files=["crawl_results.txt"]
),
Scenario(
name="Session-based data extraction",
category="medium",
description="Use session to navigate and extract data in steps",
turns=[
TurnExpectation(
user_message="Start session 'extract_test', navigate to example.com, and extract the markdown",
expect_tools=["mcp__crawler__start_session", "mcp__crawler__navigate", "mcp__crawler__extract_data"],
expect_keywords=["Example Domain"],
timeout_seconds=50
),
TurnExpectation(
user_message="Use the Write tool to save the extracted markdown to example_content.md",
expect_tools=["Write"],
expect_files_created=["example_content.md"],
timeout_seconds=30
),
TurnExpectation(
user_message="Close the session",
expect_tools=["mcp__crawler__close_session"],
timeout_seconds=15
)
],
cleanup_files=["example_content.md"]
),
Scenario(
name="Context retention across turns",
category="medium",
description="Agent should remember previous context",
turns=[
TurnExpectation(
user_message="Crawl example.com and tell me the title",
expect_tools=["mcp__crawler__quick_crawl"],
expect_keywords=["Example Domain"],
timeout_seconds=30
),
TurnExpectation(
user_message="What was the URL I just asked you to crawl?",
expect_keywords=["example.com"],
expect_tools=[], # Should answer from memory, no tools needed
timeout_seconds=15
)
]
),
]
COMPLEX_SCENARIOS = [
Scenario(
name="Multi-step task with planning",
category="complex",
description="Complex task requiring agent to plan, execute, and verify",
turns=[
TurnExpectation(
user_message="Crawl example.com and example.org, compare their content, and create a markdown report with: 1) titles of both, 2) word count comparison, 3) save to comparison_report.md",
expect_tools=["mcp__crawler__quick_crawl", "Write"],
expect_files_created=["comparison_report.md"],
expect_min_turns=3, # Plan, crawl both, write report
timeout_seconds=90
),
TurnExpectation(
user_message="Read back the report you just created",
expect_tools=["Read"],
expect_keywords=["Example Domain"],
timeout_seconds=20
)
],
cleanup_files=["comparison_report.md"]
),
Scenario(
name="Session with state manipulation",
category="complex",
description="Complex session workflow with multiple operations",
turns=[
TurnExpectation(
user_message="Start session 'complex_session' and navigate to example.com",
expect_tools=["mcp__crawler__start_session", "mcp__crawler__navigate"],
timeout_seconds=30
),
TurnExpectation(
user_message="Extract the page content and count how many times the word 'example' appears (case insensitive)",
expect_tools=["mcp__crawler__extract_data"],
expect_keywords=["example"],
timeout_seconds=30
),
TurnExpectation(
user_message="Take a screenshot of the current page",
expect_tools=["mcp__crawler__screenshot"],
expect_keywords=["screenshot"],
timeout_seconds=25
),
TurnExpectation(
user_message="Close the session",
expect_tools=["mcp__crawler__close_session"],
timeout_seconds=15
)
]
),
Scenario(
name="Error recovery and continuation",
category="complex",
description="Agent should handle errors gracefully and continue",
turns=[
TurnExpectation(
user_message="Crawl https://this-site-definitely-does-not-exist-12345.com",
expect_success=False, # Should fail gracefully
expect_keywords=["error", "fail"],
timeout_seconds=30
),
TurnExpectation(
user_message="That's okay, crawl example.com instead",
expect_tools=["mcp__crawler__quick_crawl"],
expect_keywords=["Example Domain"],
timeout_seconds=30
)
]
),
]
# Combine all scenarios
ALL_SCENARIOS = SIMPLE_SCENARIOS + MEDIUM_SCENARIOS + COMPLEX_SCENARIOS
# =============================================================================
# TEST RUNNER
# =============================================================================
class ScenarioRunner:
"""Runs automated chat scenarios without human interaction."""
def __init__(self, working_dir: Path):
self.working_dir = working_dir
self.results = []
async def run_scenario(self, scenario: Scenario) -> Dict[str, Any]:
"""Run a single scenario and return results."""
print(f"\n{'='*70}")
print(f"[{scenario.category.upper()}] {scenario.name}")
print(f"{'='*70}")
print(f"Description: {scenario.description}\n")
start_time = time.time()
turn_results = []
try:
# Setup agent options
crawler_server = create_sdk_mcp_server(
name="crawl4ai",
version="1.0.0",
tools=CRAWL_TOOLS
)
options = ClaudeAgentOptions(
mcp_servers={"crawler": crawler_server},
allowed_tools=[
"mcp__crawler__quick_crawl",
"mcp__crawler__start_session",
"mcp__crawler__navigate",
"mcp__crawler__extract_data",
"mcp__crawler__execute_js",
"mcp__crawler__screenshot",
"mcp__crawler__close_session",
"Read", "Write", "Edit", "Glob", "Grep", "Bash"
],
system_prompt=SYSTEM_PROMPT,
permission_mode="acceptEdits",
cwd=str(self.working_dir)
)
# Run conversation
async with ClaudeSDKClient(options=options) as client:
for turn_idx, expectation in enumerate(scenario.turns, 1):
print(f"\nTurn {turn_idx}: {expectation.user_message}")
turn_result = await self._run_turn(
client, expectation, turn_idx
)
turn_results.append(turn_result)
if turn_result["status"] != TurnResult.PASS.value:
print(f" ✗ FAILED: {turn_result['reason']}")
break
else:
print(f" ✓ PASSED")
# Cleanup
if scenario.cleanup_files:
self._cleanup_files(scenario.cleanup_files)
# Overall result
all_passed = all(r["status"] == TurnResult.PASS.value for r in turn_results)
duration = time.time() - start_time
result = {
"scenario": scenario.name,
"category": scenario.category,
"status": "PASS" if all_passed else "FAIL",
"duration_seconds": duration,
"turns": turn_results
}
return result
except Exception as e:
print(f"\n✗ SCENARIO ERROR: {e}")
return {
"scenario": scenario.name,
"category": scenario.category,
"status": "ERROR",
"error": str(e),
"duration_seconds": time.time() - start_time,
"turns": turn_results
}
finally:
# Ensure browser cleanup
await BrowserManager.close_browser()
async def _run_turn(
self,
client: ClaudeSDKClient,
expectation: TurnExpectation,
turn_number: int
) -> Dict[str, Any]:
"""Execute a single conversation turn and validate."""
tools_used = []
response_text = ""
agent_turns = 0
try:
# Send user message
await client.query(expectation.user_message)
# Collect response
start_time = time.time()
async for message in client.receive_messages():
if time.time() - start_time > expectation.timeout_seconds:
return {
"turn": turn_number,
"status": TurnResult.TIMEOUT.value,
"reason": f"Exceeded {expectation.timeout_seconds}s timeout"
}
if isinstance(message, AssistantMessage):
agent_turns += 1
for block in message.content:
if isinstance(block, TextBlock):
response_text += block.text + " "
elif isinstance(block, ToolUseBlock):
tools_used.append(block.name)
elif isinstance(message, ResultMessage):
# Check if error when expecting success
if expectation.expect_success and message.is_error:
return {
"turn": turn_number,
"status": TurnResult.FAIL.value,
"reason": f"Agent returned error: {message.result}"
}
break
# Validate expectations
validation = self._validate_turn(
expectation, tools_used, response_text, agent_turns
)
return {
"turn": turn_number,
"status": validation["status"],
"reason": validation.get("reason", "All checks passed"),
"tools_used": tools_used,
"agent_turns": agent_turns
}
except Exception as e:
return {
"turn": turn_number,
"status": TurnResult.ERROR.value,
"reason": f"Exception: {str(e)}"
}
def _validate_turn(
self,
expectation: TurnExpectation,
tools_used: List[str],
response_text: str,
agent_turns: int
) -> Dict[str, Any]:
"""Validate turn results against expectations."""
# Check expected tools
if expectation.expect_tools:
for tool in expectation.expect_tools:
if tool not in tools_used:
return {
"status": TurnResult.FAIL.value,
"reason": f"Expected tool '{tool}' was not used"
}
# Check keywords
if expectation.expect_keywords:
response_lower = response_text.lower()
for keyword in expectation.expect_keywords:
if keyword.lower() not in response_lower:
return {
"status": TurnResult.FAIL.value,
"reason": f"Expected keyword '{keyword}' not found in response"
}
# Check files created
if expectation.expect_files_created:
for pattern in expectation.expect_files_created:
matches = list(self.working_dir.glob(pattern))
if not matches:
return {
"status": TurnResult.FAIL.value,
"reason": f"Expected file matching '{pattern}' was not created"
}
# Check minimum turns
if agent_turns < expectation.expect_min_turns:
return {
"status": TurnResult.FAIL.value,
"reason": f"Expected at least {expectation.expect_min_turns} agent turns, got {agent_turns}"
}
return {"status": TurnResult.PASS.value}
def _cleanup_files(self, patterns: List[str]):
"""Remove files created during test."""
for pattern in patterns:
for file_path in self.working_dir.glob(pattern):
try:
file_path.unlink()
except Exception as e:
print(f" Warning: Could not delete {file_path}: {e}")
async def run_all_scenarios(working_dir: Optional[Path] = None):
"""Run all test scenarios and report results."""
if working_dir is None:
working_dir = Path.cwd() / "test_agent_output"
working_dir.mkdir(exist_ok=True)
runner = ScenarioRunner(working_dir)
print("\n" + "="*70)
print("CRAWL4AI AGENT SCENARIO TESTS")
print("="*70)
print(f"Working directory: {working_dir}")
print(f"Total scenarios: {len(ALL_SCENARIOS)}")
print(f" Simple: {len(SIMPLE_SCENARIOS)}")
print(f" Medium: {len(MEDIUM_SCENARIOS)}")
print(f" Complex: {len(COMPLEX_SCENARIOS)}")
results = []
for scenario in ALL_SCENARIOS:
result = await runner.run_scenario(scenario)
results.append(result)
# Summary
print("\n" + "="*70)
print("TEST SUMMARY")
print("="*70)
by_category = {"simple": [], "medium": [], "complex": []}
for result in results:
by_category[result["category"]].append(result)
for category in ["simple", "medium", "complex"]:
cat_results = by_category[category]
passed = sum(1 for r in cat_results if r["status"] == "PASS")
total = len(cat_results)
print(f"\n{category.upper()}: {passed}/{total} passed")
for r in cat_results:
status_icon = "" if r["status"] == "PASS" else ""
print(f" {status_icon} {r['scenario']} ({r['duration_seconds']:.1f}s)")
total_passed = sum(1 for r in results if r["status"] == "PASS")
total = len(results)
print(f"\nOVERALL: {total_passed}/{total} scenarios passed ({total_passed/total*100:.1f}%)")
# Save detailed results
results_file = working_dir / "test_results.json"
with open(results_file, "w") as f:
json.dump(results, f, indent=2)
print(f"\nDetailed results saved to: {results_file}")
return total_passed == total
if __name__ == "__main__":
import sys
success = asyncio.run(run_all_scenarios())
sys.exit(0 if success else 1)

View File

@@ -1,140 +0,0 @@
#!/usr/bin/env python
"""Test script for Crawl4AI tools - tests tools directly without the agent."""
import asyncio
import json
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode
async def test_quick_crawl():
"""Test quick_crawl tool logic directly."""
print("\n" + "="*60)
print("TEST 1: Quick Crawl - Markdown Format")
print("="*60)
crawler_config = BrowserConfig(headless=True, verbose=False)
run_config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS)
async with AsyncWebCrawler(config=crawler_config) as crawler:
result = await crawler.arun(url="https://example.com", config=run_config)
print(f"Success: {result.success}")
print(f"URL: {result.url}")
# Handle markdown - can be string or MarkdownGenerationResult object
if isinstance(result.markdown, str):
markdown_content = result.markdown
elif hasattr(result.markdown, 'raw_markdown'):
markdown_content = result.markdown.raw_markdown
else:
markdown_content = str(result.markdown)
print(f"Markdown type: {type(result.markdown)}")
print(f"Markdown length: {len(markdown_content)}")
print(f"Markdown preview:\n{markdown_content[:300]}")
return result.success
async def test_session_workflow():
"""Test session-based workflow."""
print("\n" + "="*60)
print("TEST 2: Session-Based Workflow")
print("="*60)
crawler_config = BrowserConfig(headless=True, verbose=False)
# Start session
crawler = AsyncWebCrawler(config=crawler_config)
await crawler.__aenter__()
print("✓ Session started")
try:
# Navigate to URL
run_config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS)
result = await crawler.arun(url="https://example.com", config=run_config)
print(f"✓ Navigated to {result.url}, success: {result.success}")
# Extract data
if isinstance(result.markdown, str):
markdown_content = result.markdown
elif hasattr(result.markdown, 'raw_markdown'):
markdown_content = result.markdown.raw_markdown
else:
markdown_content = str(result.markdown)
print(f"✓ Extracted {len(markdown_content)} chars of markdown")
print(f" Preview: {markdown_content[:200]}")
# Screenshot test - need to re-fetch with screenshot enabled
screenshot_config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS, screenshot=True)
result2 = await crawler.arun(url=result.url, config=screenshot_config)
print(f"✓ Screenshot captured: {result2.screenshot is not None}")
return True
finally:
# Close session
await crawler.__aexit__(None, None, None)
print("✓ Session closed")
async def test_html_format():
"""Test HTML output format."""
print("\n" + "="*60)
print("TEST 3: Quick Crawl - HTML Format")
print("="*60)
crawler_config = BrowserConfig(headless=True, verbose=False)
run_config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS)
async with AsyncWebCrawler(config=crawler_config) as crawler:
result = await crawler.arun(url="https://example.com", config=run_config)
print(f"Success: {result.success}")
print(f"HTML length: {len(result.html)}")
print(f"HTML preview:\n{result.html[:300]}")
return result.success
async def main():
"""Run all tests."""
print("\n" + "="*70)
print(" CRAWL4AI TOOLS TEST SUITE")
print("="*70)
tests = [
("Quick Crawl (Markdown)", test_quick_crawl),
("Session Workflow", test_session_workflow),
("Quick Crawl (HTML)", test_html_format),
]
results = []
for name, test_func in tests:
try:
result = await test_func()
results.append((name, result, None))
except Exception as e:
results.append((name, False, str(e)))
# Summary
print("\n" + "="*70)
print(" TEST SUMMARY")
print("="*70)
for name, success, error in results:
status = "✓ PASS" if success else "✗ FAIL"
print(f"{status} - {name}")
if error:
print(f" Error: {error}")
total = len(results)
passed = sum(1 for _, success, _ in results if success)
print(f"\nTotal: {total} | Passed: {passed} | Failed: {total - passed}")
return all(success for _, success, _ in results)
if __name__ == "__main__":
success = asyncio.run(main())
exit(0 if success else 1)

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,241 @@
# Crawl4AI Docker Memory & Pool Optimization - Implementation Log
## Critical Issues Identified
### Memory Management
- **Host vs Container**: `psutil.virtual_memory()` reported host memory, not container limits
- **Browser Pooling**: No pool reuse - every endpoint created new browsers
- **Warmup Waste**: Permanent browser sat idle with mismatched config signature
- **Idle Cleanup**: 30min TTL too long, janitor ran every 60s
- **Endpoint Inconsistency**: 75% of endpoints bypassed pool (`/md`, `/html`, `/screenshot`, `/pdf`, `/execute_js`, `/llm`)
### Pool Design Flaws
- **Config Mismatch**: Permanent browser used `config.yml` args, endpoints used empty `BrowserConfig()`
- **Logging Level**: Pool hit markers at DEBUG, invisible with INFO logging
## Implementation Changes
### 1. Container-Aware Memory Detection (`utils.py`)
```python
def get_container_memory_percent() -> float:
# Try cgroup v2 → v1 → fallback to psutil
# Reads /sys/fs/cgroup/memory.{current,max} OR memory/memory.{usage,limit}_in_bytes
```
### 2. Smart Browser Pool (`crawler_pool.py`)
**3-Tier System:**
- **PERMANENT**: Always-ready default browser (never cleaned)
- **HOT_POOL**: Configs used 3+ times (longer TTL)
- **COLD_POOL**: New/rare configs (short TTL)
**Key Functions:**
- `get_crawler(cfg)`: Check permanent → hot → cold → create new
- `init_permanent(cfg)`: Initialize permanent at startup
- `janitor()`: Adaptive cleanup (10s/30s/60s intervals based on memory)
- `_sig(cfg)`: SHA1 hash of config dict for pool keys
**Logging Fix**: Changed `logger.debug()``logger.info()` for pool hits
### 3. Endpoint Unification
**Helper Function** (`server.py`):
```python
def get_default_browser_config() -> BrowserConfig:
return BrowserConfig(
extra_args=config["crawler"]["browser"].get("extra_args", []),
**config["crawler"]["browser"].get("kwargs", {}),
)
```
**Migrated Endpoints:**
- `/html`, `/screenshot`, `/pdf`, `/execute_js` → use `get_default_browser_config()`
- `handle_llm_qa()`, `handle_markdown_request()` → same
**Result**: All endpoints now hit permanent browser pool
### 4. Config Updates (`config.yml`)
- `idle_ttl_sec: 1800``300` (30min → 5min base TTL)
- `port: 11234``11235` (fixed mismatch with Gunicorn)
### 5. Lifespan Fix (`server.py`)
```python
await init_permanent(BrowserConfig(
extra_args=config["crawler"]["browser"].get("extra_args", []),
**config["crawler"]["browser"].get("kwargs", {}),
))
```
Permanent browser now matches endpoint config signatures
## Test Results
### Test 1: Basic Health
- 10 requests to `/health`
- **Result**: 100% success, avg 3ms latency
- **Baseline**: Container starts in ~5s, 270 MB idle
### Test 2: Memory Monitoring
- 20 requests with Docker stats tracking
- **Result**: 100% success, no memory leak (-0.2 MB delta)
- **Baseline**: 269.7 MB container overhead
### Test 3: Pool Validation
- 30 requests to `/html` endpoint
- **Result**: **100% permanent browser hits**, 0 new browsers created
- **Memory**: 287 MB baseline → 396 MB active (+109 MB)
- **Latency**: Avg 4s (includes network to httpbin.org)
### Test 4: Concurrent Load
- Light (10) → Medium (50) → Heavy (100) concurrent
- **Total**: 320 requests
- **Result**: 100% success, **320/320 permanent hits**, 0 new browsers
- **Memory**: 269 MB → peak 1533 MB → final 993 MB
- **Latency**: P99 at 100 concurrent = 34s (expected with single browser)
### Test 5: Pool Stress (Mixed Configs)
- 20 requests with 4 different viewport configs
- **Result**: 4 new browsers, 4 cold hits, **4 promotions to hot**, 8 hot hits
- **Reuse Rate**: 60% (12 pool hits / 20 requests)
- **Memory**: 270 MB → 928 MB peak (+658 MB = ~165 MB per browser)
- **Proves**: Cold → hot promotion at 3 uses working perfectly
### Test 6: Multi-Endpoint
- 10 requests each: `/html`, `/screenshot`, `/pdf`, `/crawl`
- **Result**: 100% success across all 4 endpoints
- **Latency**: 5-8s avg (PDF slowest at 7.2s)
### Test 7: Cleanup Verification
- 20 requests (load spike) → 90s idle
- **Memory**: 269 MB → peak 1107 MB → final 780 MB
- **Recovery**: 327 MB (39%) - partial cleanup
- **Note**: Hot pool browsers persist (by design), janitor working correctly
## Performance Metrics
| Metric | Before | After | Improvement |
|--------|--------|-------|-------------|
| Pool Reuse | 0% | 100% (default config) | ∞ |
| Memory Leak | Unknown | 0 MB/cycle | Stable |
| Browser Reuse | No | Yes | ~3-5s saved per request |
| Idle Memory | 500-700 MB × N | 270-400 MB | 10x reduction |
| Concurrent Capacity | ~20 | 100+ | 5x |
## Key Learnings
1. **Config Signature Matching**: Permanent browser MUST match endpoint default config exactly (SHA1 hash)
2. **Logging Levels**: Pool diagnostics need INFO level, not DEBUG
3. **Memory in Docker**: Must read cgroup files, not host metrics
4. **Janitor Timing**: 60s interval adequate, but TTLs should be short (5min) for cold pool
5. **Hot Promotion**: 3-use threshold works well for production patterns
6. **Memory Per Browser**: ~150-200 MB per Chromium instance with headless + text_mode
## Test Infrastructure
**Location**: `deploy/docker/tests/`
**Dependencies**: `httpx`, `docker` (Python SDK)
**Pattern**: Sequential build - each test adds one capability
**Files**:
- `test_1_basic.py`: Health check + container lifecycle
- `test_2_memory.py`: + Docker stats monitoring
- `test_3_pool.py`: + Log analysis for pool markers
- `test_4_concurrent.py`: + asyncio.Semaphore for concurrency control
- `test_5_pool_stress.py`: + Config variants (viewports)
- `test_6_multi_endpoint.py`: + Multiple endpoint testing
- `test_7_cleanup.py`: + Time-series memory tracking for janitor
**Run Pattern**:
```bash
cd deploy/docker/tests
pip install -r requirements.txt
# Rebuild after code changes:
cd /path/to/repo && docker buildx build -t crawl4ai-local:latest --load .
# Run test:
python test_N_name.py
```
## Architecture Decisions
**Why Permanent Browser?**
- 90% of requests use default config → single browser serves most traffic
- Eliminates 3-5s startup overhead per request
**Why 3-Tier Pool?**
- Permanent: Zero cost for common case
- Hot: Amortized cost for frequent variants
- Cold: Lazy allocation for rare configs
**Why Adaptive Janitor?**
- Memory pressure triggers aggressive cleanup
- Low memory allows longer TTLs for better reuse
**Why Not Close After Each Request?**
- Browser startup: 3-5s overhead
- Pool reuse: <100ms overhead
- Net: 30-50x faster
## Future Optimizations
1. **Request Queuing**: When at capacity, queue instead of reject
2. **Pre-warming**: Predict common configs, pre-create browsers
3. **Metrics Export**: Prometheus metrics for pool efficiency
4. **Config Normalization**: Group similar viewports (e.g., 1920±50 → 1920)
## Critical Code Paths
**Browser Acquisition** (`crawler_pool.py:34-78`):
```
get_crawler(cfg) →
_sig(cfg) →
if sig == DEFAULT_CONFIG_SIG → PERMANENT
elif sig in HOT_POOL → HOT_POOL[sig]
elif sig in COLD_POOL → promote if count >= 3
else → create new in COLD_POOL
```
**Janitor Loop** (`crawler_pool.py:107-146`):
```
while True:
mem% = get_container_memory_percent()
if mem% > 80: interval=10s, cold_ttl=30s
elif mem% > 60: interval=30s, cold_ttl=60s
else: interval=60s, cold_ttl=300s
sleep(interval)
close idle browsers (COLD then HOT)
```
**Endpoint Pattern** (`server.py` example):
```python
@app.post("/html")
async def generate_html(...):
from crawler_pool import get_crawler
crawler = await get_crawler(get_default_browser_config())
results = await crawler.arun(url=body.url, config=cfg)
# No crawler.close() - returned to pool
```
## Debugging Tips
**Check Pool Activity**:
```bash
docker logs crawl4ai-test | grep -E "(🔥|♨️|❄️|🆕|⬆️)"
```
**Verify Config Signature**:
```python
from crawl4ai import BrowserConfig
import json, hashlib
cfg = BrowserConfig(...)
sig = hashlib.sha1(json.dumps(cfg.to_dict(), sort_keys=True).encode()).hexdigest()
print(sig[:8]) # Compare with logs
```
**Monitor Memory**:
```bash
docker stats crawl4ai-test
```
## Known Limitations
- **Mac Docker Stats**: CPU metrics unreliable, memory works
- **PDF Generation**: Slowest endpoint (~7s), no optimization yet
- **Hot Pool Persistence**: May hold memory longer than needed (trade-off for performance)
- **Janitor Lag**: Up to 60s before cleanup triggers in low-memory scenarios

View File

@@ -66,6 +66,7 @@ async def handle_llm_qa(
config: dict
) -> str:
"""Process QA using LLM with crawled content as context."""
from crawler_pool import get_crawler
try:
if not url.startswith(('http://', 'https://')) and not url.startswith(("raw:", "raw://")):
url = 'https://' + url
@@ -74,15 +75,21 @@ async def handle_llm_qa(
if last_q_index != -1:
url = url[:last_q_index]
# Get markdown content
async with AsyncWebCrawler() as crawler:
result = await crawler.arun(url)
if not result.success:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=result.error_message
)
content = result.markdown.fit_markdown or result.markdown.raw_markdown
# Get markdown content (use default config)
from utils import load_config
cfg = load_config()
browser_cfg = BrowserConfig(
extra_args=cfg["crawler"]["browser"].get("extra_args", []),
**cfg["crawler"]["browser"].get("kwargs", {}),
)
crawler = await get_crawler(browser_cfg)
result = await crawler.arun(url)
if not result.success:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=result.error_message
)
content = result.markdown.fit_markdown or result.markdown.raw_markdown
# Create prompt and get LLM response
prompt = f"""Use the following content as context to answer the question.
@@ -224,25 +231,32 @@ async def handle_markdown_request(
cache_mode = CacheMode.ENABLED if cache == "1" else CacheMode.WRITE_ONLY
async with AsyncWebCrawler() as crawler:
result = await crawler.arun(
url=decoded_url,
config=CrawlerRunConfig(
markdown_generator=md_generator,
scraping_strategy=LXMLWebScrapingStrategy(),
cache_mode=cache_mode
)
from crawler_pool import get_crawler
from utils import load_config as _load_config
_cfg = _load_config()
browser_cfg = BrowserConfig(
extra_args=_cfg["crawler"]["browser"].get("extra_args", []),
**_cfg["crawler"]["browser"].get("kwargs", {}),
)
crawler = await get_crawler(browser_cfg)
result = await crawler.arun(
url=decoded_url,
config=CrawlerRunConfig(
markdown_generator=md_generator,
scraping_strategy=LXMLWebScrapingStrategy(),
cache_mode=cache_mode
)
if not result.success:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=result.error_message
)
)
return (result.markdown.raw_markdown
if filter_type == FilterType.RAW
else result.markdown.fit_markdown)
if not result.success:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=result.error_message
)
return (result.markdown.raw_markdown
if filter_type == FilterType.RAW
else result.markdown.fit_markdown)
except Exception as e:
logger.error(f"Markdown error: {str(e)}", exc_info=True)
@@ -446,12 +460,22 @@ async def handle_crawl_request(
hooks_config: Optional[dict] = None
) -> dict:
"""Handle non-streaming crawl requests with optional hooks."""
# Track request start
request_id = f"req_{uuid4().hex[:8]}"
try:
from monitor import get_monitor
await get_monitor().track_request_start(
request_id, "/crawl", urls[0] if urls else "batch", browser_config
)
except:
pass # Monitor not critical
start_mem_mb = _get_memory_mb() # <--- Get memory before
start_time = time.time()
mem_delta_mb = None
peak_mem_mb = start_mem_mb
hook_manager = None
try:
urls = [('https://' + url) if not url.startswith(('http://', 'https://')) and not url.startswith(("raw:", "raw://")) else url for url in urls]
browser_config = BrowserConfig.load(browser_config)
@@ -556,7 +580,16 @@ async def handle_crawl_request(
"server_memory_delta_mb": mem_delta_mb,
"server_peak_memory_mb": peak_mem_mb
}
# Track request completion
try:
from monitor import get_monitor
await get_monitor().track_request_end(
request_id, success=True, pool_hit=True, status_code=200
)
except:
pass
# Add hooks information if hooks were used
if hooks_config and hook_manager:
from hook_manager import UserHookManager
@@ -585,6 +618,16 @@ async def handle_crawl_request(
except Exception as e:
logger.error(f"Crawl error: {str(e)}", exc_info=True)
# Track request error
try:
from monitor import get_monitor
await get_monitor().track_request_end(
request_id, success=False, error=str(e), status_code=500
)
except:
pass
if 'crawler' in locals() and crawler.ready: # Check if crawler was initialized and started
# try:
# await crawler.close()

View File

@@ -3,7 +3,7 @@ app:
title: "Crawl4AI API"
version: "1.0.0"
host: "0.0.0.0"
port: 11234
port: 11235
reload: False
workers: 1
timeout_keep_alive: 300
@@ -61,7 +61,7 @@ crawler:
batch_process: 300.0 # Timeout for batch processing
pool:
max_pages: 40 # ← GLOBAL_SEM permits
idle_ttl_sec: 1800 # ← 30 min janitor cutoff
idle_ttl_sec: 300 # ← 30 min janitor cutoff
browser:
kwargs:
headless: true

View File

@@ -1,60 +1,170 @@
# crawler_pool.py (new file)
import asyncio, json, hashlib, time, psutil
# crawler_pool.py - Smart browser pool with tiered management
import asyncio, json, hashlib, time
from contextlib import suppress
from typing import Dict
from typing import Dict, Optional
from crawl4ai import AsyncWebCrawler, BrowserConfig
from typing import Dict
from utils import load_config
from utils import load_config, get_container_memory_percent
import logging
logger = logging.getLogger(__name__)
CONFIG = load_config()
POOL: Dict[str, AsyncWebCrawler] = {}
# Pool tiers
PERMANENT: Optional[AsyncWebCrawler] = None # Always-ready default browser
HOT_POOL: Dict[str, AsyncWebCrawler] = {} # Frequent configs
COLD_POOL: Dict[str, AsyncWebCrawler] = {} # Rare configs
LAST_USED: Dict[str, float] = {}
USAGE_COUNT: Dict[str, int] = {}
LOCK = asyncio.Lock()
MEM_LIMIT = CONFIG.get("crawler", {}).get("memory_threshold_percent", 95.0) # % RAM refuse new browsers above this
IDLE_TTL = CONFIG.get("crawler", {}).get("pool", {}).get("idle_ttl_sec", 1800) # close if unused for 30min
# Config
MEM_LIMIT = CONFIG.get("crawler", {}).get("memory_threshold_percent", 95.0)
BASE_IDLE_TTL = CONFIG.get("crawler", {}).get("pool", {}).get("idle_ttl_sec", 300)
DEFAULT_CONFIG_SIG = None # Cached sig for default config
def _sig(cfg: BrowserConfig) -> str:
"""Generate config signature."""
payload = json.dumps(cfg.to_dict(), sort_keys=True, separators=(",",":"))
return hashlib.sha1(payload.encode()).hexdigest()
def _is_default_config(sig: str) -> bool:
"""Check if config matches default."""
return sig == DEFAULT_CONFIG_SIG
async def get_crawler(cfg: BrowserConfig) -> AsyncWebCrawler:
try:
sig = _sig(cfg)
async with LOCK:
if sig in POOL:
LAST_USED[sig] = time.time();
return POOL[sig]
if psutil.virtual_memory().percent >= MEM_LIMIT:
raise MemoryError("RAM pressure new browser denied")
crawler = AsyncWebCrawler(config=cfg, thread_safe=False)
await crawler.start()
POOL[sig] = crawler; LAST_USED[sig] = time.time()
return crawler
except MemoryError as e:
raise MemoryError(f"RAM pressure new browser denied: {e}")
except Exception as e:
raise RuntimeError(f"Failed to start browser: {e}")
finally:
if sig in POOL:
LAST_USED[sig] = time.time()
else:
# If we failed to start the browser, we should remove it from the pool
POOL.pop(sig, None)
LAST_USED.pop(sig, None)
# If we failed to start the browser, we should remove it from the pool
async def close_all():
"""Get crawler from pool with tiered strategy."""
sig = _sig(cfg)
async with LOCK:
await asyncio.gather(*(c.close() for c in POOL.values()), return_exceptions=True)
POOL.clear(); LAST_USED.clear()
# Check permanent browser for default config
if PERMANENT and _is_default_config(sig):
LAST_USED[sig] = time.time()
USAGE_COUNT[sig] = USAGE_COUNT.get(sig, 0) + 1
logger.info("🔥 Using permanent browser")
return PERMANENT
# Check hot pool
if sig in HOT_POOL:
LAST_USED[sig] = time.time()
USAGE_COUNT[sig] = USAGE_COUNT.get(sig, 0) + 1
logger.info(f"♨️ Using hot pool browser (sig={sig[:8]})")
return HOT_POOL[sig]
# Check cold pool (promote to hot if used 3+ times)
if sig in COLD_POOL:
LAST_USED[sig] = time.time()
USAGE_COUNT[sig] = USAGE_COUNT.get(sig, 0) + 1
if USAGE_COUNT[sig] >= 3:
logger.info(f"⬆️ Promoting to hot pool (sig={sig[:8]}, count={USAGE_COUNT[sig]})")
HOT_POOL[sig] = COLD_POOL.pop(sig)
# Track promotion in monitor
try:
from monitor import get_monitor
await get_monitor().track_janitor_event("promote", sig, {"count": USAGE_COUNT[sig]})
except:
pass
return HOT_POOL[sig]
logger.info(f"❄️ Using cold pool browser (sig={sig[:8]})")
return COLD_POOL[sig]
# Memory check before creating new
mem_pct = get_container_memory_percent()
if mem_pct >= MEM_LIMIT:
logger.error(f"💥 Memory pressure: {mem_pct:.1f}% >= {MEM_LIMIT}%")
raise MemoryError(f"Memory at {mem_pct:.1f}%, refusing new browser")
# Create new in cold pool
logger.info(f"🆕 Creating new browser in cold pool (sig={sig[:8]}, mem={mem_pct:.1f}%)")
crawler = AsyncWebCrawler(config=cfg, thread_safe=False)
await crawler.start()
COLD_POOL[sig] = crawler
LAST_USED[sig] = time.time()
USAGE_COUNT[sig] = 1
return crawler
async def init_permanent(cfg: BrowserConfig):
"""Initialize permanent default browser."""
global PERMANENT, DEFAULT_CONFIG_SIG
async with LOCK:
if PERMANENT:
return
DEFAULT_CONFIG_SIG = _sig(cfg)
logger.info("🔥 Creating permanent default browser")
PERMANENT = AsyncWebCrawler(config=cfg, thread_safe=False)
await PERMANENT.start()
LAST_USED[DEFAULT_CONFIG_SIG] = time.time()
USAGE_COUNT[DEFAULT_CONFIG_SIG] = 0
async def close_all():
"""Close all browsers."""
async with LOCK:
tasks = []
if PERMANENT:
tasks.append(PERMANENT.close())
tasks.extend([c.close() for c in HOT_POOL.values()])
tasks.extend([c.close() for c in COLD_POOL.values()])
await asyncio.gather(*tasks, return_exceptions=True)
HOT_POOL.clear()
COLD_POOL.clear()
LAST_USED.clear()
USAGE_COUNT.clear()
async def janitor():
"""Adaptive cleanup based on memory pressure."""
while True:
await asyncio.sleep(60)
mem_pct = get_container_memory_percent()
# Adaptive intervals and TTLs
if mem_pct > 80:
interval, cold_ttl, hot_ttl = 10, 30, 120
elif mem_pct > 60:
interval, cold_ttl, hot_ttl = 30, 60, 300
else:
interval, cold_ttl, hot_ttl = 60, BASE_IDLE_TTL, BASE_IDLE_TTL * 2
await asyncio.sleep(interval)
now = time.time()
async with LOCK:
for sig, crawler in list(POOL.items()):
if now - LAST_USED[sig] > IDLE_TTL:
with suppress(Exception): await crawler.close()
POOL.pop(sig, None); LAST_USED.pop(sig, None)
# Clean cold pool
for sig in list(COLD_POOL.keys()):
if now - LAST_USED.get(sig, now) > cold_ttl:
idle_time = now - LAST_USED[sig]
logger.info(f"🧹 Closing cold browser (sig={sig[:8]}, idle={idle_time:.0f}s)")
with suppress(Exception):
await COLD_POOL[sig].close()
COLD_POOL.pop(sig, None)
LAST_USED.pop(sig, None)
USAGE_COUNT.pop(sig, None)
# Track in monitor
try:
from monitor import get_monitor
await get_monitor().track_janitor_event("close_cold", sig, {"idle_seconds": int(idle_time), "ttl": cold_ttl})
except:
pass
# Clean hot pool (more conservative)
for sig in list(HOT_POOL.keys()):
if now - LAST_USED.get(sig, now) > hot_ttl:
idle_time = now - LAST_USED[sig]
logger.info(f"🧹 Closing hot browser (sig={sig[:8]}, idle={idle_time:.0f}s)")
with suppress(Exception):
await HOT_POOL[sig].close()
HOT_POOL.pop(sig, None)
LAST_USED.pop(sig, None)
USAGE_COUNT.pop(sig, None)
# Track in monitor
try:
from monitor import get_monitor
await get_monitor().track_janitor_event("close_hot", sig, {"idle_seconds": int(idle_time), "ttl": hot_ttl})
except:
pass
# Log pool stats
if mem_pct > 60:
logger.info(f"📊 Pool: hot={len(HOT_POOL)}, cold={len(COLD_POOL)}, mem={mem_pct:.1f}%")

382
deploy/docker/monitor.py Normal file
View File

@@ -0,0 +1,382 @@
# monitor.py - Real-time monitoring stats with Redis persistence
import time
import json
import asyncio
from typing import Dict, List, Optional
from datetime import datetime, timezone
from collections import deque
from redis import asyncio as aioredis
from utils import get_container_memory_percent
import psutil
import logging
logger = logging.getLogger(__name__)
class MonitorStats:
"""Tracks real-time server stats with Redis persistence."""
def __init__(self, redis: aioredis.Redis):
self.redis = redis
self.start_time = time.time()
# In-memory queues (fast reads, Redis backup)
self.active_requests: Dict[str, Dict] = {} # id -> request info
self.completed_requests: deque = deque(maxlen=100) # Last 100
self.janitor_events: deque = deque(maxlen=100)
self.errors: deque = deque(maxlen=100)
# Endpoint stats (persisted in Redis)
self.endpoint_stats: Dict[str, Dict] = {} # endpoint -> {count, total_time, errors, ...}
# Background persistence queue (max 10 pending persist requests)
self._persist_queue: asyncio.Queue = asyncio.Queue(maxsize=10)
self._persist_worker_task: Optional[asyncio.Task] = None
# Timeline data (5min window, 5s resolution = 60 points)
self.memory_timeline: deque = deque(maxlen=60)
self.requests_timeline: deque = deque(maxlen=60)
self.browser_timeline: deque = deque(maxlen=60)
async def track_request_start(self, request_id: str, endpoint: str, url: str, config: Dict = None):
"""Track new request start."""
req_info = {
"id": request_id,
"endpoint": endpoint,
"url": url[:100], # Truncate long URLs
"start_time": time.time(),
"config_sig": config.get("sig", "default") if config else "default",
"mem_start": psutil.Process().memory_info().rss / (1024 * 1024)
}
self.active_requests[request_id] = req_info
# Increment endpoint counter
if endpoint not in self.endpoint_stats:
self.endpoint_stats[endpoint] = {
"count": 0, "total_time": 0, "errors": 0,
"pool_hits": 0, "success": 0
}
self.endpoint_stats[endpoint]["count"] += 1
# Queue persistence (handled by background worker)
try:
self._persist_queue.put_nowait(True)
except asyncio.QueueFull:
logger.warning("Persistence queue full, skipping")
async def track_request_end(self, request_id: str, success: bool, error: str = None,
pool_hit: bool = True, status_code: int = 200):
"""Track request completion."""
if request_id not in self.active_requests:
return
req_info = self.active_requests.pop(request_id)
end_time = time.time()
elapsed = end_time - req_info["start_time"]
mem_end = psutil.Process().memory_info().rss / (1024 * 1024)
mem_delta = mem_end - req_info["mem_start"]
# Update stats
endpoint = req_info["endpoint"]
if endpoint in self.endpoint_stats:
self.endpoint_stats[endpoint]["total_time"] += elapsed
if success:
self.endpoint_stats[endpoint]["success"] += 1
else:
self.endpoint_stats[endpoint]["errors"] += 1
if pool_hit:
self.endpoint_stats[endpoint]["pool_hits"] += 1
# Add to completed queue
completed = {
**req_info,
"end_time": end_time,
"elapsed": round(elapsed, 2),
"mem_delta": round(mem_delta, 1),
"success": success,
"error": error,
"status_code": status_code,
"pool_hit": pool_hit
}
self.completed_requests.append(completed)
# Track errors
if not success and error:
self.errors.append({
"timestamp": end_time,
"endpoint": endpoint,
"url": req_info["url"],
"error": error,
"request_id": request_id
})
await self._persist_endpoint_stats()
async def track_janitor_event(self, event_type: str, sig: str, details: Dict):
"""Track janitor cleanup events."""
self.janitor_events.append({
"timestamp": time.time(),
"type": event_type, # "close_cold", "close_hot", "promote"
"sig": sig[:8],
"details": details
})
def _cleanup_old_entries(self, max_age_seconds: int = 300):
"""Remove entries older than max_age_seconds (default 5min)."""
now = time.time()
cutoff = now - max_age_seconds
# Clean completed requests
while self.completed_requests and self.completed_requests[0].get("end_time", 0) < cutoff:
self.completed_requests.popleft()
# Clean janitor events
while self.janitor_events and self.janitor_events[0].get("timestamp", 0) < cutoff:
self.janitor_events.popleft()
# Clean errors
while self.errors and self.errors[0].get("timestamp", 0) < cutoff:
self.errors.popleft()
async def update_timeline(self):
"""Update timeline data points (called every 5s)."""
now = time.time()
mem_pct = get_container_memory_percent()
# Clean old entries (keep last 5 minutes)
self._cleanup_old_entries(max_age_seconds=300)
# Count requests in last 5s
recent_reqs = sum(1 for req in self.completed_requests
if now - req.get("end_time", 0) < 5)
# Browser counts (acquire lock to prevent race conditions)
from crawler_pool import PERMANENT, HOT_POOL, COLD_POOL, LOCK
async with LOCK:
browser_count = {
"permanent": 1 if PERMANENT else 0,
"hot": len(HOT_POOL),
"cold": len(COLD_POOL)
}
self.memory_timeline.append({"time": now, "value": mem_pct})
self.requests_timeline.append({"time": now, "value": recent_reqs})
self.browser_timeline.append({"time": now, "browsers": browser_count})
async def _persist_endpoint_stats(self):
"""Persist endpoint stats to Redis."""
try:
await self.redis.set(
"monitor:endpoint_stats",
json.dumps(self.endpoint_stats),
ex=86400 # 24h TTL
)
except Exception as e:
logger.warning(f"Failed to persist endpoint stats: {e}")
async def _persistence_worker(self):
"""Background worker to persist stats to Redis."""
while True:
try:
await self._persist_queue.get()
await self._persist_endpoint_stats()
self._persist_queue.task_done()
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Persistence worker error: {e}")
def start_persistence_worker(self):
"""Start the background persistence worker."""
if not self._persist_worker_task:
self._persist_worker_task = asyncio.create_task(self._persistence_worker())
logger.info("Started persistence worker")
async def stop_persistence_worker(self):
"""Stop the background persistence worker."""
if self._persist_worker_task:
self._persist_worker_task.cancel()
try:
await self._persist_worker_task
except asyncio.CancelledError:
pass
self._persist_worker_task = None
logger.info("Stopped persistence worker")
async def cleanup(self):
"""Cleanup on shutdown - persist final stats and stop workers."""
logger.info("Monitor cleanup starting...")
try:
# Persist final stats before shutdown
await self._persist_endpoint_stats()
# Stop background worker
await self.stop_persistence_worker()
logger.info("Monitor cleanup completed")
except Exception as e:
logger.error(f"Monitor cleanup error: {e}")
async def load_from_redis(self):
"""Load persisted stats from Redis."""
try:
data = await self.redis.get("monitor:endpoint_stats")
if data:
self.endpoint_stats = json.loads(data)
logger.info("Loaded endpoint stats from Redis")
except Exception as e:
logger.warning(f"Failed to load from Redis: {e}")
async def get_health_summary(self) -> Dict:
"""Get current system health snapshot."""
mem_pct = get_container_memory_percent()
cpu_pct = psutil.cpu_percent(interval=0.1)
# Network I/O (delta since last call)
net = psutil.net_io_counters()
# Pool status (acquire lock to prevent race conditions)
from crawler_pool import PERMANENT, HOT_POOL, COLD_POOL, LOCK
async with LOCK:
# TODO: Track actual browser process memory instead of estimates
# These are conservative estimates based on typical Chromium usage
permanent_mem = 270 if PERMANENT else 0 # Estimate: ~270MB for permanent browser
hot_mem = len(HOT_POOL) * 180 # Estimate: ~180MB per hot pool browser
cold_mem = len(COLD_POOL) * 180 # Estimate: ~180MB per cold pool browser
permanent_active = PERMANENT is not None
hot_count = len(HOT_POOL)
cold_count = len(COLD_POOL)
return {
"container": {
"memory_percent": round(mem_pct, 1),
"cpu_percent": round(cpu_pct, 1),
"network_sent_mb": round(net.bytes_sent / (1024**2), 2),
"network_recv_mb": round(net.bytes_recv / (1024**2), 2),
"uptime_seconds": int(time.time() - self.start_time)
},
"pool": {
"permanent": {"active": permanent_active, "memory_mb": permanent_mem},
"hot": {"count": hot_count, "memory_mb": hot_mem},
"cold": {"count": cold_count, "memory_mb": cold_mem},
"total_memory_mb": permanent_mem + hot_mem + cold_mem
},
"janitor": {
"next_cleanup_estimate": "adaptive", # Would need janitor state
"memory_pressure": "LOW" if mem_pct < 60 else "MEDIUM" if mem_pct < 80 else "HIGH"
}
}
def get_active_requests(self) -> List[Dict]:
"""Get list of currently active requests."""
now = time.time()
return [
{
**req,
"elapsed": round(now - req["start_time"], 1),
"status": "running"
}
for req in self.active_requests.values()
]
def get_completed_requests(self, limit: int = 50, filter_status: str = "all") -> List[Dict]:
"""Get recent completed requests."""
requests = list(self.completed_requests)[-limit:]
if filter_status == "success":
requests = [r for r in requests if r.get("success")]
elif filter_status == "error":
requests = [r for r in requests if not r.get("success")]
return requests
async def get_browser_list(self) -> List[Dict]:
"""Get detailed browser pool information."""
from crawler_pool import PERMANENT, HOT_POOL, COLD_POOL, LAST_USED, USAGE_COUNT, DEFAULT_CONFIG_SIG, LOCK
browsers = []
now = time.time()
# Acquire lock to prevent race conditions during iteration
async with LOCK:
if PERMANENT:
browsers.append({
"type": "permanent",
"sig": DEFAULT_CONFIG_SIG[:8] if DEFAULT_CONFIG_SIG else "unknown",
"age_seconds": int(now - self.start_time),
"last_used_seconds": int(now - LAST_USED.get(DEFAULT_CONFIG_SIG, now)),
"memory_mb": 270,
"hits": USAGE_COUNT.get(DEFAULT_CONFIG_SIG, 0),
"killable": False
})
for sig, crawler in HOT_POOL.items():
browsers.append({
"type": "hot",
"sig": sig[:8],
"age_seconds": int(now - self.start_time), # Approximation
"last_used_seconds": int(now - LAST_USED.get(sig, now)),
"memory_mb": 180, # Estimate
"hits": USAGE_COUNT.get(sig, 0),
"killable": True
})
for sig, crawler in COLD_POOL.items():
browsers.append({
"type": "cold",
"sig": sig[:8],
"age_seconds": int(now - self.start_time),
"last_used_seconds": int(now - LAST_USED.get(sig, now)),
"memory_mb": 180,
"hits": USAGE_COUNT.get(sig, 0),
"killable": True
})
return browsers
def get_endpoint_stats_summary(self) -> Dict[str, Dict]:
"""Get aggregated endpoint statistics."""
summary = {}
for endpoint, stats in self.endpoint_stats.items():
count = stats["count"]
avg_time = (stats["total_time"] / count) if count > 0 else 0
success_rate = (stats["success"] / count * 100) if count > 0 else 0
pool_hit_rate = (stats["pool_hits"] / count * 100) if count > 0 else 0
summary[endpoint] = {
"count": count,
"avg_latency_ms": round(avg_time * 1000, 1),
"success_rate_percent": round(success_rate, 1),
"pool_hit_rate_percent": round(pool_hit_rate, 1),
"errors": stats["errors"]
}
return summary
def get_timeline_data(self, metric: str, window: str = "5m") -> Dict:
"""Get timeline data for charts."""
# For now, only 5m window supported
if metric == "memory":
data = list(self.memory_timeline)
elif metric == "requests":
data = list(self.requests_timeline)
elif metric == "browsers":
data = list(self.browser_timeline)
else:
return {"timestamps": [], "values": []}
return {
"timestamps": [int(d["time"]) for d in data],
"values": [d.get("value", d.get("browsers")) for d in data]
}
def get_janitor_log(self, limit: int = 100) -> List[Dict]:
"""Get recent janitor events."""
return list(self.janitor_events)[-limit:]
def get_errors_log(self, limit: int = 100) -> List[Dict]:
"""Get recent errors."""
return list(self.errors)[-limit:]
# Global instance (initialized in server.py)
monitor_stats: Optional[MonitorStats] = None
def get_monitor() -> MonitorStats:
"""Get global monitor instance."""
if monitor_stats is None:
raise RuntimeError("Monitor not initialized")
return monitor_stats

View File

@@ -0,0 +1,405 @@
# monitor_routes.py - Monitor API endpoints
from fastapi import APIRouter, HTTPException, WebSocket, WebSocketDisconnect
from pydantic import BaseModel
from typing import Optional
from monitor import get_monitor
import logging
import asyncio
import json
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/monitor", tags=["monitor"])
@router.get("/health")
async def get_health():
"""Get current system health snapshot."""
try:
monitor = get_monitor()
return await monitor.get_health_summary()
except Exception as e:
logger.error(f"Error getting health: {e}")
raise HTTPException(500, str(e))
@router.get("/requests")
async def get_requests(status: str = "all", limit: int = 50):
"""Get active and completed requests.
Args:
status: Filter by 'active', 'completed', 'success', 'error', or 'all'
limit: Max number of completed requests to return (default 50)
"""
# Input validation
if status not in ["all", "active", "completed", "success", "error"]:
raise HTTPException(400, f"Invalid status: {status}. Must be one of: all, active, completed, success, error")
if limit < 1 or limit > 1000:
raise HTTPException(400, f"Invalid limit: {limit}. Must be between 1 and 1000")
try:
monitor = get_monitor()
if status == "active":
return {"active": monitor.get_active_requests(), "completed": []}
elif status == "completed":
return {"active": [], "completed": monitor.get_completed_requests(limit)}
elif status in ["success", "error"]:
return {"active": [], "completed": monitor.get_completed_requests(limit, status)}
else: # "all"
return {
"active": monitor.get_active_requests(),
"completed": monitor.get_completed_requests(limit)
}
except Exception as e:
logger.error(f"Error getting requests: {e}")
raise HTTPException(500, str(e))
@router.get("/browsers")
async def get_browsers():
"""Get detailed browser pool information."""
try:
monitor = get_monitor()
browsers = await monitor.get_browser_list()
# Calculate summary stats
total_browsers = len(browsers)
total_memory = sum(b["memory_mb"] for b in browsers)
# Calculate reuse rate from recent requests
recent = monitor.get_completed_requests(100)
pool_hits = sum(1 for r in recent if r.get("pool_hit", False))
reuse_rate = (pool_hits / len(recent) * 100) if recent else 0
return {
"browsers": browsers,
"summary": {
"total_count": total_browsers,
"total_memory_mb": total_memory,
"reuse_rate_percent": round(reuse_rate, 1)
}
}
except Exception as e:
logger.error(f"Error getting browsers: {e}")
raise HTTPException(500, str(e))
@router.get("/endpoints/stats")
async def get_endpoint_stats():
"""Get aggregated endpoint statistics."""
try:
monitor = get_monitor()
return monitor.get_endpoint_stats_summary()
except Exception as e:
logger.error(f"Error getting endpoint stats: {e}")
raise HTTPException(500, str(e))
@router.get("/timeline")
async def get_timeline(metric: str = "memory", window: str = "5m"):
"""Get timeline data for charts.
Args:
metric: 'memory', 'requests', or 'browsers'
window: Time window (only '5m' supported for now)
"""
# Input validation
if metric not in ["memory", "requests", "browsers"]:
raise HTTPException(400, f"Invalid metric: {metric}. Must be one of: memory, requests, browsers")
if window != "5m":
raise HTTPException(400, f"Invalid window: {window}. Only '5m' is currently supported")
try:
monitor = get_monitor()
return monitor.get_timeline_data(metric, window)
except Exception as e:
logger.error(f"Error getting timeline: {e}")
raise HTTPException(500, str(e))
@router.get("/logs/janitor")
async def get_janitor_log(limit: int = 100):
"""Get recent janitor cleanup events."""
# Input validation
if limit < 1 or limit > 1000:
raise HTTPException(400, f"Invalid limit: {limit}. Must be between 1 and 1000")
try:
monitor = get_monitor()
return {"events": monitor.get_janitor_log(limit)}
except Exception as e:
logger.error(f"Error getting janitor log: {e}")
raise HTTPException(500, str(e))
@router.get("/logs/errors")
async def get_errors_log(limit: int = 100):
"""Get recent errors."""
# Input validation
if limit < 1 or limit > 1000:
raise HTTPException(400, f"Invalid limit: {limit}. Must be between 1 and 1000")
try:
monitor = get_monitor()
return {"errors": monitor.get_errors_log(limit)}
except Exception as e:
logger.error(f"Error getting errors log: {e}")
raise HTTPException(500, str(e))
# ========== Control Actions ==========
class KillBrowserRequest(BaseModel):
sig: str
@router.post("/actions/cleanup")
async def force_cleanup():
"""Force immediate janitor cleanup (kills idle cold pool browsers)."""
try:
from crawler_pool import COLD_POOL, LAST_USED, USAGE_COUNT, LOCK
import time
from contextlib import suppress
killed_count = 0
now = time.time()
async with LOCK:
for sig in list(COLD_POOL.keys()):
# Kill all cold pool browsers immediately
logger.info(f"🧹 Force cleanup: closing cold browser (sig={sig[:8]})")
with suppress(Exception):
await COLD_POOL[sig].close()
COLD_POOL.pop(sig, None)
LAST_USED.pop(sig, None)
USAGE_COUNT.pop(sig, None)
killed_count += 1
monitor = get_monitor()
await monitor.track_janitor_event("force_cleanup", "manual", {"killed": killed_count})
return {"success": True, "killed_browsers": killed_count}
except Exception as e:
logger.error(f"Error during force cleanup: {e}")
raise HTTPException(500, str(e))
@router.post("/actions/kill_browser")
async def kill_browser(req: KillBrowserRequest):
"""Kill a specific browser by signature (hot or cold only).
Args:
sig: Browser config signature (first 8 chars)
"""
try:
from crawler_pool import HOT_POOL, COLD_POOL, LAST_USED, USAGE_COUNT, LOCK, DEFAULT_CONFIG_SIG
from contextlib import suppress
# Find full signature matching prefix
target_sig = None
pool_type = None
async with LOCK:
# Check hot pool
for sig in HOT_POOL.keys():
if sig.startswith(req.sig):
target_sig = sig
pool_type = "hot"
break
# Check cold pool
if not target_sig:
for sig in COLD_POOL.keys():
if sig.startswith(req.sig):
target_sig = sig
pool_type = "cold"
break
# Check if trying to kill permanent
if DEFAULT_CONFIG_SIG and DEFAULT_CONFIG_SIG.startswith(req.sig):
raise HTTPException(403, "Cannot kill permanent browser. Use restart instead.")
if not target_sig:
raise HTTPException(404, f"Browser with sig={req.sig} not found")
# Warn if there are active requests (browser might be in use)
monitor = get_monitor()
active_count = len(monitor.get_active_requests())
if active_count > 0:
logger.warning(f"Killing browser {target_sig[:8]} while {active_count} requests are active - may cause failures")
# Kill the browser
if pool_type == "hot":
browser = HOT_POOL.pop(target_sig)
else:
browser = COLD_POOL.pop(target_sig)
with suppress(Exception):
await browser.close()
LAST_USED.pop(target_sig, None)
USAGE_COUNT.pop(target_sig, None)
logger.info(f"🔪 Killed {pool_type} browser (sig={target_sig[:8]})")
monitor = get_monitor()
await monitor.track_janitor_event("kill_browser", target_sig, {"pool": pool_type, "manual": True})
return {"success": True, "killed_sig": target_sig[:8], "pool_type": pool_type}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error killing browser: {e}")
raise HTTPException(500, str(e))
@router.post("/actions/restart_browser")
async def restart_browser(req: KillBrowserRequest):
"""Restart a browser (kill + recreate). Works for permanent too.
Args:
sig: Browser config signature (first 8 chars), or "permanent"
"""
try:
from crawler_pool import (PERMANENT, HOT_POOL, COLD_POOL, LAST_USED,
USAGE_COUNT, LOCK, DEFAULT_CONFIG_SIG, init_permanent)
from crawl4ai import AsyncWebCrawler, BrowserConfig
from contextlib import suppress
import time
# Handle permanent browser restart
if req.sig == "permanent" or (DEFAULT_CONFIG_SIG and DEFAULT_CONFIG_SIG.startswith(req.sig)):
async with LOCK:
if PERMANENT:
with suppress(Exception):
await PERMANENT.close()
# Reinitialize permanent
from utils import load_config
config = load_config()
await init_permanent(BrowserConfig(
extra_args=config["crawler"]["browser"].get("extra_args", []),
**config["crawler"]["browser"].get("kwargs", {}),
))
logger.info("🔄 Restarted permanent browser")
return {"success": True, "restarted": "permanent"}
# Handle hot/cold browser restart
target_sig = None
pool_type = None
browser_config = None
async with LOCK:
# Find browser
for sig in HOT_POOL.keys():
if sig.startswith(req.sig):
target_sig = sig
pool_type = "hot"
# Would need to reconstruct config (not stored currently)
break
if not target_sig:
for sig in COLD_POOL.keys():
if sig.startswith(req.sig):
target_sig = sig
pool_type = "cold"
break
if not target_sig:
raise HTTPException(404, f"Browser with sig={req.sig} not found")
# Kill existing
if pool_type == "hot":
browser = HOT_POOL.pop(target_sig)
else:
browser = COLD_POOL.pop(target_sig)
with suppress(Exception):
await browser.close()
# Note: We can't easily recreate with same config without storing it
# For now, just kill and let new requests create fresh ones
LAST_USED.pop(target_sig, None)
USAGE_COUNT.pop(target_sig, None)
logger.info(f"🔄 Restarted {pool_type} browser (sig={target_sig[:8]})")
monitor = get_monitor()
await monitor.track_janitor_event("restart_browser", target_sig, {"pool": pool_type})
return {"success": True, "restarted_sig": target_sig[:8], "note": "Browser will be recreated on next request"}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error restarting browser: {e}")
raise HTTPException(500, str(e))
@router.post("/stats/reset")
async def reset_stats():
"""Reset today's endpoint counters."""
try:
monitor = get_monitor()
monitor.endpoint_stats.clear()
await monitor._persist_endpoint_stats()
return {"success": True, "message": "Endpoint stats reset"}
except Exception as e:
logger.error(f"Error resetting stats: {e}")
raise HTTPException(500, str(e))
@router.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
"""WebSocket endpoint for real-time monitoring updates.
Sends updates every 2 seconds with:
- Health stats
- Active/completed requests
- Browser pool status
- Timeline data
"""
await websocket.accept()
logger.info("WebSocket client connected")
try:
while True:
try:
# Gather all monitoring data
monitor = get_monitor()
data = {
"timestamp": asyncio.get_event_loop().time(),
"health": await monitor.get_health_summary(),
"requests": {
"active": monitor.get_active_requests(),
"completed": monitor.get_completed_requests(limit=10)
},
"browsers": await monitor.get_browser_list(),
"timeline": {
"memory": monitor.get_timeline_data("memory", "5m"),
"requests": monitor.get_timeline_data("requests", "5m"),
"browsers": monitor.get_timeline_data("browsers", "5m")
},
"janitor": monitor.get_janitor_log(limit=10),
"errors": monitor.get_errors_log(limit=10)
}
# Send update to client
await websocket.send_json(data)
# Wait 2 seconds before next update
await asyncio.sleep(2)
except WebSocketDisconnect:
logger.info("WebSocket client disconnected")
break
except Exception as e:
logger.error(f"WebSocket error: {e}", exc_info=True)
await asyncio.sleep(2) # Continue trying
except Exception as e:
logger.error(f"WebSocket connection error: {e}", exc_info=True)
finally:
logger.info("WebSocket connection closed")

View File

@@ -16,6 +16,7 @@ from fastapi import Request, Depends
from fastapi.responses import FileResponse
import base64
import re
import logging
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig
from api import (
handle_markdown_request, handle_llm_qa,
@@ -78,6 +79,14 @@ __version__ = "0.5.1-d1"
MAX_PAGES = config["crawler"]["pool"].get("max_pages", 30)
GLOBAL_SEM = asyncio.Semaphore(MAX_PAGES)
# ── default browser config helper ─────────────────────────────
def get_default_browser_config() -> BrowserConfig:
"""Get default BrowserConfig from config.yml."""
return BrowserConfig(
extra_args=config["crawler"]["browser"].get("extra_args", []),
**config["crawler"]["browser"].get("kwargs", {}),
)
# import logging
# page_log = logging.getLogger("page_cap")
# orig_arun = AsyncWebCrawler.arun
@@ -103,15 +112,52 @@ AsyncWebCrawler.arun = capped_arun
@asynccontextmanager
async def lifespan(_: FastAPI):
await get_crawler(BrowserConfig(
from crawler_pool import init_permanent
from monitor import MonitorStats
import monitor as monitor_module
# Initialize monitor
monitor_module.monitor_stats = MonitorStats(redis)
await monitor_module.monitor_stats.load_from_redis()
monitor_module.monitor_stats.start_persistence_worker()
# Initialize browser pool
await init_permanent(BrowserConfig(
extra_args=config["crawler"]["browser"].get("extra_args", []),
**config["crawler"]["browser"].get("kwargs", {}),
)) # warmup
app.state.janitor = asyncio.create_task(janitor()) # idle GC
))
# Start background tasks
app.state.janitor = asyncio.create_task(janitor())
app.state.timeline_updater = asyncio.create_task(_timeline_updater())
yield
# Cleanup
app.state.janitor.cancel()
app.state.timeline_updater.cancel()
# Monitor cleanup (persist stats and stop workers)
from monitor import get_monitor
try:
await get_monitor().cleanup()
except Exception as e:
logger.error(f"Monitor cleanup failed: {e}")
await close_all()
async def _timeline_updater():
"""Update timeline data every 5 seconds."""
from monitor import get_monitor
while True:
await asyncio.sleep(5)
try:
await asyncio.wait_for(get_monitor().update_timeline(), timeout=4.0)
except asyncio.TimeoutError:
logger.warning("Timeline update timeout after 4s")
except Exception as e:
logger.warning(f"Timeline update error: {e}")
# ───────────────────── FastAPI instance ──────────────────────
app = FastAPI(
title=config["app"]["title"],
@@ -129,6 +175,25 @@ app.mount(
name="play",
)
# ── static monitor dashboard ────────────────────────────────
MONITOR_DIR = pathlib.Path(__file__).parent / "static" / "monitor"
if not MONITOR_DIR.exists():
raise RuntimeError(f"Monitor assets not found at {MONITOR_DIR}")
app.mount(
"/dashboard",
StaticFiles(directory=MONITOR_DIR, html=True),
name="monitor_ui",
)
# ── static assets (logo, etc) ────────────────────────────────
ASSETS_DIR = pathlib.Path(__file__).parent / "static" / "assets"
if ASSETS_DIR.exists():
app.mount(
"/static/assets",
StaticFiles(directory=ASSETS_DIR),
name="assets",
)
@app.get("/")
async def root():
@@ -212,6 +277,12 @@ def _safe_eval_config(expr: str) -> dict:
# ── job router ──────────────────────────────────────────────
app.include_router(init_job_router(redis, config, token_dep))
# ── monitor router ──────────────────────────────────────────
from monitor_routes import router as monitor_router
app.include_router(monitor_router)
logger = logging.getLogger(__name__)
# ──────────────────────── Endpoints ──────────────────────────
@app.post("/token")
async def get_token(req: TokenRequest):
@@ -266,27 +337,20 @@ async def generate_html(
Crawls the URL, preprocesses the raw HTML for schema extraction, and returns the processed HTML.
Use when you need sanitized HTML structures for building schemas or further processing.
"""
from crawler_pool import get_crawler
cfg = CrawlerRunConfig()
try:
async with AsyncWebCrawler(config=BrowserConfig()) as crawler:
results = await crawler.arun(url=body.url, config=cfg)
# Check if the crawl was successful
crawler = await get_crawler(get_default_browser_config())
results = await crawler.arun(url=body.url, config=cfg)
if not results[0].success:
raise HTTPException(
status_code=500,
detail=results[0].error_message or "Crawl failed"
)
raise HTTPException(500, detail=results[0].error_message or "Crawl failed")
raw_html = results[0].html
from crawl4ai.utils import preprocess_html_for_schema
processed_html = preprocess_html_for_schema(raw_html)
return JSONResponse({"html": processed_html, "url": body.url, "success": True})
except Exception as e:
# Log and raise as HTTP 500 for other exceptions
raise HTTPException(
status_code=500,
detail=str(e)
)
raise HTTPException(500, detail=str(e))
# Screenshot endpoint
@@ -304,16 +368,13 @@ async def generate_screenshot(
Use when you need an image snapshot of the rendered page. Its recommened to provide an output path to save the screenshot.
Then in result instead of the screenshot you will get a path to the saved file.
"""
from crawler_pool import get_crawler
try:
cfg = CrawlerRunConfig(
screenshot=True, screenshot_wait_for=body.screenshot_wait_for)
async with AsyncWebCrawler(config=BrowserConfig()) as crawler:
results = await crawler.arun(url=body.url, config=cfg)
cfg = CrawlerRunConfig(screenshot=True, screenshot_wait_for=body.screenshot_wait_for)
crawler = await get_crawler(get_default_browser_config())
results = await crawler.arun(url=body.url, config=cfg)
if not results[0].success:
raise HTTPException(
status_code=500,
detail=results[0].error_message or "Crawl failed"
)
raise HTTPException(500, detail=results[0].error_message or "Crawl failed")
screenshot_data = results[0].screenshot
if body.output_path:
abs_path = os.path.abspath(body.output_path)
@@ -323,10 +384,7 @@ async def generate_screenshot(
return {"success": True, "path": abs_path}
return {"success": True, "screenshot": screenshot_data}
except Exception as e:
raise HTTPException(
status_code=500,
detail=str(e)
)
raise HTTPException(500, detail=str(e))
# PDF endpoint
@@ -344,15 +402,13 @@ async def generate_pdf(
Use when you need a printable or archivable snapshot of the page. It is recommended to provide an output path to save the PDF.
Then in result instead of the PDF you will get a path to the saved file.
"""
from crawler_pool import get_crawler
try:
cfg = CrawlerRunConfig(pdf=True)
async with AsyncWebCrawler(config=BrowserConfig()) as crawler:
results = await crawler.arun(url=body.url, config=cfg)
crawler = await get_crawler(get_default_browser_config())
results = await crawler.arun(url=body.url, config=cfg)
if not results[0].success:
raise HTTPException(
status_code=500,
detail=results[0].error_message or "Crawl failed"
)
raise HTTPException(500, detail=results[0].error_message or "Crawl failed")
pdf_data = results[0].pdf
if body.output_path:
abs_path = os.path.abspath(body.output_path)
@@ -362,10 +418,7 @@ async def generate_pdf(
return {"success": True, "path": abs_path}
return {"success": True, "pdf": base64.b64encode(pdf_data).decode()}
except Exception as e:
raise HTTPException(
status_code=500,
detail=str(e)
)
raise HTTPException(500, detail=str(e))
@app.post("/execute_js")
@@ -421,23 +474,17 @@ async def execute_js(
```
"""
from crawler_pool import get_crawler
try:
cfg = CrawlerRunConfig(js_code=body.scripts)
async with AsyncWebCrawler(config=BrowserConfig()) as crawler:
results = await crawler.arun(url=body.url, config=cfg)
crawler = await get_crawler(get_default_browser_config())
results = await crawler.arun(url=body.url, config=cfg)
if not results[0].success:
raise HTTPException(
status_code=500,
detail=results[0].error_message or "Crawl failed"
)
# Return JSON-serializable dict of the first CrawlResult
raise HTTPException(500, detail=results[0].error_message or "Crawl failed")
data = results[0].model_dump()
return JSONResponse(data)
except Exception as e:
raise HTTPException(
status_code=500,
detail=str(e)
)
raise HTTPException(500, detail=str(e))
@app.get("/llm/{url:path}")

Binary file not shown.

After

Width:  |  Height:  |  Size: 5.8 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.6 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 11 KiB

File diff suppressed because it is too large Load Diff

View File

@@ -167,11 +167,14 @@
</a>
</h1>
<div class="ml-auto flex space-x-2">
<button id="play-tab"
class="px-3 py-1 rounded-t bg-surface border border-b-0 border-border text-primary">Playground</button>
<button id="stress-tab" class="px-3 py-1 rounded-t border border-border hover:bg-surface">Stress
Test</button>
<div class="ml-auto flex items-center space-x-4">
<a href="/dashboard" class="text-xs text-secondary hover:text-primary underline">Monitor</a>
<div class="flex space-x-2">
<button id="play-tab"
class="px-3 py-1 rounded-t bg-surface border border-b-0 border-border text-primary">Playground</button>
<button id="stress-tab" class="px-3 py-1 rounded-t border border-border hover:bg-surface">Stress
Test</button>
</div>
</div>
</header>

34
deploy/docker/test-websocket.py Executable file
View File

@@ -0,0 +1,34 @@
#!/usr/bin/env python3
"""
Quick WebSocket test - Connect to monitor WebSocket and print updates
"""
import asyncio
import websockets
import json
async def test_websocket():
uri = "ws://localhost:11235/monitor/ws"
print(f"Connecting to {uri}...")
try:
async with websockets.connect(uri) as websocket:
print("✅ Connected!")
# Receive and print 5 updates
for i in range(5):
message = await websocket.recv()
data = json.loads(message)
print(f"\n📊 Update #{i+1}:")
print(f" - Health: CPU {data['health']['container']['cpu_percent']}%, Memory {data['health']['container']['memory_percent']}%")
print(f" - Active Requests: {len(data['requests']['active'])}")
print(f" - Browsers: {len(data['browsers'])}")
except Exception as e:
print(f"❌ Error: {e}")
return 1
print("\n✅ WebSocket test passed!")
return 0
if __name__ == "__main__":
exit(asyncio.run(test_websocket()))

View File

@@ -0,0 +1,164 @@
#!/usr/bin/env python3
"""
Monitor Dashboard Demo Script
Generates varied activity to showcase all monitoring features for video recording.
"""
import httpx
import asyncio
import time
from datetime import datetime
BASE_URL = "http://localhost:11235"
async def demo_dashboard():
print("🎬 Monitor Dashboard Demo - Starting...\n")
print(f"📊 Dashboard: {BASE_URL}/dashboard")
print("=" * 60)
async with httpx.AsyncClient(timeout=60.0) as client:
# Phase 1: Simple requests (permanent browser)
print("\n🔷 Phase 1: Testing permanent browser pool")
print("-" * 60)
for i in range(5):
print(f" {i+1}/5 Request to /crawl (default config)...")
try:
r = await client.post(
f"{BASE_URL}/crawl",
json={"urls": [f"https://httpbin.org/html?req={i}"], "crawler_config": {}}
)
print(f" ✅ Status: {r.status_code}, Time: {r.elapsed.total_seconds():.2f}s")
except Exception as e:
print(f" ❌ Error: {e}")
await asyncio.sleep(1) # Small delay between requests
# Phase 2: Create variant browsers (different configs)
print("\n🔶 Phase 2: Testing cold→hot pool promotion")
print("-" * 60)
viewports = [
{"width": 1920, "height": 1080},
{"width": 1280, "height": 720},
{"width": 800, "height": 600}
]
for idx, viewport in enumerate(viewports):
print(f" Viewport {viewport['width']}x{viewport['height']}:")
for i in range(4): # 4 requests each to trigger promotion at 3
try:
r = await client.post(
f"{BASE_URL}/crawl",
json={
"urls": [f"https://httpbin.org/json?v={idx}&r={i}"],
"browser_config": {"viewport": viewport},
"crawler_config": {}
}
)
print(f" {i+1}/4 ✅ {r.status_code} - Should see cold→hot after 3 uses")
except Exception as e:
print(f" {i+1}/4 ❌ {e}")
await asyncio.sleep(0.5)
# Phase 3: Concurrent burst (stress pool)
print("\n🔷 Phase 3: Concurrent burst (10 parallel)")
print("-" * 60)
tasks = []
for i in range(10):
tasks.append(
client.post(
f"{BASE_URL}/crawl",
json={"urls": [f"https://httpbin.org/delay/2?burst={i}"], "crawler_config": {}}
)
)
print(" Sending 10 concurrent requests...")
start = time.time()
results = await asyncio.gather(*tasks, return_exceptions=True)
elapsed = time.time() - start
successes = sum(1 for r in results if not isinstance(r, Exception) and r.status_code == 200)
print(f"{successes}/10 succeeded in {elapsed:.2f}s")
# Phase 4: Multi-endpoint coverage
print("\n🔶 Phase 4: Testing multiple endpoints")
print("-" * 60)
endpoints = [
("/md", {"url": "https://httpbin.org/html", "f": "fit", "c": "0"}),
("/screenshot", {"url": "https://httpbin.org/html"}),
("/pdf", {"url": "https://httpbin.org/html"}),
]
for endpoint, payload in endpoints:
print(f" Testing {endpoint}...")
try:
if endpoint == "/md":
r = await client.post(f"{BASE_URL}{endpoint}", json=payload)
else:
r = await client.post(f"{BASE_URL}{endpoint}", json=payload)
print(f"{r.status_code}")
except Exception as e:
print(f"{e}")
await asyncio.sleep(1)
# Phase 5: Intentional error (to populate errors tab)
print("\n🔷 Phase 5: Generating error examples")
print("-" * 60)
print(" Triggering invalid URL error...")
try:
r = await client.post(
f"{BASE_URL}/crawl",
json={"urls": ["invalid://bad-url"], "crawler_config": {}}
)
print(f" Response: {r.status_code}")
except Exception as e:
print(f" ✅ Error captured: {type(e).__name__}")
# Phase 6: Wait for janitor activity
print("\n🔶 Phase 6: Waiting for janitor cleanup...")
print("-" * 60)
print(" Idle for 40s to allow janitor to clean cold pool browsers...")
for i in range(40, 0, -10):
print(f" {i}s remaining... (Check dashboard for cleanup events)")
await asyncio.sleep(10)
# Phase 7: Final stats check
print("\n🔷 Phase 7: Final dashboard state")
print("-" * 60)
r = await client.get(f"{BASE_URL}/monitor/health")
health = r.json()
print(f" Memory: {health['container']['memory_percent']:.1f}%")
print(f" Browsers: Perm={health['pool']['permanent']['active']}, "
f"Hot={health['pool']['hot']['count']}, Cold={health['pool']['cold']['count']}")
r = await client.get(f"{BASE_URL}/monitor/endpoints/stats")
stats = r.json()
print(f"\n Endpoint Stats:")
for endpoint, data in stats.items():
print(f" {endpoint}: {data['count']} req, "
f"{data['avg_latency_ms']:.0f}ms avg, "
f"{data['success_rate_percent']:.1f}% success")
r = await client.get(f"{BASE_URL}/monitor/browsers")
browsers = r.json()
print(f"\n Pool Efficiency:")
print(f" Total browsers: {browsers['summary']['total_count']}")
print(f" Memory usage: {browsers['summary']['total_memory_mb']} MB")
print(f" Reuse rate: {browsers['summary']['reuse_rate_percent']:.1f}%")
print("\n" + "=" * 60)
print("✅ Demo complete! Dashboard is now populated with rich data.")
print(f"\n📹 Recording tip: Refresh {BASE_URL}/dashboard")
print(" You should see:")
print(" • Active & completed requests")
print(" • Browser pool (permanent + hot/cold)")
print(" • Janitor cleanup events")
print(" • Endpoint analytics")
print(" • Memory timeline")
if __name__ == "__main__":
try:
asyncio.run(demo_dashboard())
except KeyboardInterrupt:
print("\n\n⚠️ Demo interrupted by user")
except Exception as e:
print(f"\n\n❌ Demo failed: {e}")

View File

@@ -0,0 +1,2 @@
httpx>=0.25.0
docker>=7.0.0

View File

@@ -0,0 +1,138 @@
#!/usr/bin/env python3
"""
Test 1: Basic Container Health + Single Endpoint
- Starts container
- Hits /health endpoint 10 times
- Reports success rate and basic latency
"""
import asyncio
import time
import docker
import httpx
# Config
IMAGE = "crawl4ai-local:latest"
CONTAINER_NAME = "crawl4ai-test"
PORT = 11235
REQUESTS = 10
async def test_endpoint(url: str, count: int):
"""Hit endpoint multiple times, return stats."""
results = []
async with httpx.AsyncClient(timeout=30.0) as client:
for i in range(count):
start = time.time()
try:
resp = await client.get(url)
elapsed = (time.time() - start) * 1000 # ms
results.append({
"success": resp.status_code == 200,
"latency_ms": elapsed,
"status": resp.status_code
})
print(f" [{i+1}/{count}] ✓ {resp.status_code} - {elapsed:.0f}ms")
except Exception as e:
results.append({
"success": False,
"latency_ms": None,
"error": str(e)
})
print(f" [{i+1}/{count}] ✗ Error: {e}")
return results
def start_container(client, image: str, name: str, port: int):
"""Start container, return container object."""
# Clean up existing
try:
old = client.containers.get(name)
print(f"🧹 Stopping existing container '{name}'...")
old.stop()
old.remove()
except docker.errors.NotFound:
pass
print(f"🚀 Starting container '{name}' from image '{image}'...")
container = client.containers.run(
image,
name=name,
ports={f"{port}/tcp": port},
detach=True,
shm_size="1g",
environment={"PYTHON_ENV": "production"}
)
# Wait for health
print(f"⏳ Waiting for container to be healthy...")
for _ in range(30): # 30s timeout
time.sleep(1)
container.reload()
if container.status == "running":
try:
# Quick health check
import requests
resp = requests.get(f"http://localhost:{port}/health", timeout=2)
if resp.status_code == 200:
print(f"✅ Container healthy!")
return container
except:
pass
raise TimeoutError("Container failed to start")
def stop_container(container):
"""Stop and remove container."""
print(f"🛑 Stopping container...")
container.stop()
container.remove()
print(f"✅ Container removed")
async def main():
print("="*60)
print("TEST 1: Basic Container Health + Single Endpoint")
print("="*60)
client = docker.from_env()
container = None
try:
# Start container
container = start_container(client, IMAGE, CONTAINER_NAME, PORT)
# Test /health endpoint
print(f"\n📊 Testing /health endpoint ({REQUESTS} requests)...")
url = f"http://localhost:{PORT}/health"
results = await test_endpoint(url, REQUESTS)
# Calculate stats
successes = sum(1 for r in results if r["success"])
success_rate = (successes / len(results)) * 100
latencies = [r["latency_ms"] for r in results if r["latency_ms"] is not None]
avg_latency = sum(latencies) / len(latencies) if latencies else 0
# Print results
print(f"\n{'='*60}")
print(f"RESULTS:")
print(f" Success Rate: {success_rate:.1f}% ({successes}/{len(results)})")
print(f" Avg Latency: {avg_latency:.0f}ms")
if latencies:
print(f" Min Latency: {min(latencies):.0f}ms")
print(f" Max Latency: {max(latencies):.0f}ms")
print(f"{'='*60}")
# Pass/Fail
if success_rate >= 100:
print(f"✅ TEST PASSED")
return 0
else:
print(f"❌ TEST FAILED (expected 100% success rate)")
return 1
except Exception as e:
print(f"\n❌ TEST ERROR: {e}")
return 1
finally:
if container:
stop_container(container)
if __name__ == "__main__":
exit_code = asyncio.run(main())
exit(exit_code)

View File

@@ -0,0 +1,205 @@
#!/usr/bin/env python3
"""
Test 2: Docker Stats Monitoring
- Extends Test 1 with real-time container stats
- Monitors memory % and CPU during requests
- Reports baseline, peak, and final memory
"""
import asyncio
import time
import docker
import httpx
from threading import Thread, Event
# Config
IMAGE = "crawl4ai-local:latest"
CONTAINER_NAME = "crawl4ai-test"
PORT = 11235
REQUESTS = 20 # More requests to see memory usage
# Stats tracking
stats_history = []
stop_monitoring = Event()
def monitor_stats(container):
"""Background thread to collect container stats."""
for stat in container.stats(decode=True, stream=True):
if stop_monitoring.is_set():
break
try:
# Extract memory stats
mem_usage = stat['memory_stats'].get('usage', 0) / (1024 * 1024) # MB
mem_limit = stat['memory_stats'].get('limit', 1) / (1024 * 1024)
mem_percent = (mem_usage / mem_limit * 100) if mem_limit > 0 else 0
# Extract CPU stats (handle missing fields on Mac)
cpu_percent = 0
try:
cpu_delta = stat['cpu_stats']['cpu_usage']['total_usage'] - \
stat['precpu_stats']['cpu_usage']['total_usage']
system_delta = stat['cpu_stats'].get('system_cpu_usage', 0) - \
stat['precpu_stats'].get('system_cpu_usage', 0)
if system_delta > 0:
num_cpus = stat['cpu_stats'].get('online_cpus', 1)
cpu_percent = (cpu_delta / system_delta * num_cpus * 100.0)
except (KeyError, ZeroDivisionError):
pass
stats_history.append({
'timestamp': time.time(),
'memory_mb': mem_usage,
'memory_percent': mem_percent,
'cpu_percent': cpu_percent
})
except Exception as e:
# Skip malformed stats
pass
time.sleep(0.5) # Sample every 500ms
async def test_endpoint(url: str, count: int):
"""Hit endpoint, return stats."""
results = []
async with httpx.AsyncClient(timeout=30.0) as client:
for i in range(count):
start = time.time()
try:
resp = await client.get(url)
elapsed = (time.time() - start) * 1000
results.append({
"success": resp.status_code == 200,
"latency_ms": elapsed,
})
if (i + 1) % 5 == 0: # Print every 5 requests
print(f" [{i+1}/{count}] ✓ {resp.status_code} - {elapsed:.0f}ms")
except Exception as e:
results.append({"success": False, "error": str(e)})
print(f" [{i+1}/{count}] ✗ Error: {e}")
return results
def start_container(client, image: str, name: str, port: int):
"""Start container."""
try:
old = client.containers.get(name)
print(f"🧹 Stopping existing container '{name}'...")
old.stop()
old.remove()
except docker.errors.NotFound:
pass
print(f"🚀 Starting container '{name}'...")
container = client.containers.run(
image,
name=name,
ports={f"{port}/tcp": port},
detach=True,
shm_size="1g",
mem_limit="4g", # Set explicit memory limit
)
print(f"⏳ Waiting for health...")
for _ in range(30):
time.sleep(1)
container.reload()
if container.status == "running":
try:
import requests
resp = requests.get(f"http://localhost:{port}/health", timeout=2)
if resp.status_code == 200:
print(f"✅ Container healthy!")
return container
except:
pass
raise TimeoutError("Container failed to start")
def stop_container(container):
"""Stop container."""
print(f"🛑 Stopping container...")
container.stop()
container.remove()
async def main():
print("="*60)
print("TEST 2: Docker Stats Monitoring")
print("="*60)
client = docker.from_env()
container = None
monitor_thread = None
try:
# Start container
container = start_container(client, IMAGE, CONTAINER_NAME, PORT)
# Start stats monitoring in background
print(f"\n📊 Starting stats monitor...")
stop_monitoring.clear()
stats_history.clear()
monitor_thread = Thread(target=monitor_stats, args=(container,), daemon=True)
monitor_thread.start()
# Wait a bit for baseline
await asyncio.sleep(2)
baseline_mem = stats_history[-1]['memory_mb'] if stats_history else 0
print(f"📏 Baseline memory: {baseline_mem:.1f} MB")
# Test /health endpoint
print(f"\n🔄 Running {REQUESTS} requests to /health...")
url = f"http://localhost:{PORT}/health"
results = await test_endpoint(url, REQUESTS)
# Wait a bit to capture peak
await asyncio.sleep(1)
# Stop monitoring
stop_monitoring.set()
if monitor_thread:
monitor_thread.join(timeout=2)
# Calculate stats
successes = sum(1 for r in results if r.get("success"))
success_rate = (successes / len(results)) * 100
latencies = [r["latency_ms"] for r in results if "latency_ms" in r]
avg_latency = sum(latencies) / len(latencies) if latencies else 0
# Memory stats
memory_samples = [s['memory_mb'] for s in stats_history]
peak_mem = max(memory_samples) if memory_samples else 0
final_mem = memory_samples[-1] if memory_samples else 0
mem_delta = final_mem - baseline_mem
# Print results
print(f"\n{'='*60}")
print(f"RESULTS:")
print(f" Success Rate: {success_rate:.1f}% ({successes}/{len(results)})")
print(f" Avg Latency: {avg_latency:.0f}ms")
print(f"\n Memory Stats:")
print(f" Baseline: {baseline_mem:.1f} MB")
print(f" Peak: {peak_mem:.1f} MB")
print(f" Final: {final_mem:.1f} MB")
print(f" Delta: {mem_delta:+.1f} MB")
print(f"{'='*60}")
# Pass/Fail
if success_rate >= 100 and mem_delta < 100: # No significant memory growth
print(f"✅ TEST PASSED")
return 0
else:
if success_rate < 100:
print(f"❌ TEST FAILED (success rate < 100%)")
if mem_delta >= 100:
print(f"⚠️ WARNING: Memory grew by {mem_delta:.1f} MB")
return 1
except Exception as e:
print(f"\n❌ TEST ERROR: {e}")
return 1
finally:
stop_monitoring.set()
if container:
stop_container(container)
if __name__ == "__main__":
exit_code = asyncio.run(main())
exit(exit_code)

View File

@@ -0,0 +1,229 @@
#!/usr/bin/env python3
"""
Test 3: Pool Validation - Permanent Browser Reuse
- Tests /html endpoint (should use permanent browser)
- Monitors container logs for pool hit markers
- Validates browser reuse rate
- Checks memory after browser creation
"""
import asyncio
import time
import docker
import httpx
from threading import Thread, Event
# Config
IMAGE = "crawl4ai-local:latest"
CONTAINER_NAME = "crawl4ai-test"
PORT = 11235
REQUESTS = 30
# Stats tracking
stats_history = []
stop_monitoring = Event()
def monitor_stats(container):
"""Background stats collector."""
for stat in container.stats(decode=True, stream=True):
if stop_monitoring.is_set():
break
try:
mem_usage = stat['memory_stats'].get('usage', 0) / (1024 * 1024)
stats_history.append({
'timestamp': time.time(),
'memory_mb': mem_usage,
})
except:
pass
time.sleep(0.5)
def count_log_markers(container):
"""Extract pool usage markers from logs."""
logs = container.logs().decode('utf-8')
permanent_hits = logs.count("🔥 Using permanent browser")
hot_hits = logs.count("♨️ Using hot pool browser")
cold_hits = logs.count("❄️ Using cold pool browser")
new_created = logs.count("🆕 Creating new browser")
return {
'permanent_hits': permanent_hits,
'hot_hits': hot_hits,
'cold_hits': cold_hits,
'new_created': new_created,
'total_hits': permanent_hits + hot_hits + cold_hits
}
async def test_endpoint(url: str, count: int):
"""Hit endpoint multiple times."""
results = []
async with httpx.AsyncClient(timeout=60.0) as client:
for i in range(count):
start = time.time()
try:
resp = await client.post(url, json={"url": "https://httpbin.org/html"})
elapsed = (time.time() - start) * 1000
results.append({
"success": resp.status_code == 200,
"latency_ms": elapsed,
})
if (i + 1) % 10 == 0:
print(f" [{i+1}/{count}] ✓ {resp.status_code} - {elapsed:.0f}ms")
except Exception as e:
results.append({"success": False, "error": str(e)})
print(f" [{i+1}/{count}] ✗ Error: {e}")
return results
def start_container(client, image: str, name: str, port: int):
"""Start container."""
try:
old = client.containers.get(name)
print(f"🧹 Stopping existing container...")
old.stop()
old.remove()
except docker.errors.NotFound:
pass
print(f"🚀 Starting container...")
container = client.containers.run(
image,
name=name,
ports={f"{port}/tcp": port},
detach=True,
shm_size="1g",
mem_limit="4g",
)
print(f"⏳ Waiting for health...")
for _ in range(30):
time.sleep(1)
container.reload()
if container.status == "running":
try:
import requests
resp = requests.get(f"http://localhost:{port}/health", timeout=2)
if resp.status_code == 200:
print(f"✅ Container healthy!")
return container
except:
pass
raise TimeoutError("Container failed to start")
def stop_container(container):
"""Stop container."""
print(f"🛑 Stopping container...")
container.stop()
container.remove()
async def main():
print("="*60)
print("TEST 3: Pool Validation - Permanent Browser Reuse")
print("="*60)
client = docker.from_env()
container = None
monitor_thread = None
try:
# Start container
container = start_container(client, IMAGE, CONTAINER_NAME, PORT)
# Wait for permanent browser initialization
print(f"\n⏳ Waiting for permanent browser init (3s)...")
await asyncio.sleep(3)
# Start stats monitoring
print(f"📊 Starting stats monitor...")
stop_monitoring.clear()
stats_history.clear()
monitor_thread = Thread(target=monitor_stats, args=(container,), daemon=True)
monitor_thread.start()
await asyncio.sleep(1)
baseline_mem = stats_history[-1]['memory_mb'] if stats_history else 0
print(f"📏 Baseline (with permanent browser): {baseline_mem:.1f} MB")
# Test /html endpoint (uses permanent browser for default config)
print(f"\n🔄 Running {REQUESTS} requests to /html...")
url = f"http://localhost:{PORT}/html"
results = await test_endpoint(url, REQUESTS)
# Wait a bit
await asyncio.sleep(1)
# Stop monitoring
stop_monitoring.set()
if monitor_thread:
monitor_thread.join(timeout=2)
# Analyze logs for pool markers
print(f"\n📋 Analyzing pool usage...")
pool_stats = count_log_markers(container)
# Calculate request stats
successes = sum(1 for r in results if r.get("success"))
success_rate = (successes / len(results)) * 100
latencies = [r["latency_ms"] for r in results if "latency_ms" in r]
avg_latency = sum(latencies) / len(latencies) if latencies else 0
# Memory stats
memory_samples = [s['memory_mb'] for s in stats_history]
peak_mem = max(memory_samples) if memory_samples else 0
final_mem = memory_samples[-1] if memory_samples else 0
mem_delta = final_mem - baseline_mem
# Calculate reuse rate
total_requests = len(results)
total_pool_hits = pool_stats['total_hits']
reuse_rate = (total_pool_hits / total_requests * 100) if total_requests > 0 else 0
# Print results
print(f"\n{'='*60}")
print(f"RESULTS:")
print(f" Success Rate: {success_rate:.1f}% ({successes}/{len(results)})")
print(f" Avg Latency: {avg_latency:.0f}ms")
print(f"\n Pool Stats:")
print(f" 🔥 Permanent Hits: {pool_stats['permanent_hits']}")
print(f" ♨️ Hot Pool Hits: {pool_stats['hot_hits']}")
print(f" ❄️ Cold Pool Hits: {pool_stats['cold_hits']}")
print(f" 🆕 New Created: {pool_stats['new_created']}")
print(f" 📊 Reuse Rate: {reuse_rate:.1f}%")
print(f"\n Memory Stats:")
print(f" Baseline: {baseline_mem:.1f} MB")
print(f" Peak: {peak_mem:.1f} MB")
print(f" Final: {final_mem:.1f} MB")
print(f" Delta: {mem_delta:+.1f} MB")
print(f"{'='*60}")
# Pass/Fail
passed = True
if success_rate < 100:
print(f"❌ FAIL: Success rate {success_rate:.1f}% < 100%")
passed = False
if reuse_rate < 80:
print(f"❌ FAIL: Reuse rate {reuse_rate:.1f}% < 80% (expected high permanent browser usage)")
passed = False
if pool_stats['permanent_hits'] < (total_requests * 0.8):
print(f"⚠️ WARNING: Only {pool_stats['permanent_hits']} permanent hits out of {total_requests} requests")
if mem_delta > 200:
print(f"⚠️ WARNING: Memory grew by {mem_delta:.1f} MB (possible browser leak)")
if passed:
print(f"✅ TEST PASSED")
return 0
else:
return 1
except Exception as e:
print(f"\n❌ TEST ERROR: {e}")
import traceback
traceback.print_exc()
return 1
finally:
stop_monitoring.set()
if container:
stop_container(container)
if __name__ == "__main__":
exit_code = asyncio.run(main())
exit(exit_code)

View File

@@ -0,0 +1,236 @@
#!/usr/bin/env python3
"""
Test 4: Concurrent Load Testing
- Tests pool under concurrent load
- Escalates: 10 → 50 → 100 concurrent requests
- Validates latency distribution (P50, P95, P99)
- Monitors memory stability
"""
import asyncio
import time
import docker
import httpx
from threading import Thread, Event
from collections import defaultdict
# Config
IMAGE = "crawl4ai-local:latest"
CONTAINER_NAME = "crawl4ai-test"
PORT = 11235
LOAD_LEVELS = [
{"name": "Light", "concurrent": 10, "requests": 20},
{"name": "Medium", "concurrent": 50, "requests": 100},
{"name": "Heavy", "concurrent": 100, "requests": 200},
]
# Stats
stats_history = []
stop_monitoring = Event()
def monitor_stats(container):
"""Background stats collector."""
for stat in container.stats(decode=True, stream=True):
if stop_monitoring.is_set():
break
try:
mem_usage = stat['memory_stats'].get('usage', 0) / (1024 * 1024)
stats_history.append({'timestamp': time.time(), 'memory_mb': mem_usage})
except:
pass
time.sleep(0.5)
def count_log_markers(container):
"""Extract pool markers."""
logs = container.logs().decode('utf-8')
return {
'permanent': logs.count("🔥 Using permanent browser"),
'hot': logs.count("♨️ Using hot pool browser"),
'cold': logs.count("❄️ Using cold pool browser"),
'new': logs.count("🆕 Creating new browser"),
}
async def hit_endpoint(client, url, payload, semaphore):
"""Single request with concurrency control."""
async with semaphore:
start = time.time()
try:
resp = await client.post(url, json=payload, timeout=60.0)
elapsed = (time.time() - start) * 1000
return {"success": resp.status_code == 200, "latency_ms": elapsed}
except Exception as e:
return {"success": False, "error": str(e)}
async def run_concurrent_test(url, payload, concurrent, total_requests):
"""Run concurrent requests."""
semaphore = asyncio.Semaphore(concurrent)
async with httpx.AsyncClient() as client:
tasks = [hit_endpoint(client, url, payload, semaphore) for _ in range(total_requests)]
results = await asyncio.gather(*tasks)
return results
def calculate_percentiles(latencies):
"""Calculate P50, P95, P99."""
if not latencies:
return 0, 0, 0
sorted_lat = sorted(latencies)
n = len(sorted_lat)
return (
sorted_lat[int(n * 0.50)],
sorted_lat[int(n * 0.95)],
sorted_lat[int(n * 0.99)],
)
def start_container(client, image, name, port):
"""Start container."""
try:
old = client.containers.get(name)
print(f"🧹 Stopping existing container...")
old.stop()
old.remove()
except docker.errors.NotFound:
pass
print(f"🚀 Starting container...")
container = client.containers.run(
image, name=name, ports={f"{port}/tcp": port},
detach=True, shm_size="1g", mem_limit="4g",
)
print(f"⏳ Waiting for health...")
for _ in range(30):
time.sleep(1)
container.reload()
if container.status == "running":
try:
import requests
if requests.get(f"http://localhost:{port}/health", timeout=2).status_code == 200:
print(f"✅ Container healthy!")
return container
except:
pass
raise TimeoutError("Container failed to start")
async def main():
print("="*60)
print("TEST 4: Concurrent Load Testing")
print("="*60)
client = docker.from_env()
container = None
monitor_thread = None
try:
container = start_container(client, IMAGE, CONTAINER_NAME, PORT)
print(f"\n⏳ Waiting for permanent browser init (3s)...")
await asyncio.sleep(3)
# Start monitoring
stop_monitoring.clear()
stats_history.clear()
monitor_thread = Thread(target=monitor_stats, args=(container,), daemon=True)
monitor_thread.start()
await asyncio.sleep(1)
baseline_mem = stats_history[-1]['memory_mb'] if stats_history else 0
print(f"📏 Baseline: {baseline_mem:.1f} MB\n")
url = f"http://localhost:{PORT}/html"
payload = {"url": "https://httpbin.org/html"}
all_results = []
level_stats = []
# Run load levels
for level in LOAD_LEVELS:
print(f"{'='*60}")
print(f"🔄 {level['name']} Load: {level['concurrent']} concurrent, {level['requests']} total")
print(f"{'='*60}")
start_time = time.time()
results = await run_concurrent_test(url, payload, level['concurrent'], level['requests'])
duration = time.time() - start_time
successes = sum(1 for r in results if r.get("success"))
success_rate = (successes / len(results)) * 100
latencies = [r["latency_ms"] for r in results if "latency_ms" in r]
p50, p95, p99 = calculate_percentiles(latencies)
avg_lat = sum(latencies) / len(latencies) if latencies else 0
print(f" Duration: {duration:.1f}s")
print(f" Success: {success_rate:.1f}% ({successes}/{len(results)})")
print(f" Avg Latency: {avg_lat:.0f}ms")
print(f" P50/P95/P99: {p50:.0f}ms / {p95:.0f}ms / {p99:.0f}ms")
level_stats.append({
'name': level['name'],
'concurrent': level['concurrent'],
'success_rate': success_rate,
'avg_latency': avg_lat,
'p50': p50, 'p95': p95, 'p99': p99,
})
all_results.extend(results)
await asyncio.sleep(2) # Cool down between levels
# Stop monitoring
await asyncio.sleep(1)
stop_monitoring.set()
if monitor_thread:
monitor_thread.join(timeout=2)
# Final stats
pool_stats = count_log_markers(container)
memory_samples = [s['memory_mb'] for s in stats_history]
peak_mem = max(memory_samples) if memory_samples else 0
final_mem = memory_samples[-1] if memory_samples else 0
print(f"\n{'='*60}")
print(f"FINAL RESULTS:")
print(f"{'='*60}")
print(f" Total Requests: {len(all_results)}")
print(f"\n Pool Utilization:")
print(f" 🔥 Permanent: {pool_stats['permanent']}")
print(f" ♨️ Hot: {pool_stats['hot']}")
print(f" ❄️ Cold: {pool_stats['cold']}")
print(f" 🆕 New: {pool_stats['new']}")
print(f"\n Memory:")
print(f" Baseline: {baseline_mem:.1f} MB")
print(f" Peak: {peak_mem:.1f} MB")
print(f" Final: {final_mem:.1f} MB")
print(f" Delta: {final_mem - baseline_mem:+.1f} MB")
print(f"{'='*60}")
# Pass/Fail
passed = True
for ls in level_stats:
if ls['success_rate'] < 99:
print(f"❌ FAIL: {ls['name']} success rate {ls['success_rate']:.1f}% < 99%")
passed = False
if ls['p99'] > 10000: # 10s threshold
print(f"⚠️ WARNING: {ls['name']} P99 latency {ls['p99']:.0f}ms very high")
if final_mem - baseline_mem > 300:
print(f"⚠️ WARNING: Memory grew {final_mem - baseline_mem:.1f} MB")
if passed:
print(f"✅ TEST PASSED")
return 0
else:
return 1
except Exception as e:
print(f"\n❌ TEST ERROR: {e}")
import traceback
traceback.print_exc()
return 1
finally:
stop_monitoring.set()
if container:
print(f"🛑 Stopping container...")
container.stop()
container.remove()
if __name__ == "__main__":
exit_code = asyncio.run(main())
exit(exit_code)

View File

@@ -0,0 +1,267 @@
#!/usr/bin/env python3
"""
Test 5: Pool Stress - Mixed Configs
- Tests hot/cold pool with different browser configs
- Uses different viewports to create config variants
- Validates cold → hot promotion after 3 uses
- Monitors pool tier distribution
"""
import asyncio
import time
import docker
import httpx
from threading import Thread, Event
import random
# Config
IMAGE = "crawl4ai-local:latest"
CONTAINER_NAME = "crawl4ai-test"
PORT = 11235
REQUESTS_PER_CONFIG = 5 # 5 requests per config variant
# Different viewport configs to test pool tiers
VIEWPORT_CONFIGS = [
None, # Default (permanent browser)
{"width": 1920, "height": 1080}, # Desktop
{"width": 1024, "height": 768}, # Tablet
{"width": 375, "height": 667}, # Mobile
]
# Stats
stats_history = []
stop_monitoring = Event()
def monitor_stats(container):
"""Background stats collector."""
for stat in container.stats(decode=True, stream=True):
if stop_monitoring.is_set():
break
try:
mem_usage = stat['memory_stats'].get('usage', 0) / (1024 * 1024)
stats_history.append({'timestamp': time.time(), 'memory_mb': mem_usage})
except:
pass
time.sleep(0.5)
def analyze_pool_logs(container):
"""Extract detailed pool stats from logs."""
logs = container.logs().decode('utf-8')
permanent = logs.count("🔥 Using permanent browser")
hot = logs.count("♨️ Using hot pool browser")
cold = logs.count("❄️ Using cold pool browser")
new = logs.count("🆕 Creating new browser")
promotions = logs.count("⬆️ Promoting to hot pool")
return {
'permanent': permanent,
'hot': hot,
'cold': cold,
'new': new,
'promotions': promotions,
'total': permanent + hot + cold
}
async def crawl_with_viewport(client, url, viewport):
"""Single request with specific viewport."""
payload = {
"urls": ["https://httpbin.org/html"],
"browser_config": {},
"crawler_config": {}
}
# Add viewport if specified
if viewport:
payload["browser_config"] = {
"type": "BrowserConfig",
"params": {
"viewport": {"type": "dict", "value": viewport},
"headless": True,
"text_mode": True,
"extra_args": [
"--no-sandbox",
"--disable-dev-shm-usage",
"--disable-gpu",
"--disable-software-rasterizer",
"--disable-web-security",
"--allow-insecure-localhost",
"--ignore-certificate-errors"
]
}
}
start = time.time()
try:
resp = await client.post(url, json=payload, timeout=60.0)
elapsed = (time.time() - start) * 1000
return {"success": resp.status_code == 200, "latency_ms": elapsed, "viewport": viewport}
except Exception as e:
return {"success": False, "error": str(e), "viewport": viewport}
def start_container(client, image, name, port):
"""Start container."""
try:
old = client.containers.get(name)
print(f"🧹 Stopping existing container...")
old.stop()
old.remove()
except docker.errors.NotFound:
pass
print(f"🚀 Starting container...")
container = client.containers.run(
image, name=name, ports={f"{port}/tcp": port},
detach=True, shm_size="1g", mem_limit="4g",
)
print(f"⏳ Waiting for health...")
for _ in range(30):
time.sleep(1)
container.reload()
if container.status == "running":
try:
import requests
if requests.get(f"http://localhost:{port}/health", timeout=2).status_code == 200:
print(f"✅ Container healthy!")
return container
except:
pass
raise TimeoutError("Container failed to start")
async def main():
print("="*60)
print("TEST 5: Pool Stress - Mixed Configs")
print("="*60)
client = docker.from_env()
container = None
monitor_thread = None
try:
container = start_container(client, IMAGE, CONTAINER_NAME, PORT)
print(f"\n⏳ Waiting for permanent browser init (3s)...")
await asyncio.sleep(3)
# Start monitoring
stop_monitoring.clear()
stats_history.clear()
monitor_thread = Thread(target=monitor_stats, args=(container,), daemon=True)
monitor_thread.start()
await asyncio.sleep(1)
baseline_mem = stats_history[-1]['memory_mb'] if stats_history else 0
print(f"📏 Baseline: {baseline_mem:.1f} MB\n")
url = f"http://localhost:{PORT}/crawl"
print(f"Testing {len(VIEWPORT_CONFIGS)} different configs:")
for i, vp in enumerate(VIEWPORT_CONFIGS):
vp_str = "Default" if vp is None else f"{vp['width']}x{vp['height']}"
print(f" {i+1}. {vp_str}")
print()
# Run requests: repeat each config REQUESTS_PER_CONFIG times
all_results = []
config_sequence = []
for _ in range(REQUESTS_PER_CONFIG):
for viewport in VIEWPORT_CONFIGS:
config_sequence.append(viewport)
# Shuffle to mix configs
random.shuffle(config_sequence)
print(f"🔄 Running {len(config_sequence)} requests with mixed configs...")
async with httpx.AsyncClient() as http_client:
for i, viewport in enumerate(config_sequence):
result = await crawl_with_viewport(http_client, url, viewport)
all_results.append(result)
if (i + 1) % 5 == 0:
vp_str = "default" if result['viewport'] is None else f"{result['viewport']['width']}x{result['viewport']['height']}"
status = "" if result.get('success') else ""
lat = f"{result.get('latency_ms', 0):.0f}ms" if 'latency_ms' in result else "error"
print(f" [{i+1}/{len(config_sequence)}] {status} {vp_str} - {lat}")
# Stop monitoring
await asyncio.sleep(2)
stop_monitoring.set()
if monitor_thread:
monitor_thread.join(timeout=2)
# Analyze results
pool_stats = analyze_pool_logs(container)
successes = sum(1 for r in all_results if r.get("success"))
success_rate = (successes / len(all_results)) * 100
latencies = [r["latency_ms"] for r in all_results if "latency_ms" in r]
avg_lat = sum(latencies) / len(latencies) if latencies else 0
memory_samples = [s['memory_mb'] for s in stats_history]
peak_mem = max(memory_samples) if memory_samples else 0
final_mem = memory_samples[-1] if memory_samples else 0
print(f"\n{'='*60}")
print(f"RESULTS:")
print(f"{'='*60}")
print(f" Requests: {len(all_results)}")
print(f" Success Rate: {success_rate:.1f}% ({successes}/{len(all_results)})")
print(f" Avg Latency: {avg_lat:.0f}ms")
print(f"\n Pool Statistics:")
print(f" 🔥 Permanent: {pool_stats['permanent']}")
print(f" ♨️ Hot: {pool_stats['hot']}")
print(f" ❄️ Cold: {pool_stats['cold']}")
print(f" 🆕 New: {pool_stats['new']}")
print(f" ⬆️ Promotions: {pool_stats['promotions']}")
print(f" 📊 Reuse: {(pool_stats['total'] / len(all_results) * 100):.1f}%")
print(f"\n Memory:")
print(f" Baseline: {baseline_mem:.1f} MB")
print(f" Peak: {peak_mem:.1f} MB")
print(f" Final: {final_mem:.1f} MB")
print(f" Delta: {final_mem - baseline_mem:+.1f} MB")
print(f"{'='*60}")
# Pass/Fail
passed = True
if success_rate < 99:
print(f"❌ FAIL: Success rate {success_rate:.1f}% < 99%")
passed = False
# Should see promotions since we repeat each config 5 times
if pool_stats['promotions'] < (len(VIEWPORT_CONFIGS) - 1): # -1 for default
print(f"⚠️ WARNING: Only {pool_stats['promotions']} promotions (expected ~{len(VIEWPORT_CONFIGS)-1})")
# Should have created some browsers for different configs
if pool_stats['new'] == 0:
print(f"⚠️ NOTE: No new browsers created (all used default?)")
if pool_stats['permanent'] == len(all_results):
print(f"⚠️ NOTE: All requests used permanent browser (configs not varying enough?)")
if final_mem - baseline_mem > 500:
print(f"⚠️ WARNING: Memory grew {final_mem - baseline_mem:.1f} MB")
if passed:
print(f"✅ TEST PASSED")
return 0
else:
return 1
except Exception as e:
print(f"\n❌ TEST ERROR: {e}")
import traceback
traceback.print_exc()
return 1
finally:
stop_monitoring.set()
if container:
print(f"🛑 Stopping container...")
container.stop()
container.remove()
if __name__ == "__main__":
exit_code = asyncio.run(main())
exit(exit_code)

View File

@@ -0,0 +1,234 @@
#!/usr/bin/env python3
"""
Test 6: Multi-Endpoint Testing
- Tests multiple endpoints together: /html, /screenshot, /pdf, /crawl
- Validates each endpoint works correctly
- Monitors success rates per endpoint
"""
import asyncio
import time
import docker
import httpx
from threading import Thread, Event
# Config
IMAGE = "crawl4ai-local:latest"
CONTAINER_NAME = "crawl4ai-test"
PORT = 11235
REQUESTS_PER_ENDPOINT = 10
# Stats
stats_history = []
stop_monitoring = Event()
def monitor_stats(container):
"""Background stats collector."""
for stat in container.stats(decode=True, stream=True):
if stop_monitoring.is_set():
break
try:
mem_usage = stat['memory_stats'].get('usage', 0) / (1024 * 1024)
stats_history.append({'timestamp': time.time(), 'memory_mb': mem_usage})
except:
pass
time.sleep(0.5)
async def test_html(client, base_url, count):
"""Test /html endpoint."""
url = f"{base_url}/html"
results = []
for _ in range(count):
start = time.time()
try:
resp = await client.post(url, json={"url": "https://httpbin.org/html"}, timeout=30.0)
elapsed = (time.time() - start) * 1000
results.append({"success": resp.status_code == 200, "latency_ms": elapsed})
except Exception as e:
results.append({"success": False, "error": str(e)})
return results
async def test_screenshot(client, base_url, count):
"""Test /screenshot endpoint."""
url = f"{base_url}/screenshot"
results = []
for _ in range(count):
start = time.time()
try:
resp = await client.post(url, json={"url": "https://httpbin.org/html"}, timeout=30.0)
elapsed = (time.time() - start) * 1000
results.append({"success": resp.status_code == 200, "latency_ms": elapsed})
except Exception as e:
results.append({"success": False, "error": str(e)})
return results
async def test_pdf(client, base_url, count):
"""Test /pdf endpoint."""
url = f"{base_url}/pdf"
results = []
for _ in range(count):
start = time.time()
try:
resp = await client.post(url, json={"url": "https://httpbin.org/html"}, timeout=30.0)
elapsed = (time.time() - start) * 1000
results.append({"success": resp.status_code == 200, "latency_ms": elapsed})
except Exception as e:
results.append({"success": False, "error": str(e)})
return results
async def test_crawl(client, base_url, count):
"""Test /crawl endpoint."""
url = f"{base_url}/crawl"
results = []
payload = {
"urls": ["https://httpbin.org/html"],
"browser_config": {},
"crawler_config": {}
}
for _ in range(count):
start = time.time()
try:
resp = await client.post(url, json=payload, timeout=30.0)
elapsed = (time.time() - start) * 1000
results.append({"success": resp.status_code == 200, "latency_ms": elapsed})
except Exception as e:
results.append({"success": False, "error": str(e)})
return results
def start_container(client, image, name, port):
"""Start container."""
try:
old = client.containers.get(name)
print(f"🧹 Stopping existing container...")
old.stop()
old.remove()
except docker.errors.NotFound:
pass
print(f"🚀 Starting container...")
container = client.containers.run(
image, name=name, ports={f"{port}/tcp": port},
detach=True, shm_size="1g", mem_limit="4g",
)
print(f"⏳ Waiting for health...")
for _ in range(30):
time.sleep(1)
container.reload()
if container.status == "running":
try:
import requests
if requests.get(f"http://localhost:{port}/health", timeout=2).status_code == 200:
print(f"✅ Container healthy!")
return container
except:
pass
raise TimeoutError("Container failed to start")
async def main():
print("="*60)
print("TEST 6: Multi-Endpoint Testing")
print("="*60)
client = docker.from_env()
container = None
monitor_thread = None
try:
container = start_container(client, IMAGE, CONTAINER_NAME, PORT)
print(f"\n⏳ Waiting for permanent browser init (3s)...")
await asyncio.sleep(3)
# Start monitoring
stop_monitoring.clear()
stats_history.clear()
monitor_thread = Thread(target=monitor_stats, args=(container,), daemon=True)
monitor_thread.start()
await asyncio.sleep(1)
baseline_mem = stats_history[-1]['memory_mb'] if stats_history else 0
print(f"📏 Baseline: {baseline_mem:.1f} MB\n")
base_url = f"http://localhost:{PORT}"
# Test each endpoint
endpoints = {
"/html": test_html,
"/screenshot": test_screenshot,
"/pdf": test_pdf,
"/crawl": test_crawl,
}
all_endpoint_stats = {}
async with httpx.AsyncClient() as http_client:
for endpoint_name, test_func in endpoints.items():
print(f"🔄 Testing {endpoint_name} ({REQUESTS_PER_ENDPOINT} requests)...")
results = await test_func(http_client, base_url, REQUESTS_PER_ENDPOINT)
successes = sum(1 for r in results if r.get("success"))
success_rate = (successes / len(results)) * 100
latencies = [r["latency_ms"] for r in results if "latency_ms" in r]
avg_lat = sum(latencies) / len(latencies) if latencies else 0
all_endpoint_stats[endpoint_name] = {
'success_rate': success_rate,
'avg_latency': avg_lat,
'total': len(results),
'successes': successes
}
print(f" ✓ Success: {success_rate:.1f}% ({successes}/{len(results)}), Avg: {avg_lat:.0f}ms")
# Stop monitoring
await asyncio.sleep(1)
stop_monitoring.set()
if monitor_thread:
monitor_thread.join(timeout=2)
# Final stats
memory_samples = [s['memory_mb'] for s in stats_history]
peak_mem = max(memory_samples) if memory_samples else 0
final_mem = memory_samples[-1] if memory_samples else 0
print(f"\n{'='*60}")
print(f"RESULTS:")
print(f"{'='*60}")
for endpoint, stats in all_endpoint_stats.items():
print(f" {endpoint:12} Success: {stats['success_rate']:5.1f}% Avg: {stats['avg_latency']:6.0f}ms")
print(f"\n Memory:")
print(f" Baseline: {baseline_mem:.1f} MB")
print(f" Peak: {peak_mem:.1f} MB")
print(f" Final: {final_mem:.1f} MB")
print(f" Delta: {final_mem - baseline_mem:+.1f} MB")
print(f"{'='*60}")
# Pass/Fail
passed = True
for endpoint, stats in all_endpoint_stats.items():
if stats['success_rate'] < 100:
print(f"❌ FAIL: {endpoint} success rate {stats['success_rate']:.1f}% < 100%")
passed = False
if passed:
print(f"✅ TEST PASSED")
return 0
else:
return 1
except Exception as e:
print(f"\n❌ TEST ERROR: {e}")
import traceback
traceback.print_exc()
return 1
finally:
stop_monitoring.set()
if container:
print(f"🛑 Stopping container...")
container.stop()
container.remove()
if __name__ == "__main__":
exit_code = asyncio.run(main())
exit(exit_code)

View File

@@ -0,0 +1,199 @@
#!/usr/bin/env python3
"""
Test 7: Cleanup Verification (Janitor)
- Creates load spike then goes idle
- Verifies memory returns to near baseline
- Tests janitor cleanup of idle browsers
- Monitors memory recovery time
"""
import asyncio
import time
import docker
import httpx
from threading import Thread, Event
# Config
IMAGE = "crawl4ai-local:latest"
CONTAINER_NAME = "crawl4ai-test"
PORT = 11235
SPIKE_REQUESTS = 20 # Create some browsers
IDLE_TIME = 90 # Wait 90s for janitor (runs every 60s)
# Stats
stats_history = []
stop_monitoring = Event()
def monitor_stats(container):
"""Background stats collector."""
for stat in container.stats(decode=True, stream=True):
if stop_monitoring.is_set():
break
try:
mem_usage = stat['memory_stats'].get('usage', 0) / (1024 * 1024)
stats_history.append({'timestamp': time.time(), 'memory_mb': mem_usage})
except:
pass
time.sleep(1) # Sample every 1s for this test
def start_container(client, image, name, port):
"""Start container."""
try:
old = client.containers.get(name)
print(f"🧹 Stopping existing container...")
old.stop()
old.remove()
except docker.errors.NotFound:
pass
print(f"🚀 Starting container...")
container = client.containers.run(
image, name=name, ports={f"{port}/tcp": port},
detach=True, shm_size="1g", mem_limit="4g",
)
print(f"⏳ Waiting for health...")
for _ in range(30):
time.sleep(1)
container.reload()
if container.status == "running":
try:
import requests
if requests.get(f"http://localhost:{port}/health", timeout=2).status_code == 200:
print(f"✅ Container healthy!")
return container
except:
pass
raise TimeoutError("Container failed to start")
async def main():
print("="*60)
print("TEST 7: Cleanup Verification (Janitor)")
print("="*60)
client = docker.from_env()
container = None
monitor_thread = None
try:
container = start_container(client, IMAGE, CONTAINER_NAME, PORT)
print(f"\n⏳ Waiting for permanent browser init (3s)...")
await asyncio.sleep(3)
# Start monitoring
stop_monitoring.clear()
stats_history.clear()
monitor_thread = Thread(target=monitor_stats, args=(container,), daemon=True)
monitor_thread.start()
await asyncio.sleep(2)
baseline_mem = stats_history[-1]['memory_mb'] if stats_history else 0
print(f"📏 Baseline: {baseline_mem:.1f} MB\n")
# Create load spike with different configs to populate pool
print(f"🔥 Creating load spike ({SPIKE_REQUESTS} requests with varied configs)...")
url = f"http://localhost:{PORT}/crawl"
viewports = [
{"width": 1920, "height": 1080},
{"width": 1024, "height": 768},
{"width": 375, "height": 667},
]
async with httpx.AsyncClient(timeout=60.0) as http_client:
tasks = []
for i in range(SPIKE_REQUESTS):
vp = viewports[i % len(viewports)]
payload = {
"urls": ["https://httpbin.org/html"],
"browser_config": {
"type": "BrowserConfig",
"params": {
"viewport": {"type": "dict", "value": vp},
"headless": True,
"text_mode": True,
"extra_args": [
"--no-sandbox", "--disable-dev-shm-usage",
"--disable-gpu", "--disable-software-rasterizer",
"--disable-web-security", "--allow-insecure-localhost",
"--ignore-certificate-errors"
]
}
},
"crawler_config": {}
}
tasks.append(http_client.post(url, json=payload))
results = await asyncio.gather(*tasks, return_exceptions=True)
successes = sum(1 for r in results if hasattr(r, 'status_code') and r.status_code == 200)
print(f" ✓ Spike completed: {successes}/{len(results)} successful")
# Measure peak
await asyncio.sleep(2)
peak_mem = max([s['memory_mb'] for s in stats_history]) if stats_history else baseline_mem
print(f" 📊 Peak memory: {peak_mem:.1f} MB (+{peak_mem - baseline_mem:.1f} MB)")
# Now go idle and wait for janitor
print(f"\n⏸️ Going idle for {IDLE_TIME}s (janitor cleanup)...")
print(f" (Janitor runs every 60s, checking for idle browsers)")
for elapsed in range(0, IDLE_TIME, 10):
await asyncio.sleep(10)
current_mem = stats_history[-1]['memory_mb'] if stats_history else 0
print(f" [{elapsed+10:3d}s] Memory: {current_mem:.1f} MB")
# Stop monitoring
stop_monitoring.set()
if monitor_thread:
monitor_thread.join(timeout=2)
# Analyze memory recovery
final_mem = stats_history[-1]['memory_mb'] if stats_history else 0
recovery_mb = peak_mem - final_mem
recovery_pct = (recovery_mb / (peak_mem - baseline_mem) * 100) if (peak_mem - baseline_mem) > 0 else 0
print(f"\n{'='*60}")
print(f"RESULTS:")
print(f"{'='*60}")
print(f" Memory Journey:")
print(f" Baseline: {baseline_mem:.1f} MB")
print(f" Peak: {peak_mem:.1f} MB (+{peak_mem - baseline_mem:.1f} MB)")
print(f" Final: {final_mem:.1f} MB (+{final_mem - baseline_mem:.1f} MB)")
print(f" Recovered: {recovery_mb:.1f} MB ({recovery_pct:.1f}%)")
print(f"{'='*60}")
# Pass/Fail
passed = True
# Should have created some memory pressure
if peak_mem - baseline_mem < 100:
print(f"⚠️ WARNING: Peak increase only {peak_mem - baseline_mem:.1f} MB (expected more browsers)")
# Should recover most memory (within 100MB of baseline)
if final_mem - baseline_mem > 100:
print(f"⚠️ WARNING: Memory didn't recover well (still +{final_mem - baseline_mem:.1f} MB above baseline)")
else:
print(f"✅ Good memory recovery!")
# Baseline + 50MB tolerance
if final_mem - baseline_mem < 50:
print(f"✅ Excellent cleanup (within 50MB of baseline)")
print(f"✅ TEST PASSED")
return 0
except Exception as e:
print(f"\n❌ TEST ERROR: {e}")
import traceback
traceback.print_exc()
return 1
finally:
stop_monitoring.set()
if container:
print(f"🛑 Stopping container...")
container.stop()
container.remove()
if __name__ == "__main__":
exit_code = asyncio.run(main())
exit(exit_code)

View File

@@ -0,0 +1,57 @@
#!/usr/bin/env python3
"""Quick test to generate monitor dashboard activity"""
import httpx
import asyncio
async def test_dashboard():
async with httpx.AsyncClient(timeout=30.0) as client:
print("📊 Generating dashboard activity...")
# Test 1: Simple crawl
print("\n1⃣ Running simple crawl...")
r1 = await client.post(
"http://localhost:11235/crawl",
json={"urls": ["https://httpbin.org/html"], "crawler_config": {}}
)
print(f" Status: {r1.status_code}")
# Test 2: Multiple URLs
print("\n2⃣ Running multi-URL crawl...")
r2 = await client.post(
"http://localhost:11235/crawl",
json={
"urls": [
"https://httpbin.org/html",
"https://httpbin.org/json"
],
"crawler_config": {}
}
)
print(f" Status: {r2.status_code}")
# Test 3: Check monitor health
print("\n3⃣ Checking monitor health...")
r3 = await client.get("http://localhost:11235/monitor/health")
health = r3.json()
print(f" Memory: {health['container']['memory_percent']}%")
print(f" Browsers: {health['pool']['permanent']['active']}")
# Test 4: Check requests
print("\n4⃣ Checking request log...")
r4 = await client.get("http://localhost:11235/monitor/requests")
reqs = r4.json()
print(f" Active: {len(reqs['active'])}")
print(f" Completed: {len(reqs['completed'])}")
# Test 5: Check endpoint stats
print("\n5⃣ Checking endpoint stats...")
r5 = await client.get("http://localhost:11235/monitor/endpoints/stats")
stats = r5.json()
for endpoint, data in stats.items():
print(f" {endpoint}: {data['count']} requests, {data['avg_latency_ms']}ms avg")
print("\n✅ Dashboard should now show activity!")
print(f"\n🌐 Open: http://localhost:11235/dashboard")
if __name__ == "__main__":
asyncio.run(test_dashboard())

View File

@@ -178,4 +178,29 @@ def verify_email_domain(email: str) -> bool:
records = dns.resolver.resolve(domain, 'MX')
return True if records else False
except Exception as e:
return False
return False
def get_container_memory_percent() -> float:
"""Get actual container memory usage vs limit (cgroup v1/v2 aware)."""
try:
# Try cgroup v2 first
usage_path = Path("/sys/fs/cgroup/memory.current")
limit_path = Path("/sys/fs/cgroup/memory.max")
if not usage_path.exists():
# Fall back to cgroup v1
usage_path = Path("/sys/fs/cgroup/memory/memory.usage_in_bytes")
limit_path = Path("/sys/fs/cgroup/memory/memory.limit_in_bytes")
usage = int(usage_path.read_text())
limit = int(limit_path.read_text())
# Handle unlimited (v2: "max", v1: > 1e18)
if limit > 1e18:
import psutil
limit = psutil.virtual_memory().total
return (usage / limit) * 100
except:
# Non-container or unsupported: fallback to host
import psutil
return psutil.virtual_memory().percent

View File

@@ -1,4 +1,20 @@
# Crawl4AI Docker Guide 🐳
# Self-Hosting Crawl4AI 🚀
**Take Control of Your Web Crawling Infrastructure**
Self-hosting Crawl4AI gives you complete control over your web crawling and data extraction pipeline. Unlike cloud-based solutions, you own your data, infrastructure, and destiny.
## Why Self-Host?
- **🔒 Data Privacy**: Your crawled data never leaves your infrastructure
- **💰 Cost Control**: No per-request pricing - scale within your own resources
- **🎯 Customization**: Full control over browser configurations, extraction strategies, and performance tuning
- **📊 Transparency**: Real-time monitoring dashboard shows exactly what's happening
- **⚡ Performance**: Direct access without API rate limits or geographic restrictions
- **🛡️ Security**: Keep sensitive data extraction workflows behind your firewall
- **🔧 Flexibility**: Customize, extend, and integrate with your existing infrastructure
When you self-host, you can scale from a single container to a full browser infrastructure, all while maintaining complete control and visibility.
## Table of Contents
- [Prerequisites](#prerequisites)
@@ -25,7 +41,12 @@
- [Available MCP Tools](#available-mcp-tools)
- [Testing MCP Connections](#testing-mcp-connections)
- [MCP Schemas](#mcp-schemas)
- [Metrics & Monitoring](#metrics--monitoring)
- [Real-time Monitoring & Operations](#real-time-monitoring--operations)
- [Monitoring Dashboard](#monitoring-dashboard)
- [Monitor API Endpoints](#monitor-api-endpoints)
- [WebSocket Streaming](#websocket-streaming)
- [Control Actions](#control-actions)
- [Production Integration](#production-integration)
- [Deployment Scenarios](#deployment-scenarios)
- [Complete Examples](#complete-examples)
- [Server Configuration](#server-configuration)
@@ -1175,22 +1196,469 @@ async def test_stream_crawl(token: str = None): # Made token optional
---
## Metrics & Monitoring
## Real-time Monitoring & Operations
Keep an eye on your crawler with these endpoints:
One of the key advantages of self-hosting is complete visibility into your infrastructure. Crawl4AI includes a comprehensive real-time monitoring system that gives you full transparency and control.
- `/health` - Quick health check
- `/metrics` - Detailed Prometheus metrics
- `/schema` - Full API schema
### Monitoring Dashboard
Example health check:
Access the **built-in real-time monitoring dashboard** for complete operational visibility:
```
http://localhost:11235/monitor
```
![Monitoring Dashboard](https://via.placeholder.com/800x400?text=Crawl4AI+Monitoring+Dashboard)
**Dashboard Features:**
#### 1. System Health Overview
- **CPU & Memory**: Live usage with progress bars and percentage indicators
- **Network I/O**: Total bytes sent/received since startup
- **Server Uptime**: How long your server has been running
- **Browser Pool Status**:
- 🔥 Permanent browser (always-on default config, ~270MB)
- ♨️ Hot pool (frequently used configs, ~180MB each)
- ❄️ Cold pool (idle browsers awaiting cleanup, ~180MB each)
- **Memory Pressure**: LOW/MEDIUM/HIGH indicator for janitor behavior
#### 2. Live Request Tracking
- **Active Requests**: Currently running crawls with:
- Request ID for tracking
- Target URL (truncated for display)
- Endpoint being used
- Elapsed time (updates in real-time)
- Memory usage from start
- **Completed Requests**: Last 10 finished requests showing:
- Success/failure status (color-coded)
- Total execution time
- Memory delta (how much memory changed)
- Pool hit (was browser reused?)
- HTTP status code
- **Filtering**: View all, success only, or errors only
#### 3. Browser Pool Management
Interactive table showing all active browsers:
| Type | Signature | Age | Last Used | Hits | Actions |
|------|-----------|-----|-----------|------|---------|
| permanent | abc12345 | 2h | 5s ago | 1,247 | Restart |
| hot | def67890 | 45m | 2m ago | 89 | Kill / Restart |
| cold | ghi11213 | 30m | 15m ago | 3 | Kill / Restart |
- **Reuse Rate**: Percentage of requests that reused existing browsers
- **Memory Estimates**: Total memory used by browser pool
- **Manual Control**: Kill or restart individual browsers
#### 4. Janitor Events Log
Real-time log of browser pool cleanup events:
- When cold browsers are closed due to memory pressure
- When browsers are promoted from cold to hot pool
- Forced cleanups triggered manually
- Detailed cleanup reasons and browser signatures
#### 5. Error Monitoring
Recent errors with full context:
- Timestamp
- Endpoint where error occurred
- Target URL
- Error message
- Request ID for correlation
**Live Updates:**
The dashboard connects via WebSocket and refreshes every **2 seconds** with the latest data. Connection status indicator shows when you're connected/disconnected.
---
### Monitor API Endpoints
For programmatic monitoring, automation, and integration with your existing infrastructure:
#### Health & Statistics
**Get System Health**
```bash
curl http://localhost:11235/health
GET /monitor/health
```
Returns current system snapshot:
```json
{
"container": {
"memory_percent": 45.2,
"cpu_percent": 23.1,
"network_sent_mb": 1250.45,
"network_recv_mb": 3421.12,
"uptime_seconds": 7234
},
"pool": {
"permanent": {"active": true, "memory_mb": 270},
"hot": {"count": 3, "memory_mb": 540},
"cold": {"count": 1, "memory_mb": 180},
"total_memory_mb": 990
},
"janitor": {
"next_cleanup_estimate": "adaptive",
"memory_pressure": "MEDIUM"
}
}
```
**Get Request Statistics**
```bash
GET /monitor/requests?status=all&limit=50
```
Query parameters:
- `status`: Filter by `all`, `active`, `completed`, `success`, or `error`
- `limit`: Number of completed requests to return (1-1000)
**Get Browser Pool Details**
```bash
GET /monitor/browsers
```
Returns detailed information about all active browsers:
```json
{
"browsers": [
{
"type": "permanent",
"sig": "abc12345",
"age_seconds": 7234,
"last_used_seconds": 5,
"memory_mb": 270,
"hits": 1247,
"killable": false
},
{
"type": "hot",
"sig": "def67890",
"age_seconds": 2701,
"last_used_seconds": 120,
"memory_mb": 180,
"hits": 89,
"killable": true
}
],
"summary": {
"total_count": 5,
"total_memory_mb": 990,
"reuse_rate_percent": 87.3
}
}
```
**Get Endpoint Performance Statistics**
```bash
GET /monitor/endpoints/stats
```
Returns aggregated metrics per endpoint:
```json
{
"/crawl": {
"count": 1523,
"avg_latency_ms": 2341.5,
"success_rate_percent": 98.2,
"pool_hit_rate_percent": 89.1,
"errors": 27
},
"/md": {
"count": 891,
"avg_latency_ms": 1823.7,
"success_rate_percent": 99.4,
"pool_hit_rate_percent": 92.3,
"errors": 5
}
}
```
**Get Timeline Data**
```bash
GET /monitor/timeline?metric=memory&window=5m
```
Parameters:
- `metric`: `memory`, `requests`, or `browsers`
- `window`: Currently only `5m` (5-minute window, 5-second resolution)
Returns time-series data for charts:
```json
{
"timestamps": [1699564800, 1699564805, 1699564810, ...],
"values": [42.1, 43.5, 41.8, ...]
}
```
#### Logs
**Get Janitor Events**
```bash
GET /monitor/logs/janitor?limit=100
```
**Get Error Log**
```bash
GET /monitor/logs/errors?limit=100
```
---
*(Deployment Scenarios and Complete Examples sections remain the same, maybe update links if examples moved)*
### WebSocket Streaming
For real-time monitoring in your own dashboards or applications:
```bash
WS /monitor/ws
```
**Connection Example (Python):**
```python
import asyncio
import websockets
import json
async def monitor_server():
uri = "ws://localhost:11235/monitor/ws"
async with websockets.connect(uri) as websocket:
print("Connected to Crawl4AI monitor")
while True:
# Receive update every 2 seconds
data = await websocket.recv()
update = json.loads(data)
# Extract key metrics
health = update['health']
active_requests = len(update['requests']['active'])
browsers = len(update['browsers'])
print(f"Memory: {health['container']['memory_percent']:.1f}% | "
f"Active: {active_requests} | "
f"Browsers: {browsers}")
# Check for high memory pressure
if health['janitor']['memory_pressure'] == 'HIGH':
print("⚠️ HIGH MEMORY PRESSURE - Consider cleanup")
asyncio.run(monitor_server())
```
**Update Payload Structure:**
```json
{
"timestamp": 1699564823.456,
"health": { /* System health snapshot */ },
"requests": {
"active": [ /* Currently running */ ],
"completed": [ /* Last 10 completed */ ]
},
"browsers": [ /* All active browsers */ ],
"timeline": {
"memory": { /* Last 5 minutes */ },
"requests": { /* Request rate */ },
"browsers": { /* Pool composition */ }
},
"janitor": [ /* Last 10 cleanup events */ ],
"errors": [ /* Last 10 errors */ ]
}
```
---
### Control Actions
Take manual control when needed:
**Force Immediate Cleanup**
```bash
POST /monitor/actions/cleanup
```
Kills all cold pool browsers immediately (useful when memory is tight):
```json
{
"success": true,
"killed_browsers": 3
}
```
**Kill Specific Browser**
```bash
POST /monitor/actions/kill_browser
Content-Type: application/json
{
"sig": "abc12345" // First 8 chars of browser signature
}
```
Response:
```json
{
"success": true,
"killed_sig": "abc12345",
"pool_type": "hot"
}
```
**Restart Browser**
```bash
POST /monitor/actions/restart_browser
Content-Type: application/json
{
"sig": "permanent" // Or first 8 chars of signature
}
```
For permanent browser, this will close and reinitialize it. For hot/cold browsers, it kills them and lets new requests create fresh ones.
**Reset Statistics**
```bash
POST /monitor/stats/reset
```
Clears endpoint counters (useful for starting fresh after testing).
---
### Production Integration
#### Integration with Existing Monitoring Systems
**Prometheus Integration:**
```bash
# Scrape metrics endpoint
curl http://localhost:11235/metrics
```
**Custom Dashboard Integration:**
```python
# Example: Push metrics to your monitoring system
import asyncio
import websockets
import json
from your_monitoring import push_metric
async def integrate_monitoring():
async with websockets.connect("ws://localhost:11235/monitor/ws") as ws:
while True:
data = json.loads(await ws.recv())
# Push to your monitoring system
push_metric("crawl4ai.memory.percent",
data['health']['container']['memory_percent'])
push_metric("crawl4ai.active_requests",
len(data['requests']['active']))
push_metric("crawl4ai.browser_count",
len(data['browsers']))
```
**Alerting Example:**
```python
import requests
import time
def check_health():
"""Poll health endpoint and alert on issues"""
response = requests.get("http://localhost:11235/monitor/health")
health = response.json()
# Alert on high memory
if health['container']['memory_percent'] > 85:
send_alert(f"High memory: {health['container']['memory_percent']}%")
# Alert on high error rate
stats = requests.get("http://localhost:11235/monitor/endpoints/stats").json()
for endpoint, metrics in stats.items():
if metrics['success_rate_percent'] < 95:
send_alert(f"{endpoint} success rate: {metrics['success_rate_percent']}%")
# Run every minute
while True:
check_health()
time.sleep(60)
```
**Log Aggregation:**
```python
import requests
from datetime import datetime
def aggregate_errors():
"""Fetch and aggregate errors for logging system"""
response = requests.get("http://localhost:11235/monitor/logs/errors?limit=100")
errors = response.json()['errors']
for error in errors:
log_to_system({
'timestamp': datetime.fromtimestamp(error['timestamp']),
'service': 'crawl4ai',
'endpoint': error['endpoint'],
'url': error['url'],
'message': error['error'],
'request_id': error['request_id']
})
```
#### Key Metrics to Track
For production self-hosted deployments, monitor these metrics:
1. **Memory Usage Trends**
- Track `container.memory_percent` over time
- Alert when consistently above 80%
- Prevents OOM kills
2. **Request Success Rates**
- Monitor per-endpoint success rates
- Alert when below 95%
- Indicates crawling issues
3. **Average Latency**
- Track `avg_latency_ms` per endpoint
- Detect performance degradation
- Optimize slow endpoints
4. **Browser Pool Efficiency**
- Monitor `reuse_rate_percent`
- Should be >80% for good efficiency
- Low rates indicate pool churn
5. **Error Frequency**
- Count errors per time window
- Alert on sudden spikes
- Track error patterns
6. **Janitor Activity**
- Monitor cleanup frequency
- Excessive cleanup indicates memory pressure
- Adjust pool settings if needed
---
### Quick Health Check
For simple uptime monitoring:
```bash
curl http://localhost:11235/health
```
Returns:
```json
{
"status": "healthy",
"version": "0.7.4"
}
```
Other useful endpoints:
- `/metrics` - Prometheus metrics
- `/schema` - Full API schema
---
@@ -1350,22 +1818,46 @@ We're here to help you succeed with Crawl4AI! Here's how to get support:
## Summary
In this guide, we've covered everything you need to get started with Crawl4AI's Docker deployment:
- Building and running the Docker container
- Configuring the environment
- Using the interactive playground for testing
- Making API requests with proper typing
- Using the Python SDK
- Leveraging specialized endpoints for screenshots, PDFs, and JavaScript execution
- Connecting via the Model Context Protocol (MCP)
- Monitoring your deployment
Congratulations! You now have everything you need to self-host your own Crawl4AI infrastructure with complete control and visibility.
The new playground interface at `http://localhost:11235/playground` makes it much easier to test configurations and generate the corresponding JSON for API requests.
**What You've Learned:**
- ✅ Multiple deployment options (Docker Hub, Docker Compose, manual builds)
- ✅ Environment configuration and LLM integration
- ✅ Using the interactive playground for testing
- ✅ Making API requests with proper typing (SDK and REST)
- ✅ Specialized endpoints (screenshots, PDFs, JavaScript execution)
- ✅ MCP integration for AI-assisted development
- ✅ **Real-time monitoring dashboard** for operational transparency
- ✅ **Monitor API** for programmatic control and integration
- ✅ Production deployment best practices
For AI application developers, the MCP integration allows tools like Claude Code to directly access Crawl4AI's capabilities without complex API handling.
**Why This Matters:**
Remember, the examples in the `examples` folder are your friends - they show real-world usage patterns that you can adapt for your needs.
By self-hosting Crawl4AI, you:
- 🔒 **Own Your Data**: Everything stays in your infrastructure
- 📊 **See Everything**: Real-time dashboard shows exactly what's happening
- 💰 **Control Costs**: Scale within your resources, no per-request fees
- ⚡ **Maximize Performance**: Direct access with smart browser pooling (10x memory efficiency)
- 🛡️ **Stay Secure**: Keep sensitive workflows behind your firewall
- 🔧 **Customize Freely**: Full control over configs, strategies, and optimizations
Keep exploring, and don't hesitate to reach out if you need help! We're building something amazing together. 🚀
**Next Steps:**
1. **Start Simple**: Deploy with Docker Hub image and test with the playground
2. **Monitor Everything**: Open `http://localhost:11235/monitor` to watch your server
3. **Integrate**: Connect your applications using the Python SDK or REST API
4. **Scale Smart**: Use the monitoring data to optimize your deployment
5. **Go Production**: Set up alerting, log aggregation, and automated cleanup
**Key Resources:**
- 🎮 **Playground**: `http://localhost:11235/playground` - Interactive testing
- 📊 **Monitor Dashboard**: `http://localhost:11235/monitor` - Real-time visibility
- 📖 **Architecture Docs**: `deploy/docker/ARCHITECTURE.md` - Deep technical dive
- 💬 **Discord Community**: Get help and share experiences
- ⭐ **GitHub**: Report issues, contribute, show support
Remember: The monitoring dashboard is your window into your infrastructure. Use it to understand performance, troubleshoot issues, and optimize your deployment. The examples in the `examples` folder show real-world usage patterns you can adapt.
**You're now in control of your web crawling destiny!** 🚀
Happy crawling! 🕷️

View File

@@ -18,7 +18,7 @@ nav:
- "Marketplace Admin": "marketplace/admin/index.html"
- Setup & Installation:
- "Installation": "core/installation.md"
- "Docker Deployment": "core/docker-deployment.md"
- "Self-Hosting Guide": "core/self-hosting.md"
- "Blog & Changelog":
- "Blog Home": "blog/index.md"
- "Changelog": "https://github.com/unclecode/crawl4ai/blob/main/CHANGELOG.md"

View File

@@ -1,297 +0,0 @@
# Crawl4AI Agent - Phase 1 Test Results
**Test Date:** 2025-10-17
**Test Duration:** 4 minutes 14 seconds
**Overall Status:****PASS** (100% success rate)
---
## Executive Summary
All automated tests for the Crawl4AI Agent have **PASSED** successfully:
-**Component Tests:** 4/4 passed (100%)
-**Tool Integration Tests:** 3/3 passed (100%)
-**Multi-turn Scenario Tests:** 8/8 passed (100%)
**Total:** 15/15 tests passed across 3 test suites
---
## Test Suite 1: Component Tests
**Duration:** 2.20 seconds
**Status:** ✅ PASS
Tests the fundamental building blocks of the agent system.
| Component | Status | Description |
|-----------|--------|-------------|
| BrowserManager | ✅ PASS | Singleton pattern verified |
| TerminalUI | ✅ PASS | Rich UI rendering works |
| MCP Server | ✅ PASS | 7 tools registered successfully |
| ChatMode | ✅ PASS | Instance creation successful |
**Key Finding:** All core components initialize correctly and follow expected patterns.
---
## Test Suite 2: Tool Integration Tests
**Duration:** 7.05 seconds
**Status:** ✅ PASS
Tests direct integration with Crawl4AI library.
| Test | Status | Description |
|------|--------|-------------|
| Quick Crawl (Markdown) | ✅ PASS | Single-page extraction works |
| Session Workflow | ✅ PASS | Session lifecycle functions correctly |
| Quick Crawl (HTML) | ✅ PASS | HTML format extraction works |
**Key Finding:** All Crawl4AI integration points work as expected. Markdown handling fixed (using `result.markdown` instead of deprecated `result.markdown_v2`).
---
## Test Suite 3: Multi-turn Scenario Tests
**Duration:** 4 minutes 5 seconds (245.15 seconds)
**Status:** ✅ PASS
**Pass Rate:** 8/8 scenarios (100%)
### Simple Scenarios (2/2 passed)
1. **Single quick crawl** - 14.1s ✅
- Tests basic one-shot crawling
- Tools used: `quick_crawl`
- Agent turns: 3
2. **Session lifecycle** - 28.5s ✅
- Tests session management (start, navigate, close)
- Tools used: `start_session`, `navigate`, `close_session`
- Agent turns: 9 total (3 per turn)
### Medium Scenarios (3/3 passed)
3. **Multi-page crawl with file output** - 25.4s ✅
- Tests crawling multiple URLs and saving results
- Tools used: `quick_crawl` (2x), `Write`
- Agent turns: 6
- **Fix applied:** Improved system prompt to use `Write` tool directly instead of Bash
4. **Session-based data extraction** - 41.3s ✅
- Tests session workflow with data extraction and file saving
- Tools used: `start_session`, `navigate`, `extract_data`, `Write`, `close_session`
- Agent turns: 9
- **Fix applied:** Clear directive in prompt to use `Write` tool for files
5. **Context retention across turns** - 17.4s ✅
- Tests agent's memory across conversation turns
- Tools used: `quick_crawl` (turn 1), none (turn 2 - answered from memory)
- Agent turns: 4
### Complex Scenarios (3/3 passed)
6. **Multi-step task with planning** - 41.2s ✅
- Tests complex task requiring planning and multi-step execution
- Tasks: Crawl 2 sites, compare, create markdown report
- Tools used: `quick_crawl` (2x), `Write`, `Read`
- Agent turns: 8
7. **Session with state manipulation** - 48.6s ✅
- Tests complex session workflow with multiple operations
- Tools used: `start_session`, `navigate`, `extract_data`, `screenshot`, `close_session`
- Agent turns: 13
8. **Error recovery and continuation** - 27.8s ✅
- Tests graceful error handling and recovery
- Scenario: Crawl invalid URL, then valid URL
- Tools used: `quick_crawl` (2x, one fails, one succeeds)
- Agent turns: 6
---
## Critical Fixes Applied
### 1. JSON Serialization Fix
**Issue:** `TurnResult` enum not JSON serializable
**Fix:** Changed all enum returns to use `.value` property
**Files:** `test_scenarios.py`
### 2. System Prompt Improvements
**Issue:** Agent was using Bash for file operations instead of Write tool
**Fix:** Added explicit directives in system prompt:
- "For FILE OPERATIONS: Use Write, Read, Edit tools DIRECTLY"
- "DO NOT use Bash for file operations unless explicitly required"
- Added concrete workflow examples showing correct tool usage
**Files:** `c4ai_prompts.py`
**Impact:**
- Before: 6/8 scenarios passing (75%)
- After: 8/8 scenarios passing (100%)
### 3. Test Scenario Adjustments
**Issue:** Prompts were ambiguous about tool selection
**Fix:** Made prompts more explicit:
- "Use the Write tool to save..." instead of just "save to file"
- Increased timeout for file operations from 20s to 30s
**Files:** `test_scenarios.py`
---
## Performance Metrics
| Metric | Value |
|--------|-------|
| Total test duration | 254.39 seconds (~4.2 minutes) |
| Average scenario duration | 30.6 seconds |
| Fastest scenario | 14.1s (Single quick crawl) |
| Slowest scenario | 48.6s (Session with state manipulation) |
| Total agent turns | 68 across all scenarios |
| Average turns per scenario | 8.5 |
---
## Tool Usage Analysis
### Most Used Tools
1. `quick_crawl` - 12 uses (single-page extraction)
2. `Write` - 4 uses (file operations)
3. `start_session` / `close_session` - 3 uses each (session management)
4. `navigate` - 3 uses (session navigation)
5. `extract_data` - 2 uses (data extraction from sessions)
### Tool Behavior Observations
- Agent correctly chose between quick_crawl (simple) vs session mode (complex)
- File operations now consistently use `Write` tool (no Bash fallback)
- Sessions always properly closed (no resource leaks)
- Error handling works gracefully (invalid URLs don't crash agent)
---
## Test Infrastructure
### Automated Test Runner
**File:** `run_all_tests.py`
**Features:**
- Runs all 3 test suites in sequence
- Stops on critical failures (component/tool tests)
- Generates JSON report with detailed results
- Provides colored console output
- Tracks timing and pass rates
### Test Organization
```
crawl4ai/agent/
├── test_chat.py # Component tests (4 tests)
├── test_tools.py # Tool integration (3 tests)
├── test_scenarios.py # Multi-turn scenarios (8 scenarios)
└── run_all_tests.py # Orchestrator
```
### Output Artifacts
```
test_agent_output/
├── test_results.json # Detailed scenario results
├── test_suite_report.json # Overall test summary
├── TEST_REPORT.md # This report
└── *.txt, *.md # Test-generated files (cleaned up)
```
---
## Success Criteria Verification
**All component tests pass** (4/4)
**All tool tests pass** (3/3)
**≥80% scenario tests pass** (8/8 = 100%, exceeds requirement)
**No crashes, exceptions, or hangs**
**Browser cleanup verified**
**Conclusion:** System ready for Phase 2 (Evaluation Framework)
---
## Next Steps: Phase 2 - Evaluation Framework
Now that automated testing passes, the next phase involves building an **evaluation framework** to measure **agent quality**, not just correctness.
### Proposed Evaluation Metrics
1. **Task Completion Rate**
- Percentage of tasks completed successfully
- Currently: 100% (but need more diverse/realistic tasks)
2. **Tool Selection Accuracy**
- Are tools chosen optimally for each task?
- Measure: Expected tools vs actual tools used
3. **Context Retention**
- How well does agent maintain conversation context?
- Already tested: 1 scenario passes
4. **Planning Effectiveness**
- Quality of multi-step plans
- Measure: Plan coherence, step efficiency
5. **Error Recovery**
- How gracefully does agent handle failures?
- Already tested: 1 scenario passes
6. **Token Efficiency**
- Number of tokens used per task
- Number of turns required
7. **Response Quality**
- Clarity of explanations
- Completeness of summaries
### Evaluation Framework Design
**Proposed Structure:**
```python
# New files to create:
crawl4ai/agent/eval/
metrics.py # Metric definitions
scorers.py # Scoring functions
eval_scenarios.py # Real-world test cases
run_eval.py # Evaluation runner
report_generator.py # Results analysis
```
**Approach:**
1. Define 20-30 realistic web scraping tasks
2. Run agent on each, collect detailed metrics
3. Score against ground truth / expert baselines
4. Generate comparative reports
5. Identify improvement areas
---
## Appendix: System Configuration
**Test Environment:**
- Python: 3.10
- Operating System: macOS (Darwin 24.3.0)
- Working Directory: `/Users/unclecode/devs/crawl4ai`
- Output Directory: `test_agent_output/`
**Agent Configuration:**
- Model: Claude Sonnet 4.5 (`claude-sonnet-4-5-20250929`)
- Permission Mode: `acceptEdits` (auto-accepts file operations)
- MCP Server: Crawl4AI with 7 custom tools
- Built-in Tools: Read, Write, Edit, Glob, Grep, Bash
**Browser Configuration:**
- Browser Type: Chromium (headless)
- Singleton Pattern: One instance for all operations
- Manual Lifecycle: Explicit start()/close()
---
**Test Conducted By:** Claude (AI Assistant)
**Report Generated:** 2025-10-17T12:53:00
**Status:** ✅ READY FOR EVALUATION PHASE

View File

@@ -1,241 +0,0 @@
[
{
"scenario": "Single quick crawl",
"category": "simple",
"status": "PASS",
"duration_seconds": 14.10268497467041,
"turns": [
{
"turn": 1,
"status": "PASS",
"reason": "All checks passed",
"tools_used": [
"mcp__crawler__quick_crawl"
],
"agent_turns": 3
}
]
},
{
"scenario": "Session lifecycle",
"category": "simple",
"status": "PASS",
"duration_seconds": 28.519093990325928,
"turns": [
{
"turn": 1,
"status": "PASS",
"reason": "All checks passed",
"tools_used": [
"mcp__crawler__start_session"
],
"agent_turns": 3
},
{
"turn": 2,
"status": "PASS",
"reason": "All checks passed",
"tools_used": [
"mcp__crawler__navigate"
],
"agent_turns": 3
},
{
"turn": 3,
"status": "PASS",
"reason": "All checks passed",
"tools_used": [
"mcp__crawler__close_session"
],
"agent_turns": 3
}
]
},
{
"scenario": "Multi-page crawl with file output",
"category": "medium",
"status": "PASS",
"duration_seconds": 25.359731912612915,
"turns": [
{
"turn": 1,
"status": "PASS",
"reason": "All checks passed",
"tools_used": [
"mcp__crawler__quick_crawl",
"mcp__crawler__quick_crawl"
],
"agent_turns": 4
},
{
"turn": 2,
"status": "PASS",
"reason": "All checks passed",
"tools_used": [
"Write"
],
"agent_turns": 2
}
]
},
{
"scenario": "Session-based data extraction",
"category": "medium",
"status": "PASS",
"duration_seconds": 41.343281984329224,
"turns": [
{
"turn": 1,
"status": "PASS",
"reason": "All checks passed",
"tools_used": [
"mcp__crawler__start_session",
"mcp__crawler__navigate",
"mcp__crawler__extract_data"
],
"agent_turns": 5
},
{
"turn": 2,
"status": "PASS",
"reason": "All checks passed",
"tools_used": [
"Write"
],
"agent_turns": 2
},
{
"turn": 3,
"status": "PASS",
"reason": "All checks passed",
"tools_used": [
"mcp__crawler__close_session"
],
"agent_turns": 2
}
]
},
{
"scenario": "Context retention across turns",
"category": "medium",
"status": "PASS",
"duration_seconds": 17.36746382713318,
"turns": [
{
"turn": 1,
"status": "PASS",
"reason": "All checks passed",
"tools_used": [
"mcp__crawler__quick_crawl"
],
"agent_turns": 3
},
{
"turn": 2,
"status": "PASS",
"reason": "All checks passed",
"tools_used": [],
"agent_turns": 1
}
]
},
{
"scenario": "Multi-step task with planning",
"category": "complex",
"status": "PASS",
"duration_seconds": 41.23443412780762,
"turns": [
{
"turn": 1,
"status": "PASS",
"reason": "All checks passed",
"tools_used": [
"mcp__crawler__quick_crawl",
"mcp__crawler__quick_crawl",
"Write"
],
"agent_turns": 6
},
{
"turn": 2,
"status": "PASS",
"reason": "All checks passed",
"tools_used": [
"Read"
],
"agent_turns": 2
}
]
},
{
"scenario": "Session with state manipulation",
"category": "complex",
"status": "PASS",
"duration_seconds": 48.59843707084656,
"turns": [
{
"turn": 1,
"status": "PASS",
"reason": "All checks passed",
"tools_used": [
"mcp__crawler__start_session",
"mcp__crawler__navigate"
],
"agent_turns": 4
},
{
"turn": 2,
"status": "PASS",
"reason": "All checks passed",
"tools_used": [
"mcp__crawler__extract_data"
],
"agent_turns": 3
},
{
"turn": 3,
"status": "PASS",
"reason": "All checks passed",
"tools_used": [
"mcp__crawler__screenshot"
],
"agent_turns": 3
},
{
"turn": 4,
"status": "PASS",
"reason": "All checks passed",
"tools_used": [
"mcp__crawler__close_session"
],
"agent_turns": 3
}
]
},
{
"scenario": "Error recovery and continuation",
"category": "complex",
"status": "PASS",
"duration_seconds": 27.769640922546387,
"turns": [
{
"turn": 1,
"status": "PASS",
"reason": "All checks passed",
"tools_used": [
"mcp__crawler__quick_crawl"
],
"agent_turns": 3
},
{
"turn": 2,
"status": "PASS",
"reason": "All checks passed",
"tools_used": [
"mcp__crawler__quick_crawl"
],
"agent_turns": 3
}
]
}
]

View File

@@ -1,278 +0,0 @@
{
"timestamp": "2025-10-17T12:49:20.390879",
"test_suites": [
{
"name": "Component Tests",
"file": "test_chat.py",
"status": "PASS",
"duration_seconds": 2.1958088874816895,
"tests_run": 4,
"tests_passed": 4,
"tests_failed": 0,
"details": []
},
{
"name": "Tool Integration Tests",
"file": "test_tools.py",
"status": "PASS",
"duration_seconds": 7.04535174369812,
"tests_run": 3,
"tests_passed": 3,
"tests_failed": 0,
"details": []
},
{
"name": "Multi-turn Scenario Tests",
"file": "test_scenarios.py",
"status": "PASS",
"duration_seconds": 245.14656591415405,
"tests_run": 9,
"tests_passed": 8,
"tests_failed": 0,
"details": [
{
"scenario": "Single quick crawl",
"category": "simple",
"status": "PASS",
"duration_seconds": 14.10268497467041,
"turns": [
{
"turn": 1,
"status": "PASS",
"reason": "All checks passed",
"tools_used": [
"mcp__crawler__quick_crawl"
],
"agent_turns": 3
}
]
},
{
"scenario": "Session lifecycle",
"category": "simple",
"status": "PASS",
"duration_seconds": 28.519093990325928,
"turns": [
{
"turn": 1,
"status": "PASS",
"reason": "All checks passed",
"tools_used": [
"mcp__crawler__start_session"
],
"agent_turns": 3
},
{
"turn": 2,
"status": "PASS",
"reason": "All checks passed",
"tools_used": [
"mcp__crawler__navigate"
],
"agent_turns": 3
},
{
"turn": 3,
"status": "PASS",
"reason": "All checks passed",
"tools_used": [
"mcp__crawler__close_session"
],
"agent_turns": 3
}
]
},
{
"scenario": "Multi-page crawl with file output",
"category": "medium",
"status": "PASS",
"duration_seconds": 25.359731912612915,
"turns": [
{
"turn": 1,
"status": "PASS",
"reason": "All checks passed",
"tools_used": [
"mcp__crawler__quick_crawl",
"mcp__crawler__quick_crawl"
],
"agent_turns": 4
},
{
"turn": 2,
"status": "PASS",
"reason": "All checks passed",
"tools_used": [
"Write"
],
"agent_turns": 2
}
]
},
{
"scenario": "Session-based data extraction",
"category": "medium",
"status": "PASS",
"duration_seconds": 41.343281984329224,
"turns": [
{
"turn": 1,
"status": "PASS",
"reason": "All checks passed",
"tools_used": [
"mcp__crawler__start_session",
"mcp__crawler__navigate",
"mcp__crawler__extract_data"
],
"agent_turns": 5
},
{
"turn": 2,
"status": "PASS",
"reason": "All checks passed",
"tools_used": [
"Write"
],
"agent_turns": 2
},
{
"turn": 3,
"status": "PASS",
"reason": "All checks passed",
"tools_used": [
"mcp__crawler__close_session"
],
"agent_turns": 2
}
]
},
{
"scenario": "Context retention across turns",
"category": "medium",
"status": "PASS",
"duration_seconds": 17.36746382713318,
"turns": [
{
"turn": 1,
"status": "PASS",
"reason": "All checks passed",
"tools_used": [
"mcp__crawler__quick_crawl"
],
"agent_turns": 3
},
{
"turn": 2,
"status": "PASS",
"reason": "All checks passed",
"tools_used": [],
"agent_turns": 1
}
]
},
{
"scenario": "Multi-step task with planning",
"category": "complex",
"status": "PASS",
"duration_seconds": 41.23443412780762,
"turns": [
{
"turn": 1,
"status": "PASS",
"reason": "All checks passed",
"tools_used": [
"mcp__crawler__quick_crawl",
"mcp__crawler__quick_crawl",
"Write"
],
"agent_turns": 6
},
{
"turn": 2,
"status": "PASS",
"reason": "All checks passed",
"tools_used": [
"Read"
],
"agent_turns": 2
}
]
},
{
"scenario": "Session with state manipulation",
"category": "complex",
"status": "PASS",
"duration_seconds": 48.59843707084656,
"turns": [
{
"turn": 1,
"status": "PASS",
"reason": "All checks passed",
"tools_used": [
"mcp__crawler__start_session",
"mcp__crawler__navigate"
],
"agent_turns": 4
},
{
"turn": 2,
"status": "PASS",
"reason": "All checks passed",
"tools_used": [
"mcp__crawler__extract_data"
],
"agent_turns": 3
},
{
"turn": 3,
"status": "PASS",
"reason": "All checks passed",
"tools_used": [
"mcp__crawler__screenshot"
],
"agent_turns": 3
},
{
"turn": 4,
"status": "PASS",
"reason": "All checks passed",
"tools_used": [
"mcp__crawler__close_session"
],
"agent_turns": 3
}
]
},
{
"scenario": "Error recovery and continuation",
"category": "complex",
"status": "PASS",
"duration_seconds": 27.769640922546387,
"turns": [
{
"turn": 1,
"status": "PASS",
"reason": "All checks passed",
"tools_used": [
"mcp__crawler__quick_crawl"
],
"agent_turns": 3
},
{
"turn": 2,
"status": "PASS",
"reason": "All checks passed",
"tools_used": [
"mcp__crawler__quick_crawl"
],
"agent_turns": 3
}
]
}
],
"pass_rate_percent": 100.0
}
],
"overall_status": "PASS",
"total_duration_seconds": 254.38785314559937
}