feat(agent): implement Claude Code SDK agent with chat mode and persistent browser

Implementation:
- Singleton browser pattern (BrowserManager) - one instance for entire session
- 7 MCP tools for Crawl4AI (quick_crawl, sessions, navigation, extraction, JS execution, screenshots)
- Interactive chat mode with streaming I/O using Claude SDK message generator
- Rich-based terminal UI with markdown rendering and syntax highlighting
- Single-shot and chat modes (--chat flag)
- Comprehensive test suite: component tests, tool tests, 9 multi-turn scenarios

Architecture:
- agent_crawl.py: CLI entry point with SessionStorage (JSONL logging)
- browser_manager.py: Singleton pattern for persistent AsyncWebCrawler
- c4ai_tools.py: MCP tools using @tool decorator, integrated with BrowserManager
- chat_mode.py: Streaming input mode per Claude SDK spec
- terminal_ui.py: Rich-based beautiful terminal output
- test_scenarios.py: Automated multi-turn conversation tests (simple/medium/complex)
- TECH_SPEC.md: Complete AI-to-AI knowledge transfer document

Key fixes:
- Use result.markdown (not deprecated result.markdown_v2)
- Handle both str and MarkdownGenerationResult types
- Track current URL per session for extract_data/execute_js/screenshot tools
- Manual browser lifecycle (start/close) instead of context managers

Tools enabled:
- Crawl4AI: quick_crawl, start_session, navigate, extract_data, execute_js, screenshot, close_session
- Claude SDK built-in: Read, Write, Edit, Glob, Grep, Bash, NotebookEdit

Total: 12 files, 2820 lines
This commit is contained in:
unclecode
2025-10-17 12:25:45 +08:00
parent 216019f29a
commit 31741e571a
12 changed files with 2820 additions and 0 deletions

429
crawl4ai/agent/TECH_SPEC.md Normal file
View File

@@ -0,0 +1,429 @@
# Crawl4AI Agent Technical Specification
*AI-to-AI Knowledge Transfer Document*
## Context Documents
**MUST READ FIRST:**
1. `/Users/unclecode/devs/crawl4ai/tmp/CRAWL4AI_SDK.md` - Crawl4AI complete API reference
2. `/Users/unclecode/devs/crawl4ai/tmp/cc_stream.md` - Claude SDK streaming input mode
3. `/Users/unclecode/devs/crawl4ai/tmp/CC_PYTHON_SDK.md` - Claude Code Python SDK complete reference
## Architecture Overview
**Core Principle:** Singleton browser instance + streaming chat mode + MCP tools
```
┌─────────────────────────────────────────────────────────────┐
│ Agent Entry Point │
│ agent_crawl.py (CLI: --chat | single-shot) │
└─────────────────────────────────────────────────────────────┘
┌───────────────────┼───────────────────┐
│ │ │
[Chat Mode] [Single-shot] [Browser Manager]
│ │ │
▼ ▼ ▼
ChatMode.run() CrawlAgent.run() BrowserManager
- Streaming - One prompt (Singleton)
- Interactive - Exit after │
- Commands - Uses same ▼
│ browser AsyncWebCrawler
│ │ (persistent)
└───────────────────┴────────────────┘
┌───────┴────────┐
│ │
MCP Tools Claude SDK
(Crawl4AI) (Built-in)
│ │
┌───────────┴────┐ ┌──────┴──────┐
│ │ │ │
quick_crawl session Read Edit
navigate tools Write Glob
extract_data Bash Grep
execute_js
screenshot
close_session
```
## File Structure
```
crawl4ai/agent/
├── __init__.py # Module exports
├── agent_crawl.py # Main CLI entry (190 lines)
│ ├── SessionStorage # JSONL logging to ~/.crawl4ai/agents/projects/
│ ├── CrawlAgent # Single-shot wrapper
│ └── main() # CLI parser (--chat flag)
├── browser_manager.py # Singleton pattern (70 lines)
│ └── BrowserManager # Class methods only, no instances
│ ├── get_browser() # Returns singleton AsyncWebCrawler
│ ├── reconfigure_browser()
│ ├── close_browser()
│ └── is_browser_active()
├── c4ai_tools.py # 7 MCP tools (310 lines)
│ ├── @tool decorators # Claude SDK decorator
│ ├── CRAWLER_SESSIONS # Dict[str, AsyncWebCrawler] for named sessions
│ ├── CRAWLER_SESSION_URLS # Dict[str, str] track current URL per session
│ └── CRAWL_TOOLS # List of tool functions
├── c4ai_prompts.py # System prompt (130 lines)
│ └── SYSTEM_PROMPT # Agent behavior definition
├── terminal_ui.py # Rich-based UI (120 lines)
│ └── TerminalUI # Console rendering
│ ├── show_header()
│ ├── print_markdown()
│ ├── print_code()
│ └── with_spinner()
├── chat_mode.py # Streaming chat (160 lines)
│ └── ChatMode
│ ├── message_generator() # AsyncGenerator per cc_stream.md
│ ├── _handle_command() # /exit /clear /help /browser
│ └── run() # Main chat loop
├── test_tools.py # Direct tool tests (130 lines)
├── test_chat.py # Component tests (90 lines)
└── test_scenarios.py # Multi-turn scenarios (500 lines)
├── SIMPLE_SCENARIOS
├── MEDIUM_SCENARIOS
├── COMPLEX_SCENARIOS
└── ScenarioRunner
```
## Critical Implementation Details
### 1. Browser Singleton Pattern
**Key:** ONE browser instance for ENTIRE agent session
```python
# browser_manager.py
class BrowserManager:
_crawler: Optional[AsyncWebCrawler] = None # Singleton
_config: Optional[BrowserConfig] = None
@classmethod
async def get_browser(cls, config=None) -> AsyncWebCrawler:
if cls._crawler is None:
cls._crawler = AsyncWebCrawler(config or BrowserConfig())
await cls._crawler.start() # Manual lifecycle
return cls._crawler
```
**Behavior:**
- First call: creates browser with `config` (or default)
- Subsequent calls: returns same instance, **ignores config param**
- To change config: `reconfigure_browser(new_config)` (closes old, creates new)
- Tools use: `crawler = await BrowserManager.get_browser()`
- No `async with` context manager - manual `start()` / `close()`
### 2. Tool Architecture
**Two types of browser usage:**
**A) Quick operations** (quick_crawl):
```python
@tool("quick_crawl", ...)
async def quick_crawl(args):
crawler = await BrowserManager.get_browser() # Singleton
result = await crawler.arun(url=args["url"], config=run_config)
# No close - browser stays alive
```
**B) Named sessions** (start_session, navigate, extract_data, etc.):
```python
CRAWLER_SESSIONS: Dict[str, AsyncWebCrawler] = {} # Named refs
CRAWLER_SESSION_URLS: Dict[str, str] = {} # Track current URL
@tool("start_session", ...)
async def start_session(args):
crawler = await BrowserManager.get_browser()
CRAWLER_SESSIONS[args["session_id"]] = crawler # Store ref
@tool("navigate", ...)
async def navigate(args):
crawler = CRAWLER_SESSIONS[args["session_id"]]
result = await crawler.arun(url=args["url"], ...)
CRAWLER_SESSION_URLS[args["session_id"]] = result.url # Track URL
@tool("extract_data", ...)
async def extract_data(args):
crawler = CRAWLER_SESSIONS[args["session_id"]]
current_url = CRAWLER_SESSION_URLS[args["session_id"]] # Must have URL
result = await crawler.arun(url=current_url, ...) # Re-crawl current page
@tool("close_session", ...)
async def close_session(args):
CRAWLER_SESSIONS.pop(args["session_id"]) # Remove ref
CRAWLER_SESSION_URLS.pop(args["session_id"], None)
# Browser stays alive (singleton)
```
**Important:** Named sessions are just **references** to singleton browser. Multiple sessions = same browser instance.
### 3. Markdown Handling (CRITICAL BUG FIX)
**OLD (WRONG):**
```python
result.markdown_v2.raw_markdown # DEPRECATED
```
**NEW (CORRECT):**
```python
# result.markdown can be:
# - str (simple mode)
# - MarkdownGenerationResult object (with filters)
if isinstance(result.markdown, str):
markdown_content = result.markdown
elif hasattr(result.markdown, 'raw_markdown'):
markdown_content = result.markdown.raw_markdown
```
Reference: `CRAWL4AI_SDK.md` line 614 - `markdown_v2` deprecated, use `markdown`
### 4. Chat Mode Streaming Input
**Per cc_stream.md:** Use message generator pattern
```python
# chat_mode.py
async def message_generator(self) -> AsyncGenerator[Dict[str, Any], None]:
while not self._exit_requested:
user_input = await asyncio.to_thread(self.ui.get_user_input)
if user_input.startswith('/'):
await self._handle_command(user_input)
continue
# Yield in streaming input format
yield {
"type": "user",
"message": {
"role": "user",
"content": user_input
}
}
async def run(self):
async with ClaudeSDKClient(options=self.options) as client:
await client.query(self.message_generator()) # Pass generator
async for message in client.receive_messages():
# Process streaming responses
```
**Key:** Generator keeps yielding user inputs, SDK streams responses back.
### 5. Claude SDK Integration
**Setup:**
```python
from claude_agent_sdk import tool, create_sdk_mcp_server, ClaudeSDKClient, ClaudeAgentOptions
# 1. Define tools with @tool decorator
@tool("quick_crawl", "description", {"url": str, "output_format": str})
async def quick_crawl(args: Dict[str, Any]) -> Dict[str, Any]:
return {"content": [{"type": "text", "text": json.dumps(result)}]}
# 2. Create MCP server
crawler_server = create_sdk_mcp_server(
name="crawl4ai",
version="1.0.0",
tools=[quick_crawl, start_session, ...] # List of @tool functions
)
# 3. Configure options
options = ClaudeAgentOptions(
mcp_servers={"crawler": crawler_server},
allowed_tools=[
"mcp__crawler__quick_crawl", # Format: mcp__{server}__{tool}
"mcp__crawler__start_session",
# Built-in tools:
"Read", "Write", "Edit", "Glob", "Grep", "Bash", "NotebookEdit"
],
system_prompt=SYSTEM_PROMPT,
permission_mode="acceptEdits"
)
# 4. Use client
async with ClaudeSDKClient(options=options) as client:
await client.query(prompt_or_generator)
async for message in client.receive_messages():
# Process AssistantMessage, ResultMessage, etc.
```
**Tool response format:**
```python
return {
"content": [{
"type": "text",
"text": json.dumps({"success": True, "data": "..."})
}]
}
```
## Operating Modes
### Single-Shot Mode
```bash
python -m crawl4ai.agent.agent_crawl "Crawl example.com"
```
- One prompt → execute → exit
- Uses singleton browser
- No cleanup of browser (process exit handles it)
### Chat Mode
```bash
python -m crawl4ai.agent.agent_crawl --chat
```
- Interactive loop with streaming I/O
- Commands: `/exit` `/clear` `/help` `/browser`
- Browser persists across all turns
- Cleanup on exit: `BrowserManager.close_browser()`
## Testing Architecture
**3 test levels:**
1. **Component tests** (`test_chat.py`): Non-interactive, tests individual classes
2. **Tool tests** (`test_tools.py`): Direct AsyncWebCrawler calls, validates Crawl4AI integration
3. **Scenario tests** (`test_scenarios.py`): Automated multi-turn conversations
- Injects messages programmatically
- Validates tool calls, keywords, files created
- Categories: SIMPLE (2), MEDIUM (3), COMPLEX (4)
## Dependencies
```python
# External
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode
from crawl4ai.extraction_strategy import LLMExtractionStrategy
from claude_agent_sdk import (
tool, create_sdk_mcp_server, ClaudeSDKClient, ClaudeAgentOptions,
AssistantMessage, TextBlock, ResultMessage, ToolUseBlock
)
from rich.console import Console # Already installed
from rich.markdown import Markdown
from rich.syntax import Syntax
# Stdlib
import asyncio, json, uuid, argparse
from pathlib import Path
from typing import Optional, Dict, Any, AsyncGenerator
```
## Common Pitfalls
1. **DON'T** use `async with AsyncWebCrawler()` - breaks singleton pattern
2. **DON'T** use `result.markdown_v2` - deprecated field
3. **DON'T** call `crawler.arun()` without URL in session tools - needs current_url
4. **DON'T** close browser in tools - managed by BrowserManager
5. **DON'T** use `break` in message iteration - causes asyncio issues
6. **DO** track session URLs in `CRAWLER_SESSION_URLS` for session tools
7. **DO** handle both `str` and `MarkdownGenerationResult` for `result.markdown`
8. **DO** use manual lifecycle `await crawler.start()` / `await crawler.close()`
## Session Storage
**Location:** `~/.crawl4ai/agents/projects/{sanitized_cwd}/{uuid}.jsonl`
**Format:** JSONL with events:
```json
{"timestamp": "...", "event": "session_start", "data": {...}}
{"timestamp": "...", "event": "user_message", "data": {"text": "..."}}
{"timestamp": "...", "event": "assistant_message", "data": {"turn": 1, "text": "..."}}
{"timestamp": "...", "event": "session_end", "data": {"duration_ms": 1000, ...}}
```
## CLI Options
```
--chat Interactive chat mode
--model MODEL Claude model override
--permission-mode MODE acceptEdits|bypassPermissions|default|plan
--add-dir DIR [DIR...] Additional accessible directories
--system-prompt TEXT Custom system prompt
--session-id UUID Resume/specify session
--debug Full tracebacks
```
## Performance Characteristics
- **Browser startup:** ~2-4s (once per session)
- **Quick crawl:** ~1-2s (reuses browser)
- **Session operations:** ~1-2s (same browser)
- **Chat latency:** Real-time streaming, no buffering
- **Memory:** One browser instance regardless of operations
## Extension Points
1. **New tools:** Add `@tool` function → add to `CRAWL_TOOLS` → add to `allowed_tools`
2. **New commands:** Add handler in `ChatMode._handle_command()`
3. **Custom UI:** Replace `TerminalUI` with different renderer
4. **Persistent sessions:** Serialize browser cookies/state to disk in `BrowserManager`
5. **Multi-browser:** Modify `BrowserManager` to support multiple configs (not recommended)
## Next Steps: Testing & Evaluation Pipeline
### Phase 1: Automated Testing (CURRENT)
**Objective:** Verify codebase correctness, not agent quality
**Test Execution:**
```bash
# 1. Component tests (fast, non-interactive)
python crawl4ai/agent/test_chat.py
# Expected: All components instantiate correctly
# 2. Tool integration tests (medium, requires browser)
python crawl4ai/agent/test_tools.py
# Expected: All 7 tools work with Crawl4AI
# 3. Multi-turn scenario tests (slow, comprehensive)
python crawl4ai/agent/test_scenarios.py
# Expected: 9 scenarios pass (2 simple, 3 medium, 4 complex)
# Output: test_agent_output/test_results.json
```
**Success Criteria:**
- All component tests pass
- All tool tests pass
- ≥80% scenario tests pass (7/9)
- No crashes, exceptions, or hangs
- Browser cleanup verified
**Automated Pipeline:**
```bash
# Run all tests in sequence, exit on first failure
cd /Users/unclecode/devs/crawl4ai
python crawl4ai/agent/test_chat.py && \
python crawl4ai/agent/test_tools.py && \
python crawl4ai/agent/test_scenarios.py
echo "Exit code: $?" # 0 = all passed
```
### Phase 2: Evaluation (NEXT)
**Objective:** Measure agent performance quality
**Metrics to define:**
- Task completion rate
- Tool selection accuracy
- Context retention across turns
- Planning effectiveness
- Error recovery capability
**Eval framework needed:**
- Expand scenario tests with quality scoring
- Add ground truth comparisons
- Measure token efficiency
- Track reasoning quality
**Not in scope yet** - wait for Phase 1 completion
---
**Last Updated:** 2025-01-17
**Version:** 1.0.0
**Status:** Testing Phase - Ready for automated test runs

