Compare commits

...

21 Commits

Author SHA1 Message Date
AHMET YILMAZ
8e1362acf5 Fix async generator type mismatch in Docker Client streaming
- Fixed single_result_generator to properly handle async generators from deep crawl strategies
- Added proper __aiter__ checking to distinguish between CrawlResult and async generators
- Await and yield individual results from nested async generators
- Streaming functionality now works correctly for all patterns (SDK, Direct API, Docker Client)
- All 22 comprehensive tests passing with 100% success rate
- Live streaming test confirmed working end-to-end
2025-08-15 15:49:11 +08:00
AHMET YILMAZ
07e9d651fb feat: Comprehensive deep crawl streaming functionality restoration
🚀 Major Achievements:
-  ORJSON Serialization System: Complete implementation with custom handlers
-  Global Deprecated Properties System: DeprecatedPropertiesMixin for automatic exclusion
-  Deep Crawl Streaming: Fully restored with proper CrawlResultContainer handling
-  Docker Client Streaming: Fixed async generator patterns and result type checking
-  Server API Improvements: Correct method selection logic and streaming responses
-  Type Safety: Dict-as-logger detection to prevent crashes

📊 Test Results: 100% success rate on comprehensive test suite (10/10 tests passing)

🔧 Files Modified:
- crawl4ai/models.py: ORJSON + DeprecatedPropertiesMixin implementation
- deploy/docker/api.py: Streaming endpoint fixes + CrawlResultContainer handling
- deploy/docker/server.py: Production imports + ORJSON response handling
- crawl4ai/docker_client.py: Async generator streaming fixes
- crawl4ai/deep_crawling/bfs_strategy.py: Logger type safety
- .gitignore: Development environment cleanup
- tests/test_comprehensive_fixes.py: Rich-based comprehensive test suite

