Merge branch 'next'

UncleCode
2024-12-12 20:17:27 +08:00
34 changed files with 7886 additions and 1715 deletions

.gitignore (vendored, 2 changes)

@@ -206,7 +206,7 @@ pypi_build.sh
git_issues.py
git_issues.md
.next
.next/
.tests/
.issues/
.docs/

README.md

@@ -1,4 +1,4 @@
# Crawl4AI: Crawl Smarter, Faster, Freely. For AI.
# 🚀🤖 Crawl4AI: Crawl Smarter, Faster, Freely. For AI.
<a href="https://trendshift.io/repositories/11716" target="_blank"><img src="https://trendshift.io/api/badge/repositories/11716" alt="unclecode%2Fcrawl4ai | Trendshift" style="width: 250px; height: 55px;" width="250" height="55"/></a>

View File

@@ -1,244 +0,0 @@
# Crawl4AI v0.2.77 🕷️🤖
[![GitHub Stars](https://img.shields.io/github/stars/unclecode/crawl4ai?style=social)](https://github.com/unclecode/crawl4ai/stargazers)
[![GitHub Forks](https://img.shields.io/github/forks/unclecode/crawl4ai?style=social)](https://github.com/unclecode/crawl4ai/network/members)
[![GitHub Issues](https://img.shields.io/github/issues/unclecode/crawl4ai)](https://github.com/unclecode/crawl4ai/issues)
[![GitHub Pull Requests](https://img.shields.io/github/issues-pr/unclecode/crawl4ai)](https://github.com/unclecode/crawl4ai/pulls)
[![License](https://img.shields.io/github/license/unclecode/crawl4ai)](https://github.com/unclecode/crawl4ai/blob/main/LICENSE)
Crawl4AI simplifies web crawling and data extraction, making it accessible for large language models (LLMs) and AI applications. 🆓🌐
#### [v0.2.77] - 2024-08-02
Major improvements in functionality, performance, and cross-platform compatibility! 🚀
- 🐳 **Docker enhancements**:
  - Significantly improved Dockerfile for easy installation on Linux, Mac, and Windows.
- 🌐 **Official Docker Hub image**:
  - Launched our first official image on Docker Hub for streamlined deployment (unclecode/crawl4ai).
- 🔧 **Selenium upgrade**:
  - Removed dependency on ChromeDriver, now using Selenium's built-in capabilities for better compatibility.
- 🖼️ **Image description**:
  - Implemented ability to generate textual descriptions for extracted images from web pages.
- **Performance boost**:
  - Various improvements to enhance overall speed and performance.
## Try it Now!
✨ Play around with this [![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/drive/1sJPAmeLj5PMrg2VgOwMJ2ubGIcK0cJeX?usp=sharing)
✨ Visit our [Documentation Website](https://crawl4ai.com/mkdocs/)
✨ Check [Demo](https://crawl4ai.com/mkdocs/demo)
## Features ✨
- 🆓 Completely free and open-source
- 🤖 LLM-friendly output formats (JSON, cleaned HTML, markdown)
- 🌍 Supports crawling multiple URLs simultaneously
- 🎨 Extracts and returns all media tags (Images, Audio, and Video)
- 🔗 Extracts all external and internal links
- 📚 Extracts metadata from the page
- 🔄 Custom hooks for authentication, headers, and page modifications before crawling
- 🕵️ User-agent customization
- 🖼️ Takes screenshots of the page
- 📜 Executes multiple custom JavaScripts before crawling
- 📚 Various chunking strategies: topic-based, regex, sentence, and more
- 🧠 Advanced extraction strategies: cosine clustering, LLM, and more
- 🎯 CSS selector support
- 📝 Passes instructions/keywords to refine extraction
# Crawl4AI
## 🌟 Shoutout to Contributors of v0.2.77!
A big thank you to the amazing contributors who've made this release possible:
- [@aravindkarnam](https://github.com/aravindkarnam) for the new image description feature
- [@FractalMind](https://github.com/FractalMind) for our official Docker Hub image
- [@ketonkss4](https://github.com/ketonkss4) for helping streamline our Selenium setup
Your contributions are driving Crawl4AI forward! 🚀
## Cool Examples 🚀
### Quick Start
```python
from crawl4ai import WebCrawler
# Create an instance of WebCrawler
crawler = WebCrawler()
# Warm up the crawler (load necessary models)
crawler.warmup()
# Run the crawler on a URL
result = crawler.run(url="https://www.nbcnews.com/business")
# Print the extracted content
print(result.markdown)
```
## How to install 🛠
### Using pip 🐍
```bash
virtualenv venv
source venv/bin/activate
pip install "crawl4ai @ git+https://github.com/unclecode/crawl4ai.git"
```
### Using Docker 🐳
```bash
# For Mac users (M1/M2)
# docker build --platform linux/amd64 -t crawl4ai .
docker build -t crawl4ai .
docker run -d -p 8000:80 crawl4ai
```
### Using Docker Hub 🐳
```bash
docker pull unclecode/crawl4ai:latest
docker run -d -p 8000:80 unclecode/crawl4ai:latest
```
## Speed-First Design 🚀
Perhaps the most important design principle for this library is speed. We need to ensure it can handle many links and resources in parallel as quickly as possible. By combining this speed with fast LLMs like Groq, the results will be truly amazing.
```python
import time
from crawl4ai.web_crawler import WebCrawler
crawler = WebCrawler()
crawler.warmup()
start = time.time()
url = r"https://www.nbcnews.com/business"
result = crawler.run(url, word_count_threshold=10, bypass_cache=True)
end = time.time()
print(f"Time taken: {end - start}")
```
Let's take a look at the timing reported for the above code snippet:
```bash
[LOG] 🚀 Crawling done, success: True, time taken: 1.3623387813568115 seconds
[LOG] 🚀 Content extracted, success: True, time taken: 0.05715131759643555 seconds
[LOG] 🚀 Extraction, time taken: 0.05750393867492676 seconds.
Time taken: 1.439958095550537
```
Fetching the content from the page took 1.3623 seconds, and extracting the content took 0.0575 seconds. 🚀
### Extract Structured Data from Web Pages 📊
Crawl all OpenAI models and their fees from the official page.
```python
import os
from crawl4ai import WebCrawler
from crawl4ai.extraction_strategy import LLMExtractionStrategy
from pydantic import BaseModel, Field
class OpenAIModelFee(BaseModel):
    model_name: str = Field(..., description="Name of the OpenAI model.")
    input_fee: str = Field(..., description="Fee for input token for the OpenAI model.")
    output_fee: str = Field(..., description="Fee for output token for the OpenAI model.")

url = 'https://openai.com/api/pricing/'
crawler = WebCrawler()
crawler.warmup()
result = crawler.run(
    url=url,
    word_count_threshold=1,
    extraction_strategy=LLMExtractionStrategy(
        provider="openai/gpt-4o", api_token=os.getenv('OPENAI_API_KEY'),
        schema=OpenAIModelFee.schema(),
        extraction_type="schema",
        instruction="""From the crawled content, extract all mentioned model names along with their fees for input and output tokens.
        Do not miss any models in the entire content. One extracted model JSON format should look like this:
        {"model_name": "GPT-4", "input_fee": "US$10.00 / 1M tokens", "output_fee": "US$30.00 / 1M tokens"}."""
    ),
    bypass_cache=True,
)
print(result.extracted_content)
```
### Execute JS, Filter Data with CSS Selector, and Clustering
```python
from crawl4ai import WebCrawler
from crawl4ai.chunking_strategy import CosineStrategy
js_code = ["const loadMoreButton = Array.from(document.querySelectorAll('button')).find(button => button.textContent.includes('Load More')); loadMoreButton && loadMoreButton.click();"]
crawler = WebCrawler()
crawler.warmup()
result = crawler.run(
    url="https://www.nbcnews.com/business",
    js=js_code,
    css_selector="p",
    extraction_strategy=CosineStrategy(semantic_filter="technology")
)
print(result.extracted_content)
```
### Extract Structured Data from Web Pages With Proxy and BaseUrl
```python
from crawl4ai import WebCrawler
from crawl4ai.extraction_strategy import LLMExtractionStrategy
def create_crawler():
    crawler = WebCrawler(verbose=True, proxy="http://127.0.0.1:7890")
    crawler.warmup()
    return crawler

crawler = create_crawler()
crawler.warmup()
result = crawler.run(
    url="https://www.nbcnews.com/business",
    extraction_strategy=LLMExtractionStrategy(
        provider="openai/gpt-4o",
        api_token="sk-",
        base_url="https://api.openai.com/v1"
    )
)
print(result.markdown)
```
## Documentation 📚
For detailed documentation, including installation instructions, advanced features, and API reference, visit our [Documentation Website](https://crawl4ai.com/mkdocs/).
## Contributing 🤝
We welcome contributions from the open-source community. Check out our [contribution guidelines](https://github.com/unclecode/crawl4ai/blob/main/CONTRIBUTING.md) for more information.
## License 📄
Crawl4AI is released under the [Apache 2.0 License](https://github.com/unclecode/crawl4ai/blob/main/LICENSE).
## Contact 📧
For questions, suggestions, or feedback, feel free to reach out:
- GitHub: [unclecode](https://github.com/unclecode)
- Twitter: [@unclecode](https://twitter.com/unclecode)
- Website: [crawl4ai.com](https://crawl4ai.com)
Happy Crawling! 🕸️🚀
## Star History
[![Star History Chart](https://api.star-history.com/svg?repos=unclecode/crawl4ai&type=Date)](https://star-history.com/#unclecode/crawl4ai&Date)

a.md (new file, 4214 lines added); file diff suppressed because it is too large.

crawl4ai/__init__.py

@@ -1,7 +1,11 @@
# __init__.py
from .async_webcrawler import AsyncWebCrawler, CacheMode
from .async_configs import BrowserConfig, CrawlerRunConfig
from .extraction_strategy import ExtractionStrategy, LLMExtractionStrategy, CosineStrategy, JsonCssExtractionStrategy
from .chunking_strategy import ChunkingStrategy, RegexChunking
from .markdown_generation_strategy import DefaultMarkdownGenerator
from .content_filter_strategy import PruningContentFilter, BM25ContentFilter
from .models import CrawlResult
from .__version__ import __version__
@@ -9,6 +13,17 @@ __all__ = [
"AsyncWebCrawler",
"CrawlResult",
"CacheMode",
'BrowserConfig',
'CrawlerRunConfig',
'ExtractionStrategy',
'LLMExtractionStrategy',
'CosineStrategy',
'JsonCssExtractionStrategy',
'ChunkingStrategy',
'RegexChunking',
'DefaultMarkdownGenerator',
'PruningContentFilter',
'BM25ContentFilter',
]
def is_sync_version_installed():
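With these exports in place, the new configuration and strategy classes are importable directly from the package root. A minimal import check, not part of the diff (assumes crawl4ai 0.4.2 is installed):

```python
# Not part of the diff: quick import check for the names newly added to __all__ above.
from crawl4ai import (
    AsyncWebCrawler,
    BrowserConfig,
    CrawlerRunConfig,
    CacheMode,
    LLMExtractionStrategy,
    JsonCssExtractionStrategy,
    DefaultMarkdownGenerator,
    PruningContentFilter,
)

print(AsyncWebCrawler.__name__, BrowserConfig.__name__, CrawlerRunConfig.__name__)
```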

crawl4ai/__version__.py

@@ -1,2 +1,2 @@
# crawl4ai/_version.py
__version__ = "0.4.1"
__version__ = "0.4.2"

crawl4ai/async_configs.py (new file, 402 lines added)

@@ -0,0 +1,402 @@
from .config import (
MIN_WORD_THRESHOLD,
IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD,
SCREENSHOT_HEIGHT_TRESHOLD,
PAGE_TIMEOUT
)
from .user_agent_generator import UserAgentGenerator
from .extraction_strategy import ExtractionStrategy
from .chunking_strategy import ChunkingStrategy
class BrowserConfig:
"""
Configuration class for setting up a browser instance and its context in AsyncPlaywrightCrawlerStrategy.
This class centralizes all parameters that affect browser and context creation. Instead of passing
scattered keyword arguments, users can instantiate and modify this configuration object. The crawler
code will then reference these settings to initialize the browser in a consistent, documented manner.
Attributes:
browser_type (str): The type of browser to launch. Supported values: "chromium", "firefox", "webkit".
Default: "chromium".
headless (bool): Whether to run the browser in headless mode (no visible GUI).
Default: True.
use_managed_browser (bool): Launch the browser using a managed approach (e.g., via CDP), allowing
advanced manipulation. Default: False.
use_persistent_context (bool): Use a persistent browser context (like a persistent profile).
Automatically sets use_managed_browser=True. Default: False.
user_data_dir (str or None): Path to a user data directory for persistent sessions. If None, a
temporary directory may be used. Default: None.
chrome_channel (str): The Chrome channel to launch (e.g., "chrome", "msedge"). Only applies if browser_type
is "chromium". Default: "chrome".
proxy (str or None): Proxy server URL (e.g., "http://username:password@proxy:port"). If None, no proxy is used.
Default: None.
proxy_config (dict or None): Detailed proxy configuration, e.g. {"server": "...", "username": "..."}.
If None, no additional proxy config. Default: None.
viewport_width (int): Default viewport width for pages. Default: 1920.
viewport_height (int): Default viewport height for pages. Default: 1080.
verbose (bool): Enable verbose logging.
Default: True.
accept_downloads (bool): Whether to allow file downloads. If True, requires a downloads_path.
Default: False.
downloads_path (str or None): Directory to store downloaded files. If None and accept_downloads is True,
a default path will be created. Default: None.
storage_state (str or dict or None): Path or object describing storage state (cookies, localStorage).
Default: None.
ignore_https_errors (bool): Ignore HTTPS certificate errors. Default: True.
java_script_enabled (bool): Enable JavaScript execution in pages. Default: True.
cookies (list): List of cookies to add to the browser context. Each cookie is a dict with fields like
{"name": "...", "value": "...", "url": "..."}.
Default: [].
headers (dict): Extra HTTP headers to apply to all requests in this context.
Default: {}.
user_agent (str): Custom User-Agent string to use. Default: "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36".
user_agent_mode (str or None): Mode for generating the user agent (e.g., "random"). If None, use the provided
user_agent as-is. Default: None.
user_agent_generator_config (dict or None): Configuration for user agent generation if user_agent_mode is set.
Default: None.
text_only (bool): If True, disables images and other rich content for potentially faster load times.
Default: False.
light_mode (bool): Disables certain background features for performance gains. Default: False.
extra_args (list): Additional command-line arguments passed to the browser.
Default: [].
"""
def __init__(
self,
browser_type: str = "chromium",
headless: bool = True,
use_managed_browser: bool = False,
use_persistent_context: bool = False,
user_data_dir: str = None,
chrome_channel: str = "chrome",
proxy: str = None,
proxy_config: dict = None,
viewport_width: int = 1920,
viewport_height: int = 1080,
accept_downloads: bool = False,
downloads_path: str = None,
storage_state=None,
ignore_https_errors: bool = True,
java_script_enabled: bool = True,
sleep_on_close: bool = False,
verbose: bool = True,
cookies: list = None,
headers: dict = None,
user_agent: str = (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/116.0.5845.187 Safari/604.1 Edg/117.0.2045.47"
),
user_agent_mode: str = None,
user_agent_generator_config: dict = None,
text_only: bool = False,
light_mode: bool = False,
extra_args: list = None,
):
self.browser_type = browser_type
self.headless = headless
self.use_managed_browser = use_managed_browser
self.use_persistent_context = use_persistent_context
self.user_data_dir = user_data_dir
if self.browser_type == "chromium":
self.chrome_channel = "chrome"
elif self.browser_type == "firefox":
self.chrome_channel = "firefox"
elif self.browser_type == "webkit":
self.chrome_channel = "webkit"
else:
self.chrome_channel = chrome_channel or "chrome"
self.proxy = proxy
self.proxy_config = proxy_config
self.viewport_width = viewport_width
self.viewport_height = viewport_height
self.accept_downloads = accept_downloads
self.downloads_path = downloads_path
self.storage_state = storage_state
self.ignore_https_errors = ignore_https_errors
self.java_script_enabled = java_script_enabled
self.cookies = cookies if cookies is not None else []
self.headers = headers if headers is not None else {}
self.user_agent = user_agent
self.user_agent_mode = user_agent_mode
self.user_agent_generator_config = user_agent_generator_config
self.text_only = text_only
self.light_mode = light_mode
self.extra_args = extra_args if extra_args is not None else []
self.sleep_on_close = sleep_on_close
self.verbose = verbose
user_agenr_generator = UserAgentGenerator()
if self.user_agent_mode != "random":
self.user_agent = user_agenr_generator.generate(
**(self.user_agent_generator_config or {})
)
self.browser_hint = user_agenr_generator.generate_client_hints(self.user_agent)
self.headers.setdefault("sec-ch-ua", self.browser_hint)
# If persistent context is requested, ensure managed browser is enabled
if self.use_persistent_context:
self.use_managed_browser = True
@staticmethod
def from_kwargs(kwargs: dict) -> "BrowserConfig":
return BrowserConfig(
browser_type=kwargs.get("browser_type", "chromium"),
headless=kwargs.get("headless", True),
use_managed_browser=kwargs.get("use_managed_browser", False),
use_persistent_context=kwargs.get("use_persistent_context", False),
user_data_dir=kwargs.get("user_data_dir"),
chrome_channel=kwargs.get("chrome_channel", "chrome"),
proxy=kwargs.get("proxy"),
proxy_config=kwargs.get("proxy_config"),
viewport_width=kwargs.get("viewport_width", 1920),
viewport_height=kwargs.get("viewport_height", 1080),
accept_downloads=kwargs.get("accept_downloads", False),
downloads_path=kwargs.get("downloads_path"),
storage_state=kwargs.get("storage_state"),
ignore_https_errors=kwargs.get("ignore_https_errors", True),
java_script_enabled=kwargs.get("java_script_enabled", True),
cookies=kwargs.get("cookies", []),
headers=kwargs.get("headers", {}),
user_agent=kwargs.get("user_agent",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36"
),
user_agent_mode=kwargs.get("user_agent_mode"),
user_agent_generator_config=kwargs.get("user_agent_generator_config"),
text_only=kwargs.get("text_only", False),
light_mode=kwargs.get("light_mode", False),
extra_args=kwargs.get("extra_args", [])
)
class CrawlerRunConfig:
"""
Configuration class for controlling how the crawler runs each crawl operation.
This includes parameters for content extraction, page manipulation, waiting conditions,
caching, and other runtime behaviors.
This centralizes parameters that were previously scattered as kwargs to `arun()` and related methods.
By using this class, you have a single place to understand and adjust the crawling options.
Attributes:
word_count_threshold (int): Minimum word count threshold before processing content.
Default: MIN_WORD_THRESHOLD (typically 200).
extraction_strategy (ExtractionStrategy or None): Strategy to extract structured data from crawled pages.
Default: None (NoExtractionStrategy is used if None).
chunking_strategy (ChunkingStrategy): Strategy to chunk content before extraction.
Default: RegexChunking().
content_filter (RelevantContentFilter or None): Optional filter to prune irrelevant content.
Default: None.
cache_mode (CacheMode or None): Defines how caching is handled.
If None, defaults to CacheMode.ENABLED internally.
Default: None.
session_id (str or None): Optional session ID to persist the browser context and the created
page instance. If the ID already exists, the crawler does not
create a new page and uses the current page to preserve the state;
if not, it creates a new page and context then stores it in
memory with the given session ID.
bypass_cache (bool): Legacy parameter, if True acts like CacheMode.BYPASS.
Default: False.
disable_cache (bool): Legacy parameter, if True acts like CacheMode.DISABLED.
Default: False.
no_cache_read (bool): Legacy parameter, if True acts like CacheMode.WRITE_ONLY.
Default: False.
no_cache_write (bool): Legacy parameter, if True acts like CacheMode.READ_ONLY.
Default: False.
css_selector (str or None): CSS selector to extract a specific portion of the page.
Default: None.
screenshot (bool): Whether to take a screenshot after crawling.
Default: False.
pdf (bool): Whether to generate a PDF of the page.
Default: False.
verbose (bool): Enable verbose logging.
Default: True.
only_text (bool): If True, attempt to extract text-only content where applicable.
Default: False.
image_description_min_word_threshold (int): Minimum words for image description extraction.
Default: IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD (e.g., 50).
prettiify (bool): If True, apply `fast_format_html` to produce prettified HTML output.
Default: False.
js_code (str or list of str or None): JavaScript code/snippets to run on the page.
Default: None.
wait_for (str or None): A CSS selector or JS condition to wait for before extracting content.
Default: None.
js_only (bool): If True, indicates subsequent calls are JS-driven updates, not full page loads.
Default: False.
wait_until (str): The condition to wait for when navigating, e.g. "domcontentloaded".
Default: "domcontentloaded".
page_timeout (int): Timeout in ms for page operations like navigation.
Default: 60000 (60 seconds).
ignore_body_visibility (bool): If True, ignore whether the body is visible before proceeding.
Default: True.
wait_for_images (bool): If True, wait for images to load before extracting content.
Default: True.
adjust_viewport_to_content (bool): If True, adjust viewport according to the page content dimensions.
Default: False.
scan_full_page (bool): If True, scroll through the entire page to load all content.
Default: False.
scroll_delay (float): Delay in seconds between scroll steps if scan_full_page is True.
Default: 0.2.
process_iframes (bool): If True, attempts to process and inline iframe content.
Default: False.
remove_overlay_elements (bool): If True, remove overlays/popups before extracting HTML.
Default: False.
delay_before_return_html (float): Delay in seconds before retrieving final HTML.
Default: 0.1.
log_console (bool): If True, log console messages from the page.
Default: False.
simulate_user (bool): If True, simulate user interactions (mouse moves, clicks) for anti-bot measures.
Default: False.
override_navigator (bool): If True, overrides navigator properties for more human-like behavior.
Default: False.
magic (bool): If True, attempts automatic handling of overlays/popups.
Default: False.
screenshot_wait_for (float or None): Additional wait time before taking a screenshot.
Default: None.
screenshot_height_threshold (int): Threshold for page height to decide screenshot strategy.
Default: SCREENSHOT_HEIGHT_TRESHOLD (from config, e.g. 20000).
mean_delay (float): Mean base delay between requests when calling arun_many.
Default: 0.1.
max_range (float): Max random additional delay range for requests in arun_many.
Default: 0.3.
# session_id and semaphore_count might be set at runtime, not needed as defaults here.
"""
def __init__(
self,
word_count_threshold: int = MIN_WORD_THRESHOLD ,
extraction_strategy : ExtractionStrategy=None, # Will default to NoExtractionStrategy if None
chunking_strategy : ChunkingStrategy= None, # Will default to RegexChunking if None
content_filter=None,
cache_mode=None,
session_id: str = None,
bypass_cache: bool = False,
disable_cache: bool = False,
no_cache_read: bool = False,
no_cache_write: bool = False,
css_selector: str = None,
screenshot: bool = False,
pdf: bool = False,
verbose: bool = True,
only_text: bool = False,
image_description_min_word_threshold: int = IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD,
prettiify: bool = False,
js_code=None,
wait_for: str = None,
js_only: bool = False,
wait_until: str = "domcontentloaded",
page_timeout: int = PAGE_TIMEOUT,
ignore_body_visibility: bool = True,
wait_for_images: bool = True,
adjust_viewport_to_content: bool = False,
scan_full_page: bool = False,
scroll_delay: float = 0.2,
process_iframes: bool = False,
remove_overlay_elements: bool = False,
delay_before_return_html: float = 0.1,
log_console: bool = False,
simulate_user: bool = False,
override_navigator: bool = False,
magic: bool = False,
screenshot_wait_for: float = None,
screenshot_height_threshold: int = SCREENSHOT_HEIGHT_TRESHOLD,
mean_delay: float = 0.1,
max_range: float = 0.3,
semaphore_count: int = 5,
):
self.word_count_threshold = word_count_threshold
self.extraction_strategy = extraction_strategy
self.chunking_strategy = chunking_strategy
self.content_filter = content_filter
self.cache_mode = cache_mode
self.session_id = session_id
self.bypass_cache = bypass_cache
self.disable_cache = disable_cache
self.no_cache_read = no_cache_read
self.no_cache_write = no_cache_write
self.css_selector = css_selector
self.screenshot = screenshot
self.pdf = pdf
self.verbose = verbose
self.only_text = only_text
self.image_description_min_word_threshold = image_description_min_word_threshold
self.prettiify = prettiify
self.js_code = js_code
self.wait_for = wait_for
self.js_only = js_only
self.wait_until = wait_until
self.page_timeout = page_timeout
self.ignore_body_visibility = ignore_body_visibility
self.wait_for_images = wait_for_images
self.adjust_viewport_to_content = adjust_viewport_to_content
self.scan_full_page = scan_full_page
self.scroll_delay = scroll_delay
self.process_iframes = process_iframes
self.remove_overlay_elements = remove_overlay_elements
self.delay_before_return_html = delay_before_return_html
self.log_console = log_console
self.simulate_user = simulate_user
self.override_navigator = override_navigator
self.magic = magic
self.screenshot_wait_for = screenshot_wait_for
self.screenshot_height_threshold = screenshot_height_threshold
self.mean_delay = mean_delay
self.max_range = max_range
self.semaphore_count = semaphore_count
# Validate type of extraction strategy and chunking strategy if they are provided
if self.extraction_strategy is not None and not isinstance(self.extraction_strategy, ExtractionStrategy):
raise ValueError("extraction_strategy must be an instance of ExtractionStrategy")
if self.chunking_strategy is not None and not isinstance(self.chunking_strategy, ChunkingStrategy):
raise ValueError("chunking_strategy must be an instance of ChunkingStrategy")
# Set default chunking strategy if None
if self.chunking_strategy is None:
from .chunking_strategy import RegexChunking
self.chunking_strategy = RegexChunking()
@staticmethod
def from_kwargs(kwargs: dict) -> "CrawlerRunConfig":
return CrawlerRunConfig(
word_count_threshold=kwargs.get("word_count_threshold", 200),
extraction_strategy=kwargs.get("extraction_strategy"),
chunking_strategy=kwargs.get("chunking_strategy"),
content_filter=kwargs.get("content_filter"),
cache_mode=kwargs.get("cache_mode"),
session_id=kwargs.get("session_id"),
bypass_cache=kwargs.get("bypass_cache", False),
disable_cache=kwargs.get("disable_cache", False),
no_cache_read=kwargs.get("no_cache_read", False),
no_cache_write=kwargs.get("no_cache_write", False),
css_selector=kwargs.get("css_selector"),
screenshot=kwargs.get("screenshot", False),
pdf=kwargs.get("pdf", False),
verbose=kwargs.get("verbose", True),
only_text=kwargs.get("only_text", False),
image_description_min_word_threshold=kwargs.get("image_description_min_word_threshold", IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD),
prettiify=kwargs.get("prettiify", False),
js_code=kwargs.get("js_code"), # If not provided here, will default inside constructor
wait_for=kwargs.get("wait_for"),
js_only=kwargs.get("js_only", False),
wait_until=kwargs.get("wait_until", "domcontentloaded"),
page_timeout=kwargs.get("page_timeout", 60000),
ignore_body_visibility=kwargs.get("ignore_body_visibility", True),
adjust_viewport_to_content=kwargs.get("adjust_viewport_to_content", False),
scan_full_page=kwargs.get("scan_full_page", False),
scroll_delay=kwargs.get("scroll_delay", 0.2),
process_iframes=kwargs.get("process_iframes", False),
remove_overlay_elements=kwargs.get("remove_overlay_elements", False),
delay_before_return_html=kwargs.get("delay_before_return_html", 0.1),
log_console=kwargs.get("log_console", False),
simulate_user=kwargs.get("simulate_user", False),
override_navigator=kwargs.get("override_navigator", False),
magic=kwargs.get("magic", False),
screenshot_wait_for=kwargs.get("screenshot_wait_for"),
screenshot_height_threshold=kwargs.get("screenshot_height_threshold", 20000),
mean_delay=kwargs.get("mean_delay", 0.1),
max_range=kwargs.get("max_range", 0.3),
semaphore_count=kwargs.get("semaphore_count", 5)
)
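Taken together, `BrowserConfig` and `CrawlerRunConfig` replace the scattered keyword arguments described in the docstrings above. A construction sketch, not part of the diff, using only the parameters and `from_kwargs` helpers defined in this file:

```python
# Sketch based solely on the constructors and from_kwargs helpers shown above.
from crawl4ai import BrowserConfig, CrawlerRunConfig

browser_cfg = BrowserConfig(headless=True, viewport_width=1280, viewport_height=720, text_only=True)
run_cfg = CrawlerRunConfig(word_count_threshold=10, css_selector="main", screenshot=True)

# The same objects can be built from plain keyword dictionaries:
browser_cfg2 = BrowserConfig.from_kwargs({"browser_type": "firefox", "headless": False})
run_cfg2 = CrawlerRunConfig.from_kwargs({"page_timeout": 30000, "magic": True})

print(browser_cfg.chrome_channel, run_cfg.page_timeout, browser_cfg2.browser_type, run_cfg2.magic)
```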

File diff suppressed because it is too large.

crawl4ai/async_database.py

@@ -1,4 +1,4 @@
import os
import os, sys
from pathlib import Path
import aiosqlite
import asyncio
@@ -13,6 +13,7 @@ import aiofiles
from .config import NEED_MIGRATION
from .version_manager import VersionManager
from .async_logger import AsyncLogger
from .utils import get_error_context, create_box_message
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@@ -97,35 +98,84 @@ class AsyncDatabaseManager:
@asynccontextmanager
async def get_connection(self):
"""Connection pool manager"""
"""Connection pool manager with enhanced error handling"""
if not self._initialized:
# Use an asyncio.Lock to ensure only one initialization occurs
async with self.init_lock:
if not self._initialized:
await self.initialize()
self._initialized = True
try:
await self.initialize()
self._initialized = True
except Exception as e:
import sys
error_context = get_error_context(sys.exc_info())
self.logger.error(
message="Database initialization failed:\n{error}\n\nContext:\n{context}\n\nTraceback:\n{traceback}",
tag="ERROR",
force_verbose=True,
params={
"error": str(e),
"context": error_context["code_context"],
"traceback": error_context["full_traceback"]
}
)
raise
await self.connection_semaphore.acquire()
task_id = id(asyncio.current_task())
try:
async with self.pool_lock:
if task_id not in self.connection_pool:
conn = await aiosqlite.connect(
self.db_path,
timeout=30.0
)
await conn.execute('PRAGMA journal_mode = WAL')
await conn.execute('PRAGMA busy_timeout = 5000')
self.connection_pool[task_id] = conn
try:
conn = await aiosqlite.connect(
self.db_path,
timeout=30.0
)
await conn.execute('PRAGMA journal_mode = WAL')
await conn.execute('PRAGMA busy_timeout = 5000')
# Verify database structure
async with conn.execute("PRAGMA table_info(crawled_data)") as cursor:
columns = await cursor.fetchall()
column_names = [col[1] for col in columns]
expected_columns = {
'url', 'html', 'cleaned_html', 'markdown', 'extracted_content',
'success', 'media', 'links', 'metadata', 'screenshot',
'response_headers', 'downloaded_files'
}
missing_columns = expected_columns - set(column_names)
if missing_columns:
raise ValueError(f"Database missing columns: {missing_columns}")
self.connection_pool[task_id] = conn
except Exception as e:
import sys
error_context = get_error_context(sys.exc_info())
error_message = (
f"Unexpected error in db get_connection at line {error_context['line_no']} "
f"in {error_context['function']} ({error_context['filename']}):\n"
f"Error: {str(e)}\n\n"
f"Code context:\n{error_context['code_context']}"
)
self.logger.error(
message=create_box_message(error_message, type= "error"),
)
raise
yield self.connection_pool[task_id]
except Exception as e:
import sys
error_context = get_error_context(sys.exc_info())
error_message = (
f"Unexpected error in db get_connection at line {error_context['line_no']} "
f"in {error_context['function']} ({error_context['filename']}):\n"
f"Error: {str(e)}\n\n"
f"Code context:\n{error_context['code_context']}"
)
self.logger.error(
message="Connection error: {error}",
tag="ERROR",
force_verbose=True,
params={"error": str(e)}
message=create_box_message(error_message, type= "error"),
)
raise
finally:
@@ -230,7 +280,8 @@ class AsyncDatabaseManager:
'cleaned_html': row_dict['cleaned_html'],
'markdown': row_dict['markdown'],
'extracted_content': row_dict['extracted_content'],
'screenshot': row_dict['screenshot']
'screenshot': row_dict['screenshot'],
'screenshots': row_dict['screenshot'],
}
for field, hash_value in content_fields.items():
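The boxed error reports built in `get_connection` come from `create_box_message`, whose signature appears in the `crawl4ai/utils.py` hunk later in this commit. A standalone sketch of that helper, not part of the diff:

```python
# Sketch: the box-formatting helper used for the database error reports above.
from crawl4ai.utils import create_box_message

print(create_box_message("Database missing columns: {'markdown'}", type="error"))
print(create_box_message("WAL mode enabled", type="info", width=60, double_line=True))
```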

crawl4ai/async_tools.py (new file, 183 lines added)

@@ -0,0 +1,183 @@
import asyncio
import base64
import time
from abc import ABC, abstractmethod
from typing import Callable, Dict, Any, List, Optional, Awaitable
import os, sys, shutil
import tempfile, subprocess
from playwright.async_api import async_playwright, Page, Browser, Error
from playwright.async_api import TimeoutError as PlaywrightTimeoutError
from io import BytesIO
from PIL import Image, ImageDraw, ImageFont
from pathlib import Path
from playwright.async_api import ProxySettings
from pydantic import BaseModel
import hashlib
import json
import uuid
from .models import AsyncCrawlResponse
from .utils import create_box_message
from .user_agent_generator import UserAgentGenerator
from playwright_stealth import StealthConfig, stealth_async
class ManagedBrowser:
def __init__(self, browser_type: str = "chromium", user_data_dir: Optional[str] = None, headless: bool = False, logger = None, host: str = "localhost", debugging_port: int = 9222):
self.browser_type = browser_type
self.user_data_dir = user_data_dir
self.headless = headless
self.browser_process = None
self.temp_dir = None
self.debugging_port = debugging_port
self.host = host
self.logger = logger
self.shutting_down = False
async def start(self) -> str:
"""
Starts the browser process and returns the CDP endpoint URL.
If user_data_dir is not provided, creates a temporary directory.
"""
# Create temp dir if needed
if not self.user_data_dir:
self.temp_dir = tempfile.mkdtemp(prefix="browser-profile-")
self.user_data_dir = self.temp_dir
# Get browser path and args based on OS and browser type
browser_path = self._get_browser_path()
args = self._get_browser_args()
# Start browser process
try:
self.browser_process = subprocess.Popen(
args,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
# Monitor browser process output for errors
asyncio.create_task(self._monitor_browser_process())
await asyncio.sleep(2) # Give browser time to start
return f"http://{self.host}:{self.debugging_port}"
except Exception as e:
await self.cleanup()
raise Exception(f"Failed to start browser: {e}")
async def _monitor_browser_process(self):
"""Monitor the browser process for unexpected termination."""
if self.browser_process:
try:
stdout, stderr = await asyncio.gather(
asyncio.to_thread(self.browser_process.stdout.read),
asyncio.to_thread(self.browser_process.stderr.read)
)
# Check shutting_down flag BEFORE logging anything
if self.browser_process.poll() is not None:
if not self.shutting_down:
self.logger.error(
message="Browser process terminated unexpectedly | Code: {code} | STDOUT: {stdout} | STDERR: {stderr}",
tag="ERROR",
params={
"code": self.browser_process.returncode,
"stdout": stdout.decode(),
"stderr": stderr.decode()
}
)
await self.cleanup()
else:
self.logger.info(
message="Browser process terminated normally | Code: {code}",
tag="INFO",
params={"code": self.browser_process.returncode}
)
except Exception as e:
if not self.shutting_down:
self.logger.error(
message="Error monitoring browser process: {error}",
tag="ERROR",
params={"error": str(e)}
)
def _get_browser_path(self) -> str:
"""Returns the browser executable path based on OS and browser type"""
if sys.platform == "darwin": # macOS
paths = {
"chromium": "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome",
"firefox": "/Applications/Firefox.app/Contents/MacOS/firefox",
"webkit": "/Applications/Safari.app/Contents/MacOS/Safari"
}
elif sys.platform == "win32": # Windows
paths = {
"chromium": "C:\\Program Files\\Google\\Chrome\\Application\\chrome.exe",
"firefox": "C:\\Program Files\\Mozilla Firefox\\firefox.exe",
"webkit": None # WebKit not supported on Windows
}
else: # Linux
paths = {
"chromium": "google-chrome",
"firefox": "firefox",
"webkit": None # WebKit not supported on Linux
}
return paths.get(self.browser_type)
def _get_browser_args(self) -> List[str]:
"""Returns browser-specific command line arguments"""
base_args = [self._get_browser_path()]
if self.browser_type == "chromium":
args = [
f"--remote-debugging-port={self.debugging_port}",
f"--user-data-dir={self.user_data_dir}",
]
if self.headless:
args.append("--headless=new")
elif self.browser_type == "firefox":
args = [
"--remote-debugging-port", str(self.debugging_port),
"--profile", self.user_data_dir,
]
if self.headless:
args.append("--headless")
else:
raise NotImplementedError(f"Browser type {self.browser_type} not supported")
return base_args + args
async def cleanup(self):
"""Cleanup browser process and temporary directory"""
# Set shutting_down flag BEFORE any termination actions
self.shutting_down = True
if self.browser_process:
try:
self.browser_process.terminate()
# Wait for process to end gracefully
for _ in range(10): # 10 attempts, 100ms each
if self.browser_process.poll() is not None:
break
await asyncio.sleep(0.1)
# Force kill if still running
if self.browser_process.poll() is None:
self.browser_process.kill()
await asyncio.sleep(0.1) # Brief wait for kill to take effect
except Exception as e:
self.logger.error(
message="Error terminating browser: {error}",
tag="ERROR",
params={"error": str(e)}
)
if self.temp_dir and os.path.exists(self.temp_dir):
try:
shutil.rmtree(self.temp_dir)
except Exception as e:
self.logger.error(
message="Error removing temporary directory: {error}",
tag="ERROR",
params={"error": str(e)}
)

File diff suppressed because it is too large.

crawl4ai/config.py

@@ -56,4 +56,7 @@ MAX_METRICS_HISTORY = 1000
NEED_MIGRATION = True
URL_LOG_SHORTEN_LENGTH = 30
SHOW_DEPRECATION_WARNINGS = True
SHOW_DEPRECATION_WARNINGS = True
SCREENSHOT_HEIGHT_TRESHOLD = 10000
PAGE_TIMEOUT=60000
DOWNLOAD_PAGE_TIMEOUT=60000

crawl4ai/content_scraping_strategy.py

@@ -14,15 +14,11 @@ from .content_filter_strategy import RelevantContentFilter, BM25ContentFilter#,
from .markdown_generation_strategy import MarkdownGenerationStrategy, DefaultMarkdownGenerator
from .models import MarkdownGenerationResult
from .utils import (
sanitize_input_encode,
sanitize_html,
extract_metadata,
InvalidCSSSelectorError,
CustomHTML2Text,
normalize_url,
is_external_url
)
from .tools import profile_and_time
# Pre-compile regular expressions for Open Graph and Twitter metadata
OG_REGEX = re.compile(r'^og:')
@@ -76,10 +72,10 @@ class WebScrapingStrategy(ContentScrapingStrategy):
log_method(message=message, tag=tag, **kwargs)
def scrap(self, url: str, html: str, **kwargs) -> Dict[str, Any]:
return self._get_content_of_website_optimized(url, html, is_async=False, **kwargs)
return self._scrap(url, html, is_async=False, **kwargs)
async def ascrap(self, url: str, html: str, **kwargs) -> Dict[str, Any]:
return await asyncio.to_thread(self._get_content_of_website_optimized, url, html, **kwargs)
return await asyncio.to_thread(self._scrap, url, html, **kwargs)
def _generate_markdown_content(self,
cleaned_html: str,
@@ -103,8 +99,6 @@ class WebScrapingStrategy(ContentScrapingStrategy):
html2text_options=kwargs.get('html2text', {})
)
help_message = """"""
return {
'markdown': markdown_result.raw_markdown,
'fit_markdown': markdown_result.fit_markdown,
@@ -126,38 +120,40 @@ class WebScrapingStrategy(ContentScrapingStrategy):
}
# Legacy method
h = CustomHTML2Text()
h.update_params(**kwargs.get('html2text', {}))
markdown = h.handle(cleaned_html)
markdown = markdown.replace(' ```', '```')
"""
# h = CustomHTML2Text()
# h.update_params(**kwargs.get('html2text', {}))
# markdown = h.handle(cleaned_html)
# markdown = markdown.replace(' ```', '```')
fit_markdown = "Set flag 'fit_markdown' to True to get cleaned HTML content."
fit_html = "Set flag 'fit_markdown' to True to get cleaned HTML content."
# fit_markdown = "Set flag 'fit_markdown' to True to get cleaned HTML content."
# fit_html = "Set flag 'fit_markdown' to True to get cleaned HTML content."
if kwargs.get('content_filter', None) or kwargs.get('fit_markdown', False):
content_filter = kwargs.get('content_filter', None)
if not content_filter:
content_filter = BM25ContentFilter(
user_query=kwargs.get('fit_markdown_user_query', None),
bm25_threshold=kwargs.get('fit_markdown_bm25_threshold', 1.0)
)
fit_html = content_filter.filter_content(html)
fit_html = '\n'.join('<div>{}</div>'.format(s) for s in fit_html)
fit_markdown = h.handle(fit_html)
# if kwargs.get('content_filter', None) or kwargs.get('fit_markdown', False):
# content_filter = kwargs.get('content_filter', None)
# if not content_filter:
# content_filter = BM25ContentFilter(
# user_query=kwargs.get('fit_markdown_user_query', None),
# bm25_threshold=kwargs.get('fit_markdown_bm25_threshold', 1.0)
# )
# fit_html = content_filter.filter_content(html)
# fit_html = '\n'.join('<div>{}</div>'.format(s) for s in fit_html)
# fit_markdown = h.handle(fit_html)
markdown_v2 = MarkdownGenerationResult(
raw_markdown=markdown,
markdown_with_citations=markdown,
references_markdown=markdown,
fit_markdown=fit_markdown
)
# markdown_v2 = MarkdownGenerationResult(
# raw_markdown=markdown,
# markdown_with_citations=markdown,
# references_markdown=markdown,
# fit_markdown=fit_markdown
# )
return {
'markdown': markdown,
'fit_markdown': fit_markdown,
'fit_html': fit_html,
'markdown_v2' : markdown_v2
}
# return {
# 'markdown': markdown,
# 'fit_markdown': fit_markdown,
# 'fit_html': fit_html,
# 'markdown_v2' : markdown_v2
# }
"""
def flatten_nested_elements(self, node):
if isinstance(node, NavigableString):
@@ -483,7 +479,7 @@ class WebScrapingStrategy(ContentScrapingStrategy):
)
return False
def _get_content_of_website_optimized(self, url: str, html: str, word_count_threshold: int = MIN_WORD_THRESHOLD, css_selector: str = None, **kwargs) -> Dict[str, Any]:
def _scrap(self, url: str, html: str, word_count_threshold: int = MIN_WORD_THRESHOLD, css_selector: str = None, **kwargs) -> Dict[str, Any]:
success = True
if not html:
return None

crawl4ai/extraction_strategy.py

@@ -634,7 +634,7 @@ class ContentSummarizationStrategy(ExtractionStrategy):
# Sort summaries by the original section index to maintain order
summaries.sort(key=lambda x: x[0])
return [summary for _, summary in summaries]
class JsonCssExtractionStrategy(ExtractionStrategy):
def __init__(self, schema: Dict[str, Any], **kwargs):
super().__init__(**kwargs)

crawl4ai/html2text.py

@@ -1006,10 +1006,136 @@ class HTML2Text(html.parser.HTMLParser):
newlines += 1
return result
def html2text(html: str, baseurl: str = "", bodywidth: Optional[int] = None) -> str:
if bodywidth is None:
bodywidth = config.BODY_WIDTH
h = HTML2Text(baseurl=baseurl, bodywidth=bodywidth)
return h.handle(html)
class CustomHTML2Text(HTML2Text):
def __init__(self, *args, handle_code_in_pre=False, **kwargs):
super().__init__(*args, **kwargs)
self.inside_pre = False
self.inside_code = False
self.preserve_tags = set() # Set of tags to preserve
self.current_preserved_tag = None
self.preserved_content = []
self.preserve_depth = 0
self.handle_code_in_pre = handle_code_in_pre
# Configuration options
self.skip_internal_links = False
self.single_line_break = False
self.mark_code = False
self.include_sup_sub = False
self.body_width = 0
self.ignore_mailto_links = True
self.ignore_links = False
self.escape_backslash = False
self.escape_dot = False
self.escape_plus = False
self.escape_dash = False
self.escape_snob = False
def update_params(self, **kwargs):
"""Update parameters and set preserved tags."""
for key, value in kwargs.items():
if key == 'preserve_tags':
self.preserve_tags = set(value)
elif key == 'handle_code_in_pre':
self.handle_code_in_pre = value
else:
setattr(self, key, value)
def handle_tag(self, tag, attrs, start):
# Handle preserved tags
if tag in self.preserve_tags:
if start:
if self.preserve_depth == 0:
self.current_preserved_tag = tag
self.preserved_content = []
# Format opening tag with attributes
attr_str = ''.join(f' {k}="{v}"' for k, v in attrs.items() if v is not None)
self.preserved_content.append(f'<{tag}{attr_str}>')
self.preserve_depth += 1
return
else:
self.preserve_depth -= 1
if self.preserve_depth == 0:
self.preserved_content.append(f'</{tag}>')
# Output the preserved HTML block with proper spacing
preserved_html = ''.join(self.preserved_content)
self.o('\n' + preserved_html + '\n')
self.current_preserved_tag = None
return
# If we're inside a preserved tag, collect all content
if self.preserve_depth > 0:
if start:
# Format nested tags with attributes
attr_str = ''.join(f' {k}="{v}"' for k, v in attrs.items() if v is not None)
self.preserved_content.append(f'<{tag}{attr_str}>')
else:
self.preserved_content.append(f'</{tag}>')
return
# Handle pre tags
if tag == 'pre':
if start:
self.o('```\n') # Markdown code block start
self.inside_pre = True
else:
self.o('\n```\n') # Markdown code block end
self.inside_pre = False
elif tag == 'code':
if self.inside_pre and not self.handle_code_in_pre:
# Ignore code tags inside pre blocks if handle_code_in_pre is False
return
if start:
self.o('`') # Markdown inline code start
self.inside_code = True
else:
self.o('`') # Markdown inline code end
self.inside_code = False
else:
super().handle_tag(tag, attrs, start)
def handle_data(self, data, entity_char=False):
"""Override handle_data to capture content within preserved tags."""
if self.preserve_depth > 0:
self.preserved_content.append(data)
return
if self.inside_pre:
# Output the raw content for pre blocks, including content inside code tags
self.o(data) # Directly output the data as-is (preserve newlines)
return
if self.inside_code:
# Inline code: no newlines allowed
self.o(data.replace('\n', ' '))
return
# Default behavior for other tags
super().handle_data(data, entity_char)
# # Handle pre tags
# if tag == 'pre':
# if start:
# self.o('```\n')
# self.inside_pre = True
# else:
# self.o('\n```')
# self.inside_pre = False
# # elif tag in ["h1", "h2", "h3", "h4", "h5", "h6"]:
# # pass
# else:
# super().handle_tag(tag, attrs, start)
# def handle_data(self, data, entity_char=False):
# """Override handle_data to capture content within preserved tags."""
# if self.preserve_depth > 0:
# self.preserved_content.append(data)
# return
# super().handle_data(data, entity_char)
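The `preserve_tags` and `handle_code_in_pre` options above control how `CustomHTML2Text` emits markdown. A short usage sketch, not part of the diff (assumes the class is importable from `crawl4ai.html2text`, as the `markdown_generation_strategy.py` import change below suggests):

```python
# Sketch: convert HTML to markdown while keeping selected tags verbatim.
from crawl4ai.html2text import CustomHTML2Text

h = CustomHTML2Text(handle_code_in_pre=False)
h.update_params(preserve_tags={"table"})  # emit <table> blocks as raw HTML instead of markdown
html = "<p>Intro</p><table><tr><td>a</td></tr></table><pre><code>x = 1</code></pre>"
print(h.handle(html))
```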

View File

@@ -0,0 +1,15 @@
import os, sys
# Create a function get name of a js script, then load from the CURRENT folder of this script and return its content as string, make sure its error free
def load_js_script(script_name):
# Get the path of the current script
current_script_path = os.path.dirname(os.path.realpath(__file__))
# Get the path of the script to load
script_path = os.path.join(current_script_path, script_name + '.js')
# Check if the script exists
if not os.path.exists(script_path):
raise ValueError(f"Script {script_name} not found in the folder {current_script_path}")
# Load the content of the script
with open(script_path, 'r') as f:
script_content = f.read()
return script_content

View File

@@ -0,0 +1,25 @@
// Pass the Permissions Test.
const originalQuery = window.navigator.permissions.query;
window.navigator.permissions.query = (parameters) =>
parameters.name === "notifications"
? Promise.resolve({ state: Notification.permission })
: originalQuery(parameters);
Object.defineProperty(navigator, "webdriver", {
get: () => undefined,
});
window.navigator.chrome = {
runtime: {},
// Add other properties if necessary
};
Object.defineProperty(navigator, "plugins", {
get: () => [1, 2, 3, 4, 5],
});
Object.defineProperty(navigator, "languages", {
get: () => ["en-US", "en"],
});
Object.defineProperty(document, "hidden", {
get: () => false,
});
Object.defineProperty(document, "visibilityState", {
get: () => "visible",
});

View File

@@ -0,0 +1,119 @@
async () => {
// Function to check if element is visible
const isVisible = (elem) => {
const style = window.getComputedStyle(elem);
return style.display !== "none" && style.visibility !== "hidden" && style.opacity !== "0";
};
// Common selectors for popups and overlays
const commonSelectors = [
// Close buttons first
'button[class*="close" i]',
'button[class*="dismiss" i]',
'button[aria-label*="close" i]',
'button[title*="close" i]',
'a[class*="close" i]',
'span[class*="close" i]',
// Cookie notices
'[class*="cookie-banner" i]',
'[id*="cookie-banner" i]',
'[class*="cookie-consent" i]',
'[id*="cookie-consent" i]',
// Newsletter/subscription dialogs
'[class*="newsletter" i]',
'[class*="subscribe" i]',
// Generic popups/modals
'[class*="popup" i]',
'[class*="modal" i]',
'[class*="overlay" i]',
'[class*="dialog" i]',
'[role="dialog"]',
'[role="alertdialog"]',
];
// Try to click close buttons first
for (const selector of commonSelectors.slice(0, 6)) {
const closeButtons = document.querySelectorAll(selector);
for (const button of closeButtons) {
if (isVisible(button)) {
try {
button.click();
await new Promise((resolve) => setTimeout(resolve, 100));
} catch (e) {
console.log("Error clicking button:", e);
}
}
}
}
// Remove remaining overlay elements
const removeOverlays = () => {
// Find elements with high z-index
const allElements = document.querySelectorAll("*");
for (const elem of allElements) {
const style = window.getComputedStyle(elem);
const zIndex = parseInt(style.zIndex);
const position = style.position;
if (
isVisible(elem) &&
(zIndex > 999 || position === "fixed" || position === "absolute") &&
(elem.offsetWidth > window.innerWidth * 0.5 ||
elem.offsetHeight > window.innerHeight * 0.5 ||
style.backgroundColor.includes("rgba") ||
parseFloat(style.opacity) < 1)
) {
elem.remove();
}
}
// Remove elements matching common selectors
for (const selector of commonSelectors) {
const elements = document.querySelectorAll(selector);
elements.forEach((elem) => {
if (isVisible(elem)) {
elem.remove();
}
});
}
};
// Remove overlay elements
removeOverlays();
// Remove any fixed/sticky position elements at the top/bottom
const removeFixedElements = () => {
const elements = document.querySelectorAll("*");
elements.forEach((elem) => {
const style = window.getComputedStyle(elem);
if ((style.position === "fixed" || style.position === "sticky") && isVisible(elem)) {
elem.remove();
}
});
};
removeFixedElements();
// Remove empty block elements as: div, p, span, etc.
const removeEmptyBlockElements = () => {
const blockElements = document.querySelectorAll(
"div, p, span, section, article, header, footer, aside, nav, main, ul, ol, li, dl, dt, dd, h1, h2, h3, h4, h5, h6"
);
blockElements.forEach((elem) => {
if (elem.innerText.trim() === "") {
elem.remove();
}
});
};
// Remove margin-right and padding-right from body (often added by modal scripts)
document.body.style.marginRight = "0px";
document.body.style.paddingRight = "0px";
document.body.style.overflow = "auto";
// Wait a bit for any animations to complete
await new Promise((resolve) => setTimeout(resolve, 100));
};
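The snippet above is written as an async arrow function so it can be handed to Playwright's `page.evaluate`, which invokes it and awaits the returned promise. An illustrative harness, not part of the diff (the local file name is hypothetical):

```python
# Illustration only: run the overlay-removal snippet above inside a Playwright page.
# "remove_overlay_elements.js" is a hypothetical local copy of that snippet.
import asyncio
from playwright.async_api import async_playwright

async def main():
    js = open("remove_overlay_elements.js", encoding="utf-8").read()
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        page = await browser.new_page()
        await page.goto("https://example.com", wait_until="domcontentloaded")
        await page.evaluate(js)  # the async () => { ... } function is invoked and awaited
        print(len(await page.content()))
        await browser.close()

asyncio.run(main())
```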

View File

@@ -0,0 +1,54 @@
() => {
return new Promise((resolve) => {
const filterImage = (img) => {
// Filter out images that are too small
if (img.width < 100 && img.height < 100) return false;
// Filter out images that are not visible
const rect = img.getBoundingClientRect();
if (rect.width === 0 || rect.height === 0) return false;
// Filter out images with certain class names (e.g., icons, thumbnails)
if (img.classList.contains("icon") || img.classList.contains("thumbnail")) return false;
// Filter out images with certain patterns in their src (e.g., placeholder images)
if (img.src.includes("placeholder") || img.src.includes("icon")) return false;
return true;
};
const images = Array.from(document.querySelectorAll("img")).filter(filterImage);
let imagesLeft = images.length;
if (imagesLeft === 0) {
resolve();
return;
}
const checkImage = (img) => {
if (img.complete && img.naturalWidth !== 0) {
img.setAttribute("width", img.naturalWidth);
img.setAttribute("height", img.naturalHeight);
imagesLeft--;
if (imagesLeft === 0) resolve();
}
};
images.forEach((img) => {
checkImage(img);
if (!img.complete) {
img.onload = () => {
checkImage(img);
};
img.onerror = () => {
imagesLeft--;
if (imagesLeft === 0) resolve();
};
}
});
// Fallback timeout of 5 seconds
// setTimeout(() => resolve(), 5000);
resolve();
});
};

crawl4ai/markdown_generation_strategy.py

@@ -1,7 +1,7 @@
from abc import ABC, abstractmethod
from typing import Optional, Dict, Any, Tuple
from .models import MarkdownGenerationResult
from .utils import CustomHTML2Text
from .html2text import CustomHTML2Text
from .content_filter_strategy import RelevantContentFilter, BM25ContentFilter
import re
from urllib.parse import urljoin
@@ -9,6 +9,17 @@ from urllib.parse import urljoin
# Pre-compile the regex pattern
LINK_PATTERN = re.compile(r'!?\[([^\]]+)\]\(([^)]+?)(?:\s+"([^"]*)")?\)')
def fast_urljoin(base: str, url: str) -> str:
"""Fast URL joining for common cases."""
if url.startswith(('http://', 'https://', 'mailto:', '//')):
return url
if url.startswith('/'):
# Handle absolute paths
if base.endswith('/'):
return base[:-1] + url
return base + url
return urljoin(base, url)
class MarkdownGenerationStrategy(ABC):
"""Abstract base class for markdown generation strategies."""
def __init__(self, content_filter: Optional[RelevantContentFilter] = None, options: Optional[Dict[str, Any]] = None):
@@ -118,13 +129,3 @@ class DefaultMarkdownGenerator(MarkdownGenerationStrategy):
fit_html=filtered_html,
)
def fast_urljoin(base: str, url: str) -> str:
"""Fast URL joining for common cases."""
if url.startswith(('http://', 'https://', 'mailto:', '//')):
return url
if url.startswith('/'):
# Handle absolute paths
if base.endswith('/'):
return base[:-1] + url
return base + url
return urljoin(base, url)

crawl4ai/models.py

@@ -23,6 +23,7 @@ class CrawlResult(BaseModel):
links: Dict[str, List[Dict]] = {}
downloaded_files: Optional[List[str]] = None
screenshot: Optional[str] = None
pdf : Optional[bytes] = None
markdown: Optional[Union[str, MarkdownGenerationResult]] = None
markdown_v2: Optional[MarkdownGenerationResult] = None
fit_markdown: Optional[str] = None
@@ -39,6 +40,7 @@ class AsyncCrawlResponse(BaseModel):
response_headers: Dict[str, str]
status_code: int
screenshot: Optional[str] = None
pdf_data: Optional[bytes] = None
get_delayed_content: Optional[Callable[[Optional[float]], Awaitable[str]]] = None
downloaded_files: Optional[List[str]] = None
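With the new `pdf` fields, a crawl can hand back raw PDF bytes alongside the screenshot. A hedged sketch of how this is expected to surface, not part of the diff (it assumes `arun` accepts the run configuration via `config=`; that wiring lives in files whose diffs are suppressed above):

```python
# Assumption: AsyncWebCrawler.arun(url, config=CrawlerRunConfig(pdf=True)) populates CrawlResult.pdf.
import asyncio
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig

async def main():
    async with AsyncWebCrawler() as crawler:
        result = await crawler.arun(
            url="https://example.com",
            config=CrawlerRunConfig(pdf=True, screenshot=True),
        )
        if result.pdf:
            with open("page.pdf", "wb") as f:
                f.write(result.pdf)

asyncio.run(main())
```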

crawl4ai/tools.py (deleted)

@@ -1,34 +0,0 @@
import time
import cProfile
import pstats
from functools import wraps
def profile_and_time(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
# Start timer
start_time = time.perf_counter()
# Setup profiler
profiler = cProfile.Profile()
profiler.enable()
# Run function
result = func(self, *args, **kwargs)
# Stop profiler
profiler.disable()
# Calculate elapsed time
elapsed_time = time.perf_counter() - start_time
# Print timing
print(f"[PROFILER] Scraping completed in {elapsed_time:.2f} seconds")
# Print profiling stats
stats = pstats.Stats(profiler)
stats.sort_stats('cumulative') # Sort by cumulative time
stats.print_stats(20) # Print top 20 time-consuming functions
return result
return wrapper

crawl4ai/utils.py

@@ -19,142 +19,17 @@ from typing import Optional, Tuple, Dict, Any
import xxhash
from colorama import Fore, Style, init
import textwrap
import cProfile
import pstats
from functools import wraps
from .html2text import HTML2Text
class CustomHTML2Text(HTML2Text):
def __init__(self, *args, handle_code_in_pre=False, **kwargs):
super().__init__(*args, **kwargs)
self.inside_pre = False
self.inside_code = False
self.preserve_tags = set() # Set of tags to preserve
self.current_preserved_tag = None
self.preserved_content = []
self.preserve_depth = 0
self.handle_code_in_pre = handle_code_in_pre
# Configuration options
self.skip_internal_links = False
self.single_line_break = False
self.mark_code = False
self.include_sup_sub = False
self.body_width = 0
self.ignore_mailto_links = True
self.ignore_links = False
self.escape_backslash = False
self.escape_dot = False
self.escape_plus = False
self.escape_dash = False
self.escape_snob = False
def update_params(self, **kwargs):
"""Update parameters and set preserved tags."""
for key, value in kwargs.items():
if key == 'preserve_tags':
self.preserve_tags = set(value)
elif key == 'handle_code_in_pre':
self.handle_code_in_pre = value
else:
setattr(self, key, value)
def handle_tag(self, tag, attrs, start):
# Handle preserved tags
if tag in self.preserve_tags:
if start:
if self.preserve_depth == 0:
self.current_preserved_tag = tag
self.preserved_content = []
# Format opening tag with attributes
attr_str = ''.join(f' {k}="{v}"' for k, v in attrs.items() if v is not None)
self.preserved_content.append(f'<{tag}{attr_str}>')
self.preserve_depth += 1
return
else:
self.preserve_depth -= 1
if self.preserve_depth == 0:
self.preserved_content.append(f'</{tag}>')
# Output the preserved HTML block with proper spacing
preserved_html = ''.join(self.preserved_content)
self.o('\n' + preserved_html + '\n')
self.current_preserved_tag = None
return
# If we're inside a preserved tag, collect all content
if self.preserve_depth > 0:
if start:
# Format nested tags with attributes
attr_str = ''.join(f' {k}="{v}"' for k, v in attrs.items() if v is not None)
self.preserved_content.append(f'<{tag}{attr_str}>')
else:
self.preserved_content.append(f'</{tag}>')
return
# Handle pre tags
if tag == 'pre':
if start:
self.o('```\n') # Markdown code block start
self.inside_pre = True
else:
self.o('\n```\n') # Markdown code block end
self.inside_pre = False
elif tag == 'code':
if self.inside_pre and not self.handle_code_in_pre:
# Ignore code tags inside pre blocks if handle_code_in_pre is False
return
if start:
self.o('`') # Markdown inline code start
self.inside_code = True
else:
self.o('`') # Markdown inline code end
self.inside_code = False
else:
super().handle_tag(tag, attrs, start)
def handle_data(self, data, entity_char=False):
"""Override handle_data to capture content within preserved tags."""
if self.preserve_depth > 0:
self.preserved_content.append(data)
return
if self.inside_pre:
# Output the raw content for pre blocks, including content inside code tags
self.o(data) # Directly output the data as-is (preserve newlines)
return
if self.inside_code:
# Inline code: no newlines allowed
self.o(data.replace('\n', ' '))
return
# Default behavior for other tags
super().handle_data(data, entity_char)
# # Handle pre tags
# if tag == 'pre':
# if start:
# self.o('```\n')
# self.inside_pre = True
# else:
# self.o('\n```')
# self.inside_pre = False
# # elif tag in ["h1", "h2", "h3", "h4", "h5", "h6"]:
# # pass
# else:
# super().handle_tag(tag, attrs, start)
# def handle_data(self, data, entity_char=False):
# """Override handle_data to capture content within preserved tags."""
# if self.preserve_depth > 0:
# self.preserved_content.append(data)
# return
# super().handle_data(data, entity_char)
class InvalidCSSSelectorError(Exception):
pass
def create_box_message(
message: str,
type: str = "info",
width: int = 80,
width: int = 120,
add_newlines: bool = True,
double_line: bool = False
) -> str:
@@ -373,50 +248,6 @@ def escape_json_string(s):
return s
class CustomHTML2Text_v0(HTML2Text):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.inside_pre = False
self.inside_code = False
self.skip_internal_links = False
self.single_line_break = False
self.mark_code = False
self.include_sup_sub = False
self.body_width = 0
self.ignore_mailto_links = True
self.ignore_links = False
self.escape_backslash = False
self.escape_dot = False
self.escape_plus = False
self.escape_dash = False
self.escape_snob = False
def handle_tag(self, tag, attrs, start):
if tag == 'pre':
if start:
self.o('```\n')
self.inside_pre = True
else:
self.o('\n```')
self.inside_pre = False
elif tag in ["h1", "h2", "h3", "h4", "h5", "h6"]:
pass
# elif tag == 'code' and not self.inside_pre:
# if start:
# if not self.inside_pre:
# self.o('`')
# self.inside_code = True
# else:
# if not self.inside_pre:
# self.o('`')
# self.inside_code = False
super().handle_tag(tag, attrs, start)
def replace_inline_tags(soup, tags, only_text=False):
tag_replacements = {
'b': lambda tag: f"**{tag.text}**",
@@ -978,7 +809,6 @@ def extract_metadata(html, soup=None):
return metadata
def extract_xml_tags(string):
tags = re.findall(r'<(\w+)>', string)
return list(set(tags))
@@ -996,7 +826,6 @@ def extract_xml_data(tags, string):
return data
# Function to perform the completion with exponential backoff
def perform_completion_with_backoff(
provider,
prompt_with_variables,
@@ -1010,7 +839,11 @@ def perform_completion_with_backoff(
max_attempts = 3
base_delay = 2 # Base delay in seconds, you can adjust this based on your needs
extra_args = {}
extra_args = {
"temperature": 0.01,
'api_key': api_token,
'base_url': base_url
}
if json_response:
extra_args["response_format"] = { "type": "json_object" }
@@ -1019,14 +852,12 @@ def perform_completion_with_backoff(
for attempt in range(max_attempts):
try:
response = completion(
model=provider,
messages=[
{"role": "user", "content": prompt_with_variables}
],
temperature=0.01,
api_key=api_token,
base_url=base_url,
**extra_args
)
return response # Return the successful response
@@ -1350,6 +1181,35 @@ def clean_tokens(tokens: list[str]) -> list[str]:
and not token.startswith('')
and not token.startswith('')]
def profile_and_time(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
# Start timer
start_time = time.perf_counter()
# Setup profiler
profiler = cProfile.Profile()
profiler.enable()
# Run function
result = func(self, *args, **kwargs)
# Stop profiler
profiler.disable()
# Calculate elapsed time
elapsed_time = time.perf_counter() - start_time
# Print timing
print(f"[PROFILER] Scraping completed in {elapsed_time:.2f} seconds")
# Print profiling stats
stats = pstats.Stats(profiler)
stats.sort_stats('cumulative') # Sort by cumulative time
stats.print_stats(20) # Print top 20 time-consuming functions
return result
return wrapper
def generate_content_hash(content: str) -> str:
"""Generate a unique hash for content"""
@@ -1363,7 +1223,8 @@ def ensure_content_dirs(base_path: str) -> Dict[str, str]:
'cleaned': 'cleaned_html',
'markdown': 'markdown_content',
'extracted': 'extracted_content',
'screenshots': 'screenshots'
'screenshots': 'screenshots',
'screenshot': 'screenshots'
}
content_paths = {}
@@ -1372,4 +1233,60 @@ def ensure_content_dirs(base_path: str) -> Dict[str, str]:
os.makedirs(path, exist_ok=True)
content_paths[key] = path
return content_paths
return content_paths
def get_error_context(exc_info, context_lines: int = 5):
"""
Extract error context with more reliable line number tracking.
Args:
exc_info: The exception info from sys.exc_info()
context_lines: Number of lines to show before and after the error
Returns:
dict: Error context information
"""
import traceback
import linecache
import os
# Get the full traceback
tb = traceback.extract_tb(exc_info[2])
# Get the last frame (where the error occurred)
last_frame = tb[-1]
filename = last_frame.filename
line_no = last_frame.lineno
func_name = last_frame.name
# Get the source code context using linecache
# This is more reliable than inspect.getsourcelines
context_start = max(1, line_no - context_lines)
context_end = line_no + context_lines + 1
# Build the context lines with line numbers
context_lines = []
for i in range(context_start, context_end):
line = linecache.getline(filename, i)
if line:
# Remove any trailing whitespace/newlines and add the pointer for error line
line = line.rstrip()
pointer = '' if i == line_no else ' '
context_lines.append(f"{i:4d} {pointer} {line}")
# Join the lines with newlines
code_context = '\n'.join(context_lines)
# Get relative path for cleaner output
try:
rel_path = os.path.relpath(filename)
except ValueError:
# Fallback if relpath fails (can happen on Windows with different drives)
rel_path = filename
return {
"filename": rel_path,
"line_no": line_no,
"function": func_name,
"code_context": code_context
}

View File

@@ -0,0 +1,58 @@
# Capturing Full-Page Screenshots and PDFs from Massive Webpages with Crawl4AI
When dealing with very long web pages, traditional full-page screenshots can be slow or fail entirely. For large pages (like extensive Wikipedia articles), generating a single massive screenshot often leads to delays, memory issues, or style differences.
**The New Approach:**
We've introduced a new feature that effortlessly handles even the biggest pages by first exporting them as a PDF, then converting that PDF into a high-quality image. This approach leverages the browser's built-in PDF rendering, making it both stable and efficient for very long content. You also have the option to directly save the PDF for your own usage—no need for multiple passes or complex stitching logic.
**Key Benefits:**
- **Reliability:** The PDF export never times out and works regardless of page length.
- **Versatility:** Get both the PDF and a screenshot in one crawl, without reloading or reprocessing.
- **Performance:** Skips manual scrolling and stitching images, reducing complexity and runtime.
**Simple Example:**
```python
import os, sys
import asyncio
from crawl4ai import AsyncWebCrawler, CacheMode
# Adjust paths as needed
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(parent_dir)
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
async def main():
async with AsyncWebCrawler() as crawler:
# Request both PDF and screenshot
result = await crawler.arun(
url='https://en.wikipedia.org/wiki/List_of_common_misconceptions',
cache_mode=CacheMode.BYPASS,
pdf=True,
screenshot=True
)
if result.success:
# Save screenshot
if result.screenshot:
from base64 import b64decode
with open(os.path.join(__location__, "screenshot.png"), "wb") as f:
f.write(b64decode(result.screenshot))
# Save PDF
if result.pdf_data:
pdf_bytes = b64decode(result.pdf_data)
with open(os.path.join(__location__, "page.pdf"), "wb") as f:
f.write(pdf_bytes)
if __name__ == "__main__":
asyncio.run(main())
```
**What Happens Under the Hood:**
- Crawl4AI navigates to the target page.
- If `pdf=True`, it exports the current page as a full PDF, capturing all of its content no matter the length.
- If `screenshot=True` and a PDF is already available, it directly converts the first page of that PDF to an image for you—no repeated loading or scrolling.
- Finally, you get your PDF and/or screenshot ready to use.
**Conclusion:**
With this feature, Crawl4AI becomes even more robust and versatile for large-scale content extraction. Whether you need a PDF snapshot or a quick screenshot, you now have a reliable solution for even the most extensive webpages.

View File

@@ -0,0 +1,517 @@
import os, sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
os.environ['FIRECRAWL_API_KEY'] = "fc-84b370ccfad44beabc686b38f1769692"
import asyncio
import time
import json
import re
from typing import Dict, List
from bs4 import BeautifulSoup
from pydantic import BaseModel, Field
from crawl4ai import AsyncWebCrawler, CacheMode, BrowserConfig, CrawlerRunConfig
from crawl4ai.markdown_generation_strategy import DefaultMarkdownGenerator
from crawl4ai.content_filter_strategy import BM25ContentFilter, PruningContentFilter
from crawl4ai.extraction_strategy import JsonCssExtractionStrategy, LLMExtractionStrategy
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
print("Crawl4AI: Advanced Web Crawling and Data Extraction")
print("GitHub Repository: https://github.com/unclecode/crawl4ai")
print("Twitter: @unclecode")
print("Website: https://crawl4ai.com")
# Basic Example - Simple Crawl
async def simple_crawl():
print("\n--- Basic Usage ---")
browser_config = BrowserConfig(headless=True)
crawler_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS
)
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun(
url="https://www.nbcnews.com/business",
config=crawler_config
)
print(result.markdown[:500])
# JavaScript Execution Example
async def simple_example_with_running_js_code():
print("\n--- Executing JavaScript and Using CSS Selectors ---")
browser_config = BrowserConfig(
headless=True,
java_script_enabled=True
)
crawler_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
js_code=["const loadMoreButton = Array.from(document.querySelectorAll('button')).find(button => button.textContent.includes('Load More')); loadMoreButton && loadMoreButton.click();"],
# wait_for="() => { return Array.from(document.querySelectorAll('article.tease-card')).length > 10; }"
)
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun(
url="https://www.nbcnews.com/business",
config=crawler_config
)
print(result.markdown[:500])
# CSS Selector Example
async def simple_example_with_css_selector():
print("\n--- Using CSS Selectors ---")
browser_config = BrowserConfig(headless=True)
crawler_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
css_selector=".wide-tease-item__description"
)
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun(
url="https://www.nbcnews.com/business",
config=crawler_config
)
print(result.markdown[:500])
# Proxy Example
async def use_proxy():
print("\n--- Using a Proxy ---")
browser_config = BrowserConfig(
headless=True,
proxy="http://your-proxy-url:port"
)
crawler_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS
)
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun(
url="https://www.nbcnews.com/business",
config=crawler_config
)
if result.success:
print(result.markdown[:500])
# Screenshot Example
async def capture_and_save_screenshot(url: str, output_path: str):
browser_config = BrowserConfig(headless=True)
crawler_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
screenshot=True
)
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun(
url=url,
config=crawler_config
)
if result.success and result.screenshot:
import base64
screenshot_data = base64.b64decode(result.screenshot)
with open(output_path, 'wb') as f:
f.write(screenshot_data)
print(f"Screenshot saved successfully to {output_path}")
else:
print("Failed to capture screenshot")
# LLM Extraction Example
class OpenAIModelFee(BaseModel):
model_name: str = Field(..., description="Name of the OpenAI model.")
input_fee: str = Field(..., description="Fee for input token for the OpenAI model.")
output_fee: str = Field(..., description="Fee for output token for the OpenAI model.")
async def extract_structured_data_using_llm(provider: str, api_token: str = None, extra_headers: Dict[str, str] = None):
print(f"\n--- Extracting Structured Data with {provider} ---")
if api_token is None and provider != "ollama":
print(f"API token is required for {provider}. Skipping this example.")
return
browser_config = BrowserConfig(headless=True)
extra_args = {
"temperature": 0,
"top_p": 0.9,
"max_tokens": 2000
}
if extra_headers:
extra_args["extra_headers"] = extra_headers
crawler_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
word_count_threshold=1,
extraction_strategy=LLMExtractionStrategy(
provider=provider,
api_token=api_token,
schema=OpenAIModelFee.model_json_schema(),
extraction_type="schema",
instruction="""From the crawled content, extract all mentioned model names along with their fees for input and output tokens.
Do not miss any models in the entire content.""",
extra_args=extra_args
)
)
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun(
url="https://openai.com/api/pricing/",
config=crawler_config
)
print(result.extracted_content)
# CSS Extraction Example
async def extract_structured_data_using_css_extractor():
print("\n--- Using JsonCssExtractionStrategy for Fast Structured Output ---")
schema = {
"name": "KidoCode Courses",
"baseSelector": "section.charge-methodology .w-tab-content > div",
"fields": [
{
"name": "section_title",
"selector": "h3.heading-50",
"type": "text",
},
{
"name": "section_description",
"selector": ".charge-content",
"type": "text",
},
{
"name": "course_name",
"selector": ".text-block-93",
"type": "text",
},
{
"name": "course_description",
"selector": ".course-content-text",
"type": "text",
},
{
"name": "course_icon",
"selector": ".image-92",
"type": "attribute",
"attribute": "src"
}
]
}
browser_config = BrowserConfig(
headless=True,
java_script_enabled=True
)
js_click_tabs = """
(async () => {
const tabs = document.querySelectorAll("section.charge-methodology .tabs-menu-3 > div");
for(let tab of tabs) {
tab.scrollIntoView();
tab.click();
await new Promise(r => setTimeout(r, 500));
}
})();
"""
crawler_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
extraction_strategy=JsonCssExtractionStrategy(schema),
js_code=[js_click_tabs]
)
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun(
url="https://www.kidocode.com/degrees/technology",
config=crawler_config
)
companies = json.loads(result.extracted_content)
print(f"Successfully extracted {len(companies)} companies")
print(json.dumps(companies[0], indent=2))
# Dynamic Content Examples - Method 1
async def crawl_dynamic_content_pages_method_1():
print("\n--- Advanced Multi-Page Crawling with JavaScript Execution ---")
first_commit = ""
async def on_execution_started(page, **kwargs):
nonlocal first_commit
try:
while True:
await page.wait_for_selector("li.Box-sc-g0xbh4-0 h4")
commit = await page.query_selector("li.Box-sc-g0xbh4-0 h4")
commit = await commit.evaluate("(element) => element.textContent")
commit = re.sub(r"\s+", "", commit)
if commit and commit != first_commit:
first_commit = commit
break
await asyncio.sleep(0.5)
except Exception as e:
print(f"Warning: New content didn't appear after JavaScript execution: {e}")
browser_config = BrowserConfig(
headless=False,
java_script_enabled=True
)
async with AsyncWebCrawler(config=browser_config) as crawler:
crawler.crawler_strategy.set_hook("on_execution_started", on_execution_started)
url = "https://github.com/microsoft/TypeScript/commits/main"
session_id = "typescript_commits_session"
all_commits = []
js_next_page = """
const button = document.querySelector('a[data-testid="pagination-next-button"]');
if (button) button.click();
"""
for page in range(3):
crawler_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
css_selector="li.Box-sc-g0xbh4-0",
js_code=js_next_page if page > 0 else None,
js_only=page > 0,
session_id=session_id
)
result = await crawler.arun(url=url, config=crawler_config)
assert result.success, f"Failed to crawl page {page + 1}"
soup = BeautifulSoup(result.cleaned_html, "html.parser")
commits = soup.select("li")
all_commits.extend(commits)
print(f"Page {page + 1}: Found {len(commits)} commits")
print(f"Successfully crawled {len(all_commits)} commits across 3 pages")
# Dynamic Content Examples - Method 2
async def crawl_dynamic_content_pages_method_2():
print("\n--- Advanced Multi-Page Crawling with JavaScript Execution ---")
browser_config = BrowserConfig(
headless=False,
java_script_enabled=True
)
js_next_page_and_wait = """
(async () => {
const getCurrentCommit = () => {
const commits = document.querySelectorAll('li.Box-sc-g0xbh4-0 h4');
return commits.length > 0 ? commits[0].textContent.trim() : null;
};
const initialCommit = getCurrentCommit();
const button = document.querySelector('a[data-testid="pagination-next-button"]');
if (button) button.click();
while (true) {
await new Promise(resolve => setTimeout(resolve, 100));
const newCommit = getCurrentCommit();
if (newCommit && newCommit !== initialCommit) {
break;
}
}
})();
"""
schema = {
"name": "Commit Extractor",
"baseSelector": "li.Box-sc-g0xbh4-0",
"fields": [
{
"name": "title",
"selector": "h4.markdown-title",
"type": "text",
"transform": "strip",
},
],
}
async with AsyncWebCrawler(config=browser_config) as crawler:
url = "https://github.com/microsoft/TypeScript/commits/main"
session_id = "typescript_commits_session"
all_commits = []
extraction_strategy = JsonCssExtractionStrategy(schema)
for page in range(3):
crawler_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
css_selector="li.Box-sc-g0xbh4-0",
extraction_strategy=extraction_strategy,
js_code=js_next_page_and_wait if page > 0 else None,
js_only=page > 0,
session_id=session_id
)
result = await crawler.arun(url=url, config=crawler_config)
assert result.success, f"Failed to crawl page {page + 1}"
commits = json.loads(result.extracted_content)
all_commits.extend(commits)
print(f"Page {page + 1}: Found {len(commits)} commits")
print(f"Successfully crawled {len(all_commits)} commits across 3 pages")
# Browser Comparison
async def crawl_custom_browser_type():
print("\n--- Browser Comparison ---")
# Firefox
browser_config_firefox = BrowserConfig(
browser_type="firefox",
headless=True
)
start = time.time()
async with AsyncWebCrawler(config=browser_config_firefox) as crawler:
result = await crawler.arun(
url="https://www.example.com",
config=CrawlerRunConfig(cache_mode=CacheMode.BYPASS)
)
print("Firefox:", time.time() - start)
print(result.markdown[:500])
# WebKit
browser_config_webkit = BrowserConfig(
browser_type="webkit",
headless=True
)
start = time.time()
async with AsyncWebCrawler(config=browser_config_webkit) as crawler:
result = await crawler.arun(
url="https://www.example.com",
config=CrawlerRunConfig(cache_mode=CacheMode.BYPASS)
)
print("WebKit:", time.time() - start)
print(result.markdown[:500])
# Chromium (default)
browser_config_chromium = BrowserConfig(
browser_type="chromium",
headless=True
)
start = time.time()
async with AsyncWebCrawler(config=browser_config_chromium) as crawler:
result = await crawler.arun(
url="https://www.example.com",
config=CrawlerRunConfig(cache_mode=CacheMode.BYPASS)
)
print("Chromium:", time.time() - start)
print(result.markdown[:500])
# Anti-Bot and User Simulation
async def crawl_with_user_simulation():
browser_config = BrowserConfig(
headless=True,
user_agent_mode="random",
user_agent_generator_config={
"device_type": "mobile",
"os_type": "android"
}
)
crawler_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
magic=True,
simulate_user=True,
override_navigator=True
)
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun(
url="YOUR-URL-HERE",
config=crawler_config
)
print(result.markdown)
# Speed Comparison
async def speed_comparison():
print("\n--- Speed Comparison ---")
# Firecrawl comparison
from firecrawl import FirecrawlApp
app = FirecrawlApp(api_key=os.environ['FIRECRAWL_API_KEY'])
start = time.time()
scrape_status = app.scrape_url(
'https://www.nbcnews.com/business',
params={'formats': ['markdown', 'html']}
)
end = time.time()
print("Firecrawl:")
print(f"Time taken: {end - start:.2f} seconds")
print(f"Content length: {len(scrape_status['markdown'])} characters")
print(f"Images found: {scrape_status['markdown'].count('cldnry.s-nbcnews.com')}")
print()
# Crawl4AI comparisons
browser_config = BrowserConfig(headless=True)
# Simple crawl
async with AsyncWebCrawler(config=browser_config) as crawler:
start = time.time()
result = await crawler.arun(
url="https://www.nbcnews.com/business",
config=CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
word_count_threshold=0
)
)
end = time.time()
print("Crawl4AI (simple crawl):")
print(f"Time taken: {end - start:.2f} seconds")
print(f"Content length: {len(result.markdown)} characters")
print(f"Images found: {result.markdown.count('cldnry.s-nbcnews.com')}")
print()
# Advanced filtering
start = time.time()
result = await crawler.arun(
url="https://www.nbcnews.com/business",
config=CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
word_count_threshold=0,
markdown_generator=DefaultMarkdownGenerator(
content_filter=PruningContentFilter(
threshold=0.48,
threshold_type="fixed",
min_word_threshold=0
)
)
)
)
end = time.time()
print("Crawl4AI (Markdown Plus):")
print(f"Time taken: {end - start:.2f} seconds")
print(f"Content length: {len(result.markdown_v2.raw_markdown)} characters")
print(f"Fit Markdown: {len(result.markdown_v2.fit_markdown)} characters")
print(f"Images found: {result.markdown.count('cldnry.s-nbcnews.com')}")
print()
# Main execution
async def main():
# Basic examples
# await simple_crawl()
# await simple_example_with_running_js_code()
# await simple_example_with_css_selector()
# Advanced examples
# await extract_structured_data_using_css_extractor()
# await extract_structured_data_using_llm("openai/gpt-4o", os.getenv("OPENAI_API_KEY"))
# await crawl_dynamic_content_pages_method_1()
# await crawl_dynamic_content_pages_method_2()
# Browser comparisons
await crawl_custom_browser_type()
# Performance testing
# await speed_comparison()
# Screenshot example
await capture_and_save_screenshot(
"https://www.example.com",
os.path.join(__location__, "tmp/example_screenshot.jpg")
)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -117,7 +117,13 @@ async def extract_structured_data_using_llm(provider: str, api_token: str = None
print(f"API token is required for {provider}. Skipping this example.")
return
extra_args = {}
# extra_args = {}
extra_args={
"temperature": 0,
"top_p": 0.9,
"max_tokens": 2000,
# any other supported parameters for litellm
}
if extra_headers:
extra_args["extra_headers"] = extra_headers
@@ -598,6 +604,8 @@ async def fit_markdown_remove_overlay():
async def main():
await extract_structured_data_using_llm("openai/gpt-4o", os.getenv("OPENAI_API_KEY"))
await simple_crawl()
await simple_example_with_running_js_code()
await simple_example_with_css_selector()
@@ -609,7 +617,6 @@ async def main():
# await extract_structured_data_using_llm()
# await extract_structured_data_using_llm("huggingface/meta-llama/Meta-Llama-3.1-8B-Instruct", os.getenv("HUGGINGFACE_API_KEY"))
# await extract_structured_data_using_llm("ollama/llama3.2")
await extract_structured_data_using_llm("openai/gpt-4o", os.getenv("OPENAI_API_KEY"))
# You always can pass custom headers to the extraction strategy
# custom_headers = {

View File

@@ -0,0 +1,225 @@
### Using `storage_state` to Pre-Load Cookies and LocalStorage
Crawl4AI's `AsyncWebCrawler` lets you preserve and reuse session data, including cookies and localStorage, across multiple runs. By providing a `storage_state`, you can start your crawls already “logged in” or with any other necessary session data—no need to repeat the login flow every time.
#### What is `storage_state`?
`storage_state` can be:
- A dictionary containing cookies and localStorage data.
- A path to a JSON file that holds this information.
When you pass `storage_state` to the crawler, it applies these cookies and localStorage entries before loading any pages. This means your crawler effectively starts in a known authenticated or pre-configured state.
#### Example Structure
Here's an example storage state:
```json
{
"cookies": [
{
"name": "session",
"value": "abcd1234",
"domain": "example.com",
"path": "/",
"expires": 1675363572.037711,
"httpOnly": false,
"secure": false,
"sameSite": "None"
}
],
"origins": [
{
"origin": "https://example.com",
"localStorage": [
{ "name": "token", "value": "my_auth_token" },
{ "name": "refreshToken", "value": "my_refresh_token" }
]
}
]
}
```
This JSON sets a `session` cookie and two localStorage entries (`token` and `refreshToken`) for `https://example.com`.
---
### Passing `storage_state` as a Dictionary
You can directly provide the data as a dictionary:
```python
import asyncio
from crawl4ai import AsyncWebCrawler
async def main():
storage_dict = {
"cookies": [
{
"name": "session",
"value": "abcd1234",
"domain": "example.com",
"path": "/",
"expires": 1675363572.037711,
"httpOnly": False,
"secure": False,
"sameSite": "None"
}
],
"origins": [
{
"origin": "https://example.com",
"localStorage": [
{"name": "token", "value": "my_auth_token"},
{"name": "refreshToken", "value": "my_refresh_token"}
]
}
]
}
async with AsyncWebCrawler(
headless=True,
storage_state=storage_dict
) as crawler:
result = await crawler.arun(url='https://example.com/protected')
if result.success:
print("Crawl succeeded with pre-loaded session data!")
print("Page HTML length:", len(result.html))
if __name__ == "__main__":
asyncio.run(main())
```
---
### Passing `storage_state` as a File
If you prefer a file-based approach, save the JSON above to `mystate.json` and reference it:
```python
import asyncio
from crawl4ai import AsyncWebCrawler
async def main():
async with AsyncWebCrawler(
headless=True,
storage_state="mystate.json" # Uses a JSON file instead of a dictionary
) as crawler:
result = await crawler.arun(url='https://example.com/protected')
if result.success:
print("Crawl succeeded with pre-loaded session data!")
print("Page HTML length:", len(result.html))
if __name__ == "__main__":
asyncio.run(main())
```
---
### Using `storage_state` to Avoid Repeated Logins (Sign In Once, Use Later)
A common scenario is when you need to log in to a site (entering username/password, etc.) to access protected pages. Doing so on every crawl is cumbersome. Instead, you can:
1. Perform the login once in a hook.
2. After login completes, export the resulting `storage_state` to a file.
3. On subsequent runs, provide that `storage_state` to skip the login step.
**Step-by-Step Example:**
**First Run (Perform Login and Save State):**
```python
import asyncio
from crawl4ai import AsyncWebCrawler, CacheMode
from crawl4ai.markdown_generation_strategy import DefaultMarkdownGenerator
async def on_browser_created_hook(browser):
# Access the default context and create a page
context = browser.contexts[0]
page = await context.new_page()
# Navigate to the login page
await page.goto("https://example.com/login", wait_until="domcontentloaded")
# Fill in credentials and submit
await page.fill("input[name='username']", "myuser")
await page.fill("input[name='password']", "mypassword")
await page.click("button[type='submit']")
await page.wait_for_load_state("networkidle")
# Now the site sets tokens in localStorage and cookies
# Export this state to a file so we can reuse it
await context.storage_state(path="my_storage_state.json")
await page.close()
async def main():
# First run: perform login and export the storage_state
async with AsyncWebCrawler(
headless=True,
verbose=True,
hooks={"on_browser_created": on_browser_created_hook},
use_persistent_context=True,
user_data_dir="./my_user_data"
) as crawler:
# After on_browser_created_hook runs, we have storage_state saved to my_storage_state.json
result = await crawler.arun(
url='https://example.com/protected-page',
cache_mode=CacheMode.BYPASS,
markdown_generator=DefaultMarkdownGenerator(options={"ignore_links": True}),
)
print("First run result success:", result.success)
if result.success:
print("Protected page HTML length:", len(result.html))
if __name__ == "__main__":
asyncio.run(main())
```
**Second Run (Reuse Saved State, No Login Needed):**
```python
import asyncio
from crawl4ai import AsyncWebCrawler, CacheMode
from crawl4ai.markdown_generation_strategy import DefaultMarkdownGenerator
async def main():
# Second run: no need to hook on_browser_created this time.
# Just provide the previously saved storage state.
async with AsyncWebCrawler(
headless=True,
verbose=True,
use_persistent_context=True,
user_data_dir="./my_user_data",
storage_state="my_storage_state.json" # Reuse previously exported state
) as crawler:
# Now the crawler starts already logged in
result = await crawler.arun(
url='https://example.com/protected-page',
cache_mode=CacheMode.BYPASS,
markdown_generator=DefaultMarkdownGenerator(options={"ignore_links": True}),
)
print("Second run result success:", result.success)
if result.success:
print("Protected page HTML length:", len(result.html))
if __name__ == "__main__":
asyncio.run(main())
```
**What's Happening Here?**
- During the first run, the `on_browser_created_hook` logs into the site.
- After logging in, the crawler exports the current session (cookies, localStorage, etc.) to `my_storage_state.json`.
- On subsequent runs, passing `storage_state="my_storage_state.json"` starts the browser context with these tokens already in place, skipping the login steps.
**Sign Out Scenario:**
If the website allows you to sign out by clearing tokens or by navigating to a sign-out URL, you can also run a script that uses `on_browser_created_hook` or `arun` to simulate signing out, then export the resulting `storage_state` again. That would give you a baseline “logged out” state to start fresh from next time.
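Here is a minimal sketch of that flow (the sign-out URL and output file name are placeholders; the hook follows the same pattern as the login example above):
```python
import asyncio
from crawl4ai import AsyncWebCrawler

async def on_browser_created_hook(browser):
    # Start from the authenticated context, visit the sign-out URL,
    # then export the now "logged out" session for later runs.
    context = browser.contexts[0]
    page = await context.new_page()
    await page.goto("https://example.com/logout", wait_until="domcontentloaded")
    await context.storage_state(path="logged_out_state.json")
    await page.close()

async def main():
    async with AsyncWebCrawler(
        headless=True,
        storage_state="my_storage_state.json",  # begin from the logged-in state
        hooks={"on_browser_created": on_browser_created_hook}
    ) as crawler:
        result = await crawler.arun(url="https://example.com")
        print("Sign-out run success:", result.success)

if __name__ == "__main__":
    asyncio.run(main())
```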
---
### Conclusion
By using `storage_state`, you can skip repetitive actions, like logging in, and jump straight into crawling protected content. Whether you provide a file path or a dictionary, this powerful feature helps maintain state between crawls, simplifying your data extraction pipelines.

View File

@@ -0,0 +1,117 @@
# Tutorial: Clicking Buttons to Load More Content with Crawl4AI
## Introduction
When scraping dynamic websites, it's common to encounter “Load More” or “Next” buttons that must be clicked to reveal new content. Crawl4AI provides a straightforward way to handle these situations using JavaScript execution and waiting conditions. In this tutorial, we'll cover two approaches:
1. **Step-by-step (Session-based) Approach:** Multiple calls to `arun()` to progressively load more content.
2. **Single-call Approach:** Execute a more complex JavaScript snippet inside a single `arun()` call to handle all clicks at once before the extraction.
## Prerequisites
- A working installation of Crawl4AI
- Basic familiarity with Python's `async`/`await` syntax
## Step-by-Step Approach
Use a session ID to maintain state across multiple `arun()` calls:
```python
from crawl4ai import AsyncWebCrawler, CacheMode
js_code = [
# This JS finds the “Next” button and clicks it
"const nextButton = document.querySelector('button.next'); nextButton && nextButton.click();"
]
wait_for_condition = "css:.new-content-class"
async with AsyncWebCrawler(headless=True, verbose=True) as crawler:
# 1. Load the initial page
result_initial = await crawler.arun(
url="https://example.com",
cache_mode=CacheMode.BYPASS,
session_id="my_session"
)
# 2. Click the 'Next' button and wait for new content
result_next = await crawler.arun(
url="https://example.com",
session_id="my_session",
js_code=js_code,
wait_for=wait_for_condition,
js_only=True,
cache_mode=CacheMode.BYPASS
)
# `result_next` now contains the updated HTML after clicking 'Next'
```
**Key Points:**
- **`session_id`**: Keeps the same browser context open.
- **`js_code`**: Executes JavaScript in the context of the already loaded page.
- **`wait_for`**: Ensures the crawler waits until new content is fully loaded.
- **`js_only=True`**: Runs the JS in the current session without reloading the page.
By repeating the `arun()` call multiple times and modifying the `js_code` (e.g., clicking different modules or pages), you can iteratively load all the desired content.
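For example, a minimal sketch of that loop (the button selector, wait condition, and page count here are placeholder assumptions):
```python
from crawl4ai import AsyncWebCrawler, CacheMode

async def load_all_pages():
    async with AsyncWebCrawler(headless=True, verbose=True) as crawler:
        # 1. Initial load establishes the session
        results = [await crawler.arun(
            url="https://example.com",
            cache_mode=CacheMode.BYPASS,
            session_id="my_session"
        )]
        # 2. Click "Next" a few more times within the same session
        for _ in range(3):
            results.append(await crawler.arun(
                url="https://example.com",
                session_id="my_session",
                js_code=["const nextButton = document.querySelector('button.next'); nextButton && nextButton.click();"],
                wait_for="css:.new-content-class",
                js_only=True,
                cache_mode=CacheMode.BYPASS
            ))
        return results
```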
## Single-call Approach
If the page allows it, you can run a single `arun()` call with a more elaborate JavaScript snippet that:
- Iterates over all the modules or "Next" buttons
- Clicks them one by one
- Waits for content updates between each click
- Once done, returns control to Crawl4AI for extraction.
Example snippet:
```python
from crawl4ai import AsyncWebCrawler, CacheMode
js_code = [
# Example JS that clicks multiple modules:
"""
(async () => {
const modules = document.querySelectorAll('.module-item');
for (let i = 0; i < modules.length; i++) {
modules[i].scrollIntoView();
modules[i].click();
// Wait for each module's content to load, adjust 100ms as needed
await new Promise(r => setTimeout(r, 100));
}
})();
"""
]
async with AsyncWebCrawler(headless=True, verbose=True) as crawler:
result = await crawler.arun(
url="https://example.com",
js_code=js_code,
wait_for="css:.final-loaded-content-class",
cache_mode=CacheMode.BYPASS
)
# `result` now contains all content after all modules have been clicked in one go.
```
**Key Points:**
- All interactions (clicks and waits) happen before the extraction.
- Ideal for pages where all steps can be done in a single pass.
## Choosing the Right Approach
- **Step-by-Step (Session-based)**:
- Good when you need fine-grained control or must dynamically check conditions before clicking the next page.
- Useful if the page requires multiple conditions checked at runtime.
- **Single-call**:
- Perfect if the sequence of interactions is known in advance.
- Cleaner code if the page's structure is consistent and predictable.
## Conclusion
Crawl4AI makes it easy to handle dynamic content:
- Use session IDs and multiple `arun()` calls for stepwise crawling.
- Or pack all actions into one `arun()` call if the interactions are well-defined upfront.
This flexibility ensures you can handle a wide range of dynamic web pages efficiently.

View File

@@ -1,7 +1,7 @@
# Crawl4AI Cache System and Migration Guide
## Overview
Starting from version X.X.X, Crawl4AI introduces a new caching system that replaces the old boolean flags with a more intuitive `CacheMode` enum. This change simplifies cache control and makes the behavior more predictable.
Starting from version 0.5.0, Crawl4AI introduces a new caching system that replaces the old boolean flags with a more intuitive `CacheMode` enum. This change simplifies cache control and makes the behavior more predictable.
## Old vs New Approach
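For instance, a minimal before/after sketch (only `bypass_cache` is shown for the old style; `CacheMode.BYPASS` is its closest equivalent):
```python
import asyncio
from crawl4ai import AsyncWebCrawler, CacheMode

async def main():
    async with AsyncWebCrawler() as crawler:
        # Old approach: boolean flags passed directly to arun
        legacy = await crawler.arun(url="https://example.com", bypass_cache=True)
        # New approach: one explicit CacheMode value
        modern = await crawler.arun(url="https://example.com", cache_mode=CacheMode.BYPASS)
        print(legacy.success, modern.success)

if __name__ == "__main__":
    asyncio.run(main())
```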

View File

@@ -4,6 +4,15 @@ Welcome to the Crawl4AI blog! Here you'll find detailed release notes, technical
## Latest Release
### [0.4.2 - Configurable Crawlers, Session Management, and Smarter Screenshots](releases/0.4.2.md)
*December 12, 2024*
The 0.4.2 update brings massive improvements to configuration, making crawlers and browsers easier to manage with dedicated objects. You can now import/export local storage for seamless session management. Plus, long-page screenshots are faster and cleaner, and full-page PDF exports are now possible. Check out all the new features to make your crawling experience even smoother.
[Read full release notes →](releases/0.4.2.md)
---
### [0.4.1 - Smarter Crawling with Lazy-Load Handling, Text-Only Mode, and More](releases/0.4.1.md)
*December 8, 2024*
@@ -35,3 +44,4 @@ Curious about how Crawl4AI has evolved? Check out our [complete changelog](https
- Star us on [GitHub](https://github.com/unclecode/crawl4ai)
- Follow [@unclecode](https://twitter.com/unclecode) on Twitter
- Join our community discussions on GitHub

View File

@@ -0,0 +1,86 @@
## 🚀 Crawl4AI 0.4.2 Update: Smarter Crawling Just Got Easier (Dec 12, 2024)
### Hey Developers,
I'm excited to share Crawl4AI 0.4.2—a major upgrade that makes crawling smarter, faster, and a whole lot more intuitive. I've packed in a bunch of new features to simplify your workflows and improve your experience. Let's cut to the chase!
---
### 🔧 **Configurable Browser and Crawler Behavior**
You've asked for better control over how browsers and crawlers are configured, and now you've got it. With the new `BrowserConfig` and `CrawlerRunConfig` objects, you can set up your browser and crawling behavior exactly how you want. No more cluttering `arun` with a dozen arguments—just pass in your configs and go.
**Example:**
```python
from crawl4ai import BrowserConfig, CrawlerRunConfig, AsyncWebCrawler
browser_config = BrowserConfig(headless=True, viewport_width=1920, viewport_height=1080)
crawler_config = CrawlerRunConfig(cache_mode="BYPASS")
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun(url="https://example.com", config=crawler_config)
print(result.markdown[:500])
```
This setup is a game-changer for scalability, keeping your code clean and flexible as we add more parameters in the future.
Remember: if you'd rather stick with the old way, you can still pass arguments directly to `arun` as before, no worries!
---
### 🔐 **Streamlined Session Management**
Here's the big one: you can now pass local storage and cookies directly. Whether it's setting values programmatically or importing a saved JSON state, managing sessions has never been easier. This is a must-have for authenticated crawls—just export your storage state once and reuse it effortlessly across runs.
**Example:**
1. Open a browser, log in manually, and export the storage state.
2. Import the JSON file for seamless authenticated crawling:
```python
result = await crawler.arun(
url="https://example.com/protected",
storage_state="my_storage_state.json"
)
```
---
### 🔢 **Handling Large Pages: Supercharged Screenshots and PDF Conversion**
Two big upgrades here:
- **Blazing-fast long-page screenshots**: Turn extremely long web pages into clean, high-quality screenshots—without breaking a sweat. It's optimized to handle large content without lag.
- **Full-page PDF exports**: Now, you can also convert any page into a PDF with all the details intact. Perfect for archiving or sharing complex layouts. A quick sketch of both is shown below.
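**Example (a quick sketch; the output paths are placeholders):**
```python
from base64 import b64decode
from crawl4ai import AsyncWebCrawler, CacheMode

async def capture_page():
    async with AsyncWebCrawler(headless=True) as crawler:
        result = await crawler.arun(
            url="https://example.com",
            cache_mode=CacheMode.BYPASS,
            pdf=True,          # export the full page as a PDF
            screenshot=True    # also grab a screenshot (derived from the PDF for long pages)
        )
        if result.success:
            if result.screenshot:
                with open("page.png", "wb") as f:
                    f.write(b64decode(result.screenshot))
            if result.pdf_data:
                with open("page.pdf", "wb") as f:
                    f.write(b64decode(result.pdf_data))
```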
---
### 🔧 **Other Cool Stuff**
- **Anti-bot enhancements**: Magic mode now handles overlays, user simulation, and anti-detection features like a pro.
- **JavaScript execution**: Execute custom JS snippets to handle dynamic content. No more wrestling with endless page interactions. A short sketch combining both features follows below.
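**Example (a rough sketch combining both; the button selector is a placeholder):**
```python
from crawl4ai import AsyncWebCrawler, CacheMode

async def crawl_dynamic_page():
    async with AsyncWebCrawler(headless=True) as crawler:
        result = await crawler.arun(
            url="https://example.com",
            cache_mode=CacheMode.BYPASS,
            magic=True,               # anti-bot handling: overlays, detection workarounds
            simulate_user=True,
            override_navigator=True,
            js_code=["const btn = document.querySelector('button.load-more'); btn && btn.click();"]
        )
        print(result.markdown[:300])
```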
---
### 📊 **Performance Boosts and Dev-friendly Updates**
- Faster rendering and viewport adjustments for better performance.
- Improved cookie and local storage handling for seamless authentication.
- Better debugging with detailed logs and actionable error messages.
---
### 🔠 **Use Cases You'll Love**
1. **Authenticated Crawls**: Log in once, export your storage state, and reuse it across multiple requests without the headache.
2. **Long-page Screenshots**: Perfect for blogs, e-commerce pages, or any endless-scroll website.
3. **PDF Export**: Create professional-looking page PDFs in seconds.
---
### Let's Get Crawling
Crawl4AI 0.4.2 is ready for you to download and try. I'm always looking for ways to improve, so don't hold back—share your thoughts and feedback.
Happy Crawling! 🚀

View File

@@ -342,7 +342,7 @@ app.add_middleware(
# API token security
security = HTTPBearer()
CRAWL4AI_API_TOKEN = os.getenv("CRAWL4AI_API_TOKEN") or "test_api_code"
CRAWL4AI_API_TOKEN = os.getenv("CRAWL4AI_API_TOKEN")
async def verify_token(credentials: HTTPAuthorizationCredentials = Security(security)):
if not CRAWL4AI_API_TOKEN:

View File

@@ -0,0 +1,153 @@
import os, sys
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(parent_dir)
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
import asyncio
from crawl4ai import AsyncWebCrawler, CacheMode
from crawl4ai.content_filter_strategy import PruningContentFilter
from crawl4ai.markdown_generation_strategy import DefaultMarkdownGenerator
# Assuming that the changes made allow different configurations
# for managed browser, persistent context, and so forth.
async def test_default_headless():
async with AsyncWebCrawler(
headless=True,
verbose=True,
user_agent_mode="random",
user_agent_generator_config={"device_type": "mobile", "os_type": "android"},
use_managed_browser=False,
use_persistent_context=False,
ignore_https_errors=True,
# Testing normal ephemeral context
) as crawler:
result = await crawler.arun(
url='https://www.kidocode.com/degrees/technology',
cache_mode=CacheMode.BYPASS,
markdown_generator=DefaultMarkdownGenerator(options={"ignore_links": True}),
)
print("[test_default_headless] success:", result.success)
print("HTML length:", len(result.html if result.html else ""))
async def test_managed_browser_persistent():
# Treating use_persistent_context=True as managed_browser scenario.
async with AsyncWebCrawler(
headless=False,
verbose=True,
user_agent_mode="random",
user_agent_generator_config={"device_type": "desktop", "os_type": "mac"},
use_managed_browser=True,
use_persistent_context=True, # now should behave same as managed browser
user_data_dir="./outpu/test_profile",
# This should store and reuse profile data across runs
) as crawler:
result = await crawler.arun(
url='https://www.google.com',
cache_mode=CacheMode.BYPASS,
markdown_generator=DefaultMarkdownGenerator(options={"ignore_links": True})
)
print("[test_managed_browser_persistent] success:", result.success)
print("HTML length:", len(result.html if result.html else ""))
async def test_session_reuse():
# Test creating a session, using it for multiple calls
session_id = "my_session"
async with AsyncWebCrawler(
headless=False,
verbose=True,
user_agent="Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
# Fixed user-agent for consistency
use_managed_browser=False,
use_persistent_context=False,
) as crawler:
# First call: create session
result1 = await crawler.arun(
url='https://www.example.com',
cache_mode=CacheMode.BYPASS,
session_id=session_id,
markdown_generator=DefaultMarkdownGenerator(options={"ignore_links": True})
)
print("[test_session_reuse first call] success:", result1.success)
# Second call: same session, possibly cookie retained
result2 = await crawler.arun(
url='https://www.example.com/about',
cache_mode=CacheMode.BYPASS,
session_id=session_id,
markdown_generator=DefaultMarkdownGenerator(options={"ignore_links": True})
)
print("[test_session_reuse second call] success:", result2.success)
async def test_magic_mode():
# Test magic mode with override_navigator and simulate_user
async with AsyncWebCrawler(
headless=False,
verbose=True,
user_agent_mode="random",
user_agent_generator_config={"device_type": "desktop", "os_type": "windows"},
use_managed_browser=False,
use_persistent_context=False,
magic=True,
override_navigator=True,
simulate_user=True,
) as crawler:
result = await crawler.arun(
url='https://www.kidocode.com/degrees/business',
cache_mode=CacheMode.BYPASS,
markdown_generator=DefaultMarkdownGenerator(options={"ignore_links": True})
)
print("[test_magic_mode] success:", result.success)
print("HTML length:", len(result.html if result.html else ""))
async def test_proxy_settings():
# Test with a proxy (if available) to ensure code runs with proxy
async with AsyncWebCrawler(
headless=True,
verbose=False,
user_agent="Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
proxy="http://127.0.0.1:8080", # Assuming local proxy server for test
use_managed_browser=False,
use_persistent_context=False,
) as crawler:
result = await crawler.arun(
url='https://httpbin.org/ip',
cache_mode=CacheMode.BYPASS,
markdown_generator=DefaultMarkdownGenerator(options={"ignore_links": True})
)
print("[test_proxy_settings] success:", result.success)
if result.success:
print("HTML preview:", result.html[:200] if result.html else "")
async def test_ignore_https_errors():
# Test ignore HTTPS errors with a self-signed or invalid cert domain
# This is just conceptual; the domain should be one that triggers an SSL error.
# Using a hypothetical URL that fails SSL:
async with AsyncWebCrawler(
headless=True,
verbose=True,
user_agent="Mozilla/5.0",
ignore_https_errors=True,
use_managed_browser=False,
use_persistent_context=False,
) as crawler:
result = await crawler.arun(
url='https://self-signed.badssl.com/',
cache_mode=CacheMode.BYPASS,
markdown_generator=DefaultMarkdownGenerator(options={"ignore_links": True})
)
print("[test_ignore_https_errors] success:", result.success)
async def main():
print("Running tests...")
# await test_default_headless()
# await test_managed_browser_persistent()
# await test_session_reuse()
# await test_magic_mode()
# await test_proxy_settings()
await test_ignore_https_errors()
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,231 @@
import os, sys
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(parent_dir)
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
import asyncio
from crawl4ai import AsyncWebCrawler, CacheMode
from crawl4ai.async_configs import BrowserConfig, CrawlerRunConfig
from crawl4ai.content_filter_strategy import PruningContentFilter
from crawl4ai.extraction_strategy import JsonCssExtractionStrategy
from crawl4ai.chunking_strategy import RegexChunking
from crawl4ai.markdown_generation_strategy import DefaultMarkdownGenerator
# Category 1: Browser Configuration Tests
async def test_browser_config_object():
"""Test the new BrowserConfig object with various browser settings"""
browser_config = BrowserConfig(
browser_type="chromium",
headless=False,
viewport_width=1920,
viewport_height=1080,
use_managed_browser=True,
user_agent_mode="random",
user_agent_generator_config={"device_type": "desktop", "os_type": "windows"}
)
async with AsyncWebCrawler(config=browser_config, verbose=True) as crawler:
result = await crawler.arun('https://example.com', cache_mode=CacheMode.BYPASS)
assert result.success, "Browser config crawl failed"
assert len(result.html) > 0, "No HTML content retrieved"
async def test_browser_performance_config():
"""Test browser configurations focused on performance"""
browser_config = BrowserConfig(
text_only=True,
light_mode=True,
extra_args=['--disable-gpu', '--disable-software-rasterizer'],
ignore_https_errors=True,
java_script_enabled=False
)
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun('https://example.com')
assert result.success, "Performance optimized crawl failed"
assert result.status_code == 200, "Unexpected status code"
# Category 2: Content Processing Tests
async def test_content_extraction_config():
"""Test content extraction with various strategies"""
crawler_config = CrawlerRunConfig(
word_count_threshold=300,
extraction_strategy=JsonCssExtractionStrategy(
schema={
"name": "article",
"baseSelector": "div",
"fields": [{
"name": "title",
"selector": "h1",
"type": "text"
}]
}
),
chunking_strategy=RegexChunking(),
content_filter=PruningContentFilter()
)
async with AsyncWebCrawler() as crawler:
result = await crawler.arun(
'https://example.com/article',
config=crawler_config
)
assert result.extracted_content is not None, "Content extraction failed"
assert 'title' in result.extracted_content, "Missing expected content field"
# Category 3: Cache and Session Management Tests
async def test_cache_and_session_management():
"""Test different cache modes and session handling"""
browser_config = BrowserConfig(use_persistent_context=True)
crawler_config = CrawlerRunConfig(
cache_mode=CacheMode.WRITE_ONLY,
process_iframes=True,
remove_overlay_elements=True
)
async with AsyncWebCrawler(config=browser_config) as crawler:
# First request - should write to cache
result1 = await crawler.arun(
'https://example.com',
config=crawler_config
)
# Second request - should use fresh fetch due to WRITE_ONLY mode
result2 = await crawler.arun(
'https://example.com',
config=crawler_config
)
assert result1.success and result2.success, "Cache mode crawl failed"
assert result1.html == result2.html, "Inconsistent results between requests"
# Category 4: Media Handling Tests
async def test_media_handling_config():
"""Test configurations related to media handling"""
# Get the base path for the home directory ~/.crawl4ai/downloads and make sure it exists
os.makedirs(os.path.expanduser("~/.crawl4ai/downloads"), exist_ok=True)
browser_config = BrowserConfig(
viewport_width=1920,
viewport_height=1080,
accept_downloads=True,
downloads_path=os.path.expanduser("~/.crawl4ai/downloads")
)
crawler_config = CrawlerRunConfig(
screenshot=True,
pdf=True,
adjust_viewport_to_content=True,
wait_for_images=True,
screenshot_height_threshold=20000
)
async with AsyncWebCrawler(config=browser_config) as crawler:
result = await crawler.arun(
'https://example.com',
config=crawler_config
)
assert result.screenshot is not None, "Screenshot capture failed"
assert result.pdf is not None, "PDF generation failed"
# Category 5: Anti-Bot and Site Interaction Tests
async def test_antibot_config():
"""Test configurations for handling anti-bot measures"""
crawler_config = CrawlerRunConfig(
simulate_user=True,
override_navigator=True,
magic=True,
wait_for="js:()=>document.querySelector('body')",
delay_before_return_html=1.0,
log_console=True,
cache_mode=CacheMode.BYPASS
)
async with AsyncWebCrawler() as crawler:
result = await crawler.arun(
'https://example.com',
config=crawler_config
)
assert result.success, "Anti-bot measure handling failed"
# Category 6: Parallel Processing Tests
async def test_parallel_processing():
"""Test parallel processing capabilities"""
crawler_config = CrawlerRunConfig(
mean_delay=0.5,
max_range=1.0,
semaphore_count=5
)
urls = [
'https://example.com/1',
'https://example.com/2',
'https://example.com/3'
]
async with AsyncWebCrawler() as crawler:
results = await crawler.arun_many(
urls,
config=crawler_config
)
assert len(results) == len(urls), "Not all URLs were processed"
assert all(r.success for r in results), "Some parallel requests failed"
# Category 7: Backwards Compatibility Tests
async def test_legacy_parameter_support():
"""Test that legacy parameters still work"""
async with AsyncWebCrawler(
headless=True,
browser_type="chromium",
viewport_width=1024,
viewport_height=768
) as crawler:
result = await crawler.arun(
'https://example.com',
screenshot=True,
word_count_threshold=200,
bypass_cache=True,
css_selector=".main-content"
)
assert result.success, "Legacy parameter support failed"
# Category 8: Mixed Configuration Tests
async def test_mixed_config_usage():
"""Test mixing new config objects with legacy parameters"""
browser_config = BrowserConfig(headless=True)
crawler_config = CrawlerRunConfig(screenshot=True)
async with AsyncWebCrawler(
config=browser_config,
verbose=True # legacy parameter
) as crawler:
result = await crawler.arun(
'https://example.com',
config=crawler_config,
cache_mode=CacheMode.BYPASS, # legacy parameter
css_selector="body" # legacy parameter
)
assert result.success, "Mixed configuration usage failed"
if __name__ == "__main__":
async def run_tests():
test_functions = [
test_browser_config_object,
# test_browser_performance_config,
# test_content_extraction_config,
# test_cache_and_session_management,
# test_media_handling_config,
# test_antibot_config,
# test_parallel_processing,
# test_legacy_parameter_support,
# test_mixed_config_usage
]
for test in test_functions:
print(f"\nRunning {test.__name__}...")
try:
await test()
print(f"{test.__name__} passed")
except AssertionError as e:
print(f"{test.__name__} failed: {str(e)}")
except Exception as e:
print(f"{test.__name__} error: {str(e)}")
asyncio.run(run_tests())