View File

@@ -0,0 +1,13 @@
# __init__.py
"""Crawl4AI Agent - Browser automation agent powered by Claude Code SDK."""
from .c4ai_tools import CRAWL_TOOLS
from .c4ai_prompts import SYSTEM_PROMPT
from .agent_crawl import CrawlAgent, SessionStorage
__all__ = [
"CRAWL_TOOLS",
"SYSTEM_PROMPT",
"CrawlAgent",
"SessionStorage",
]

View File

@@ -0,0 +1,593 @@
```python
# c4ai_tools.py
"""Crawl4AI tools for Claude Code SDK agent."""
import json
import asyncio
from typing import Any, Dict
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode
from crawl4ai.extraction_strategy import LLMExtractionStrategy
from claude_agent_sdk import tool
# Global session storage
CRAWLER_SESSIONS: Dict[str, AsyncWebCrawler] = {}
@tool("quick_crawl", "One-shot crawl for simple extraction. Returns markdown, HTML, or structured data.", {
"url": str,
"output_format": str, # "markdown" | "html" | "structured" | "screenshot"
"extraction_schema": str, # Optional: JSON schema for structured extraction
"js_code": str, # Optional: JavaScript to execute before extraction
"wait_for": str, # Optional: CSS selector to wait for
})
async def quick_crawl(args: Dict[str, Any]) -> Dict[str, Any]:
"""Fast single-page crawl without session management."""
crawler_config = BrowserConfig(headless=True, verbose=False)
run_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
js_code=args.get("js_code"),
wait_for=args.get("wait_for"),
)
# Add extraction strategy if structured data requested
if args.get("extraction_schema"):
run_config.extraction_strategy = LLMExtractionStrategy(
provider="openai/gpt-4o-mini",
schema=json.loads(args["extraction_schema"]),
instruction="Extract data according to the provided schema."
)
async with AsyncWebCrawler(config=crawler_config) as crawler:
result = await crawler.arun(url=args["url"], config=run_config)
if not result.success:
return {
"content": [{
"type": "text",
"text": json.dumps({"error": result.error_message, "success": False})
}]
}
output_map = {
"markdown": result.markdown_v2.raw_markdown if result.markdown_v2 else "",
"html": result.html,
"structured": result.extracted_content,
"screenshot": result.screenshot,
}
response = {
"success": True,
"url": result.url,
"data": output_map.get(args["output_format"], result.markdown_v2.raw_markdown)
}
return {"content": [{"type": "text", "text": json.dumps(response, indent=2)}]}
@tool("start_session", "Start a persistent browser session for multi-step crawling and automation.", {
"session_id": str,
"headless": bool, # Default True
})
async def start_session(args: Dict[str, Any]) -> Dict[str, Any]:
"""Initialize a persistent crawler session."""
session_id = args["session_id"]
if session_id in CRAWLER_SESSIONS:
return {"content": [{"type": "text", "text": json.dumps({
"error": f"Session {session_id} already exists",
"success": False
})}]}
crawler_config = BrowserConfig(
headless=args.get("headless", True),
verbose=False
)
crawler = AsyncWebCrawler(config=crawler_config)
await crawler.__aenter__()
CRAWLER_SESSIONS[session_id] = crawler
return {"content": [{"type": "text", "text": json.dumps({
"success": True,
"session_id": session_id,
"message": f"Browser session {session_id} started"
})}]}
@tool("navigate", "Navigate to a URL in an active session.", {
"session_id": str,
"url": str,
"wait_for": str, # Optional: CSS selector to wait for
"js_code": str, # Optional: JavaScript to execute after load
})
async def navigate(args: Dict[str, Any]) -> Dict[str, Any]:
"""Navigate to URL in session."""
session_id = args["session_id"]
if session_id not in CRAWLER_SESSIONS:
return {"content": [{"type": "text", "text": json.dumps({
"error": f"Session {session_id} not found",
"success": False
})}]}
crawler = CRAWLER_SESSIONS[session_id]
run_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
wait_for=args.get("wait_for"),
js_code=args.get("js_code"),
)
result = await crawler.arun(url=args["url"], config=run_config)
return {"content": [{"type": "text", "text": json.dumps({
"success": result.success,
"url": result.url,
"message": f"Navigated to {args['url']}"
})}]}
@tool("extract_data", "Extract data from current page in session using schema or return markdown.", {
"session_id": str,
"output_format": str, # "markdown" | "structured"
"extraction_schema": str, # Required for structured, JSON schema
"wait_for": str, # Optional: Wait for element before extraction
"js_code": str, # Optional: Execute JS before extraction
})
async def extract_data(args: Dict[str, Any]) -> Dict[str, Any]:
"""Extract data from current page."""
session_id = args["session_id"]
if session_id not in CRAWLER_SESSIONS:
return {"content": [{"type": "text", "text": json.dumps({
"error": f"Session {session_id} not found",
"success": False
})}]}
crawler = CRAWLER_SESSIONS[session_id]
run_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
wait_for=args.get("wait_for"),
js_code=args.get("js_code"),
)
if args["output_format"] == "structured" and args.get("extraction_schema"):
run_config.extraction_strategy = LLMExtractionStrategy(
provider="openai/gpt-4o-mini",
schema=json.loads(args["extraction_schema"]),
instruction="Extract data according to schema."
)
result = await crawler.arun(config=run_config)
if not result.success:
return {"content": [{"type": "text", "text": json.dumps({
"error": result.error_message,
"success": False
})}]}
data = (result.extracted_content if args["output_format"] == "structured"
else result.markdown_v2.raw_markdown if result.markdown_v2 else "")
return {"content": [{"type": "text", "text": json.dumps({
"success": True,
"data": data
}, indent=2)}]}
@tool("execute_js", "Execute JavaScript in the current page context.", {
"session_id": str,
"js_code": str,
"wait_for": str, # Optional: Wait for element after execution
})
async def execute_js(args: Dict[str, Any]) -> Dict[str, Any]:
"""Execute JavaScript in session."""
session_id = args["session_id"]
if session_id not in CRAWLER_SESSIONS:
return {"content": [{"type": "text", "text": json.dumps({
"error": f"Session {session_id} not found",
"success": False
})}]}
crawler = CRAWLER_SESSIONS[session_id]
run_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
js_code=args["js_code"],
wait_for=args.get("wait_for"),
)
result = await crawler.arun(config=run_config)
return {"content": [{"type": "text", "text": json.dumps({
"success": result.success,
"message": "JavaScript executed"
})}]}
@tool("screenshot", "Take a screenshot of the current page.", {
"session_id": str,
})
async def screenshot(args: Dict[str, Any]) -> Dict[str, Any]:
"""Capture screenshot."""
session_id = args["session_id"]
if session_id not in CRAWLER_SESSIONS:
return {"content": [{"type": "text", "text": json.dumps({
"error": f"Session {session_id} not found",
"success": False
})}]}
crawler = CRAWLER_SESSIONS[session_id]
result = await crawler.arun(config=CrawlerRunConfig(cache_mode=CacheMode.BYPASS))
return {"content": [{"type": "text", "text": json.dumps({
"success": True,
"screenshot": result.screenshot if result.success else None
})}]}
@tool("close_session", "Close and cleanup a browser session.", {
"session_id": str,
})
async def close_session(args: Dict[str, Any]) -> Dict[str, Any]:
"""Close crawler session."""
session_id = args["session_id"]
if session_id not in CRAWLER_SESSIONS:
return {"content": [{"type": "text", "text": json.dumps({
"error": f"Session {session_id} not found",
"success": False
})}]}
crawler = CRAWLER_SESSIONS.pop(session_id)
await crawler.__aexit__(None, None, None)
return {"content": [{"type": "text", "text": json.dumps({
"success": True,
"message": f"Session {session_id} closed"
})}]}
# Export all tools
CRAWL_TOOLS = [
quick_crawl,
start_session,
navigate,
extract_data,
execute_js,
screenshot,
close_session,
]
```
```python
# c4ai_prompts.py
"""System prompts for Crawl4AI agent."""
SYSTEM_PROMPT = """You are an expert web crawling and browser automation agent powered by Crawl4AI.
# Core Capabilities
You can perform sophisticated multi-step web scraping and automation tasks through two modes:
## Quick Mode (simple tasks)
- Use `quick_crawl` for single-page data extraction
- Best for: simple scrapes, getting page content, one-time extractions
## Session Mode (complex tasks)
- Use `start_session` to create persistent browser sessions
- Navigate, interact, extract data across multiple pages
- Essential for: workflows requiring JS execution, pagination, filtering, multi-step automation
# Tool Usage Patterns
## Simple Extraction
1. Use `quick_crawl` with appropriate output_format
2. Provide extraction_schema for structured data
## Multi-Step Workflow
1. `start_session` - Create browser session with unique ID
2. `navigate` - Go to target URL
3. `execute_js` - Interact with page (click buttons, scroll, fill forms)
4. `extract_data` - Get data using schema or markdown
5. Repeat steps 2-4 as needed
6. `close_session` - Clean up when done
# Critical Instructions
1. **Iteration & Validation**: When tasks require filtering or conditional logic:
- Extract data first, analyze results
- Filter/validate in your reasoning
- Make subsequent tool calls based on validation
- Continue until task criteria are met
2. **Structured Extraction**: Always use JSON schemas for structured data:
```json
{
"type": "object",
"properties": {
"field_name": {"type": "string"},
"price": {"type": "number"}
}
}
```
3. **Session Management**:
- Generate unique session IDs (e.g., "product_scrape_001")
- Always close sessions when done
- Use sessions for tasks requiring multiple page visits
4. **JavaScript Execution**:
- Use for: clicking buttons, scrolling, waiting for dynamic content
- Example: `js_code: "document.querySelector('.load-more').click()"`
- Combine with `wait_for` to ensure content loads
5. **Error Handling**:
- Check `success` field in all responses
- Retry with different strategies if extraction fails
- Report specific errors to user
6. **Data Persistence**:
- Save results using `Write` tool to JSON files
- Use descriptive filenames with timestamps
- Structure data clearly for user consumption
# Example Workflows
## Workflow 1: Filter & Crawl
Task: "Find products >$10, crawl each, extract details"
1. `quick_crawl` product listing page with schema for [name, price, url]
2. Analyze results, filter price > 10 in reasoning
3. `start_session` for detailed crawling
4. For each filtered product:
- `navigate` to product URL
- `extract_data` with detail schema
5. Aggregate results
6. `close_session`
7. `Write` results to JSON
## Workflow 2: Paginated Scraping
Task: "Scrape all items across multiple pages"
1. `start_session`
2. `navigate` to page 1
3. `extract_data` items from current page
4. Check for "next" button
5. `execute_js` to click next
6. Repeat 3-5 until no more pages
7. `close_session`
8. Save aggregated data
## Workflow 3: Dynamic Content
Task: "Scrape reviews after clicking 'Load More'"
1. `start_session`
2. `navigate` to product page
3. `execute_js` to click load more button
4. `wait_for` reviews container
5. `extract_data` all reviews
6. `close_session`
# Quality Guidelines
- **Be thorough**: Don't stop until task requirements are fully met
- **Validate data**: Check extracted data matches expected format
- **Handle edge cases**: Empty results, pagination limits, rate limiting
- **Clear reporting**: Summarize what was found, any issues encountered
- **Efficient**: Use quick_crawl when possible, sessions only when needed
# Output Format
When saving data, use clean JSON structure:
```json
{
"metadata": {
"scraped_at": "ISO timestamp",
"source_url": "...",
"total_items": 0
},
"data": [...]
}
```
Always provide a final summary of:
- Items found/processed
- Time taken
- Files created
- Any warnings/errors
Remember: You have unlimited turns to complete the task. Take your time, validate each step, and ensure quality results."""
```
```python
# agent_crawl.py
"""Crawl4AI Agent CLI - Browser automation agent powered by Claude Code SDK."""
import asyncio
import sys
import json
import uuid
from pathlib import Path
from datetime import datetime
from typing import Optional
import argparse
from claude_agent_sdk import ClaudeSDKClient, ClaudeAgentOptions, create_sdk_mcp_server
from claude_agent_sdk import AssistantMessage, TextBlock, ResultMessage
from c4ai_tools import CRAWL_TOOLS
from c4ai_prompts import SYSTEM_PROMPT
class SessionStorage:
"""Manage session storage in ~/.crawl4ai/agents/projects/"""
def __init__(self, cwd: Optional[str] = None):
self.cwd = Path(cwd) if cwd else Path.cwd()
self.base_dir = Path.home() / ".crawl4ai" / "agents" / "projects"
self.project_dir = self.base_dir / self._sanitize_path(str(self.cwd.resolve()))
self.project_dir.mkdir(parents=True, exist_ok=True)
self.session_id = str(uuid.uuid4())
self.log_file = self.project_dir / f"{self.session_id}.jsonl"
@staticmethod
def _sanitize_path(path: str) -> str:
"""Convert /Users/unclecode/devs/test to -Users-unclecode-devs-test"""
return path.replace("/", "-").replace("\\", "-")
def log(self, event_type: str, data: dict):
"""Append event to JSONL log."""
entry = {
"timestamp": datetime.utcnow().isoformat(),
"event": event_type,
"session_id": self.session_id,
"data": data
}
with open(self.log_file, "a") as f:
f.write(json.dumps(entry) + "\n")
def get_session_path(self) -> str:
"""Return path to current session log."""
return str(self.log_file)
class CrawlAgent:
"""Crawl4AI agent wrapper."""
def __init__(self, args: argparse.Namespace):
self.args = args
self.storage = SessionStorage(args.add_dir[0] if args.add_dir else None)
self.client: Optional[ClaudeSDKClient] = None
# Create MCP server with crawl tools
self.crawler_server = create_sdk_mcp_server(
name="crawl4ai",
version="1.0.0",
tools=CRAWL_TOOLS
)
# Build options
self.options = ClaudeAgentOptions(
mcp_servers={"crawler": self.crawler_server},
allowed_tools=[
"mcp__crawler__quick_crawl",
"mcp__crawler__start_session",
"mcp__crawler__navigate",
"mcp__crawler__extract_data",
"mcp__crawler__execute_js",
"mcp__crawler__screenshot",
"mcp__crawler__close_session",
"Write", "Read", "Bash"
],
system_prompt=SYSTEM_PROMPT if not args.system_prompt else args.system_prompt,
permission_mode=args.permission_mode or "acceptEdits",
cwd=args.add_dir[0] if args.add_dir else str(Path.cwd()),
model=args.model,
session_id=args.session_id or self.storage.session_id,
)
async def run(self, prompt: str):
"""Execute crawl task."""
self.storage.log("session_start", {
"prompt": prompt,
"cwd": self.options.cwd,
"model": self.options.model
})
print(f"\n🕷 Crawl4AI Agent")
print(f"📁 Session: {self.storage.session_id}")
print(f"💾 Log: {self.storage.get_session_path()}")
print(f"🎯 Task: {prompt}\n")
async with ClaudeSDKClient(options=self.options) as client:
self.client = client
await client.query(prompt)
turn = 0
async for message in client.receive_messages():
turn += 1
if isinstance(message, AssistantMessage):
for block in message.content:
if isinstance(block, TextBlock):
print(f"\n💭 [{turn}] {block.text}")
self.storage.log("assistant_message", {"turn": turn, "text": block.text})
elif isinstance(message, ResultMessage):
print(f"\n✅ Completed in {message.duration_ms/1000:.2f}s")
print(f"💰 Cost: ${message.total_cost_usd:.4f}" if message.total_cost_usd else "")
print(f"🔄 Turns: {message.num_turns}")
self.storage.log("session_end", {
"duration_ms": message.duration_ms,
"cost_usd": message.total_cost_usd,
"turns": message.num_turns,
"success": not message.is_error
})
break
print(f"\n📊 Session log: {self.storage.get_session_path()}\n")
def main():
parser = argparse.ArgumentParser(
description="Crawl4AI Agent - Browser automation powered by Claude Code SDK",
formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument("prompt", nargs="?", help="Your crawling task prompt")
parser.add_argument("--system-prompt", help="Custom system prompt")
parser.add_argument("--permission-mode", choices=["acceptEdits", "bypassPermissions", "default", "plan"],
help="Permission mode for tool execution")
parser.add_argument("--model", help="Model to use (e.g., 'sonnet', 'opus')")
parser.add_argument("--add-dir", nargs="+", help="Additional directories for file access")
parser.add_argument("--session-id", help="Use specific session ID (UUID)")
parser.add_argument("-v", "--version", action="version", version="Crawl4AI Agent 1.0.0")
parser.add_argument("--debug", action="store_true", help="Enable debug mode")
args = parser.parse_args()
if not args.prompt:
parser.print_help()
print("\nExample usage:")
print(' crawl-agent "Scrape all products from example.com with price > $10"')
print(' crawl-agent --add-dir ~/projects "Find all Python files and analyze imports"')
sys.exit(1)
try:
agent = CrawlAgent(args)
asyncio.run(agent.run(args.prompt))
except KeyboardInterrupt:
print("\n\n⚠ Interrupted by user")
sys.exit(0)
except Exception as e:
print(f"\n❌ Error: {e}")
if args.debug:
raise
sys.exit(1)
if __name__ == "__main__":
main()
```
**Usage:**
```bash
# Simple scrape
python agent_crawl.py "Get all product names from example.com"
# Complex filtering
python agent_crawl.py "Find products >$10 from shop.com, crawl each, extract id/name/price"
# Multi-step automation
python agent_crawl.py "Go to amazon.com, search 'laptop', filter 4+ stars, scrape top 10"
# With options
python agent_crawl.py --add-dir ~/projects --model sonnet "Scrape competitor prices"
```
**Session logs stored at:**
`~/.crawl4ai/agents/projects/-Users-unclecode-devs-test/{uuid}.jsonl`