🎯 Impact: Production-ready deep crawl streaming functionality with comprehensive testing coverage
2025-08-15 15:31:36 +08:00
Nasrin
11b310edef Merge pull request #1378 from unclecode/fix/exit_with_q
Cross Platform fix for browser profiler
2025-08-13 14:16:47 +08:00
Nasrin
489981e670 Merge pull request #1390 from unclecode/fix/docker-raw-html
Check for raw: and raw:// URLs before auto-appending https:// prefix
2025-08-13 13:56:33 +08:00
Nasrin
b92be4ef66 Merge pull request #1371 from unclecode/bug/proxy_config
#1057 : enhance ProxyConfig initialization to support dict and string…
2025-08-12 16:55:52 +08:00
Nasrin
7c0edaf266 Merge pull request #1384 from unclecode/fix/update_docker_examples
docs: remove CRAWL4AI_API_TOKEN references and use correct endpoints in Docker example scripts (#1015)
2025-08-12 16:53:42 +08:00
ntohidi
dfcfd8ae57 fix(dispatcher): enable true concurrency for fast-completing tasks in arun_many. REF: #560
The MemoryAdaptiveDispatcher was processing tasks sequentially despite
  max_session_permit > 1 due to fetching only one task per event loop iteration.
  This particularly affected raw:// URLs which complete in microseconds.

  Changes:
  - Replace single task fetch with greedy slot filling using get_nowait()
  - Fill all available slots (up to max_session_permit) immediately
  - Break on empty queue instead of waiting with timeout

  This ensures proper parallelization for all task types, especially
  ultra-fast operations like raw HTML processing.
2025-08-12 16:51:22 +08:00
ntohidi
955110a8b0 Merge branch 'develop' of https://github.com/unclecode/crawl4ai into develop 2025-08-12 12:22:25 +08:00
Soham Kukreti
f30811b524 fix: Check for raw: and raw:// URLs before auto-appending https:// prefix
- Add raw HTML URL validation alongside http/https checks
- Fix URL preprocessing logic to handle raw: and raw:// prefixes
- Update error message and add comprehensive test cases
2025-08-11 22:10:53 +05:30
ntohidi
8146d477e9 Merge branch 'main' into develop 2025-08-11 18:56:15 +08:00
ntohidi
96c4b0de67 fix(browser_manager): serialize new_page on persistent context to avoid races ref #1198
- Add _page_lock and guarded creation; handle empty context.pages safely
  - Prevents BrowserContext.new_page “Target page/context closed” during concurrent arun_many
2025-08-11 18:55:43 +08:00
Nasrin
57c14db7cb Merge pull request #1381 from unclecode/fix/base-tag-link-resolution
fix: Implement base tag support in link extraction (#1147)
2025-08-11 18:32:32 +08:00
Soham Kukreti
cd2dd68e4c docs: remove CRAWL4AI_API_TOKEN references and use correct endpoints in Docker example scripts (#1015)
- Remove deprecated API token authentication from all Docker examples
- Fix async job endpoints: /crawl -> /crawl/job for submission, /task/{id} -> /crawl/job/{id} for polling
- Fix sync endpoint: /crawl_sync -> /crawl (synchronous)
- Remove non-existent /crawl_direct endpoint
- Update request format to use new structure with browser_config and crawler_config
- Fix response handling for both async and sync calls
- Update extraction strategy format to use proper nested structure
- Add Ollama connectivity check before running tests
- Update test schemas and selectors for current website structures

This makes the Docker examples work out-of-the-box with the current API structure.
2025-08-09 19:37:22 +05:30
UncleCode
f0ce7b2710 feat: add v0.7.3 release notes, changelog updates, and documentation for new features 2025-08-09 21:04:18 +08:00
Soham Kukreti
18ad3ef159 fix: Implement base tag support in link extraction (#1147)
- Extract base href from <head><base> tag using XPath in _process_element method
- Use base URL as the primary URL for link normalization when present
- Add error handling with logging for malformed or problematic base tags
- Maintain backward compatibility when no base tag is present
- Add test to verify the functionality of the base tag extraction.
2025-08-08 20:11:57 +05:30
AHMET YILMAZ
0541b61405 feat(browser-profiler): implement cross-platform keyboard listeners and improve quit handling 2025-08-08 11:18:34 +08:00
AHMET YILMAZ
89cf5aba2b #1057 : enhance ProxyConfig initialization to support dict and string formats 2025-08-06 18:34:58 +08:00
Nasrin
6735c68288 Merge pull request #1170 from prokopis3/fix/create-profile
fix(browser_profiler): cross-platform 'q' to quit - create profile
2025-08-06 16:29:14 +08:00
prokopis3
c4d625fb3c chore(profile-test): fix filename typo ( test_crteate_profile.py → test_create_profile.py )
- Rename file to correct spelling
- No content changes
2025-06-12 14:38:32 +03:00
prokopis3
ef722766f0 fix(browser_profiler): improve keyboard input handling
- fix handling of special keys in Windows msvcrt implementation
- Guard against UnicodeDecodeError from multi-byte key sequences
- Filter out non-printable characters and control sequences
- Add error handling to prevent coroutine crashes
- Add unit test to verify keyboard input handling

Key changes:
- Safe UTF-8 decoding with try/except for special keys
- Skip non-printable and multi-byte character sequences
- Add broad exception handling in keyboard listener

Test runs on Windows only due to msvcrt dependency.
2025-06-12 14:33:12 +03:00
prokopis3
4bcb7171a3 fix(browser_profiler): cross-platform 'q' to quit
This commit introduces platform-specific handling for the 'q' key press to quit the browser profiler, ensuring compatibility with both Windows and Unix-like systems. It also adds a check to see if the browser process has already exited, terminating the input listener if so.

- Implemented `msvcrt` for Windows to capture keyboard input without requiring a newline.
- Retained `termios`, `tty`, and `select` for Unix-like systems.
- Added a check for browser process termination to gracefully exit the input listener.
- Updated logger messages to use colored output for better user experience.
2025-05-30 14:43:18 +03:00
25 changed files with 3263 additions and 480 deletions

10
.gitignore vendored
View File

@@ -1,6 +1,11 @@
# Scripts folder (private tools)
.scripts/
# Local development CLI (private)
local_dev.py
dev
DEV_CLI_README.md
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
@@ -270,4 +275,7 @@ docs/**/data
.codecat/
docs/apps/linkdin/debug*/
docs/apps/linkdin/samples/insights/*
docs/apps/linkdin/samples/insights/*
# Production checklist (local, not for version control)
PRODUCTION_CHECKLIST.md

View File

@@ -5,6 +5,76 @@ All notable changes to Crawl4AI will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [0.7.3] - 2025-08-09
### Added
- **🕵️ Undetected Browser Support**: New browser adapter pattern with stealth capabilities
- `browser_adapter.py` with undetected Chrome integration
- Bypass sophisticated bot detection systems (Cloudflare, Akamai, custom solutions)
- Support for headless stealth mode with anti-detection techniques
- Human-like behavior simulation with random mouse movements and scrolling
- Comprehensive examples for anti-bot strategies and stealth crawling
- Full documentation guide for undetected browser usage
- **🎨 Multi-URL Configuration System**: URL-specific crawler configurations for batch processing
- Different crawling strategies for different URL patterns in a single batch
- Support for string patterns with wildcards (`"*.pdf"`, `"*/blog/*"`)
- Lambda function matchers for complex URL logic
- Mixed matchers combining strings and functions with AND/OR logic
- Fallback configuration support when no patterns match
- First-match-wins configuration selection with optional fallback
- **🧠 Memory Monitoring & Optimization**: Comprehensive memory usage tracking
- New `memory_utils.py` module for memory monitoring and optimization
- Real-time memory usage tracking during crawl sessions
- Memory leak detection and reporting
- Performance optimization recommendations
- Peak memory usage analysis and efficiency metrics
- Automatic cleanup suggestions for memory-intensive operations
- **📊 Enhanced Table Extraction**: Improved table access and DataFrame conversion
- Direct `result.tables` interface replacing generic `result.media` approach
- Instant pandas DataFrame conversion with `pd.DataFrame(table['data'])`
- Enhanced table detection algorithms for better accuracy
- Table metadata including source XPath and headers
- Improved table structure preservation during extraction
- **💰 GitHub Sponsors Integration**: 4-tier sponsorship system
- Supporter ($5/month): Community support + early feature previews
- Professional ($25/month): Priority support + beta access
- Business ($100/month): Direct consultation + custom integrations
- Enterprise ($500/month): Dedicated support + feature development
- Custom arrangement options for larger organizations
- **🐳 Docker LLM Provider Flexibility**: Environment-based LLM configuration
- `LLM_PROVIDER` environment variable support for dynamic provider switching
- `.llm.env` file support for secure configuration management
- Per-request provider override capabilities in API endpoints
- Support for OpenAI, Groq, and other providers without rebuilding images
- Enhanced Docker documentation with deployment examples
### Fixed
- **URL Matcher Fallback**: Resolved edge cases in URL pattern matching logic
- **Memory Management**: Fixed memory leaks in long-running crawl sessions
- **Sitemap Processing**: Improved redirect handling in sitemap fetching
- **Table Extraction**: Enhanced table detection and extraction accuracy
- **Error Handling**: Better error messages and recovery from network failures
### Changed
- **Architecture Refactoring**: Major cleanup and optimization
- Moved 2,450+ lines from main `async_crawler_strategy.py` to backup
- Cleaner separation of concerns in crawler architecture
- Better maintainability and code organization
- Preserved backward compatibility while improving performance
### Documentation
- **Comprehensive Examples**: Added real-world URLs and practical use cases
- **API Documentation**: Complete CrawlResult field documentation with all available fields
- **Migration Guides**: Updated table extraction patterns from `result.media` to `result.tables`
- **Undetected Browser Guide**: Full documentation for stealth mode and anti-bot strategies
- **Multi-Config Examples**: Detailed examples for URL-specific configurations
- **Docker Deployment**: Enhanced Docker documentation with LLM provider configuration
## [0.7.x] - 2025-06-29
### Added

View File

@@ -27,9 +27,9 @@
Crawl4AI turns the web into clean, LLM ready Markdown for RAG, agents, and data pipelines. Fast, controllable, battle tested by a 50k+ star community.
[✨ Check out latest update v0.7.0](#-recent-updates)
[✨ Check out latest update v0.7.3](#-recent-updates)
✨ New in v0.7.0, Adaptive Crawling, Virtual Scroll, Link Preview scoring, Async URL Seeder, big performance gains. [Release notes →](https://github.com/unclecode/crawl4ai/blob/main/docs/blog/release-v0.7.0.md)
✨ New in v0.7.3: Undetected Browser Support, Multi-URL Configurations, Memory Monitoring, Enhanced Table Extraction, GitHub Sponsors. [Release notes →](https://github.com/unclecode/crawl4ai/blob/main/docs/blog/release-v0.7.3.md)
<details>
<summary>🤓 <strong>My Personal Story</strong></summary>
@@ -542,7 +542,89 @@ async def test_news_crawl():
## ✨ Recent Updates
### Version 0.7.0 Release Highlights - The Adaptive Intelligence Update
<details>
<summary><strong>Version 0.7.3 Release Highlights - The Multi-Config Intelligence Update</strong></summary>
- **🕵️ Undetected Browser Support**: Bypass sophisticated bot detection systems:
```python
from crawl4ai import AsyncWebCrawler, BrowserConfig
browser_config = BrowserConfig(
browser_type="undetected", # Use undetected Chrome
headless=True, # Can run headless with stealth
extra_args=[
"--disable-blink-features=AutomationControlled",
"--disable-web-security"
]
)
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun("https://protected-site.com")
# Successfully bypass Cloudflare, Akamai, and custom bot detection
```
- **🎨 Multi-URL Configuration**: Different strategies for different URL patterns in one batch:
```python
from crawl4ai import CrawlerRunConfig, MatchMode
configs = [
# Documentation sites - aggressive caching
CrawlerRunConfig(
url_matcher=["*docs*", "*documentation*"],
cache_mode="write",
markdown_generator_options={"include_links": True}
),
# News/blog sites - fresh content
CrawlerRunConfig(
url_matcher=lambda url: 'blog' in url or 'news' in url,
cache_mode="bypass"
),
# Fallback for everything else
CrawlerRunConfig()
]
results = await crawler.arun_many(urls, config=configs)
# Each URL gets the perfect configuration automatically
```
- **🧠 Memory Monitoring**: Track and optimize memory usage during crawling:
```python
from crawl4ai.memory_utils import MemoryMonitor
monitor = MemoryMonitor()
monitor.start_monitoring()
results = await crawler.arun_many(large_url_list)
report = monitor.get_report()
print(f"Peak memory: {report['peak_mb']:.1f} MB")
print(f"Efficiency: {report['efficiency']:.1f}%")
# Get optimization recommendations
```
- **📊 Enhanced Table Extraction**: Direct DataFrame conversion from web tables:
```python
result = await crawler.arun("https://site-with-tables.com")
# New way - direct table access
if result.tables:
import pandas as pd
for table in result.tables:
df = pd.DataFrame(table['data'])
print(f"Table: {df.shape[0]} rows × {df.shape[1]} columns")
```
- **💰 GitHub Sponsors**: 4-tier sponsorship system for project sustainability
- **🐳 Docker LLM Flexibility**: Configure providers via environment variables
[Full v0.7.3 Release Notes →](https://github.com/unclecode/crawl4ai/blob/main/docs/blog/release-v0.7.3.md)
</details>
<details>
<summary><strong>Version 0.7.0 Release Highlights - The Adaptive Intelligence Update</strong></summary>
- **🧠 Adaptive Crawling**: Your crawler now learns and adapts to website patterns automatically:
```python
@@ -607,6 +689,8 @@ async def test_news_crawl():
Read the full details in our [0.7.0 Release Notes](https://docs.crawl4ai.com/blog/release-v0.7.0) or check the [CHANGELOG](https://github.com/unclecode/crawl4ai/blob/main/CHANGELOG.md).
</details>
## Version Numbering in Crawl4AI
Crawl4AI follows standard Python version numbering conventions (PEP 440) to help users understand the stability and features of each release.

View File

@@ -448,6 +448,10 @@ class BrowserConfig:
self.chrome_channel = ""
self.proxy = proxy
self.proxy_config = proxy_config
if isinstance(self.proxy_config, dict):
self.proxy_config = ProxyConfig.from_dict(self.proxy_config)
if isinstance(self.proxy_config, str):
self.proxy_config = ProxyConfig.from_string(self.proxy_config)
self.viewport_width = viewport_width
@@ -1159,6 +1163,11 @@ class CrawlerRunConfig():
self.parser_type = parser_type
self.scraping_strategy = scraping_strategy or LXMLWebScrapingStrategy()
self.proxy_config = proxy_config
if isinstance(proxy_config, dict):
self.proxy_config = ProxyConfig.from_dict(proxy_config)
if isinstance(proxy_config, str):
self.proxy_config = ProxyConfig.from_string(proxy_config)
self.proxy_rotation_strategy = proxy_rotation_strategy
# Browser Location and Identity Parameters

View File

@@ -407,32 +407,34 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
t.cancel()
raise exc
# If memory pressure is low, start new tasks
if not self.memory_pressure_mode and len(active_tasks) < self.max_session_permit:
try:
# Try to get a task with timeout to avoid blocking indefinitely
priority, (url, task_id, retry_count, enqueue_time) = await asyncio.wait_for(
self.task_queue.get(), timeout=0.1
)
# Create and start the task
task = asyncio.create_task(
self.crawl_url(url, config, task_id, retry_count)
)
active_tasks.append(task)
# Update waiting time in monitor
if self.monitor:
wait_time = time.time() - enqueue_time
self.monitor.update_task(
task_id,
wait_time=wait_time,
status=CrawlStatus.IN_PROGRESS
)
# If memory pressure is low, greedily fill all available slots
if not self.memory_pressure_mode:
slots = self.max_session_permit - len(active_tasks)
while slots > 0:
try:
# Use get_nowait() to immediately get tasks without blocking
priority, (url, task_id, retry_count, enqueue_time) = self.task_queue.get_nowait()
except asyncio.TimeoutError:
# No tasks in queue, that's fine
pass
# Create and start the task
task = asyncio.create_task(
self.crawl_url(url, config, task_id, retry_count)
)
active_tasks.append(task)
# Update waiting time in monitor
if self.monitor:
wait_time = time.time() - enqueue_time
self.monitor.update_task(
task_id,
wait_time=wait_time,
status=CrawlStatus.IN_PROGRESS
)
slots -= 1
except asyncio.QueueEmpty:
# No more tasks in queue, exit the loop
break
# Wait for completion even if queue is starved
if active_tasks:
@@ -559,32 +561,34 @@ class MemoryAdaptiveDispatcher(BaseDispatcher):
for t in active_tasks:
t.cancel()
raise exc
# If memory pressure is low, start new tasks
if not self.memory_pressure_mode and len(active_tasks) < self.max_session_permit:
try:
# Try to get a task with timeout
priority, (url, task_id, retry_count, enqueue_time) = await asyncio.wait_for(
self.task_queue.get(), timeout=0.1
)
# Create and start the task
task = asyncio.create_task(
self.crawl_url(url, config, task_id, retry_count)
)
active_tasks.append(task)
# Update waiting time in monitor
if self.monitor:
wait_time = time.time() - enqueue_time
self.monitor.update_task(
task_id,
wait_time=wait_time,
status=CrawlStatus.IN_PROGRESS
)
# If memory pressure is low, greedily fill all available slots
if not self.memory_pressure_mode:
slots = self.max_session_permit - len(active_tasks)
while slots > 0:
try:
# Use get_nowait() to immediately get tasks without blocking
priority, (url, task_id, retry_count, enqueue_time) = self.task_queue.get_nowait()
except asyncio.TimeoutError:
# No tasks in queue, that's fine
pass
# Create and start the task
task = asyncio.create_task(
self.crawl_url(url, config, task_id, retry_count)
)
active_tasks.append(task)
# Update waiting time in monitor
if self.monitor:
wait_time = time.time() - enqueue_time
self.monitor.update_task(
task_id,
wait_time=wait_time,
status=CrawlStatus.IN_PROGRESS
)
slots -= 1
except asyncio.QueueEmpty:
# No more tasks in queue, exit the loop
break
# Process completed tasks and yield results
if active_tasks:

View File

@@ -608,6 +608,11 @@ class BrowserManager:
self.contexts_by_config = {}
self._contexts_lock = asyncio.Lock()
# Serialize context.new_page() across concurrent tasks to avoid races
# when using a shared persistent context (context.pages may be empty
# for all racers). Prevents 'Target page/context closed' errors.
self._page_lock = asyncio.Lock()
# Stealth-related attributes
self._stealth_instance = None
self._stealth_cm = None
@@ -1027,13 +1032,26 @@ class BrowserManager:
context = await self.create_browser_context(crawlerRunConfig)
ctx = self.default_context # default context, one window only
ctx = await clone_runtime_state(context, ctx, crawlerRunConfig, self.config)
page = await ctx.new_page()
# Avoid concurrent new_page on shared persistent context
# See GH-1198: context.pages can be empty under races
async with self._page_lock:
page = await ctx.new_page()
else:
context = self.default_context
pages = context.pages
page = next((p for p in pages if p.url == crawlerRunConfig.url), None)
if not page:
page = context.pages[0] # await context.new_page()
if pages:
page = pages[0]
else:
# Double-check under lock to avoid TOCTOU and ensure only
# one task calls new_page when pages=[] concurrently
async with self._page_lock:
pages = context.pages
if pages:
page = pages[0]
else:
page = await context.new_page()
else:
# Otherwise, check if we have an existing context for this config
config_signature = self._make_config_signature(crawlerRunConfig)

View File

@@ -65,6 +65,213 @@ class BrowserProfiler:
self.builtin_config_file = os.path.join(self.builtin_browser_dir, "browser_config.json")
os.makedirs(self.builtin_browser_dir, exist_ok=True)
def _is_windows(self) -> bool:
"""Check if running on Windows platform."""
return sys.platform.startswith('win') or sys.platform == 'cygwin'
def _is_macos(self) -> bool:
"""Check if running on macOS platform."""
return sys.platform == 'darwin'
def _is_linux(self) -> bool:
"""Check if running on Linux platform."""
return sys.platform.startswith('linux')
def _get_quit_message(self, tag: str) -> str:
"""Get appropriate quit message based on context."""
if tag == "PROFILE":
return "Closing browser and saving profile..."
elif tag == "CDP":
return "Closing browser..."
else:
return "Closing browser..."
async def _listen_windows(self, user_done_event, check_browser_process, tag: str):
"""Windows-specific keyboard listener using msvcrt."""
try:
import msvcrt
except ImportError:
raise ImportError("msvcrt module not available on this platform")
while True:
try:
# Check for keyboard input
if msvcrt.kbhit():
raw = msvcrt.getch()
# Handle Unicode decoding more robustly
key = None
try:
key = raw.decode("utf-8")
except UnicodeDecodeError:
try:
# Try different encodings
key = raw.decode("latin1")
except UnicodeDecodeError:
# Skip if we can't decode
continue
# Validate key
if not key or len(key) != 1:
continue
# Check for printable characters only
if not key.isprintable():
continue
# Check for quit command
if key.lower() == "q":
self.logger.info(
self._get_quit_message(tag),
tag=tag,
base_color=LogColor.GREEN
)
user_done_event.set()
return
# Check if browser process ended
if await check_browser_process():
return
# Small delay to prevent busy waiting
await asyncio.sleep(0.1)
except Exception as e:
self.logger.warning(f"Error in Windows keyboard listener: {e}", tag=tag)
# Continue trying instead of failing completely
await asyncio.sleep(0.1)
continue
async def _listen_unix(self, user_done_event: asyncio.Event, check_browser_process, tag: str):
"""Unix/Linux/macOS keyboard listener using termios and select."""
try:
import termios
import tty
import select
except ImportError:
raise ImportError("termios/tty/select modules not available on this platform")
# Get stdin file descriptor
try:
fd = sys.stdin.fileno()
except (AttributeError, OSError):
raise ImportError("stdin is not a terminal")
# Save original terminal settings
old_settings = None
try:
old_settings = termios.tcgetattr(fd)
except termios.error as e:
raise ImportError(f"Cannot get terminal attributes: {e}")
try:
# Switch to non-canonical mode (cbreak mode)
tty.setcbreak(fd)
while True:
try:
# Use select to check if input is available (non-blocking)
# Timeout of 0.5 seconds to periodically check browser process
readable, _, _ = select.select([sys.stdin], [], [], 0.5)
if readable:
# Read one character
key = sys.stdin.read(1)
if key and key.lower() == "q":
self.logger.info(
self._get_quit_message(tag),
tag=tag,
base_color=LogColor.GREEN
)
user_done_event.set()
return
# Check if browser process ended
if await check_browser_process():
return
# Small delay to prevent busy waiting
await asyncio.sleep(0.1)
except (KeyboardInterrupt, EOFError):
# Handle Ctrl+C or EOF gracefully
self.logger.info("Keyboard interrupt received", tag=tag)
user_done_event.set()
return
except Exception as e:
self.logger.warning(f"Error in Unix keyboard listener: {e}", tag=tag)
await asyncio.sleep(0.1)
continue
finally:
# Always restore terminal settings
if old_settings is not None:
try:
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
except Exception as e:
self.logger.error(f"Failed to restore terminal settings: {e}", tag=tag)
async def _listen_fallback(self, user_done_event: asyncio.Event, check_browser_process, tag: str):
"""Fallback keyboard listener using simple input() method."""
self.logger.info("Using fallback input mode. Type 'q' and press Enter to quit.", tag=tag)
# Run input in a separate thread to avoid blocking
import threading
import queue
input_queue = queue.Queue()
def input_thread():
"""Thread function to handle input."""
try:
while not user_done_event.is_set():
try:
# Use input() with a prompt
user_input = input("Press 'q' + Enter to quit: ").strip().lower()
input_queue.put(user_input)
if user_input == 'q':
break
except (EOFError, KeyboardInterrupt):
input_queue.put('q')
break
except Exception as e:
self.logger.warning(f"Error in input thread: {e}", tag=tag)
break
except Exception as e:
self.logger.error(f"Input thread failed: {e}", tag=tag)
# Start input thread
thread = threading.Thread(target=input_thread, daemon=True)
thread.start()
try:
while not user_done_event.is_set():
# Check for user input
try:
user_input = input_queue.get_nowait()
if user_input == 'q':
self.logger.info(
self._get_quit_message(tag),
tag=tag,
base_color=LogColor.GREEN
)
user_done_event.set()
return
except queue.Empty:
pass
# Check if browser process ended
if await check_browser_process():
return
# Small delay
await asyncio.sleep(0.5)
except Exception as e:
self.logger.error(f"Fallback listener failed: {e}", tag=tag)
user_done_event.set()
async def create_profile(self,
profile_name: Optional[str] = None,
browser_config: Optional[BrowserConfig] = None) -> Optional[str]:
@@ -180,42 +387,38 @@ class BrowserProfiler:
# Run keyboard input loop in a separate task
async def listen_for_quit_command():
import termios
import tty
import select
"""Cross-platform keyboard listener that waits for 'q' key press."""
# First output the prompt
self.logger.info("Press 'q' when you've finished using the browser...", tag="PROFILE")
# Save original terminal settings
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
self.logger.info(
"Press {segment} when you've finished using the browser...",
tag="PROFILE",
params={"segment": "'q'"}, colors={"segment": LogColor.YELLOW},
base_color=LogColor.CYAN
)
async def check_browser_process():
"""Check if browser process is still running."""
if (
managed_browser.browser_process
and managed_browser.browser_process.poll() is not None
):
self.logger.info(
"Browser already closed. Ending input listener.", tag="PROFILE"
)
user_done_event.set()
return True
return False
# Try platform-specific implementations with fallback
try:
# Switch to non-canonical mode (no line buffering)
tty.setcbreak(fd)
while True:
# Check if input is available (non-blocking)
readable, _, _ = select.select([sys.stdin], [], [], 0.5)
if readable:
key = sys.stdin.read(1)
if key.lower() == 'q':
self.logger.info("Closing browser and saving profile...", tag="PROFILE", base_color=LogColor.GREEN)
user_done_event.set()
return
# Check if the browser process has already exited
if managed_browser.browser_process and managed_browser.browser_process.poll() is not None:
self.logger.info("Browser already closed. Ending input listener.", tag="PROFILE")
user_done_event.set()
return
await asyncio.sleep(0.1)
finally:
# Restore terminal settings
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
if self._is_windows():
await self._listen_windows(user_done_event, check_browser_process, "PROFILE")
else:
await self._listen_unix(user_done_event, check_browser_process, "PROFILE")
except Exception as e:
self.logger.warning(f"Platform-specific keyboard listener failed: {e}", tag="PROFILE")
self.logger.info("Falling back to simple input mode...", tag="PROFILE")
await self._listen_fallback(user_done_event, check_browser_process, "PROFILE")
try:
from playwright.async_api import async_playwright
@@ -682,42 +885,33 @@ class BrowserProfiler:
# Run keyboard input loop in a separate task
async def listen_for_quit_command():
import termios
import tty
import select
"""Cross-platform keyboard listener that waits for 'q' key press."""
# First output the prompt
self.logger.info("Press 'q' to stop the browser and exit...", tag="CDP")
# Save original terminal settings
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
self.logger.info(
"Press {segment} to stop the browser and exit...",
tag="CDP",
params={"segment": "'q'"}, colors={"segment": LogColor.YELLOW},
base_color=LogColor.CYAN
)
async def check_browser_process():
"""Check if browser process is still running."""
if managed_browser.browser_process and managed_browser.browser_process.poll() is not None:
self.logger.info("Browser already closed. Ending input listener.", tag="CDP")
user_done_event.set()
return True
return False
# Try platform-specific implementations with fallback
try:
# Switch to non-canonical mode (no line buffering)
tty.setcbreak(fd)
while True:
# Check if input is available (non-blocking)
readable, _, _ = select.select([sys.stdin], [], [], 0.5)
if readable:
key = sys.stdin.read(1)
if key.lower() == 'q':
self.logger.info("Closing browser...", tag="CDP")
user_done_event.set()
return
# Check if the browser process has already exited
if managed_browser.browser_process and managed_browser.browser_process.poll() is not None:
self.logger.info("Browser already closed. Ending input listener.", tag="CDP")
user_done_event.set()
return
await asyncio.sleep(0.1)
finally:
# Restore terminal settings
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
if self._is_windows():
await self._listen_windows(user_done_event, check_browser_process, "CDP")
else:
await self._listen_unix(user_done_event, check_browser_process, "CDP")
except Exception as e:
self.logger.warning(f"Platform-specific keyboard listener failed: {e}", tag="CDP")
self.logger.info("Falling back to simple input mode...", tag="CDP")
await self._listen_fallback(user_done_event, check_browser_process, "CDP")
# Function to retrieve and display CDP JSON config
async def get_cdp_json(port):

View File

@@ -242,6 +242,16 @@ class LXMLWebScrapingStrategy(ContentScrapingStrategy):
exclude_domains = set(kwargs.get("exclude_domains", []))
# Process links
try:
base_element = element.xpath("//head/base[@href]")
if base_element:
base_href = base_element[0].get("href", "").strip()
if base_href:
url = base_href
except Exception as e:
self._log("error", f"Error extracting base URL: {str(e)}", "SCRAPE")
pass
for link in element.xpath(".//a[@href]"):
href = link.get("href", "").strip()
if not href:

View File

@@ -38,7 +38,14 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy):
self.include_external = include_external
self.score_threshold = score_threshold
self.max_pages = max_pages
self.logger = logger or logging.getLogger(__name__)
# Type check for logger
if isinstance(logger, dict):
logging.getLogger(__name__).warning(
"BFSDeepCrawlStrategy received a dict as logger; falling back to default logger."
)
self.logger = logging.getLogger(__name__)
else:
self.logger = logger or logging.getLogger(__name__)
self.stats = TraversalStats(start_time=datetime.now())
self._cancel_event = asyncio.Event()
self._pages_crawled = 0

View File

@@ -30,7 +30,7 @@ class Crawl4aiDockerClient:
def __init__(
self,
base_url: str = "http://localhost:8000",
timeout: float = 30.0,
timeout: float = 600.0, # Increased to 10 minutes for crawling operations
verify_ssl: bool = True,
verbose: bool = True,
log_file: Optional[str] = None
@@ -113,21 +113,12 @@ class Crawl4aiDockerClient:
self.logger.info(f"Crawling {len(urls)} URLs {'(streaming)' if is_streaming else ''}", tag="CRAWL")
if is_streaming:
async def stream_results() -> AsyncGenerator[CrawlResult, None]:
async with self._http_client.stream("POST", f"{self.base_url}/crawl/stream", json=data) as response:
response.raise_for_status()
async for line in response.aiter_lines():
if line.strip():
result = json.loads(line)
if "error" in result:
self.logger.error_status(url=result.get("url", "unknown"), error=result["error"])
continue
self.logger.url_status(url=result.get("url", "unknown"), success=True, timing=result.get("timing", 0.0))
if result.get("status") == "completed":
continue
else:
yield CrawlResult(**result)
return stream_results()
# For streaming, we need to return the async generator properly
# The caller should be able to do: async for result in await client.crawl(...)
async def streaming_wrapper():
async for result in self._stream_crawl_results(data):
yield result
return streaming_wrapper()
response = await self._request("POST", "/crawl", json=data)
result_data = response.json()
@@ -138,6 +129,35 @@ class Crawl4aiDockerClient:
self.logger.success(f"Crawl completed with {len(results)} results", tag="CRAWL")
return results[0] if len(results) == 1 else results
async def _stream_crawl_results(self, data: Dict[str, Any]) -> AsyncGenerator[CrawlResult, None]:
"""Internal method to handle streaming crawl results."""
async with self._http_client.stream("POST", f"{self.base_url}/crawl/stream", json=data) as response:
response.raise_for_status()
async for line in response.aiter_lines():
if line.strip():
try:
result = json.loads(line)
if "error" in result:
self.logger.error_status(url=result.get("url", "unknown"), error=result["error"])
continue
# Check if this is a crawl result (has required fields)
if "url" in result and "success" in result:
self.logger.url_status(url=result.get("url", "unknown"), success=result.get("success", False), timing=result.get("timing", 0.0))
# Create CrawlResult object properly
crawl_result = CrawlResult(**result)
yield crawl_result
# Skip status-only messages
elif result.get("status") == "completed":
continue
except json.JSONDecodeError as e:
self.logger.error(f"Failed to parse streaming response: {e}", tag="STREAM")
continue
except Exception as e:
self.logger.error(f"Error processing streaming result: {e}", tag="STREAM")
continue
async def get_schema(self) -> Dict[str, Any]:
"""Retrieve configuration schemas."""
response = await self._request("GET", "/schema")

View File

@@ -1,4 +1,36 @@
from pydantic import BaseModel, HttpUrl, PrivateAttr, Field
"""
Crawl4AI Models Module
This module contains Pydantic models used throughout the Crawl4AI library.
Key Features:
- ORJSONModel: Base model with ORJSON serialization support
- DeprecatedPropertiesMixin: Global system for handling deprecated properties
- CrawlResult: Main result model with backward compatibility support
Deprecated Properties System:
The DeprecatedPropertiesMixin provides a global way to handle deprecated properties
across all models. Instead of manually excluding deprecated properties in each
model_dump() call, you can simply override the get_deprecated_properties() method:
Example:
class MyModel(ORJSONModel):
name: str
old_field: Optional[str] = None
def get_deprecated_properties(self) -> set[str]:
return {'old_field', 'another_deprecated_field'}
@property
def old_field(self):
raise AttributeError("old_field is deprecated, use name instead")
The system automatically excludes these properties from serialization, preventing
property objects from appearing in JSON output.
"""
from pydantic import BaseModel, ConfigDict,HttpUrl, PrivateAttr, Field
from typing import List, Dict, Optional, Callable, Awaitable, Union, Any
from typing import AsyncGenerator
from typing import Generic, TypeVar
@@ -8,7 +40,7 @@ from .ssl_certificate import SSLCertificate
from datetime import datetime
from datetime import timedelta
import orjson
###############################
# Dispatcher Models
###############################
@@ -91,7 +123,122 @@ class TokenUsage:
completion_tokens_details: Optional[dict] = None
prompt_tokens_details: Optional[dict] = None
class UrlModel(BaseModel):
def orjson_default(obj):
# Handle datetime (if not already handled by orjson)
if isinstance(obj, datetime):
return obj.isoformat()
# Handle property objects (convert to string or something else)
if isinstance(obj, property):
return str(obj)
# Last resort: convert to string
return str(obj)
class DeprecatedPropertiesMixin:
"""
Mixin to handle deprecated properties in Pydantic models.
Classes that inherit from this mixin can define deprecated properties
that will be automatically excluded from serialization.
Usage:
1. Override the get_deprecated_properties() method to return a set of deprecated property names
2. The model_dump method will automatically exclude these properties
Example:
class MyModel(ORJSONModel):
def get_deprecated_properties(self) -> set[str]:
return {'old_field', 'legacy_property'}
name: str
old_field: Optional[str] = None # Field definition
@property
def old_field(self): # Property that overrides the field
raise AttributeError("old_field is deprecated, use name instead")
"""
def get_deprecated_properties(self) -> set[str]:
"""
Get deprecated property names for this model.
Override this method in subclasses to define deprecated properties.
Returns:
set[str]: Set of deprecated property names
"""
return set()
@classmethod
def get_all_deprecated_properties(cls) -> set[str]:
"""
Get all deprecated properties from this class and all parent classes.
Returns:
set[str]: Set of all deprecated property names
"""
deprecated_props = set()
# Create an instance to call the instance method
try:
# Try to create a dummy instance to get deprecated properties
dummy_instance = cls.__new__(cls)
deprecated_props.update(dummy_instance.get_deprecated_properties())
except Exception:
# If we can't create an instance, check for class-level definitions
pass
# Also check parent classes
for klass in cls.__mro__:
if hasattr(klass, 'get_deprecated_properties') and klass != DeprecatedPropertiesMixin:
try:
dummy_instance = klass.__new__(klass)
deprecated_props.update(dummy_instance.get_deprecated_properties())
except Exception:
pass
return deprecated_props
def model_dump(self, *args, **kwargs):
"""
Override model_dump to automatically exclude deprecated properties.
This method:
1. Gets the existing exclude set from kwargs
2. Adds all deprecated properties defined in get_deprecated_properties()
3. Calls the parent model_dump with the updated exclude set
"""
# Get the default exclude set, or create empty set if None
exclude = kwargs.get('exclude', set())
if exclude is None:
exclude = set()
elif not isinstance(exclude, set):
exclude = set(exclude) if exclude else set()
# Add deprecated properties for this instance
exclude.update(self.get_deprecated_properties())
kwargs['exclude'] = exclude
return super().model_dump(*args, **kwargs)
class ORJSONModel(DeprecatedPropertiesMixin, BaseModel):
model_config = ConfigDict(
ser_json_timedelta="iso8601", # Optional: format timedelta
ser_json_bytes="utf8", # Optional: bytes → UTF-8 string
)
def model_dump_json(self, **kwargs) -> bytes:
"""Custom JSON serialization using orjson"""
return orjson.dumps(self.model_dump(**kwargs), default=orjson_default)
@classmethod
def model_validate_json(cls, json_data: Union[str, bytes], **kwargs):
"""Custom JSON deserialization using orjson"""
if isinstance(json_data, str):
json_data = json_data.encode()
return cls.model_validate(orjson.loads(json_data), **kwargs)
class UrlModel(ORJSONModel):
url: HttpUrl
forced: bool = False
@@ -108,7 +255,7 @@ class TraversalStats:
total_depth_reached: int = 0
current_depth: int = 0
class DispatchResult(BaseModel):
class DispatchResult(ORJSONModel):
task_id: str
memory_usage: float
peak_memory: float
@@ -116,7 +263,7 @@ class DispatchResult(BaseModel):
end_time: Union[datetime, float]
error_message: str = ""
class MarkdownGenerationResult(BaseModel):
class MarkdownGenerationResult(ORJSONModel):
raw_markdown: str
markdown_with_citations: str
references_markdown: str
@@ -126,7 +273,7 @@ class MarkdownGenerationResult(BaseModel):
def __str__(self):
return self.raw_markdown
class CrawlResult(BaseModel):
class CrawlResult(ORJSONModel):
url: str
html: str
fit_html: Optional[str] = None
@@ -156,6 +303,10 @@ class CrawlResult(BaseModel):
class Config:
arbitrary_types_allowed = True
def get_deprecated_properties(self) -> set[str]:
"""Define deprecated properties that should be excluded from serialization."""
return {'fit_html', 'fit_markdown', 'markdown_v2'}
# NOTE: The StringCompatibleMarkdown class, custom __init__ method, property getters/setters,
# and model_dump override all exist to support a smooth transition from markdown as a string
# to markdown as a MarkdownGenerationResult object, while maintaining backward compatibility.
@@ -245,14 +396,16 @@ class CrawlResult(BaseModel):
1. PrivateAttr fields are excluded from serialization by default
2. We need to maintain backward compatibility by including the 'markdown' field
in the serialized output
3. We're transitioning from 'markdown_v2' to enhancing 'markdown' to hold
the same type of data
3. Uses the DeprecatedPropertiesMixin to automatically exclude deprecated properties
Future developers: This method ensures that the markdown content is properly
serialized despite being stored in a private attribute. If the serialization
requirements change, this is where you would update the logic.
serialized despite being stored in a private attribute. The deprecated properties
are automatically handled by the mixin.
"""
# Use the parent class method which handles deprecated properties automatically
result = super().model_dump(*args, **kwargs)
# Add the markdown content if it exists
if self._markdown is not None:
result["markdown"] = self._markdown.model_dump()
return result
@@ -307,7 +460,7 @@ RunManyReturn = Union[
# 1. Replace the private attribute and property with a standard field
# 2. Update any serialization logic that might depend on the current behavior
class AsyncCrawlResponse(BaseModel):
class AsyncCrawlResponse(ORJSONModel):
html: str
response_headers: Dict[str, str]
js_execution_result: Optional[Dict[str, Any]] = None
@@ -328,7 +481,7 @@ class AsyncCrawlResponse(BaseModel):
###############################
# Scraping Models
###############################
class MediaItem(BaseModel):
class MediaItem(ORJSONModel):
src: Optional[str] = ""
data: Optional[str] = ""
alt: Optional[str] = ""
@@ -340,7 +493,7 @@ class MediaItem(BaseModel):
width: Optional[int] = None
class Link(BaseModel):
class Link(ORJSONModel):
href: Optional[str] = ""
text: Optional[str] = ""
title: Optional[str] = ""
@@ -353,7 +506,7 @@ class Link(BaseModel):
total_score: Optional[float] = None # Combined score from intrinsic and contextual scores
class Media(BaseModel):
class Media(ORJSONModel):
images: List[MediaItem] = []
videos: List[
MediaItem
@@ -364,12 +517,12 @@ class Media(BaseModel):
tables: List[Dict] = [] # Table data extracted from HTML tables
class Links(BaseModel):
class Links(ORJSONModel):
internal: List[Link] = []
external: List[Link] = []
class ScrapingResult(BaseModel):
class ScrapingResult(ORJSONModel):
cleaned_html: str
success: bool
media: Media = Media()

View File

@@ -1,5 +1,6 @@
import os
import json
import orjson
import asyncio
from typing import List, Tuple, Dict
from functools import partial
@@ -65,7 +66,7 @@ async def handle_llm_qa(
) -> str:
"""Process QA using LLM with crawled content as context."""
try:
if not url.startswith(('http://', 'https://')):
if not url.startswith(('http://', 'https://')) and not url.startswith(("raw:", "raw://")):
url = 'https://' + url
# Extract base URL by finding last '?q=' occurrence
last_q_index = url.rfind('?q=')
@@ -191,7 +192,7 @@ async def handle_markdown_request(
detail=error_msg
)
decoded_url = unquote(url)
if not decoded_url.startswith(('http://', 'https://')):
if not decoded_url.startswith(('http://', 'https://')) and not decoded_url.startswith(("raw:", "raw://")):
decoded_url = 'https://' + decoded_url
if filter_type == FilterType.RAW:
@@ -328,7 +329,7 @@ async def create_new_task(
) -> JSONResponse:
"""Create and initialize a new task."""
decoded_url = unquote(input_path)
if not decoded_url.startswith(('http://', 'https://')):
if not decoded_url.startswith(('http://', 'https://')) and not decoded_url.startswith(("raw:", "raw://")):
decoded_url = 'https://' + decoded_url
from datetime import datetime
@@ -384,27 +385,60 @@ def create_task_response(task: dict, task_id: str, base_url: str) -> dict:
async def stream_results(crawler: AsyncWebCrawler, results_gen: AsyncGenerator) -> AsyncGenerator[bytes, None]:
"""Stream results with heartbeats and completion markers."""
import json
from utils import datetime_handler
import orjson
from datetime import datetime
import inspect
def orjson_default(obj):
# Handle datetime (if not already handled by orjson)
if isinstance(obj, datetime):
return obj.isoformat()
# Handle property objects (convert to string or something else)
if isinstance(obj, property):
return str(obj)
# Last resort: convert to string
return str(obj)
try:
async for result in results_gen:
try:
server_memory_mb = _get_memory_mb()
result_dict = result.model_dump()
result_dict['server_memory_mb'] = server_memory_mb
# If PDF exists, encode it to base64
if result_dict.get('pdf') is not None:
result_dict['pdf'] = b64encode(result_dict['pdf']).decode('utf-8')
logger.info(f"Streaming result for {result_dict.get('url', 'unknown')}")
data = json.dumps(result_dict, default=datetime_handler) + "\n"
yield data.encode('utf-8')
except Exception as e:
logger.error(f"Serialization error: {e}")
error_response = {"error": str(e), "url": getattr(result, 'url', 'unknown')}
yield (json.dumps(error_response) + "\n").encode('utf-8')
logger.info(f"Starting streaming with results_gen type: {type(results_gen)}")
logger.info(f"Is results_gen async generator: {inspect.isasyncgen(results_gen)}")
# Check if results_gen is actually an async generator vs another type
if inspect.isasyncgen(results_gen):
logger.info("Processing as async generator")
async for result in results_gen:
try:
logger.info(f"Processing streaming result of type: {type(result)}")
# Check if this result is actually a CrawlResult
if hasattr(result, 'model_dump_json'):
server_memory_mb = _get_memory_mb()
result_json = result.model_dump_json()
result_dict = orjson.loads(result_json)
result_dict['server_memory_mb'] = server_memory_mb
if result_dict.get('pdf') is not None:
result_dict['pdf'] = b64encode(result_dict['pdf']).decode('utf-8')
logger.info(f"Streaming result for {result_dict.get('url', 'unknown')}")
data = orjson.dumps(result_dict, default=orjson_default).decode('utf-8') + "\n"
yield data.encode('utf-8')
else:
logger.error(f"Result doesn't have model_dump_json method: {type(result)}")
error_response = {"error": f"Invalid result type: {type(result)}", "url": "unknown"}
yield (orjson.dumps(error_response).decode('utf-8') + "\n").encode('utf-8')
except Exception as e:
logger.error(f"Serialization error: {e}")
logger.error(f"Result type was: {type(result)}")
error_response = {"error": str(e), "url": getattr(result, 'url', 'unknown')}
yield (orjson.dumps(error_response).decode('utf-8') + "\n").encode('utf-8')
else:
logger.error(f"results_gen is not an async generator: {type(results_gen)}")
error_response = {"error": f"Invalid results_gen type: {type(results_gen)}"}
yield (orjson.dumps(error_response).decode('utf-8') + "\n").encode('utf-8')
yield json.dumps({"status": "completed"}).encode('utf-8')
yield orjson.dumps({"status": "completed"}).decode('utf-8').encode('utf-8')
except asyncio.CancelledError:
logger.warning("Client disconnected during streaming")
@@ -428,7 +462,7 @@ async def handle_crawl_request(
peak_mem_mb = start_mem_mb
try:
urls = [('https://' + url) if not url.startswith(('http://', 'https://')) else url for url in urls]
urls = [('https://' + url) if not url.startswith(('http://', 'https://')) and not url.startswith(("raw:", "raw://")) else url for url in urls]
browser_config = BrowserConfig.load(browser_config)
crawler_config = CrawlerRunConfig.load(crawler_config)
@@ -472,7 +506,9 @@ async def handle_crawl_request(
# Process results to handle PDF bytes
processed_results = []
for result in results:
result_dict = result.model_dump()
# Use ORJSON serialization to handle property objects properly
result_json = result.model_dump_json()
result_dict = orjson.loads(result_json)
# If PDF exists, encode it to base64
if result_dict.get('pdf') is not None:
result_dict['pdf'] = b64encode(result_dict['pdf']).decode('utf-8')
@@ -522,8 +558,19 @@ async def handle_stream_crawl_request(
browser_config.verbose = False
crawler_config = CrawlerRunConfig.load(crawler_config)
crawler_config.scraping_strategy = LXMLWebScrapingStrategy()
crawler_config.stream = True
# Don't force stream=True here - let the deep crawl strategy control its own streaming behavior
# Apply global base config (this was missing!)
base_config = config["crawler"]["base_config"]
for key, value in base_config.items():
if hasattr(crawler_config, key):
print(f"[DEBUG] Applying base_config: {key} = {value}")
setattr(crawler_config, key, value)
print(f"[DEBUG] Deep crawl strategy: {type(crawler_config.deep_crawl_strategy).__name__ if crawler_config.deep_crawl_strategy else 'None'}")
print(f"[DEBUG] Stream mode: {crawler_config.stream}")
print(f"[DEBUG] Simulate user: {getattr(crawler_config, 'simulate_user', 'Not set')}")
dispatcher = MemoryAdaptiveDispatcher(
memory_threshold_percent=config["crawler"]["memory_threshold_percent"],
rate_limiter=RateLimiter(
@@ -537,11 +584,58 @@ async def handle_stream_crawl_request(
# crawler = AsyncWebCrawler(config=browser_config)
# await crawler.start()
results_gen = await crawler.arun_many(
urls=urls,
config=crawler_config,
dispatcher=dispatcher
)
# Use correct method based on URL count (same as regular endpoint)
if len(urls) == 1:
# For single URL, use arun to get CrawlResult, then wrap in async generator
single_result_container = await crawler.arun(
url=urls[0],
config=crawler_config,
dispatcher=dispatcher
)
async def single_result_generator():
# Handle CrawlResultContainer - extract the actual results
if hasattr(single_result_container, '_results'):
# It's a CrawlResultContainer - iterate over the internal results
for result in single_result_container._results:
# Check if the result is an async generator (from deep crawl)
if hasattr(result, '__aiter__'):
async for sub_result in result:
yield sub_result
else:
yield result
elif hasattr(single_result_container, '__aiter__'):
# It's an async generator (from streaming deep crawl)
async for result in single_result_container:
yield result
elif hasattr(single_result_container, '__iter__') and not hasattr(single_result_container, 'url'):
# It's iterable but not a CrawlResult itself
for result in single_result_container:
# Check if each result is an async generator
if hasattr(result, '__aiter__'):
async for sub_result in result:
yield sub_result
else:
yield result
else:
# It's a single CrawlResult
yield single_result_container
results_gen = single_result_generator()
else:
# For multiple URLs, use arun_many
results_gen = await crawler.arun_many(
urls=urls,
config=crawler_config,
dispatcher=dispatcher
)
# If results_gen is a list (e.g., from deep crawl), convert to async generator
if isinstance(results_gen, list):
async def convert_list_to_generator():
for result in results_gen:
yield result
results_gen = convert_list_to_generator()
return crawler, results_gen

View File

@@ -7,13 +7,16 @@ Crawl4AI FastAPI entrypoint
"""
# ── stdlib & 3rdparty imports ───────────────────────────────
from datetime import datetime
import orjson
from crawler_pool import get_crawler, close_all, janitor
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig
from auth import create_access_token, get_token_dependency, TokenRequest
from pydantic import BaseModel
from typing import Optional, List, Dict
from fastapi import Request, Depends
from fastapi.responses import FileResponse
from fastapi.responses import FileResponse, ORJSONResponse
import base64
import re
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig
@@ -32,6 +35,8 @@ from schemas import (
JSEndpointRequest,
)
# Use the proper serialization functions from async_configs
from crawl4ai.async_configs import to_serializable_dict
from utils import (
FilterType, load_config, setup_logging, verify_email_domain
)
@@ -112,11 +117,26 @@ async def lifespan(_: FastAPI):
app.state.janitor.cancel()
await close_all()
def orjson_default(obj):
# Handle datetime (if not already handled by orjson)
if isinstance(obj, datetime):
return obj.isoformat()
# Handle property objects (convert to string or something else)
if isinstance(obj, property):
return str(obj)
# Last resort: convert to string
return str(obj)
def orjson_dumps(v, *, default):
return orjson.dumps(v, default=orjson_default).decode()
# ───────────────────── FastAPI instance ──────────────────────
app = FastAPI(
title=config["app"]["title"],
version=config["app"]["version"],
lifespan=lifespan,
default_response_class=ORJSONResponse
)
# ── static playground ──────────────────────────────────────
@@ -237,9 +257,9 @@ async def get_markdown(
body: MarkdownRequest,
_td: Dict = Depends(token_dep),
):
if not body.url.startswith(("http://", "https://")):
if not body.url.startswith(("http://", "https://")) and not body.url.startswith(("raw:", "raw://")):
raise HTTPException(
400, "URL must be absolute and start with http/https")
400, "Invalid URL format. Must start with http://, https://, or for raw HTML (raw:, raw://)")
markdown = await handle_markdown_request(
body.url, body.f, body.q, body.c, config, body.provider
)
@@ -401,7 +421,7 @@ async def llm_endpoint(
):
if not q:
raise HTTPException(400, "Query parameter 'q' is required")
if not url.startswith(("http://", "https://")):
if not url.startswith(("http://", "https://")) and not url.startswith(("raw:", "raw://")):
url = "https://" + url
answer = await handle_llm_qa(url, q, config)
return JSONResponse({"answer": answer})
@@ -435,15 +455,20 @@ async def crawl(
"""
Crawl a list of URLs and return the results as JSON.
"""
if not crawl_request.urls:
raise HTTPException(400, "At least one URL required")
res = await handle_crawl_request(
urls=crawl_request.urls,
browser_config=crawl_request.browser_config,
crawler_config=crawl_request.crawler_config,
config=config,
)
return JSONResponse(res)
try:
if not crawl_request.urls:
raise HTTPException(400, "At least one URL required")
res = await handle_crawl_request(
urls=crawl_request.urls,
browser_config=crawl_request.browser_config,
crawler_config=crawl_request.crawler_config,
config=config,
)
# handle_crawl_request returns a dictionary, so we can pass it directly to ORJSONResponse
return ORJSONResponse(res)
except Exception as e:
print(f"Error occurred: {e}")
return ORJSONResponse({"error": str(e)}, status_code=500)
@app.post("/crawl/stream")

View File

@@ -8,10 +8,14 @@ Today I'm releasing Crawl4AI v0.7.3—the Multi-Config Intelligence Update. This
## 🎯 What's New at a Glance
- **Multi-URL Configurations**: Different crawling strategies for different URL patterns in a single batch
- **Flexible Docker LLM Providers**: Configure LLM providers via environment variables
- **Bug Fixes**: Resolved several critical issues for better stability
- **Documentation Updates**: Clearer examples and improved API documentation
- **🕵️ Undetected Browser Support**: Stealth mode for bypassing bot detection systems
- **🎨 Multi-URL Configurations**: Different crawling strategies for different URL patterns in a single batch
- **🐳 Flexible Docker LLM Providers**: Configure LLM providers via environment variables
- **🧠 Memory Monitoring**: Enhanced memory usage tracking and optimization tools
- **📊 Enhanced Table Extraction**: Improved table access and DataFrame conversion
- **💰 GitHub Sponsors**: 4-tier sponsorship system with custom arrangements
- **🔧 Bug Fixes**: Resolved several critical issues for better stability
- **📚 Documentation Updates**: Clearer examples and improved API documentation
## 🎨 Multi-URL Configurations: One Size Doesn't Fit All
@@ -78,6 +82,182 @@ async with AsyncWebCrawler() as crawler:
- **Reduced Complexity**: No more if/else forests in your extraction code
- **Better Performance**: Each URL gets exactly the processing it needs
## 🕵️ Undetected Browser Support: Stealth Mode Activated
**The Problem:** Modern websites employ sophisticated bot detection systems. Cloudflare, Akamai, and custom solutions block automated crawlers, limiting access to valuable content.
**My Solution:** I implemented undetected browser support with a flexible adapter pattern. Now Crawl4AI can bypass most bot detection systems using stealth techniques.
### Technical Implementation
```python
from crawl4ai import AsyncWebCrawler, BrowserConfig
# Enable undetected mode for stealth crawling
browser_config = BrowserConfig(
browser_type="undetected", # Use undetected Chrome
headless=True, # Can run headless with stealth
extra_args=[
"--disable-blink-features=AutomationControlled",
"--disable-web-security",
"--disable-features=VizDisplayCompositor"
]
)
async with AsyncWebCrawler(config=browser_config) as crawler:
# This will bypass most bot detection systems
result = await crawler.arun("https://protected-site.com")
if result.success:
print("✅ Successfully bypassed bot detection!")
print(f"Content length: {len(result.markdown)}")
```
**Advanced Anti-Bot Strategies:**
```python
# Combine multiple stealth techniques
from crawl4ai import CrawlerRunConfig
config = CrawlerRunConfig(
# Random user agents and headers
headers={
"Accept-Language": "en-US,en;q=0.9",
"Accept-Encoding": "gzip, deflate, br",
"DNT": "1"
},
# Human-like behavior simulation
js_code="""
// Random mouse movements
const simulateHuman = () => {
const event = new MouseEvent('mousemove', {
clientX: Math.random() * window.innerWidth,
clientY: Math.random() * window.innerHeight
});
document.dispatchEvent(event);
};
setInterval(simulateHuman, 100 + Math.random() * 200);
// Random scrolling
const randomScroll = () => {
const scrollY = Math.random() * (document.body.scrollHeight - window.innerHeight);
window.scrollTo(0, scrollY);
};
setTimeout(randomScroll, 500 + Math.random() * 1000);
""",
# Delay to appear more human
delay_before_return_html=2.0
)
result = await crawler.arun("https://bot-protected-site.com", config=config)
```
**Expected Real-World Impact:**
- **Enterprise Scraping**: Access previously blocked corporate sites and databases
- **Market Research**: Gather data from competitor sites with protection
- **Price Monitoring**: Track e-commerce sites that block automated access
- **Content Aggregation**: Collect news and social media despite anti-bot measures
- **Compliance Testing**: Verify your own site's bot protection effectiveness
## 🧠 Memory Monitoring & Optimization
**The Problem:** Long-running crawl sessions consuming excessive memory, especially when processing large batches or heavy JavaScript sites.
**My Solution:** Built comprehensive memory monitoring and optimization utilities that track usage patterns and provide actionable insights.
### Memory Tracking Implementation
```python
from crawl4ai.memory_utils import MemoryMonitor, get_memory_info
# Monitor memory during crawling
monitor = MemoryMonitor()
async with AsyncWebCrawler() as crawler:
# Start monitoring
monitor.start_monitoring()
# Perform memory-intensive operations
results = await crawler.arun_many([
"https://heavy-js-site.com",
"https://large-images-site.com",
"https://dynamic-content-site.com"
])
# Get detailed memory report
memory_report = monitor.get_report()
print(f"Peak memory usage: {memory_report['peak_mb']:.1f} MB")
print(f"Memory efficiency: {memory_report['efficiency']:.1f}%")
# Automatic cleanup suggestions
if memory_report['peak_mb'] > 1000: # > 1GB
print("💡 Consider batch size optimization")
print("💡 Enable aggressive garbage collection")
```
**Expected Real-World Impact:**
- **Production Stability**: Prevent memory-related crashes in long-running services
- **Cost Optimization**: Right-size server resources based on actual usage
- **Performance Tuning**: Identify memory bottlenecks and optimization opportunities
- **Scalability Planning**: Understand memory patterns for horizontal scaling
## 📊 Enhanced Table Extraction
**The Problem:** Table data was accessed through the generic `result.media` interface, making DataFrame conversion cumbersome and unclear.
**My Solution:** Dedicated `result.tables` interface with direct DataFrame conversion and improved detection algorithms.
### New Table Access Pattern
```python
# Old way (deprecated)
# tables_data = result.media.get('tables', [])
# New way (v0.7.3+)
result = await crawler.arun("https://site-with-tables.com")
# Direct table access
if result.tables:
print(f"Found {len(result.tables)} tables")
# Convert to pandas DataFrame instantly
import pandas as pd
for i, table in enumerate(result.tables):
df = pd.DataFrame(table['data'])
print(f"Table {i}: {df.shape[0]} rows × {df.shape[1]} columns")
print(df.head())
# Table metadata
print(f"Source: {table.get('source_xpath', 'Unknown')}")
print(f"Headers: {table.get('headers', [])}")
```
**Expected Real-World Impact:**
- **Data Analysis**: Faster transition from web data to analysis-ready DataFrames
- **ETL Pipelines**: Cleaner integration with data processing workflows
- **Reporting**: Simplified table extraction for automated reporting systems
## 💰 Community Support: GitHub Sponsors
I've launched GitHub Sponsors to ensure Crawl4AI's continued development and support our growing community.
**Sponsorship Tiers:**
- **🌱 Supporter ($5/month)**: Community support + early feature previews
- **🚀 Professional ($25/month)**: Priority support + beta access
- **🏢 Business ($100/month)**: Direct consultation + custom integrations
- **🏛️ Enterprise ($500/month)**: Dedicated support + feature development
**Why Sponsor?**
- Ensure continuous development and maintenance
- Get priority support and feature requests
- Access to premium documentation and examples
- Direct line to the development team
[**Become a Sponsor →**](https://github.com/sponsors/unclecode)
## 🐳 Docker: Flexible LLM Provider Configuration
**The Problem:** Hardcoded LLM providers in Docker deployments. Want to switch from OpenAI to Groq? Rebuild and redeploy. Testing different models? Multiple Docker images.

View File

@@ -8,26 +8,20 @@ from typing import Dict, Any
class Crawl4AiTester:
def __init__(self, base_url: str = "http://localhost:11235", api_token: str = None):
def __init__(self, base_url: str = "http://localhost:11235"):
self.base_url = base_url
self.api_token = (
api_token or os.getenv("CRAWL4AI_API_TOKEN") or "test_api_code"
) # Check environment variable as fallback
self.headers = (
{"Authorization": f"Bearer {self.api_token}"} if self.api_token else {}
)
def submit_and_wait(
self, request_data: Dict[str, Any], timeout: int = 300
) -> Dict[str, Any]:
# Submit crawl job
# Submit crawl job using async endpoint
response = requests.post(
f"{self.base_url}/crawl", json=request_data, headers=self.headers
f"{self.base_url}/crawl/job", json=request_data
)
if response.status_code == 403:
raise Exception("API token is invalid or missing")
task_id = response.json()["task_id"]
print(f"Task ID: {task_id}")
response.raise_for_status()
job_response = response.json()
task_id = job_response["task_id"]
print(f"Submitted job with task_id: {task_id}")
# Poll for result
start_time = time.time()
@@ -38,8 +32,9 @@ class Crawl4AiTester:
)
result = requests.get(
f"{self.base_url}/task/{task_id}", headers=self.headers
f"{self.base_url}/crawl/job/{task_id}"
)
result.raise_for_status()
status = result.json()
if status["status"] == "failed":
@@ -52,10 +47,10 @@ class Crawl4AiTester:
time.sleep(2)
def submit_sync(self, request_data: Dict[str, Any]) -> Dict[str, Any]:
# Use synchronous crawl endpoint
response = requests.post(
f"{self.base_url}/crawl_sync",
f"{self.base_url}/crawl",
json=request_data,
headers=self.headers,
timeout=60,
)
if response.status_code == 408:
@@ -63,20 +58,9 @@ class Crawl4AiTester:
response.raise_for_status()
return response.json()
def crawl_direct(self, request_data: Dict[str, Any]) -> Dict[str, Any]:
"""Directly crawl without using task queue"""
response = requests.post(
f"{self.base_url}/crawl_direct", json=request_data, headers=self.headers
)
response.raise_for_status()
return response.json()
def test_docker_deployment(version="basic"):
tester = Crawl4AiTester(
base_url="http://localhost:11235",
# base_url="https://api.crawl4ai.com" # just for example
# api_token="test" # just for example
)
print(f"Testing Crawl4AI Docker {version} version")
@@ -95,11 +79,8 @@ def test_docker_deployment(version="basic"):
time.sleep(5)
# Test cases based on version
test_basic_crawl_direct(tester)
test_basic_crawl(tester)
test_basic_crawl(tester)
test_basic_crawl_sync(tester)
if version in ["full", "transformer"]:
test_cosine_extraction(tester)
@@ -112,115 +93,129 @@ def test_docker_deployment(version="basic"):
def test_basic_crawl(tester: Crawl4AiTester):
print("\n=== Testing Basic Crawl ===")
print("\n=== Testing Basic Crawl (Async) ===")
request = {
"urls": "https://www.nbcnews.com/business",
"priority": 10,
"session_id": "test",
"urls": ["https://www.nbcnews.com/business"],
"browser_config": {},
"crawler_config": {}
}
result = tester.submit_and_wait(request)
print(f"Basic crawl result length: {len(result['result']['markdown'])}")
print(f"Basic crawl result count: {len(result['result']['results'])}")
assert result["result"]["success"]
assert len(result["result"]["markdown"]) > 0
assert len(result["result"]["results"]) > 0
assert len(result["result"]["results"][0]["markdown"]) > 0
def test_basic_crawl_sync(tester: Crawl4AiTester):
print("\n=== Testing Basic Crawl (Sync) ===")
request = {
"urls": "https://www.nbcnews.com/business",
"priority": 10,
"session_id": "test",
"urls": ["https://www.nbcnews.com/business"],
"browser_config": {},
"crawler_config": {}
}
result = tester.submit_sync(request)
print(f"Basic crawl result length: {len(result['result']['markdown'])}")
assert result["status"] == "completed"
assert result["result"]["success"]
assert len(result["result"]["markdown"]) > 0
def test_basic_crawl_direct(tester: Crawl4AiTester):
print("\n=== Testing Basic Crawl (Direct) ===")
request = {
"urls": "https://www.nbcnews.com/business",
"priority": 10,
# "session_id": "test"
"cache_mode": "bypass", # or "enabled", "disabled", "read_only", "write_only"
}
result = tester.crawl_direct(request)
print(f"Basic crawl result length: {len(result['result']['markdown'])}")
assert result["result"]["success"]
assert len(result["result"]["markdown"]) > 0
print(f"Basic crawl result count: {len(result['results'])}")
assert result["success"]
assert len(result["results"]) > 0
assert len(result["results"][0]["markdown"]) > 0
def test_js_execution(tester: Crawl4AiTester):
print("\n=== Testing JS Execution ===")
request = {
"urls": "https://www.nbcnews.com/business",
"priority": 8,
"js_code": [
"const loadMoreButton = Array.from(document.querySelectorAll('button')).find(button => button.textContent.includes('Load More')); loadMoreButton && loadMoreButton.click();"
],
"wait_for": "article.tease-card:nth-child(10)",
"crawler_params": {"headless": True},
"urls": ["https://www.nbcnews.com/business"],
"browser_config": {"headless": True},
"crawler_config": {
"js_code": [
"const loadMoreButton = Array.from(document.querySelectorAll('button')).find(button => button.textContent.includes('Load More')); if(loadMoreButton) loadMoreButton.click();"
],
"wait_for": "wide-tease-item__wrapper df flex-column flex-row-m flex-nowrap-m enable-new-sports-feed-mobile-design(10)"
}
}
result = tester.submit_and_wait(request)
print(f"JS execution result length: {len(result['result']['markdown'])}")
print(f"JS execution result count: {len(result['result']['results'])}")
assert result["result"]["success"]
def test_css_selector(tester: Crawl4AiTester):
print("\n=== Testing CSS Selector ===")
request = {
"urls": "https://www.nbcnews.com/business",
"priority": 7,
"css_selector": ".wide-tease-item__description",
"crawler_params": {"headless": True},
"extra": {"word_count_threshold": 10},
"urls": ["https://www.nbcnews.com/business"],
"browser_config": {"headless": True},
"crawler_config": {
"css_selector": ".wide-tease-item__description",
"word_count_threshold": 10
}
}
result = tester.submit_and_wait(request)
print(f"CSS selector result length: {len(result['result']['markdown'])}")
print(f"CSS selector result count: {len(result['result']['results'])}")
assert result["result"]["success"]
def test_structured_extraction(tester: Crawl4AiTester):
print("\n=== Testing Structured Extraction ===")
schema = {
"name": "Coinbase Crypto Prices",
"baseSelector": ".cds-tableRow-t45thuk",
"name": "Cryptocurrency Prices",
"baseSelector": "table[data-testid=\"prices-table\"] tbody tr",
"fields": [
{
"name": "crypto",
"selector": "td:nth-child(1) h2",
"type": "text",
"name": "asset_name",
"selector": "td:nth-child(2) p.cds-headline-h4steop",
"type": "text"
},
{
"name": "symbol",
"selector": "td:nth-child(1) p",
"type": "text",
"name": "asset_symbol",
"selector": "td:nth-child(2) p.cds-label2-l1sm09ec",
"type": "text"
},
{
"name": "asset_image_url",
"selector": "td:nth-child(2) img[alt=\"Asset Symbol\"]",
"type": "attribute",
"attribute": "src"
},
{
"name": "asset_url",
"selector": "td:nth-child(2) a[aria-label^=\"Asset page for\"]",
"type": "attribute",
"attribute": "href"
},
{
"name": "price",
"selector": "td:nth-child(2)",
"type": "text",
"selector": "td:nth-child(3) div.cds-typographyResets-t6muwls.cds-body-bwup3gq",
"type": "text"
},
],
{
"name": "change",
"selector": "td:nth-child(7) p.cds-body-bwup3gq",
"type": "text"
}
]
}
request = {
"urls": "https://www.coinbase.com/explore",
"priority": 9,
"extraction_config": {"type": "json_css", "params": {"schema": schema}},
"urls": ["https://www.coinbase.com/explore"],
"browser_config": {},
"crawler_config": {
"type": "CrawlerRunConfig",
"params": {
"extraction_strategy": {
"type": "JsonCssExtractionStrategy",
"params": {"schema": schema}
}
}
}
}
result = tester.submit_and_wait(request)
extracted = json.loads(result["result"]["extracted_content"])
extracted = json.loads(result["result"]["results"][0]["extracted_content"])
print(f"Extracted {len(extracted)} items")
print("Sample item:", json.dumps(extracted[0], indent=2))
if extracted:
print("Sample item:", json.dumps(extracted[0], indent=2))
assert result["result"]["success"]
assert len(extracted) > 0
@@ -230,43 +225,54 @@ def test_llm_extraction(tester: Crawl4AiTester):
schema = {
"type": "object",
"properties": {
"model_name": {
"asset_name": {
"type": "string",
"description": "Name of the OpenAI model.",
"description": "Name of the asset.",
},
"input_fee": {
"price": {
"type": "string",
"description": "Fee for input token for the OpenAI model.",
"description": "Price of the asset.",
},
"output_fee": {
"change": {
"type": "string",
"description": "Fee for output token for the OpenAI model.",
"description": "Change in price of the asset.",
},
},
"required": ["model_name", "input_fee", "output_fee"],
"required": ["asset_name", "price", "change"],
}
request = {
"urls": "https://openai.com/api/pricing",
"priority": 8,
"extraction_config": {
"type": "llm",
"urls": ["https://www.coinbase.com/en-in/explore"],
"browser_config": {},
"crawler_config": {
"type": "CrawlerRunConfig",
"params": {
"provider": "openai/gpt-4o-mini",
"api_token": os.getenv("OPENAI_API_KEY"),
"schema": schema,
"extraction_type": "schema",
"instruction": """From the crawled content, extract all mentioned model names along with their fees for input and output tokens.""",
},
},
"crawler_params": {"word_count_threshold": 1},
"extraction_strategy": {
"type": "LLMExtractionStrategy",
"params": {
"llm_config": {
"type": "LLMConfig",
"params": {
"provider": "gemini/gemini-2.0-flash-exp",
"api_token": os.getenv("GEMINI_API_KEY")
}
},
"schema": schema,
"extraction_type": "schema",
"instruction": "From the crawled content, extract asset names along with their prices and change in price.",
}
},
"word_count_threshold": 1
}
}
}
try:
result = tester.submit_and_wait(request)
extracted = json.loads(result["result"]["extracted_content"])
print(f"Extracted {len(extracted)} model pricing entries")
print("Sample entry:", json.dumps(extracted[0], indent=2))
extracted = json.loads(result["result"]["results"][0]["extracted_content"])
print(f"Extracted {len(extracted)} asset pricing entries")
if extracted:
print("Sample entry:", json.dumps(extracted[0], indent=2))
assert result["result"]["success"]
except Exception as e:
print(f"LLM extraction test failed (might be due to missing API key): {str(e)}")
@@ -274,6 +280,16 @@ def test_llm_extraction(tester: Crawl4AiTester):
def test_llm_with_ollama(tester: Crawl4AiTester):
print("\n=== Testing LLM with Ollama ===")
# Check if Ollama is accessible first
try:
ollama_response = requests.get("http://localhost:11434/api/tags", timeout=5)
ollama_response.raise_for_status()
print("Ollama is accessible")
except:
print("Ollama is not accessible, skipping test")
return
schema = {
"type": "object",
"properties": {
@@ -294,24 +310,33 @@ def test_llm_with_ollama(tester: Crawl4AiTester):
}
request = {
"urls": "https://www.nbcnews.com/business",
"priority": 8,
"extraction_config": {
"type": "llm",
"urls": ["https://www.nbcnews.com/business"],
"browser_config": {"verbose": True},
"crawler_config": {
"type": "CrawlerRunConfig",
"params": {
"provider": "ollama/llama2",
"schema": schema,
"extraction_type": "schema",
"instruction": "Extract the main article information including title, summary, and main topics.",
},
},
"extra": {"word_count_threshold": 1},
"crawler_params": {"verbose": True},
"extraction_strategy": {
"type": "LLMExtractionStrategy",
"params": {
"llm_config": {
"type": "LLMConfig",
"params": {
"provider": "ollama/llama3.2:latest",
}
},
"schema": schema,
"extraction_type": "schema",
"instruction": "Extract the main article information including title, summary, and main topics.",
}
},
"word_count_threshold": 1
}
}
}
try:
result = tester.submit_and_wait(request)
extracted = json.loads(result["result"]["extracted_content"])
extracted = json.loads(result["result"]["results"][0]["extracted_content"])
print("Extracted content:", json.dumps(extracted, indent=2))
assert result["result"]["success"]
except Exception as e:
@@ -321,24 +346,30 @@ def test_llm_with_ollama(tester: Crawl4AiTester):
def test_cosine_extraction(tester: Crawl4AiTester):
print("\n=== Testing Cosine Extraction ===")
request = {
"urls": "https://www.nbcnews.com/business",
"priority": 8,
"extraction_config": {
"type": "cosine",
"urls": ["https://www.nbcnews.com/business"],
"browser_config": {},
"crawler_config": {
"type": "CrawlerRunConfig",
"params": {
"semantic_filter": "business finance economy",
"word_count_threshold": 10,
"max_dist": 0.2,
"top_k": 3,
},
},
"extraction_strategy": {
"type": "CosineStrategy",
"params": {
"semantic_filter": "business finance economy",
"word_count_threshold": 10,
"max_dist": 0.2,
"top_k": 3,
}
}
}
}
}
try:
result = tester.submit_and_wait(request)
extracted = json.loads(result["result"]["extracted_content"])
extracted = json.loads(result["result"]["results"][0]["extracted_content"])
print(f"Extracted {len(extracted)} text clusters")
print("First cluster tags:", extracted[0]["tags"])
if extracted:
print("First cluster tags:", extracted[0]["tags"])
assert result["result"]["success"]
except Exception as e:
print(f"Cosine extraction test failed: {str(e)}")
@@ -347,20 +378,25 @@ def test_cosine_extraction(tester: Crawl4AiTester):
def test_screenshot(tester: Crawl4AiTester):
print("\n=== Testing Screenshot ===")
request = {
"urls": "https://www.nbcnews.com/business",
"priority": 5,
"screenshot": True,
"crawler_params": {"headless": True},
"urls": ["https://www.nbcnews.com/business"],
"browser_config": {"headless": True},
"crawler_config": {
"type": "CrawlerRunConfig",
"params": {
"screenshot": True
}
}
}
result = tester.submit_and_wait(request)
print("Screenshot captured:", bool(result["result"]["screenshot"]))
screenshot_data = result["result"]["results"][0]["screenshot"]
print("Screenshot captured:", bool(screenshot_data))
if result["result"]["screenshot"]:
if screenshot_data:
# Save screenshot
screenshot_data = base64.b64decode(result["result"]["screenshot"])
screenshot_bytes = base64.b64decode(screenshot_data)
with open("test_screenshot.jpg", "wb") as f:
f.write(screenshot_data)
f.write(screenshot_bytes)
print("Screenshot saved as test_screenshot.jpg")
assert result["result"]["success"]
@@ -368,5 +404,4 @@ def test_screenshot(tester: Crawl4AiTester):
if __name__ == "__main__":
version = sys.argv[1] if len(sys.argv) > 1 else "basic"
# version = "full"
test_docker_deployment(version)

View File

@@ -91,6 +91,17 @@ async def test_css_selector_extraction():
assert result.markdown
assert all(heading in result.markdown for heading in ["#", "##", "###"])
@pytest.mark.asyncio
async def test_base_tag_link_extraction():
async with AsyncWebCrawler(verbose=True) as crawler:
url = "https://sohamkukreti.github.io/portfolio"
result = await crawler.arun(url=url)
assert result.success
assert result.links
assert isinstance(result.links, dict)
assert "internal" in result.links
assert "external" in result.links
assert any("github.com" in x["href"] for x in result.links["external"])
# Entry point for debugging
if __name__ == "__main__":

View File

@@ -10,11 +10,13 @@ import sys
import uuid
import shutil
from crawl4ai import BrowserProfiler
from crawl4ai.browser_manager import BrowserManager
# Add the project root to Python path if running directly
if __name__ == "__main__":
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../..')))
from crawl4ai.browser import BrowserManager, BrowserProfileManager
from crawl4ai.async_configs import BrowserConfig, CrawlerRunConfig
from crawl4ai.async_logger import AsyncLogger
@@ -25,7 +27,7 @@ async def test_profile_creation():
"""Test creating and managing browser profiles."""
logger.info("Testing profile creation and management", tag="TEST")
profile_manager = BrowserProfileManager(logger=logger)
profile_manager = BrowserProfiler(logger=logger)
try:
# List existing profiles
@@ -83,7 +85,7 @@ async def test_profile_with_browser():
"""Test using a profile with a browser."""
logger.info("Testing using a profile with a browser", tag="TEST")
profile_manager = BrowserProfileManager(logger=logger)
profile_manager = BrowserProfiler(logger=logger)
test_profile_name = f"test-browser-profile-{uuid.uuid4().hex[:8]}"
profile_path = None
@@ -101,6 +103,8 @@ async def test_profile_with_browser():
# Now use this profile with a browser
browser_config = BrowserConfig(
user_data_dir=profile_path,
use_managed_browser=True,
use_persistent_context=True,
headless=True
)

View File

@@ -168,7 +168,7 @@ class SimpleApiTester:
print("\n=== CORE APIs ===")
test_url = "https://example.com"
test_raw_html_url = "raw://<html><body><h1>Hello, World!</h1></body></html>"
# Test markdown endpoint
md_payload = {
"url": test_url,
@@ -180,6 +180,17 @@ class SimpleApiTester:
# print(result['data'].get('markdown', ''))
self.print_result(result)
# Test markdown endpoint with raw HTML
raw_md_payload = {
"url": test_raw_html_url,
"f": "fit",
"q": "test query",
"c": "0"
}
result = self.test_post_endpoint("/md", raw_md_payload)
self.print_result(result)
# Test HTML endpoint
html_payload = {"url": test_url}
result = self.test_post_endpoint("/html", html_payload)
@@ -215,6 +226,15 @@ class SimpleApiTester:
result = self.test_post_endpoint("/crawl", crawl_payload)
self.print_result(result)
# Test crawl endpoint with raw HTML
crawl_payload = {
"urls": [test_raw_html_url],
"browser_config": {},
"crawler_config": {}
}
result = self.test_post_endpoint("/crawl", crawl_payload)
self.print_result(result)
# Test config dump
config_payload = {"code": "CrawlerRunConfig()"}
result = self.test_post_endpoint("/config/dump", config_payload)

View File

@@ -74,7 +74,7 @@ async def test_direct_api():
# Make direct API call
async with httpx.AsyncClient() as client:
response = await client.post(
"http://localhost:8000/crawl",
"http://localhost:11235/crawl",
json=request_data,
timeout=300
)
@@ -100,13 +100,24 @@ async def test_direct_api():
async with httpx.AsyncClient() as client:
response = await client.post(
"http://localhost:8000/crawl",
"http://localhost:11235/crawl",
json=request_data
)
assert response.status_code == 200
result = response.json()
print("Structured extraction result:", result["success"])
# Test 3: Raw HTML
request_data["urls"] = ["raw://<html><body><h1>Hello, World!</h1><a href='https://example.com'>Example</a></body></html>"]
async with httpx.AsyncClient() as client:
response = await client.post(
"http://localhost:11235/crawl",
json=request_data
)
assert response.status_code == 200
result = response.json()
print("Raw HTML result:", result["success"])
# Test 3: Get schema
# async with httpx.AsyncClient() as client:
# response = await client.get("http://localhost:8000/schema")
@@ -118,7 +129,7 @@ async def test_with_client():
"""Test using the Crawl4AI Docker client SDK"""
print("\n=== Testing Client SDK ===")
async with Crawl4aiDockerClient(verbose=True) as client:
async with Crawl4aiDockerClient(base_url="http://localhost:11235", verbose=True) as client:
# Test 1: Basic crawl
browser_config = BrowserConfig(headless=True)
crawler_config = CrawlerRunConfig(

View File

@@ -6,28 +6,22 @@ import base64
import os
from typing import Dict, Any
class Crawl4AiTester:
def __init__(self, base_url: str = "http://localhost:11235", api_token: str = None):
def __init__(self, base_url: str = "http://localhost:11235"):
self.base_url = base_url
self.api_token = api_token or os.getenv(
"CRAWL4AI_API_TOKEN"
) # Check environment variable as fallback
self.headers = (
{"Authorization": f"Bearer {self.api_token}"} if self.api_token else {}
)
def submit_and_wait(
self, request_data: Dict[str, Any], timeout: int = 300
) -> Dict[str, Any]:
# Submit crawl job
# Submit crawl job using async endpoint
response = requests.post(
f"{self.base_url}/crawl", json=request_data, headers=self.headers
f"{self.base_url}/crawl/job", json=request_data
)
if response.status_code == 403:
raise Exception("API token is invalid or missing")
task_id = response.json()["task_id"]
print(f"Task ID: {task_id}")
response.raise_for_status()
job_response = response.json()
task_id = job_response["task_id"]
print(f"Submitted job with task_id: {task_id}")
# Poll for result
start_time = time.time()
@@ -38,8 +32,9 @@ class Crawl4AiTester:
)
result = requests.get(
f"{self.base_url}/task/{task_id}", headers=self.headers
f"{self.base_url}/crawl/job/{task_id}"
)
result.raise_for_status()
status = result.json()
if status["status"] == "failed":
@@ -52,10 +47,10 @@ class Crawl4AiTester:
time.sleep(2)
def submit_sync(self, request_data: Dict[str, Any]) -> Dict[str, Any]:
# Use synchronous crawl endpoint
response = requests.post(
f"{self.base_url}/crawl_sync",
f"{self.base_url}/crawl",
json=request_data,
headers=self.headers,
timeout=60,
)
if response.status_code == 408:
@@ -66,9 +61,8 @@ class Crawl4AiTester:
def test_docker_deployment(version="basic"):
tester = Crawl4AiTester(
# base_url="http://localhost:11235" ,
base_url="https://crawl4ai-sby74.ondigitalocean.app",
api_token="test",
base_url="http://localhost:11235",
#base_url="https://crawl4ai-sby74.ondigitalocean.app",
)
print(f"Testing Crawl4AI Docker {version} version")
@@ -88,63 +82,60 @@ def test_docker_deployment(version="basic"):
# Test cases based on version
test_basic_crawl(tester)
test_basic_crawl(tester)
test_basic_crawl_sync(tester)
# if version in ["full", "transformer"]:
# test_cosine_extraction(tester)
if version in ["full", "transformer"]:
test_cosine_extraction(tester)
# test_js_execution(tester)
# test_css_selector(tester)
# test_structured_extraction(tester)
# test_llm_extraction(tester)
# test_llm_with_ollama(tester)
# test_screenshot(tester)
test_js_execution(tester)
test_css_selector(tester)
test_structured_extraction(tester)
test_llm_extraction(tester)
test_llm_with_ollama(tester)
test_screenshot(tester)
def test_basic_crawl(tester: Crawl4AiTester):
print("\n=== Testing Basic Crawl ===")
print("\n=== Testing Basic Crawl (Async) ===")
request = {
"urls": ["https://www.nbcnews.com/business"],
"priority": 10,
"session_id": "test",
}
result = tester.submit_and_wait(request)
print(f"Basic crawl result length: {len(result['result']['markdown'])}")
print(f"Basic crawl result count: {len(result['result']['results'])}")
assert result["result"]["success"]
assert len(result["result"]["markdown"]) > 0
assert len(result["result"]["results"]) > 0
assert len(result["result"]["results"][0]["markdown"]) > 0
def test_basic_crawl_sync(tester: Crawl4AiTester):
print("\n=== Testing Basic Crawl (Sync) ===")
request = {
"urls": ["https://www.nbcnews.com/business"],
"priority": 10,
"session_id": "test",
}
result = tester.submit_sync(request)
print(f"Basic crawl result length: {len(result['result']['markdown'])}")
assert result["status"] == "completed"
assert result["result"]["success"]
assert len(result["result"]["markdown"]) > 0
print(f"Basic crawl result count: {len(result['results'])}")
assert result["success"]
assert len(result["results"]) > 0
assert len(result["results"][0]["markdown"]) > 0
def test_js_execution(tester: Crawl4AiTester):
print("\n=== Testing JS Execution ===")
request = {
"urls": ["https://www.nbcnews.com/business"],
"priority": 8,
"js_code": [
"const loadMoreButton = Array.from(document.querySelectorAll('button')).find(button => button.textContent.includes('Load More')); loadMoreButton && loadMoreButton.click();"
],
"wait_for": "article.tease-card:nth-child(10)",
"crawler_params": {"headless": True},
"browser_config": {"headless": True},
"crawler_config": {
"js_code": [
"const loadMoreButton = Array.from(document.querySelectorAll('button')).find(button => button.textContent.includes('Load More')); if(loadMoreButton) loadMoreButton.click();"
],
"wait_for": "wide-tease-item__wrapper df flex-column flex-row-m flex-nowrap-m enable-new-sports-feed-mobile-design(10)"
}
}
result = tester.submit_and_wait(request)
print(f"JS execution result length: {len(result['result']['markdown'])}")
print(f"JS execution result count: {len(result['result']['results'])}")
assert result["result"]["success"]
@@ -152,51 +143,78 @@ def test_css_selector(tester: Crawl4AiTester):
print("\n=== Testing CSS Selector ===")
request = {
"urls": ["https://www.nbcnews.com/business"],
"priority": 7,
"css_selector": ".wide-tease-item__description",
"crawler_params": {"headless": True},
"extra": {"word_count_threshold": 10},
"browser_config": {"headless": True},
"crawler_config": {
"css_selector": ".wide-tease-item__description",
"word_count_threshold": 10
}
}
result = tester.submit_and_wait(request)
print(f"CSS selector result length: {len(result['result']['markdown'])}")
print(f"CSS selector result count: {len(result['result']['results'])}")
assert result["result"]["success"]
def test_structured_extraction(tester: Crawl4AiTester):
print("\n=== Testing Structured Extraction ===")
schema = {
"name": "Coinbase Crypto Prices",
"baseSelector": ".cds-tableRow-t45thuk",
"fields": [
{
"name": "crypto",
"selector": "td:nth-child(1) h2",
"type": "text",
},
{
"name": "symbol",
"selector": "td:nth-child(1) p",
"type": "text",
},
{
"name": "price",
"selector": "td:nth-child(2)",
"type": "text",
},
],
"name": "Cryptocurrency Prices",
"baseSelector": "table[data-testid=\"prices-table\"] tbody tr",
"fields": [
{
"name": "asset_name",
"selector": "td:nth-child(2) p.cds-headline-h4steop",
"type": "text"
},
{
"name": "asset_symbol",
"selector": "td:nth-child(2) p.cds-label2-l1sm09ec",
"type": "text"
},
{
"name": "asset_image_url",
"selector": "td:nth-child(2) img[alt=\"Asset Symbol\"]",
"type": "attribute",
"attribute": "src"
},
{
"name": "asset_url",
"selector": "td:nth-child(2) a[aria-label^=\"Asset page for\"]",
"type": "attribute",
"attribute": "href"
},
{
"name": "price",
"selector": "td:nth-child(3) div.cds-typographyResets-t6muwls.cds-body-bwup3gq",
"type": "text"
},
{
"name": "change",
"selector": "td:nth-child(7) p.cds-body-bwup3gq",
"type": "text"
}
]
}
request = {
"urls": ["https://www.coinbase.com/explore"],
"priority": 9,
"extraction_config": {"type": "json_css", "params": {"schema": schema}},
"crawler_config": {
"type": "CrawlerRunConfig",
"params": {
"extraction_strategy": {
"type": "JsonCssExtractionStrategy",
"params": {"schema": schema}
}
}
}
}
result = tester.submit_and_wait(request)
extracted = json.loads(result["result"]["extracted_content"])
extracted = json.loads(result["result"]["results"][0]["extracted_content"])
print(f"Extracted {len(extracted)} items")
print("Sample item:", json.dumps(extracted[0], indent=2))
if extracted:
print("Sample item:", json.dumps(extracted[0], indent=2))
assert result["result"]["success"]
assert len(extracted) > 0
@@ -206,43 +224,54 @@ def test_llm_extraction(tester: Crawl4AiTester):
schema = {
"type": "object",
"properties": {
"model_name": {
"asset_name": {
"type": "string",
"description": "Name of the OpenAI model.",
"description": "Name of the asset.",
},
"input_fee": {
"price": {
"type": "string",
"description": "Fee for input token for the OpenAI model.",
"description": "Price of the asset.",
},
"output_fee": {
"change": {
"type": "string",
"description": "Fee for output token for the OpenAI model.",
"description": "Change in price of the asset.",
},
},
"required": ["model_name", "input_fee", "output_fee"],
"required": ["asset_name", "price", "change"],
}
request = {
"urls": ["https://openai.com/api/pricing"],
"priority": 8,
"extraction_config": {
"type": "llm",
"urls": ["https://www.coinbase.com/en-in/explore"],
"browser_config": {},
"crawler_config": {
"type": "CrawlerRunConfig",
"params": {
"provider": "openai/gpt-4o-mini",
"api_token": os.getenv("OPENAI_API_KEY"),
"schema": schema,
"extraction_type": "schema",
"instruction": """From the crawled content, extract all mentioned model names along with their fees for input and output tokens.""",
},
},
"crawler_params": {"word_count_threshold": 1},
"extraction_strategy": {
"type": "LLMExtractionStrategy",
"params": {
"llm_config": {
"type": "LLMConfig",
"params": {
"provider": "gemini/gemini-2.5-flash",
"api_token": os.getenv("GEMINI_API_KEY")
}
},
"schema": schema,
"extraction_type": "schema",
"instruction": "From the crawled content tioned asset names along with their prices and change in price.",
}
},
"word_count_threshold": 1
}
}
}
try:
result = tester.submit_and_wait(request)
extracted = json.loads(result["result"]["extracted_content"])
extracted = json.loads(result["result"]["results"][0]["extracted_content"])
print(f"Extracted {len(extracted)} model pricing entries")
print("Sample entry:", json.dumps(extracted[0], indent=2))
if extracted:
print("Sample entry:", json.dumps(extracted[0], indent=2))
assert result["result"]["success"]
except Exception as e:
print(f"LLM extraction test failed (might be due to missing API key): {str(e)}")
@@ -271,23 +300,32 @@ def test_llm_with_ollama(tester: Crawl4AiTester):
request = {
"urls": ["https://www.nbcnews.com/business"],
"priority": 8,
"extraction_config": {
"type": "llm",
"browser_config": {"verbose": True},
"crawler_config": {
"type": "CrawlerRunConfig",
"params": {
"provider": "ollama/llama2",
"schema": schema,
"extraction_type": "schema",
"instruction": "Extract the main article information including title, summary, and main topics.",
},
},
"extra": {"word_count_threshold": 1},
"crawler_params": {"verbose": True},
"extraction_strategy": {
"type": "LLMExtractionStrategy",
"params": {
"llm_config": {
"type": "LLMConfig",
"params": {
"provider": "ollama/llama3.2:latest",
}
},
"schema": schema,
"extraction_type": "schema",
"instruction": "Extract the main article information including title, summary, and main topics.",
}
},
"word_count_threshold": 1
}
}
}
try:
result = tester.submit_and_wait(request)
extracted = json.loads(result["result"]["extracted_content"])
extracted = json.loads(result["result"]["results"][0]["extracted_content"])
print("Extracted content:", json.dumps(extracted, indent=2))
assert result["result"]["success"]
except Exception as e:
@@ -298,23 +336,29 @@ def test_cosine_extraction(tester: Crawl4AiTester):
print("\n=== Testing Cosine Extraction ===")
request = {
"urls": ["https://www.nbcnews.com/business"],
"priority": 8,
"extraction_config": {
"type": "cosine",
"browser_config": {},
"crawler_config": {
"type": "CrawlerRunConfig",
"params": {
"semantic_filter": "business finance economy",
"word_count_threshold": 10,
"max_dist": 0.2,
"top_k": 3,
},
},
"extraction_strategy": {
"type": "CosineStrategy",
"params": {
"semantic_filter": "business finance economy",
"word_count_threshold": 10,
"max_dist": 0.2,
"top_k": 3,
}
}
}
}
}
try:
result = tester.submit_and_wait(request)
extracted = json.loads(result["result"]["extracted_content"])
extracted = json.loads(result["result"]["results"][0]["extracted_content"])
print(f"Extracted {len(extracted)} text clusters")
print("First cluster tags:", extracted[0]["tags"])
if extracted:
print("First cluster tags:", extracted[0]["tags"])
assert result["result"]["success"]
except Exception as e:
print(f"Cosine extraction test failed: {str(e)}")
@@ -324,19 +368,24 @@ def test_screenshot(tester: Crawl4AiTester):
print("\n=== Testing Screenshot ===")
request = {
"urls": ["https://www.nbcnews.com/business"],
"priority": 5,
"screenshot": True,
"crawler_params": {"headless": True},
"browser_config": {"headless": True},
"crawler_config": {
"type": "CrawlerRunConfig",
"params": {
"screenshot": True
}
}
}
result = tester.submit_and_wait(request)
print("Screenshot captured:", bool(result["result"]["screenshot"]))
screenshot_data = result["result"]["results"][0]["screenshot"]
print("Screenshot captured:", bool(screenshot_data))
if result["result"]["screenshot"]:
if screenshot_data:
# Save screenshot
screenshot_data = base64.b64decode(result["result"]["screenshot"])
screenshot_bytes = base64.b64decode(screenshot_data)
with open("test_screenshot.jpg", "wb") as f:
f.write(screenshot_data)
f.write(screenshot_bytes)
print("Screenshot saved as test_screenshot.jpg")
assert result["result"]["success"]

View File

@@ -0,0 +1,43 @@
import asyncio
import os
from crawl4ai.async_webcrawler import AsyncWebCrawler
from crawl4ai.async_configs import BrowserConfig, CrawlerRunConfig, CacheMode
# Simple concurrency test for persistent context page creation
# Usage: python scripts/test_persistent_context.py
URLS = [
# "https://example.com",
"https://httpbin.org/html",
"https://www.python.org/",
"https://www.rust-lang.org/",
]
async def main():
profile_dir = os.path.join(os.path.expanduser("~"), ".crawl4ai", "profiles", "test-persistent-profile")
os.makedirs(profile_dir, exist_ok=True)
browser_config = BrowserConfig(
browser_type="chromium",
headless=True,
use_persistent_context=True,
user_data_dir=profile_dir,
use_managed_browser=True,
verbose=True,
)
run_cfg = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
stream=False,
verbose=True,
)
async with AsyncWebCrawler(config=browser_config) as crawler:
results = await crawler.arun_many(URLS, config=run_cfg)
for r in results:
print(r.url, r.success, len(r.markdown.raw_markdown) if r.markdown else 0)
# r = await crawler.arun(url=URLS[0], config=run_cfg)
# print(r.url, r.success, len(r.markdown.raw_markdown) if r.markdown else 0)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,55 @@
import sys
import pytest
import asyncio
from unittest.mock import patch, MagicMock
from crawl4ai.browser_profiler import BrowserProfiler
@pytest.mark.asyncio
@pytest.mark.skipif(sys.platform != "win32", reason="Windows-specific msvcrt test")
async def test_keyboard_input_handling():
# Mock sequence of keystrokes: arrow key followed by 'q'
mock_keys = [b'\x00K', b'q']
mock_kbhit = MagicMock(side_effect=[True, True, False])
mock_getch = MagicMock(side_effect=mock_keys)
with patch('msvcrt.kbhit', mock_kbhit), patch('msvcrt.getch', mock_getch):
# profiler = BrowserProfiler()
user_done_event = asyncio.Event()
# Create a local async function to simulate the keyboard input handling
async def test_listen_for_quit_command():
if sys.platform == "win32":
while True:
try:
if mock_kbhit():
raw = mock_getch()
try:
key = raw.decode("utf-8")
except UnicodeDecodeError:
continue
if len(key) != 1 or not key.isprintable():
continue
if key.lower() == "q":
user_done_event.set()
return
await asyncio.sleep(0.1)
except Exception as e:
continue
# Run the listener
listener_task = asyncio.create_task(test_listen_for_quit_command())
# Wait for the event to be set
try:
await asyncio.wait_for(user_done_event.wait(), timeout=1.0)
assert user_done_event.is_set()
finally:
if not listener_task.done():
listener_task.cancel()
try:
await listener_task
except asyncio.CancelledError:
pass

View File

@@ -0,0 +1,582 @@
"""
Comprehensive test suite for ProxyConfig in different forms:
1. String form (ip:port:username:password)
2. Dict form (dictionary with keys)
3. Object form (ProxyConfig instance)
4. Environment variable form (from env vars)
Tests cover all possible scenarios and edge cases using pytest.
"""
import asyncio
import os
import pytest
import tempfile
from unittest.mock import patch
from crawl4ai import AsyncWebCrawler, BrowserConfig
from crawl4ai.async_configs import CrawlerRunConfig, ProxyConfig
from crawl4ai.cache_context import CacheMode
class TestProxyConfig:
"""Comprehensive test suite for ProxyConfig functionality."""
# Test data for different scenarios
# get free proxy server from from webshare.io https://www.webshare.io/?referral_code=3sqog0y1fvsl
TEST_PROXY_DATA = {
"server": "",
"username": "",
"password": "",
"ip": ""
}
def setup_method(self):
"""Setup for each test method."""
self.test_url = "https://httpbin.org/ip" # Use httpbin for testing
# ==================== OBJECT FORM TESTS ====================
def test_proxy_config_object_creation_basic(self):
"""Test basic ProxyConfig object creation."""
proxy = ProxyConfig(server="127.0.0.1:8080")
assert proxy.server == "127.0.0.1:8080"
assert proxy.username is None
assert proxy.password is None
assert proxy.ip == "127.0.0.1" # Should auto-extract IP
def test_proxy_config_object_creation_full(self):
"""Test ProxyConfig object creation with all parameters."""
proxy = ProxyConfig(
server=f"http://{self.TEST_PROXY_DATA['server']}",
username=self.TEST_PROXY_DATA['username'],
password=self.TEST_PROXY_DATA['password'],
ip=self.TEST_PROXY_DATA['ip']
)
assert proxy.server == f"http://{self.TEST_PROXY_DATA['server']}"
assert proxy.username == self.TEST_PROXY_DATA['username']
assert proxy.password == self.TEST_PROXY_DATA['password']
assert proxy.ip == self.TEST_PROXY_DATA['ip']
def test_proxy_config_object_ip_extraction(self):
"""Test automatic IP extraction from server URL."""
test_cases = [
("http://192.168.1.1:8080", "192.168.1.1"),
("https://10.0.0.1:3128", "10.0.0.1"),
("192.168.1.100:8080", "192.168.1.100"),
("proxy.example.com:8080", "proxy.example.com"),
]
for server, expected_ip in test_cases:
proxy = ProxyConfig(server=server)
assert proxy.ip == expected_ip, f"Failed for server: {server}"
def test_proxy_config_object_invalid_server(self):
"""Test ProxyConfig with invalid server formats."""
# Should not raise exception but may not extract IP properly
proxy = ProxyConfig(server="invalid-format")
assert proxy.server == "invalid-format"
# IP extraction might fail but object should still be created
# ==================== DICT FORM TESTS ====================
def test_proxy_config_from_dict_basic(self):
"""Test creating ProxyConfig from basic dictionary."""
proxy_dict = {"server": "127.0.0.1:8080"}
proxy = ProxyConfig.from_dict(proxy_dict)
assert proxy.server == "127.0.0.1:8080"
assert proxy.username is None
assert proxy.password is None
def test_proxy_config_from_dict_full(self):
"""Test creating ProxyConfig from complete dictionary."""
proxy_dict = {
"server": f"http://{self.TEST_PROXY_DATA['server']}",
"username": self.TEST_PROXY_DATA['username'],
"password": self.TEST_PROXY_DATA['password'],
"ip": self.TEST_PROXY_DATA['ip']
}
proxy = ProxyConfig.from_dict(proxy_dict)
assert proxy.server == proxy_dict["server"]
assert proxy.username == proxy_dict["username"]
assert proxy.password == proxy_dict["password"]
assert proxy.ip == proxy_dict["ip"]
def test_proxy_config_from_dict_missing_keys(self):
"""Test creating ProxyConfig from dictionary with missing keys."""
proxy_dict = {"server": "127.0.0.1:8080", "username": "user"}
proxy = ProxyConfig.from_dict(proxy_dict)
assert proxy.server == "127.0.0.1:8080"
assert proxy.username == "user"
assert proxy.password is None
assert proxy.ip == "127.0.0.1" # Should auto-extract
def test_proxy_config_from_dict_empty(self):
"""Test creating ProxyConfig from empty dictionary."""
proxy_dict = {}
proxy = ProxyConfig.from_dict(proxy_dict)
assert proxy.server is None
assert proxy.username is None
assert proxy.password is None
assert proxy.ip is None
def test_proxy_config_from_dict_none_values(self):
"""Test creating ProxyConfig from dictionary with None values."""
proxy_dict = {
"server": "127.0.0.1:8080",
"username": None,
"password": None,
"ip": None
}
proxy = ProxyConfig.from_dict(proxy_dict)
assert proxy.server == "127.0.0.1:8080"
assert proxy.username is None
assert proxy.password is None
assert proxy.ip == "127.0.0.1" # Should auto-extract despite None
# ==================== STRING FORM TESTS ====================
def test_proxy_config_from_string_full_format(self):
"""Test creating ProxyConfig from full string format (ip:port:username:password)."""
proxy_str = f"{self.TEST_PROXY_DATA['ip']}:6114:{self.TEST_PROXY_DATA['username']}:{self.TEST_PROXY_DATA['password']}"
proxy = ProxyConfig.from_string(proxy_str)
assert proxy.server == f"http://{self.TEST_PROXY_DATA['ip']}:6114"
assert proxy.username == self.TEST_PROXY_DATA['username']
assert proxy.password == self.TEST_PROXY_DATA['password']
assert proxy.ip == self.TEST_PROXY_DATA['ip']
def test_proxy_config_from_string_ip_port_only(self):
"""Test creating ProxyConfig from string with only ip:port."""
proxy_str = "192.168.1.1:8080"
proxy = ProxyConfig.from_string(proxy_str)
assert proxy.server == "http://192.168.1.1:8080"
assert proxy.username is None
assert proxy.password is None
assert proxy.ip == "192.168.1.1"
def test_proxy_config_from_string_invalid_format(self):
"""Test creating ProxyConfig from invalid string formats."""
invalid_formats = [
"invalid",
"ip:port:user", # Missing password (3 parts)
"ip:port:user:pass:extra", # Too many parts (5 parts)
"",
"::", # Empty parts but 3 total (invalid)
"::::", # Empty parts but 5 total (invalid)
]
for proxy_str in invalid_formats:
with pytest.raises(ValueError, match="Invalid proxy string format"):
ProxyConfig.from_string(proxy_str)
def test_proxy_config_from_string_edge_cases_that_work(self):
"""Test string formats that should work but might be edge cases."""
# These cases actually work as valid formats
edge_cases = [
(":", "http://:", ""), # ip:port format with empty values
(":::", "http://:", ""), # ip:port:user:pass format with empty values
]
for proxy_str, expected_server, expected_ip in edge_cases:
proxy = ProxyConfig.from_string(proxy_str)
assert proxy.server == expected_server
assert proxy.ip == expected_ip
def test_proxy_config_from_string_edge_cases(self):
"""Test string parsing edge cases."""
# Test with different port numbers
proxy_str = "10.0.0.1:3128:user:pass"
proxy = ProxyConfig.from_string(proxy_str)
assert proxy.server == "http://10.0.0.1:3128"
# Test with special characters in credentials
proxy_str = "10.0.0.1:8080:user@domain:pass:word"
with pytest.raises(ValueError): # Should fail due to extra colon in password
ProxyConfig.from_string(proxy_str)
# ==================== ENVIRONMENT VARIABLE TESTS ====================
def test_proxy_config_from_env_single_proxy(self):
"""Test loading single proxy from environment variable."""
proxy_str = f"{self.TEST_PROXY_DATA['ip']}:6114:{self.TEST_PROXY_DATA['username']}:{self.TEST_PROXY_DATA['password']}"
with patch.dict(os.environ, {'TEST_PROXIES': proxy_str}):
proxies = ProxyConfig.from_env('TEST_PROXIES')
assert len(proxies) == 1
proxy = proxies[0]
assert proxy.ip == self.TEST_PROXY_DATA['ip']
assert proxy.username == self.TEST_PROXY_DATA['username']
assert proxy.password == self.TEST_PROXY_DATA['password']
def test_proxy_config_from_env_multiple_proxies(self):
"""Test loading multiple proxies from environment variable."""
proxy_list = [
"192.168.1.1:8080:user1:pass1",
"192.168.1.2:8080:user2:pass2",
"10.0.0.1:3128" # No auth
]
proxy_str = ",".join(proxy_list)
with patch.dict(os.environ, {'TEST_PROXIES': proxy_str}):
proxies = ProxyConfig.from_env('TEST_PROXIES')
assert len(proxies) == 3
# Check first proxy
assert proxies[0].ip == "192.168.1.1"
assert proxies[0].username == "user1"
assert proxies[0].password == "pass1"
# Check second proxy
assert proxies[1].ip == "192.168.1.2"
assert proxies[1].username == "user2"
assert proxies[1].password == "pass2"
# Check third proxy (no auth)
assert proxies[2].ip == "10.0.0.1"
assert proxies[2].username is None
assert proxies[2].password is None
def test_proxy_config_from_env_empty_var(self):
"""Test loading from empty environment variable."""
with patch.dict(os.environ, {'TEST_PROXIES': ''}):
proxies = ProxyConfig.from_env('TEST_PROXIES')
assert len(proxies) == 0
def test_proxy_config_from_env_missing_var(self):
"""Test loading from missing environment variable."""
# Ensure the env var doesn't exist
with patch.dict(os.environ, {}, clear=True):
proxies = ProxyConfig.from_env('NON_EXISTENT_VAR')
assert len(proxies) == 0
def test_proxy_config_from_env_with_empty_entries(self):
"""Test loading proxies with empty entries in the list."""
proxy_str = "192.168.1.1:8080:user:pass,,10.0.0.1:3128,"
with patch.dict(os.environ, {'TEST_PROXIES': proxy_str}):
proxies = ProxyConfig.from_env('TEST_PROXIES')
assert len(proxies) == 2 # Empty entries should be skipped
assert proxies[0].ip == "192.168.1.1"
assert proxies[1].ip == "10.0.0.1"
def test_proxy_config_from_env_with_invalid_entries(self):
"""Test loading proxies with some invalid entries."""
proxy_str = "192.168.1.1:8080:user:pass,invalid_proxy,10.0.0.1:3128"
with patch.dict(os.environ, {'TEST_PROXIES': proxy_str}):
# Should handle errors gracefully and return valid proxies
proxies = ProxyConfig.from_env('TEST_PROXIES')
# Depending on implementation, might return partial list or empty
# This tests error handling
assert isinstance(proxies, list)
# ==================== SERIALIZATION TESTS ====================
def test_proxy_config_to_dict(self):
"""Test converting ProxyConfig to dictionary."""
proxy = ProxyConfig(
server=f"http://{self.TEST_PROXY_DATA['server']}",
username=self.TEST_PROXY_DATA['username'],
password=self.TEST_PROXY_DATA['password'],
ip=self.TEST_PROXY_DATA['ip']
)
result_dict = proxy.to_dict()
expected = {
"server": f"http://{self.TEST_PROXY_DATA['server']}",
"username": self.TEST_PROXY_DATA['username'],
"password": self.TEST_PROXY_DATA['password'],
"ip": self.TEST_PROXY_DATA['ip']
}
assert result_dict == expected
def test_proxy_config_clone(self):
"""Test cloning ProxyConfig with modifications."""
original = ProxyConfig(
server="http://127.0.0.1:8080",
username="user",
password="pass"
)
# Clone with modifications
cloned = original.clone(username="new_user", password="new_pass")
# Original should be unchanged
assert original.username == "user"
assert original.password == "pass"
# Clone should have new values
assert cloned.username == "new_user"
assert cloned.password == "new_pass"
assert cloned.server == original.server # Unchanged value
def test_proxy_config_roundtrip_serialization(self):
"""Test that ProxyConfig can be serialized and deserialized without loss."""
original = ProxyConfig(
server=f"http://{self.TEST_PROXY_DATA['server']}",
username=self.TEST_PROXY_DATA['username'],
password=self.TEST_PROXY_DATA['password'],
ip=self.TEST_PROXY_DATA['ip']
)
# Serialize to dict and back
serialized = original.to_dict()
deserialized = ProxyConfig.from_dict(serialized)
assert deserialized.server == original.server
assert deserialized.username == original.username
assert deserialized.password == original.password
assert deserialized.ip == original.ip
# ==================== INTEGRATION TESTS ====================
@pytest.mark.asyncio
async def test_crawler_with_proxy_config_object(self):
"""Test AsyncWebCrawler with ProxyConfig object."""
proxy_config = ProxyConfig(
server=f"http://{self.TEST_PROXY_DATA['server']}",
username=self.TEST_PROXY_DATA['username'],
password=self.TEST_PROXY_DATA['password']
)
browser_config = BrowserConfig(headless=True)
# Test that the crawler accepts the ProxyConfig object without errors
async with AsyncWebCrawler(config=browser_config) as crawler:
try:
# Note: This might fail due to actual proxy connection, but should not fail due to config issues
result = await crawler.arun(
url=self.test_url,
config=CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
proxy_config=proxy_config,
page_timeout=10000 # Short timeout for testing
)
)
# If we get here, proxy config was accepted
assert result is not None
except Exception as e:
# We expect connection errors with test proxies, but not config errors
error_msg = str(e).lower()
assert "attribute" not in error_msg, f"Config error: {e}"
assert "proxy_config" not in error_msg, f"Proxy config error: {e}"
@pytest.mark.asyncio
async def test_crawler_with_proxy_config_dict(self):
"""Test AsyncWebCrawler with ProxyConfig from dictionary."""
proxy_dict = {
"server": f"http://{self.TEST_PROXY_DATA['server']}",
"username": self.TEST_PROXY_DATA['username'],
"password": self.TEST_PROXY_DATA['password']
}
proxy_config = ProxyConfig.from_dict(proxy_dict)
browser_config = BrowserConfig(headless=True)
async with AsyncWebCrawler(config=browser_config) as crawler:
try:
result = await crawler.arun(
url=self.test_url,
config=CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
proxy_config=proxy_config,
page_timeout=10000
)
)
assert result is not None
except Exception as e:
error_msg = str(e).lower()
assert "attribute" not in error_msg, f"Config error: {e}"
@pytest.mark.asyncio
async def test_crawler_with_proxy_config_from_string(self):
"""Test AsyncWebCrawler with ProxyConfig from string."""
proxy_str = f"{self.TEST_PROXY_DATA['ip']}:6114:{self.TEST_PROXY_DATA['username']}:{self.TEST_PROXY_DATA['password']}"
proxy_config = ProxyConfig.from_string(proxy_str)
browser_config = BrowserConfig(headless=True)
async with AsyncWebCrawler(config=browser_config) as crawler:
try:
result = await crawler.arun(
url=self.test_url,
config=CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
proxy_config=proxy_config,
page_timeout=10000
)
)
assert result is not None
except Exception as e:
error_msg = str(e).lower()
assert "attribute" not in error_msg, f"Config error: {e}"
# ==================== EDGE CASES AND ERROR HANDLING ====================
def test_proxy_config_with_none_server(self):
"""Test ProxyConfig behavior with None server."""
proxy = ProxyConfig(server=None)
assert proxy.server is None
assert proxy.ip is None # Should not crash
def test_proxy_config_with_empty_string_server(self):
"""Test ProxyConfig behavior with empty string server."""
proxy = ProxyConfig(server="")
assert proxy.server == ""
assert proxy.ip is None or proxy.ip == ""
def test_proxy_config_special_characters_in_credentials(self):
"""Test ProxyConfig with special characters in username/password."""
special_chars_tests = [
("user@domain.com", "pass!@#$%"),
("user_123", "p@ssw0rd"),
("user-test", "pass-word"),
]
for username, password in special_chars_tests:
proxy = ProxyConfig(
server="http://127.0.0.1:8080",
username=username,
password=password
)
assert proxy.username == username
assert proxy.password == password
def test_proxy_config_unicode_handling(self):
"""Test ProxyConfig with unicode characters."""
proxy = ProxyConfig(
server="http://127.0.0.1:8080",
username="ユーザー", # Japanese characters
password="пароль" # Cyrillic characters
)
assert proxy.username == "ユーザー"
assert proxy.password == "пароль"
# ==================== PERFORMANCE TESTS ====================
def test_proxy_config_creation_performance(self):
"""Test that ProxyConfig creation is reasonably fast."""
import time
start_time = time.time()
for i in range(1000):
proxy = ProxyConfig(
server=f"http://192.168.1.{i % 255}:8080",
username=f"user{i}",
password=f"pass{i}"
)
end_time = time.time()
# Should be able to create 1000 configs in less than 1 second
assert (end_time - start_time) < 1.0
def test_proxy_config_from_env_performance(self):
"""Test that loading many proxies from env is reasonably fast."""
import time
# Create a large list of proxy strings
proxy_list = [f"192.168.1.{i}:8080:user{i}:pass{i}" for i in range(100)]
proxy_str = ",".join(proxy_list)
with patch.dict(os.environ, {'PERF_TEST_PROXIES': proxy_str}):
start_time = time.time()
proxies = ProxyConfig.from_env('PERF_TEST_PROXIES')
end_time = time.time()
assert len(proxies) == 100
# Should be able to parse 100 proxies in less than 1 second
assert (end_time - start_time) < 1.0
# ==================== STANDALONE TEST FUNCTIONS ====================
@pytest.mark.asyncio
async def test_dict_proxy():
"""Original test function for dict proxy - kept for backward compatibility."""
proxy_config = {
"server": "23.95.150.145:6114",
"username": "cfyswbwn",
"password": "1gs266hoqysi"
}
proxy_config_obj = ProxyConfig.from_dict(proxy_config)
browser_config = BrowserConfig(headless=True)
async with AsyncWebCrawler(config=browser_config) as crawler:
try:
result = await crawler.arun(url="https://httpbin.org/ip", config=CrawlerRunConfig(
stream=False,
cache_mode=CacheMode.BYPASS,
proxy_config=proxy_config_obj,
page_timeout=10000
))
print("Dict proxy test passed!")
print(result.markdown[:200] if result and result.markdown else "No result")
except Exception as e:
print(f"Dict proxy test error (expected): {e}")
@pytest.mark.asyncio
async def test_string_proxy():
"""Test function for string proxy format."""
proxy_str = "23.95.150.145:6114:cfyswbwn:1gs266hoqysi"
proxy_config_obj = ProxyConfig.from_string(proxy_str)
browser_config = BrowserConfig(headless=True)
async with AsyncWebCrawler(config=browser_config) as crawler:
try:
result = await crawler.arun(url="https://httpbin.org/ip", config=CrawlerRunConfig(
stream=False,
cache_mode=CacheMode.BYPASS,
proxy_config=proxy_config_obj,
page_timeout=10000
))
print("String proxy test passed!")
print(result.markdown[:200] if result and result.markdown else "No result")
except Exception as e:
print(f"String proxy test error (expected): {e}")
@pytest.mark.asyncio
async def test_env_proxy():
"""Test function for environment variable proxy."""
# Set environment variable
os.environ['TEST_PROXIES'] = "23.95.150.145:6114:cfyswbwn:1gs266hoqysi"
proxies = ProxyConfig.from_env('TEST_PROXIES')
if proxies:
proxy_config_obj = proxies[0] # Use first proxy
browser_config = BrowserConfig(headless=True)
async with AsyncWebCrawler(config=browser_config) as crawler:
try:
result = await crawler.arun(url="https://httpbin.org/ip", config=CrawlerRunConfig(
stream=False,
cache_mode=CacheMode.BYPASS,
proxy_config=proxy_config_obj,
page_timeout=10000
))
print("Environment proxy test passed!")
print(result.markdown[:200] if result and result.markdown else "No result")
except Exception as e:
print(f"Environment proxy test error (expected): {e}")
else:
print("No proxies loaded from environment")
if __name__ == "__main__":
print("Running comprehensive ProxyConfig tests...")
print("=" * 50)
# Run the standalone test functions
print("\n1. Testing dict proxy format...")
asyncio.run(test_dict_proxy())
print("\n2. Testing string proxy format...")
asyncio.run(test_string_proxy())
print("\n3. Testing environment variable proxy format...")
asyncio.run(test_env_proxy())
print("\n" + "=" * 50)
print("To run the full pytest suite, use: pytest " + __file__)
print("=" * 50)

File diff suppressed because it is too large Load Diff