Release/v0.7.6 (#1556)

* fix(docker-api): migrate to modern datetime library API

Signed-off-by: Emmanuel Ferdman <emmanuelferdman@gmail.com>

* Fix examples in README.md

* feat(docker): add user-provided hooks support to Docker API

Implements comprehensive hooks functionality allowing users to provide custom Python
functions as strings that execute at specific points in the crawling pipeline.

Key Features:
- Support for all 8 crawl4ai hook points:
  • on_browser_created: Initialize browser settings
  • on_page_context_created: Configure page context
  • before_goto: Pre-navigation setup
  • after_goto: Post-navigation processing
  • on_user_agent_updated: User agent modification handling
  • on_execution_started: Crawl execution initialization
  • before_retrieve_html: Pre-extraction processing
  • before_return_html: Final HTML processing

Implementation Details:
- Created UserHookManager for validation, compilation, and safe execution
- Added IsolatedHookWrapper for error isolation and timeout protection
- AST-based validation ensures code structure correctness
- Sandboxed execution with restricted builtins for security
- Configurable timeout (1-120 seconds) prevents infinite loops
- Comprehensive error handling ensures hooks don't crash main process
- Execution tracking with detailed statistics and logging

API Changes:
- Added HookConfig schema with code and timeout fields
- Extended CrawlRequest with optional hooks parameter
- Added /hooks/info endpoint for hook discovery
- Updated /crawl and /crawl/stream endpoints to support hooks

Safety Features:
- Malformed hooks return clear validation errors
- Hook errors are isolated and reported without stopping crawl
- Execution statistics track success/failure/timeout rates
- All hook results are JSON-serializable

Testing:
- Comprehensive test suite covering all 8 hooks
- Error handling and timeout scenarios validated
- Authentication, performance, and content extraction examples
- 100% success rate in production testing

Documentation:
- Added extensive hooks section to docker-deployment.md
- Security warnings about user-provided code risks
- Real-world examples using httpbin.org, GitHub, BBC
- Best practices and troubleshooting guide

ref #1377

* fix(deep-crawl): BestFirst priority inversion; remove pre-scoring truncation. ref #1253

  Use negative scores in PQ to visit high-score URLs first and drop link cap prior to scoring; add test for ordering.

* docs: Update URL seeding examples to use proper async context managers
- Wrap all AsyncUrlSeeder usage with async context managers
- Update URL seeding adventure example to use "sitemap+cc" source, focus on course posts, and add stream=True parameter to fix runtime error

* fix(crawler): Removed the incorrect reference in browser_config variable #1310

* docs: update Docker instructions to use the latest release tag

* fix(docker): Fix LLM API key handling for multi-provider support

Previously, the system incorrectly used OPENAI_API_KEY for all LLM providers
due to a hardcoded api_key_env fallback in config.yml. This caused authentication
errors when using non-OpenAI providers like Gemini.

Changes:
- Remove api_key_env from config.yml to let litellm handle provider-specific env vars
- Simplify get_llm_api_key() to return None, allowing litellm to auto-detect keys
- Update validate_llm_provider() to trust litellm's built-in key detection
- Update documentation to reflect the new automatic key handling

The fix leverages litellm's existing capability to automatically find the correct
environment variable for each provider (OPENAI_API_KEY, GEMINI_API_TOKEN, etc.)
without manual configuration.

ref #1291

* docs: update adaptive crawler docs and cache defaults; remove deprecated examples (#1330)
- Replace BaseStrategy with CrawlStrategy in custom strategy examples (DomainSpecificStrategy, HybridStrategy)
- Remove “Custom Link Scoring” and “Caching Strategy” sections no longer aligned with current library
- Revise memory pruning example to use adaptive.get_relevant_content and index-based retention of top 500 docs
- Correct Quickstart note: default cache mode is CacheMode.BYPASS; instruct enabling with CacheMode.ENABLED

* fix(utils): Improve URL normalization by avoiding quote/unquote to preserve '+' signs. ref #1332

* feat: Add comprehensive website to API example with frontend

This commit adds a complete, web scraping API example that demonstrates how to get structured data from any website and use it like an API using the crawl4ai library with a minimalist frontend interface.

Core Functionality
- AI-powered web scraping with plain English queries
- Dual scraping approaches: Schema-based (faster) and LLM-based (flexible)
- Intelligent schema caching for improved performance
- Custom LLM model support with API key management
- Automatic duplicate request prevention

Modern Frontend Interface
- Minimalist black-and-white design inspired by modern web apps
- Responsive layout with smooth animations and transitions
- Three main pages: Scrape Data, Models Management, API Request History
- Real-time results display with JSON formatting
- Copy-to-clipboard functionality for extracted data
- Toast notifications for user feedback
- Auto-scroll to results when scraping starts

Model Management System
- Web-based model configuration interface
- Support for any LLM provider (OpenAI, Gemini, Anthropic, etc.)
- Simplified configuration requiring only provider and API token
- Add, list, and delete model configurations
- Secure storage of API keys in local JSON files

API Request History
- Automatic saving of all API requests and responses
- Display of request history with URL, query, and cURL commands
- Duplicate prevention (same URL + query combinations)
- Request deletion functionality
- Clean, simplified display focusing on essential information

Technical Implementation

Backend (FastAPI)
- RESTful API with comprehensive endpoints
- Pydantic models for request/response validation
- Async web scraping with crawl4ai library
- Error handling with detailed error messages
- File-based storage for models and request history

Frontend (Vanilla JS/CSS/HTML)
- No framework dependencies - pure HTML, CSS, JavaScript
- Modern CSS Grid and Flexbox layouts
- Custom dropdown styling with SVG arrows
- Responsive design for mobile and desktop
- Smooth scrolling and animations

Core Library Integration
- WebScraperAgent class for orchestration
- ModelConfig class for LLM configuration management
- Schema generation and caching system
- LLM extraction strategy support
- Browser configuration with headless mode

* fix(dependencies): add cssselect to project dependencies

Fixes bug reported in issue #1405
[Bug]: Excluded selector (excluded_selector) doesn't work

This commit reintroduces the cssselect library which was removed by PR (https://github.com/unclecode/crawl4ai/pull/1368) and merged via (437395e490).

Integration tested against 0.7.4 Docker container. Reintroducing cssselector package eliminated errors seen in logs and excluded_selector functionality was restored.

Refs: #1405

* fix(docker): resolve filter serialization and JSON encoding errors in deep crawl strategy (ref #1419)

  - Fix URLPatternFilter serialization by preventing private __slots__ from being serialized as constructor params
  - Add public attributes to URLPatternFilter to store original constructor parameters for proper serialization
  - Handle property descriptors in CrawlResult.model_dump() to prevent JSON serialization errors
  - Ensure filter chains work correctly with Docker client and REST API

  The issue occurred because:
  1. Private implementation details (_simple_suffixes, etc.) were being serialized and passed as constructor arguments during deserialization
  2. Property descriptors were being included in the serialized output, causing "Object of type property is not JSON serializable" errors

  Changes:
  - async_configs.py: Comment out __slots__ serialization logic (lines 100-109)
  - filters.py: Add patterns, use_glob, reverse to URLPatternFilter __slots__ and store as public attributes
  - models.py: Convert property descriptors to strings in model_dump() instead of including them directly

* fix(logger): ensure logger is a Logger instance in crawling strategies. ref #1437

* feat(docker): Add temperature and base_url parameters for LLM configuration. ref #1035

  Implement hierarchical configuration for LLM parameters with support for:
  - Temperature control (0.0-2.0) to adjust response creativity
  - Custom base_url for proxy servers and alternative endpoints
  - 4-tier priority: request params > provider env > global env > defaults

  Add helper functions in utils.py, update API schemas and handlers,
  support environment variables (LLM_TEMPERATURE, OPENAI_TEMPERATURE, etc.),
  and provide comprehensive documentation with examples.

* feat(docker): improve docker error handling
- Return comprehensive error messages along with status codes for api internal errors.
- Fix fit_html property serialization issue in both /crawl and /crawl/stream endpoints
- Add sanitization to ensure fit_html is always JSON-serializable (string or None)
- Add comprehensive error handling test suite.

* #1375 : refactor(proxy) Deprecate 'proxy' parameter in BrowserConfig and enhance proxy string parsing

- Updated ProxyConfig.from_string to support multiple proxy formats, including URLs with credentials.
- Deprecated the 'proxy' parameter in BrowserConfig, replacing it with 'proxy_config' for better flexibility.
- Added warnings for deprecated usage and clarified behavior when both parameters are provided.
- Updated documentation and tests to reflect changes in proxy configuration handling.

* Remove deprecated test for 'proxy' parameter in BrowserConfig and update .gitignore to include test_scripts directory.

* feat: add preserve_https_for_internal_links flag to maintain HTTPS during crawling. Ref #1410

Added a new `preserve_https_for_internal_links` configuration flag that preserves the original HTTPS scheme for same-domain links even when the server redirects to HTTP.

* feat: update documentation for preserve_https_for_internal_links. ref #1410

* fix: drop Python 3.9 support and require Python >=3.10.
The library no longer supports Python 3.9 and so it was important to drop all references to python 3.9.
Following changes have been made:
- pyproject.toml: set requires-python to ">=3.10"; remove 3.9 classifier
- setup.py: set python_requires to ">=3.10"; remove 3.9 classifier
- docs: update Python version mentions
  - deploy/docker/c4ai-doc-context.md: options -> 3.10, 3.11, 3.12, 3.13

* issue #1329 refactor(crawler): move unwanted properties to CrawlerRunConfig class

* fix(auth): fixed Docker JWT authentication. ref #1442

* remove: delete unused yoyo snapshot subproject

* fix: raise error on last attempt failure in perform_completion_with_backoff. ref #989

* Commit without API

* fix: update option labels in request builder for clarity

* fix: allow custom LLM providers for adaptive crawler embedding config. ref: #1291

  - Change embedding_llm_config from Dict to Union[LLMConfig, Dict] for type safety
  - Add backward-compatible conversion property _embedding_llm_config_dict
  - Replace all hardcoded OpenAI embedding configs with configurable options
  - Fix LLMConfig object attribute access in query expansion logic
  - Add comprehensive example demonstrating multiple provider configurations
  - Update documentation with both LLMConfig object and dictionary usage patterns

  Users can now specify any LLM provider for query expansion in embedding strategy:
  - New: embedding_llm_config=LLMConfig(provider='anthropic/claude-3', api_token='key')
  - Old: embedding_llm_config={'provider': 'openai/gpt-4', 'api_token': 'key'} (still works)

* refactor(BrowserConfig): change deprecation warning for 'proxy' parameter to UserWarning

* feat(StealthAdapter): fix stealth features for Playwright integration. ref #1481

* #1505 fix(api): update config handling to only set base config if not provided by user

* fix(docker-deployment): replace console.log with print for metadata extraction

* Release v0.7.5: The Update

- Updated version to 0.7.5
- Added comprehensive demo and release notes
- Updated documentation

* refactor(release): remove memory management section for cleaner documentation. ref #1443

* feat(docs): add brand book and page copy functionality

- Add comprehensive brand book with color system, typography, components
- Add page copy dropdown with markdown copy/view functionality
- Update mkdocs.yml with new assets and branding navigation
- Use terminal-style ASCII icons and condensed menu design

* Update gitignore add local scripts folder

* fix: remove this import as it causes python to treat "json" as a variable in the except block

* fix: always return a list, even if we catch an exception

* feat(marketplace): Add Crawl4AI marketplace with secure configuration

- Implement marketplace frontend and admin dashboard
- Add FastAPI backend with environment-based configuration
- Use .env file for secrets management
- Include data generation scripts
- Add proper CORS configuration
- Remove hardcoded password from admin login
- Update gitignore for security

* fix(marketplace): Update URLs to use /marketplace path and relative API endpoints

- Change API_BASE to relative '/api' for production
- Move marketplace to /marketplace instead of /marketplace/frontend
- Update MkDocs navigation
- Fix logo path in marketplace index

* fix(docs): hide copy menu on non-markdown pages

* feat(marketplace): add sponsor logo uploads

Co-authored-by: factory-droid[bot] <138933559+factory-droid[bot]@users.noreply.github.com>

* feat(docs): add chatgpt quick link to page actions

* fix(marketplace): align admin api with backend endpoints

* fix(marketplace): isolate api under marketplace prefix

* fix(marketplace): resolve app detail page routing and styling issues

- Fixed JavaScript errors from missing HTML elements (install-code, usage-code, integration-code)
- Added missing CSS classes for tabs, overview layout, sidebar, and integration content
- Fixed tab navigation to display horizontally in single line
- Added proper padding to tab content sections (removed from container, added to content)
- Fixed tab selector from .nav-tab to .tab-btn to match HTML structure
- Added sidebar styling with stats grid and metadata display
- Improved responsive design with mobile-friendly tab scrolling
- Fixed code block positioning for copy buttons
- Removed margin from first headings to prevent extra spacing
- Added null checks for DOM elements in JavaScript to prevent errors

These changes resolve the routing issue where clicking on apps caused page redirects,
and fix the broken layout where CSS was not properly applied to the app detail page.

* fix(marketplace): prevent hero image overflow and secondary card stretching

- Fixed hero image to 200px height with min/max constraints
- Added object-fit: cover to hero-image img elements
- Changed secondary-featured align-items from stretch to flex-start
- Fixed secondary-card height to 118px (no flex: 1 stretching)
- Updated responsive grid layouts for wider screens
- Added flex: 1 to hero-content for better content distribution

These changes ensure a rigid, predictable layout that prevents:
1. Large images from pushing text content down
2. Single secondary cards from stretching to fill entire height

* feat: Add hooks utility for function-based hooks with Docker client integration. ref #1377

   Add hooks_to_string() utility function that converts Python function objects
   to string representations for the Docker API, enabling developers to write hooks
   as regular Python functions instead of strings.

   Core Changes:
   - New hooks_to_string() utility in crawl4ai/utils.py using inspect.getsource()
   - Docker client now accepts both function objects and strings for hooks
   - Automatic detection and conversion in Crawl4aiDockerClient._prepare_request()
   - New hooks and hooks_timeout parameters in client.crawl() method

   Documentation:
   - Docker client examples with function-based hooks (docs/examples/docker_client_hooks_example.py)
   - Updated main Docker deployment guide with comprehensive hooks section
   - Added unit tests for hooks utility (tests/docker/test_hooks_utility.py)

* feat: Add hooks utility for function-based hooks with Docker client integration. ref #1377

   Add hooks_to_string() utility function that converts Python function objects
   to string representations for the Docker API, enabling developers to write hooks
   as regular Python functions instead of strings.

   Core Changes:
   - New hooks_to_string() utility in crawl4ai/utils.py using inspect.getsource()
   - Docker client now accepts both function objects and strings for hooks
   - Automatic detection and conversion in Crawl4aiDockerClient._prepare_request()
   - New hooks and hooks_timeout parameters in client.crawl() method

   Documentation:
   - Docker client examples with function-based hooks (docs/examples/docker_client_hooks_example.py)
   - Updated main Docker deployment guide with comprehensive hooks section
   - Added unit tests for hooks utility (tests/docker/test_hooks_utility.py)

* fix(docs): clarify Docker Hooks System with function-based API in README

* docs: Add demonstration files for v0.7.5 release, showcasing the new Docker Hooks System and all other features.

* docs: Update 0.7.5 video walkthrough

* docs: add complete SDK reference documentation

Add comprehensive single-page SDK reference combining:
- Installation & Setup
- Quick Start
- Core API (AsyncWebCrawler, arun, arun_many, CrawlResult)
- Configuration (BrowserConfig, CrawlerConfig, Parameters)
- Crawling Patterns
- Content Processing (Markdown, Fit Markdown, Selection, Interaction, Link & Media)
- Extraction Strategies (LLM and No-LLM)
- Advanced Features (Session Management, Hooks & Auth)

Generated using scripts/generate_sdk_docs.py in ultra-dense mode
optimized for AI assistant consumption.

Stats: 23K words, 185 code blocks, 220KB

* feat: add AI assistant skill package for Crawl4AI

- Create comprehensive skill package for AI coding assistants
- Include complete SDK reference (23K words, v0.7.4)
- Add three extraction scripts (basic, batch, pipeline)
- Implement version tracking in skill and scripts
- Add prominent download section on homepage
- Place skill in docs/assets for web distribution

The skill enables AI assistants like Claude, Cursor, and Windsurf
to effectively use Crawl4AI with optimized workflows for markdown
generation and data extraction.

* fix: remove non-existent wiki link and clarify skill usage instructions

* fix: update Crawl4AI skill with corrected parameters and examples

- Fixed CrawlerConfig → CrawlerRunConfig throughout
- Fixed parameter names (timeout → page_timeout, store_html removed)
- Fixed schema format (selector → baseSelector)
- Corrected proxy configuration (in BrowserConfig, not CrawlerRunConfig)
- Fixed fit_markdown usage with content filters
- Added comprehensive references to docs/examples/ directory
- Created safe packaging script to avoid root directory pollution
- All scripts tested and verified working

* fix: thoroughly verify and fix all Crawl4AI skill examples

- Cross-checked every section against actual docs
- Fixed BM25ContentFilter parameters (user_query, bm25_threshold)
- Removed incorrect wait_for selector from basic example
- Added comprehensive test suite (4 test files)
- All examples now tested and verified working
- Tests validate: basic crawling, markdown generation, data extraction, advanced patterns
- Package size: 76.6 KB (includes tests for future validation)

* feat(ci): split release pipeline and add Docker caching

- Split release.yml into PyPI/GitHub release and Docker workflows
- Add GitHub Actions cache for Docker builds (10-15x faster rebuilds)
- Implement dual-trigger for docker-release.yml (auto + manual)
- Add comprehensive workflow documentation in .github/workflows/docs/
- Backup original workflow as release.yml.backup

* feat: add webhook notifications for crawl job completion

Implements webhook support for the crawl job API to eliminate polling requirements.

Changes:
- Added WebhookConfig and WebhookPayload schemas to schemas.py
- Created webhook.py with WebhookDeliveryService class
- Integrated webhook notifications in api.py handle_crawl_job
- Updated job.py CrawlJobPayload to accept webhook_config
- Added webhook configuration section to config.yml
- Included comprehensive usage examples in WEBHOOK_EXAMPLES.md

Features:
- Webhook notifications on job completion (success/failure)
- Configurable data inclusion in webhook payload
- Custom webhook headers support
- Global default webhook URL configuration
- Exponential backoff retry logic (5 attempts: 1s, 2s, 4s, 8s, 16s)
- 30-second timeout per webhook call

Usage:
POST /crawl/job with optional webhook_config:
- webhook_url: URL to receive notifications
- webhook_data_in_payload: include full results (default: false)
- webhook_headers: custom headers for authentication

Generated with Claude Code https://claude.com/claude-code

Co-Authored-By: Claude <noreply@anthropic.com>

* docs: add webhook documentation to Docker README

Added comprehensive webhook section to README.md including:
- Overview of asynchronous job queue with webhooks
- Benefits and use cases
- Quick start examples
- Webhook authentication
- Global webhook configuration
- Job status polling alternative

Updated table of contents and summary to include webhook feature.
Maintains consistent tone and style with rest of README.

Generated with Claude Code https://claude.com/claude-code

Co-Authored-By: Claude <noreply@anthropic.com>

* docs: add webhook example for Docker deployment

Added docker_webhook_example.py demonstrating:
- Submitting crawl jobs with webhook configuration
- Flask-based webhook receiver implementation
- Three usage patterns:
  1. Webhook notification only (fetch data separately)
  2. Webhook with full data in payload
  3. Traditional polling approach for comparison

Includes comprehensive comments explaining:
- Webhook payload structure
- Authentication headers setup
- Error handling
- Production deployment tips

Example is fully functional and ready to run with Flask installed.

Generated with Claude Code https://claude.com/claude-code

Co-Authored-By: Claude <noreply@anthropic.com>

* test: add webhook implementation validation tests

Added comprehensive test suite to validate webhook implementation:
- Module import verification
- WebhookDeliveryService initialization
- Pydantic model validation (WebhookConfig)
- Payload construction logic
- Exponential backoff calculation
- API integration checks

All tests pass (6/6), confirming implementation is correct.

Generated with Claude Code https://claude.com/claude-code

Co-Authored-By: Claude <noreply@anthropic.com>

* test: add comprehensive webhook feature test script

Added end-to-end test script that automates webhook feature testing:

Script Features (test_webhook_feature.sh):
- Automatic branch switching and dependency installation
- Redis and server startup/shutdown management
- Webhook receiver implementation
- Integration test for webhook notifications
- Comprehensive cleanup and error handling
- Returns to original branch after completion

Test Flow:
1. Fetch and checkout webhook feature branch
2. Activate venv and install dependencies
3. Start Redis and Crawl4AI server
4. Submit crawl job with webhook config
5. Verify webhook delivery and payload
6. Clean up all processes and return to original branch

Documentation:
- WEBHOOK_TEST_README.md with usage instructions
- Troubleshooting guide
- Exit codes and safety features

Usage: ./tests/test_webhook_feature.sh

Generated with Claude Code https://claude.com/claude-code

Co-Authored-By: Claude <noreply@anthropic.com>

* fix: properly serialize Pydantic HttpUrl in webhook config

Use model_dump(mode='json') instead of deprecated dict() method to ensure
Pydantic special types (HttpUrl, UUID, etc.) are properly serialized to
JSON-compatible native Python types.

This fixes webhook delivery failures caused by HttpUrl objects remaining
as Pydantic types in the webhook_config dict, which caused JSON
serialization errors and httpx request failures.

Also update mcp requirement to >=1.18.0 for compatibility.

* feat: add webhook support for /llm/job endpoint

Add comprehensive webhook notification support for the /llm/job endpoint,
following the same pattern as the existing /crawl/job implementation.

Changes:
- Add webhook_config field to LlmJobPayload model (job.py)
- Implement webhook notifications in process_llm_extraction() with 4
  notification points: success, provider validation failure, extraction
  failure, and general exceptions (api.py)
- Store webhook_config in Redis task data for job tracking
- Initialize WebhookDeliveryService with exponential backoff retry logic
Documentation:
- Add Example 6 to WEBHOOK_EXAMPLES.md showing LLM extraction with webhooks
- Update Flask webhook handler to support both crawl and llm_extraction tasks
- Add TypeScript client examples for LLM jobs
- Add comprehensive examples to docker_webhook_example.py with schema support
- Clarify data structure differences between webhook and API responses

Testing:
- Add test_llm_webhook_feature.py with 7 validation tests (all passing)
- Verify pattern consistency with /crawl/job implementation
- Add implementation guide (WEBHOOK_LLM_JOB_IMPLEMENTATION.md)

* fix: remove duplicate comma in webhook_config parameter

* fix: update Crawl4AI Docker container port from 11234 to 11235

* Release v0.7.6: The 0.7.6 Update

- Updated version to 0.7.6
- Added comprehensive demo and release notes
- Updated all documentation
- Update the veriosn in Dockerfile to 0.7.6

---------

Signed-off-by: Emmanuel Ferdman <emmanuelferdman@gmail.com>
Co-authored-by: Emmanuel Ferdman <emmanuelferdman@gmail.com>
Co-authored-by: Nezar Ali <abu5sohaib@gmail.com>
Co-authored-by: Soham Kukreti <kukretisoham@gmail.com>
Co-authored-by: James T. Wood <jamesthomaswood@gmail.com>
Co-authored-by: AHMET YILMAZ <tawfik@kidocode.com>
Co-authored-by: nafeqq-1306 <nafiquee@yahoo.com>
Co-authored-by: unclecode <unclecode@kidocode.com>
Co-authored-by: Martin Sjöborg <martin.sjoborg@quartr.se>
Co-authored-by: Martin Sjöborg <martin@sjoborg.org>
Co-authored-by: factory-droid[bot] <138933559+factory-droid[bot]@users.noreply.github.com>
Co-authored-by: Claude <noreply@anthropic.com>
This commit is contained in:
Nasrin
2025-10-22 20:41:06 +08:00
committed by GitHub
parent fdbcddbf1a
commit 7cac008c10
127 changed files with 33817 additions and 514 deletions

View File

@@ -0,0 +1,154 @@
import asyncio
import os
from crawl4ai import AsyncWebCrawler, AdaptiveCrawler, AdaptiveConfig, LLMConfig
async def test_configuration(name: str, config: AdaptiveConfig, url: str, query: str):
"""Test a specific configuration"""
print(f"\n{'='*60}")
print(f"Configuration: {name}")
print(f"{'='*60}")
async with AsyncWebCrawler(verbose=False) as crawler:
adaptive = AdaptiveCrawler(crawler, config)
result = await adaptive.digest(start_url=url, query=query)
print("\n" + "="*50)
print("CRAWL STATISTICS")
print("="*50)
adaptive.print_stats(detailed=False)
# Get the most relevant content found
print("\n" + "="*50)
print("MOST RELEVANT PAGES")
print("="*50)
relevant_pages = adaptive.get_relevant_content(top_k=5)
for i, page in enumerate(relevant_pages, 1):
print(f"\n{i}. {page['url']}")
print(f" Relevance Score: {page['score']:.2%}")
# Show a snippet of the content
content = page['content'] or ""
if content:
snippet = content[:200].replace('\n', ' ')
if len(content) > 200:
snippet += "..."
print(f" Preview: {snippet}")
print(f"\n{'='*50}")
print(f"Pages crawled: {len(result.crawled_urls)}")
print(f"Final confidence: {adaptive.confidence:.1%}")
print(f"Stopped reason: {result.metrics.get('stopped_reason', 'max_pages')}")
if result.metrics.get('is_irrelevant', False):
print("⚠️ Query detected as irrelevant!")
return result
async def llm_embedding():
"""Demonstrate various embedding configurations"""
print("EMBEDDING STRATEGY CONFIGURATION EXAMPLES")
print("=" * 60)
# Base URL and query for testing
test_url = "https://docs.python.org/3/library/asyncio.html"
openai_llm_config = LLMConfig(
provider='openai/text-embedding-3-small',
api_token=os.getenv('OPENAI_API_KEY'),
temperature=0.7,
max_tokens=2000
)
config_openai = AdaptiveConfig(
strategy="embedding",
max_pages=10,
# Use OpenAI embeddings
embedding_llm_config=openai_llm_config,
# embedding_llm_config={
# 'provider': 'openai/text-embedding-3-small',
# 'api_token': os.getenv('OPENAI_API_KEY')
# },
# OpenAI embeddings are high quality, can be stricter
embedding_k_exp=4.0,
n_query_variations=12
)
await test_configuration(
"OpenAI Embeddings",
config_openai,
test_url,
# "event-driven architecture patterns"
"async await context managers coroutines"
)
return
async def basic_adaptive_crawling():
"""Basic adaptive crawling example"""
# Initialize the crawler
async with AsyncWebCrawler(verbose=True) as crawler:
# Create an adaptive crawler with default settings (statistical strategy)
adaptive = AdaptiveCrawler(crawler)
# Note: You can also use embedding strategy for semantic understanding:
# from crawl4ai import AdaptiveConfig
# config = AdaptiveConfig(strategy="embedding")
# adaptive = AdaptiveCrawler(crawler, config)
# Start adaptive crawling
print("Starting adaptive crawl for Python async programming information...")
result = await adaptive.digest(
start_url="https://docs.python.org/3/library/asyncio.html",
query="async await context managers coroutines"
)
# Display crawl statistics
print("\n" + "="*50)
print("CRAWL STATISTICS")
print("="*50)
adaptive.print_stats(detailed=False)
# Get the most relevant content found
print("\n" + "="*50)
print("MOST RELEVANT PAGES")
print("="*50)
relevant_pages = adaptive.get_relevant_content(top_k=5)
for i, page in enumerate(relevant_pages, 1):
print(f"\n{i}. {page['url']}")
print(f" Relevance Score: {page['score']:.2%}")
# Show a snippet of the content
content = page['content'] or ""
if content:
snippet = content[:200].replace('\n', ' ')
if len(content) > 200:
snippet += "..."
print(f" Preview: {snippet}")
# Show final confidence
print(f"\n{'='*50}")
print(f"Final Confidence: {adaptive.confidence:.2%}")
print(f"Total Pages Crawled: {len(result.crawled_urls)}")
print(f"Knowledge Base Size: {len(adaptive.state.knowledge_base)} documents")
if adaptive.confidence >= 0.8:
print("✓ High confidence - can answer detailed questions about async Python")
elif adaptive.confidence >= 0.6:
print("~ Moderate confidence - can answer basic questions")
else:
print("✗ Low confidence - need more information")
if __name__ == "__main__":
asyncio.run(llm_embedding())
# asyncio.run(basic_adaptive_crawling())

View File

@@ -0,0 +1,522 @@
#!/usr/bin/env python3
"""
Comprehensive hooks examples using Docker Client with function objects.
This approach is recommended because:
- Write hooks as regular Python functions
- Full IDE support (autocomplete, type checking)
- Automatic conversion to API format
- Reusable and testable code
- Clean, readable syntax
"""
import asyncio
from crawl4ai import Crawl4aiDockerClient
# API_BASE_URL = "http://localhost:11235"
API_BASE_URL = "http://localhost:11234"
# ============================================================================
# Hook Function Definitions
# ============================================================================
# --- All Hooks Demo ---
async def browser_created_hook(browser, **kwargs):
"""Called after browser is created"""
print("[HOOK] Browser created and ready")
return browser
async def page_context_hook(page, context, **kwargs):
"""Setup page environment"""
print("[HOOK] Setting up page environment")
# Set viewport
await page.set_viewport_size({"width": 1920, "height": 1080})
# Add cookies
await context.add_cookies([{
"name": "test_session",
"value": "abc123xyz",
"domain": ".httpbin.org",
"path": "/"
}])
# Block resources
await context.route("**/*.{png,jpg,jpeg,gif}", lambda route: route.abort())
await context.route("**/analytics/*", lambda route: route.abort())
print("[HOOK] Environment configured")
return page
async def user_agent_hook(page, context, user_agent, **kwargs):
"""Called when user agent is updated"""
print(f"[HOOK] User agent: {user_agent[:50]}...")
return page
async def before_goto_hook(page, context, url, **kwargs):
"""Called before navigating to URL"""
print(f"[HOOK] Navigating to: {url}")
await page.set_extra_http_headers({
"X-Custom-Header": "crawl4ai-test",
"Accept-Language": "en-US"
})
return page
async def after_goto_hook(page, context, url, response, **kwargs):
"""Called after page loads"""
print(f"[HOOK] Page loaded: {url}")
await page.wait_for_timeout(1000)
try:
await page.wait_for_selector("body", timeout=2000)
print("[HOOK] Body element ready")
except:
print("[HOOK] Timeout, continuing")
return page
async def execution_started_hook(page, context, **kwargs):
"""Called when custom JS execution starts"""
print("[HOOK] JS execution started")
await page.evaluate("console.log('[HOOK] Custom JS');")
return page
async def before_retrieve_hook(page, context, **kwargs):
"""Called before retrieving HTML"""
print("[HOOK] Preparing HTML retrieval")
# Scroll for lazy content
await page.evaluate("window.scrollTo(0, document.body.scrollHeight);")
await page.wait_for_timeout(500)
await page.evaluate("window.scrollTo(0, 0);")
print("[HOOK] Scrolling complete")
return page
async def before_return_hook(page, context, html, **kwargs):
"""Called before returning HTML"""
print(f"[HOOK] HTML ready: {len(html)} chars")
metrics = await page.evaluate('''() => ({
images: document.images.length,
links: document.links.length,
scripts: document.scripts.length
})''')
print(f"[HOOK] Metrics - Images: {metrics['images']}, Links: {metrics['links']}")
return page
# --- Authentication Hooks ---
async def auth_context_hook(page, context, **kwargs):
"""Setup authentication context"""
print("[HOOK] Setting up authentication")
# Add auth cookies
await context.add_cookies([{
"name": "auth_token",
"value": "fake_jwt_token",
"domain": ".httpbin.org",
"path": "/",
"httpOnly": True
}])
# Set localStorage
await page.evaluate('''
localStorage.setItem('user_id', '12345');
localStorage.setItem('auth_time', new Date().toISOString());
''')
print("[HOOK] Auth context ready")
return page
async def auth_headers_hook(page, context, url, **kwargs):
"""Add authentication headers"""
print(f"[HOOK] Adding auth headers for {url}")
import base64
credentials = base64.b64encode(b"user:passwd").decode('ascii')
await page.set_extra_http_headers({
'Authorization': f'Basic {credentials}',
'X-API-Key': 'test-key-123'
})
return page
# --- Performance Optimization Hooks ---
async def performance_hook(page, context, **kwargs):
"""Optimize page for performance"""
print("[HOOK] Optimizing for performance")
# Block resource-heavy content
await context.route("**/*.{png,jpg,jpeg,gif,webp,svg}", lambda r: r.abort())
await context.route("**/*.{woff,woff2,ttf}", lambda r: r.abort())
await context.route("**/*.{mp4,webm,ogg}", lambda r: r.abort())
await context.route("**/googletagmanager.com/*", lambda r: r.abort())
await context.route("**/google-analytics.com/*", lambda r: r.abort())
await context.route("**/facebook.com/*", lambda r: r.abort())
# Disable animations
await page.add_style_tag(content='''
*, *::before, *::after {
animation-duration: 0s !important;
transition-duration: 0s !important;
}
''')
print("[HOOK] Optimizations applied")
return page
async def cleanup_hook(page, context, **kwargs):
"""Clean page before extraction"""
print("[HOOK] Cleaning page")
await page.evaluate('''() => {
const selectors = [
'.ad', '.ads', '.advertisement',
'.popup', '.modal', '.overlay',
'.cookie-banner', '.newsletter'
];
selectors.forEach(sel => {
document.querySelectorAll(sel).forEach(el => el.remove());
});
document.querySelectorAll('script, style').forEach(el => el.remove());
}''')
print("[HOOK] Page cleaned")
return page
# --- Content Extraction Hooks ---
async def wait_dynamic_content_hook(page, context, url, response, **kwargs):
"""Wait for dynamic content to load"""
print(f"[HOOK] Waiting for dynamic content on {url}")
await page.wait_for_timeout(2000)
# Click "Load More" if exists
try:
load_more = await page.query_selector('[class*="load-more"], button:has-text("Load More")')
if load_more:
await load_more.click()
await page.wait_for_timeout(1000)
print("[HOOK] Clicked 'Load More'")
except:
pass
return page
async def extract_metadata_hook(page, context, **kwargs):
"""Extract page metadata"""
print("[HOOK] Extracting metadata")
metadata = await page.evaluate('''() => {
const getMeta = (name) => {
const el = document.querySelector(`meta[name="${name}"], meta[property="${name}"]`);
return el ? el.getAttribute('content') : null;
};
return {
title: document.title,
description: getMeta('description'),
author: getMeta('author'),
keywords: getMeta('keywords'),
};
}''')
print(f"[HOOK] Metadata: {metadata}")
# Infinite scroll
for i in range(3):
await page.evaluate("window.scrollTo(0, document.body.scrollHeight);")
await page.wait_for_timeout(1000)
print(f"[HOOK] Scroll {i+1}/3")
return page
# --- Multi-URL Hooks ---
async def url_specific_hook(page, context, url, **kwargs):
"""Apply URL-specific logic"""
print(f"[HOOK] Processing URL: {url}")
# URL-specific headers
if 'html' in url:
await page.set_extra_http_headers({"X-Type": "HTML"})
elif 'json' in url:
await page.set_extra_http_headers({"X-Type": "JSON"})
return page
async def track_progress_hook(page, context, url, response, **kwargs):
"""Track crawl progress"""
status = response.status if response else 'unknown'
print(f"[HOOK] Loaded {url} - Status: {status}")
return page
# ============================================================================
# Test Functions
# ============================================================================
async def test_all_hooks_comprehensive():
"""Test all 8 hook types"""
print("=" * 70)
print("Test 1: All Hooks Comprehensive Demo (Docker Client)")
print("=" * 70)
async with Crawl4aiDockerClient(base_url=API_BASE_URL, verbose=False) as client:
print("\nCrawling with all 8 hooks...")
# Define hooks with function objects
hooks = {
"on_browser_created": browser_created_hook,
"on_page_context_created": page_context_hook,
"on_user_agent_updated": user_agent_hook,
"before_goto": before_goto_hook,
"after_goto": after_goto_hook,
"on_execution_started": execution_started_hook,
"before_retrieve_html": before_retrieve_hook,
"before_return_html": before_return_hook
}
result = await client.crawl(
["https://httpbin.org/html"],
hooks=hooks,
hooks_timeout=30
)
print("\n✅ Success!")
print(f" URL: {result.url}")
print(f" Success: {result.success}")
print(f" HTML: {len(result.html)} chars")
async def test_authentication_workflow():
"""Test authentication with hooks"""
print("\n" + "=" * 70)
print("Test 2: Authentication Workflow (Docker Client)")
print("=" * 70)
async with Crawl4aiDockerClient(base_url=API_BASE_URL, verbose=False) as client:
print("\nTesting authentication...")
hooks = {
"on_page_context_created": auth_context_hook,
"before_goto": auth_headers_hook
}
result = await client.crawl(
["https://httpbin.org/basic-auth/user/passwd"],
hooks=hooks,
hooks_timeout=15
)
print("\n✅ Authentication completed")
if result.success:
if '"authenticated"' in result.html and 'true' in result.html:
print(" ✅ Basic auth successful!")
else:
print(" ⚠️ Auth status unclear")
else:
print(f" ❌ Failed: {result.error_message}")
async def test_performance_optimization():
"""Test performance optimization"""
print("\n" + "=" * 70)
print("Test 3: Performance Optimization (Docker Client)")
print("=" * 70)
async with Crawl4aiDockerClient(base_url=API_BASE_URL, verbose=False) as client:
print("\nTesting performance hooks...")
hooks = {
"on_page_context_created": performance_hook,
"before_retrieve_html": cleanup_hook
}
result = await client.crawl(
["https://httpbin.org/html"],
hooks=hooks,
hooks_timeout=10
)
print("\n✅ Optimization completed")
print(f" HTML size: {len(result.html):,} chars")
print(" Resources blocked, ads removed")
async def test_content_extraction():
"""Test content extraction"""
print("\n" + "=" * 70)
print("Test 4: Content Extraction (Docker Client)")
print("=" * 70)
async with Crawl4aiDockerClient(base_url=API_BASE_URL, verbose=False) as client:
print("\nTesting extraction hooks...")
hooks = {
"after_goto": wait_dynamic_content_hook,
"before_retrieve_html": extract_metadata_hook
}
result = await client.crawl(
["https://www.kidocode.com/"],
hooks=hooks,
hooks_timeout=20
)
print("\n✅ Extraction completed")
print(f" URL: {result.url}")
print(f" Success: {result.success}")
print(f" Metadata: {result.metadata}")
async def test_multi_url_crawl():
"""Test hooks with multiple URLs"""
print("\n" + "=" * 70)
print("Test 5: Multi-URL Crawl (Docker Client)")
print("=" * 70)
async with Crawl4aiDockerClient(base_url=API_BASE_URL, verbose=False) as client:
print("\nCrawling multiple URLs...")
hooks = {
"before_goto": url_specific_hook,
"after_goto": track_progress_hook
}
results = await client.crawl(
[
"https://httpbin.org/html",
"https://httpbin.org/json",
"https://httpbin.org/xml"
],
hooks=hooks,
hooks_timeout=15
)
print("\n✅ Multi-URL crawl completed")
print(f"\n Crawled {len(results)} URLs:")
for i, result in enumerate(results, 1):
status = "" if result.success else ""
print(f" {status} {i}. {result.url}")
async def test_reusable_hook_library():
"""Test using reusable hook library"""
print("\n" + "=" * 70)
print("Test 6: Reusable Hook Library (Docker Client)")
print("=" * 70)
# Create a library of reusable hooks
class HookLibrary:
@staticmethod
async def block_images(page, context, **kwargs):
"""Block all images"""
await context.route("**/*.{png,jpg,jpeg,gif}", lambda r: r.abort())
print("[LIBRARY] Images blocked")
return page
@staticmethod
async def block_analytics(page, context, **kwargs):
"""Block analytics"""
await context.route("**/analytics/*", lambda r: r.abort())
await context.route("**/google-analytics.com/*", lambda r: r.abort())
print("[LIBRARY] Analytics blocked")
return page
@staticmethod
async def scroll_infinite(page, context, **kwargs):
"""Handle infinite scroll"""
for i in range(5):
prev = await page.evaluate("document.body.scrollHeight")
await page.evaluate("window.scrollTo(0, document.body.scrollHeight);")
await page.wait_for_timeout(1000)
curr = await page.evaluate("document.body.scrollHeight")
if curr == prev:
break
print("[LIBRARY] Infinite scroll complete")
return page
async with Crawl4aiDockerClient(base_url=API_BASE_URL, verbose=False) as client:
print("\nUsing hook library...")
hooks = {
"on_page_context_created": HookLibrary.block_images,
"before_retrieve_html": HookLibrary.scroll_infinite
}
result = await client.crawl(
["https://www.kidocode.com/"],
hooks=hooks,
hooks_timeout=20
)
print("\n✅ Library hooks completed")
print(f" Success: {result.success}")
# ============================================================================
# Main
# ============================================================================
async def main():
"""Run all Docker client hook examples"""
print("🔧 Crawl4AI Docker Client - Hooks Examples (Function-Based)")
print("Using Python function objects with automatic conversion")
print("=" * 70)
tests = [
("All Hooks Demo", test_all_hooks_comprehensive),
("Authentication", test_authentication_workflow),
("Performance", test_performance_optimization),
("Extraction", test_content_extraction),
("Multi-URL", test_multi_url_crawl),
("Hook Library", test_reusable_hook_library)
]
for i, (name, test_func) in enumerate(tests, 1):
try:
await test_func()
print(f"\n✅ Test {i}/{len(tests)}: {name} completed\n")
except Exception as e:
print(f"\n❌ Test {i}/{len(tests)}: {name} failed: {e}\n")
import traceback
traceback.print_exc()
print("=" * 70)
print("🎉 All Docker client hook examples completed!")
print("\n💡 Key Benefits of Function-Based Hooks:")
print(" • Write as regular Python functions")
print(" • Full IDE support (autocomplete, types)")
print(" • Automatic conversion to API format")
print(" • Reusable across projects")
print(" • Clean, readable code")
print(" • Easy to test and debug")
print("=" * 70)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,513 @@
#!/usr/bin/env python3
"""
Comprehensive test demonstrating all hook types from hooks_example.py
adapted for the Docker API with real URLs
"""
import requests
import json
import time
from typing import Dict, Any
# API_BASE_URL = "http://localhost:11234"
API_BASE_URL = "http://localhost:11235"
def test_all_hooks_demo():
"""Demonstrate all 8 hook types with practical examples"""
print("=" * 70)
print("Testing: All Hooks Comprehensive Demo")
print("=" * 70)
hooks_code = {
"on_browser_created": """
async def hook(browser, **kwargs):
# Hook called after browser is created
print("[HOOK] on_browser_created - Browser is ready!")
# Browser-level configurations would go here
return browser
""",
"on_page_context_created": """
async def hook(page, context, **kwargs):
# Hook called after a new page and context are created
print("[HOOK] on_page_context_created - New page created!")
# Set viewport size for consistent rendering
await page.set_viewport_size({"width": 1920, "height": 1080})
# Add cookies for the session (using httpbin.org domain)
await context.add_cookies([
{
"name": "test_session",
"value": "abc123xyz",
"domain": ".httpbin.org",
"path": "/",
"httpOnly": True,
"secure": True
}
])
# Block ads and tracking scripts to speed up crawling
await context.route("**/*.{png,jpg,jpeg,gif,webp,svg}", lambda route: route.abort())
await context.route("**/analytics/*", lambda route: route.abort())
await context.route("**/ads/*", lambda route: route.abort())
print("[HOOK] Viewport set, cookies added, and ads blocked")
return page
""",
"on_user_agent_updated": """
async def hook(page, context, user_agent, **kwargs):
# Hook called when user agent is updated
print(f"[HOOK] on_user_agent_updated - User agent: {user_agent[:50]}...")
return page
""",
"before_goto": """
async def hook(page, context, url, **kwargs):
# Hook called before navigating to each URL
print(f"[HOOK] before_goto - About to visit: {url}")
# Add custom headers for the request
await page.set_extra_http_headers({
"X-Custom-Header": "crawl4ai-test",
"Accept-Language": "en-US,en;q=0.9",
"DNT": "1"
})
return page
""",
"after_goto": """
async def hook(page, context, url, response, **kwargs):
# Hook called after navigating to each URL
print(f"[HOOK] after_goto - Successfully loaded: {url}")
# Wait a moment for dynamic content to load
await page.wait_for_timeout(1000)
# Check if specific elements exist (with error handling)
try:
# For httpbin.org, wait for body element
await page.wait_for_selector("body", timeout=2000)
print("[HOOK] Body element found and loaded")
except:
print("[HOOK] Timeout waiting for body, continuing anyway")
return page
""",
"on_execution_started": """
async def hook(page, context, **kwargs):
# Hook called after custom JavaScript execution
print("[HOOK] on_execution_started - Custom JS executed!")
# You could inject additional JavaScript here if needed
await page.evaluate("console.log('[INJECTED] Hook JS running');")
return page
""",
"before_retrieve_html": """
async def hook(page, context, **kwargs):
# Hook called before retrieving the HTML content
print("[HOOK] before_retrieve_html - Preparing to get HTML")
# Scroll to bottom to trigger lazy loading
await page.evaluate("window.scrollTo(0, document.body.scrollHeight);")
await page.wait_for_timeout(500)
# Scroll back to top
await page.evaluate("window.scrollTo(0, 0);")
await page.wait_for_timeout(500)
# One more scroll to middle for good measure
await page.evaluate("window.scrollTo(0, document.body.scrollHeight / 2);")
print("[HOOK] Scrolling completed for lazy-loaded content")
return page
""",
"before_return_html": """
async def hook(page, context, html, **kwargs):
# Hook called before returning the HTML content
print(f"[HOOK] before_return_html - HTML length: {len(html)} characters")
# Log some page metrics
metrics = await page.evaluate('''() => {
return {
images: document.images.length,
links: document.links.length,
scripts: document.scripts.length
}
}''')
print(f"[HOOK] Page metrics - Images: {metrics['images']}, Links: {metrics['links']}, Scripts: {metrics['scripts']}")
return page
"""
}
# Create request payload
payload = {
"urls": ["https://httpbin.org/html"],
"hooks": {
"code": hooks_code,
"timeout": 30
},
"crawler_config": {
"js_code": "window.scrollTo(0, document.body.scrollHeight);",
"wait_for": "body",
"cache_mode": "bypass"
}
}
print("\nSending request with all 8 hooks...")
start_time = time.time()
response = requests.post(f"{API_BASE_URL}/crawl", json=payload)
elapsed_time = time.time() - start_time
print(f"Request completed in {elapsed_time:.2f} seconds")
if response.status_code == 200:
data = response.json()
print("\n✅ Request successful!")
# Check hooks execution
if 'hooks' in data:
hooks_info = data['hooks']
print("\n📊 Hooks Execution Summary:")
print(f" Status: {hooks_info['status']['status']}")
print(f" Attached hooks: {len(hooks_info['status']['attached_hooks'])}")
for hook_name in hooks_info['status']['attached_hooks']:
print(f"{hook_name}")
if 'summary' in hooks_info:
summary = hooks_info['summary']
print(f"\n📈 Execution Statistics:")
print(f" Total executions: {summary['total_executions']}")
print(f" Successful: {summary['successful']}")
print(f" Failed: {summary['failed']}")
print(f" Timed out: {summary['timed_out']}")
print(f" Success rate: {summary['success_rate']:.1f}%")
if hooks_info.get('execution_log'):
print(f"\n📝 Execution Log:")
for log_entry in hooks_info['execution_log']:
status_icon = "" if log_entry['status'] == 'success' else ""
exec_time = log_entry.get('execution_time', 0)
print(f" {status_icon} {log_entry['hook_point']}: {exec_time:.3f}s")
# Check crawl results
if 'results' in data and len(data['results']) > 0:
print(f"\n📄 Crawl Results:")
for result in data['results']:
print(f" URL: {result['url']}")
print(f" Success: {result.get('success', False)}")
if result.get('html'):
print(f" HTML length: {len(result['html'])} characters")
else:
print(f"❌ Error: {response.status_code}")
try:
error_data = response.json()
print(f"Error details: {json.dumps(error_data, indent=2)}")
except:
print(f"Error text: {response.text[:500]}")
def test_authentication_flow():
"""Test a complete authentication flow with multiple hooks"""
print("\n" + "=" * 70)
print("Testing: Authentication Flow with Multiple Hooks")
print("=" * 70)
hooks_code = {
"on_page_context_created": """
async def hook(page, context, **kwargs):
print("[HOOK] Setting up authentication context")
# Add authentication cookies
await context.add_cookies([
{
"name": "auth_token",
"value": "fake_jwt_token_here",
"domain": ".httpbin.org",
"path": "/",
"httpOnly": True,
"secure": True
}
])
# Set localStorage items (for SPA authentication)
await page.evaluate('''
localStorage.setItem('user_id', '12345');
localStorage.setItem('auth_time', new Date().toISOString());
''')
return page
""",
"before_goto": """
async def hook(page, context, url, **kwargs):
print(f"[HOOK] Adding auth headers for {url}")
# Add Authorization header
import base64
credentials = base64.b64encode(b"user:passwd").decode('ascii')
await page.set_extra_http_headers({
'Authorization': f'Basic {credentials}',
'X-API-Key': 'test-api-key-123'
})
return page
"""
}
payload = {
"urls": [
"https://httpbin.org/basic-auth/user/passwd"
],
"hooks": {
"code": hooks_code,
"timeout": 15
}
}
print("\nTesting authentication with httpbin endpoints...")
response = requests.post(f"{API_BASE_URL}/crawl", json=payload)
if response.status_code == 200:
data = response.json()
print("✅ Authentication test completed")
if 'results' in data:
for i, result in enumerate(data['results']):
print(f"\n URL {i+1}: {result['url']}")
if result.get('success'):
# Check for authentication success indicators
html_content = result.get('html', '')
if '"authenticated"' in html_content and 'true' in html_content:
print(" ✅ Authentication successful! Basic auth worked.")
else:
print(" ⚠️ Page loaded but auth status unclear")
else:
print(f" ❌ Failed: {result.get('error_message', 'Unknown error')}")
else:
print(f"❌ Error: {response.status_code}")
def test_performance_optimization_hooks():
"""Test hooks for performance optimization"""
print("\n" + "=" * 70)
print("Testing: Performance Optimization Hooks")
print("=" * 70)
hooks_code = {
"on_page_context_created": """
async def hook(page, context, **kwargs):
print("[HOOK] Optimizing page for performance")
# Block resource-heavy content
await context.route("**/*.{png,jpg,jpeg,gif,webp,svg,ico}", lambda route: route.abort())
await context.route("**/*.{woff,woff2,ttf,otf}", lambda route: route.abort())
await context.route("**/*.{mp4,webm,ogg,mp3,wav}", lambda route: route.abort())
await context.route("**/googletagmanager.com/*", lambda route: route.abort())
await context.route("**/google-analytics.com/*", lambda route: route.abort())
await context.route("**/doubleclick.net/*", lambda route: route.abort())
await context.route("**/facebook.com/*", lambda route: route.abort())
# Disable animations and transitions
await page.add_style_tag(content='''
*, *::before, *::after {
animation-duration: 0s !important;
animation-delay: 0s !important;
transition-duration: 0s !important;
transition-delay: 0s !important;
}
''')
print("[HOOK] Performance optimizations applied")
return page
""",
"before_retrieve_html": """
async def hook(page, context, **kwargs):
print("[HOOK] Removing unnecessary elements before extraction")
# Remove ads, popups, and other unnecessary elements
await page.evaluate('''() => {
// Remove common ad containers
const adSelectors = [
'.ad', '.ads', '.advertisement', '[id*="ad-"]', '[class*="ad-"]',
'.popup', '.modal', '.overlay', '.cookie-banner', '.newsletter-signup'
];
adSelectors.forEach(selector => {
document.querySelectorAll(selector).forEach(el => el.remove());
});
// Remove script tags to clean up HTML
document.querySelectorAll('script').forEach(el => el.remove());
// Remove style tags we don't need
document.querySelectorAll('style').forEach(el => el.remove());
}''')
return page
"""
}
payload = {
"urls": ["https://httpbin.org/html"],
"hooks": {
"code": hooks_code,
"timeout": 10
}
}
print("\nTesting performance optimization hooks...")
start_time = time.time()
response = requests.post(f"{API_BASE_URL}/crawl", json=payload)
elapsed_time = time.time() - start_time
print(f"Request completed in {elapsed_time:.2f} seconds")
if response.status_code == 200:
data = response.json()
print("✅ Performance optimization test completed")
if 'results' in data and len(data['results']) > 0:
result = data['results'][0]
if result.get('html'):
print(f" HTML size: {len(result['html'])} characters")
print(" Resources blocked, ads removed, animations disabled")
else:
print(f"❌ Error: {response.status_code}")
def test_content_extraction_hooks():
"""Test hooks for intelligent content extraction"""
print("\n" + "=" * 70)
print("Testing: Content Extraction Hooks")
print("=" * 70)
hooks_code = {
"after_goto": """
async def hook(page, context, url, response, **kwargs):
print(f"[HOOK] Waiting for dynamic content on {url}")
# Wait for any lazy-loaded content
await page.wait_for_timeout(2000)
# Trigger any "Load More" buttons
try:
load_more = await page.query_selector('[class*="load-more"], [class*="show-more"], button:has-text("Load More")')
if load_more:
await load_more.click()
await page.wait_for_timeout(1000)
print("[HOOK] Clicked 'Load More' button")
except:
pass
return page
""",
"before_retrieve_html": """
async def hook(page, context, **kwargs):
print("[HOOK] Extracting structured data")
# Extract metadata
metadata = await page.evaluate('''() => {
const getMeta = (name) => {
const element = document.querySelector(`meta[name="${name}"], meta[property="${name}"]`);
return element ? element.getAttribute('content') : null;
};
return {
title: document.title,
description: getMeta('description') || getMeta('og:description'),
author: getMeta('author'),
keywords: getMeta('keywords'),
ogTitle: getMeta('og:title'),
ogImage: getMeta('og:image'),
canonical: document.querySelector('link[rel="canonical"]')?.href,
jsonLd: Array.from(document.querySelectorAll('script[type="application/ld+json"]'))
.map(el => el.textContent).filter(Boolean)
};
}''')
print(f"[HOOK] Extracted metadata: {json.dumps(metadata, indent=2)}")
# Infinite scroll handling
for i in range(3):
await page.evaluate("window.scrollTo(0, document.body.scrollHeight);")
await page.wait_for_timeout(1000)
print(f"[HOOK] Scroll iteration {i+1}/3")
return page
"""
}
payload = {
"urls": ["https://httpbin.org/html", "https://httpbin.org/json"],
"hooks": {
"code": hooks_code,
"timeout": 20
}
}
print("\nTesting content extraction hooks...")
response = requests.post(f"{API_BASE_URL}/crawl", json=payload)
if response.status_code == 200:
data = response.json()
print("✅ Content extraction test completed")
if 'hooks' in data and 'summary' in data['hooks']:
summary = data['hooks']['summary']
print(f" Hooks executed: {summary['successful']}/{summary['total_executions']}")
if 'results' in data:
for result in data['results']:
print(f"\n URL: {result['url']}")
print(f" Success: {result.get('success', False)}")
else:
print(f"❌ Error: {response.status_code}")
def main():
"""Run comprehensive hook tests"""
print("🔧 Crawl4AI Docker API - Comprehensive Hooks Testing")
print("Based on docs/examples/hooks_example.py")
print("=" * 70)
tests = [
("All Hooks Demo", test_all_hooks_demo),
("Authentication Flow", test_authentication_flow),
("Performance Optimization", test_performance_optimization_hooks),
("Content Extraction", test_content_extraction_hooks),
]
for i, (name, test_func) in enumerate(tests, 1):
print(f"\n📌 Test {i}/{len(tests)}: {name}")
try:
test_func()
print(f"{name} completed")
except Exception as e:
print(f"{name} failed: {e}")
import traceback
traceback.print_exc()
print("\n" + "=" * 70)
print("🎉 All comprehensive hook tests completed!")
print("=" * 70)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,461 @@
"""
Docker Webhook Example for Crawl4AI
This example demonstrates how to use webhooks with the Crawl4AI job queue API.
Instead of polling for results, webhooks notify your application when jobs complete.
Supports both:
- /crawl/job - Raw crawling with markdown extraction
- /llm/job - LLM-powered content extraction
Prerequisites:
1. Crawl4AI Docker container running on localhost:11235
2. Flask installed: pip install flask requests
3. LLM API key configured in .llm.env (for LLM extraction examples)
Usage:
1. Run this script: python docker_webhook_example.py
2. The webhook server will start on http://localhost:8080
3. Jobs will be submitted and webhooks will be received automatically
"""
import requests
import json
import time
from flask import Flask, request, jsonify
from threading import Thread
# Configuration
CRAWL4AI_BASE_URL = "http://localhost:11235"
WEBHOOK_BASE_URL = "http://localhost:8080" # Your webhook receiver URL
# Initialize Flask app for webhook receiver
app = Flask(__name__)
# Store received webhook data for demonstration
received_webhooks = []
@app.route('/webhooks/crawl-complete', methods=['POST'])
def handle_crawl_webhook():
"""
Webhook handler that receives notifications when crawl jobs complete.
Payload structure:
{
"task_id": "crawl_abc123",
"task_type": "crawl",
"status": "completed" or "failed",
"timestamp": "2025-10-21T10:30:00.000000+00:00",
"urls": ["https://example.com"],
"error": "error message" (only if failed),
"data": {...} (only if webhook_data_in_payload=True)
}
"""
payload = request.json
print(f"\n{'='*60}")
print(f"📬 Webhook received for task: {payload['task_id']}")
print(f" Status: {payload['status']}")
print(f" Timestamp: {payload['timestamp']}")
print(f" URLs: {payload['urls']}")
if payload['status'] == 'completed':
# If data is in payload, process it directly
if 'data' in payload:
print(f" ✅ Data included in webhook")
data = payload['data']
# Process the crawl results here
for result in data.get('results', []):
print(f" - Crawled: {result.get('url')}")
print(f" - Markdown length: {len(result.get('markdown', ''))}")
else:
# Fetch results from API if not included
print(f" 📥 Fetching results from API...")
task_id = payload['task_id']
result_response = requests.get(f"{CRAWL4AI_BASE_URL}/crawl/job/{task_id}")
if result_response.ok:
data = result_response.json()
print(f" ✅ Results fetched successfully")
# Process the crawl results here
for result in data['result'].get('results', []):
print(f" - Crawled: {result.get('url')}")
print(f" - Markdown length: {len(result.get('markdown', ''))}")
elif payload['status'] == 'failed':
print(f" ❌ Job failed: {payload.get('error', 'Unknown error')}")
print(f"{'='*60}\n")
# Store webhook for demonstration
received_webhooks.append(payload)
# Return 200 OK to acknowledge receipt
return jsonify({"status": "received"}), 200
@app.route('/webhooks/llm-complete', methods=['POST'])
def handle_llm_webhook():
"""
Webhook handler that receives notifications when LLM extraction jobs complete.
Payload structure:
{
"task_id": "llm_1698765432_12345",
"task_type": "llm_extraction",
"status": "completed" or "failed",
"timestamp": "2025-10-21T10:30:00.000000+00:00",
"urls": ["https://example.com/article"],
"error": "error message" (only if failed),
"data": {"extracted_content": {...}} (only if webhook_data_in_payload=True)
}
"""
payload = request.json
print(f"\n{'='*60}")
print(f"🤖 LLM Webhook received for task: {payload['task_id']}")
print(f" Task Type: {payload['task_type']}")
print(f" Status: {payload['status']}")
print(f" Timestamp: {payload['timestamp']}")
print(f" URL: {payload['urls'][0]}")
if payload['status'] == 'completed':
# If data is in payload, process it directly
if 'data' in payload:
print(f" ✅ Data included in webhook")
data = payload['data']
# Webhook wraps extracted content in 'extracted_content' field
extracted = data.get('extracted_content', {})
print(f" - Extracted content:")
print(f" {json.dumps(extracted, indent=8)}")
else:
# Fetch results from API if not included
print(f" 📥 Fetching results from API...")
task_id = payload['task_id']
result_response = requests.get(f"{CRAWL4AI_BASE_URL}/llm/job/{task_id}")
if result_response.ok:
data = result_response.json()
print(f" ✅ Results fetched successfully")
# API returns unwrapped content in 'result' field
extracted = data['result']
print(f" - Extracted content:")
print(f" {json.dumps(extracted, indent=8)}")
elif payload['status'] == 'failed':
print(f" ❌ Job failed: {payload.get('error', 'Unknown error')}")
print(f"{'='*60}\n")
# Store webhook for demonstration
received_webhooks.append(payload)
# Return 200 OK to acknowledge receipt
return jsonify({"status": "received"}), 200
def start_webhook_server():
"""Start the Flask webhook server in a separate thread"""
app.run(host='0.0.0.0', port=8080, debug=False, use_reloader=False)
def submit_crawl_job_with_webhook(urls, webhook_url, include_data=False):
"""
Submit a crawl job with webhook notification.
Args:
urls: List of URLs to crawl
webhook_url: URL to receive webhook notifications
include_data: Whether to include full results in webhook payload
Returns:
task_id: The job's task identifier
"""
payload = {
"urls": urls,
"browser_config": {"headless": True},
"crawler_config": {"cache_mode": "bypass"},
"webhook_config": {
"webhook_url": webhook_url,
"webhook_data_in_payload": include_data,
# Optional: Add custom headers for authentication
# "webhook_headers": {
# "X-Webhook-Secret": "your-secret-token"
# }
}
}
print(f"\n🚀 Submitting crawl job...")
print(f" URLs: {urls}")
print(f" Webhook: {webhook_url}")
print(f" Include data: {include_data}")
response = requests.post(
f"{CRAWL4AI_BASE_URL}/crawl/job",
json=payload,
headers={"Content-Type": "application/json"}
)
if response.ok:
data = response.json()
task_id = data['task_id']
print(f" ✅ Job submitted successfully")
print(f" Task ID: {task_id}")
return task_id
else:
print(f" ❌ Failed to submit job: {response.text}")
return None
def submit_llm_job_with_webhook(url, query, webhook_url, include_data=False, schema=None, provider=None):
"""
Submit an LLM extraction job with webhook notification.
Args:
url: URL to extract content from
query: Instruction for the LLM (e.g., "Extract article title and author")
webhook_url: URL to receive webhook notifications
include_data: Whether to include full results in webhook payload
schema: Optional JSON schema for structured extraction
provider: Optional LLM provider (e.g., "openai/gpt-4o-mini")
Returns:
task_id: The job's task identifier
"""
payload = {
"url": url,
"q": query,
"cache": False,
"webhook_config": {
"webhook_url": webhook_url,
"webhook_data_in_payload": include_data,
# Optional: Add custom headers for authentication
# "webhook_headers": {
# "X-Webhook-Secret": "your-secret-token"
# }
}
}
if schema:
payload["schema"] = schema
if provider:
payload["provider"] = provider
print(f"\n🤖 Submitting LLM extraction job...")
print(f" URL: {url}")
print(f" Query: {query}")
print(f" Webhook: {webhook_url}")
print(f" Include data: {include_data}")
if provider:
print(f" Provider: {provider}")
response = requests.post(
f"{CRAWL4AI_BASE_URL}/llm/job",
json=payload,
headers={"Content-Type": "application/json"}
)
if response.ok:
data = response.json()
task_id = data['task_id']
print(f" ✅ Job submitted successfully")
print(f" Task ID: {task_id}")
return task_id
else:
print(f" ❌ Failed to submit job: {response.text}")
return None
def submit_job_without_webhook(urls):
"""
Submit a job without webhook (traditional polling approach).
Args:
urls: List of URLs to crawl
Returns:
task_id: The job's task identifier
"""
payload = {
"urls": urls,
"browser_config": {"headless": True},
"crawler_config": {"cache_mode": "bypass"}
}
print(f"\n🚀 Submitting crawl job (without webhook)...")
print(f" URLs: {urls}")
response = requests.post(
f"{CRAWL4AI_BASE_URL}/crawl/job",
json=payload
)
if response.ok:
data = response.json()
task_id = data['task_id']
print(f" ✅ Job submitted successfully")
print(f" Task ID: {task_id}")
return task_id
else:
print(f" ❌ Failed to submit job: {response.text}")
return None
def poll_job_status(task_id, timeout=60):
"""
Poll for job status (used when webhook is not configured).
Args:
task_id: The job's task identifier
timeout: Maximum time to wait in seconds
"""
print(f"\n⏳ Polling for job status...")
start_time = time.time()
while time.time() - start_time < timeout:
response = requests.get(f"{CRAWL4AI_BASE_URL}/crawl/job/{task_id}")
if response.ok:
data = response.json()
status = data.get('status', 'unknown')
if status == 'completed':
print(f" ✅ Job completed!")
return data
elif status == 'failed':
print(f" ❌ Job failed: {data.get('error', 'Unknown error')}")
return data
else:
print(f" ⏳ Status: {status}, waiting...")
time.sleep(2)
else:
print(f" ❌ Failed to get status: {response.text}")
return None
print(f" ⏰ Timeout reached")
return None
def main():
"""Run the webhook demonstration"""
# Check if Crawl4AI is running
try:
health = requests.get(f"{CRAWL4AI_BASE_URL}/health", timeout=5)
print(f"✅ Crawl4AI is running: {health.json()}")
except:
print(f"❌ Cannot connect to Crawl4AI at {CRAWL4AI_BASE_URL}")
print(" Please make sure Docker container is running:")
print(" docker run -d -p 11235:11235 --name crawl4ai unclecode/crawl4ai:latest")
return
# Start webhook server in background thread
print(f"\n🌐 Starting webhook server at {WEBHOOK_BASE_URL}...")
webhook_thread = Thread(target=start_webhook_server, daemon=True)
webhook_thread.start()
time.sleep(2) # Give server time to start
# Example 1: Job with webhook (notification only, fetch data separately)
print(f"\n{'='*60}")
print("Example 1: Webhook Notification Only")
print(f"{'='*60}")
task_id_1 = submit_crawl_job_with_webhook(
urls=["https://example.com"],
webhook_url=f"{WEBHOOK_BASE_URL}/webhooks/crawl-complete",
include_data=False
)
# Example 2: Job with webhook (data included in payload)
time.sleep(5) # Wait a bit between requests
print(f"\n{'='*60}")
print("Example 2: Webhook with Full Data")
print(f"{'='*60}")
task_id_2 = submit_crawl_job_with_webhook(
urls=["https://www.python.org"],
webhook_url=f"{WEBHOOK_BASE_URL}/webhooks/crawl-complete",
include_data=True
)
# Example 3: LLM extraction with webhook (notification only)
time.sleep(5) # Wait a bit between requests
print(f"\n{'='*60}")
print("Example 3: LLM Extraction with Webhook (Notification Only)")
print(f"{'='*60}")
task_id_3 = submit_llm_job_with_webhook(
url="https://www.example.com",
query="Extract the main heading and description from this page.",
webhook_url=f"{WEBHOOK_BASE_URL}/webhooks/llm-complete",
include_data=False,
provider="openai/gpt-4o-mini"
)
# Example 4: LLM extraction with webhook (data included + schema)
time.sleep(5) # Wait a bit between requests
print(f"\n{'='*60}")
print("Example 4: LLM Extraction with Schema and Full Data")
print(f"{'='*60}")
# Define a schema for structured extraction
schema = json.dumps({
"type": "object",
"properties": {
"title": {"type": "string", "description": "Page title"},
"description": {"type": "string", "description": "Page description"}
},
"required": ["title"]
})
task_id_4 = submit_llm_job_with_webhook(
url="https://www.python.org",
query="Extract the title and description of this website",
webhook_url=f"{WEBHOOK_BASE_URL}/webhooks/llm-complete",
include_data=True,
schema=schema,
provider="openai/gpt-4o-mini"
)
# Example 5: Traditional polling (no webhook)
time.sleep(5) # Wait a bit between requests
print(f"\n{'='*60}")
print("Example 5: Traditional Polling (No Webhook)")
print(f"{'='*60}")
task_id_5 = submit_job_without_webhook(
urls=["https://github.com"]
)
if task_id_5:
result = poll_job_status(task_id_5)
if result and result.get('status') == 'completed':
print(f" ✅ Results retrieved via polling")
# Wait for webhooks to arrive
print(f"\n⏳ Waiting for webhooks to be received...")
time.sleep(30) # Give jobs time to complete and webhooks to arrive (longer for LLM)
# Summary
print(f"\n{'='*60}")
print("Summary")
print(f"{'='*60}")
print(f"Total webhooks received: {len(received_webhooks)}")
crawl_webhooks = [w for w in received_webhooks if w['task_type'] == 'crawl']
llm_webhooks = [w for w in received_webhooks if w['task_type'] == 'llm_extraction']
print(f"\n📊 Breakdown:")
print(f" - Crawl webhooks: {len(crawl_webhooks)}")
print(f" - LLM extraction webhooks: {len(llm_webhooks)}")
print(f"\n📋 Details:")
for i, webhook in enumerate(received_webhooks, 1):
task_type = webhook['task_type']
icon = "🕷️" if task_type == "crawl" else "🤖"
print(f"{i}. {icon} Task {webhook['task_id']}: {webhook['status']} ({task_type})")
print(f"\n✅ Demo completed!")
print(f"\n💡 Pro tips:")
print(f" - In production, your webhook URL should be publicly accessible")
print(f" (e.g., https://myapp.com/webhooks) or use ngrok for testing")
print(f" - Both /crawl/job and /llm/job support the same webhook configuration")
print(f" - Use webhook_data_in_payload=true to get results directly in the webhook")
print(f" - LLM jobs may take longer, adjust timeouts accordingly")
if __name__ == "__main__":
main()

221
docs/examples/website-to-api/.gitignore vendored Normal file
View File

@@ -0,0 +1,221 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[codz]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py.cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# UV
# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
#uv.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
#poetry.toml
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
# pdm recommends including project-wide configuration in pdm.toml, but excluding .pdm-python.
# https://pdm-project.org/en/latest/usage/project/#working-with-version-control
#pdm.lock
#pdm.toml
.pdm-python
.pdm-build/
# pixi
# Similar to Pipfile.lock, it is generally recommended to include pixi.lock in version control.
#pixi.lock
# Pixi creates a virtual environment in the .pixi directory, just like venv module creates one
# in the .venv directory. It is recommended not to include this directory in version control.
.pixi
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# Redis
*.rdb
*.aof
*.pid
# RabbitMQ
mnesia/
rabbitmq/
rabbitmq-data/
# ActiveMQ
activemq-data/
# SageMath parsed files
*.sage.py
# Environments
.env
.envrc
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
# Abstra
# Abstra is an AI-powered process automation framework.
# Ignore directories containing user credentials, local state, and settings.
# Learn more at https://abstra.io/docs
.abstra/
# Visual Studio Code
# Visual Studio Code specific template is maintained in a separate VisualStudioCode.gitignore
# that can be found at https://github.com/github/gitignore/blob/main/Global/VisualStudioCode.gitignore
# and can be added to the global gitignore or merged into this file. However, if you prefer,
# you could uncomment the following to ignore the entire vscode folder
# .vscode/
# Ruff stuff:
.ruff_cache/
# PyPI configuration file
.pypirc
# Marimo
marimo/_static/
marimo/_lsp/
__marimo__/
# Streamlit
.streamlit/secrets.toml
#directories
models
schemas
saved_requests

View File

@@ -0,0 +1,252 @@
# Web Scraper API with Custom Model Support
A powerful web scraping API that converts any website into structured data using AI. Features a beautiful minimalist frontend interface and support for custom LLM models!
## Features
- **AI-Powered Scraping**: Provide a URL and plain English query to extract structured data
- **Beautiful Frontend**: Modern minimalist black-and-white interface with smooth UX
- **Custom Model Support**: Use any LLM provider (OpenAI, Gemini, Anthropic, etc.) with your own API keys
- **Model Management**: Save, list, and manage multiple model configurations via web interface
- **Dual Scraping Approaches**: Choose between Schema-based (faster) or LLM-based (more flexible) extraction
- **API Request History**: Automatic saving and display of all API requests with cURL commands
- **Schema Caching**: Intelligent caching of generated schemas for faster subsequent requests
- **Duplicate Prevention**: Avoids saving duplicate requests (same URL + query)
- **RESTful API**: Easy-to-use HTTP endpoints for all operations
## Quick Start
### 1. Install Dependencies
```bash
pip install -r requirements.txt
```
### 2. Start the API Server
```bash
python app.py
```
The server will start on `http://localhost:8000` with a beautiful web interface!
### 3. Using the Web Interface
Once the server is running, open your browser and go to `http://localhost:8000` to access the modern web interface!
#### Pages:
- **Scrape Data**: Enter URLs and queries to extract structured data
- **Models**: Manage your AI model configurations (add, list, delete)
- **API Requests**: View history of all scraping requests with cURL commands
#### Features:
- **Minimalist Design**: Clean black-and-white theme inspired by modern web apps
- **Real-time Results**: See extracted data in formatted JSON
- **Copy to Clipboard**: Easy copying of results
- **Toast Notifications**: User-friendly feedback
- **Dual Scraping Modes**: Choose between Schema-based and LLM-based approaches
## Model Management
### Adding Models via Web Interface
1. Go to the **Models** page
2. Enter your model details:
- **Provider**: LLM provider (e.g., `gemini/gemini-2.5-flash`, `openai/gpt-4o`)
- **API Token**: Your API key for the provider
3. Click "Add Model"
### API Usage for Model Management
#### Save a Model Configuration
```bash
curl -X POST "http://localhost:8000/models" \
-H "Content-Type: application/json" \
-d '{
"provider": "gemini/gemini-2.5-flash",
"api_token": "your-api-key-here"
}'
```
#### List Saved Models
```bash
curl -X GET "http://localhost:8000/models"
```
#### Delete a Model Configuration
```bash
curl -X DELETE "http://localhost:8000/models/my-gemini"
```
## Scraping Approaches
### 1. Schema-based Scraping (Faster)
- Generates CSS selectors for targeted extraction
- Caches schemas for repeated requests
- Faster execution for structured websites
### 2. LLM-based Scraping (More Flexible)
- Direct LLM extraction without schema generation
- More flexible for complex or dynamic content
- Better for unstructured data extraction
## Supported LLM Providers
The API supports any LLM provider that crawl4ai supports, including:
- **Google Gemini**: `gemini/gemini-2.5-flash`, `gemini/gemini-pro`
- **OpenAI**: `openai/gpt-4`, `openai/gpt-3.5-turbo`
- **Anthropic**: `anthropic/claude-3-opus`, `anthropic/claude-3-sonnet`
- **And more...**
## API Endpoints
### Core Endpoints
- `POST /scrape` - Schema-based scraping
- `POST /scrape-with-llm` - LLM-based scraping
- `GET /schemas` - List cached schemas
- `POST /clear-cache` - Clear schema cache
- `GET /health` - Health check
### Model Management Endpoints
- `GET /models` - List saved model configurations
- `POST /models` - Save a new model configuration
- `DELETE /models/{model_name}` - Delete a model configuration
### API Request History
- `GET /saved-requests` - List all saved API requests
- `DELETE /saved-requests/{request_id}` - Delete a saved request
## Request/Response Examples
### Scrape Request
```json
{
"url": "https://example.com",
"query": "Extract the product name, price, and description",
"model_name": "my-custom-model"
}
```
### Scrape Response
```json
{
"success": true,
"url": "https://example.com",
"query": "Extract the product name, price, and description",
"extracted_data": {
"product_name": "Example Product",
"price": "$99.99",
"description": "This is an example product description"
},
"schema_used": { ... },
"timestamp": "2024-01-01T12:00:00Z"
}
```
### Model Configuration Request
```json
{
"provider": "gemini/gemini-2.5-flash",
"api_token": "your-api-key-here"
}
```
## Testing
Run the test script to verify the model management functionality:
```bash
python test_models.py
```
## File Structure
```
parse_example/
├── api_server.py # FastAPI server with all endpoints
├── web_scraper_lib.py # Core scraping library
├── test_models.py # Test script for model management
├── requirements.txt # Dependencies
├── static/ # Frontend files
│ ├── index.html # Main HTML interface
│ ├── styles.css # CSS styles (minimalist theme)
│ └── script.js # JavaScript functionality
├── schemas/ # Cached schemas
├── models/ # Saved model configurations
├── saved_requests/ # API request history
└── README.md # This file
```
## Advanced Usage
### Using the Library Directly
```python
from web_scraper_lib import WebScraperAgent
# Initialize agent
agent = WebScraperAgent()
# Save a model configuration
agent.save_model_config(
model_name="my-model",
provider="openai/gpt-4",
api_token="your-api-key"
)
# Schema-based scraping
result = await agent.scrape_data(
url="https://example.com",
query="Extract product information",
model_name="my-model"
)
# LLM-based scraping
result = await agent.scrape_data_with_llm(
url="https://example.com",
query="Extract product information",
model_name="my-model"
)
```
### Schema Caching
The system automatically caches generated schemas based on URL and query combinations:
- **First request**: Generates schema using AI
- **Subsequent requests**: Uses cached schema for faster extraction
### API Request History
All API requests are automatically saved with:
- Request details (URL, query, model used)
- Response data
- Timestamp
- cURL command for re-execution
### Duplicate Prevention
The system prevents saving duplicate requests:
- Same URL + query combinations are not saved multiple times
- Returns existing request ID for duplicates
- Keeps the API request history clean
## Error Handling
The API provides detailed error messages for common issues:
- Invalid URLs
- Missing model configurations
- API key errors
- Network timeouts
- Parsing errors

View File

@@ -0,0 +1,363 @@
from fastapi import FastAPI, HTTPException
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from pydantic import BaseModel, HttpUrl
from typing import Dict, Any, Optional, Union, List
import uvicorn
import asyncio
import os
import json
from datetime import datetime
from web_scraper_lib import WebScraperAgent, scrape_website
app = FastAPI(
title="Web Scraper API",
description="Convert any website into a structured data API. Provide a URL and tell AI what data you need in plain English.",
version="1.0.0"
)
# Mount static files
if os.path.exists("static"):
app.mount("/static", StaticFiles(directory="static"), name="static")
# Mount assets directory
if os.path.exists("assets"):
app.mount("/assets", StaticFiles(directory="assets"), name="assets")
# Initialize the scraper agent
scraper_agent = WebScraperAgent()
# Create directory for saved API requests
os.makedirs("saved_requests", exist_ok=True)
class ScrapeRequest(BaseModel):
url: HttpUrl
query: str
model_name: Optional[str] = None
class ModelConfigRequest(BaseModel):
model_name: str
provider: str
api_token: str
class ScrapeResponse(BaseModel):
success: bool
url: str
query: str
extracted_data: Union[Dict[str, Any], list]
schema_used: Optional[Dict[str, Any]] = None
timestamp: Optional[str] = None
error: Optional[str] = None
class SavedApiRequest(BaseModel):
id: str
endpoint: str
method: str
headers: Dict[str, str]
body: Dict[str, Any]
timestamp: str
response: Optional[Dict[str, Any]] = None
def save_api_request(endpoint: str, method: str, headers: Dict[str, str], body: Dict[str, Any], response: Optional[Dict[str, Any]] = None) -> str:
"""Save an API request to a JSON file."""
# Check for duplicate requests (same URL and query)
if endpoint in ["/scrape", "/scrape-with-llm"] and "url" in body and "query" in body:
existing_requests = get_saved_requests()
for existing_request in existing_requests:
if (existing_request.endpoint == endpoint and
existing_request.body.get("url") == body["url"] and
existing_request.body.get("query") == body["query"]):
print(f"Duplicate request found for URL: {body['url']} and query: {body['query']}")
return existing_request.id # Return existing request ID instead of creating new one
request_id = datetime.now().strftime("%Y%m%d_%H%M%S_%f")[:-3]
saved_request = SavedApiRequest(
id=request_id,
endpoint=endpoint,
method=method,
headers=headers,
body=body,
timestamp=datetime.now().isoformat(),
response=response
)
file_path = os.path.join("saved_requests", f"{request_id}.json")
with open(file_path, "w") as f:
json.dump(saved_request.dict(), f, indent=2)
return request_id
def get_saved_requests() -> List[SavedApiRequest]:
"""Get all saved API requests."""
requests = []
if os.path.exists("saved_requests"):
for filename in os.listdir("saved_requests"):
if filename.endswith('.json'):
file_path = os.path.join("saved_requests", filename)
try:
with open(file_path, "r") as f:
data = json.load(f)
requests.append(SavedApiRequest(**data))
except Exception as e:
print(f"Error loading saved request {filename}: {e}")
# Sort by timestamp (newest first)
requests.sort(key=lambda x: x.timestamp, reverse=True)
return requests
@app.get("/")
async def root():
"""Serve the frontend interface."""
if os.path.exists("static/index.html"):
return FileResponse("static/index.html")
else:
return {
"message": "Web Scraper API",
"description": "Convert any website into structured data with AI",
"endpoints": {
"/scrape": "POST - Scrape data from a website",
"/schemas": "GET - List cached schemas",
"/clear-cache": "POST - Clear schema cache",
"/models": "GET - List saved model configurations",
"/models": "POST - Save a new model configuration",
"/models/{model_name}": "DELETE - Delete a model configuration",
"/saved-requests": "GET - List saved API requests"
}
}
@app.post("/scrape", response_model=ScrapeResponse)
async def scrape_website_endpoint(request: ScrapeRequest):
"""
Scrape structured data from any website.
This endpoint:
1. Takes a URL and plain English query
2. Generates a custom scraper using AI
3. Returns structured data
"""
try:
# Save the API request
headers = {"Content-Type": "application/json"}
body = {
"url": str(request.url),
"query": request.query,
"model_name": request.model_name
}
result = await scraper_agent.scrape_data(
url=str(request.url),
query=request.query,
model_name=request.model_name
)
response_data = ScrapeResponse(
success=True,
url=result["url"],
query=result["query"],
extracted_data=result["extracted_data"],
schema_used=result["schema_used"],
timestamp=result["timestamp"]
)
# Save the request with response
save_api_request(
endpoint="/scrape",
method="POST",
headers=headers,
body=body,
response=response_data.dict()
)
return response_data
except Exception as e:
# Save the failed request
headers = {"Content-Type": "application/json"}
body = {
"url": str(request.url),
"query": request.query,
"model_name": request.model_name
}
save_api_request(
endpoint="/scrape",
method="POST",
headers=headers,
body=body,
response={"error": str(e)}
)
raise HTTPException(status_code=500, detail=f"Scraping failed: {str(e)}")
@app.post("/scrape-with-llm", response_model=ScrapeResponse)
async def scrape_website_endpoint_with_llm(request: ScrapeRequest):
"""
Scrape structured data from any website using a custom LLM model.
"""
try:
# Save the API request
headers = {"Content-Type": "application/json"}
body = {
"url": str(request.url),
"query": request.query,
"model_name": request.model_name
}
result = await scraper_agent.scrape_data_with_llm(
url=str(request.url),
query=request.query,
model_name=request.model_name
)
response_data = ScrapeResponse(
success=True,
url=result["url"],
query=result["query"],
extracted_data=result["extracted_data"],
timestamp=result["timestamp"]
)
# Save the request with response
save_api_request(
endpoint="/scrape-with-llm",
method="POST",
headers=headers,
body=body,
response=response_data.dict()
)
return response_data
except Exception as e:
# Save the failed request
headers = {"Content-Type": "application/json"}
body = {
"url": str(request.url),
"query": request.query,
"model_name": request.model_name
}
save_api_request(
endpoint="/scrape-with-llm",
method="POST",
headers=headers,
body=body,
response={"error": str(e)}
)
raise HTTPException(status_code=500, detail=f"Scraping failed: {str(e)}")
@app.get("/saved-requests")
async def list_saved_requests():
"""List all saved API requests."""
try:
requests = get_saved_requests()
return {
"success": True,
"requests": [req.dict() for req in requests],
"count": len(requests)
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to list saved requests: {str(e)}")
@app.delete("/saved-requests/{request_id}")
async def delete_saved_request(request_id: str):
"""Delete a saved API request."""
try:
file_path = os.path.join("saved_requests", f"{request_id}.json")
if os.path.exists(file_path):
os.remove(file_path)
return {
"success": True,
"message": f"Saved request '{request_id}' deleted successfully"
}
else:
raise HTTPException(status_code=404, detail=f"Saved request '{request_id}' not found")
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to delete saved request: {str(e)}")
@app.get("/schemas")
async def list_cached_schemas():
"""List all cached schemas."""
try:
schemas = await scraper_agent.get_cached_schemas()
return {
"success": True,
"cached_schemas": schemas,
"count": len(schemas)
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to list schemas: {str(e)}")
@app.post("/clear-cache")
async def clear_schema_cache():
"""Clear all cached schemas."""
try:
scraper_agent.clear_cache()
return {
"success": True,
"message": "Schema cache cleared successfully"
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to clear cache: {str(e)}")
@app.get("/models")
async def list_models():
"""List all saved model configurations."""
try:
models = scraper_agent.list_saved_models()
return {
"success": True,
"models": models,
"count": len(models)
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to list models: {str(e)}")
@app.post("/models")
async def save_model_config(request: ModelConfigRequest):
"""Save a new model configuration."""
try:
success = scraper_agent.save_model_config(
model_name=request.model_name,
provider=request.provider,
api_token=request.api_token
)
if success:
return {
"success": True,
"message": f"Model configuration '{request.model_name}' saved successfully"
}
else:
raise HTTPException(status_code=500, detail="Failed to save model configuration")
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to save model: {str(e)}")
@app.delete("/models/{model_name}")
async def delete_model_config(model_name: str):
"""Delete a model configuration."""
try:
success = scraper_agent.delete_model_config(model_name)
if success:
return {
"success": True,
"message": f"Model configuration '{model_name}' deleted successfully"
}
else:
raise HTTPException(status_code=404, detail=f"Model configuration '{model_name}' not found")
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to delete model: {str(e)}")
@app.get("/health")
async def health_check():
"""Health check endpoint."""
return {"status": "healthy", "service": "web-scraper-api"}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)

View File

@@ -0,0 +1,49 @@
#!/usr/bin/env python3
"""
Startup script for the Web Scraper API with frontend interface.
"""
import os
import sys
import uvicorn
from pathlib import Path
def main():
# Check if static directory exists
static_dir = Path("static")
if not static_dir.exists():
print("❌ Static directory not found!")
print("Please make sure the 'static' directory exists with the frontend files.")
sys.exit(1)
# Check if required frontend files exist
required_files = ["index.html", "styles.css", "script.js"]
missing_files = []
for file in required_files:
if not (static_dir / file).exists():
missing_files.append(file)
if missing_files:
print(f"❌ Missing frontend files: {', '.join(missing_files)}")
print("Please make sure all frontend files are present in the static directory.")
sys.exit(1)
print("🚀 Starting Web Scraper API with Frontend Interface")
print("=" * 50)
print("📁 Static files found and ready to serve")
print("🌐 Frontend will be available at: http://localhost:8000")
print("🔌 API endpoints available at: http://localhost:8000/docs")
print("=" * 50)
# Start the server
uvicorn.run(
"api_server:app",
host="0.0.0.0",
port=8000,
reload=True,
log_level="info"
)
if __name__ == "__main__":
main()

Binary file not shown.

After

Width:  |  Height:  |  Size: 5.8 KiB

View File

@@ -0,0 +1,5 @@
crawl4ai
fastapi
uvicorn
pydantic
litellm

View File

@@ -0,0 +1,201 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Web2API Example</title>
<link rel="stylesheet" href="/static/styles.css">
<link href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.0.0/css/all.min.css" rel="stylesheet">
</head>
<body>
<!-- Header -->
<header class="header">
<div class="header-content">
<div class="logo">
<img src="/assets/crawl4ai_logo.jpg" alt="Crawl4AI Logo" class="logo-image">
<span>Web2API Example</span>
</div>
<nav class="nav-links">
<a href="#" class="nav-link active" data-page="scrape">Scrape</a>
<a href="#" class="nav-link" data-page="models">Models</a>
<a href="#" class="nav-link" data-page="requests">API Requests</a>
</nav>
</div>
</header>
<!-- Main Content -->
<main class="main-content">
<!-- Scrape Page -->
<div id="scrape-page" class="page active">
<div class="hero-section">
<h1 class="hero-title">Turn Any Website Into An API</h1>
<p class="hero-subtitle">This example shows how to turn any website into an API using Crawl4AI.</p>
</div>
<!-- Workflow Demonstration -->
<div class="workflow-demo">
<div class="workflow-step">
<h3 class="step-title">1. Your Request</h3>
<div class="request-box">
<div class="input-group">
<label>URL:</label>
<input type="url" id="url" name="url" placeholder="https://example-bookstore.com/new-releases" required>
</div>
<div class="input-group">
<label>QUERY:</label>
<textarea id="query" name="query" placeholder="Extract all the book titles, their authors, and the biography of the author" required></textarea>
</div>
<div class="form-options">
<div class="option-group">
<label for="scraping-approach">Approach:</label>
<select id="scraping-approach" name="scraping_approach">
<option value="llm">LLM-based (More Flexible)</option>
<option value="schema">Schema-based (Uses LLM once!)</option>
</select>
</div>
<div class="option-group">
<label for="model-select">Model:</label>
<select id="model-select" name="model_name" required>
<option value="">Select a Model</option>
</select>
</div>
</div>
<button type="submit" id="extract-btn" class="extract-btn">
<i class="fas fa-magic"></i>
Extract Data
</button>
</div>
</div>
<div class="workflow-arrow"></div>
<div class="workflow-step">
<h3 class="step-title">2. Your Instant API & Data</h3>
<div class="response-container">
<div class="api-request-box">
<label>API Request (cURL):</label>
<pre id="curl-example">curl -X POST http://localhost:8000/scrape -H "Content-Type: application/json" -d '{"url": "...", "query": "..."}'
# Or for LLM-based approach:
curl -X POST http://localhost:8000/scrape-with-llm -H "Content-Type: application/json" -d '{"url": "...", "query": "..."}'</pre>
</div>
<div class="json-response-box">
<label>JSON Response:</label>
<pre id="json-output">{
"success": true,
"extracted_data": [
{
"title": "Example Book",
"author": "John Doe",
"description": "A great book..."
}
]
}</pre>
</div>
</div>
</div>
</div>
<!-- Results Section -->
<div id="results-section" class="results-section" style="display: none;">
<div class="results-header">
<h2>Extracted Data</h2>
<button id="copy-json" class="copy-btn">
<i class="fas fa-copy"></i>
Copy JSON
</button>
</div>
<div class="results-content">
<div class="result-info">
<div class="info-item">
<span class="label">URL:</span>
<span id="result-url" class="value"></span>
</div>
<div class="info-item">
<span class="label">Query:</span>
<span id="result-query" class="value"></span>
</div>
<div class="info-item">
<span class="label">Model Used:</span>
<span id="result-model" class="value"></span>
</div>
</div>
<div class="json-display">
<pre id="actual-json-output"></pre>
</div>
</div>
</div>
<!-- Loading State -->
<div id="loading" class="loading" style="display: none;">
<div class="spinner"></div>
<p>AI is analyzing the website and extracting data...</p>
</div>
</div>
<!-- Models Page -->
<div id="models-page" class="page">
<div class="models-header">
<h1>Model Configuration</h1>
<p>Configure and manage your AI model configurations</p>
</div>
<div class="models-container">
<!-- Add New Model Form -->
<div class="model-form-section">
<h3>Add New Model</h3>
<form id="model-form" class="model-form">
<div class="form-row">
<div class="input-group">
<label for="model-name">Model Name:</label>
<input type="text" id="model-name" name="model_name" placeholder="my-gemini" required>
</div>
<div class="input-group">
<label for="provider">Provider:</label>
<input type="text" id="provider" name="provider" placeholder="gemini/gemini-2.5-flash" required>
</div>
</div>
<div class="input-group">
<label for="api-token">API Token:</label>
<input type="password" id="api-token" name="api_token" placeholder="Enter your API token" required>
</div>
<button type="submit" class="save-btn">
<i class="fas fa-save"></i>
Save Model
</button>
</form>
</div>
<!-- Saved Models List -->
<div class="saved-models-section">
<h3>Saved Models</h3>
<div id="models-list" class="models-list">
<!-- Models will be loaded here -->
</div>
</div>
</div>
</div>
<!-- API Requests Page -->
<div id="requests-page" class="page">
<div class="requests-header">
<h1>Saved API Requests</h1>
<p>View and manage your previous API requests</p>
</div>
<div class="requests-container">
<div class="requests-list" id="requests-list">
<!-- Saved requests will be loaded here -->
</div>
</div>
</div>
</main>
<!-- Toast Notifications -->
<div id="toast-container" class="toast-container"></div>
<script src="/static/script.js"></script>
</body>
</html>

View File

@@ -0,0 +1,401 @@
// API Configuration
const API_BASE_URL = 'http://localhost:8000';
// DOM Elements
const navLinks = document.querySelectorAll('.nav-link');
const pages = document.querySelectorAll('.page');
const scrapeForm = document.getElementById('scrape-form');
const modelForm = document.getElementById('model-form');
const modelSelect = document.getElementById('model-select');
const modelsList = document.getElementById('models-list');
const resultsSection = document.getElementById('results-section');
const loadingSection = document.getElementById('loading');
const copyJsonBtn = document.getElementById('copy-json');
// Navigation
navLinks.forEach(link => {
link.addEventListener('click', (e) => {
e.preventDefault();
const targetPage = link.dataset.page;
// Update active nav link
navLinks.forEach(l => l.classList.remove('active'));
link.classList.add('active');
// Show target page
pages.forEach(page => page.classList.remove('active'));
document.getElementById(`${targetPage}-page`).classList.add('active');
// Load data for the page
if (targetPage === 'models') {
loadModels();
} else if (targetPage === 'requests') {
loadSavedRequests();
}
});
});
// Scrape Form Handler
document.getElementById('extract-btn').addEventListener('click', async (e) => {
e.preventDefault();
// Scroll to results section immediately when button is clicked
document.getElementById('results-section').scrollIntoView({
behavior: 'smooth',
block: 'start'
});
const url = document.getElementById('url').value;
const query = document.getElementById('query').value;
const headless = true; // Always use headless mode
const model_name = document.getElementById('model-select').value || null;
const scraping_approach = document.getElementById('scraping-approach').value;
if (!url || !query) {
showToast('Please fill in both URL and query fields', 'error');
return;
}
if (!model_name) {
showToast('Please select a model from the dropdown or add one from the Models page', 'error');
return;
}
const data = {
url: url,
query: query,
headless: headless,
model_name: model_name
};
// Show loading state
showLoading(true);
hideResults();
try {
// Choose endpoint based on scraping approach
const endpoint = scraping_approach === 'llm' ? '/scrape-with-llm' : '/scrape';
const response = await fetch(`${API_BASE_URL}${endpoint}`, {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify(data)
});
const result = await response.json();
if (response.ok) {
displayResults(result);
showToast(`Data extracted successfully using ${scraping_approach === 'llm' ? 'LLM-based' : 'Schema-based'} approach!`, 'success');
} else {
throw new Error(result.detail || 'Failed to extract data');
}
} catch (error) {
console.error('Scraping error:', error);
showToast(`Error: ${error.message}`, 'error');
} finally {
showLoading(false);
}
});
// Model Form Handler
modelForm.addEventListener('submit', async (e) => {
e.preventDefault();
const formData = new FormData(modelForm);
const data = {
model_name: formData.get('model_name'),
provider: formData.get('provider'),
api_token: formData.get('api_token')
};
try {
const response = await fetch(`${API_BASE_URL}/models`, {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify(data)
});
const result = await response.json();
if (response.ok) {
showToast('Model saved successfully!', 'success');
modelForm.reset();
loadModels();
loadModelSelect();
} else {
throw new Error(result.detail || 'Failed to save model');
}
} catch (error) {
console.error('Model save error:', error);
showToast(`Error: ${error.message}`, 'error');
}
});
// Copy JSON Button
copyJsonBtn.addEventListener('click', () => {
const actualJsonOutput = document.getElementById('actual-json-output');
const textToCopy = actualJsonOutput.textContent;
navigator.clipboard.writeText(textToCopy).then(() => {
showToast('JSON copied to clipboard!', 'success');
}).catch(() => {
showToast('Failed to copy JSON', 'error');
});
});
// Load Models
async function loadModels() {
try {
const response = await fetch(`${API_BASE_URL}/models`);
const result = await response.json();
if (response.ok) {
displayModels(result.models);
} else {
throw new Error(result.detail || 'Failed to load models');
}
} catch (error) {
console.error('Load models error:', error);
showToast(`Error: ${error.message}`, 'error');
}
}
// Display Models
function displayModels(models) {
if (models.length === 0) {
modelsList.innerHTML = '<p style="text-align: center; color: #7f8c8d; padding: 2rem;">No models saved yet. Add your first model above!</p>';
return;
}
modelsList.innerHTML = models.map(model => `
<div class="model-card">
<div class="model-info">
<div class="model-name">${model}</div>
<div class="model-provider">Model Configuration</div>
</div>
<div class="model-actions">
<button class="btn btn-danger" onclick="deleteModel('${model}')">
<i class="fas fa-trash"></i>
Delete
</button>
</div>
</div>
`).join('');
}
// Delete Model
async function deleteModel(modelName) {
if (!confirm(`Are you sure you want to delete the model "${modelName}"?`)) {
return;
}
try {
const response = await fetch(`${API_BASE_URL}/models/${modelName}`, {
method: 'DELETE'
});
const result = await response.json();
if (response.ok) {
showToast('Model deleted successfully!', 'success');
loadModels();
loadModelSelect();
} else {
throw new Error(result.detail || 'Failed to delete model');
}
} catch (error) {
console.error('Delete model error:', error);
showToast(`Error: ${error.message}`, 'error');
}
}
// Load Model Select Options
async function loadModelSelect() {
try {
const response = await fetch(`${API_BASE_URL}/models`);
const result = await response.json();
if (response.ok) {
// Clear existing options
modelSelect.innerHTML = '<option value="">Select a Model</option>';
// Add model options
result.models.forEach(model => {
const option = document.createElement('option');
option.value = model;
option.textContent = model;
modelSelect.appendChild(option);
});
}
} catch (error) {
console.error('Load model select error:', error);
}
}
// Display Results
function displayResults(result) {
// Update result info
document.getElementById('result-url').textContent = result.url;
document.getElementById('result-query').textContent = result.query;
document.getElementById('result-model').textContent = result.model_name || 'Default Model';
// Display JSON in the actual results section
const actualJsonOutput = document.getElementById('actual-json-output');
actualJsonOutput.textContent = JSON.stringify(result.extracted_data, null, 2);
// Don't update the sample JSON in the workflow demo - keep it as example
// Update the cURL example based on the approach used
const scraping_approach = document.getElementById('scraping-approach').value;
const endpoint = scraping_approach === 'llm' ? '/scrape-with-llm' : '/scrape';
const curlExample = document.getElementById('curl-example');
curlExample.textContent = `curl -X POST http://localhost:8000${endpoint} -H "Content-Type: application/json" -d '{"url": "${result.url}", "query": "${result.query}"}'`;
// Show results section
resultsSection.style.display = 'block';
resultsSection.scrollIntoView({ behavior: 'smooth' });
}
// Show/Hide Loading
function showLoading(show) {
loadingSection.style.display = show ? 'block' : 'none';
}
// Hide Results
function hideResults() {
resultsSection.style.display = 'none';
}
// Toast Notifications
function showToast(message, type = 'info') {
const toastContainer = document.getElementById('toast-container');
const toast = document.createElement('div');
toast.className = `toast ${type}`;
const icon = type === 'success' ? 'fas fa-check-circle' :
type === 'error' ? 'fas fa-exclamation-circle' :
'fas fa-info-circle';
toast.innerHTML = `
<i class="${icon}"></i>
<span>${message}</span>
`;
toastContainer.appendChild(toast);
// Auto remove after 5 seconds
setTimeout(() => {
toast.remove();
}, 5000);
}
// Load Saved Requests
async function loadSavedRequests() {
try {
const response = await fetch(`${API_BASE_URL}/saved-requests`);
const result = await response.json();
if (response.ok) {
displaySavedRequests(result.requests);
} else {
throw new Error(result.detail || 'Failed to load saved requests');
}
} catch (error) {
console.error('Load saved requests error:', error);
showToast(`Error: ${error.message}`, 'error');
}
}
// Display Saved Requests
function displaySavedRequests(requests) {
const requestsList = document.getElementById('requests-list');
if (requests.length === 0) {
requestsList.innerHTML = '<p style="text-align: center; color: #CCCCCC; padding: 2rem;">No saved API requests yet. Make your first request from the Scrape page!</p>';
return;
}
requestsList.innerHTML = requests.map(request => {
const url = request.body.url;
const query = request.body.query;
const model = request.body.model_name || 'Default Model';
const endpoint = request.endpoint;
// Create curl command
const curlCommand = `curl -X POST http://localhost:8000${endpoint} \\
-H "Content-Type: application/json" \\
-d '{
"url": "${url}",
"query": "${query}",
"model_name": "${model}"
}'`;
return `
<div class="request-card">
<div class="request-header">
<div class="request-info">
<div class="request-url">${url}</div>
<div class="request-query">${query}</div>
</div>
<div class="request-actions">
<button class="btn-danger" onclick="deleteSavedRequest('${request.id}')">
<i class="fas fa-trash"></i>
Delete
</button>
</div>
</div>
<div class="request-curl">
<h4>cURL Command:</h4>
<pre>${curlCommand}</pre>
</div>
</div>
`;
}).join('');
}
// Delete Saved Request
async function deleteSavedRequest(requestId) {
if (!confirm('Are you sure you want to delete this saved request?')) {
return;
}
try {
const response = await fetch(`${API_BASE_URL}/saved-requests/${requestId}`, {
method: 'DELETE'
});
const result = await response.json();
if (response.ok) {
showToast('Saved request deleted successfully!', 'success');
loadSavedRequests();
} else {
throw new Error(result.detail || 'Failed to delete saved request');
}
} catch (error) {
console.error('Delete saved request error:', error);
showToast(`Error: ${error.message}`, 'error');
}
}
// Initialize
document.addEventListener('DOMContentLoaded', () => {
loadModelSelect();
// Check if API is available
fetch(`${API_BASE_URL}/health`)
.then(response => {
if (!response.ok) {
showToast('Warning: API server might not be running', 'error');
}
})
.catch(() => {
showToast('Warning: Cannot connect to API server. Make sure it\'s running on localhost:8000', 'error');
});
});

View File

@@ -0,0 +1,765 @@
/* Reset and Base Styles */
* {
margin: 0;
padding: 0;
box-sizing: border-box;
}
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
background: #000000;
color: #FFFFFF;
line-height: 1.6;
font-size: 16px;
}
/* Header */
.header {
border-bottom: 1px solid #333;
padding: 1rem 0;
background: #000000;
position: sticky;
top: 0;
z-index: 100;
}
.header-content {
max-width: 1200px;
margin: 0 auto;
padding: 0 2rem;
display: flex;
justify-content: space-between;
align-items: center;
}
.logo {
display: flex;
align-items: center;
gap: 0.5rem;
font-size: 1.5rem;
font-weight: 600;
color: #FFFFFF;
}
.logo-image {
width: 40px;
height: 40px;
border-radius: 4px;
object-fit: contain;
}
.nav-links {
display: flex;
gap: 2rem;
}
.nav-link {
color: #CCCCCC;
text-decoration: none;
font-weight: 500;
transition: color 0.2s ease;
}
.nav-link:hover,
.nav-link.active {
color: #FFFFFF;
}
/* Main Content */
.main-content {
max-width: 1200px;
margin: 0 auto;
padding: 2rem;
}
.page {
display: none;
}
.page.active {
display: block;
}
/* Hero Section */
.hero-section {
text-align: center;
margin-bottom: 4rem;
padding: 2rem 0;
}
.hero-title {
font-size: 3rem;
font-weight: 700;
color: #FFFFFF;
margin-bottom: 1rem;
line-height: 1.2;
}
.hero-subtitle {
font-size: 1.25rem;
color: #CCCCCC;
max-width: 600px;
margin: 0 auto;
}
/* Workflow Demo */
.workflow-demo {
display: grid;
grid-template-columns: 1fr auto 1fr;
gap: 2rem;
align-items: start;
margin-bottom: 4rem;
}
.workflow-step {
display: flex;
flex-direction: column;
gap: 1rem;
}
.step-title {
font-size: 1.25rem;
font-weight: 600;
color: #FFFFFF;
text-align: center;
margin-bottom: 1rem;
}
.workflow-arrow {
font-size: 2rem;
font-weight: 700;
color: #09b5a5;
display: flex;
align-items: center;
justify-content: center;
margin-top: 20rem;
}
/* Request Box */
.request-box {
border: 2px solid #333;
border-radius: 8px;
padding: 2rem;
background: #111111;
}
.input-group {
margin-bottom: 1.5rem;
}
.input-group label {
display: block;
font-family: 'Courier New', monospace;
font-weight: 600;
color: #FFFFFF;
margin-bottom: 0.5rem;
font-size: 0.9rem;
}
.input-group input,
.input-group textarea,
.input-group select {
width: 100%;
padding: 0.75rem;
border: 1px solid #333;
border-radius: 4px;
font-family: 'Courier New', monospace;
font-size: 0.9rem;
background: #1A1A1A;
color: #FFFFFF;
transition: border-color 0.2s ease;
}
.input-group input:focus,
.input-group textarea:focus,
.input-group select:focus {
outline: none;
border-color: #09b5a5;
}
.input-group textarea {
min-height: 80px;
resize: vertical;
}
.form-options {
display: grid;
grid-template-columns: 1fr 1fr;
gap: 1rem;
margin-bottom: 1.5rem;
}
.option-group {
display: flex;
flex-direction: column;
gap: 0.5rem;
}
.option-group label {
font-family: 'Courier New', monospace;
font-weight: 600;
color: #FFFFFF;
font-size: 0.9rem;
}
.option-group input[type="checkbox"] {
width: auto;
margin-right: 0.5rem;
}
.extract-btn {
width: 100%;
padding: 1rem;
background: #09b5a5;
color: #000000;
border: none;
border-radius: 4px;
font-size: 1rem;
font-weight: 600;
cursor: pointer;
transition: background-color 0.2s ease;
display: flex;
align-items: center;
justify-content: center;
gap: 0.5rem;
}
.extract-btn:hover {
background: #09b5a5;
}
/* Dropdown specific styling */
select,
.input-group select,
.option-group select {
cursor: pointer !important;
appearance: none !important;
-webkit-appearance: none !important;
-moz-appearance: none !important;
-ms-appearance: none !important;
background-image: url("data:image/svg+xml;charset=UTF-8,%3csvg xmlns='http://www.w3.org/2000/svg' viewBox='0 0 24 24' fill='none' stroke='%23FFFFFF' stroke-width='2' stroke-linecap='round' stroke-linejoin='round'%3e%3cpolyline points='6,9 12,15 18,9'%3e%3c/polyline%3e%3c/svg%3e") !important;
background-repeat: no-repeat !important;
background-position: right 0.75rem center !important;
background-size: 1rem !important;
padding-right: 2.5rem !important;
border: 1px solid #333 !important;
border-radius: 4px !important;
font-family: 'Courier New', monospace !important;
font-size: 0.9rem !important;
background-color: #1A1A1A !important;
color: #FFFFFF !important;
}
select:hover,
.input-group select:hover,
.option-group select:hover {
border-color: #09b5a5 !important;
}
select:focus,
.input-group select:focus,
.option-group select:focus {
outline: none !important;
border-color: #09b5a5 !important;
}
select option,
.input-group select option,
.option-group select option {
background: #1A1A1A !important;
color: #FFFFFF !important;
padding: 0.5rem !important;
}
/* Response Container */
.response-container {
display: flex;
flex-direction: column;
gap: 1rem;
}
.api-request-box,
.json-response-box {
border: 2px solid #333;
border-radius: 8px;
padding: 1.5rem;
background: #111111;
}
.api-request-box label,
.json-response-box label {
display: block;
font-family: 'Courier New', monospace;
font-weight: 600;
color: #FFFFFF;
margin-bottom: 0.5rem;
font-size: 0.9rem;
}
.api-request-box pre,
.json-response-box pre {
font-family: 'Courier New', monospace;
font-size: 0.85rem;
line-height: 1.5;
color: #FFFFFF;
background: #1A1A1A;
padding: 1rem;
border-radius: 4px;
overflow-x: auto;
white-space: pre-wrap;
word-break: break-all;
}
/* Results Section */
.results-section {
border: 2px solid #333;
border-radius: 8px;
overflow: hidden;
margin-top: 2rem;
background: #111111;
}
.results-header {
background: #1A1A1A;
color: #FFFFFF;
padding: 1rem 1.5rem;
display: flex;
justify-content: space-between;
align-items: center;
border-bottom: 1px solid #333;
}
.results-header h2 {
font-size: 1.25rem;
font-weight: 600;
color: #FFFFFF;
}
.copy-btn {
background: #09b5a5;
color: #000000;
border: none;
padding: 0.5rem 1rem;
border-radius: 4px;
font-size: 0.9rem;
font-weight: 600;
cursor: pointer;
display: flex;
align-items: center;
gap: 0.5rem;
transition: background-color 0.2s ease;
}
.copy-btn:hover {
background: #09b5a5;
}
.results-content {
padding: 1.5rem;
}
.result-info {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(250px, 1fr));
gap: 1rem;
margin-bottom: 1.5rem;
padding: 1rem;
background: #1A1A1A;
border-radius: 4px;
border: 1px solid #333;
}
.info-item {
display: flex;
flex-direction: column;
gap: 0.25rem;
}
.info-item .label {
font-weight: 600;
color: #FFFFFF;
font-size: 0.9rem;
}
.info-item .value {
color: #CCCCCC;
word-break: break-all;
}
.json-display {
background: #1A1A1A;
border-radius: 4px;
overflow: hidden;
border: 1px solid #333;
}
.json-display pre {
color: #FFFFFF;
padding: 1.5rem;
margin: 0;
overflow-x: auto;
font-family: 'Courier New', monospace;
font-size: 0.9rem;
line-height: 1.5;
}
/* Loading State */
.loading {
text-align: center;
padding: 3rem;
}
.spinner {
width: 40px;
height: 40px;
border: 3px solid #333;
border-top: 3px solid #09b5a5;
border-radius: 50%;
animation: spin 1s linear infinite;
margin: 0 auto 1rem;
}
@keyframes spin {
0% { transform: rotate(0deg); }
100% { transform: rotate(360deg); }
}
/* Models Page */
.models-header {
text-align: center;
margin-bottom: 3rem;
}
.models-header h1 {
font-size: 2.5rem;
font-weight: 700;
color: #FFFFFF;
margin-bottom: 1rem;
}
.models-header p {
font-size: 1.1rem;
color: #CCCCCC;
}
/* API Requests Page */
.requests-header {
text-align: center;
margin-bottom: 3rem;
}
.requests-header h1 {
font-size: 2.5rem;
font-weight: 700;
color: #FFFFFF;
margin-bottom: 1rem;
}
.requests-header p {
font-size: 1.1rem;
color: #CCCCCC;
}
.requests-container {
max-width: 1200px;
margin: 0 auto;
}
.requests-list {
display: grid;
gap: 1.5rem;
}
.request-card {
border: 2px solid #333;
border-radius: 8px;
padding: 1.5rem;
background: #111111;
transition: border-color 0.2s ease;
}
.request-card:hover {
border-color: #09b5a5;
}
.request-header {
display: flex;
justify-content: space-between;
align-items: center;
margin-bottom: 1rem;
padding-bottom: 1rem;
border-bottom: 1px solid #333;
}
.request-info {
display: flex;
flex-direction: column;
gap: 0.5rem;
}
.request-url {
font-family: 'Courier New', monospace;
font-weight: 600;
color: #09b5a5;
font-size: 1.1rem;
word-break: break-all;
}
.request-query {
color: #CCCCCC;
font-size: 0.9rem;
margin-top: 0.5rem;
word-break: break-all;
}
.request-actions {
display: flex;
gap: 0.5rem;
}
.request-curl {
background: #1A1A1A;
border: 1px solid #333;
border-radius: 4px;
padding: 1rem;
margin-top: 1rem;
}
.request-curl h4 {
color: #FFFFFF;
font-size: 0.9rem;
font-weight: 600;
margin-bottom: 0.5rem;
font-family: 'Courier New', monospace;
}
.request-curl pre {
color: #CCCCCC;
font-size: 0.8rem;
line-height: 1.4;
overflow-x: auto;
white-space: pre-wrap;
word-break: break-all;
background: #111111;
padding: 0.75rem;
border-radius: 4px;
border: 1px solid #333;
}
.models-container {
max-width: 800px;
margin: 0 auto;
}
.model-form-section {
border: 2px solid #333;
border-radius: 8px;
padding: 2rem;
margin-bottom: 2rem;
background: #111111;
}
.model-form-section h3 {
font-size: 1.25rem;
font-weight: 600;
color: #FFFFFF;
margin-bottom: 1.5rem;
}
.model-form {
display: flex;
flex-direction: column;
gap: 1.5rem;
}
.form-row {
display: grid;
grid-template-columns: 1fr 1fr;
gap: 1rem;
}
.save-btn {
padding: 1rem;
background: #09b5a5;
color: #000000;
border: none;
border-radius: 4px;
font-size: 1rem;
font-weight: 600;
cursor: pointer;
transition: background-color 0.2s ease;
display: flex;
align-items: center;
justify-content: center;
gap: 0.5rem;
}
.save-btn:hover {
background: #09b5a5;
}
.saved-models-section h3 {
font-size: 1.25rem;
font-weight: 600;
color: #FFFFFF;
margin-bottom: 1.5rem;
}
.models-list {
display: grid;
gap: 1rem;
}
.model-card {
border: 2px solid #333;
border-radius: 8px;
padding: 1.5rem;
display: flex;
justify-content: space-between;
align-items: center;
transition: border-color 0.2s ease;
background: #111111;
}
.model-card:hover {
border-color: #09b5a5;
}
.model-info {
flex: 1;
}
.model-name {
font-weight: 600;
color: #FFFFFF;
font-size: 1.1rem;
margin-bottom: 0.5rem;
}
.model-provider {
color: #CCCCCC;
font-size: 0.9rem;
}
.model-actions {
display: flex;
gap: 0.5rem;
}
.btn-danger {
background: #FF4444;
color: #FFFFFF;
border: none;
padding: 0.5rem 1rem;
border-radius: 4px;
font-size: 0.9rem;
font-weight: 600;
cursor: pointer;
transition: background-color 0.2s ease;
display: flex;
align-items: center;
gap: 0.5rem;
}
.btn-danger:hover {
background: #CC3333;
}
/* Toast Notifications */
.toast-container {
position: fixed;
top: 20px;
right: 20px;
z-index: 1000;
}
.toast {
background: #111111;
border: 2px solid #333;
border-radius: 4px;
padding: 1rem 1.5rem;
margin-bottom: 0.5rem;
display: flex;
align-items: center;
gap: 0.5rem;
animation: slideIn 0.3s ease;
max-width: 400px;
box-shadow: 0 4px 12px rgba(0, 0, 0, 0.3);
color: #FFFFFF;
}
.toast.success {
border-color: #09b5a5;
background: #0A1A1A;
}
.toast.error {
border-color: #FF4444;
background: #1A0A0A;
}
.toast.info {
border-color: #09b5a5;
background: #0A1A1A;
}
@keyframes slideIn {
from {
transform: translateX(100%);
opacity: 0;
}
to {
transform: translateX(0);
opacity: 1;
}
}
/* Responsive Design */
@media (max-width: 768px) {
.header-content {
padding: 0 1rem;
}
.main-content {
padding: 1rem;
}
.hero-title {
font-size: 2rem;
}
.workflow-demo {
grid-template-columns: 1fr;
gap: 1rem;
}
.workflow-arrow {
transform: rotate(90deg);
margin: 1rem 0;
}
.form-options {
grid-template-columns: 1fr;
}
.form-row {
grid-template-columns: 1fr;
}
.result-info {
grid-template-columns: 1fr;
}
.model-card {
flex-direction: column;
gap: 1rem;
text-align: center;
}
.model-actions {
width: 100%;
justify-content: center;
}
}

View File

@@ -0,0 +1,28 @@
import asyncio
from web_scraper_lib import scrape_website
import os
async def test_library():
"""Test the mini library directly."""
print("=== Testing Mini Library ===")
# Test 1: Scrape with a custom model
url = "https://marketplace.mainstreet.co.in/collections/adidas-yeezy/products/adidas-yeezy-boost-350-v2-yecheil-non-reflective"
query = "Extract the following data: Product name, Product price, Product description, Product size. DO NOT EXTRACT ANYTHING ELSE."
if os.path.exists("models"):
model_name = os.listdir("models")[0].split(".")[0]
else:
raise Exception("No models found in models directory")
print(f"Scraping: {url}")
print(f"Query: {query}")
try:
result = await scrape_website(url, query, model_name)
print("✅ Library test successful!")
print(f"Extracted data: {result['extracted_data']}")
except Exception as e:
print(f"❌ Library test failed: {e}")
if __name__ == "__main__":
asyncio.run(test_library())

View File

@@ -0,0 +1,67 @@
#!/usr/bin/env python3
"""
Test script for the new model management functionality.
This script demonstrates how to save and use custom model configurations.
"""
import asyncio
import requests
import json
# API base URL
BASE_URL = "http://localhost:8000"
def test_model_management():
"""Test the model management endpoints."""
print("=== Testing Model Management ===")
# 1. List current models
print("\n1. Listing current models:")
response = requests.get(f"{BASE_URL}/models")
print(f"Status: {response.status_code}")
print(f"Response: {json.dumps(response.json(), indent=2)}")
# 2. Save another model configuration (OpenAI example)
print("\n2. Saving OpenAI model configuration:")
openai_config = {
"model_name": "my-openai",
"provider": "openai",
"api_token": "your-openai-api-key-here"
}
response = requests.post(f"{BASE_URL}/models", json=openai_config)
print(f"Status: {response.status_code}")
print(f"Response: {json.dumps(response.json(), indent=2)}")
# 3. List models again to see the new ones
print("\n3. Listing models after adding new ones:")
response = requests.get(f"{BASE_URL}/models")
print(f"Status: {response.status_code}")
print(f"Response: {json.dumps(response.json(), indent=2)}")
# 4. Delete a model configuration
print("\n4. Deleting a model configuration:")
response = requests.delete(f"{BASE_URL}/models/my-openai")
print(f"Status: {response.status_code}")
print(f"Response: {json.dumps(response.json(), indent=2)}")
# 5. Final list of models
print("\n5. Final list of models:")
response = requests.get(f"{BASE_URL}/models")
print(f"Status: {response.status_code}")
print(f"Response: {json.dumps(response.json(), indent=2)}")
if __name__ == "__main__":
print("Model Management Test Script")
print("Make sure the API server is running on http://localhost:8000")
print("=" * 50)
try:
test_model_management()
except requests.exceptions.ConnectionError:
print("Error: Could not connect to the API server.")
print("Make sure the server is running with: python api_server.py")
except Exception as e:
print(f"Error: {e}")

View File

@@ -0,0 +1,397 @@
from crawl4ai import (
AsyncWebCrawler,
BrowserConfig,
CacheMode,
CrawlerRunConfig,
LLMConfig,
JsonCssExtractionStrategy,
LLMExtractionStrategy
)
import os
import json
import hashlib
from typing import Dict, Any, Optional, List
from litellm import completion
class ModelConfig:
"""Configuration for LLM models."""
def __init__(self, provider: str, api_token: str):
self.provider = provider
self.api_token = api_token
def to_dict(self) -> Dict[str, Any]:
return {
"provider": self.provider,
"api_token": self.api_token
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'ModelConfig':
return cls(
provider=data["provider"],
api_token=data["api_token"]
)
class WebScraperAgent:
"""
A mini library that converts any website into a structured data API.
Features:
1. Provide a URL and tell AI what data you need in plain English
2. Generate: Agent reverse-engineers the site and deploys custom scraper
3. Integrate: Use private API endpoint to get structured data
4. Support for custom LLM models and API keys
"""
def __init__(self, schemas_dir: str = "schemas", models_dir: str = "models"):
self.schemas_dir = schemas_dir
self.models_dir = models_dir
os.makedirs(self.schemas_dir, exist_ok=True)
os.makedirs(self.models_dir, exist_ok=True)
def _generate_schema_key(self, url: str, query: str) -> str:
"""Generate a unique key for schema caching based on URL and query."""
content = f"{url}:{query}"
return hashlib.md5(content.encode()).hexdigest()
def save_model_config(self, model_name: str, provider: str, api_token: str) -> bool:
"""
Save a model configuration for later use.
Args:
model_name: User-friendly name for the model
provider: LLM provider (e.g., 'gemini', 'openai', 'anthropic')
api_token: API token for the provider
Returns:
True if saved successfully
"""
try:
model_config = ModelConfig(provider, api_token)
config_path = os.path.join(self.models_dir, f"{model_name}.json")
with open(config_path, "w") as f:
json.dump(model_config.to_dict(), f, indent=2)
print(f"Model configuration saved: {model_name}")
return True
except Exception as e:
print(f"Failed to save model configuration: {e}")
return False
def load_model_config(self, model_name: str) -> Optional[ModelConfig]:
"""
Load a saved model configuration.
Args:
model_name: Name of the saved model configuration
Returns:
ModelConfig object or None if not found
"""
try:
config_path = os.path.join(self.models_dir, f"{model_name}.json")
if not os.path.exists(config_path):
return None
with open(config_path, "r") as f:
data = json.load(f)
return ModelConfig.from_dict(data)
except Exception as e:
print(f"Failed to load model configuration: {e}")
return None
def list_saved_models(self) -> List[str]:
"""List all saved model configurations."""
models = []
for filename in os.listdir(self.models_dir):
if filename.endswith('.json'):
models.append(filename[:-5]) # Remove .json extension
return models
def delete_model_config(self, model_name: str) -> bool:
"""
Delete a saved model configuration.
Args:
model_name: Name of the model configuration to delete
Returns:
True if deleted successfully
"""
try:
config_path = os.path.join(self.models_dir, f"{model_name}.json")
if os.path.exists(config_path):
os.remove(config_path)
print(f"Model configuration deleted: {model_name}")
return True
return False
except Exception as e:
print(f"Failed to delete model configuration: {e}")
return False
async def _load_or_generate_schema(self, url: str, query: str, session_id: str = "schema_generator", model_name: Optional[str] = None) -> Dict[str, Any]:
"""
Loads schema from cache if exists, otherwise generates using AI.
This is the "Generate" step - our agent reverse-engineers the site.
Args:
url: URL to scrape
query: Query for data extraction
session_id: Session identifier
model_name: Name of saved model configuration to use
"""
schema_key = self._generate_schema_key(url, query)
schema_path = os.path.join(self.schemas_dir, f"{schema_key}.json")
if os.path.exists(schema_path):
print(f"Schema found in cache for {url}")
with open(schema_path, "r") as f:
return json.load(f)
print(f"Generating new schema for {url}")
print(f"Query: {query}")
query += """
IMPORTANT:
GENERATE THE SCHEMA WITH ONLY THE FIELDS MENTIONED IN THE QUERY. MAKE SURE THE NUMBER OF FIELDS IN THE SCHEME MATCH THE NUMBER OF FIELDS IN THE QUERY.
"""
# Step 1: Fetch the page HTML
async with AsyncWebCrawler(config=BrowserConfig(headless=True)) as crawler:
result = await crawler.arun(
url=url,
config=CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
session_id=session_id,
simulate_user=True,
remove_overlay_elements=True,
delay_before_return_html=5,
)
)
html = result.fit_html
# Step 2: Generate schema using AI with custom model if specified
print("AI is analyzing the page structure...")
# Use custom model configuration if provided
if model_name:
model_config = self.load_model_config(model_name)
if model_config:
llm_config = LLMConfig(
provider=model_config.provider,
api_token=model_config.api_token
)
print(f"Using custom model: {model_name}")
else:
raise ValueError(f"Model configuration '{model_name}' not found. Please add it from the Models page.")
else:
# Require a model to be specified
raise ValueError("No model specified. Please select a model from the dropdown or add one from the Models page.")
schema = JsonCssExtractionStrategy.generate_schema(
html=html,
llm_config=llm_config,
query=query
)
# Step 3: Cache the generated schema
print(f"Schema generated and cached: {json.dumps(schema, indent=2)}")
with open(schema_path, "w") as f:
json.dump(schema, f, indent=2)
return schema
def _generate_llm_schema(self, query: str, llm_config: LLMConfig) -> Dict[str, Any]:
"""
Generate a schema for a given query using a custom LLM model.
Args:
query: Plain English description of what data to extract
model_config: Model configuration to use
"""
# ask the model to generate a schema for the given query in the form of a json.
prompt = f"""
IDENTIFY THE FIELDS FOR EXTRACTION MENTIONED IN THE QUERY and GENERATE A JSON SCHEMA FOR THE FIELDS.
eg.
{{
"name": "str",
"age": "str",
"email": "str",
"product_name": "str",
"product_price": "str",
"product_description": "str",
"product_image": "str",
"product_url": "str",
"product_rating": "str",
"product_reviews": "str",
}}
Here is the query:
{query}
IMPORTANT:
THE RESULT SHOULD BE A JSON OBJECT.
MAKE SURE THE NUMBER OF FIELDS IN THE RESULT MATCH THE NUMBER OF FIELDS IN THE QUERY.
THE RESULT SHOULD BE A JSON OBJECT.
"""
response = completion(
model=llm_config.provider,
messages=[{"role": "user", "content": prompt}],
api_key=llm_config.api_token,
result_type="json"
)
return response.json()["choices"][0]["message"]["content"]
async def scrape_data_with_llm(self, url: str, query: str, model_name: Optional[str] = None) -> Dict[str, Any]:
"""
Scrape structured data from any website using a custom LLM model.
Args:
url: The website URL to scrape
query: Plain English description of what data to extract
model_name: Name of saved model configuration to use
"""
if model_name:
model_config = self.load_model_config(model_name)
if model_config:
llm_config = LLMConfig(
provider=model_config.provider,
api_token=model_config.api_token
)
print(f"Using custom model: {model_name}")
else:
raise ValueError(f"Model configuration '{model_name}' not found. Please add it from the Models page.")
else:
# Require a model to be specified
raise ValueError("No model specified. Please select a model from the dropdown or add one from the Models page.")
query += """\n
IMPORTANT:
THE RESULT SHOULD BE A JSON OBJECT WITH THE ONLY THE FIELDS MENTIONED IN THE QUERY.
MAKE SURE THE NUMBER OF FIELDS IN THE RESULT MATCH THE NUMBER OF FIELDS IN THE QUERY.
THE RESULT SHOULD BE A JSON OBJECT.
"""
schema = self._generate_llm_schema(query, llm_config)
print(f"Schema: {schema}")
llm_extraction_strategy = LLMExtractionStrategy(
llm_config=llm_config,
instruction=query,
result_type="json",
schema=schema
)
async with AsyncWebCrawler() as crawler:
result = await crawler.arun(
url=url,
config=CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
simulate_user=True,
extraction_strategy=llm_extraction_strategy,
)
)
extracted_data = result.extracted_content
if isinstance(extracted_data, str):
try:
extracted_data = json.loads(extracted_data)
except json.JSONDecodeError:
# If it's not valid JSON, keep it as string
pass
return {
"url": url,
"query": query,
"extracted_data": extracted_data,
"timestamp": result.timestamp if hasattr(result, 'timestamp') else None
}
async def scrape_data(self, url: str, query: str, model_name: Optional[str] = None) -> Dict[str, Any]:
"""
Main method to scrape structured data from any website.
Args:
url: The website URL to scrape
query: Plain English description of what data to extract
model_name: Name of saved model configuration to use
Returns:
Structured data extracted from the website
"""
# Step 1: Generate or load schema (reverse-engineer the site)
schema = await self._load_or_generate_schema(url=url, query=query, model_name=model_name)
# Step 2: Deploy custom high-speed scraper
print(f"Deploying custom scraper for {url}")
browser_config = BrowserConfig(headless=True)
async with AsyncWebCrawler(config=browser_config) as crawler:
run_config = CrawlerRunConfig(
extraction_strategy=JsonCssExtractionStrategy(schema=schema),
)
result = await crawler.arun(url=url, config=run_config)
# Step 3: Return structured data
# Parse extracted_content if it's a JSON string
extracted_data = result.extracted_content
if isinstance(extracted_data, str):
try:
extracted_data = json.loads(extracted_data)
except json.JSONDecodeError:
# If it's not valid JSON, keep it as string
pass
return {
"url": url,
"query": query,
"extracted_data": extracted_data,
"schema_used": schema,
"timestamp": result.timestamp if hasattr(result, 'timestamp') else None
}
async def get_cached_schemas(self) -> Dict[str, str]:
"""Get list of cached schemas."""
schemas = {}
for filename in os.listdir(self.schemas_dir):
if filename.endswith('.json'):
schema_key = filename[:-5] # Remove .json extension
schemas[schema_key] = filename
return schemas
def clear_cache(self):
"""Clear all cached schemas."""
import shutil
if os.path.exists(self.schemas_dir):
shutil.rmtree(self.schemas_dir)
os.makedirs(self.schemas_dir, exist_ok=True)
print("Schema cache cleared")
# Convenience function for simple usage
async def scrape_website(url: str, query: str, model_name: Optional[str] = None) -> Dict[str, Any]:
"""
Simple function to scrape any website with plain English instructions.
Args:
url: Website URL
query: Plain English description of what data to extract
model_name: Name of saved model configuration to use
Returns:
Extracted structured data
"""
agent = WebScraperAgent()
return await agent.scrape_data(url, query, model_name)
async def scrape_website_with_llm(url: str, query: str, model_name: Optional[str] = None):
"""
Scrape structured data from any website using a custom LLM model.
Args:
url: The website URL to scrape
query: Plain English description of what data to extract
model_name: Name of saved model configuration to use
"""
agent = WebScraperAgent()
return await agent.scrape_data_with_llm(url, query, model_name)