View File

@@ -0,0 +1,202 @@
# agent_crawl.py
"""Crawl4AI Agent CLI - Browser automation agent powered by Claude Code SDK."""
import asyncio
import sys
import json
import uuid
from pathlib import Path
from datetime import datetime
from typing import Optional
import argparse
from claude_agent_sdk import ClaudeSDKClient, ClaudeAgentOptions, create_sdk_mcp_server
from claude_agent_sdk import AssistantMessage, TextBlock, ResultMessage
from .c4ai_tools import CRAWL_TOOLS
from .c4ai_prompts import SYSTEM_PROMPT
from .terminal_ui import TerminalUI
from .chat_mode import ChatMode
class SessionStorage:
"""Manage session storage in ~/.crawl4ai/agents/projects/"""
def __init__(self, cwd: Optional[str] = None):
self.cwd = Path(cwd) if cwd else Path.cwd()
self.base_dir = Path.home() / ".crawl4ai" / "agents" / "projects"
self.project_dir = self.base_dir / self._sanitize_path(str(self.cwd.resolve()))
self.project_dir.mkdir(parents=True, exist_ok=True)
self.session_id = str(uuid.uuid4())
self.log_file = self.project_dir / f"{self.session_id}.jsonl"
@staticmethod
def _sanitize_path(path: str) -> str:
"""Convert /Users/unclecode/devs/test to -Users-unclecode-devs-test"""
return path.replace("/", "-").replace("\\", "-")
def log(self, event_type: str, data: dict):
"""Append event to JSONL log."""
entry = {
"timestamp": datetime.utcnow().isoformat(),
"event": event_type,
"session_id": self.session_id,
"data": data
}
with open(self.log_file, "a") as f:
f.write(json.dumps(entry) + "\n")
def get_session_path(self) -> str:
"""Return path to current session log."""
return str(self.log_file)
class CrawlAgent:
"""Crawl4AI agent wrapper."""
def __init__(self, args: argparse.Namespace):
self.args = args
self.storage = SessionStorage(args.add_dir[0] if args.add_dir else None)
self.client: Optional[ClaudeSDKClient] = None
# Create MCP server with crawl tools
self.crawler_server = create_sdk_mcp_server(
name="crawl4ai",
version="1.0.0",
tools=CRAWL_TOOLS
)
# Build options
self.options = ClaudeAgentOptions(
mcp_servers={"crawler": self.crawler_server},
allowed_tools=[
# Crawl4AI tools
"mcp__crawler__quick_crawl",
"mcp__crawler__start_session",
"mcp__crawler__navigate",
"mcp__crawler__extract_data",
"mcp__crawler__execute_js",
"mcp__crawler__screenshot",
"mcp__crawler__close_session",
# Claude Code SDK built-in tools
"Read",
"Write",
"Edit",
"Glob",
"Grep",
"Bash",
"NotebookEdit"
],
system_prompt=SYSTEM_PROMPT if not args.system_prompt else args.system_prompt,
permission_mode=args.permission_mode or "acceptEdits",
cwd=args.add_dir[0] if args.add_dir else str(Path.cwd()),
model=args.model,
)
async def run(self, prompt: str):
"""Execute crawl task."""
self.storage.log("session_start", {
"prompt": prompt,
"cwd": self.options.cwd,
"model": self.options.model
})
print(f"\n🕷️ Crawl4AI Agent")
print(f"📁 Session: {self.storage.session_id}")
print(f"💾 Log: {self.storage.get_session_path()}")
print(f"🎯 Task: {prompt}\n")
async with ClaudeSDKClient(options=self.options) as client:
self.client = client
await client.query(prompt)
turn = 0
async for message in client.receive_messages():
turn += 1
if isinstance(message, AssistantMessage):
for block in message.content:
if isinstance(block, TextBlock):
print(f"\n💭 [{turn}] {block.text}")
self.storage.log("assistant_message", {"turn": turn, "text": block.text})
elif isinstance(message, ResultMessage):
print(f"\n✅ Completed in {message.duration_ms/1000:.2f}s")
print(f"💰 Cost: ${message.total_cost_usd:.4f}" if message.total_cost_usd else "")
print(f"🔄 Turns: {message.num_turns}")
self.storage.log("session_end", {
"duration_ms": message.duration_ms,
"cost_usd": message.total_cost_usd,
"turns": message.num_turns,
"success": not message.is_error
})
break
print(f"\n📊 Session log: {self.storage.get_session_path()}\n")
def main():
parser = argparse.ArgumentParser(
description="Crawl4AI Agent - Browser automation powered by Claude Code SDK",
formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument("prompt", nargs="?", help="Your crawling task prompt (not used in --chat mode)")
parser.add_argument("--chat", action="store_true", help="Start interactive chat mode")
parser.add_argument("--system-prompt", help="Custom system prompt")
parser.add_argument("--permission-mode", choices=["acceptEdits", "bypassPermissions", "default", "plan"],
help="Permission mode for tool execution")
parser.add_argument("--model", help="Model to use (e.g., 'sonnet', 'opus')")
parser.add_argument("--add-dir", nargs="+", help="Additional directories for file access")
parser.add_argument("--session-id", help="Use specific session ID (UUID)")
parser.add_argument("-v", "--version", action="version", version="Crawl4AI Agent 1.0.0")
parser.add_argument("--debug", action="store_true", help="Enable debug mode")
args = parser.parse_args()
# Chat mode - interactive
if args.chat:
try:
agent = CrawlAgent(args)
ui = TerminalUI()
chat = ChatMode(agent.options, ui, agent.storage)
asyncio.run(chat.run())
except KeyboardInterrupt:
print("\n\n⚠️ Chat interrupted by user")
sys.exit(0)
except Exception as e:
print(f"\n❌ Error: {e}")
if args.debug:
raise
sys.exit(1)
return
# Single-shot mode - requires prompt
if not args.prompt:
parser.print_help()
print("\nExample usage:")
print(' # Single-shot mode:')
print(' crawl-agent "Scrape all products from example.com with price > $10"')
print(' crawl-agent --add-dir ~/projects "Find all Python files and analyze imports"')
print()
print(' # Interactive chat mode:')
print(' crawl-agent --chat')
sys.exit(1)
try:
agent = CrawlAgent(args)
asyncio.run(agent.run(args.prompt))
except KeyboardInterrupt:
print("\n\n⚠️ Interrupted by user")
sys.exit(0)
except Exception as e:
print(f"\n❌ Error: {e}")
if args.debug:
raise
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,73 @@
"""Browser session management with singleton pattern for persistent browser instances."""
from typing import Optional
from crawl4ai import AsyncWebCrawler, BrowserConfig
class BrowserManager:
"""Singleton browser manager for persistent browser sessions across agent operations."""
_instance: Optional['BrowserManager'] = None
_crawler: Optional[AsyncWebCrawler] = None
_config: Optional[BrowserConfig] = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
@classmethod
async def get_browser(cls, config: Optional[BrowserConfig] = None) -> AsyncWebCrawler:
"""
Get or create the singleton browser instance.
Args:
config: Optional browser configuration. Only used if no browser exists yet.
To change config, use reconfigure_browser() instead.
Returns:
AsyncWebCrawler instance
"""
# Create new browser if needed
if cls._crawler is None:
# Create default config if none provided
if config is None:
config = BrowserConfig(headless=True, verbose=False)
cls._crawler = AsyncWebCrawler(config=config)
await cls._crawler.start()
cls._config = config
return cls._crawler
@classmethod
async def reconfigure_browser(cls, new_config: BrowserConfig) -> AsyncWebCrawler:
"""
Close current browser and create a new one with different configuration.
Args:
new_config: New browser configuration
Returns:
New AsyncWebCrawler instance
"""
await cls.close_browser()
return await cls.get_browser(new_config)
@classmethod
async def close_browser(cls):
"""Close the current browser instance and cleanup."""
if cls._crawler is not None:
await cls._crawler.close()
cls._crawler = None
cls._config = None
@classmethod
def is_browser_active(cls) -> bool:
"""Check if browser is currently active."""
return cls._crawler is not None
@classmethod
def get_current_config(cls) -> Optional[BrowserConfig]:
"""Get the current browser configuration."""
return cls._config

View File

@@ -0,0 +1,137 @@
# c4ai_prompts.py
"""System prompts for Crawl4AI agent."""
SYSTEM_PROMPT = """You are an expert web crawling and browser automation agent powered by Crawl4AI.
# Core Capabilities
You can perform sophisticated multi-step web scraping and automation tasks through two modes:
## Quick Mode (simple tasks)
- Use `quick_crawl` for single-page data extraction
- Best for: simple scrapes, getting page content, one-time extractions
## Session Mode (complex tasks)
- Use `start_session` to create persistent browser sessions
- Navigate, interact, extract data across multiple pages
- Essential for: workflows requiring JS execution, pagination, filtering, multi-step automation
# Tool Usage Patterns
## Simple Extraction
1. Use `quick_crawl` with appropriate output_format
2. Provide extraction_schema for structured data
## Multi-Step Workflow
1. `start_session` - Create browser session with unique ID
2. `navigate` - Go to target URL
3. `execute_js` - Interact with page (click buttons, scroll, fill forms)
4. `extract_data` - Get data using schema or markdown
5. Repeat steps 2-4 as needed
6. `close_session` - Clean up when done
# Critical Instructions
1. **Iteration & Validation**: When tasks require filtering or conditional logic:
- Extract data first, analyze results
- Filter/validate in your reasoning
- Make subsequent tool calls based on validation
- Continue until task criteria are met
2. **Structured Extraction**: Always use JSON schemas for structured data:
```json
{
"type": "object",
"properties": {
"field_name": {"type": "string"},
"price": {"type": "number"}
}
}
```
3. **Session Management**:
- Generate unique session IDs (e.g., "product_scrape_001")
- Always close sessions when done
- Use sessions for tasks requiring multiple page visits
4. **JavaScript Execution**:
- Use for: clicking buttons, scrolling, waiting for dynamic content
- Example: `js_code: "document.querySelector('.load-more').click()"`
- Combine with `wait_for` to ensure content loads
5. **Error Handling**:
- Check `success` field in all responses
- Retry with different strategies if extraction fails
- Report specific errors to user
6. **Data Persistence**:
- Save results using `Write` tool to JSON files
- Use descriptive filenames with timestamps
- Structure data clearly for user consumption
# Example Workflows
## Workflow 1: Filter & Crawl
Task: "Find products >$10, crawl each, extract details"
1. `quick_crawl` product listing page with schema for [name, price, url]
2. Analyze results, filter price > 10 in reasoning
3. `start_session` for detailed crawling
4. For each filtered product:
- `navigate` to product URL
- `extract_data` with detail schema
5. Aggregate results
6. `close_session`
7. `Write` results to JSON
## Workflow 2: Paginated Scraping
Task: "Scrape all items across multiple pages"
1. `start_session`
2. `navigate` to page 1
3. `extract_data` items from current page
4. Check for "next" button
5. `execute_js` to click next
6. Repeat 3-5 until no more pages
7. `close_session`
8. Save aggregated data
## Workflow 3: Dynamic Content
Task: "Scrape reviews after clicking 'Load More'"
1. `start_session`
2. `navigate` to product page
3. `execute_js` to click load more button
4. `wait_for` reviews container
5. `extract_data` all reviews
6. `close_session`
# Quality Guidelines
- **Be thorough**: Don't stop until task requirements are fully met
- **Validate data**: Check extracted data matches expected format
- **Handle edge cases**: Empty results, pagination limits, rate limiting
- **Clear reporting**: Summarize what was found, any issues encountered
- **Efficient**: Use quick_crawl when possible, sessions only when needed
# Output Format
When saving data, use clean JSON structure:
```json
{
"metadata": {
"scraped_at": "ISO timestamp",
"source_url": "...",
"total_items": 0
},
"data": [...]
}
```
Always provide a final summary of:
- Items found/processed
- Time taken
- Files created
- Any warnings/errors
Remember: You have unlimited turns to complete the task. Take your time, validate each step, and ensure quality results."""

View File

@@ -0,0 +1,314 @@
# c4ai_tools.py
"""Crawl4AI tools for Claude Code SDK agent."""
import json
import asyncio
from typing import Any, Dict
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode
from crawl4ai.extraction_strategy import LLMExtractionStrategy
from claude_agent_sdk import tool
from .browser_manager import BrowserManager
# Global session storage (for named sessions only)
CRAWLER_SESSIONS: Dict[str, AsyncWebCrawler] = {}
CRAWLER_SESSION_URLS: Dict[str, str] = {} # Track current URL per session
@tool("quick_crawl", "One-shot crawl for simple extraction. Returns markdown, HTML, or structured data.", {
"url": str,
"output_format": str, # "markdown" | "html" | "structured" | "screenshot"
"extraction_schema": str, # Optional: JSON schema for structured extraction
"js_code": str, # Optional: JavaScript to execute before extraction
"wait_for": str, # Optional: CSS selector to wait for
})
async def quick_crawl(args: Dict[str, Any]) -> Dict[str, Any]:
"""Fast single-page crawl using persistent browser."""
# Use singleton browser manager
crawler_config = BrowserConfig(headless=True, verbose=False)
crawler = await BrowserManager.get_browser(crawler_config)
run_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
js_code=args.get("js_code"),
wait_for=args.get("wait_for"),
)
# Add extraction strategy if structured data requested
if args.get("extraction_schema"):
run_config.extraction_strategy = LLMExtractionStrategy(
provider="openai/gpt-4o-mini",
schema=json.loads(args["extraction_schema"]),
instruction="Extract data according to the provided schema."
)
result = await crawler.arun(url=args["url"], config=run_config)
if not result.success:
return {
"content": [{
"type": "text",
"text": json.dumps({"error": result.error_message, "success": False})
}]
}
# Handle markdown - can be string or MarkdownGenerationResult object
markdown_content = ""
if isinstance(result.markdown, str):
markdown_content = result.markdown
elif hasattr(result.markdown, 'raw_markdown'):
markdown_content = result.markdown.raw_markdown
output_map = {
"markdown": markdown_content,
"html": result.html,
"structured": result.extracted_content,
"screenshot": result.screenshot,
}
response = {
"success": True,
"url": result.url,
"data": output_map.get(args["output_format"], markdown_content)
}
return {"content": [{"type": "text", "text": json.dumps(response, indent=2)}]}
@tool("start_session", "Start a named browser session for multi-step crawling and automation.", {
"session_id": str,
"headless": bool, # Default True
})
async def start_session(args: Dict[str, Any]) -> Dict[str, Any]:
"""Initialize a named crawler session using the singleton browser."""
session_id = args["session_id"]
if session_id in CRAWLER_SESSIONS:
return {"content": [{"type": "text", "text": json.dumps({
"error": f"Session {session_id} already exists",
"success": False
})}]}
# Use the singleton browser
crawler_config = BrowserConfig(
headless=args.get("headless", True),
verbose=False
)
crawler = await BrowserManager.get_browser(crawler_config)
# Store reference for named session
CRAWLER_SESSIONS[session_id] = crawler
return {"content": [{"type": "text", "text": json.dumps({
"success": True,
"session_id": session_id,
"message": f"Browser session {session_id} started"
})}]}
@tool("navigate", "Navigate to a URL in an active session.", {
"session_id": str,
"url": str,
"wait_for": str, # Optional: CSS selector to wait for
"js_code": str, # Optional: JavaScript to execute after load
})
async def navigate(args: Dict[str, Any]) -> Dict[str, Any]:
"""Navigate to URL in session."""
session_id = args["session_id"]
if session_id not in CRAWLER_SESSIONS:
return {"content": [{"type": "text", "text": json.dumps({
"error": f"Session {session_id} not found",
"success": False
})}]}
crawler = CRAWLER_SESSIONS[session_id]
run_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
wait_for=args.get("wait_for"),
js_code=args.get("js_code"),
)
result = await crawler.arun(url=args["url"], config=run_config)
# Store current URL for this session
if result.success:
CRAWLER_SESSION_URLS[session_id] = result.url
return {"content": [{"type": "text", "text": json.dumps({
"success": result.success,
"url": result.url,
"message": f"Navigated to {args['url']}"
})}]}
@tool("extract_data", "Extract data from current page in session using schema or return markdown.", {
"session_id": str,
"output_format": str, # "markdown" | "structured"
"extraction_schema": str, # Required for structured, JSON schema
"wait_for": str, # Optional: Wait for element before extraction
"js_code": str, # Optional: Execute JS before extraction
})
async def extract_data(args: Dict[str, Any]) -> Dict[str, Any]:
"""Extract data from current page."""
session_id = args["session_id"]
if session_id not in CRAWLER_SESSIONS:
return {"content": [{"type": "text", "text": json.dumps({
"error": f"Session {session_id} not found",
"success": False
})}]}
# Check if we have a current URL for this session
if session_id not in CRAWLER_SESSION_URLS:
return {"content": [{"type": "text", "text": json.dumps({
"error": "No page loaded in session. Use 'navigate' first.",
"success": False
})}]}
crawler = CRAWLER_SESSIONS[session_id]
current_url = CRAWLER_SESSION_URLS[session_id]
run_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
wait_for=args.get("wait_for"),
js_code=args.get("js_code"),
)
if args["output_format"] == "structured" and args.get("extraction_schema"):
run_config.extraction_strategy = LLMExtractionStrategy(
provider="openai/gpt-4o-mini",
schema=json.loads(args["extraction_schema"]),
instruction="Extract data according to schema."
)
result = await crawler.arun(url=current_url, config=run_config)
if not result.success:
return {"content": [{"type": "text", "text": json.dumps({
"error": result.error_message,
"success": False
})}]}
# Handle markdown - can be string or MarkdownGenerationResult object
markdown_content = ""
if isinstance(result.markdown, str):
markdown_content = result.markdown
elif hasattr(result.markdown, 'raw_markdown'):
markdown_content = result.markdown.raw_markdown
data = (result.extracted_content if args["output_format"] == "structured"
else markdown_content)
return {"content": [{"type": "text", "text": json.dumps({
"success": True,
"data": data
}, indent=2)}]}
@tool("execute_js", "Execute JavaScript in the current page context.", {
"session_id": str,
"js_code": str,
"wait_for": str, # Optional: Wait for element after execution
})
async def execute_js(args: Dict[str, Any]) -> Dict[str, Any]:
"""Execute JavaScript in session."""
session_id = args["session_id"]
if session_id not in CRAWLER_SESSIONS:
return {"content": [{"type": "text", "text": json.dumps({
"error": f"Session {session_id} not found",
"success": False
})}]}
# Check if we have a current URL for this session
if session_id not in CRAWLER_SESSION_URLS:
return {"content": [{"type": "text", "text": json.dumps({
"error": "No page loaded in session. Use 'navigate' first.",
"success": False
})}]}
crawler = CRAWLER_SESSIONS[session_id]
current_url = CRAWLER_SESSION_URLS[session_id]
run_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
js_code=args["js_code"],
wait_for=args.get("wait_for"),
)
result = await crawler.arun(url=current_url, config=run_config)
return {"content": [{"type": "text", "text": json.dumps({
"success": result.success,
"message": "JavaScript executed"
})}]}
@tool("screenshot", "Take a screenshot of the current page.", {
"session_id": str,
})
async def screenshot(args: Dict[str, Any]) -> Dict[str, Any]:
"""Capture screenshot."""
session_id = args["session_id"]
if session_id not in CRAWLER_SESSIONS:
return {"content": [{"type": "text", "text": json.dumps({
"error": f"Session {session_id} not found",
"success": False
})}]}
# Check if we have a current URL for this session
if session_id not in CRAWLER_SESSION_URLS:
return {"content": [{"type": "text", "text": json.dumps({
"error": "No page loaded in session. Use 'navigate' first.",
"success": False
})}]}
crawler = CRAWLER_SESSIONS[session_id]
current_url = CRAWLER_SESSION_URLS[session_id]
result = await crawler.arun(
url=current_url,
config=CrawlerRunConfig(cache_mode=CacheMode.BYPASS, screenshot=True)
)
return {"content": [{"type": "text", "text": json.dumps({
"success": True,
"screenshot": result.screenshot if result.success else None
})}]}
@tool("close_session", "Close and cleanup a named browser session.", {
"session_id": str,
})
async def close_session(args: Dict[str, Any]) -> Dict[str, Any]:
"""Close named crawler session (browser stays alive for other operations)."""
session_id = args["session_id"]
if session_id not in CRAWLER_SESSIONS:
return {"content": [{"type": "text", "text": json.dumps({
"error": f"Session {session_id} not found",
"success": False
})}]}
# Remove from named sessions, but don't close the singleton browser
CRAWLER_SESSIONS.pop(session_id)
CRAWLER_SESSION_URLS.pop(session_id, None) # Remove URL tracking
return {"content": [{"type": "text", "text": json.dumps({
"success": True,
"message": f"Session {session_id} closed"
})}]}
# Export all tools
CRAWL_TOOLS = [
quick_crawl,
start_session,
navigate,
extract_data,
execute_js,
screenshot,
close_session,
]

