Compare commits

..

1 Commits

Author SHA1 Message Date
UncleCode
0f00821df5 Fix version 2025-01-26 18:08:24 +08:00
25 changed files with 32 additions and 4195 deletions

View File

@@ -1,59 +0,0 @@
title: "[Feature Request]: "
labels: ["⚙️ New"]
body:
- type: markdown
attributes:
value: |
Thank you for your interest in suggesting a new feature! Before you submit, please take a moment to check if already exists in
this discussions category to avoid duplicates. 😊
- type: textarea
id: needs_to_be_done
attributes:
label: What needs to be done?
description: Please describe the feature or functionality you'd like to see.
placeholder: "e.g., Return alt text along with images scraped from a webpages in Result"
validations:
required: true
- type: textarea
id: problem_to_solve
attributes:
label: What problem does this solve?
description: Explain the pain point or issue this feature will help address.
placeholder: "e.g., Bypass Captchas added by cloudflare"
validations:
required: true
- type: textarea
id: target_users
attributes:
label: Target users/beneficiaries
description: Who would benefit from this feature? (e.g., specific teams, developers, users, etc.)
placeholder: "e.g., Marketing teams, developers"
validations:
required: false
- type: textarea
id: current_workarounds
attributes:
label: Current alternatives/workarounds
description: Are there any existing solutions or workarounds? How does this feature improve upon them?
placeholder: "e.g., Users manually select the css classes mapped to data fields to extract them"
validations:
required: false
- type: markdown
attributes:
value: |
### 💡 Implementation Ideas
- type: textarea
id: proposed_approach
attributes:
label: Proposed approach
description: Share any ideas you have for how this feature could be implemented. Point out any challenges your foresee
and the success metrics for this feature
placeholder: "e.g., Implement a breadth first traversal algorithm for scraper"
validations:
required: false

View File

@@ -1,127 +0,0 @@
name: Bug Report
description: Report a bug with the Crawl4AI.
title: "[Bug]: "
labels: ["🐞 Bug","🩺 Needs Triage"]
body:
- type: input
id: crawl4ai_version
attributes:
label: crawl4ai version
description: Specify the version of crawl4ai you are using.
placeholder: "e.g., 2.0.0"
validations:
required: true
- type: textarea
id: expected_behavior
attributes:
label: Expected Behavior
description: Describe what you expected to happen.
placeholder: "Provide a detailed explanation of the expected outcome."
validations:
required: true
- type: textarea
id: current_behavior
attributes:
label: Current Behavior
description: Describe what is happening instead of the expected behavior.
placeholder: "Describe the actual result or issue you encountered."
validations:
required: true
- type: dropdown
id: reproducible
attributes:
label: Is this reproducible?
description: Indicate whether this bug can be reproduced consistently.
options:
- "Yes"
- "No"
validations:
required: true
- type: textarea
id: inputs
attributes:
label: Inputs Causing the Bug
description: Provide details about the inputs causing the issue.
placeholder: |
- URL(s):
- Settings used:
- Input data (if applicable):
render: bash
- type: textarea
id: steps_to_reproduce
attributes:
label: Steps to Reproduce
description: Provide step-by-step instructions to reproduce the issue.
placeholder: |
1. Go to...
2. Click on...
3. Observe the issue...
render: bash
- type: textarea
id: code_snippets
attributes:
label: Code snippets
description: Provide code snippets(if any). Add comments as necessary
placeholder: print("Hello world")
render: python
# Header Section with Title
- type: markdown
attributes:
value: |
## Supporting Information
Please provide the following details to help us understand and resolve your issue. This will assist us in reproducing and diagnosing the problem
- type: input
id: os
attributes:
label: OS
description: Please provide the operating system & distro where the issue occurs.
placeholder: "e.g., Windows, macOS, Linux"
validations:
required: true
- type: input
id: python_version
attributes:
label: Python version
description: Specify the Python version being used.
placeholder: "e.g., 3.8.5"
validations:
required: true
# Browser Field
- type: input
id: browser
attributes:
label: Browser
description: Provide the name of the browser you are using.
placeholder: "e.g., Chrome, Firefox, Safari"
validations:
required: false
# Browser Version Field
- type: input
id: browser_version
attributes:
label: Browser version
description: Provide the version of the browser you are using.
placeholder: "e.g., 91.0.4472.124"
validations:
required: false
# Error Logs Field (Text Area)
- type: textarea
id: error_logs
attributes:
label: Error logs & Screenshots (if applicable)
description: If you encountered any errors, please provide the error logs. Attach any relevant screenshots to help us understand the issue.
placeholder: "Paste error logs here and attach your screenshots"
validations:
required: false

View File

@@ -1,8 +0,0 @@
blank_issues_enabled: false
contact_links:
- name: Feature Requests
url: https://github.com/unclecode/crawl4ai/discussions/categories/feature-requests
about: "Suggest new features or enhancements for Crawl4AI"
- name: Forums - Q&A
url: https://github.com/unclecode/crawl4ai/discussions/categories/forums-q-a
about: "Ask questions or engage in general discussions about Crawl4AI"

View File

@@ -1,19 +0,0 @@
## Summary
Please include a summary of the change and/or which issues are fixed.
eg: `Fixes #123` (Tag GitHub issue numbers in this format, so it automatically links the issues with your PR)
## List of files changed and why
eg: quickstart.py - To update the example as per new changes
## How Has This Been Tested?
Please describe the tests that you ran to verify your changes.
## Checklist:
- [ ] My code follows the style guidelines of this project
- [ ] I have performed a self-review of my own code
- [ ] I have commented my code, particularly in hard-to-understand areas
- [ ] I have made corresponding changes to the documentation
- [ ] I have added/updated unit tests that prove my fix is effective or that my feature works
- [ ] New and existing unit tests pass locally with my changes

7
.gitignore vendored
View File

@@ -226,15 +226,8 @@ tree.md
.local
.do
/plans
plans/
# Codeium
.codeiumignore
todo/
# windsurf rules
.windsurfrules
# windsurf rules
.windsurfrules

View File

