From 31741e571ac45f4b52b5b143318d179066b1fe55 Mon Sep 17 00:00:00 2001 From: unclecode Date: Fri, 17 Oct 2025 12:25:45 +0800 Subject: [PATCH] feat(agent): implement Claude Code SDK agent with chat mode and persistent browser Implementation: - Singleton browser pattern (BrowserManager) - one instance for entire session - 7 MCP tools for Crawl4AI (quick_crawl, sessions, navigation, extraction, JS execution, screenshots) - Interactive chat mode with streaming I/O using Claude SDK message generator - Rich-based terminal UI with markdown rendering and syntax highlighting - Single-shot and chat modes (--chat flag) - Comprehensive test suite: component tests, tool tests, 9 multi-turn scenarios Architecture: - agent_crawl.py: CLI entry point with SessionStorage (JSONL logging) - browser_manager.py: Singleton pattern for persistent AsyncWebCrawler - c4ai_tools.py: MCP tools using @tool decorator, integrated with BrowserManager - chat_mode.py: Streaming input mode per Claude SDK spec - terminal_ui.py: Rich-based beautiful terminal output - test_scenarios.py: Automated multi-turn conversation tests (simple/medium/complex) - TECH_SPEC.md: Complete AI-to-AI knowledge transfer document Key fixes: - Use result.markdown (not deprecated result.markdown_v2) - Handle both str and MarkdownGenerationResult types - Track current URL per session for extract_data/execute_js/screenshot tools - Manual browser lifecycle (start/close) instead of context managers Tools enabled: - Crawl4AI: quick_crawl, start_session, navigate, extract_data, execute_js, screenshot, close_session - Claude SDK built-in: Read, Write, Edit, Glob, Grep, Bash, NotebookEdit Total: 12 files, 2820 lines --- crawl4ai/agent/TECH_SPEC.md | 429 +++++++++++++++++++++ crawl4ai/agent/__init__.py | 13 + crawl4ai/agent/agent-cc-sdk.md | 593 ++++++++++++++++++++++++++++++ crawl4ai/agent/agent_crawl.py | 202 ++++++++++ crawl4ai/agent/browser_manager.py | 73 ++++ crawl4ai/agent/c4ai_prompts.py | 137 +++++++ 
crawl4ai/agent/c4ai_tools.py | 314 ++++++++++++++++ crawl4ai/agent/chat_mode.py | 166 +++++++++ crawl4ai/agent/terminal_ui.py | 115 ++++++ crawl4ai/agent/test_chat.py | 114 ++++++ crawl4ai/agent/test_scenarios.py | 524 ++++++++++++++++++++++++++ crawl4ai/agent/test_tools.py | 140 +++++++ 12 files changed, 2820 insertions(+) create mode 100644 crawl4ai/agent/TECH_SPEC.md create mode 100644 crawl4ai/agent/__init__.py create mode 100644 crawl4ai/agent/agent-cc-sdk.md create mode 100644 crawl4ai/agent/agent_crawl.py create mode 100644 crawl4ai/agent/browser_manager.py create mode 100644 crawl4ai/agent/c4ai_prompts.py create mode 100644 crawl4ai/agent/c4ai_tools.py create mode 100644 crawl4ai/agent/chat_mode.py create mode 100644 crawl4ai/agent/terminal_ui.py create mode 100644 crawl4ai/agent/test_chat.py create mode 100644 crawl4ai/agent/test_scenarios.py create mode 100644 crawl4ai/agent/test_tools.py diff --git a/crawl4ai/agent/TECH_SPEC.md b/crawl4ai/agent/TECH_SPEC.md new file mode 100644 index 00000000..9760fead --- /dev/null +++ b/crawl4ai/agent/TECH_SPEC.md @@ -0,0 +1,429 @@ +# Crawl4AI Agent Technical Specification +*AI-to-AI Knowledge Transfer Document* + +## Context Documents +**MUST READ FIRST:** +1. `/Users/unclecode/devs/crawl4ai/tmp/CRAWL4AI_SDK.md` - Crawl4AI complete API reference +2. `/Users/unclecode/devs/crawl4ai/tmp/cc_stream.md` - Claude SDK streaming input mode +3. 
`/Users/unclecode/devs/crawl4ai/tmp/CC_PYTHON_SDK.md` - Claude Code Python SDK complete reference + +## Architecture Overview + +**Core Principle:** Singleton browser instance + streaming chat mode + MCP tools + +``` +┌─────────────────────────────────────────────────────────────┐ +│ Agent Entry Point │ +│ agent_crawl.py (CLI: --chat | single-shot) │ +└─────────────────────────────────────────────────────────────┘ + │ + ┌───────────────────┼───────────────────┐ + │ │ │ + [Chat Mode] [Single-shot] [Browser Manager] + │ │ │ + ▼ ▼ ▼ + ChatMode.run() CrawlAgent.run() BrowserManager + - Streaming - One prompt (Singleton) + - Interactive - Exit after │ + - Commands - Uses same ▼ + │ browser AsyncWebCrawler + │ │ (persistent) + └───────────────────┴────────────────┘ + │ + ┌───────┴────────┐ + │ │ + MCP Tools Claude SDK + (Crawl4AI) (Built-in) + │ │ + ┌───────────┴────┐ ┌──────┴──────┐ + │ │ │ │ + quick_crawl session Read Edit + navigate tools Write Glob + extract_data Bash Grep + execute_js + screenshot + close_session +``` + +## File Structure + +``` +crawl4ai/agent/ +├── __init__.py # Module exports +├── agent_crawl.py # Main CLI entry (190 lines) +│ ├── SessionStorage # JSONL logging to ~/.crawl4ai/agents/projects/ +│ ├── CrawlAgent # Single-shot wrapper +│ └── main() # CLI parser (--chat flag) +│ +├── browser_manager.py # Singleton pattern (70 lines) +│ └── BrowserManager # Class methods only, no instances +│ ├── get_browser() # Returns singleton AsyncWebCrawler +│ ├── reconfigure_browser() +│ ├── close_browser() +│ └── is_browser_active() +│ +├── c4ai_tools.py # 7 MCP tools (310 lines) +│ ├── @tool decorators # Claude SDK decorator +│ ├── CRAWLER_SESSIONS # Dict[str, AsyncWebCrawler] for named sessions +│ ├── CRAWLER_SESSION_URLS # Dict[str, str] track current URL per session +│ └── CRAWL_TOOLS # List of tool functions +│ +├── c4ai_prompts.py # System prompt (130 lines) +│ └── SYSTEM_PROMPT # Agent behavior definition +│ +├── terminal_ui.py # Rich-based UI (120 
lines) +│ └── TerminalUI # Console rendering +│ ├── show_header() +│ ├── print_markdown() +│ ├── print_code() +│ └── with_spinner() +│ +├── chat_mode.py # Streaming chat (160 lines) +│ └── ChatMode +│ ├── message_generator() # AsyncGenerator per cc_stream.md +│ ├── _handle_command() # /exit /clear /help /browser +│ └── run() # Main chat loop +│ +├── test_tools.py # Direct tool tests (130 lines) +├── test_chat.py # Component tests (90 lines) +└── test_scenarios.py # Multi-turn scenarios (500 lines) + ├── SIMPLE_SCENARIOS + ├── MEDIUM_SCENARIOS + ├── COMPLEX_SCENARIOS + └── ScenarioRunner +``` + +## Critical Implementation Details + +### 1. Browser Singleton Pattern + +**Key:** ONE browser instance for ENTIRE agent session + +```python +# browser_manager.py +class BrowserManager: + _crawler: Optional[AsyncWebCrawler] = None # Singleton + _config: Optional[BrowserConfig] = None + + @classmethod + async def get_browser(cls, config=None) -> AsyncWebCrawler: + if cls._crawler is None: + cls._crawler = AsyncWebCrawler(config or BrowserConfig()) + await cls._crawler.start() # Manual lifecycle + return cls._crawler +``` + +**Behavior:** +- First call: creates browser with `config` (or default) +- Subsequent calls: returns same instance, **ignores config param** +- To change config: `reconfigure_browser(new_config)` (closes old, creates new) +- Tools use: `crawler = await BrowserManager.get_browser()` +- No `async with` context manager - manual `start()` / `close()` + +### 2. Tool Architecture + +**Two types of browser usage:** + +**A) Quick operations** (quick_crawl): +```python +@tool("quick_crawl", ...) 
+async def quick_crawl(args): + crawler = await BrowserManager.get_browser() # Singleton + result = await crawler.arun(url=args["url"], config=run_config) + # No close - browser stays alive +``` + +**B) Named sessions** (start_session, navigate, extract_data, etc.): +```python +CRAWLER_SESSIONS: Dict[str, AsyncWebCrawler] = {} # Named refs +CRAWLER_SESSION_URLS: Dict[str, str] = {} # Track current URL + +@tool("start_session", ...) +async def start_session(args): + crawler = await BrowserManager.get_browser() + CRAWLER_SESSIONS[args["session_id"]] = crawler # Store ref + +@tool("navigate", ...) +async def navigate(args): + crawler = CRAWLER_SESSIONS[args["session_id"]] + result = await crawler.arun(url=args["url"], ...) + CRAWLER_SESSION_URLS[args["session_id"]] = result.url # Track URL + +@tool("extract_data", ...) +async def extract_data(args): + crawler = CRAWLER_SESSIONS[args["session_id"]] + current_url = CRAWLER_SESSION_URLS[args["session_id"]] # Must have URL + result = await crawler.arun(url=current_url, ...) # Re-crawl current page + +@tool("close_session", ...) +async def close_session(args): + CRAWLER_SESSIONS.pop(args["session_id"]) # Remove ref + CRAWLER_SESSION_URLS.pop(args["session_id"], None) + # Browser stays alive (singleton) +``` + +**Important:** Named sessions are just **references** to singleton browser. Multiple sessions = same browser instance. + +### 3. Markdown Handling (CRITICAL BUG FIX) + +**OLD (WRONG):** +```python +result.markdown_v2.raw_markdown # DEPRECATED +``` + +**NEW (CORRECT):** +```python +# result.markdown can be: +# - str (simple mode) +# - MarkdownGenerationResult object (with filters) + +if isinstance(result.markdown, str): + markdown_content = result.markdown +elif hasattr(result.markdown, 'raw_markdown'): + markdown_content = result.markdown.raw_markdown +``` + +Reference: `CRAWL4AI_SDK.md` line 614 - `markdown_v2` deprecated, use `markdown` + +### 4. 
Chat Mode Streaming Input + +**Per cc_stream.md:** Use message generator pattern + +```python +# chat_mode.py +async def message_generator(self) -> AsyncGenerator[Dict[str, Any], None]: + while not self._exit_requested: + user_input = await asyncio.to_thread(self.ui.get_user_input) + + if user_input.startswith('/'): + await self._handle_command(user_input) + continue + + # Yield in streaming input format + yield { + "type": "user", + "message": { + "role": "user", + "content": user_input + } + } + +async def run(self): + async with ClaudeSDKClient(options=self.options) as client: + await client.query(self.message_generator()) # Pass generator + + async for message in client.receive_messages(): + # Process streaming responses +``` + +**Key:** Generator keeps yielding user inputs, SDK streams responses back. + +### 5. Claude SDK Integration + +**Setup:** +```python +from claude_agent_sdk import tool, create_sdk_mcp_server, ClaudeSDKClient, ClaudeAgentOptions + +# 1. Define tools with @tool decorator +@tool("quick_crawl", "description", {"url": str, "output_format": str}) +async def quick_crawl(args: Dict[str, Any]) -> Dict[str, Any]: + return {"content": [{"type": "text", "text": json.dumps(result)}]} + +# 2. Create MCP server +crawler_server = create_sdk_mcp_server( + name="crawl4ai", + version="1.0.0", + tools=[quick_crawl, start_session, ...] # List of @tool functions +) + +# 3. Configure options +options = ClaudeAgentOptions( + mcp_servers={"crawler": crawler_server}, + allowed_tools=[ + "mcp__crawler__quick_crawl", # Format: mcp__{server}__{tool} + "mcp__crawler__start_session", + # Built-in tools: + "Read", "Write", "Edit", "Glob", "Grep", "Bash", "NotebookEdit" + ], + system_prompt=SYSTEM_PROMPT, + permission_mode="acceptEdits" +) + +# 4. Use client +async with ClaudeSDKClient(options=options) as client: + await client.query(prompt_or_generator) + async for message in client.receive_messages(): + # Process AssistantMessage, ResultMessage, etc. 
+``` + +**Tool response format:** +```python +return { + "content": [{ + "type": "text", + "text": json.dumps({"success": True, "data": "..."}) + }] +} +``` + +## Operating Modes + +### Single-Shot Mode +```bash +python -m crawl4ai.agent.agent_crawl "Crawl example.com" +``` +- One prompt → execute → exit +- Uses singleton browser +- No cleanup of browser (process exit handles it) + +### Chat Mode +```bash +python -m crawl4ai.agent.agent_crawl --chat +``` +- Interactive loop with streaming I/O +- Commands: `/exit` `/clear` `/help` `/browser` +- Browser persists across all turns +- Cleanup on exit: `BrowserManager.close_browser()` + +## Testing Architecture + +**3 test levels:** + +1. **Component tests** (`test_chat.py`): Non-interactive, tests individual classes +2. **Tool tests** (`test_tools.py`): Direct AsyncWebCrawler calls, validates Crawl4AI integration +3. **Scenario tests** (`test_scenarios.py`): Automated multi-turn conversations + - Injects messages programmatically + - Validates tool calls, keywords, files created + - Categories: SIMPLE (2), MEDIUM (3), COMPLEX (4) + +## Dependencies + +```python +# External +from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode +from crawl4ai.extraction_strategy import LLMExtractionStrategy +from claude_agent_sdk import ( + tool, create_sdk_mcp_server, ClaudeSDKClient, ClaudeAgentOptions, + AssistantMessage, TextBlock, ResultMessage, ToolUseBlock +) +from rich.console import Console # Already installed +from rich.markdown import Markdown +from rich.syntax import Syntax + +# Stdlib +import asyncio, json, uuid, argparse +from pathlib import Path +from typing import Optional, Dict, Any, AsyncGenerator +``` + +## Common Pitfalls + +1. **DON'T** use `async with AsyncWebCrawler()` - breaks singleton pattern +2. **DON'T** use `result.markdown_v2` - deprecated field +3. **DON'T** call `crawler.arun()` without URL in session tools - needs current_url +4. 
**DON'T** close browser in tools - managed by BrowserManager +5. **DON'T** use `break` in message iteration - causes asyncio issues +6. **DO** track session URLs in `CRAWLER_SESSION_URLS` for session tools +7. **DO** handle both `str` and `MarkdownGenerationResult` for `result.markdown` +8. **DO** use manual lifecycle `await crawler.start()` / `await crawler.close()` + +## Session Storage + +**Location:** `~/.crawl4ai/agents/projects/{sanitized_cwd}/{uuid}.jsonl` + +**Format:** JSONL with events: +```json +{"timestamp": "...", "event": "session_start", "session_id": "...", "data": {...}} +{"timestamp": "...", "event": "user_message", "session_id": "...", "data": {"text": "..."}} +{"timestamp": "...", "event": "assistant_message", "session_id": "...", "data": {"turn": 1, "text": "..."}} +{"timestamp": "...", "event": "session_end", "session_id": "...", "data": {"duration_ms": 1000, ...}} +``` + +## CLI Options + +``` +--chat Interactive chat mode +--model MODEL Claude model override +--permission-mode MODE acceptEdits|bypassPermissions|default|plan +--add-dir DIR [DIR...] Additional accessible directories +--system-prompt TEXT Custom system prompt +--session-id UUID Resume/specify session +--debug Full tracebacks +``` + +## Performance Characteristics + +- **Browser startup:** ~2-4s (once per session) +- **Quick crawl:** ~1-2s (reuses browser) +- **Session operations:** ~1-2s (same browser) +- **Chat latency:** Real-time streaming, no buffering +- **Memory:** One browser instance regardless of operations + +## Extension Points + +1. **New tools:** Add `@tool` function → add to `CRAWL_TOOLS` → add to `allowed_tools` +2. **New commands:** Add handler in `ChatMode._handle_command()` +3. **Custom UI:** Replace `TerminalUI` with different renderer +4. **Persistent sessions:** Serialize browser cookies/state to disk in `BrowserManager` +5. 
**Multi-browser:** Modify `BrowserManager` to support multiple configs (not recommended) + +## Next Steps: Testing & Evaluation Pipeline + +### Phase 1: Automated Testing (CURRENT) +**Objective:** Verify codebase correctness, not agent quality + +**Test Execution:** +```bash +# 1. Component tests (fast, non-interactive) +python crawl4ai/agent/test_chat.py +# Expected: All components instantiate correctly + +# 2. Tool integration tests (medium, requires browser) +python crawl4ai/agent/test_tools.py +# Expected: All 7 tools work with Crawl4AI + +# 3. Multi-turn scenario tests (slow, comprehensive) +python crawl4ai/agent/test_scenarios.py +# Expected: 9 scenarios pass (2 simple, 3 medium, 4 complex) +# Output: test_agent_output/test_results.json +``` + +**Success Criteria:** +- All component tests pass +- All tool tests pass +- ≥80% scenario tests pass (8/9) +- No crashes, exceptions, or hangs +- Browser cleanup verified + +**Automated Pipeline:** +```bash +# Run all tests in sequence, exit on first failure +cd /Users/unclecode/devs/crawl4ai +python crawl4ai/agent/test_chat.py && \ +python crawl4ai/agent/test_tools.py && \ +python crawl4ai/agent/test_scenarios.py +echo "Exit code: $?" 
# 0 = all passed +``` + +### Phase 2: Evaluation (NEXT) +**Objective:** Measure agent performance quality + +**Metrics to define:** +- Task completion rate +- Tool selection accuracy +- Context retention across turns +- Planning effectiveness +- Error recovery capability + +**Eval framework needed:** +- Expand scenario tests with quality scoring +- Add ground truth comparisons +- Measure token efficiency +- Track reasoning quality + +**Not in scope yet** - wait for Phase 1 completion + +--- +**Last Updated:** 2025-10-17 +**Version:** 1.0.0 +**Status:** Testing Phase - Ready for automated test runs diff --git a/crawl4ai/agent/__init__.py b/crawl4ai/agent/__init__.py new file mode 100644 index 00000000..f2f6b83f --- /dev/null +++ b/crawl4ai/agent/__init__.py @@ -0,0 +1,13 @@ +# __init__.py +"""Crawl4AI Agent - Browser automation agent powered by Claude Code SDK.""" + +from .c4ai_tools import CRAWL_TOOLS +from .c4ai_prompts import SYSTEM_PROMPT +from .agent_crawl import CrawlAgent, SessionStorage + +__all__ = [ + "CRAWL_TOOLS", + "SYSTEM_PROMPT", + "CrawlAgent", + "SessionStorage", +] diff --git a/crawl4ai/agent/agent-cc-sdk.md b/crawl4ai/agent/agent-cc-sdk.md new file mode 100644 index 00000000..643cf1b6 --- /dev/null +++ b/crawl4ai/agent/agent-cc-sdk.md @@ -0,0 +1,593 @@ +```python +# c4ai_tools.py +"""Crawl4AI tools for Claude Code SDK agent.""" + +import json +import asyncio +from typing import Any, Dict +from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode +from crawl4ai.extraction_strategy import LLMExtractionStrategy +from claude_agent_sdk import tool + +# Global session storage +CRAWLER_SESSIONS: Dict[str, AsyncWebCrawler] = {} + +@tool("quick_crawl", "One-shot crawl for simple extraction. 
Returns markdown, HTML, or structured data.", { + "url": str, + "output_format": str, # "markdown" | "html" | "structured" | "screenshot" + "extraction_schema": str, # Optional: JSON schema for structured extraction + "js_code": str, # Optional: JavaScript to execute before extraction + "wait_for": str, # Optional: CSS selector to wait for +}) +async def quick_crawl(args: Dict[str, Any]) -> Dict[str, Any]: + """Fast single-page crawl without session management.""" + + crawler_config = BrowserConfig(headless=True, verbose=False) + run_config = CrawlerRunConfig( + cache_mode=CacheMode.BYPASS, + js_code=args.get("js_code"), + wait_for=args.get("wait_for"), + ) + + # Add extraction strategy if structured data requested + if args.get("extraction_schema"): + run_config.extraction_strategy = LLMExtractionStrategy( + provider="openai/gpt-4o-mini", + schema=json.loads(args["extraction_schema"]), + instruction="Extract data according to the provided schema." + ) + + async with AsyncWebCrawler(config=crawler_config) as crawler: + result = await crawler.arun(url=args["url"], config=run_config) + + if not result.success: + return { + "content": [{ + "type": "text", + "text": json.dumps({"error": result.error_message, "success": False}) + }] + } + + output_map = { + "markdown": result.markdown_v2.raw_markdown if result.markdown_v2 else "", + "html": result.html, + "structured": result.extracted_content, + "screenshot": result.screenshot, + } + + response = { + "success": True, + "url": result.url, + "data": output_map.get(args["output_format"], result.markdown_v2.raw_markdown) + } + + return {"content": [{"type": "text", "text": json.dumps(response, indent=2)}]} + + +@tool("start_session", "Start a persistent browser session for multi-step crawling and automation.", { + "session_id": str, + "headless": bool, # Default True +}) +async def start_session(args: Dict[str, Any]) -> Dict[str, Any]: + """Initialize a persistent crawler session.""" + + session_id = args["session_id"] + 
if session_id in CRAWLER_SESSIONS: + return {"content": [{"type": "text", "text": json.dumps({ + "error": f"Session {session_id} already exists", + "success": False + })}]} + + crawler_config = BrowserConfig( + headless=args.get("headless", True), + verbose=False + ) + + crawler = AsyncWebCrawler(config=crawler_config) + await crawler.__aenter__() + CRAWLER_SESSIONS[session_id] = crawler + + return {"content": [{"type": "text", "text": json.dumps({ + "success": True, + "session_id": session_id, + "message": f"Browser session {session_id} started" + })}]} + + +@tool("navigate", "Navigate to a URL in an active session.", { + "session_id": str, + "url": str, + "wait_for": str, # Optional: CSS selector to wait for + "js_code": str, # Optional: JavaScript to execute after load +}) +async def navigate(args: Dict[str, Any]) -> Dict[str, Any]: + """Navigate to URL in session.""" + + session_id = args["session_id"] + if session_id not in CRAWLER_SESSIONS: + return {"content": [{"type": "text", "text": json.dumps({ + "error": f"Session {session_id} not found", + "success": False + })}]} + + crawler = CRAWLER_SESSIONS[session_id] + run_config = CrawlerRunConfig( + cache_mode=CacheMode.BYPASS, + wait_for=args.get("wait_for"), + js_code=args.get("js_code"), + ) + + result = await crawler.arun(url=args["url"], config=run_config) + + return {"content": [{"type": "text", "text": json.dumps({ + "success": result.success, + "url": result.url, + "message": f"Navigated to {args['url']}" + })}]} + + +@tool("extract_data", "Extract data from current page in session using schema or return markdown.", { + "session_id": str, + "output_format": str, # "markdown" | "structured" + "extraction_schema": str, # Required for structured, JSON schema + "wait_for": str, # Optional: Wait for element before extraction + "js_code": str, # Optional: Execute JS before extraction +}) +async def extract_data(args: Dict[str, Any]) -> Dict[str, Any]: + """Extract data from current page.""" + + session_id = 
args["session_id"] + if session_id not in CRAWLER_SESSIONS: + return {"content": [{"type": "text", "text": json.dumps({ + "error": f"Session {session_id} not found", + "success": False + })}]} + + crawler = CRAWLER_SESSIONS[session_id] + run_config = CrawlerRunConfig( + cache_mode=CacheMode.BYPASS, + wait_for=args.get("wait_for"), + js_code=args.get("js_code"), + ) + + if args["output_format"] == "structured" and args.get("extraction_schema"): + run_config.extraction_strategy = LLMExtractionStrategy( + provider="openai/gpt-4o-mini", + schema=json.loads(args["extraction_schema"]), + instruction="Extract data according to schema." + ) + + result = await crawler.arun(config=run_config) + + if not result.success: + return {"content": [{"type": "text", "text": json.dumps({ + "error": result.error_message, + "success": False + })}]} + + data = (result.extracted_content if args["output_format"] == "structured" + else result.markdown_v2.raw_markdown if result.markdown_v2 else "") + + return {"content": [{"type": "text", "text": json.dumps({ + "success": True, + "data": data + }, indent=2)}]} + + +@tool("execute_js", "Execute JavaScript in the current page context.", { + "session_id": str, + "js_code": str, + "wait_for": str, # Optional: Wait for element after execution +}) +async def execute_js(args: Dict[str, Any]) -> Dict[str, Any]: + """Execute JavaScript in session.""" + + session_id = args["session_id"] + if session_id not in CRAWLER_SESSIONS: + return {"content": [{"type": "text", "text": json.dumps({ + "error": f"Session {session_id} not found", + "success": False + })}]} + + crawler = CRAWLER_SESSIONS[session_id] + run_config = CrawlerRunConfig( + cache_mode=CacheMode.BYPASS, + js_code=args["js_code"], + wait_for=args.get("wait_for"), + ) + + result = await crawler.arun(config=run_config) + + return {"content": [{"type": "text", "text": json.dumps({ + "success": result.success, + "message": "JavaScript executed" + })}]} + + +@tool("screenshot", "Take a screenshot 
of the current page.", { + "session_id": str, +}) +async def screenshot(args: Dict[str, Any]) -> Dict[str, Any]: + """Capture screenshot.""" + + session_id = args["session_id"] + if session_id not in CRAWLER_SESSIONS: + return {"content": [{"type": "text", "text": json.dumps({ + "error": f"Session {session_id} not found", + "success": False + })}]} + + crawler = CRAWLER_SESSIONS[session_id] + result = await crawler.arun(config=CrawlerRunConfig(cache_mode=CacheMode.BYPASS)) + + return {"content": [{"type": "text", "text": json.dumps({ + "success": True, + "screenshot": result.screenshot if result.success else None + })}]} + + +@tool("close_session", "Close and cleanup a browser session.", { + "session_id": str, +}) +async def close_session(args: Dict[str, Any]) -> Dict[str, Any]: + """Close crawler session.""" + + session_id = args["session_id"] + if session_id not in CRAWLER_SESSIONS: + return {"content": [{"type": "text", "text": json.dumps({ + "error": f"Session {session_id} not found", + "success": False + })}]} + + crawler = CRAWLER_SESSIONS.pop(session_id) + await crawler.__aexit__(None, None, None) + + return {"content": [{"type": "text", "text": json.dumps({ + "success": True, + "message": f"Session {session_id} closed" + })}]} + + +# Export all tools +CRAWL_TOOLS = [ + quick_crawl, + start_session, + navigate, + extract_data, + execute_js, + screenshot, + close_session, +] +``` + +```python +# c4ai_prompts.py +"""System prompts for Crawl4AI agent.""" + +SYSTEM_PROMPT = """You are an expert web crawling and browser automation agent powered by Crawl4AI. 
+ +# Core Capabilities + +You can perform sophisticated multi-step web scraping and automation tasks through two modes: + +## Quick Mode (simple tasks) +- Use `quick_crawl` for single-page data extraction +- Best for: simple scrapes, getting page content, one-time extractions + +## Session Mode (complex tasks) +- Use `start_session` to create persistent browser sessions +- Navigate, interact, extract data across multiple pages +- Essential for: workflows requiring JS execution, pagination, filtering, multi-step automation + +# Tool Usage Patterns + +## Simple Extraction +1. Use `quick_crawl` with appropriate output_format +2. Provide extraction_schema for structured data + +## Multi-Step Workflow +1. `start_session` - Create browser session with unique ID +2. `navigate` - Go to target URL +3. `execute_js` - Interact with page (click buttons, scroll, fill forms) +4. `extract_data` - Get data using schema or markdown +5. Repeat steps 2-4 as needed +6. `close_session` - Clean up when done + +# Critical Instructions + +1. **Iteration & Validation**: When tasks require filtering or conditional logic: + - Extract data first, analyze results + - Filter/validate in your reasoning + - Make subsequent tool calls based on validation + - Continue until task criteria are met + +2. **Structured Extraction**: Always use JSON schemas for structured data: + ```json + { + "type": "object", + "properties": { + "field_name": {"type": "string"}, + "price": {"type": "number"} + } + } + ``` + +3. **Session Management**: + - Generate unique session IDs (e.g., "product_scrape_001") + - Always close sessions when done + - Use sessions for tasks requiring multiple page visits + +4. **JavaScript Execution**: + - Use for: clicking buttons, scrolling, waiting for dynamic content + - Example: `js_code: "document.querySelector('.load-more').click()"` + - Combine with `wait_for` to ensure content loads + +5. 
**Error Handling**: + - Check `success` field in all responses + - Retry with different strategies if extraction fails + - Report specific errors to user + +6. **Data Persistence**: + - Save results using `Write` tool to JSON files + - Use descriptive filenames with timestamps + - Structure data clearly for user consumption + +# Example Workflows + +## Workflow 1: Filter & Crawl +Task: "Find products >$10, crawl each, extract details" + +1. `quick_crawl` product listing page with schema for [name, price, url] +2. Analyze results, filter price > 10 in reasoning +3. `start_session` for detailed crawling +4. For each filtered product: + - `navigate` to product URL + - `extract_data` with detail schema +5. Aggregate results +6. `close_session` +7. `Write` results to JSON + +## Workflow 2: Paginated Scraping +Task: "Scrape all items across multiple pages" + +1. `start_session` +2. `navigate` to page 1 +3. `extract_data` items from current page +4. Check for "next" button +5. `execute_js` to click next +6. Repeat 3-5 until no more pages +7. `close_session` +8. Save aggregated data + +## Workflow 3: Dynamic Content +Task: "Scrape reviews after clicking 'Load More'" + +1. `start_session` +2. `navigate` to product page +3. `execute_js` to click load more button +4. `wait_for` reviews container +5. `extract_data` all reviews +6. `close_session` + +# Quality Guidelines + +- **Be thorough**: Don't stop until task requirements are fully met +- **Validate data**: Check extracted data matches expected format +- **Handle edge cases**: Empty results, pagination limits, rate limiting +- **Clear reporting**: Summarize what was found, any issues encountered +- **Efficient**: Use quick_crawl when possible, sessions only when needed + +# Output Format + +When saving data, use clean JSON structure: +```json +{ + "metadata": { + "scraped_at": "ISO timestamp", + "source_url": "...", + "total_items": 0 + }, + "data": [...] 
+} +``` + +Always provide a final summary of: +- Items found/processed +- Time taken +- Files created +- Any warnings/errors + +Remember: You have unlimited turns to complete the task. Take your time, validate each step, and ensure quality results.""" +``` + +```python +# agent_crawl.py +"""Crawl4AI Agent CLI - Browser automation agent powered by Claude Code SDK.""" + +import asyncio +import sys +import json +import uuid +from pathlib import Path +from datetime import datetime +from typing import Optional +import argparse + +from claude_agent_sdk import ClaudeSDKClient, ClaudeAgentOptions, create_sdk_mcp_server +from claude_agent_sdk import AssistantMessage, TextBlock, ResultMessage + +from c4ai_tools import CRAWL_TOOLS +from c4ai_prompts import SYSTEM_PROMPT + + +class SessionStorage: + """Manage session storage in ~/.crawl4ai/agents/projects/""" + + def __init__(self, cwd: Optional[str] = None): + self.cwd = Path(cwd) if cwd else Path.cwd() + self.base_dir = Path.home() / ".crawl4ai" / "agents" / "projects" + self.project_dir = self.base_dir / self._sanitize_path(str(self.cwd.resolve())) + self.project_dir.mkdir(parents=True, exist_ok=True) + self.session_id = str(uuid.uuid4()) + self.log_file = self.project_dir / f"{self.session_id}.jsonl" + + @staticmethod + def _sanitize_path(path: str) -> str: + """Convert /Users/unclecode/devs/test to -Users-unclecode-devs-test""" + return path.replace("/", "-").replace("\\", "-") + + def log(self, event_type: str, data: dict): + """Append event to JSONL log.""" + entry = { + "timestamp": datetime.utcnow().isoformat(), + "event": event_type, + "session_id": self.session_id, + "data": data + } + with open(self.log_file, "a") as f: + f.write(json.dumps(entry) + "\n") + + def get_session_path(self) -> str: + """Return path to current session log.""" + return str(self.log_file) + + +class CrawlAgent: + """Crawl4AI agent wrapper.""" + + def __init__(self, args: argparse.Namespace): + self.args = args + self.storage = 
SessionStorage(args.add_dir[0] if args.add_dir else None) + self.client: Optional[ClaudeSDKClient] = None + + # Create MCP server with crawl tools + self.crawler_server = create_sdk_mcp_server( + name="crawl4ai", + version="1.0.0", + tools=CRAWL_TOOLS + ) + + # Build options + self.options = ClaudeAgentOptions( + mcp_servers={"crawler": self.crawler_server}, + allowed_tools=[ + "mcp__crawler__quick_crawl", + "mcp__crawler__start_session", + "mcp__crawler__navigate", + "mcp__crawler__extract_data", + "mcp__crawler__execute_js", + "mcp__crawler__screenshot", + "mcp__crawler__close_session", + "Write", "Read", "Bash" + ], + system_prompt=SYSTEM_PROMPT if not args.system_prompt else args.system_prompt, + permission_mode=args.permission_mode or "acceptEdits", + cwd=args.add_dir[0] if args.add_dir else str(Path.cwd()), + model=args.model, + session_id=args.session_id or self.storage.session_id, + ) + + async def run(self, prompt: str): + """Execute crawl task.""" + + self.storage.log("session_start", { + "prompt": prompt, + "cwd": self.options.cwd, + "model": self.options.model + }) + + print(f"\n🕷️ Crawl4AI Agent") + print(f"📁 Session: {self.storage.session_id}") + print(f"💾 Log: {self.storage.get_session_path()}") + print(f"🎯 Task: {prompt}\n") + + async with ClaudeSDKClient(options=self.options) as client: + self.client = client + await client.query(prompt) + + turn = 0 + async for message in client.receive_messages(): + turn += 1 + + if isinstance(message, AssistantMessage): + for block in message.content: + if isinstance(block, TextBlock): + print(f"\n💭 [{turn}] {block.text}") + self.storage.log("assistant_message", {"turn": turn, "text": block.text}) + + elif isinstance(message, ResultMessage): + print(f"\n✅ Completed in {message.duration_ms/1000:.2f}s") + print(f"💰 Cost: ${message.total_cost_usd:.4f}" if message.total_cost_usd else "") + print(f"🔄 Turns: {message.num_turns}") + + self.storage.log("session_end", { + "duration_ms": message.duration_ms, + "cost_usd": 
message.total_cost_usd, + "turns": message.num_turns, + "success": not message.is_error + }) + break + + print(f"\n📊 Session log: {self.storage.get_session_path()}\n") + + +def main(): + parser = argparse.ArgumentParser( + description="Crawl4AI Agent - Browser automation powered by Claude Code SDK", + formatter_class=argparse.RawDescriptionHelpFormatter + ) + + parser.add_argument("prompt", nargs="?", help="Your crawling task prompt") + parser.add_argument("--system-prompt", help="Custom system prompt") + parser.add_argument("--permission-mode", choices=["acceptEdits", "bypassPermissions", "default", "plan"], + help="Permission mode for tool execution") + parser.add_argument("--model", help="Model to use (e.g., 'sonnet', 'opus')") + parser.add_argument("--add-dir", nargs="+", help="Additional directories for file access") + parser.add_argument("--session-id", help="Use specific session ID (UUID)") + parser.add_argument("-v", "--version", action="version", version="Crawl4AI Agent 1.0.0") + parser.add_argument("--debug", action="store_true", help="Enable debug mode") + + args = parser.parse_args() + + if not args.prompt: + parser.print_help() + print("\nExample usage:") + print(' crawl-agent "Scrape all products from example.com with price > $10"') + print(' crawl-agent --add-dir ~/projects "Find all Python files and analyze imports"') + sys.exit(1) + + try: + agent = CrawlAgent(args) + asyncio.run(agent.run(args.prompt)) + except KeyboardInterrupt: + print("\n\n⚠️ Interrupted by user") + sys.exit(0) + except Exception as e: + print(f"\n❌ Error: {e}") + if args.debug: + raise + sys.exit(1) + + +if __name__ == "__main__": + main() +``` + +**Usage:** + +```bash +# Simple scrape +python agent_crawl.py "Get all product names from example.com" + +# Complex filtering +python agent_crawl.py "Find products >$10 from shop.com, crawl each, extract id/name/price" + +# Multi-step automation +python agent_crawl.py "Go to amazon.com, search 'laptop', filter 4+ stars, scrape top 
class SessionStorage:
    """Append-only JSONL event log for a single agent session.

    Log files live at
    ``~/.crawl4ai/agents/projects/<sanitized-cwd>/<session_id>.jsonl``.
    The project directory is created eagerly in ``__init__``.
    """

    def __init__(self, cwd: Optional[str] = None, session_id: Optional[str] = None):
        """Prepare the per-project log directory and pick a session id.

        Args:
            cwd: Project directory the session belongs to; defaults to the
                process working directory. A leading ``~`` is expanded.
            session_id: Explicit session id to use. When omitted (the
                original behaviour) a fresh UUID4 string is generated.
        """
        self.cwd = Path(cwd).expanduser() if cwd else Path.cwd()
        self.base_dir = Path.home() / ".crawl4ai" / "agents" / "projects"
        self.project_dir = self.base_dir / self._sanitize_path(str(self.cwd.resolve()))
        self.project_dir.mkdir(parents=True, exist_ok=True)
        self.session_id = session_id or str(uuid.uuid4())
        # The id becomes a file name, so path separators must not leak into it.
        self.log_file = self.project_dir / f"{self._sanitize_path(self.session_id)}.jsonl"

    @staticmethod
    def _sanitize_path(path: str) -> str:
        """Convert /Users/unclecode/devs/test to -Users-unclecode-devs-test.

        Forward slashes, backslashes and the Windows drive colon are all
        mapped to ``-`` so the result is a valid single directory name.
        """
        return path.replace("/", "-").replace("\\", "-").replace(":", "-")

    def log(self, event_type: str, data: dict):
        """Append one event to the JSONL log.

        Args:
            event_type: Short event name stored under ``"event"``.
            data: Event payload stored under ``"data"``.
        """
        # Local import: the file-level import only brings in `datetime`.
        from datetime import timezone

        entry = {
            # Timezone-aware UTC; `datetime.utcnow()` is deprecated and naive.
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "event": event_type,
            "session_id": self.session_id,
            "data": data
        }
        # default=str: a payload value that is not JSON-native (e.g. a Path)
        # is stringified instead of aborting the agent from inside the logger.
        line = json.dumps(entry, default=str)
        with open(self.log_file, "a", encoding="utf-8") as f:
            f.write(line + "\n")

    def get_session_path(self) -> str:
        """Return path to current session log."""
        return str(self.log_file)
print(f"\n✅ Completed in {message.duration_ms/1000:.2f}s") + print(f"💰 Cost: ${message.total_cost_usd:.4f}" if message.total_cost_usd else "") + print(f"🔄 Turns: {message.num_turns}") + + self.storage.log("session_end", { + "duration_ms": message.duration_ms, + "cost_usd": message.total_cost_usd, + "turns": message.num_turns, + "success": not message.is_error + }) + break + + print(f"\n📊 Session log: {self.storage.get_session_path()}\n") + + +def main(): + parser = argparse.ArgumentParser( + description="Crawl4AI Agent - Browser automation powered by Claude Code SDK", + formatter_class=argparse.RawDescriptionHelpFormatter + ) + + parser.add_argument("prompt", nargs="?", help="Your crawling task prompt (not used in --chat mode)") + parser.add_argument("--chat", action="store_true", help="Start interactive chat mode") + parser.add_argument("--system-prompt", help="Custom system prompt") + parser.add_argument("--permission-mode", choices=["acceptEdits", "bypassPermissions", "default", "plan"], + help="Permission mode for tool execution") + parser.add_argument("--model", help="Model to use (e.g., 'sonnet', 'opus')") + parser.add_argument("--add-dir", nargs="+", help="Additional directories for file access") + parser.add_argument("--session-id", help="Use specific session ID (UUID)") + parser.add_argument("-v", "--version", action="version", version="Crawl4AI Agent 1.0.0") + parser.add_argument("--debug", action="store_true", help="Enable debug mode") + + args = parser.parse_args() + + # Chat mode - interactive + if args.chat: + try: + agent = CrawlAgent(args) + ui = TerminalUI() + chat = ChatMode(agent.options, ui, agent.storage) + asyncio.run(chat.run()) + except KeyboardInterrupt: + print("\n\n⚠️ Chat interrupted by user") + sys.exit(0) + except Exception as e: + print(f"\n❌ Error: {e}") + if args.debug: + raise + sys.exit(1) + return + + # Single-shot mode - requires prompt + if not args.prompt: + parser.print_help() + print("\nExample usage:") + print(' # 
class BrowserManager:
    """Singleton browser manager for persistent browser sessions across agent operations.

    All state is kept on the class itself (``_crawler`` / ``_config``), so the
    classmethods can be used without ever instantiating ``BrowserManager``.
    """

    _instance: Optional['BrowserManager'] = None
    _crawler: Optional[AsyncWebCrawler] = None
    _config: Optional[BrowserConfig] = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    @classmethod
    async def get_browser(cls, config: Optional[BrowserConfig] = None) -> AsyncWebCrawler:
        """
        Get or create the singleton browser instance.

        Args:
            config: Optional browser configuration. Only used if no browser exists yet.
                   To change config, use reconfigure_browser() instead.

        Returns:
            AsyncWebCrawler instance

        Raises:
            Whatever ``AsyncWebCrawler.start()`` raises. In that case no
            crawler is stored, so the next call retries from scratch.
        """
        if cls._crawler is not None:
            return cls._crawler

        # Create default config if none provided
        if config is None:
            config = BrowserConfig(headless=True, verbose=False)

        # Start the crawler BEFORE publishing it on the class: a crawler whose
        # start() failed (or is still in flight) must never be handed out.
        crawler = AsyncWebCrawler(config=config)
        await crawler.start()

        if cls._crawler is not None:
            # Another caller finished creating the singleton while this one
            # was awaiting start(); discard the duplicate instead of leaking it.
            await crawler.close()
            return cls._crawler

        cls._crawler = crawler
        cls._config = config
        return crawler

    @classmethod
    async def reconfigure_browser(cls, new_config: BrowserConfig) -> AsyncWebCrawler:
        """
        Close current browser and create a new one with different configuration.

        Args:
            new_config: New browser configuration

        Returns:
            New AsyncWebCrawler instance
        """
        await cls.close_browser()
        return await cls.get_browser(new_config)

    @classmethod
    async def close_browser(cls):
        """Close the current browser instance and cleanup.

        The class-level references are cleared before ``close()`` is awaited,
        so a failing close cannot leave a stale crawler behind.
        """
        crawler = cls._crawler
        if crawler is None:
            return
        cls._crawler = None
        cls._config = None
        await crawler.close()

    @classmethod
    def is_browser_active(cls) -> bool:
        """Check if browser is currently active."""
        return cls._crawler is not None

    @classmethod
    def get_current_config(cls) -> Optional[BrowserConfig]:
        """Get the current browser configuration."""
        return cls._config
+ +# Core Capabilities + +You can perform sophisticated multi-step web scraping and automation tasks through two modes: + +## Quick Mode (simple tasks) +- Use `quick_crawl` for single-page data extraction +- Best for: simple scrapes, getting page content, one-time extractions + +## Session Mode (complex tasks) +- Use `start_session` to create persistent browser sessions +- Navigate, interact, extract data across multiple pages +- Essential for: workflows requiring JS execution, pagination, filtering, multi-step automation + +# Tool Usage Patterns + +## Simple Extraction +1. Use `quick_crawl` with appropriate output_format +2. Provide extraction_schema for structured data + +## Multi-Step Workflow +1. `start_session` - Create browser session with unique ID +2. `navigate` - Go to target URL +3. `execute_js` - Interact with page (click buttons, scroll, fill forms) +4. `extract_data` - Get data using schema or markdown +5. Repeat steps 2-4 as needed +6. `close_session` - Clean up when done + +# Critical Instructions + +1. **Iteration & Validation**: When tasks require filtering or conditional logic: + - Extract data first, analyze results + - Filter/validate in your reasoning + - Make subsequent tool calls based on validation + - Continue until task criteria are met + +2. **Structured Extraction**: Always use JSON schemas for structured data: + ```json + { + "type": "object", + "properties": { + "field_name": {"type": "string"}, + "price": {"type": "number"} + } + } + ``` + +3. **Session Management**: + - Generate unique session IDs (e.g., "product_scrape_001") + - Always close sessions when done + - Use sessions for tasks requiring multiple page visits + +4. **JavaScript Execution**: + - Use for: clicking buttons, scrolling, waiting for dynamic content + - Example: `js_code: "document.querySelector('.load-more').click()"` + - Combine with `wait_for` to ensure content loads + +5. 
**Error Handling**: + - Check `success` field in all responses + - Retry with different strategies if extraction fails + - Report specific errors to user + +6. **Data Persistence**: + - Save results using `Write` tool to JSON files + - Use descriptive filenames with timestamps + - Structure data clearly for user consumption + +# Example Workflows + +## Workflow 1: Filter & Crawl +Task: "Find products >$10, crawl each, extract details" + +1. `quick_crawl` product listing page with schema for [name, price, url] +2. Analyze results, filter price > 10 in reasoning +3. `start_session` for detailed crawling +4. For each filtered product: + - `navigate` to product URL + - `extract_data` with detail schema +5. Aggregate results +6. `close_session` +7. `Write` results to JSON + +## Workflow 2: Paginated Scraping +Task: "Scrape all items across multiple pages" + +1. `start_session` +2. `navigate` to page 1 +3. `extract_data` items from current page +4. Check for "next" button +5. `execute_js` to click next +6. Repeat 3-5 until no more pages +7. `close_session` +8. Save aggregated data + +## Workflow 3: Dynamic Content +Task: "Scrape reviews after clicking 'Load More'" + +1. `start_session` +2. `navigate` to product page +3. `execute_js` to click load more button +4. `wait_for` reviews container +5. `extract_data` all reviews +6. `close_session` + +# Quality Guidelines + +- **Be thorough**: Don't stop until task requirements are fully met +- **Validate data**: Check extracted data matches expected format +- **Handle edge cases**: Empty results, pagination limits, rate limiting +- **Clear reporting**: Summarize what was found, any issues encountered +- **Efficient**: Use quick_crawl when possible, sessions only when needed + +# Output Format + +When saving data, use clean JSON structure: +```json +{ + "metadata": { + "scraped_at": "ISO timestamp", + "source_url": "...", + "total_items": 0 + }, + "data": [...] 
@tool("quick_crawl", "One-shot crawl for simple extraction. Returns markdown, HTML, or structured data.", {
    "url": str,
    "output_format": str,  # "markdown" | "html" | "structured" | "screenshot"
    "extraction_schema": str,  # Optional: JSON schema for structured extraction
    "js_code": str,  # Optional: JavaScript to execute before extraction
    "wait_for": str,  # Optional: CSS selector to wait for
})
async def quick_crawl(args: Dict[str, Any]) -> Dict[str, Any]:
    """Fast single-page crawl using persistent browser.

    Args:
        args: Tool arguments. ``url`` is required. ``output_format`` defaults
            to ``"markdown"``; an unrecognised value also falls back to
            markdown. The remaining keys are optional.

    Returns:
        A tool result dict with one text content item holding a JSON document:
        ``{"success": true, "url": ..., "data": ...}`` on success, or
        ``{"error": ..., "success": false}`` on failure.
    """
    # A missing output_format used to raise KeyError when the result was built.
    output_format = args.get("output_format") or "markdown"

    # Use singleton browser manager
    crawler_config = BrowserConfig(headless=True, verbose=False)
    crawler = await BrowserManager.get_browser(crawler_config)

    run_config = CrawlerRunConfig(
        cache_mode=CacheMode.BYPASS,
        js_code=args.get("js_code"),
        wait_for=args.get("wait_for"),
        # Without this flag result.screenshot is never populated, so the
        # "screenshot" output format always returned null.
        screenshot=(output_format == "screenshot"),
    )

    # Add extraction strategy if structured data requested
    if args.get("extraction_schema"):
        try:
            schema = json.loads(args["extraction_schema"])
        except (TypeError, ValueError) as exc:
            # Report a malformed schema to the agent instead of raising.
            return {
                "content": [{
                    "type": "text",
                    "text": json.dumps({
                        "error": f"Invalid extraction_schema JSON: {exc}",
                        "success": False
                    })
                }]
            }
        run_config.extraction_strategy = LLMExtractionStrategy(
            provider="openai/gpt-4o-mini",
            schema=schema,
            instruction="Extract data according to the provided schema."
        )

    result = await crawler.arun(url=args["url"], config=run_config)

    if not result.success:
        return {
            "content": [{
                "type": "text",
                "text": json.dumps({"error": result.error_message, "success": False})
            }]
        }

    # Handle markdown - can be string or MarkdownGenerationResult object
    markdown_content = ""
    if isinstance(result.markdown, str):
        markdown_content = result.markdown
    elif hasattr(result.markdown, 'raw_markdown'):
        markdown_content = result.markdown.raw_markdown

    output_map = {
        "markdown": markdown_content,
        "html": result.html,
        "structured": result.extracted_content,
        "screenshot": result.screenshot,
    }

    response = {
        "success": True,
        "url": result.url,
        "data": output_map.get(output_format, markdown_content)
    }

    return {"content": [{"type": "text", "text": json.dumps(response, indent=2)}]}
@tool("extract_data", "Extract data from current page in session using schema or return markdown.", {
    "session_id": str,
    "output_format": str,  # "markdown" | "structured"
    "extraction_schema": str,  # Required for structured, JSON schema
    "wait_for": str,  # Optional: Wait for element before extraction
    "js_code": str,  # Optional: Execute JS before extraction
})
async def extract_data(args: Dict[str, Any]) -> Dict[str, Any]:
    """Extract data from current page.

    Args:
        args: Tool arguments. ``session_id`` is required. ``output_format``
            defaults to ``"markdown"``. ``extraction_schema`` (a JSON string)
            is required when ``output_format`` is ``"structured"``.

    Returns:
        A tool result dict with one text content item holding a JSON document:
        ``{"success": true, "data": ...}`` on success, or
        ``{"error": ..., "success": false}`` on failure.
    """

    def _failure(message: str) -> Dict[str, Any]:
        # Single place that builds the error payload used by every guard below.
        return {"content": [{"type": "text", "text": json.dumps({
            "error": message,
            "success": False
        })}]}

    session_id = args["session_id"]
    if session_id not in CRAWLER_SESSIONS:
        return _failure(f"Session {session_id} not found")

    # Check if we have a current URL for this session
    if session_id not in CRAWLER_SESSION_URLS:
        return _failure("No page loaded in session. Use 'navigate' first.")

    # A missing output_format used to raise KeyError further down.
    output_format = args.get("output_format") or "markdown"
    structured = output_format == "structured"

    schema = None
    if structured:
        if not args.get("extraction_schema"):
            # Previously this reported success with data=null.
            return _failure("extraction_schema is required when output_format is 'structured'")
        try:
            schema = json.loads(args["extraction_schema"])
        except (TypeError, ValueError) as exc:
            return _failure(f"Invalid extraction_schema JSON: {exc}")

    crawler = CRAWLER_SESSIONS[session_id]
    current_url = CRAWLER_SESSION_URLS[session_id]

    # NOTE(review): this fetches the tracked URL again via arun(); whether the
    # page state left by earlier execute_js calls survives is not visible
    # here - confirm against the crawler's session handling.
    run_config = CrawlerRunConfig(
        cache_mode=CacheMode.BYPASS,
        wait_for=args.get("wait_for"),
        js_code=args.get("js_code"),
    )

    if structured:
        run_config.extraction_strategy = LLMExtractionStrategy(
            provider="openai/gpt-4o-mini",
            schema=schema,
            instruction="Extract data according to schema."
        )

    result = await crawler.arun(url=current_url, config=run_config)

    if not result.success:
        return _failure(result.error_message)

    # Handle markdown - can be string or MarkdownGenerationResult object
    markdown_content = ""
    if isinstance(result.markdown, str):
        markdown_content = result.markdown
    elif hasattr(result.markdown, 'raw_markdown'):
        markdown_content = result.markdown.raw_markdown

    data = result.extracted_content if structured else markdown_content

    return {"content": [{"type": "text", "text": json.dumps({
        "success": True,
        "data": data
    }, indent=2)}]}
Use 'navigate' first.", + "success": False + })}]} + + crawler = CRAWLER_SESSIONS[session_id] + current_url = CRAWLER_SESSION_URLS[session_id] + + run_config = CrawlerRunConfig( + cache_mode=CacheMode.BYPASS, + js_code=args["js_code"], + wait_for=args.get("wait_for"), + ) + + result = await crawler.arun(url=current_url, config=run_config) + + return {"content": [{"type": "text", "text": json.dumps({ + "success": result.success, + "message": "JavaScript executed" + })}]} + + +@tool("screenshot", "Take a screenshot of the current page.", { + "session_id": str, +}) +async def screenshot(args: Dict[str, Any]) -> Dict[str, Any]: + """Capture screenshot.""" + + session_id = args["session_id"] + if session_id not in CRAWLER_SESSIONS: + return {"content": [{"type": "text", "text": json.dumps({ + "error": f"Session {session_id} not found", + "success": False + })}]} + + # Check if we have a current URL for this session + if session_id not in CRAWLER_SESSION_URLS: + return {"content": [{"type": "text", "text": json.dumps({ + "error": "No page loaded in session. 
class ChatMode:
    """Interactive chat mode with streaming input/output."""

    def __init__(self, options: ClaudeAgentOptions, ui: TerminalUI, storage):
        """Store collaborators.

        Args:
            options: Agent options handed to ``ClaudeSDKClient``.
            ui: Terminal UI used for all input and output.
            storage: Object with ``log(event_type, data)``; ``get_session_path()``
                and ``session_id`` are used when present.
        """
        self.options = options
        self.ui = ui
        self.storage = storage
        self._exit_requested = False
        self._current_streaming_text = ""

    async def message_generator(self) -> AsyncGenerator[Dict[str, Any], None]:
        """
        Generate user messages as async generator (streaming input mode per cc_stream.md).

        Yields messages in the format:
        {
            "type": "user",
            "message": {
                "role": "user",
                "content": "user input text"
            }
        }

        The generator ends when the user issues an exit command, interrupts
        the prompt, or stdin reaches end-of-file.
        """
        while not self._exit_requested:
            try:
                # Get user input (blocking prompt runs in a worker thread)
                user_input = await asyncio.to_thread(self.ui.get_user_input)

                # Handle commands
                if user_input.startswith('/'):
                    await self._handle_command(user_input)
                    if self._exit_requested:
                        break
                    continue

                # Skip empty input
                if not user_input.strip():
                    continue

                # Log user message
                self.storage.log("user_message", {"text": user_input})

                # Yield user message for agent
                yield {
                    "type": "user",
                    "message": {
                        "role": "user",
                        "content": user_input
                    }
                }

            except (KeyboardInterrupt, EOFError):
                # EOFError: stdin was closed. It must end the loop; the generic
                # handler below would otherwise report it and re-prompt forever.
                self._exit_requested = True
                break
            except Exception as e:
                self.ui.print_error(f"Input error: {e}")

    async def _handle_command(self, command: str):
        """Handle special chat commands (``/exit``, ``/quit``, ``/clear``, ``/help``, ``/browser``)."""
        cmd = command.lower().strip()

        if cmd in ('/exit', '/quit'):
            self._exit_requested = True
            self.ui.print_info("Exiting chat mode...")

        elif cmd == '/clear':
            self.ui.clear_screen()

        elif cmd == '/help':
            self.ui.show_commands()

        elif cmd == '/browser':
            # Show browser status
            if BrowserManager.is_browser_active():
                config = BrowserManager.get_current_config()
                self.ui.print_info(f"Browser active: {config}")
            else:
                self.ui.print_info("No browser instance active")

        else:
            self.ui.print_error(f"Unknown command: {command}")

    async def run(self):
        """Run the interactive chat loop with streaming responses.

        The shared browser is always closed on the way out, including on
        error or interruption.
        """
        # The options object is built elsewhere without a `session_id`, so
        # reading the attribute directly can raise AttributeError. Fall back
        # to the storage's id, then to a fixed label.
        session_id = (
            getattr(self.options, "session_id", None)
            or getattr(self.storage, "session_id", None)
            or "chat"
        )

        # Show header
        self.ui.show_header(
            session_id=str(session_id),
            log_path=self.storage.get_session_path() if hasattr(self.storage, 'get_session_path') else "N/A"
        )
        self.ui.show_commands()

        try:
            async with ClaudeSDKClient(options=self.options) as client:
                # Start streaming input mode
                await client.query(self.message_generator())

                # Process streaming responses
                turn = 0
                async for message in client.receive_messages():
                    turn += 1

                    if isinstance(message, AssistantMessage):
                        # Clear "thinking" line if we printed it
                        if self._current_streaming_text:
                            self.ui.console.print()  # New line after streaming

                        self._current_streaming_text = ""

                        # Process message content blocks
                        for block in message.content:
                            if isinstance(block, TextBlock):
                                # Stream text as it arrives
                                self.ui.print_agent_text(block.text)
                                self._current_streaming_text += block.text

                                # Log assistant message
                                self.storage.log("assistant_message", {
                                    "turn": turn,
                                    "text": block.text
                                })

                            elif isinstance(block, ToolUseBlock):
                                # Show tool usage
                                self.ui.print_tool_use(block.name)

                    elif isinstance(message, ResultMessage):
                        # NOTE(review): the loop stops at the first
                        # ResultMessage. If the SDK emits one per user turn
                        # rather than once per session, chat ends after the
                        # first exchange - confirm against the SDK docs.
                        if message.is_error:
                            self.ui.print_error(f"Agent error: {message.result}")
                        else:
                            self.ui.print_session_summary(
                                duration_s=message.duration_ms / 1000 if message.duration_ms else 0,
                                turns=message.num_turns,
                                cost_usd=message.total_cost_usd
                            )

                        # Log session end
                        self.storage.log("session_end", {
                            "duration_ms": message.duration_ms,
                            "cost_usd": message.total_cost_usd,
                            "turns": message.num_turns,
                            "success": not message.is_error
                        })
                        break

        except KeyboardInterrupt:
            self.ui.print_info("\nChat interrupted by user")

        except Exception as e:
            self.ui.print_error(f"Chat error: {e}")
            raise

        finally:
            # Cleanup browser on exit
            await BrowserManager.close_browser()
            self.ui.print_info("Browser closed")
class TerminalUI:
    """Rich-based terminal interface for the Crawl4AI agent.

    Caller-supplied text is escaped before being placed into a Rich markup
    string, so square brackets in agent output, paths or error messages are
    shown literally instead of being parsed as style tags.
    """

    def __init__(self):
        self.console = Console()
        self._current_text = ""

    @staticmethod
    def _plain(value) -> str:
        """Return ``value`` as text that is safe to embed in Rich markup."""
        # Local import keeps this class self-contained; `rich` is already a
        # dependency of this module.
        from rich.markup import escape

        return escape(str(value))

    def show_header(self, session_id: str, log_path: str):
        """Display agent session header."""
        self.console.print()
        self.console.print(Panel.fit(
            "[bold cyan]🕷️  Crawl4AI Agent - Chat Mode[/bold cyan]",
            border_style="cyan"
        ))
        self.console.print(f"[dim]📁 Session: {self._plain(session_id)}[/dim]")
        self.console.print(f"[dim]💾 Log: {self._plain(log_path)}[/dim]")
        self.console.print()

    def show_commands(self):
        """Display available commands."""
        self.console.print("\n[dim]Commands:[/dim]")
        self.console.print("  [cyan]/exit[/cyan], [cyan]/quit[/cyan] - Exit chat")
        self.console.print("  [cyan]/clear[/cyan] - Clear screen")
        self.console.print("  [cyan]/browser[/cyan] - Show browser status")
        self.console.print("  [cyan]/help[/cyan] - Show this help\n")

    def get_user_input(self) -> str:
        """Get user input with styled prompt."""
        return Prompt.ask("\n[bold green]You[/bold green]")

    def print_separator(self):
        """Print a visual separator."""
        self.console.print(Rule(style="dim"))

    def print_thinking(self):
        """Show thinking indicator."""
        self.console.print("\n[cyan]Agent:[/cyan] [dim]thinking...[/dim]", end="")

    def print_agent_text(self, text: str, stream: bool = False):
        """
        Print agent response text.

        Args:
            text: Text to print
            stream: If True, append to current streaming output
        """
        safe_text = self._plain(text)
        if stream:
            # For streaming, just print without newline
            self.console.print(f"\r[cyan]Agent:[/cyan] {safe_text}", end="")
        else:
            # For complete messages
            self.console.print(f"\n[cyan]Agent:[/cyan] {safe_text}")

    def print_markdown(self, markdown_text: str):
        """Render markdown content."""
        self.console.print()
        self.console.print(Markdown(markdown_text))

    def print_code(self, code: str, language: str = "python"):
        """Render code with syntax highlighting."""
        self.console.print()
        self.console.print(Syntax(code, language, theme="monokai", line_numbers=True))

    def print_error(self, error_msg: str):
        """Display error message."""
        self.console.print(f"\n[bold red]Error:[/bold red] {self._plain(error_msg)}")

    def print_success(self, msg: str):
        """Display success message."""
        self.console.print(f"\n[bold green]✓[/bold green] {self._plain(msg)}")

    def print_info(self, msg: str):
        """Display info message."""
        self.console.print(f"\n[bold blue]ℹ[/bold blue] {self._plain(msg)}")

    def clear_screen(self):
        """Clear the terminal screen."""
        self.console.clear()

    def print_session_summary(self, duration_s: float, turns: int, cost_usd: float = None):
        """Display session completion summary.

        Args:
            duration_s: Session duration in seconds.
            turns: Number of turns taken.
            cost_usd: Total cost in USD; the cost line is omitted when None.
        """
        summary_lines = [
            "[green]✅ Completed[/green]",
            f"⏱  Duration: {duration_s:.2f}s",
            f"🔄 Turns: {turns}",
        ]
        # `is not None` so a genuine cost of 0.0 is still reported; building a
        # list avoids the dangling newline the panel had when cost was absent.
        if cost_usd is not None:
            summary_lines.append(f"💰 Cost: ${cost_usd:.4f}")

        self.console.print()
        self.console.print(Panel("\n".join(summary_lines), border_style="green"))

    def print_tool_use(self, tool_name: str):
        """Indicate tool usage."""
        self.console.print(f"\n[dim]🔧 Using tool: {self._plain(tool_name)}[/dim]")

    def with_spinner(self, text: str = "Processing..."):
        """
        Context manager for showing a spinner.

        Usage:
            with ui.with_spinner("Crawling page..."):
                # do work
        """
        return self.console.status(f"[cyan]{text}[/cyan]", spinner="dots")
except Exception as e: + print(f"✗ TerminalUI failed: {e}") + return False + + # Test 3: MCP Server Setup + print("\n[TEST 3] MCP Server with tools") + try: + crawler_server = create_sdk_mcp_server( + name="crawl4ai", + version="1.0.0", + tools=CRAWL_TOOLS + ) + print(f"✓ MCP server created with {len(CRAWL_TOOLS)} tools") + except Exception as e: + print(f"✗ MCP Server failed: {e}") + return False + + # Test 4: ChatMode instantiation + print("\n[TEST 4] ChatMode instantiation") + try: + options = ClaudeAgentOptions( + mcp_servers={"crawler": crawler_server}, + allowed_tools=[ + "mcp__crawler__quick_crawl", + "mcp__crawler__start_session", + "mcp__crawler__navigate", + "mcp__crawler__extract_data", + "mcp__crawler__execute_js", + "mcp__crawler__screenshot", + "mcp__crawler__close_session", + ], + system_prompt=SYSTEM_PROMPT, + permission_mode="acceptEdits" + ) + + ui = TerminalUI() + storage = MockStorage() + chat = ChatMode(options, ui, storage) + print("✓ ChatMode instance created successfully") + except Exception as e: + print(f"✗ ChatMode failed: {e}") + import traceback + traceback.print_exc() + return False + + print("\n" + "="*60) + print("ALL COMPONENT TESTS PASSED ✓") + print("="*60) + print("\nTo test interactive chat mode, run:") + print(" python -m crawl4ai.agent.agent_crawl --chat") + + return True + + +if __name__ == "__main__": + success = asyncio.run(test_components()) + sys.exit(0 if success else 1) diff --git a/crawl4ai/agent/test_scenarios.py b/crawl4ai/agent/test_scenarios.py new file mode 100644 index 00000000..fb7cad44 --- /dev/null +++ b/crawl4ai/agent/test_scenarios.py @@ -0,0 +1,524 @@ +#!/usr/bin/env python +""" +Automated multi-turn chat scenario tests for Crawl4AI Agent. + +Tests agent's ability to handle complex conversations, maintain state, +plan and execute tasks without human interaction. 
class TurnResult(str, Enum):
    """Result of a single conversation turn.

    Mixes in ``str`` so members serialize as their plain value with
    ``json.dump``; turn result dictionaries carrying a ``TurnResult`` are
    written to a JSON results file by the runner.
    """
    PASS = "PASS"
    FAIL = "FAIL"
    TIMEOUT = "TIMEOUT"
    ERROR = "ERROR"


@dataclass
class TurnExpectation:
    """Expectations for a single conversation turn."""
    # Message sent to the agent for this turn.
    user_message: str
    # Tools that should be called
    expect_tools: Optional[List[str]] = None
    # Keywords in response
    expect_keywords: Optional[List[str]] = None
    # File patterns created
    expect_files_created: Optional[List[str]] = None
    # Should complete without error
    expect_success: bool = True
    # Minimum agent turns to complete
    expect_min_turns: int = 1
    # Time budget for the turn, in seconds.
    timeout_seconds: int = 60


@dataclass
class Scenario:
    """A complete multi-turn conversation scenario."""
    name: str
    # "simple", "medium", "complex"
    category: str
    description: str
    turns: List[TurnExpectation]
    # Files to cleanup after test
    cleanup_files: Optional[List[str]] = None
name="Session lifecycle", + category="simple", + description="Start session, navigate, close - basic session management", + turns=[ + TurnExpectation( + user_message="Start a session named 'simple_test'", + expect_tools=["mcp__crawler__start_session"], + expect_keywords=["session", "started"], + timeout_seconds=20 + ), + TurnExpectation( + user_message="Navigate to example.com", + expect_tools=["mcp__crawler__navigate"], + expect_keywords=["navigated", "example.com"], + timeout_seconds=25 + ), + TurnExpectation( + user_message="Close the session", + expect_tools=["mcp__crawler__close_session"], + expect_keywords=["closed"], + timeout_seconds=15 + ) + ] + ), +] + + +MEDIUM_SCENARIOS = [ + Scenario( + name="Multi-page crawl with file output", + category="medium", + description="Crawl multiple pages and save results to file", + turns=[ + TurnExpectation( + user_message="Crawl example.com and example.org, extract titles from both", + expect_tools=["mcp__crawler__quick_crawl"], + expect_min_turns=2, # Should make 2 separate crawls + timeout_seconds=45 + ), + TurnExpectation( + user_message="Save the results to a JSON file called crawl_results.json", + expect_tools=["Write"], + expect_files_created=["crawl_results.json"], + timeout_seconds=20 + ) + ], + cleanup_files=["crawl_results.json"] + ), + + Scenario( + name="Session-based data extraction", + category="medium", + description="Use session to navigate and extract data in steps", + turns=[ + TurnExpectation( + user_message="Start session 'extract_test', navigate to example.com, and extract the markdown", + expect_tools=["mcp__crawler__start_session", "mcp__crawler__navigate", "mcp__crawler__extract_data"], + expect_keywords=["Example Domain"], + timeout_seconds=50 + ), + TurnExpectation( + user_message="Now save that markdown to example_content.md", + expect_tools=["Write"], + expect_files_created=["example_content.md"], + timeout_seconds=20 + ), + TurnExpectation( + user_message="Close the session", + 
expect_tools=["mcp__crawler__close_session"], + timeout_seconds=15 + ) + ], + cleanup_files=["example_content.md"] + ), + + Scenario( + name="Context retention across turns", + category="medium", + description="Agent should remember previous context", + turns=[ + TurnExpectation( + user_message="Crawl example.com and tell me the title", + expect_tools=["mcp__crawler__quick_crawl"], + expect_keywords=["Example Domain"], + timeout_seconds=30 + ), + TurnExpectation( + user_message="What was the URL I just asked you to crawl?", + expect_keywords=["example.com"], + expect_tools=[], # Should answer from memory, no tools needed + timeout_seconds=15 + ) + ] + ), +] + + +COMPLEX_SCENARIOS = [ + Scenario( + name="Multi-step task with planning", + category="complex", + description="Complex task requiring agent to plan, execute, and verify", + turns=[ + TurnExpectation( + user_message="Crawl example.com and example.org, compare their content, and create a markdown report with: 1) titles of both, 2) word count comparison, 3) save to comparison_report.md", + expect_tools=["mcp__crawler__quick_crawl", "Write"], + expect_files_created=["comparison_report.md"], + expect_min_turns=3, # Plan, crawl both, write report + timeout_seconds=90 + ), + TurnExpectation( + user_message="Read back the report you just created", + expect_tools=["Read"], + expect_keywords=["Example Domain"], + timeout_seconds=20 + ) + ], + cleanup_files=["comparison_report.md"] + ), + + Scenario( + name="Session with state manipulation", + category="complex", + description="Complex session workflow with multiple operations", + turns=[ + TurnExpectation( + user_message="Start session 'complex_session' and navigate to example.com", + expect_tools=["mcp__crawler__start_session", "mcp__crawler__navigate"], + timeout_seconds=30 + ), + TurnExpectation( + user_message="Extract the page content and count how many times the word 'example' appears (case insensitive)", + expect_tools=["mcp__crawler__extract_data"], + 
class ScenarioRunner:
    """Runs automated chat scenarios without human interaction.

    Each scenario gets its own ``ClaudeSDKClient`` session; every turn's
    response is collected and validated against its ``TurnExpectation``.
    """

    def __init__(self, working_dir: Path):
        # Directory used as the agent's cwd and searched for expected files.
        self.working_dir = working_dir
        self.results = []

    @staticmethod
    def _serialize_turns(turn_results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Return copies of the turn results with ``status`` as a plain string.

        Turn results carry ``TurnResult`` members internally; the reported
        structure must be JSON-serializable because it is dumped to disk.
        """
        return [
            {**turn, "status": getattr(turn["status"], "value", turn["status"])}
            for turn in turn_results
        ]

    async def run_scenario(self, scenario: Scenario) -> Dict[str, Any]:
        """Run a single scenario and return results.

        Turns run in order and the scenario stops at the first turn that does
        not pass.

        Returns:
            A dict with ``scenario``, ``category``, ``status`` ("PASS",
            "FAIL" or "ERROR"), ``duration_seconds`` and ``turns`` (per-turn
            results with string statuses); ``error`` is added when an
            exception aborted the scenario.
        """
        print(f"\n{'='*70}")
        print(f"[{scenario.category.upper()}] {scenario.name}")
        print(f"{'='*70}")
        print(f"Description: {scenario.description}\n")

        start_time = time.time()
        turn_results = []

        try:
            # Setup agent options
            crawler_server = create_sdk_mcp_server(
                name="crawl4ai",
                version="1.0.0",
                tools=CRAWL_TOOLS
            )

            options = ClaudeAgentOptions(
                mcp_servers={"crawler": crawler_server},
                allowed_tools=[
                    "mcp__crawler__quick_crawl",
                    "mcp__crawler__start_session",
                    "mcp__crawler__navigate",
                    "mcp__crawler__extract_data",
                    "mcp__crawler__execute_js",
                    "mcp__crawler__screenshot",
                    "mcp__crawler__close_session",
                    "Read", "Write", "Edit", "Glob", "Grep", "Bash"
                ],
                system_prompt=SYSTEM_PROMPT,
                permission_mode="acceptEdits",
                cwd=str(self.working_dir)
            )

            # Run conversation
            async with ClaudeSDKClient(options=options) as client:
                for turn_idx, expectation in enumerate(scenario.turns, 1):
                    print(f"\nTurn {turn_idx}: {expectation.user_message}")

                    turn_result = await self._run_turn(
                        client, expectation, turn_idx
                    )
                    turn_results.append(turn_result)

                    if turn_result["status"] != TurnResult.PASS:
                        print(f"  ✗ FAILED: {turn_result['reason']}")
                        break
                    print(f"  ✓ PASSED")

            # Overall result
            all_passed = all(r["status"] == TurnResult.PASS for r in turn_results)

            return {
                "scenario": scenario.name,
                "category": scenario.category,
                "status": "PASS" if all_passed else "FAIL",
                "duration_seconds": time.time() - start_time,
                "turns": self._serialize_turns(turn_results)
            }

        except Exception as e:
            print(f"\n✗ SCENARIO ERROR: {e}")
            return {
                "scenario": scenario.name,
                "category": scenario.category,
                "status": "ERROR",
                "error": str(e),
                "duration_seconds": time.time() - start_time,
                "turns": self._serialize_turns(turn_results)
            }
        finally:
            try:
                # Remove scenario artifacts even when the scenario errored,
                # so leftovers cannot satisfy a later file expectation.
                if scenario.cleanup_files:
                    self._cleanup_files(scenario.cleanup_files)
            finally:
                # Ensure browser cleanup
                await BrowserManager.close_browser()

    async def _collect_response(self, client: ClaudeSDKClient, expectation: TurnExpectation):
        """Read messages until the turn's ``ResultMessage`` arrives.

        Returns:
            ``(tools_used, response_text, agent_turns, agent_error)`` where
            ``agent_error`` is a failure reason when the agent reported an
            error while success was expected, otherwise ``None``.
        """
        tools_used = []
        response_text = ""
        agent_turns = 0
        agent_error = None

        async for message in client.receive_messages():
            if isinstance(message, AssistantMessage):
                agent_turns += 1
                for block in message.content:
                    if isinstance(block, TextBlock):
                        response_text += block.text + " "
                    elif isinstance(block, ToolUseBlock):
                        tools_used.append(block.name)

            elif isinstance(message, ResultMessage):
                # Check if error when expecting success
                if expectation.expect_success and message.is_error:
                    agent_error = f"Agent returned error: {message.result}"
                break

        return tools_used, response_text, agent_turns, agent_error

    async def _run_turn(
        self,
        client: ClaudeSDKClient,
        expectation: TurnExpectation,
        turn_number: int
    ) -> Dict[str, Any]:
        """Execute a single conversation turn and validate.

        Returns:
            A dict with ``turn``, ``status`` (a ``TurnResult``) and
            ``reason``; validated turns also carry ``tools_used`` and
            ``agent_turns``.
        """
        try:
            # Send user message
            await client.query(expectation.user_message)

            # Collect response. wait_for enforces the timeout even when the
            # stream stalls and no further message ever arrives; a check made
            # only on message arrival would hang in that case.
            try:
                tools_used, response_text, agent_turns, agent_error = await asyncio.wait_for(
                    self._collect_response(client, expectation),
                    timeout=expectation.timeout_seconds
                )
            except asyncio.TimeoutError:
                return {
                    "turn": turn_number,
                    "status": TurnResult.TIMEOUT,
                    "reason": f"Exceeded {expectation.timeout_seconds}s timeout"
                }

            if agent_error is not None:
                return {
                    "turn": turn_number,
                    "status": TurnResult.FAIL,
                    "reason": agent_error
                }

            # Validate expectations
            validation = self._validate_turn(
                expectation, tools_used, response_text, agent_turns
            )

            return {
                "turn": turn_number,
                "status": validation["status"],
                "reason": validation.get("reason", "All checks passed"),
                "tools_used": tools_used,
                "agent_turns": agent_turns
            }

        except Exception as e:
            return {
                "turn": turn_number,
                "status": TurnResult.ERROR,
                "reason": f"Exception: {str(e)}"
            }

    def _validate_turn(
        self,
        expectation: TurnExpectation,
        tools_used: List[str],
        response_text: str,
        agent_turns: int
    ) -> Dict[str, Any]:
        """Validate turn results against expectations.

        Checks run in order (tools, keywords, files, minimum turns) and the
        first failure is returned. Every listed tool and every listed keyword
        must be present; keywords are matched case-insensitively.

        Returns:
            ``{"status": TurnResult.PASS}`` or a dict with
            ``TurnResult.FAIL`` and a ``reason``.
        """
        # Check expected tools
        # NOTE(review): an empty list is falsy, so ``expect_tools=[]`` skips
        # this check rather than asserting that no tool was used -- confirm
        # that this is the intended meaning.
        if expectation.expect_tools:
            for tool in expectation.expect_tools:
                if tool not in tools_used:
                    return {
                        "status": TurnResult.FAIL,
                        "reason": f"Expected tool '{tool}' was not used"
                    }

        # Check keywords
        if expectation.expect_keywords:
            response_lower = response_text.lower()
            for keyword in expectation.expect_keywords:
                if keyword.lower() not in response_lower:
                    return {
                        "status": TurnResult.FAIL,
                        "reason": f"Expected keyword '{keyword}' not found in response"
                    }

        # Check files created
        if expectation.expect_files_created:
            for pattern in expectation.expect_files_created:
                if not any(self.working_dir.glob(pattern)):
                    return {
                        "status": TurnResult.FAIL,
                        "reason": f"Expected file matching '{pattern}' was not created"
                    }

        # Check minimum turns
        if agent_turns < expectation.expect_min_turns:
            return {
                "status": TurnResult.FAIL,
                "reason": f"Expected at least {expectation.expect_min_turns} agent turns, got {agent_turns}"
            }

        return {"status": TurnResult.PASS}

    def _cleanup_files(self, patterns: List[str]):
        """Remove files created during test.

        Patterns are globbed relative to ``working_dir``; a file that cannot
        be deleted only produces a warning.
        """
        for pattern in patterns:
            for file_path in self.working_dir.glob(pattern):
                try:
                    file_path.unlink()
                except Exception as e:
                    print(f"  Warning: Could not delete {file_path}: {e}")
def markdown_text(result) -> str:
    """Return the markdown of a crawl result as a plain string.

    ``result.markdown`` can be a string or an object exposing
    ``raw_markdown``; anything else is converted with ``str()``.
    """
    markdown = result.markdown
    if isinstance(markdown, str):
        return markdown
    if hasattr(markdown, "raw_markdown"):
        return markdown.raw_markdown
    return str(markdown)


async def test_quick_crawl():
    """Test quick_crawl tool logic directly.

    Returns:
        The ``success`` flag of the crawl result.
    """
    print("\n" + "="*60)
    print("TEST 1: Quick Crawl - Markdown Format")
    print("="*60)

    crawler_config = BrowserConfig(headless=True, verbose=False)
    run_config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS)

    async with AsyncWebCrawler(config=crawler_config) as crawler:
        result = await crawler.arun(url="https://example.com", config=run_config)

        print(f"Success: {result.success}")
        print(f"URL: {result.url}")

        # Handle markdown - can be string or MarkdownGenerationResult object
        markdown_content = markdown_text(result)

        print(f"Markdown type: {type(result.markdown)}")
        print(f"Markdown length: {len(markdown_content)}")
        print(f"Markdown preview:\n{markdown_content[:300]}")

        return result.success


async def test_session_workflow():
    """Test session-based workflow.

    Opens the crawler manually (no ``async with``), crawls a page, extracts
    its markdown, then re-crawls with screenshots enabled.

    Returns:
        True only when both crawls report success, so a failed navigation is
        no longer reported as a passing test.
    """
    print("\n" + "="*60)
    print("TEST 2: Session-Based Workflow")
    print("="*60)

    crawler_config = BrowserConfig(headless=True, verbose=False)

    # Start session
    crawler = AsyncWebCrawler(config=crawler_config)
    await crawler.__aenter__()
    print("✓ Session started")

    try:
        # Navigate to URL
        run_config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS)
        result = await crawler.arun(url="https://example.com", config=run_config)
        print(f"✓ Navigated to {result.url}, success: {result.success}")

        # Extract data
        markdown_content = markdown_text(result)

        print(f"✓ Extracted {len(markdown_content)} chars of markdown")
        print(f"  Preview: {markdown_content[:200]}")

        # Screenshot test - need to re-fetch with screenshot enabled
        screenshot_config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS, screenshot=True)
        result2 = await crawler.arun(url=result.url, config=screenshot_config)
        print(f"✓ Screenshot captured: {result2.screenshot is not None}")

        return bool(result.success and result2.success)

    finally:
        # Close session
        await crawler.__aexit__(None, None, None)
        print("✓ Session closed")


async def test_html_format():
    """Test HTML output format.

    Returns:
        The ``success`` flag of the crawl result.
    """
    print("\n" + "="*60)
    print("TEST 3: Quick Crawl - HTML Format")
    print("="*60)

    crawler_config = BrowserConfig(headless=True, verbose=False)
    run_config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS)

    async with AsyncWebCrawler(config=crawler_config) as crawler:
        result = await crawler.arun(url="https://example.com", config=run_config)

        # Fall back to "" so a missing html value is reported through the
        # success flag instead of raising inside len().
        html = result.html or ""

        print(f"Success: {result.success}")
        print(f"HTML length: {len(html)}")
        print(f"HTML preview:\n{html[:300]}")

        return result.success


async def main():
    """Run all tests.

    Each test runs in sequence; an exception is recorded as a failure with
    its message rather than aborting the suite.

    Returns:
        True when every test succeeded.
    """
    print("\n" + "="*70)
    print("   CRAWL4AI TOOLS TEST SUITE")
    print("="*70)

    tests = [
        ("Quick Crawl (Markdown)", test_quick_crawl),
        ("Session Workflow", test_session_workflow),
        ("Quick Crawl (HTML)", test_html_format),
    ]

    results = []
    for name, test_func in tests:
        try:
            result = await test_func()
            results.append((name, result, None))
        except Exception as e:
            results.append((name, False, str(e)))

    # Summary
    print("\n" + "="*70)
    print("   TEST SUMMARY")
    print("="*70)

    for name, success, error in results:
        status = "✓ PASS" if success else "✗ FAIL"
        print(f"{status} - {name}")
        if error:
            print(f"  Error: {error}")

    total = len(results)
    passed = sum(1 for _, success, _ in results if success)
    print(f"\nTotal: {total} | Passed: {passed} | Failed: {total - passed}")

    return all(success for _, success, _ in results)


if __name__ == "__main__":
    # sys.exit is always available; the bare exit() helper is injected by the
    # site module and is missing under ``python -S`` or in frozen builds.
    import sys

    success = asyncio.run(main())
    sys.exit(0 if success else 1)