166
crawl4ai/agent/chat_mode.py Normal file
View File

@@ -0,0 +1,166 @@
"""Chat mode implementation with streaming message generator for Claude SDK."""
import asyncio
from typing import AsyncGenerator, Dict, Any, Optional
from claude_agent_sdk import ClaudeSDKClient, ClaudeAgentOptions, AssistantMessage, TextBlock, ResultMessage, ToolUseBlock
from .terminal_ui import TerminalUI
from .browser_manager import BrowserManager
class ChatMode:
"""Interactive chat mode with streaming input/output."""
def __init__(self, options: ClaudeAgentOptions, ui: TerminalUI, storage):
self.options = options
self.ui = ui
self.storage = storage
self._exit_requested = False
self._current_streaming_text = ""
async def message_generator(self) -> AsyncGenerator[Dict[str, Any], None]:
"""
Generate user messages as async generator (streaming input mode per cc_stream.md).
Yields messages in the format:
{
"type": "user",
"message": {
"role": "user",
"content": "user input text"
}
}
"""
while not self._exit_requested:
try:
# Get user input
user_input = await asyncio.to_thread(self.ui.get_user_input)
# Handle commands
if user_input.startswith('/'):
await self._handle_command(user_input)
if self._exit_requested:
break
continue
# Skip empty input
if not user_input.strip():
continue
# Log user message
self.storage.log("user_message", {"text": user_input})
# Yield user message for agent
yield {
"type": "user",
"message": {
"role": "user",
"content": user_input
}
}
except KeyboardInterrupt:
self._exit_requested = True
break
except Exception as e:
self.ui.print_error(f"Input error: {e}")
async def _handle_command(self, command: str):
"""Handle special chat commands."""
cmd = command.lower().strip()
if cmd == '/exit' or cmd == '/quit':
self._exit_requested = True
self.ui.print_info("Exiting chat mode...")
elif cmd == '/clear':
self.ui.clear_screen()
elif cmd == '/help':
self.ui.show_commands()
elif cmd == '/browser':
# Show browser status
if BrowserManager.is_browser_active():
config = BrowserManager.get_current_config()
self.ui.print_info(f"Browser active: {config}")
else:
self.ui.print_info("No browser instance active")
else:
self.ui.print_error(f"Unknown command: {command}")
async def run(self):
"""Run the interactive chat loop with streaming responses."""
# Show header
self.ui.show_header(
session_id=str(self.options.session_id or "chat"),
log_path=self.storage.get_session_path() if hasattr(self.storage, 'get_session_path') else "N/A"
)
self.ui.show_commands()
try:
async with ClaudeSDKClient(options=self.options) as client:
# Start streaming input mode
await client.query(self.message_generator())
# Process streaming responses
turn = 0
async for message in client.receive_messages():
turn += 1
if isinstance(message, AssistantMessage):
# Clear "thinking" line if we printed it
if self._current_streaming_text:
self.ui.console.print() # New line after streaming
self._current_streaming_text = ""
# Process message content blocks
for block in message.content:
if isinstance(block, TextBlock):
# Stream text as it arrives
self.ui.print_agent_text(block.text)
self._current_streaming_text += block.text
# Log assistant message
self.storage.log("assistant_message", {
"turn": turn,
"text": block.text
})
elif isinstance(block, ToolUseBlock):
# Show tool usage
self.ui.print_tool_use(block.name)
elif isinstance(message, ResultMessage):
# Session completed (user exited or error)
if message.is_error:
self.ui.print_error(f"Agent error: {message.result}")
else:
self.ui.print_session_summary(
duration_s=message.duration_ms / 1000 if message.duration_ms else 0,
turns=message.num_turns,
cost_usd=message.total_cost_usd
)
# Log session end
self.storage.log("session_end", {
"duration_ms": message.duration_ms,
"cost_usd": message.total_cost_usd,
"turns": message.num_turns,
"success": not message.is_error
})
break
except KeyboardInterrupt:
self.ui.print_info("\nChat interrupted by user")
except Exception as e:
self.ui.print_error(f"Chat error: {e}")
raise
finally:
# Cleanup browser on exit
await BrowserManager.close_browser()
self.ui.print_info("Browser closed")