@@ -5,12 +5,9 @@ All notable changes to Crawl4AI will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
---
### Changed
Okay, here's a detailed changelog in Markdown format, generated from the provided git diff and commit history. I've focused on user-facing changes, fixes, and features, and grouped them as requested:
## Version 0.4.3b2 (2025-01-21)
## Version 0.4.3 (2025-01-21)
This release introduces several powerful new features, including robots.txt compliance, dynamic proxy support, LLM-powered schema generation, and improved documentation.
@@ -138,11 +135,9 @@ This release introduces several powerful new features, including robots.txt comp
- **Multiple Element Selection**: Modified `_get_elements` in `JsonCssExtractionStrategy` to return all matching elements instead of just the first one, ensuring comprehensive extraction. ([#extraction_strategy.py](crawl4ai/extraction_strategy.py))
- **Error Handling in Scrolling**: Added robust error handling to ensure scrolling proceeds safely even if a configuration is missing. ([#async_crawler_strategy.py](crawl4ai/async_crawler_strategy.py))
## [0.4.267] - 2025 - 01 - 06
#### Other
- **Git Ignore Update**: Added `/plans` to `.gitignore` for better development environment consistency. ([#.gitignore](.gitignore))
### Added
- **Windows Event Loop Configuration**: Introduced a utility function `configure_windows_event_loop` to resolve `NotImplementedError` for asyncio subprocesses on Windows. ([#utils.py](crawl4ai/utils.py), [#tutorials/async-webcrawler-basics.md](docs/md_v3/tutorials/async-webcrawler-basics.md))
- **`page_need_scroll` Method**: Added a method to determine if a page requires scrolling before taking actions in `AsyncPlaywrightCrawlerStrategy`. ([#async_crawler_strategy.py](crawl4ai/async_crawler_strategy.py))
## [0.4.24] - 2024-12-31

View File

@@ -1,131 +0,0 @@
# Crawl4AI Code of Conduct
## Our Pledge
We as members, contributors, and leaders pledge to make participation in our
community a harassment-free experience for everyone, regardless of age, body
size, visible or invisible disability, ethnicity, sex characteristics, gender
identity and expression, level of experience, education, socio-economic status,
nationality, personal appearance, race, caste, color, religion, or sexual
identity and orientation.
We pledge to act and interact in ways that contribute to an open, welcoming,
diverse, inclusive, and healthy community.
## Our Standards
Examples of behavior that contributes to a positive environment for our
community include:
* Demonstrating empathy and kindness toward other people
* Being respectful of differing opinions, viewpoints, and experiences
* Giving and gracefully accepting constructive feedback
* Accepting responsibility and apologizing to those affected by our mistakes,
and learning from the experience
* Focusing on what is best not just for us as individuals, but for the overall
community
Examples of unacceptable behavior include:
* The use of sexualized language or imagery, and sexual attention or advances of
any kind
* Trolling, insulting or derogatory comments, and personal or political attacks
* Public or private harassment
* Publishing others' private information, such as a physical or email address,
without their explicit permission
* Other conduct which could reasonably be considered inappropriate in a
professional setting
## Enforcement Responsibilities
Community leaders are responsible for clarifying and enforcing our standards of
acceptable behavior and will take appropriate and fair corrective action in
response to any behavior that they deem inappropriate, threatening, offensive,
or harmful.
Community leaders have the right and responsibility to remove, edit, or reject
comments, commits, code, wiki edits, issues, and other contributions that are
not aligned to this Code of Conduct, and will communicate reasons for moderation
decisions when appropriate.
## Scope
This Code of Conduct applies within all community spaces, and also applies when
an individual is officially representing the community in public spaces.
Examples of representing our community include using an official email address,
posting via an official social media account, or acting as an appointed
representative at an online or offline event.
## Enforcement
Instances of abusive, harassing, or otherwise unacceptable behavior may be
reported to the community leaders responsible for enforcement at
unclecode@crawl4ai.com. All complaints will be reviewed and investigated promptly and fairly.
All community leaders are obligated to respect the privacy and security of the
reporter of any incident.
## Enforcement Guidelines
Community leaders will follow these Community Impact Guidelines in determining
the consequences for any action they deem in violation of this Code of Conduct:
### 1. Correction
**Community Impact**: Use of inappropriate language or other behavior deemed
unprofessional or unwelcome in the community.
**Consequence**: A private, written warning from community leaders, providing
clarity around the nature of the violation and an explanation of why the
behavior was inappropriate. A public apology may be requested.
### 2. Warning
**Community Impact**: A violation through a single incident or series of
actions.
**Consequence**: A warning with consequences for continued behavior. No
interaction with the people involved, including unsolicited interaction with
those enforcing the Code of Conduct, for a specified period of time. This
includes avoiding interactions in community spaces as well as external channels
like social media. Violating these terms may lead to a temporary or permanent
ban.
### 3. Temporary Ban
**Community Impact**: A serious violation of community standards, including
sustained inappropriate behavior.
**Consequence**: A temporary ban from any sort of interaction or public
communication with the community for a specified period of time. No public or
private interaction with the people involved, including unsolicited interaction
with those enforcing the Code of Conduct, is allowed during this period.
Violating these terms may lead to a permanent ban.
### 4. Permanent Ban
**Community Impact**: Demonstrating a pattern of violation of community
standards, including sustained inappropriate behavior, harassment of an
individual, or aggression toward or disparagement of classes of individuals.
**Consequence**: A permanent ban from any sort of public interaction within the
community.
## Attribution
This Code of Conduct is adapted from the [Contributor Covenant][homepage],
version 2.1, available at
[https://www.contributor-covenant.org/version/2/1/code_of_conduct.html][v2.1].
Community Impact Guidelines were inspired by
[Mozilla's code of conduct enforcement ladder][Mozilla CoC].
For answers to common questions about this code of conduct, see the FAQ at
[https://www.contributor-covenant.org/faq][FAQ]. Translations are available at
[https://www.contributor-covenant.org/translations][translations].
[homepage]: https://www.contributor-covenant.org
[v2.1]: https://www.contributor-covenant.org/version/2/1/code_of_conduct.html
[Mozilla CoC]: https://github.com/mozilla/diversity
[FAQ]: https://www.contributor-covenant.org/faq
[translations]: https://www.contributor-covenant.org/translations

View File

@@ -15,7 +15,6 @@
[![License](https://img.shields.io/github/license/unclecode/crawl4ai)](https://github.com/unclecode/crawl4ai/blob/main/LICENSE)
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
[![Security: bandit](https://img.shields.io/badge/security-bandit-yellow.svg)](https://github.com/PyCQA/bandit)
[![Contributor Covenant](https://img.shields.io/badge/Contributor%20Covenant-2.1-4baaaa.svg)](code_of_conduct.md)
</div>
@@ -447,7 +446,7 @@ if __name__ == "__main__":
</details>
<details>
<summary>🤖 <strong>Using You own Browser with Custom User Profile</strong></summary>
<summary>🤖 <strong>Using You own Browswer with Custome User Profile</strong></summary>
```python
import os, sys

View File

@@ -1,2 +1,3 @@
# crawl4ai/_version.py
__version__ = "0.4.3b3"
# __version__ = "0.4.3b3"
__version__ = "0.4.248b3"

View File

@@ -10,7 +10,6 @@ from .config import (
from .user_agent_generator import UserAgentGenerator, UAGen, ValidUAGenerator, OnlineUAGenerator
from .extraction_strategy import ExtractionStrategy
from .chunking_strategy import ChunkingStrategy, RegexChunking
from .deep_crawl import DeepCrawlStrategy
from .markdown_generation_strategy import MarkdownGenerationStrategy
from .content_filter_strategy import RelevantContentFilter, BM25ContentFilter, LLMContentFilter, PruningContentFilter
from .content_scraping_strategy import ContentScrapingStrategy, WebScrapingStrategy
@@ -396,7 +395,6 @@ class CrawlerRunConfig:
word_count_threshold: int = MIN_WORD_THRESHOLD,
extraction_strategy: ExtractionStrategy = None,
chunking_strategy: ChunkingStrategy = RegexChunking(),
deep_crawl_strategy: DeepCrawlStrategy = None,
markdown_generator: MarkdownGenerationStrategy = None,
content_filter : RelevantContentFilter = None,
only_text: bool = False,
@@ -470,7 +468,6 @@ class CrawlerRunConfig:
self.word_count_threshold = word_count_threshold
self.extraction_strategy = extraction_strategy
self.chunking_strategy = chunking_strategy
self.deep_crawl_strategy = deep_crawl_strategy
self.markdown_generator = markdown_generator
self.content_filter = content_filter
self.only_text = only_text
@@ -558,14 +555,6 @@ class CrawlerRunConfig:
raise ValueError(
"extraction_strategy must be an instance of ExtractionStrategy"
)
if self.deep_crawl_strategy is not None and not isinstance(
self.deep_crawl_strategy, DeepCrawlStrategy
):
raise ValueError(
"deep_crawl_strategy must be an instance of DeepCrawlStrategy"
)
if self.chunking_strategy is not None and not isinstance(
self.chunking_strategy, ChunkingStrategy
):
@@ -584,7 +573,6 @@ class CrawlerRunConfig:
word_count_threshold=kwargs.get("word_count_threshold", 200),
extraction_strategy=kwargs.get("extraction_strategy"),
chunking_strategy=kwargs.get("chunking_strategy", RegexChunking()),
deep_crawl_strategy=kwargs.get("deep_crawl_strategy"),
markdown_generator=kwargs.get("markdown_generator"),
content_filter=kwargs.get("content_filter"),
only_text=kwargs.get("only_text", False),
@@ -668,7 +656,6 @@ class CrawlerRunConfig:
"word_count_threshold": self.word_count_threshold,
"extraction_strategy": self.extraction_strategy,
"chunking_strategy": self.chunking_strategy,
"deep_crawl_strategy": self.deep_crawl_strategy,
"markdown_generator": self.markdown_generator,
"content_filter": self.content_filter,
"only_text": self.only_text,

View File

@@ -10,19 +10,13 @@ import asyncio
# from contextlib import nullcontext, asynccontextmanager
from contextlib import asynccontextmanager
from .models import (
CrawlResult,
MarkdownGenerationResult,
CrawlerTaskResult,
DispatchResult,
)
from .models import CrawlResult, MarkdownGenerationResult, CrawlerTaskResult, DispatchResult
from .async_database import async_db_manager
from .chunking_strategy import * # noqa: F403
from .chunking_strategy import RegexChunking, ChunkingStrategy, IdentityChunking
from .content_filter_strategy import * # noqa: F403
from .content_filter_strategy import RelevantContentFilter
from .extraction_strategy import * # noqa: F403
from .extraction_strategy import * # noqa: F403
from .extraction_strategy import NoExtractionStrategy, ExtractionStrategy
from .async_crawler_strategy import (
AsyncCrawlerStrategy,
@@ -36,9 +30,8 @@ from .markdown_generation_strategy import (
)
from .async_logger import AsyncLogger
from .async_configs import BrowserConfig, CrawlerRunConfig
from .async_dispatcher import * # noqa: F403
from .async_dispatcher import * # noqa: F403
from .async_dispatcher import BaseDispatcher, MemoryAdaptiveDispatcher, RateLimiter
from .deep_crawl import DeepCrawlStrategy
from .config import MIN_WORD_THRESHOLD
from .utils import (
@@ -53,17 +46,10 @@ from .utils import (
from typing import Union, AsyncGenerator, List, TypeVar
from collections.abc import AsyncGenerator
from .__version__ import __version__ as crawl4ai_version
CrawlResultT = TypeVar("CrawlResultT", bound=CrawlResult)
CrawlResultT = TypeVar('CrawlResultT', bound=CrawlResult)
RunManyReturn = Union[List[CrawlResultT], AsyncGenerator[CrawlResultT, None]]
DeepCrawlSingleReturn = Union[List[CrawlResultT], AsyncGenerator[CrawlResultT, None]]
DeepCrawlManyReturn = Union[
List[List[CrawlResultT]],
AsyncGenerator[CrawlResultT, None],
]
from .__version__ import __version__ as crawl4ai_version
class AsyncWebCrawler:
@@ -271,7 +257,7 @@ class AsyncWebCrawler:
@asynccontextmanager
async def nullcontext(self):
"""Asynchronous null context manager"""
"""异步空上下文管理器"""
yield
async def arun(
@@ -296,7 +282,7 @@ class AsyncWebCrawler:
user_agent: str = None,
verbose=True,
**kwargs,
) -> Union[CrawlResult, DeepCrawlSingleReturn]:
) -> CrawlResult:
"""
Runs the crawler for a single source: URL (web, local file, or raw HTML).
@@ -398,23 +384,6 @@ class AsyncWebCrawler:
extracted_content = None
start_time = time.perf_counter()
if crawler_config.deep_crawl_strategy:
if crawler_config.stream:
return crawler_config.deep_crawl_strategy.arun(
start_url=url,
crawler=self,
crawler_run_config=crawler_config,
)
else:
results = []
async for result in crawler_config.deep_crawl_strategy.arun(
start_url=url,
crawler=self,
crawler_run_config=crawler_config,
):
results.append(result)
return results
# Try to get cached result if appropriate
if cache_context.should_read():
cached_result = await async_db_manager.aget_cached_url(url)
@@ -451,18 +420,14 @@ class AsyncWebCrawler:
# Check robots.txt if enabled
if config and config.check_robots_txt:
if not await self.robots_parser.can_fetch(
url, self.browser_config.user_agent
):
if not await self.robots_parser.can_fetch(url, self.browser_config.user_agent):
return CrawlResult(
url=url,
html="",
success=False,
status_code=403,
error_message="Access denied by robots.txt",
response_headers={
"X-Robots-Status": "Blocked by robots.txt"
},
response_headers={"X-Robots-Status": "Blocked by robots.txt"}
)
# Pass config to crawl method
@@ -484,7 +449,7 @@ class AsyncWebCrawler:
)
# Process the HTML content
crawl_result: CrawlResult = await self.aprocess_html(
crawl_result : CrawlResult = await self.aprocess_html(
url=url,
html=html,
extracted_content=extracted_content,
@@ -752,7 +717,7 @@ class AsyncWebCrawler:
async def arun_many(
self,
urls: List[str],
config: Optional[CrawlerRunConfig] = None,
config: Optional[CrawlerRunConfig] = None,
dispatcher: Optional[BaseDispatcher] = None,
# Legacy parameters maintained for backwards compatibility
word_count_threshold=MIN_WORD_THRESHOLD,
@@ -766,8 +731,8 @@ class AsyncWebCrawler:
pdf: bool = False,
user_agent: str = None,
verbose=True,
**kwargs,
) -> Union[RunManyReturn, DeepCrawlManyReturn]:
**kwargs
) -> RunManyReturn:
"""
Runs the crawler for multiple URLs concurrently using a configurable dispatcher strategy.
@@ -798,22 +763,6 @@ class AsyncWebCrawler:
):
print(f"Processed {result.url}: {len(result.markdown)} chars")
"""
async def merge_async_generators(generators):
tasks = {asyncio.create_task(gen.__anext__()): gen for gen in generators}
while tasks:
done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
for task in done:
gen = tasks.pop(task) # Get the generator associated with this task
try:
result = task.result()
yield result # Yield the result
tasks[asyncio.create_task(gen.__anext__())] = gen # Fetch next item
except StopAsyncIteration:
pass # Generator is exhausted, don't add it back to the tasks
if config is None:
config = CrawlerRunConfig(
word_count_threshold=word_count_threshold,
@@ -837,9 +786,7 @@ class AsyncWebCrawler:
)
transform_result = lambda task_result: (
setattr(
task_result.result,
"dispatch_result",
setattr(task_result.result, 'dispatch_result',
DispatchResult(
task_id=task_result.task_id,
memory_usage=task_result.memory_usage,
@@ -847,46 +794,20 @@ class AsyncWebCrawler:
start_time=task_result.start_time,
end_time=task_result.end_time,
error_message=task_result.error_message,
),
)
or task_result.result
)
) or task_result.result
)
stream = config.stream
if config.deep_crawl_strategy:
if config.stream:
generators = []
for url in urls:
generators.append(
config.deep_crawl_strategy.arun(
start_url=url, crawler=self, crawler_run_config=config
)
)
return merge_async_generators(generators)
else:
results = []
for url in urls:
url_results = []
async for result in config.deep_crawl_strategy.arun(
start_url=url, crawler=self, crawler_run_config=config
):
url_results.append(result)
results.append(url_results)
return results
if stream:
async def result_transformer():
async for task_result in dispatcher.run_urls_stream(
crawler=self, urls=urls, config=config
):
async for task_result in dispatcher.run_urls_stream(crawler=self, urls=urls, config=config):
yield transform_result(task_result)
return result_transformer()
else:
_results = await dispatcher.run_urls(crawler=self, urls=urls, config=config)
return [transform_result(res) for res in _results]
return [transform_result(res) for res in _results]
async def aclear_cache(self):
"""Clear the cache database."""

View File

@@ -84,4 +84,3 @@ SHOW_DEPRECATION_WARNINGS = True
SCREENSHOT_HEIGHT_TRESHOLD = 10000
PAGE_TIMEOUT = 60000
DOWNLOAD_PAGE_TIMEOUT = 60000
DEEP_CRAWL_BATCH_SIZE = 5

View File

@@ -1,29 +0,0 @@
from .bfs_deep_crawl_strategy import BFSDeepCrawlStrategy
from .filters import (
URLFilter,
FilterChain,
URLPatternFilter,
ContentTypeFilter,
DomainFilter,
)
from .scorers import (
KeywordRelevanceScorer,
PathDepthScorer,
FreshnessScorer,
CompositeScorer,
)
from .deep_crawl_strategty import DeepCrawlStrategy
__all__ = [
"BFSDeepCrawlStrategy",
"FilterChain",
"URLFilter",
"URLPatternFilter",
"ContentTypeFilter",
"DomainFilter",
"KeywordRelevanceScorer",
"PathDepthScorer",
"FreshnessScorer",
"CompositeScorer",
"DeepCrawlStrategy",
]

View File

@@ -1,193 +0,0 @@
from typing import AsyncGenerator, Optional, Dict, Set, List
from datetime import datetime
import asyncio
import logging
from urllib.parse import urlparse
from ..models import CrawlResult, TraversalStats
from .filters import FilterChain
from .scorers import URLScorer
from .deep_crawl_strategty import DeepCrawlStrategy
from ..config import DEEP_CRAWL_BATCH_SIZE
class BFSDeepCrawlStrategy(DeepCrawlStrategy):
"""Best-First Search traversal strategy with filtering and scoring."""
def __init__(
self,
max_depth: int,
filter_chain: FilterChain,
url_scorer: URLScorer,
process_external_links: bool = False,
logger: Optional[logging.Logger] = None,
):
self.max_depth = max_depth
self.filter_chain = filter_chain
self.url_scorer = url_scorer
self.logger = logger or logging.getLogger(__name__)
# Crawl control
self.stats = TraversalStats(start_time=datetime.now())
self._cancel_event = asyncio.Event()
self.process_external_links = process_external_links
async def can_process_url(self, url: str, depth: int) -> bool:
"""Check if URL can be processed based on filters
This is our gatekeeper method that determines if a URL should be processed. It:
- Validates URL format using a robust built-in method
- Applies custom filters from the filter chain
- Updates statistics for blocked URLs
- Returns False early if any check fails
"""
try:
result = urlparse(url)
if not all([result.scheme, result.netloc]):
raise ValueError("Invalid URL")
if result.scheme not in ("http", "https"):
raise ValueError("URL must be HTTP or HTTPS")
if not result.netloc or "." not in result.netloc:
raise ValueError("Invalid domain")
except Exception as e:
self.logger.warning(f"Invalid URL: {url}. Error: {str(e)}")
return False
# Apply the filter chain if it's not start page
if depth != 0 and not self.filter_chain.apply(url):
return False
return True
async def _process_links(
self,
result: CrawlResult,
source_url: str,
queue: asyncio.PriorityQueue,
visited: Set[str],
depths: Dict[str, int],
) -> List[str]:
"""Process extracted links from crawl result.
This is our link processor that:
Checks depth limits
Handles both internal and external links
Checks if URL is visited already
Checks if URL can be processed - validates URL, applies Filters with can_process_url
Scores URLs for priority
Updates depth tracking dictionary
Adds valid URLs to the queue
Updates maximum depth statistics
"""
next_depth = depths[source_url] + 1
# If depth limit reached, exit without processing links
if next_depth > self.max_depth:
return
links_to_process = result.links["internal"]
if self.process_external_links:
links_to_process += result.links["external"]
for link in links_to_process:
url = link["href"]
if url in visited:
continue
if not await self.can_process_url(url, next_depth):
self.stats.urls_skipped += 1
continue
score = self.url_scorer.score(url) if self.url_scorer else 0
await queue.put((score, next_depth, url, source_url))
depths[url] = next_depth
self.stats.total_depth_reached = max(
self.stats.total_depth_reached, next_depth
)
async def arun(
self,
start_url: str,
crawler: "AsyncWebCrawler",
crawler_run_config: Optional["CrawlerRunConfig"] = None,
) -> AsyncGenerator[CrawlResult, None]:
"""Implement BFS traversal strategy"""
# Initialize traversal state
"""
queue: A priority queue where items are tuples of (score, depth, url)
Score: Determines traversal priority (lower = higher priority)
Depth: Current distance from start_url
URL: The actual URL to crawl
visited: Keeps track of URLs we've already seen to avoid cycles
depths: Maps URLs to their depths from the start URL
active_crawls: Tracks currently running crawl tasks
"""
queue = asyncio.PriorityQueue()
await queue.put((0, 0, start_url, None))
visited: Set[str] = set()
depths = {start_url: 0}
active_crawls = {} # Track URLs currently being processed with depth and score
active_crawls_lock = (
asyncio.Lock()
) # Create the lock within the same event loop
try:
while (
not queue.empty() or active_crawls
) and not self._cancel_event.is_set():
"""
This sets up our main control loop which:
- Continues while there are URLs to process (not queue.empty())
- Or while there are active crawls still running (arun_many)
- Can be interrupted via cancellation (not self._cancel_event.is_set())
"""
# Collect batch of URLs into active_crawls to process
async with active_crawls_lock:
while (
len(active_crawls) < DEEP_CRAWL_BATCH_SIZE and not queue.empty()
):
score, depth, url, parent_url = await queue.get()
active_crawls[url] = {
"depth": depth,
"score": score,
"parent_url": parent_url,
}
self.stats.current_depth = depth
if not active_crawls:
# If no active crawls exist, wait a bit and continue
await asyncio.sleep(0.1)
continue
# Process batch
try:
# This is very important to ensure recursively you don't deep_crawl down the children.
if crawler_run_config:
crawler_run_config = crawler_run_config.clone(
deep_crawl_strategy=None, stream=True
)
async for result in await crawler.arun_many(
urls=list(active_crawls.keys()),
config=crawler_run_config
):
async with active_crawls_lock:
crawl_info = active_crawls.pop(result.url, None)
if crawl_info and result.success:
await self._process_links(
result, result.url, queue, visited, depths
)
result.depth = crawl_info["depth"]
result.score = crawl_info["score"]
result.parent_url = crawl_info["parent_url"]
yield result
else:
self.logger.warning(
f"Failed to crawl {result.url}: {result.error_message}"
)
except Exception as e:
self.logger.error(f"Batch processing error: {e}")
# Continue processing other batches
continue
except Exception as e:
self.logger.error(f"Error in crawl process: {e}")
raise
finally:
self.stats.end_time = datetime.now()
async def shutdown(self):
"""Clean up resources and stop crawling"""
self._cancel_event.set()

View File

@@ -1,30 +0,0 @@
from abc import ABC, abstractmethod
from typing import AsyncGenerator, Optional
from ..models import CrawlResult
class DeepCrawlStrategy(ABC):
@abstractmethod
async def arun(
self,
url: str,
crawler: "AsyncWebCrawler",
crawler_run_config: Optional["CrawlerRunConfig"] = None,
) -> AsyncGenerator[CrawlResult, None]:
"""Traverse the given URL using the specified crawler.
Args:
url (str): The starting URL for the traversal.
crawler (AsyncWebCrawler): The crawler instance to use for traversal.
crawler_run_config (CrawlerRunConfig, optional): The configuration for the crawler.
Returns:
AsyncGenerator[CrawlResult, None]: An async generator yielding crawl results.
"""
pass
@abstractmethod
async def shutdown(self):
"""Clean up resources used by the strategy"""
pass

View File

@@ -1,868 +0,0 @@
from abc import ABC, abstractmethod
from typing import List, Pattern, Set, Union, FrozenSet
import re, time
from urllib.parse import urlparse
from array import array
import logging
from functools import lru_cache
import fnmatch
from dataclasses import dataclass
from typing import ClassVar
import weakref
import mimetypes
@dataclass
class FilterStats:
# PERF: Using dataclass creates overhead with __init__ and property access
# PERF: Could use __slots__ to reduce memory footprint
# PERF: Consider using array.array('I') for atomic increments
total_urls: int = 0
rejected_urls: int = 0
passed_urls: int = 0
class URLFilter(ABC):
# PERF: Logger creation is expensive, consider lazy initialization
# PERF: stats object creation adds overhead for each filter instance
def __init__(self, name: str = None):
self.name = name or self.__class__.__name__
self.stats = FilterStats()
self.logger = logging.getLogger(f"urlfilter.{self.name}")
@abstractmethod
def apply(self, url: str) -> bool:
pass
def _update_stats(self, passed: bool):
# PERF: Already optimized but could use bitwise operations
# PERF: Consider removing stats entirely in production/fast mode
self.stats.total_urls += 1
self.stats.passed_urls += passed
self.stats.rejected_urls += not passed
class FilterChain:
# PERF: List traversal for each URL is expensive
# PERF: Could use array.array instead of list for filters
# PERF: Consider adding fast path for single filter case
def __init__(self, filters: List[URLFilter] = None):
self.filters = filters or []
self.stats = FilterStats()
self.logger = logging.getLogger("urlfilter.chain")
def apply(self, url: str) -> bool:
# PERF: Logging on every rejection is expensive
# PERF: Could reorder filters by rejection rate
# PERF: Consider batch processing mode
self.stats.total_urls += 1
for filter_ in self.filters:
if not filter_.apply(url):
self.stats.rejected_urls += 1
self.logger.debug(f"URL {url} rejected by {filter_.name}")
return False
self.stats.passed_urls += 1
return True
class URLPatternFilter(URLFilter):
# PERF: Converting glob to regex is expensive
# PERF: Multiple regex compilation is slow
# PERF: List of patterns causes multiple regex evaluations
def __init__(
self,
patterns: Union[str, Pattern, List[Union[str, Pattern]]],
use_glob: bool = True,
):
super().__init__()
self.patterns = [patterns] if isinstance(patterns, (str, Pattern)) else patterns
self.use_glob = use_glob
self._compiled_patterns = []
# PERF: This could be consolidated into a single regex with OR conditions
# PERF: glob_to_regex creates complex patterns, could be simplified
for pattern in self.patterns:
if isinstance(pattern, str) and use_glob:
self._compiled_patterns.append(self._glob_to_regex(pattern))
else:
self._compiled_patterns.append(
re.compile(pattern) if isinstance(pattern, str) else pattern
)
def _glob_to_regex(self, pattern: str) -> Pattern:
# PERF: fnmatch.translate creates overly complex patterns
# PERF: Could cache common translations
return re.compile(fnmatch.translate(pattern))
def apply(self, url: str) -> bool:
# PERF: any() with generator is slower than direct loop with early return
# PERF: searching entire string is slower than anchored match
matches = any(pattern.search(url) for pattern in self._compiled_patterns)
self._update_stats(matches)
return matches
class ContentTypeFilter(URLFilter):
# PERF: mimetypes guessing is extremely slow
# PERF: URL parsing on every check is expensive
# PERF: No caching of results for similar extensions
def __init__(
self, allowed_types: Union[str, List[str]], check_extension: bool = True
):
super().__init__()
self.allowed_types = (
[allowed_types] if isinstance(allowed_types, str) else allowed_types
)
self.check_extension = check_extension
self._normalize_types()
def _normalize_types(self):
"""Normalize content type strings"""
self.allowed_types = [t.lower() for t in self.allowed_types]
def _check_extension(self, url: str) -> bool:
# PERF: urlparse is called on every check
# PERF: multiple string splits are expensive
# PERF: mimetypes.guess_type is very slow
ext = (
urlparse(url).path.split(".")[-1].lower()
if "." in urlparse(url).path
else ""
)
if not ext:
return True
# PERF: guess_type is main bottleneck
guessed_type = mimetypes.guess_type(url)[0]
return any(
allowed in (guessed_type or "").lower() for allowed in self.allowed_types
)
def apply(self, url: str) -> bool:
"""Check if URL's content type is allowed"""
result = True
if self.check_extension:
result = self._check_extension(url)
self._update_stats(result)
return result
class DomainFilter(URLFilter):
# PERF: Set lookups are fast but string normalizations on init are not
# PERF: Creating two sets doubles memory usage
def __init__(
self,
allowed_domains: Union[str, List[str]] = None,
blocked_domains: Union[str, List[str]] = None,
):
super().__init__()
# PERF: Normalizing domains on every init is wasteful
# PERF: Could use frozenset for immutable lists
self.allowed_domains = (
set(self._normalize_domains(allowed_domains)) if allowed_domains else None
)
self.blocked_domains = (
set(self._normalize_domains(blocked_domains)) if blocked_domains else set()
)
def _normalize_domains(self, domains: Union[str, List[str]]) -> List[str]:
# PERF: strip() and lower() create new strings for each domain
# PERF: List comprehension creates intermediate list
if isinstance(domains, str):
domains = [domains]
return [d.lower().strip() for d in domains]
def _extract_domain(self, url: str) -> str:
# PERF: urlparse is called for every URL check
# PERF: lower() creates new string every time
# PERF: Could cache recent results
return urlparse(url).netloc.lower()
def apply(self, url: str) -> bool:
# PERF: Two separate set lookups in worst case
# PERF: Domain extraction happens before knowing if we have any filters
domain = self._extract_domain(url)
if domain in self.blocked_domains:
self._update_stats(False)
return False
if self.allowed_domains is not None and domain not in self.allowed_domains:
self._update_stats(False)
return False
self._update_stats(True)
return True
# Example usage:
def create_common_filter_chain() -> FilterChain:
"""Create a commonly used filter chain"""
return FilterChain(
[
URLPatternFilter(
[
"*.html",
"*.htm", # HTML files
"*/article/*",
"*/blog/*", # Common content paths
]
),
ContentTypeFilter(["text/html", "application/xhtml+xml"]),
DomainFilter(blocked_domains=["ads.*", "analytics.*"]),
]
)
####################################################################################
# Uncledoe: Optimized Version
####################################################################################
# Use __slots__ and array for maximum memory/speed efficiency
class FastFilterStats:
__slots__ = ("_counters",)
def __init__(self):
# Use array of unsigned ints for atomic operations
self._counters = array("I", [0, 0, 0]) # total, passed, rejected
@property
def total_urls(self):
return self._counters[0]
@property
def passed_urls(self):
return self._counters[1]
@property
def rejected_urls(self):
return self._counters[2]
class FastURLFilter(ABC):
"""Optimized base filter class"""
__slots__ = ("name", "stats", "_logger_ref")
def __init__(self, name: str = None):
self.name = name or self.__class__.__name__
self.stats = FastFilterStats()
# Lazy logger initialization using weakref
self._logger_ref = None
@property
def logger(self):
if self._logger_ref is None or self._logger_ref() is None:
logger = logging.getLogger(f"urlfilter.{self.name}")
self._logger_ref = weakref.ref(logger)
return self._logger_ref()
@abstractmethod
def apply(self, url: str) -> bool:
pass
def _update_stats(self, passed: bool):
# Use direct array index for speed
self.stats._counters[0] += 1 # total
self.stats._counters[1] += passed # passed
self.stats._counters[2] += not passed # rejected
class FastFilterChain:
"""Optimized filter chain"""
__slots__ = ("filters", "stats", "_logger_ref")
def __init__(self, filters: List[FastURLFilter] = None):
self.filters = tuple(filters or []) # Immutable tuple for speed
self.stats = FastFilterStats()
self._logger_ref = None
@property
def logger(self):
if self._logger_ref is None or self._logger_ref() is None:
logger = logging.getLogger("urlfilter.chain")
self._logger_ref = weakref.ref(logger)
return self._logger_ref()
def add_filter(self, filter_: FastURLFilter) -> "FastFilterChain":
"""Add a filter to the chain"""
self.filters.append(filter_)
return self # Enable method chaining
def apply(self, url: str) -> bool:
"""Optimized apply with minimal operations"""
self.stats._counters[0] += 1 # total
# Direct tuple iteration is faster than list
for f in self.filters:
if not f.apply(url):
self.stats._counters[2] += 1 # rejected
return False
self.stats._counters[1] += 1 # passed
return True
class FastURLPatternFilter(FastURLFilter):
"""Pattern filter balancing speed and completeness"""
__slots__ = ('_simple_suffixes', '_simple_prefixes', '_domain_patterns', '_path_patterns')
PATTERN_TYPES = {
'SUFFIX': 1, # *.html
'PREFIX': 2, # /foo/*
'DOMAIN': 3, # *.example.com
'PATH': 4 , # Everything else
'REGEX': 5
}
def __init__(self, patterns: Union[str, Pattern, List[Union[str, Pattern]]], use_glob: bool = True):
super().__init__()
patterns = [patterns] if isinstance(patterns, (str, Pattern)) else patterns
self._simple_suffixes = set()
self._simple_prefixes = set()
self._domain_patterns = []
self._path_patterns = []
for pattern in patterns:
pattern_type = self._categorize_pattern(pattern)
self._add_pattern(pattern, pattern_type)
def _categorize_pattern(self, pattern: str) -> int:
"""Categorize pattern for specialized handling"""
if not isinstance(pattern, str):
return self.PATTERN_TYPES['PATH']
# Check if it's a regex pattern
if pattern.startswith('^') or pattern.endswith('$') or '\\d' in pattern:
return self.PATTERN_TYPES['REGEX']
if pattern.count('*') == 1:
if pattern.startswith('*.'):
return self.PATTERN_TYPES['SUFFIX']
if pattern.endswith('/*'):
return self.PATTERN_TYPES['PREFIX']
if '://' in pattern and pattern.startswith('*.'):
return self.PATTERN_TYPES['DOMAIN']
return self.PATTERN_TYPES['PATH']
def _add_pattern(self, pattern: str, pattern_type: int):
"""Add pattern to appropriate matcher"""
if pattern_type == self.PATTERN_TYPES['REGEX']:
# For regex patterns, compile directly without glob translation
if isinstance(pattern, str) and (pattern.startswith('^') or pattern.endswith('$') or '\\d' in pattern):
self._path_patterns.append(re.compile(pattern))
return
elif pattern_type == self.PATTERN_TYPES['SUFFIX']:
self._simple_suffixes.add(pattern[2:])
elif pattern_type == self.PATTERN_TYPES['PREFIX']:
self._simple_prefixes.add(pattern[:-2])
elif pattern_type == self.PATTERN_TYPES['DOMAIN']:
self._domain_patterns.append(
re.compile(pattern.replace('*.', r'[^/]+\.'))
)
else:
if isinstance(pattern, str):
# Handle complex glob patterns
if '**' in pattern:
pattern = pattern.replace('**', '.*')
if '{' in pattern:
# Convert {a,b} to (a|b)
pattern = re.sub(r'\{([^}]+)\}',
lambda m: f'({"|".join(m.group(1).split(","))})',
pattern)
pattern = fnmatch.translate(pattern)
self._path_patterns.append(
pattern if isinstance(pattern, Pattern) else re.compile(pattern)
)
@lru_cache(maxsize=10000)
def apply(self, url: str) -> bool:
"""Hierarchical pattern matching"""
# Quick suffix check (*.html)
if self._simple_suffixes:
path = url.split('?')[0]
if path.split('/')[-1].split('.')[-1] in self._simple_suffixes:
self._update_stats(True)
return True
# Domain check
if self._domain_patterns:
for pattern in self._domain_patterns:
if pattern.match(url):
self._update_stats(True)
return True
# Prefix check (/foo/*)
if self._simple_prefixes:
path = url.split('?')[0]
if any(path.startswith(p) for p in self._simple_prefixes):
self._update_stats(True)
return True
# Complex patterns
if self._path_patterns:
if any(p.search(url) for p in self._path_patterns):
self._update_stats(True)
return True
self._update_stats(False)
return False
class FastContentTypeFilter(FastURLFilter):
"""Optimized content type filter using fast lookups"""
__slots__ = ("allowed_types", "_ext_map", "_check_extension")
# Fast extension to mime type mapping
_MIME_MAP = {
# Text Formats
"txt": "text/plain",
"html": "text/html",
"htm": "text/html",
"xhtml": "application/xhtml+xml",
"css": "text/css",
"csv": "text/csv",
"ics": "text/calendar",
"js": "application/javascript",
# Images
"bmp": "image/bmp",
"gif": "image/gif",
"jpeg": "image/jpeg",
"jpg": "image/jpeg",
"png": "image/png",
"svg": "image/svg+xml",
"tiff": "image/tiff",
"ico": "image/x-icon",
"webp": "image/webp",
# Audio
"mp3": "audio/mpeg",
"wav": "audio/wav",
"ogg": "audio/ogg",
"m4a": "audio/mp4",
"aac": "audio/aac",
# Video
"mp4": "video/mp4",
"mpeg": "video/mpeg",
"webm": "video/webm",
"avi": "video/x-msvideo",
"mov": "video/quicktime",
"flv": "video/x-flv",
"wmv": "video/x-ms-wmv",
"mkv": "video/x-matroska",
# Applications
"json": "application/json",
"xml": "application/xml",
"pdf": "application/pdf",
"zip": "application/zip",
"gz": "application/gzip",
"tar": "application/x-tar",
"rar": "application/vnd.rar",
"7z": "application/x-7z-compressed",
"exe": "application/vnd.microsoft.portable-executable",
"msi": "application/x-msdownload",
# Fonts
"woff": "font/woff",
"woff2": "font/woff2",
"ttf": "font/ttf",
"otf": "font/otf",
# Microsoft Office
"doc": "application/msword",
"dot": "application/msword",
"docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
"xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
"xls": "application/vnd.ms-excel",
"ppt": "application/vnd.ms-powerpoint",
"pptx": "application/vnd.openxmlformats-officedocument.presentationml.presentation",
# OpenDocument Formats
"odt": "application/vnd.oasis.opendocument.text",
"ods": "application/vnd.oasis.opendocument.spreadsheet",
"odp": "application/vnd.oasis.opendocument.presentation",
# Archives
"tar.gz": "application/gzip",
"tgz": "application/gzip",
"bz2": "application/x-bzip2",
# Others
"rtf": "application/rtf",
"apk": "application/vnd.android.package-archive",
"epub": "application/epub+zip",
"jar": "application/java-archive",
"swf": "application/x-shockwave-flash",
"midi": "audio/midi",
"mid": "audio/midi",
"ps": "application/postscript",
"ai": "application/postscript",
"eps": "application/postscript",
# Custom or less common
"bin": "application/octet-stream",
"dmg": "application/x-apple-diskimage",
"iso": "application/x-iso9660-image",
"deb": "application/x-debian-package",
"rpm": "application/x-rpm",
"sqlite": "application/vnd.sqlite3",
# Placeholder
"unknown": "application/octet-stream", # Fallback for unknown file types
}
@staticmethod
@lru_cache(maxsize=1000)
def _extract_extension(path: str) -> str:
"""Fast extension extraction with caching"""
if "." not in path:
return ""
return path.rpartition(".")[-1].lower()
def __init__(
self, allowed_types: Union[str, List[str]], check_extension: bool = True
):
super().__init__()
# Normalize and store as frozenset for fast lookup
self.allowed_types = frozenset(
t.lower()
for t in (
allowed_types if isinstance(allowed_types, list) else [allowed_types]
)
)
self._check_extension = check_extension
# Pre-compute extension map for allowed types
self._ext_map = frozenset(
ext
for ext, mime in self._MIME_MAP.items()
if any(allowed in mime for allowed in self.allowed_types)
)
@lru_cache(maxsize=1000)
def _check_url_cached(self, url: str) -> bool:
"""Cached URL checking"""
if not self._check_extension:
return True
path = url.split("?")[0] # Fast path split
ext = self._extract_extension(path)
if not ext:
return True
return ext in self._ext_map
def apply(self, url: str) -> bool:
"""Fast extension check with caching"""
result = self._check_url_cached(url)
self._update_stats(result)
return result
class FastDomainFilter(FastURLFilter):
"""Optimized domain filter with fast lookups and caching"""
__slots__ = ("_allowed_domains", "_blocked_domains", "_domain_cache")
# Regex for fast domain extraction
_DOMAIN_REGEX = re.compile(r"://([^/]+)")
def __init__(
self,
allowed_domains: Union[str, List[str]] = None,
blocked_domains: Union[str, List[str]] = None,
):
super().__init__()
# Convert inputs to frozensets for immutable, fast lookups
self._allowed_domains = (
frozenset(self._normalize_domains(allowed_domains))
if allowed_domains
else None
)
self._blocked_domains = (
frozenset(self._normalize_domains(blocked_domains))
if blocked_domains
else frozenset()
)
@staticmethod
def _normalize_domains(domains: Union[str, List[str]]) -> Set[str]:
"""Fast domain normalization"""
if isinstance(domains, str):
return {domains.lower()}
return {d.lower() for d in domains}
@staticmethod
@lru_cache(maxsize=10000)
def _extract_domain(url: str) -> str:
"""Ultra-fast domain extraction with regex and caching"""
match = FastDomainFilter._DOMAIN_REGEX.search(url)
return match.group(1).lower() if match else ""
def apply(self, url: str) -> bool:
"""Optimized domain checking with early returns"""
# Skip processing if no filters
if not self._blocked_domains and self._allowed_domains is None:
self._update_stats(True)
return True
domain = self._extract_domain(url)
# Early return for blocked domains
if domain in self._blocked_domains:
self._update_stats(False)
return False
# If no allowed domains specified, accept all non-blocked
if self._allowed_domains is None:
self._update_stats(True)
return True
# Final allowed domains check
result = domain in self._allowed_domains
self._update_stats(result)
return result
def create_fast_filter_chain() -> FastFilterChain:
"""Create an optimized filter chain with filters ordered by rejection rate"""
return FastFilterChain(
[
# Domain filter first (fastest rejection)
FastDomainFilter(blocked_domains=["ads.*", "analytics.*"]),
# Content filter second (medium speed)
FastContentTypeFilter(["text/html", "application/xhtml+xml"]),
# Pattern filter last (most expensive)
FastURLPatternFilter(
[
"*.html",
"*.htm",
"*/article/*",
"*/blog/*",
]
),
]
)
def run_performance_test():
import time
import random
from itertools import cycle
# Generate test URLs
base_urls = [
"https://example.com/article/123",
"https://blog.example.com/post/456",
"https://ads.example.com/tracking",
"https://example.com/about.html",
"https://analytics.example.com/script.js",
"https://example.com/products.php",
"https://subdomain.example.com/blog/post-123",
"https://example.com/path/file.pdf",
]
# Create more varied test data
test_urls = []
for base in base_urls:
# Add original
test_urls.append(base)
# Add variations
parts = base.split("/")
for i in range(10):
parts[-1] = f"page_{i}.html"
test_urls.append("/".join(parts))
# Multiply to get enough test data
test_urls = test_urls * 10000 # Creates ~800k URLs
def benchmark(name: str, func, *args, warmup=True):
if warmup:
# Warmup run
func(*args)
# Actual timing
start = time.perf_counter_ns()
result = func(*args)
elapsed = (time.perf_counter_ns() - start) / 1_000_000 # Convert to ms
print(
f"{name:<30} {elapsed:>8.3f} ms ({len(test_urls)/elapsed*1000:,.0f} URLs/sec)"
)
return result
print("\nBenchmarking original vs optimized implementations...")
print("-" * 70)
# Original implementation
pattern_filter = URLPatternFilter(["*.html", "*/article/*"])
content_filter = ContentTypeFilter(["text/html"])
domain_filter = DomainFilter(blocked_domains=["ads.*", "analytics.*"])
chain = FilterChain([pattern_filter, content_filter, domain_filter])
# Optimized implementation
fast_pattern_filter = FastURLPatternFilter(["*.html", "*/article/*"])
fast_content_filter = FastContentTypeFilter(["text/html"])
fast_domain_filter = FastDomainFilter(blocked_domains=["ads.*", "analytics.*"])
fast_chain = FastFilterChain(
[fast_domain_filter, fast_content_filter, fast_pattern_filter]
)
# Test individual filters
print("\nSingle filter performance (first 1000 URLs):")
test_subset = test_urls[:1000]
print("\nPattern Filters:")
benchmark(
"Original Pattern Filter",
lambda: [pattern_filter.apply(url) for url in test_subset],
)
benchmark(
"Optimized Pattern Filter",
lambda: [fast_pattern_filter.apply(url) for url in test_subset],
)
print("\nContent Filters:")
benchmark(
"Original Content Filter",
lambda: [content_filter.apply(url) for url in test_subset],
)
benchmark(
"Optimized Content Filter",
lambda: [fast_content_filter.apply(url) for url in test_subset],
)
print("\nDomain Filters:")
benchmark(
"Original Domain Filter",
lambda: [domain_filter.apply(url) for url in test_subset],
)
benchmark(
"Optimized Domain Filter",
lambda: [fast_domain_filter.apply(url) for url in test_subset],
)
print("\nFull Chain Performance (all URLs):")
# Test chain
benchmark("Original Chain", lambda: [chain.apply(url) for url in test_urls])
benchmark("Optimized Chain", lambda: [fast_chain.apply(url) for url in test_urls])
# Memory usage
import sys
print("\nMemory Usage per Filter:")
print(f"Original Pattern Filter: {sys.getsizeof(pattern_filter):,} bytes")
print(f"Optimized Pattern Filter: {sys.getsizeof(fast_pattern_filter):,} bytes")
print(f"Original Content Filter: {sys.getsizeof(content_filter):,} bytes")
print(f"Optimized Content Filter: {sys.getsizeof(fast_content_filter):,} bytes")
print(f"Original Domain Filter: {sys.getsizeof(domain_filter):,} bytes")
print(f"Optimized Domain Filter: {sys.getsizeof(fast_domain_filter):,} bytes")
def test_pattern_filter():
import time
from itertools import chain
# Test cases as list of tuples instead of dict for multiple patterns
test_cases = [
# Simple suffix patterns (*.html)
("*.html", {
"https://example.com/page.html": True,
"https://example.com/path/doc.html": True,
"https://example.com/page.htm": False,
"https://example.com/page.html?param=1": True,
}),
# Path prefix patterns (/foo/*)
("*/article/*", {
"https://example.com/article/123": True,
"https://example.com/blog/article/456": True,
"https://example.com/articles/789": False,
"https://example.com/article": False,
}),
# Complex patterns
("blog-*-[0-9]", {
"https://example.com/blog-post-1": True,
"https://example.com/blog-test-9": True,
"https://example.com/blog-post": False,
"https://example.com/blog-post-x": False,
}),
# Multiple patterns case
(["*.pdf", "*/download/*"], {
"https://example.com/doc.pdf": True,
"https://example.com/download/file.txt": True,
"https://example.com/path/download/doc": True,
"https://example.com/uploads/file.txt": False,
}),
# Edge cases
("*", {
"https://example.com": True,
"": True,
"http://test.com/path": True,
}),
# Complex regex
(r"^https?://.*\.example\.com/\d+", {
"https://sub.example.com/123": True,
"http://test.example.com/456": True,
"https://example.com/789": False,
"https://sub.example.com/abc": False,
})
]
def run_accuracy_test():
print("\nAccuracy Tests:")
print("-" * 50)
all_passed = True
for patterns, test_urls in test_cases:
filter_obj = FastURLPatternFilter(patterns)
for url, expected in test_urls.items():
result = filter_obj.apply(url)
if result != expected:
print(f"❌ Failed: Pattern '{patterns}' with URL '{url}'")
print(f" Expected: {expected}, Got: {result}")
all_passed = False
else:
print(f"✅ Passed: Pattern '{patterns}' with URL '{url}'")
return all_passed
def run_speed_test():
print("\nSpeed Tests:")
print("-" * 50)
# Create a large set of test URLs
all_urls = list(chain.from_iterable(urls.keys() for _, urls in test_cases))
test_urls = all_urls * 10000 # 100K+ URLs
# Test both implementations
original = URLPatternFilter(["*.html", "*/article/*", "blog-*"])
optimized = FastURLPatternFilter(["*.html", "*/article/*", "blog-*"])
def benchmark(name, filter_obj):
start = time.perf_counter()
for url in test_urls:
filter_obj.apply(url)
elapsed = time.perf_counter() - start
urls_per_sec = len(test_urls) / elapsed
print(f"{name:<20} {elapsed:.3f}s ({urls_per_sec:,.0f} URLs/sec)")
benchmark("Original Filter:", original)
benchmark("Optimized Filter:", optimized)
# Run tests
print("Running Pattern Filter Tests...")
accuracy_passed = run_accuracy_test()
if accuracy_passed:
print("\n✨ All accuracy tests passed!")
run_speed_test()
else:
print("\n❌ Some accuracy tests failed!")
if __name__ == "__main__":
run_performance_test()
# test_pattern_filter()

File diff suppressed because it is too large Load Diff

View File

@@ -1,4 +1,3 @@
from __future__ import annotations
from pydantic import BaseModel, HttpUrl
from typing import List, Dict, Optional, Callable, Awaitable, Union, Any
from enum import Enum
@@ -6,7 +5,6 @@ from dataclasses import dataclass
from .ssl_certificate import SSLCertificate
from datetime import datetime
from datetime import timedelta
from math import inf
###############################
@@ -97,18 +95,6 @@ class DispatchResult(BaseModel):
error_message: str = ""
@dataclass
class TraversalStats:
"""Statistics for the traversal process"""
start_time: datetime
urls_processed: int = 0
urls_failed: int = 0
urls_skipped: int = 0
total_depth_reached: int = 0
current_depth: int = 0
class CrawlResult(BaseModel):
url: str
html: str
@@ -132,14 +118,11 @@ class CrawlResult(BaseModel):
ssl_certificate: Optional[SSLCertificate] = None
dispatch_result: Optional[DispatchResult] = None
redirected_url: Optional[str] = None
# Attributes for position
depth: Optional[int] = None
score: Optional[float] = -inf
parent_url: Optional[str] = None
class Config:
arbitrary_types_allowed = True
class AsyncCrawlResponse(BaseModel):
html: str
response_headers: Dict[str, str]
@@ -178,12 +161,12 @@ class Link(BaseModel):
class Media(BaseModel):
images: List[MediaItem] = []
videos: List[MediaItem] = (
[]
) # Using MediaItem model for now, can be extended with Video model if needed
audios: List[MediaItem] = (
[]
) # Using MediaItem model for now, can be extended with Audio model if needed
videos: List[
MediaItem
] = [] # Using MediaItem model for now, can be extended with Video model if needed
audios: List[
MediaItem
] = [] # Using MediaItem model for now, can be extended with Audio model if needed
class Links(BaseModel):

View File

@@ -1,244 +0,0 @@
# BFS Scraper Strategy: Smart Web Traversal
The BFS (Breadth-First Search) Scraper Strategy provides an intelligent way to traverse websites systematically. It crawls websites level by level, ensuring thorough coverage while respecting web crawling etiquette.
```mermaid
flowchart TB
Start([Start]) --> Init[Initialize BFS Strategy]
Init --> InitStats[Initialize CrawlStats]
InitStats --> InitQueue[Initialize Priority Queue]
InitQueue --> AddStart[Add Start URL to Queue]
AddStart --> CheckState{Queue Empty or\nTasks Pending?}
CheckState -->|No| Cleanup[Cleanup & Stats]
Cleanup --> End([End])
CheckState -->|Yes| CheckCancel{Cancel\nRequested?}
CheckCancel -->|Yes| Cleanup
CheckCancel -->|No| CheckConcurrent{Under Max\nConcurrent?}
CheckConcurrent -->|No| WaitComplete[Wait for Task Completion]
WaitComplete --> YieldResult[Yield Result]
YieldResult --> CheckState
CheckConcurrent -->|Yes| GetNextURL[Get Next URL from Queue]
GetNextURL --> ValidateURL{Already\nVisited?}
ValidateURL -->|Yes| CheckState
ValidateURL -->|No| ProcessURL[Process URL]
subgraph URL_Processing [URL Processing]
ProcessURL --> CheckValid{URL Valid?}
CheckValid -->|No| UpdateStats[Update Skip Stats]
CheckValid -->|Yes| CheckRobots{Allowed by\nrobots.txt?}
CheckRobots -->|No| UpdateRobotStats[Update Robot Stats]
CheckRobots -->|Yes| ApplyDelay[Apply Politeness Delay]
ApplyDelay --> FetchContent[Fetch Content with Rate Limit]
FetchContent --> CheckError{Error?}
CheckError -->|Yes| Retry{Retry\nNeeded?}
Retry -->|Yes| FetchContent
Retry -->|No| UpdateFailStats[Update Fail Stats]
CheckError -->|No| ExtractLinks[Extract & Process Links]
ExtractLinks --> ScoreURLs[Score New URLs]
ScoreURLs --> AddToQueue[Add to Priority Queue]
end
ProcessURL --> CreateTask{Parallel\nProcessing?}
CreateTask -->|Yes| AddTask[Add to Pending Tasks]
CreateTask -->|No| DirectProcess[Process Directly]
AddTask --> CheckState
DirectProcess --> YieldResult
UpdateStats --> CheckState
UpdateRobotStats --> CheckState
UpdateFailStats --> CheckState
classDef process fill:#90caf9,stroke:#000,stroke-width:2px;
classDef decision fill:#fff59d,stroke:#000,stroke-width:2px;
classDef error fill:#ef9a9a,stroke:#000,stroke-width:2px;
classDef stats fill:#a5d6a7,stroke:#000,stroke-width:2px;
class Start,End stats;
class CheckState,CheckCancel,CheckConcurrent,ValidateURL,CheckValid,CheckRobots,CheckError,Retry,CreateTask decision;
class UpdateStats,UpdateRobotStats,UpdateFailStats,InitStats,Cleanup stats;
class ProcessURL,FetchContent,ExtractLinks,ScoreURLs process;
```
## How It Works
The BFS strategy crawls a website by:
1. Starting from a root URL
2. Processing all URLs at the current depth
3. Moving to URLs at the next depth level
4. Continuing until maximum depth is reached
This ensures systematic coverage of the website while maintaining control over the crawling process.
## Key Features
### 1. Smart URL Processing
```python
strategy = BFSScraperStrategy(
max_depth=2,
filter_chain=my_filters,
url_scorer=my_scorer,
max_concurrent=5
)
```
- Controls crawl depth
- Filters unwanted URLs
- Scores URLs for priority
- Manages concurrent requests
### 2. Polite Crawling
The strategy automatically implements web crawling best practices:
- Respects robots.txt
- Implements rate limiting
- Adds politeness delays
- Manages concurrent requests
### 3. Link Processing Control
```python
strategy = BFSScraperStrategy(
...,
process_external_links=False # Only process internal links
)
```
- Control whether to follow external links
- Default: internal links only
- Enable external links when needed
## Configuration Options
| Parameter | Description | Default |
|-----------|-------------|---------|
| max_depth | Maximum crawl depth | Required |
| filter_chain | URL filtering rules | Required |
| url_scorer | URL priority scoring | Required |
| max_concurrent | Max parallel requests | 5 |
| min_crawl_delay | Seconds between requests | 1 |
| process_external_links | Follow external links | False |
## Best Practices
1. **Set Appropriate Depth**
- Start with smaller depths (2-3)
- Increase based on needs
- Consider site structure
2. **Configure Filters**
- Use URL patterns
- Filter by content type
- Avoid unwanted sections
3. **Tune Performance**
- Adjust max_concurrent
- Set appropriate delays
- Monitor resource usage
4. **Handle External Links**
- Keep external_links=False for focused crawls
- Enable only when needed
- Consider additional filtering
## Example Usage
```python
from crawl4ai.scraper import BFSScraperStrategy
from crawl4ai.scraper.filters import FilterChain
from crawl4ai.scraper.scorers import BasicURLScorer
# Configure strategy
strategy = BFSScraperStrategy(
max_depth=3,
filter_chain=FilterChain([
URLPatternFilter("*.example.com/*"),
ContentTypeFilter(["text/html"])
]),
url_scorer=BasicURLScorer(),
max_concurrent=5,
min_crawl_delay=1,
process_external_links=False
)
# Use with AsyncWebScraper
scraper = AsyncWebScraper(crawler, strategy)
results = await scraper.ascrape("https://example.com")
```
## Common Use Cases
### 1. Site Mapping
```python
strategy = BFSScraperStrategy(
max_depth=5,
filter_chain=site_filter,
url_scorer=depth_scorer,
process_external_links=False
)
```
Perfect for creating complete site maps or understanding site structure.
### 2. Content Aggregation
```python
strategy = BFSScraperStrategy(
max_depth=2,
filter_chain=content_filter,
url_scorer=relevance_scorer,
max_concurrent=3
)
```
Ideal for collecting specific types of content (articles, products, etc.).
### 3. Link Analysis
```python
strategy = BFSScraperStrategy(
max_depth=1,
filter_chain=link_filter,
url_scorer=link_scorer,
process_external_links=True
)
```
Useful for analyzing both internal and external link structures.
## Advanced Features
### Progress Monitoring
```python
async for result in scraper.ascrape(url):
print(f"Current depth: {strategy.stats.current_depth}")
print(f"Processed URLs: {strategy.stats.urls_processed}")
```
### Custom URL Scoring
```python
class CustomScorer(URLScorer):
def score(self, url: str) -> float:
# Lower scores = higher priority
return score_based_on_criteria(url)
```
## Troubleshooting
1. **Slow Crawling**
- Increase max_concurrent
- Adjust min_crawl_delay
- Check network conditions
2. **Missing Content**
- Verify max_depth
- Check filter settings
- Review URL patterns
3. **High Resource Usage**
- Reduce max_concurrent
- Increase crawl delay
- Add more specific filters

View File

@@ -1,260 +0,0 @@
from crawl4ai.async_configs import CrawlerRunConfig, BrowserConfig
from crawl4ai.content_scraping_strategy import LXMLWebScrapingStrategy
from crawl4ai.deep_crawl import (
BFSDeepCrawlStrategy,
FilterChain,
URLPatternFilter,
ContentTypeFilter,
DomainFilter,
KeywordRelevanceScorer,
PathDepthScorer,
FreshnessScorer,
CompositeScorer,
)
from crawl4ai.async_webcrawler import AsyncWebCrawler
import re
import time
import logging
browser_config = BrowserConfig(headless=True, viewport_width=800, viewport_height=600)
async def basic_example():
"""
Basic example: Deep crawl a blog site for articles
- Crawls only HTML pages
- Stays within the blog section
- Collects all results at once
"""
# Create a simple filter chain
filter_chain = FilterChain(
[
# Only crawl pages within the blog section
URLPatternFilter("*/basic/*"),
# Only process HTML pages
ContentTypeFilter(["text/html"]),
]
)
# Initialize the strategy with basic configuration
bfs_strategy = BFSDeepCrawlStrategy(
max_depth=2, # Only go 2 levels deep
filter_chain=filter_chain,
url_scorer=None, # Use default scoring
process_external_links=True,
)
# Create the crawler
async with AsyncWebCrawler(
config=browser_config,
) as crawler:
# Start scraping
try:
results = await crawler.arun(
"https://crawl4ai.com/mkdocs",
CrawlerRunConfig(deep_crawl_strategy=bfs_strategy),
)
# Process results
print(f"Crawled {len(results)} pages:")
for result in results:
print(f"- {result.url}: {len(result.html)} bytes")
except Exception as e:
print(f"Error during scraping: {e}")
async def advanced_example():
"""
Advanced example: Intelligent news site crawling
- Uses all filter types
- Implements sophisticated scoring
- Streams results
- Includes monitoring and logging
"""
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("advanced_deep_crawler")
# Create sophisticated filter chain
filter_chain = FilterChain(
[
# Domain control
DomainFilter(
allowed_domains=["techcrunch.com"],
blocked_domains=["login.techcrunch.com", "legal.yahoo.com"],
),
# URL patterns
URLPatternFilter(
[
"*/article/*",
"*/news/*",
"*/blog/*",
re.compile(r"\d{4}/\d{2}/.*"), # Date-based URLs
]
),
# Content types
ContentTypeFilter(["text/html", "application/xhtml+xml"]),
]
)
# Create composite scorer
scorer = CompositeScorer(
[
# Prioritize by keywords
KeywordRelevanceScorer(
keywords=["news", "breaking", "update", "latest"], weight=1.0
),
# Prefer optimal URL structure
PathDepthScorer(optimal_depth=3, weight=0.7),
# Prioritize fresh content
FreshnessScorer(weight=0.9),
]
)
# Initialize strategy with advanced configuration
bfs_strategy = BFSDeepCrawlStrategy(
max_depth=2, filter_chain=filter_chain, url_scorer=scorer
)
# Create crawler
async with AsyncWebCrawler(
config=browser_config,
) as crawler:
# Track statistics
stats = {"processed": 0, "errors": 0, "total_size": 0}
try:
# Use streaming mode
results = []
result_generator = await crawler.arun(
"https://techcrunch.com",
config=CrawlerRunConfig(deep_crawl_strategy=bfs_strategy, stream=True),
)
async for result in result_generator:
stats["processed"] += 1
if result.success:
stats["total_size"] += len(result.html)
logger.info(
f"Processed at depth: {result.depth} with score: {result.score:.3f} : \n {result.url}"
)
results.append(result)
else:
stats["errors"] += 1
logger.error(
f"Failed to process {result.url}: {result.error_message}"
)
# Log progress regularly
if stats["processed"] % 10 == 0:
logger.info(f"Progress: {stats['processed']} URLs processed")
except Exception as e:
logger.error(f"Scraping error: {e}")
finally:
# Print final statistics
logger.info("Scraping completed:")
logger.info(f"- URLs processed: {stats['processed']}")
logger.info(f"- Errors: {stats['errors']}")
logger.info(f"- Total content size: {stats['total_size'] / 1024:.2f} KB")
# Print filter statistics
for filter_ in filter_chain.filters:
logger.info(f"{filter_.name} stats:")
logger.info(f"- Passed: {filter_.stats.passed_urls}")
logger.info(f"- Rejected: {filter_.stats.rejected_urls}")
# Print scorer statistics
logger.info("Scoring statistics:")
logger.info(f"- Average score: {scorer.stats.average_score:.2f}")
logger.info(
f"- Score range: {scorer.stats.min_score:.2f} - {scorer.stats.max_score:.2f}"
)
async def basic_example_many_urls():
filter_chain = FilterChain(
[
URLPatternFilter("*/basic/*"),
ContentTypeFilter(["text/html"]),
]
)
# Initialize the strategy with basic configuration
bfs_strategy = BFSDeepCrawlStrategy(
max_depth=2, # Only go 2 levels deep
filter_chain=filter_chain,
url_scorer=None, # Use default scoring
process_external_links=False,
)
# Create the crawler
async with AsyncWebCrawler(
config=browser_config,
) as crawler:
# Start scraping
try:
results = await crawler.arun_many(
urls=["https://crawl4ai.com/mkdocs","https://aravindkarnam.com"],
config=CrawlerRunConfig(deep_crawl_strategy=bfs_strategy),
)
# Process results
print(f"Crawled {len(results)} pages:")
for url_result in results:
for result in url_result:
print(f"- {result.url}: {len(result.html)} bytes")
except Exception as e:
print(f"Error during scraping: {e}")
async def basic_example_many_urls_stream():
filter_chain = FilterChain(
[
URLPatternFilter("*/basic/*"),
ContentTypeFilter(["text/html"]),
]
)
# Initialize the strategy with basic configuration
bfs_strategy = BFSDeepCrawlStrategy(
max_depth=2, # Only go 2 levels deep
filter_chain=filter_chain,
url_scorer=None, # Use default scoring
process_external_links=False,
)
# Create the crawler
async with AsyncWebCrawler(
config=browser_config,
) as crawler:
# Start scraping
try:
async for result in await crawler.arun_many(
urls=["https://crawl4ai.com/mkdocs","https://aravindkarnam.com"],
config=CrawlerRunConfig(deep_crawl_strategy=bfs_strategy,stream=True),
):
# Process results
print(f"- {result.url}: {len(result.html)} bytes")
except Exception as e:
print(f"Error during scraping: {e}")
if __name__ == "__main__":
import asyncio
import time
# Run basic example
start_time = time.perf_counter()
print("Running basic Deep crawl example...")
asyncio.run(basic_example())
end_time = time.perf_counter()
print(f"Basic deep crawl example completed in {end_time - start_time:.2f} seconds")
# Run advanced example
print("\nRunning advanced deep crawl example...")
asyncio.run(advanced_example())
print("\nRunning advanced deep crawl example with arun_many...")
asyncio.run(basic_example_many_urls())
print("\nRunning advanced deep crawl example with arun_many streaming enabled...")
asyncio.run(basic_example_many_urls_stream())

View File

@@ -1,342 +0,0 @@
# URL Filters and Scorers
The crawl4ai library provides powerful URL filtering and scoring capabilities that help you control and prioritize your web crawling. This guide explains how to use these features effectively.
```mermaid
flowchart TB
Start([URL Input]) --> Chain[Filter Chain]
subgraph Chain Process
Chain --> Pattern{URL Pattern\nFilter}
Pattern -->|Match| Content{Content Type\nFilter}
Pattern -->|No Match| Reject1[Reject URL]
Content -->|Allowed| Domain{Domain\nFilter}
Content -->|Not Allowed| Reject2[Reject URL]
Domain -->|Allowed| Accept[Accept URL]
Domain -->|Blocked| Reject3[Reject URL]
end
subgraph Statistics
Pattern --> UpdatePattern[Update Pattern Stats]
Content --> UpdateContent[Update Content Stats]
Domain --> UpdateDomain[Update Domain Stats]
Accept --> UpdateChain[Update Chain Stats]
Reject1 --> UpdateChain
Reject2 --> UpdateChain
Reject3 --> UpdateChain
end
Accept --> End([End])
Reject1 --> End
Reject2 --> End
Reject3 --> End
classDef process fill:#90caf9,stroke:#000,stroke-width:2px;
classDef decision fill:#fff59d,stroke:#000,stroke-width:2px;
classDef reject fill:#ef9a9a,stroke:#000,stroke-width:2px;
classDef accept fill:#a5d6a7,stroke:#000,stroke-width:2px;
class Start,End accept;
class Pattern,Content,Domain decision;
class Reject1,Reject2,Reject3 reject;
class Chain,UpdatePattern,UpdateContent,UpdateDomain,UpdateChain process;
```
## URL Filters
URL filters help you control which URLs are crawled. Multiple filters can be chained together to create sophisticated filtering rules.
### Available Filters
1. **URL Pattern Filter**
```python
pattern_filter = URLPatternFilter([
"*.example.com/*", # Glob pattern
"*/article/*", # Path pattern
re.compile(r"blog-\d+") # Regex pattern
])
```
- Supports glob patterns and regex
- Multiple patterns per filter
- Pattern pre-compilation for performance
2. **Content Type Filter**
```python
content_filter = ContentTypeFilter([
"text/html",
"application/pdf"
], check_extension=True)
```
- Filter by MIME types
- Extension checking
- Support for multiple content types
3. **Domain Filter**
```python
domain_filter = DomainFilter(
allowed_domains=["example.com", "blog.example.com"],
blocked_domains=["ads.example.com"]
)
```
- Allow/block specific domains
- Subdomain support
- Efficient domain matching
### Creating Filter Chains
```python
# Create and configure a filter chain
filter_chain = FilterChain([
URLPatternFilter(["*.example.com/*"]),
ContentTypeFilter(["text/html"]),
DomainFilter(blocked_domains=["ads.*"])
])
# Add more filters
filter_chain.add_filter(
URLPatternFilter(["*/article/*"])
)
```
```mermaid
flowchart TB
Start([URL Input]) --> Composite[Composite Scorer]
subgraph Scoring Process
Composite --> Keywords[Keyword Relevance]
Composite --> Path[Path Depth]
Composite --> Content[Content Type]
Composite --> Fresh[Freshness]
Composite --> Domain[Domain Authority]
Keywords --> KeywordScore[Calculate Score]
Path --> PathScore[Calculate Score]
Content --> ContentScore[Calculate Score]
Fresh --> FreshScore[Calculate Score]
Domain --> DomainScore[Calculate Score]
KeywordScore --> Weight1[Apply Weight]
PathScore --> Weight2[Apply Weight]
ContentScore --> Weight3[Apply Weight]
FreshScore --> Weight4[Apply Weight]
DomainScore --> Weight5[Apply Weight]
end
Weight1 --> Combine[Combine Scores]
Weight2 --> Combine
Weight3 --> Combine
Weight4 --> Combine
Weight5 --> Combine
Combine --> Normalize{Normalize?}
Normalize -->|Yes| NormalizeScore[Normalize Combined Score]
Normalize -->|No| FinalScore[Final Score]
NormalizeScore --> FinalScore
FinalScore --> Stats[Update Statistics]
Stats --> End([End])
classDef process fill:#90caf9,stroke:#000,stroke-width:2px;
classDef scorer fill:#fff59d,stroke:#000,stroke-width:2px;
classDef calc fill:#a5d6a7,stroke:#000,stroke-width:2px;
classDef decision fill:#ef9a9a,stroke:#000,stroke-width:2px;
class Start,End calc;
class Keywords,Path,Content,Fresh,Domain scorer;
class KeywordScore,PathScore,ContentScore,FreshScore,DomainScore process;
class Normalize decision;
```
## URL Scorers
URL scorers help prioritize which URLs to crawl first. Higher scores indicate higher priority.
### Available Scorers
1. **Keyword Relevance Scorer**
```python
keyword_scorer = KeywordRelevanceScorer(
keywords=["python", "programming"],
weight=1.0,
case_sensitive=False
)
```
- Score based on keyword matches
- Case sensitivity options
- Weighted scoring
2. **Path Depth Scorer**
```python
path_scorer = PathDepthScorer(
optimal_depth=3, # Preferred URL depth
weight=0.7
)
```
- Score based on URL path depth
- Configurable optimal depth
- Diminishing returns for deeper paths
3. **Content Type Scorer**
```python
content_scorer = ContentTypeScorer({
r'\.html$': 1.0,
r'\.pdf$': 0.8,
r'\.xml$': 0.6
})
```
- Score based on file types
- Configurable type weights
- Pattern matching support
4. **Freshness Scorer**
```python
freshness_scorer = FreshnessScorer(weight=0.9)
```
- Score based on date indicators in URLs
- Multiple date format support
- Recency weighting
5. **Domain Authority Scorer**
```python
authority_scorer = DomainAuthorityScorer({
"python.org": 1.0,
"github.com": 0.9,
"medium.com": 0.7
})
```
- Score based on domain importance
- Configurable domain weights
- Default weight for unknown domains
### Combining Scorers
```python
# Create a composite scorer
composite_scorer = CompositeScorer([
KeywordRelevanceScorer(["python"], weight=1.0),
PathDepthScorer(optimal_depth=2, weight=0.7),
FreshnessScorer(weight=0.8)
], normalize=True)
```
## Best Practices
### Filter Configuration
1. **Start Restrictive**
```python
# Begin with strict filters
filter_chain = FilterChain([
DomainFilter(allowed_domains=["example.com"]),
ContentTypeFilter(["text/html"])
])
```
2. **Layer Filters**
```python
# Add more specific filters
filter_chain.add_filter(
URLPatternFilter(["*/article/*", "*/blog/*"])
)
```
3. **Monitor Filter Statistics**
```python
# Check filter performance
for filter in filter_chain.filters:
print(f"{filter.name}: {filter.stats.rejected_urls} rejected")
```
### Scorer Configuration
1. **Balance Weights**
```python
# Balanced scoring configuration
scorer = create_balanced_scorer()
```
2. **Customize for Content**
```python
# News site configuration
news_scorer = CompositeScorer([
KeywordRelevanceScorer(["news", "article"], weight=1.0),
FreshnessScorer(weight=1.0),
PathDepthScorer(optimal_depth=2, weight=0.5)
])
```
3. **Monitor Scoring Statistics**
```python
# Check scoring distribution
print(f"Average score: {scorer.stats.average_score}")
print(f"Score range: {scorer.stats.min_score} - {scorer.stats.max_score}")
```
## Common Use Cases
### Blog Crawling
```python
blog_config = {
'filters': FilterChain([
URLPatternFilter(["*/blog/*", "*/post/*"]),
ContentTypeFilter(["text/html"])
]),
'scorer': CompositeScorer([
FreshnessScorer(weight=1.0),
KeywordRelevanceScorer(["blog", "article"], weight=0.8)
])
}
```
### Documentation Sites
```python
docs_config = {
'filters': FilterChain([
URLPatternFilter(["*/docs/*", "*/guide/*"]),
ContentTypeFilter(["text/html", "application/pdf"])
]),
'scorer': CompositeScorer([
PathDepthScorer(optimal_depth=3, weight=1.0),
KeywordRelevanceScorer(["guide", "tutorial"], weight=0.9)
])
}
```
### E-commerce Sites
```python
ecommerce_config = {
'filters': FilterChain([
URLPatternFilter(["*/product/*", "*/category/*"]),
DomainFilter(blocked_domains=["ads.*", "tracker.*"])
]),
'scorer': CompositeScorer([
PathDepthScorer(optimal_depth=2, weight=1.0),
ContentTypeScorer({
r'/product/': 1.0,
r'/category/': 0.8
})
])
}
```
## Advanced Topics
### Custom Filters
```python
class CustomFilter(URLFilter):
def apply(self, url: str) -> bool:
# Your custom filtering logic
return True
```
### Custom Scorers
```python
class CustomScorer(URLScorer):
def _calculate_score(self, url: str) -> float:
# Your custom scoring logic
return 1.0
```
For more examples, check our [example repository](https://github.com/example/crawl4ai/examples).

View File

@@ -1,206 +0,0 @@
# Scraper Examples Guide
This guide provides two complete examples of using the crawl4ai scraper: a basic implementation for simple use cases and an advanced implementation showcasing all features.
## Basic Example
The basic example demonstrates a simple blog scraping scenario:
```python
from crawl4ai.scraper import AsyncWebScraper, BFSScraperStrategy, FilterChain
# Create simple filter chain
filter_chain = FilterChain([
URLPatternFilter("*/blog/*"),
ContentTypeFilter(["text/html"])
])
# Initialize strategy
strategy = BFSScraperStrategy(
max_depth=2,
filter_chain=filter_chain,
url_scorer=None,
max_concurrent=3
)
# Create and run scraper
crawler = AsyncWebCrawler()
scraper = AsyncWebScraper(crawler, strategy)
result = await scraper.ascrape("https://example.com/blog/")
```
### Features Demonstrated
- Basic URL filtering
- Simple content type filtering
- Depth control
- Concurrent request limiting
- Result collection
## Advanced Example
The advanced example shows a sophisticated news site scraping setup with all features enabled:
```python
# Create comprehensive filter chain
filter_chain = FilterChain([
DomainFilter(
allowed_domains=["example.com"],
blocked_domains=["ads.example.com"]
),
URLPatternFilter([
"*/article/*",
re.compile(r"\d{4}/\d{2}/.*")
]),
ContentTypeFilter(["text/html"])
])
# Create intelligent scorer
scorer = CompositeScorer([
KeywordRelevanceScorer(
keywords=["news", "breaking"],
weight=1.0
),
PathDepthScorer(optimal_depth=3, weight=0.7),
FreshnessScorer(weight=0.9)
])
# Initialize advanced strategy
strategy = BFSScraperStrategy(
max_depth=4,
filter_chain=filter_chain,
url_scorer=scorer,
max_concurrent=5
)
```
### Features Demonstrated
1. **Advanced Filtering**
- Domain filtering
- Pattern matching
- Content type control
2. **Intelligent Scoring**
- Keyword relevance
- Path optimization
- Freshness priority
3. **Monitoring**
- Progress tracking
- Error handling
- Statistics collection
4. **Resource Management**
- Concurrent processing
- Rate limiting
- Cleanup handling
## Running the Examples
```bash
# Basic usage
python basic_scraper_example.py
# Advanced usage with logging
PYTHONPATH=. python advanced_scraper_example.py
```
## Example Output
### Basic Example
```
Crawled 15 pages:
- https://example.com/blog/post1: 24560 bytes
- https://example.com/blog/post2: 18920 bytes
...
```
### Advanced Example
```
INFO: Starting crawl of https://example.com/news/
INFO: Processed: https://example.com/news/breaking/story1
DEBUG: KeywordScorer: 0.85
DEBUG: FreshnessScorer: 0.95
INFO: Progress: 10 URLs processed
...
INFO: Scraping completed:
INFO: - URLs processed: 50
INFO: - Errors: 2
INFO: - Total content size: 1240.50 KB
```
## Customization
### Adding Custom Filters
```python
class CustomFilter(URLFilter):
def apply(self, url: str) -> bool:
# Your custom filtering logic
return True
filter_chain.add_filter(CustomFilter())
```
### Custom Scoring Logic
```python
class CustomScorer(URLScorer):
def _calculate_score(self, url: str) -> float:
# Your custom scoring logic
return 1.0
scorer = CompositeScorer([
CustomScorer(weight=1.0),
...
])
```
## Best Practices
1. **Start Simple**
- Begin with basic filtering
- Add features incrementally
- Test thoroughly at each step
2. **Monitor Performance**
- Watch memory usage
- Track processing times
- Adjust concurrency as needed
3. **Handle Errors**
- Implement proper error handling
- Log important events
- Track error statistics
4. **Optimize Resources**
- Set appropriate delays
- Limit concurrent requests
- Use streaming for large crawls
## Troubleshooting
Common issues and solutions:
1. **Too Many Requests**
```python
strategy = BFSScraperStrategy(
max_concurrent=3, # Reduce concurrent requests
min_crawl_delay=2 # Increase delay between requests
)
```
2. **Memory Issues**
```python
# Use streaming mode for large crawls
async for result in scraper.ascrape(url, stream=True):
process_result(result)
```
3. **Missing Content**
```python
# Check your filter chain
filter_chain = FilterChain([
URLPatternFilter("*"), # Broaden patterns
ContentTypeFilter(["*"]) # Accept all content
])
```
For more examples and use cases, visit our [GitHub repository](https://github.com/example/crawl4ai/examples).

View File

@@ -1,137 +0,0 @@
# Installation 💻
Crawl4AI offers flexible installation options to suit various use cases. You can install it as a Python package, use it with Docker, or run it as a local server.
## Option 1: Python Package Installation (Recommended)
Crawl4AI is now available on PyPI, making installation easier than ever. Choose the option that best fits your needs:
### Basic Installation
For basic web crawling and scraping tasks:
```bash
pip install crawl4ai
playwright install # Install Playwright dependencies
```
### Installation with PyTorch
For advanced text clustering (includes CosineSimilarity cluster strategy):
```bash
pip install crawl4ai[torch]
```
### Installation with Transformers
For text summarization and Hugging Face models:
```bash
pip install crawl4ai[transformer]
```
### Full Installation
For all features:
```bash
pip install crawl4ai[all]
```
### Development Installation
For contributors who plan to modify the source code:
```bash
git clone https://github.com/unclecode/crawl4ai.git
cd crawl4ai
pip install -e ".[all]"
playwright install # Install Playwright dependencies
```
💡 After installation with "torch", "transformer", or "all" options, it's recommended to run the following CLI command to load the required models:
```bash
crawl4ai-download-models
```
This is optional but will boost the performance and speed of the crawler. You only need to do this once after installation.
## Playwright Installation Note for Ubuntu
If you encounter issues with Playwright installation on Ubuntu, you may need to install additional dependencies:
```bash
sudo apt-get install -y \
libwoff1 \
libopus0 \
libwebp7 \
libwebpdemux2 \
libenchant-2-2 \
libgudev-1.0-0 \
libsecret-1-0 \
libhyphen0 \
libgdk-pixbuf2.0-0 \
libegl1 \
libnotify4 \
libxslt1.1 \
libevent-2.1-7 \
libgles2 \
libxcomposite1 \
libatk1.0-0 \
libatk-bridge2.0-0 \
libepoxy0 \
libgtk-3-0 \
libharfbuzz-icu0 \
libgstreamer-gl1.0-0 \
libgstreamer-plugins-bad1.0-0 \
gstreamer1.0-plugins-good \
gstreamer1.0-plugins-bad \
libxt6 \
libxaw7 \
xvfb \
fonts-noto-color-emoji \
libfontconfig \
libfreetype6 \
xfonts-cyrillic \
xfonts-scalable \
fonts-liberation \
fonts-ipafont-gothic \
fonts-wqy-zenhei \
fonts-tlwg-loma-otf \
fonts-freefont-ttf
```
## Option 2: Using Docker (Coming Soon)
Docker support for Crawl4AI is currently in progress and will be available soon. This will allow you to run Crawl4AI in a containerized environment, ensuring consistency across different systems.
## Option 3: Local Server Installation
For those who prefer to run Crawl4AI as a local server, instructions will be provided once the Docker implementation is complete.
## Verifying Your Installation
After installation, you can verify that Crawl4AI is working correctly by running a simple Python script:
```python
import asyncio
from crawl4ai import AsyncWebCrawler
async def main():
async with AsyncWebCrawler(verbose=True) as crawler:
result = await crawler.arun(url="https://www.example.com")
print(result.markdown[:500]) # Print first 500 characters
if __name__ == "__main__":
asyncio.run(main())
```
This script should successfully crawl the example website and print the first 500 characters of the extracted content.
## Getting Help
If you encounter any issues during installation or usage, please check the [documentation](https://docs.crawl4ai.com/) or raise an issue on the [GitHub repository](https://github.com/unclecode/crawl4ai/issues).
Happy crawling! 🕷️🤖

View File

View File

@@ -1,184 +0,0 @@
# basic_scraper_example.py
from crawl4ai.scraper import (
AsyncWebScraper,
BFSScraperStrategy,
FilterChain,
URLPatternFilter,
ContentTypeFilter
)
from crawl4ai.async_webcrawler import AsyncWebCrawler
async def basic_scraper_example():
"""
Basic example: Scrape a blog site for articles
- Crawls only HTML pages
- Stays within the blog section
- Collects all results at once
"""
# Create a simple filter chain
filter_chain = FilterChain([
# Only crawl pages within the blog section
URLPatternFilter("*/blog/*"),
# Only process HTML pages
ContentTypeFilter(["text/html"])
])
# Initialize the strategy with basic configuration
strategy = BFSScraperStrategy(
max_depth=2, # Only go 2 levels deep
filter_chain=filter_chain,
url_scorer=None, # Use default scoring
max_concurrent=3 # Limit concurrent requests
)
# Create the crawler and scraper
crawler = AsyncWebCrawler()
scraper = AsyncWebScraper(crawler, strategy)
# Start scraping
try:
result = await scraper.ascrape("https://example.com/blog/")
# Process results
print(f"Crawled {len(result.crawled_urls)} pages:")
for url, data in result.extracted_data.items():
print(f"- {url}: {len(data.html)} bytes")
except Exception as e:
print(f"Error during scraping: {e}")
# advanced_scraper_example.py
import logging
from crawl4ai.scraper import (
AsyncWebScraper,
BFSScraperStrategy,
FilterChain,
URLPatternFilter,
ContentTypeFilter,
DomainFilter,
KeywordRelevanceScorer,
PathDepthScorer,
FreshnessScorer,
CompositeScorer
)
from crawl4ai.async_webcrawler import AsyncWebCrawler
async def advanced_scraper_example():
"""
Advanced example: Intelligent news site scraping
- Uses all filter types
- Implements sophisticated scoring
- Streams results
- Includes monitoring and logging
"""
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("advanced_scraper")
# Create sophisticated filter chain
filter_chain = FilterChain([
# Domain control
DomainFilter(
allowed_domains=["example.com", "blog.example.com"],
blocked_domains=["ads.example.com", "tracker.example.com"]
),
# URL patterns
URLPatternFilter([
"*/article/*",
"*/news/*",
"*/blog/*",
re.compile(r"\d{4}/\d{2}/.*") # Date-based URLs
]),
# Content types
ContentTypeFilter([
"text/html",
"application/xhtml+xml"
])
])
# Create composite scorer
scorer = CompositeScorer([
# Prioritize by keywords
KeywordRelevanceScorer(
keywords=["news", "breaking", "update", "latest"],
weight=1.0
),
# Prefer optimal URL structure
PathDepthScorer(
optimal_depth=3,
weight=0.7
),
# Prioritize fresh content
FreshnessScorer(weight=0.9)
])
# Initialize strategy with advanced configuration
strategy = BFSScraperStrategy(
max_depth=4,
filter_chain=filter_chain,
url_scorer=scorer,
max_concurrent=5,
min_crawl_delay=1
)
# Create crawler and scraper
crawler = AsyncWebCrawler()
scraper = AsyncWebScraper(crawler, strategy)
# Track statistics
stats = {
'processed': 0,
'errors': 0,
'total_size': 0
}
try:
# Use streaming mode
async for result in scraper.ascrape("https://example.com/news/", stream=True):
stats['processed'] += 1
if result.success:
stats['total_size'] += len(result.html)
logger.info(f"Processed: {result.url}")
# Print scoring information
for scorer_name, score in result.scores.items():
logger.debug(f"{scorer_name}: {score:.2f}")
else:
stats['errors'] += 1
logger.error(f"Failed to process {result.url}: {result.error_message}")
# Log progress regularly
if stats['processed'] % 10 == 0:
logger.info(f"Progress: {stats['processed']} URLs processed")
except Exception as e:
logger.error(f"Scraping error: {e}")
finally:
# Print final statistics
logger.info("Scraping completed:")
logger.info(f"- URLs processed: {stats['processed']}")
logger.info(f"- Errors: {stats['errors']}")
logger.info(f"- Total content size: {stats['total_size'] / 1024:.2f} KB")
# Print filter statistics
for filter_ in filter_chain.filters:
logger.info(f"{filter_.name} stats:")
logger.info(f"- Passed: {filter_.stats.passed_urls}")
logger.info(f"- Rejected: {filter_.stats.rejected_urls}")
# Print scorer statistics
logger.info("Scoring statistics:")
logger.info(f"- Average score: {scorer.stats.average_score:.2f}")
logger.info(f"- Score range: {scorer.stats.min_score:.2f} - {scorer.stats.max_score:.2f}")
if __name__ == "__main__":
import asyncio
# Run basic example
print("Running basic scraper example...")
asyncio.run(basic_scraper_example())
print("\nRunning advanced scraper example...")
asyncio.run(advanced_scraper_example())