View File

@@ -0,0 +1,115 @@
"""Terminal UI components using Rich for beautiful agent output."""
from rich.console import Console
from rich.markdown import Markdown
from rich.syntax import Syntax
from rich.panel import Panel
from rich.live import Live
from rich.spinner import Spinner
from rich.text import Text
from rich.prompt import Prompt
from rich.rule import Rule
class TerminalUI:
"""Rich-based terminal interface for the Crawl4AI agent."""
def __init__(self):
self.console = Console()
self._current_text = ""
def show_header(self, session_id: str, log_path: str):
"""Display agent session header."""
self.console.print()
self.console.print(Panel.fit(
"[bold cyan]🕷️ Crawl4AI Agent - Chat Mode[/bold cyan]",
border_style="cyan"
))
self.console.print(f"[dim]📁 Session: {session_id}[/dim]")
self.console.print(f"[dim]💾 Log: {log_path}[/dim]")
self.console.print()
def show_commands(self):
"""Display available commands."""
self.console.print("\n[dim]Commands:[/dim]")
self.console.print(" [cyan]/exit[/cyan] - Exit chat")
self.console.print(" [cyan]/clear[/cyan] - Clear screen")
self.console.print(" [cyan]/help[/cyan] - Show this help\n")
def get_user_input(self) -> str:
"""Get user input with styled prompt."""
return Prompt.ask("\n[bold green]You[/bold green]")
def print_separator(self):
"""Print a visual separator."""
self.console.print(Rule(style="dim"))
def print_thinking(self):
"""Show thinking indicator."""
self.console.print("\n[cyan]Agent:[/cyan] [dim]thinking...[/dim]", end="")
def print_agent_text(self, text: str, stream: bool = False):
"""
Print agent response text.
Args:
text: Text to print
stream: If True, append to current streaming output
"""
if stream:
# For streaming, just print without newline
self.console.print(f"\r[cyan]Agent:[/cyan] {text}", end="")
else:
# For complete messages
self.console.print(f"\n[cyan]Agent:[/cyan] {text}")
def print_markdown(self, markdown_text: str):
"""Render markdown content."""
self.console.print()
self.console.print(Markdown(markdown_text))
def print_code(self, code: str, language: str = "python"):
"""Render code with syntax highlighting."""
self.console.print()
self.console.print(Syntax(code, language, theme="monokai", line_numbers=True))
def print_error(self, error_msg: str):
"""Display error message."""
self.console.print(f"\n[bold red]Error:[/bold red] {error_msg}")
def print_success(self, msg: str):
"""Display success message."""
self.console.print(f"\n[bold green]✓[/bold green] {msg}")
def print_info(self, msg: str):
"""Display info message."""
self.console.print(f"\n[bold blue][/bold blue] {msg}")
def clear_screen(self):
"""Clear the terminal screen."""
self.console.clear()
def print_session_summary(self, duration_s: float, turns: int, cost_usd: float = None):
"""Display session completion summary."""
self.console.print()
self.console.print(Panel(
f"[green]✅ Completed[/green]\n"
f"⏱ Duration: {duration_s:.2f}s\n"
f"🔄 Turns: {turns}\n"
+ (f"💰 Cost: ${cost_usd:.4f}" if cost_usd else ""),
border_style="green"
))
def print_tool_use(self, tool_name: str):
"""Indicate tool usage."""
self.console.print(f"\n[dim]🔧 Using tool: {tool_name}[/dim]")
def with_spinner(self, text: str = "Processing..."):
"""
Context manager for showing a spinner.
Usage:
with ui.with_spinner("Crawling page..."):
# do work
"""
return self.console.status(f"[cyan]{text}[/cyan]", spinner="dots")

114
crawl4ai/agent/test_chat.py Normal file
View File

@@ -0,0 +1,114 @@
#!/usr/bin/env python
"""Test script to verify chat mode setup (non-interactive)."""
import sys
import asyncio
from pathlib import Path
# Add parent to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from crawl4ai.agent.browser_manager import BrowserManager
from crawl4ai.agent.terminal_ui import TerminalUI
from crawl4ai.agent.chat_mode import ChatMode
from crawl4ai.agent.c4ai_tools import CRAWL_TOOLS
from crawl4ai.agent.c4ai_prompts import SYSTEM_PROMPT
from claude_agent_sdk import ClaudeAgentOptions, create_sdk_mcp_server
class MockStorage:
"""Mock storage for testing."""
def log(self, event_type: str, data: dict):
print(f"[LOG] {event_type}: {data}")
def get_session_path(self):
return "/tmp/test_session.jsonl"
async def test_components():
"""Test individual components."""
print("="*60)
print("CHAT MODE COMPONENT TESTS")
print("="*60)
# Test 1: BrowserManager
print("\n[TEST 1] BrowserManager singleton")
try:
browser1 = await BrowserManager.get_browser()
browser2 = await BrowserManager.get_browser()
assert browser1 is browser2, "Browser instances should be same (singleton)"
print("✓ BrowserManager singleton works")
await BrowserManager.close_browser()
except Exception as e:
print(f"✗ BrowserManager failed: {e}")
return False
# Test 2: TerminalUI
print("\n[TEST 2] TerminalUI rendering")
try:
ui = TerminalUI()
ui.show_header("test-123", "/tmp/test.log")
ui.print_agent_text("Hello from agent")
ui.print_markdown("# Test\nThis is **bold**")
ui.print_success("Test success message")
print("✓ TerminalUI renders correctly")
except Exception as e:
print(f"✗ TerminalUI failed: {e}")
return False
# Test 3: MCP Server Setup
print("\n[TEST 3] MCP Server with tools")
try:
crawler_server = create_sdk_mcp_server(
name="crawl4ai",
version="1.0.0",
tools=CRAWL_TOOLS
)
print(f"✓ MCP server created with {len(CRAWL_TOOLS)} tools")
except Exception as e:
print(f"✗ MCP Server failed: {e}")
return False
# Test 4: ChatMode instantiation
print("\n[TEST 4] ChatMode instantiation")
try:
options = ClaudeAgentOptions(
mcp_servers={"crawler": crawler_server},
allowed_tools=[
"mcp__crawler__quick_crawl",
"mcp__crawler__start_session",
"mcp__crawler__navigate",
"mcp__crawler__extract_data",
"mcp__crawler__execute_js",
"mcp__crawler__screenshot",
"mcp__crawler__close_session",
],
system_prompt=SYSTEM_PROMPT,
permission_mode="acceptEdits"
)
ui = TerminalUI()
storage = MockStorage()
chat = ChatMode(options, ui, storage)
print("✓ ChatMode instance created successfully")
except Exception as e:
print(f"✗ ChatMode failed: {e}")
import traceback
traceback.print_exc()
return False
print("\n" + "="*60)
print("ALL COMPONENT TESTS PASSED ✓")
print("="*60)
print("\nTo test interactive chat mode, run:")
print(" python -m crawl4ai.agent.agent_crawl --chat")
return True
if __name__ == "__main__":
success = asyncio.run(test_components())
sys.exit(0 if success else 1)

View File

@@ -0,0 +1,524 @@
#!/usr/bin/env python
"""
Automated multi-turn chat scenario tests for Crawl4AI Agent.
Tests agent's ability to handle complex conversations, maintain state,
plan and execute tasks without human interaction.
"""
import asyncio
import json
import time
from pathlib import Path
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from enum import Enum
from claude_agent_sdk import ClaudeSDKClient, ClaudeAgentOptions, create_sdk_mcp_server
from claude_agent_sdk import AssistantMessage, TextBlock, ResultMessage, ToolUseBlock
from .c4ai_tools import CRAWL_TOOLS
from .c4ai_prompts import SYSTEM_PROMPT
from .browser_manager import BrowserManager
class TurnResult(Enum):
"""Result of a single conversation turn."""
PASS = "PASS"
FAIL = "FAIL"
TIMEOUT = "TIMEOUT"
ERROR = "ERROR"
@dataclass
class TurnExpectation:
"""Expectations for a single conversation turn."""
user_message: str
expect_tools: Optional[List[str]] = None # Tools that should be called
expect_keywords: Optional[List[str]] = None # Keywords in response
expect_files_created: Optional[List[str]] = None # File patterns created
expect_success: bool = True # Should complete without error
expect_min_turns: int = 1 # Minimum agent turns to complete
timeout_seconds: int = 60
@dataclass
class Scenario:
"""A complete multi-turn conversation scenario."""
name: str
category: str # "simple", "medium", "complex"
description: str
turns: List[TurnExpectation]
cleanup_files: Optional[List[str]] = None # Files to cleanup after test
# =============================================================================
# TEST SCENARIOS - Categorized from Simple to Complex
# =============================================================================
SIMPLE_SCENARIOS = [
Scenario(
name="Single quick crawl",
category="simple",
description="Basic one-shot crawl with markdown extraction",
turns=[
TurnExpectation(
user_message="Use quick_crawl to get the title from example.com",
expect_tools=["mcp__crawler__quick_crawl"],
expect_keywords=["Example Domain", "title"],
timeout_seconds=30
)
]
),
Scenario(
name="Session lifecycle",
category="simple",
description="Start session, navigate, close - basic session management",
turns=[
TurnExpectation(
user_message="Start a session named 'simple_test'",
expect_tools=["mcp__crawler__start_session"],
expect_keywords=["session", "started"],
timeout_seconds=20
),
TurnExpectation(
user_message="Navigate to example.com",
expect_tools=["mcp__crawler__navigate"],
expect_keywords=["navigated", "example.com"],
timeout_seconds=25
),
TurnExpectation(
user_message="Close the session",
expect_tools=["mcp__crawler__close_session"],
expect_keywords=["closed"],
timeout_seconds=15
)
]
),
]
MEDIUM_SCENARIOS = [
Scenario(
name="Multi-page crawl with file output",
category="medium",
description="Crawl multiple pages and save results to file",
turns=[
TurnExpectation(
user_message="Crawl example.com and example.org, extract titles from both",
expect_tools=["mcp__crawler__quick_crawl"],
expect_min_turns=2, # Should make 2 separate crawls
timeout_seconds=45
),
TurnExpectation(
user_message="Save the results to a JSON file called crawl_results.json",
expect_tools=["Write"],
expect_files_created=["crawl_results.json"],
timeout_seconds=20
)
],
cleanup_files=["crawl_results.json"]
),
Scenario(
name="Session-based data extraction",
category="medium",
description="Use session to navigate and extract data in steps",
turns=[
TurnExpectation(
user_message="Start session 'extract_test', navigate to example.com, and extract the markdown",
expect_tools=["mcp__crawler__start_session", "mcp__crawler__navigate", "mcp__crawler__extract_data"],
expect_keywords=["Example Domain"],
timeout_seconds=50
),
TurnExpectation(
user_message="Now save that markdown to example_content.md",
expect_tools=["Write"],
expect_files_created=["example_content.md"],
timeout_seconds=20
),
TurnExpectation(
user_message="Close the session",
expect_tools=["mcp__crawler__close_session"],
timeout_seconds=15
)
],
cleanup_files=["example_content.md"]
),
Scenario(
name="Context retention across turns",
category="medium",
description="Agent should remember previous context",
turns=[
TurnExpectation(
user_message="Crawl example.com and tell me the title",
expect_tools=["mcp__crawler__quick_crawl"],
expect_keywords=["Example Domain"],
timeout_seconds=30
),
TurnExpectation(
user_message="What was the URL I just asked you to crawl?",
expect_keywords=["example.com"],
expect_tools=[], # Should answer from memory, no tools needed
timeout_seconds=15
)
]
),
]
COMPLEX_SCENARIOS = [
Scenario(
name="Multi-step task with planning",
category="complex",
description="Complex task requiring agent to plan, execute, and verify",
turns=[
TurnExpectation(
user_message="Crawl example.com and example.org, compare their content, and create a markdown report with: 1) titles of both, 2) word count comparison, 3) save to comparison_report.md",
expect_tools=["mcp__crawler__quick_crawl", "Write"],
expect_files_created=["comparison_report.md"],
expect_min_turns=3, # Plan, crawl both, write report
timeout_seconds=90
),
TurnExpectation(
user_message="Read back the report you just created",
expect_tools=["Read"],
expect_keywords=["Example Domain"],
timeout_seconds=20
)
],
cleanup_files=["comparison_report.md"]
),
Scenario(
name="Session with state manipulation",
category="complex",
description="Complex session workflow with multiple operations",
turns=[
TurnExpectation(
user_message="Start session 'complex_session' and navigate to example.com",
expect_tools=["mcp__crawler__start_session", "mcp__crawler__navigate"],
timeout_seconds=30
),
TurnExpectation(
user_message="Extract the page content and count how many times the word 'example' appears (case insensitive)",
expect_tools=["mcp__crawler__extract_data"],
expect_keywords=["example"],
timeout_seconds=30
),
TurnExpectation(
user_message="Take a screenshot of the current page",
expect_tools=["mcp__crawler__screenshot"],
expect_keywords=["screenshot"],
timeout_seconds=25
),
TurnExpectation(
user_message="Close the session",
expect_tools=["mcp__crawler__close_session"],
timeout_seconds=15
)
]
),
Scenario(
name="Error recovery and continuation",
category="complex",
description="Agent should handle errors gracefully and continue",
turns=[
TurnExpectation(
user_message="Crawl https://this-site-definitely-does-not-exist-12345.com",
expect_success=False, # Should fail gracefully
expect_keywords=["error", "fail"],
timeout_seconds=30
),
TurnExpectation(
user_message="That's okay, crawl example.com instead",
expect_tools=["mcp__crawler__quick_crawl"],
expect_keywords=["Example Domain"],
timeout_seconds=30
)
]
),
]
# Combine all scenarios
ALL_SCENARIOS = SIMPLE_SCENARIOS + MEDIUM_SCENARIOS + COMPLEX_SCENARIOS
# =============================================================================
# TEST RUNNER
# =============================================================================
class ScenarioRunner:
"""Runs automated chat scenarios without human interaction."""
def __init__(self, working_dir: Path):
self.working_dir = working_dir
self.results = []
async def run_scenario(self, scenario: Scenario) -> Dict[str, Any]:
"""Run a single scenario and return results."""
print(f"\n{'='*70}")
print(f"[{scenario.category.upper()}] {scenario.name}")
print(f"{'='*70}")
print(f"Description: {scenario.description}\n")
start_time = time.time()
turn_results = []
try:
# Setup agent options
crawler_server = create_sdk_mcp_server(
name="crawl4ai",
version="1.0.0",
tools=CRAWL_TOOLS
)
options = ClaudeAgentOptions(
mcp_servers={"crawler": crawler_server},
allowed_tools=[
"mcp__crawler__quick_crawl",
"mcp__crawler__start_session",
"mcp__crawler__navigate",
"mcp__crawler__extract_data",
"mcp__crawler__execute_js",
"mcp__crawler__screenshot",
"mcp__crawler__close_session",
"Read", "Write", "Edit", "Glob", "Grep", "Bash"
],
system_prompt=SYSTEM_PROMPT,
permission_mode="acceptEdits",
cwd=str(self.working_dir)
)
# Run conversation
async with ClaudeSDKClient(options=options) as client:
for turn_idx, expectation in enumerate(scenario.turns, 1):
print(f"\nTurn {turn_idx}: {expectation.user_message}")
turn_result = await self._run_turn(
client, expectation, turn_idx
)
turn_results.append(turn_result)
if turn_result["status"] != TurnResult.PASS:
print(f" ✗ FAILED: {turn_result['reason']}")
break
else:
print(f" ✓ PASSED")
# Cleanup
if scenario.cleanup_files:
self._cleanup_files(scenario.cleanup_files)
# Overall result
all_passed = all(r["status"] == TurnResult.PASS for r in turn_results)
duration = time.time() - start_time
result = {
"scenario": scenario.name,
"category": scenario.category,
"status": "PASS" if all_passed else "FAIL",
"duration_seconds": duration,
"turns": turn_results
}
return result
except Exception as e:
print(f"\n✗ SCENARIO ERROR: {e}")
return {
"scenario": scenario.name,
"category": scenario.category,
"status": "ERROR",
"error": str(e),
"duration_seconds": time.time() - start_time,
"turns": turn_results
}
finally:
# Ensure browser cleanup
await BrowserManager.close_browser()
async def _run_turn(
self,
client: ClaudeSDKClient,
expectation: TurnExpectation,
turn_number: int
) -> Dict[str, Any]:
"""Execute a single conversation turn and validate."""
tools_used = []
response_text = ""
agent_turns = 0
try:
# Send user message
await client.query(expectation.user_message)
# Collect response
start_time = time.time()
async for message in client.receive_messages():
if time.time() - start_time > expectation.timeout_seconds:
return {
"turn": turn_number,
"status": TurnResult.TIMEOUT,
"reason": f"Exceeded {expectation.timeout_seconds}s timeout"
}
if isinstance(message, AssistantMessage):
agent_turns += 1
for block in message.content:
if isinstance(block, TextBlock):
response_text += block.text + " "
elif isinstance(block, ToolUseBlock):
tools_used.append(block.name)
elif isinstance(message, ResultMessage):
# Check if error when expecting success
if expectation.expect_success and message.is_error:
return {
"turn": turn_number,
"status": TurnResult.FAIL,
"reason": f"Agent returned error: {message.result}"
}
break
# Validate expectations
validation = self._validate_turn(
expectation, tools_used, response_text, agent_turns
)
return {
"turn": turn_number,
"status": validation["status"],
"reason": validation.get("reason", "All checks passed"),
"tools_used": tools_used,
"agent_turns": agent_turns
}
except Exception as e:
return {
"turn": turn_number,
"status": TurnResult.ERROR,
"reason": f"Exception: {str(e)}"
}
def _validate_turn(
self,
expectation: TurnExpectation,
tools_used: List[str],
response_text: str,
agent_turns: int
) -> Dict[str, Any]:
"""Validate turn results against expectations."""
# Check expected tools
if expectation.expect_tools:
for tool in expectation.expect_tools:
if tool not in tools_used:
return {
"status": TurnResult.FAIL,
"reason": f"Expected tool '{tool}' was not used"
}
# Check keywords
if expectation.expect_keywords:
response_lower = response_text.lower()
for keyword in expectation.expect_keywords:
if keyword.lower() not in response_lower:
return {
"status": TurnResult.FAIL,
"reason": f"Expected keyword '{keyword}' not found in response"
}
# Check files created
if expectation.expect_files_created:
for pattern in expectation.expect_files_created:
matches = list(self.working_dir.glob(pattern))
if not matches:
return {
"status": TurnResult.FAIL,
"reason": f"Expected file matching '{pattern}' was not created"
}
# Check minimum turns
if agent_turns < expectation.expect_min_turns:
return {
"status": TurnResult.FAIL,
"reason": f"Expected at least {expectation.expect_min_turns} agent turns, got {agent_turns}"
}
return {"status": TurnResult.PASS}
def _cleanup_files(self, patterns: List[str]):
"""Remove files created during test."""
for pattern in patterns:
for file_path in self.working_dir.glob(pattern):
try:
file_path.unlink()
except Exception as e:
print(f" Warning: Could not delete {file_path}: {e}")
async def run_all_scenarios(working_dir: Optional[Path] = None):
"""Run all test scenarios and report results."""
if working_dir is None:
working_dir = Path.cwd() / "test_agent_output"
working_dir.mkdir(exist_ok=True)
runner = ScenarioRunner(working_dir)
print("\n" + "="*70)
print("CRAWL4AI AGENT SCENARIO TESTS")
print("="*70)
print(f"Working directory: {working_dir}")
print(f"Total scenarios: {len(ALL_SCENARIOS)}")
print(f" Simple: {len(SIMPLE_SCENARIOS)}")
print(f" Medium: {len(MEDIUM_SCENARIOS)}")
print(f" Complex: {len(COMPLEX_SCENARIOS)}")
results = []
for scenario in ALL_SCENARIOS:
result = await runner.run_scenario(scenario)
results.append(result)
# Summary
print("\n" + "="*70)
print("TEST SUMMARY")
print("="*70)
by_category = {"simple": [], "medium": [], "complex": []}
for result in results:
by_category[result["category"]].append(result)
for category in ["simple", "medium", "complex"]:
cat_results = by_category[category]
passed = sum(1 for r in cat_results if r["status"] == "PASS")
total = len(cat_results)
print(f"\n{category.upper()}: {passed}/{total} passed")
for r in cat_results:
status_icon = "" if r["status"] == "PASS" else ""
print(f" {status_icon} {r['scenario']} ({r['duration_seconds']:.1f}s)")
total_passed = sum(1 for r in results if r["status"] == "PASS")
total = len(results)
print(f"\nOVERALL: {total_passed}/{total} scenarios passed ({total_passed/total*100:.1f}%)")
# Save detailed results
results_file = working_dir / "test_results.json"
with open(results_file, "w") as f:
json.dump(results, f, indent=2)
print(f"\nDetailed results saved to: {results_file}")
return total_passed == total
if __name__ == "__main__":
import sys
success = asyncio.run(run_all_scenarios())
sys.exit(0 if success else 1)

View File

@@ -0,0 +1,140 @@
#!/usr/bin/env python
"""Test script for Crawl4AI tools - tests tools directly without the agent."""
import asyncio
import json
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode
async def test_quick_crawl():
"""Test quick_crawl tool logic directly."""
print("\n" + "="*60)
print("TEST 1: Quick Crawl - Markdown Format")
print("="*60)
crawler_config = BrowserConfig(headless=True, verbose=False)
run_config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS)
async with AsyncWebCrawler(config=crawler_config) as crawler:
result = await crawler.arun(url="https://example.com", config=run_config)
print(f"Success: {result.success}")
print(f"URL: {result.url}")
# Handle markdown - can be string or MarkdownGenerationResult object
if isinstance(result.markdown, str):
markdown_content = result.markdown
elif hasattr(result.markdown, 'raw_markdown'):
markdown_content = result.markdown.raw_markdown
else:
markdown_content = str(result.markdown)
print(f"Markdown type: {type(result.markdown)}")
print(f"Markdown length: {len(markdown_content)}")
print(f"Markdown preview:\n{markdown_content[:300]}")
return result.success
async def test_session_workflow():
"""Test session-based workflow."""
print("\n" + "="*60)
print("TEST 2: Session-Based Workflow")
print("="*60)
crawler_config = BrowserConfig(headless=True, verbose=False)
# Start session
crawler = AsyncWebCrawler(config=crawler_config)
await crawler.__aenter__()
print("✓ Session started")
try:
# Navigate to URL
run_config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS)
result = await crawler.arun(url="https://example.com", config=run_config)
print(f"✓ Navigated to {result.url}, success: {result.success}")
# Extract data
if isinstance(result.markdown, str):
markdown_content = result.markdown
elif hasattr(result.markdown, 'raw_markdown'):
markdown_content = result.markdown.raw_markdown
else:
markdown_content = str(result.markdown)
print(f"✓ Extracted {len(markdown_content)} chars of markdown")
print(f" Preview: {markdown_content[:200]}")
# Screenshot test - need to re-fetch with screenshot enabled
screenshot_config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS, screenshot=True)
result2 = await crawler.arun(url=result.url, config=screenshot_config)
print(f"✓ Screenshot captured: {result2.screenshot is not None}")
return True
finally:
# Close session
await crawler.__aexit__(None, None, None)
print("✓ Session closed")
async def test_html_format():
"""Test HTML output format."""
print("\n" + "="*60)
print("TEST 3: Quick Crawl - HTML Format")
print("="*60)
crawler_config = BrowserConfig(headless=True, verbose=False)
run_config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS)
async with AsyncWebCrawler(config=crawler_config) as crawler:
result = await crawler.arun(url="https://example.com", config=run_config)
print(f"Success: {result.success}")
print(f"HTML length: {len(result.html)}")
print(f"HTML preview:\n{result.html[:300]}")
return result.success
async def main():
"""Run all tests."""
print("\n" + "="*70)
print(" CRAWL4AI TOOLS TEST SUITE")
print("="*70)
tests = [
("Quick Crawl (Markdown)", test_quick_crawl),
("Session Workflow", test_session_workflow),
("Quick Crawl (HTML)", test_html_format),
]
results = []
for name, test_func in tests:
try:
result = await test_func()
results.append((name, result, None))
except Exception as e:
results.append((name, False, str(e)))
# Summary
print("\n" + "="*70)
print(" TEST SUMMARY")
print("="*70)
for name, success, error in results:
status = "✓ PASS" if success else "✗ FAIL"
print(f"{status} - {name}")
if error:
print(f" Error: {error}")
total = len(results)
passed = sum(1 for _, success, _ in results if success)
print(f"\nTotal: {total} | Passed: {passed} | Failed: {total - passed}")
return all(success for _, success, _ in results)
if __name__ == "__main__":
success = asyncio.run(main())
exit(0 if success else 1)