Compare commits

..

2 Commits

Author SHA1 Message Date
Ahmed-Tawfik94
2b2ef12e25 #1156: Refactor completion function calls to use asynchronous version 2025-05-27 15:10:34 +08:00
Ahmed-Tawfik94
d9b3db925a Refactor extraction and completion functions to support asynchronous execution 2025-05-26 16:01:38 +08:00
23 changed files with 138 additions and 839 deletions

View File

@@ -789,8 +789,6 @@ class CrawlerRunConfig():
Default: False.
scroll_delay (float): Delay in seconds between scroll steps if scan_full_page is True.
Default: 0.2.
max_scroll_steps (Optional[int]): Maximum number of scroll steps to perform during full page scan.
If None, scrolls until the entire page is loaded. Default: None.
process_iframes (bool): If True, attempts to process and inline iframe content.
Default: False.
remove_overlay_elements (bool): If True, remove overlays/popups before extracting HTML.
@@ -921,7 +919,6 @@ class CrawlerRunConfig():
ignore_body_visibility: bool = True,
scan_full_page: bool = False,
scroll_delay: float = 0.2,
max_scroll_steps: Optional[int] = None,
process_iframes: bool = False,
remove_overlay_elements: bool = False,
simulate_user: bool = False,
@@ -1020,7 +1017,6 @@ class CrawlerRunConfig():
self.ignore_body_visibility = ignore_body_visibility
self.scan_full_page = scan_full_page
self.scroll_delay = scroll_delay
self.max_scroll_steps = max_scroll_steps
self.process_iframes = process_iframes
self.remove_overlay_elements = remove_overlay_elements
self.simulate_user = simulate_user
@@ -1162,7 +1158,6 @@ class CrawlerRunConfig():
ignore_body_visibility=kwargs.get("ignore_body_visibility", True),
scan_full_page=kwargs.get("scan_full_page", False),
scroll_delay=kwargs.get("scroll_delay", 0.2),
max_scroll_steps=kwargs.get("max_scroll_steps"),
process_iframes=kwargs.get("process_iframes", False),
remove_overlay_elements=kwargs.get("remove_overlay_elements", False),
simulate_user=kwargs.get("simulate_user", False),
@@ -1272,7 +1267,6 @@ class CrawlerRunConfig():
"ignore_body_visibility": self.ignore_body_visibility,
"scan_full_page": self.scan_full_page,
"scroll_delay": self.scroll_delay,
"max_scroll_steps": self.max_scroll_steps,
"process_iframes": self.process_iframes,
"remove_overlay_elements": self.remove_overlay_elements,
"simulate_user": self.simulate_user,

View File

@@ -466,15 +466,9 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
console_messages=captured_console,
)
#####
# Since both "raw:" and "raw://" start with "raw:", the first condition is always true for both, so "raw://" will be sliced as "//...", which is incorrect.
# Fix: Check for "raw://" first, then "raw:"
# Also, the prefix "raw://" is actually 6 characters long, not 7, so it should be sliced accordingly: url[6:]
#####
elif url.startswith("raw://") or url.startswith("raw:"):
elif url.startswith("raw:") or url.startswith("raw://"):
# Process raw HTML content
# raw_html = url[4:] if url[:4] == "raw:" else url[7:]
raw_html = url[6:] if url.startswith("raw://") else url[4:]
raw_html = url[4:] if url[:4] == "raw:" else url[7:]
html = raw_html
if config.screenshot:
screenshot_data = await self._generate_screenshot_from_html(html)
@@ -902,8 +896,7 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
# Handle full page scanning
if config.scan_full_page:
# await self._handle_full_page_scan(page, config.scroll_delay)
await self._handle_full_page_scan(page, config.scroll_delay, config.max_scroll_steps)
await self._handle_full_page_scan(page, config.scroll_delay)
# Execute JavaScript if provided
# if config.js_code:
@@ -1091,8 +1084,7 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
# Close the page
await page.close()
# async def _handle_full_page_scan(self, page: Page, scroll_delay: float = 0.1):
async def _handle_full_page_scan(self, page: Page, scroll_delay: float = 0.1, max_scroll_steps: Optional[int] = None):
async def _handle_full_page_scan(self, page: Page, scroll_delay: float = 0.1):
"""
Helper method to handle full page scanning.
@@ -1107,7 +1099,6 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
Args:
page (Page): The Playwright page object
scroll_delay (float): The delay between page scrolls
max_scroll_steps (Optional[int]): Maximum number of scroll steps to perform. If None, scrolls until end.
"""
try:
@@ -1132,21 +1123,9 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
dimensions = await self.get_page_dimensions(page)
total_height = dimensions["height"]
scroll_step_count = 0
while current_position < total_height:
####
# NEW FEATURE: Check if we've reached the maximum allowed scroll steps
# This prevents infinite scrolling on very long pages or infinite scroll scenarios
# If max_scroll_steps is None, this check is skipped (unlimited scrolling - original behavior)
####
if max_scroll_steps is not None and scroll_step_count >= max_scroll_steps:
break
current_position = min(current_position + viewport_height, total_height)
await self.safe_scroll(page, 0, current_position, delay=scroll_delay)
# Increment the step counter for max_scroll_steps tracking
scroll_step_count += 1
# await page.evaluate(f"window.scrollTo(0, {current_position})")
# await asyncio.sleep(scroll_delay)
@@ -1596,31 +1575,12 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
# then wait for the new page to load before continuing
result = None
try:
# OLD VERSION:
# result = await page.evaluate(
# f"""
# (async () => {{
# try {{
# const script_result = {script};
# return {{ success: true, result: script_result }};
# }} catch (err) {{
# return {{ success: false, error: err.toString(), stack: err.stack }};
# }}
# }})();
# """
# )
# """ NEW VERSION:
# When {script} contains statements (e.g., const link = …; link.click();),
# this forms invalid JavaScript, causing Playwright execution error: SyntaxError: Unexpected token 'const'.
# """
result = await page.evaluate(
f"""
(async () => {{
try {{
return await (async () => {{
{script}
}})();
const script_result = {script};
return {{ success: true, result: script_result }};
}} catch (err) {{
return {{ success: false, error: err.toString(), stack: err.stack }};
}}

View File

@@ -607,7 +607,7 @@ class AsyncWebCrawler:
else config.chunking_strategy
)
sections = chunking.chunk(content)
extracted_content = config.extraction_strategy.run(url, sections)
extracted_content = await config.extraction_strategy.run(url, sections)
extracted_content = json.dumps(
extracted_content, indent=4, default=str, ensure_ascii=False
)

View File

@@ -9,7 +9,7 @@ from bs4 import NavigableString, Comment
from .utils import (
clean_tokens,
perform_completion_with_backoff,
aperform_completion_with_backoff,
escape_json_string,
sanitize_html,
get_home_folder,
@@ -953,7 +953,7 @@ class LLMContentFilter(RelevantContentFilter):
for var, value in prompt_variables.items():
prompt = prompt.replace("{" + var + "}", value)
def _proceed_with_chunk(
async def _proceed_with_chunk(
provider: str,
prompt: str,
api_token: str,
@@ -966,7 +966,7 @@ class LLMContentFilter(RelevantContentFilter):
tag="CHUNK",
params={"chunk_num": i + 1},
)
return perform_completion_with_backoff(
return await aperform_completion_with_backoff(
provider,
prompt,
api_token,

View File

@@ -15,7 +15,7 @@ from .config import (
)
from bs4 import NavigableString, Comment
from bs4 import PageElement, Tag
from urllib.parse import urljoin , urlparse
from urllib.parse import urljoin
from requests.exceptions import InvalidSchema
from .utils import (
extract_metadata,
@@ -24,7 +24,8 @@ from .utils import (
get_base_domain,
extract_metadata_using_lxml,
)
from lxml import etree, html as lhtml
from lxml import etree
from lxml import html as lhtml
from typing import List
from .models import ScrapingResult, MediaItem, Link, Media, Links
import copy
@@ -129,27 +130,7 @@ class WebScrapingStrategy(ContentScrapingStrategy):
ScrapingResult: A structured result containing the scraped content.
"""
actual_url = kwargs.get("redirected_url", url)
# raw_result = self._scrap(actual_url, html, is_async=False, **kwargs)
effective_base_url = actual_url
try:
soup_for_base_check = BeautifulSoup(html, "html.parser")
base_tag = soup_for_base_check.find("base", href=True)
if base_tag:
base_href_val = base_tag.get("href")
if base_href_val is not None:
resolved_base_href = urljoin(actual_url, base_href_val)
parsed_resolved_base = urlparse(resolved_base_href)
if parsed_resolved_base.scheme and parsed_resolved_base.netloc:
effective_base_url = resolved_base_href
except Exception as e:
self._log(
"error",
message="Error resolving base URL: {error}",
tag="SCRAPE",
params={"error": str(e)},
)
kwargs_for_scrap = {**kwargs, '_effective_base_url_override': effective_base_url }
raw_result = self._scrap(actual_url, html, is_async=False, **kwargs_for_scrap)
raw_result = self._scrap(actual_url, html, is_async=False, **kwargs)
if raw_result is None:
return ScrapingResult(
cleaned_html="",
@@ -1506,27 +1487,6 @@ class LXMLWebScrapingStrategy(WebScrapingStrategy):
doc = lhtml.document_fromstring(html)
# Match BeautifulSoup's behavior of using body or full doc
# body = doc.xpath('//body')[0] if doc.xpath('//body') else doc
# Determine effective base URL considering <base href="...">
base_tag_element = doc.find(".//base[@href]")
if base_tag_element is not None:
base_href_value = base_tag_element.get("href")
if base_href_value is not None:
resolved_base_href = urljoin(url, base_href_value)
parse_resolved_base_href = urlparse(resolved_base_href)
if parse_resolved_base_href.scheme and parse_resolved_base_href.netloc:
effective_base_url = resolved_base_href
self._log(
"debug",
f"Using <base href='{base_href_value}'>, resolved effective base URL for links: {effective_base_url}",
url=url, # Log against original document URL
tag="SCRAPE_BASE_URL")
else:
effective_base_url = url
self._log(
"warning",
f"<base href='{base_href_value}'> resolved to non-absolute URL '{resolved_base_href}'. Using document URL '{actual_url}' as base.",
url=url, # Log against original document URL
tag="SCRAPE_BASE_URL")
body = doc
base_domain = get_base_domain(url)

View File

@@ -227,21 +227,10 @@ class URLPatternFilter(URLFilter):
# Prefix check (/foo/*)
if self._simple_prefixes:
path = url.split("?")[0]
# if any(path.startswith(p) for p in self._simple_prefixes):
# result = True
# self._update_stats(result)
# return not result if self._reverse else result
####
# Modified the prefix matching logic to ensure path boundary checking:
# - Check if the matched prefix is followed by a path separator (`/`), query parameter (`?`), fragment (`#`), or is at the end of the path
# - This ensures `/api/` only matches complete path segments, not substrings like `/apiv2/`
####
for prefix in self._simple_prefixes:
if path.startswith(prefix):
if len(path) == len(prefix) or path[len(prefix)] in ['/', '?', '#']:
result = True
self._update_stats(result)
return not result if self._reverse else result
if any(path.startswith(p) for p in self._simple_prefixes):
result = True
self._update_stats(result)
return not result if self._reverse else result
# Complex patterns
if self._path_patterns:
@@ -348,15 +337,6 @@ class ContentTypeFilter(URLFilter):
"sqlite": "application/vnd.sqlite3",
# Placeholder
"unknown": "application/octet-stream", # Fallback for unknown file types
# php
"php": "application/x-httpd-php",
"php3": "application/x-httpd-php",
"php4": "application/x-httpd-php",
"php5": "application/x-httpd-php",
"php7": "application/x-httpd-php",
"phtml": "application/x-httpd-php",
"phps": "application/x-httpd-php-source",
}
@staticmethod

View File

@@ -3,6 +3,7 @@ import inspect
from typing import Any, List, Dict, Optional, Tuple, Pattern, Union
from concurrent.futures import ThreadPoolExecutor, as_completed
import json
import asyncio
import time
from enum import IntFlag, auto
@@ -19,7 +20,7 @@ from .utils import * # noqa: F403
from .utils import (
sanitize_html,
escape_json_string,
perform_completion_with_backoff,
aperform_completion_with_backoff,
extract_xml_data,
split_and_parse_json_objects,
sanitize_input_encode,
@@ -66,7 +67,7 @@ class ExtractionStrategy(ABC):
self.verbose = kwargs.get("verbose", False)
@abstractmethod
def extract(self, url: str, html: str, *q, **kwargs) -> List[Dict[str, Any]]:
async def extract(self, url: str, html: str, *q, **kwargs) -> List[Dict[str, Any]]:
"""
Extract meaningful blocks or chunks from the given HTML.
@@ -76,7 +77,7 @@ class ExtractionStrategy(ABC):
"""
pass
def run(self, url: str, sections: List[str], *q, **kwargs) -> List[Dict[str, Any]]:
async def run(self, url: str, sections: List[str], *q, **kwargs) -> List[Dict[str, Any]]:
"""
Process sections of text in parallel by default.
@@ -85,13 +86,13 @@ class ExtractionStrategy(ABC):
:return: A list of processed JSON blocks.
"""
extracted_content = []
with ThreadPoolExecutor() as executor:
futures = [
executor.submit(self.extract, url, section, **kwargs)
for section in sections
]
for future in as_completed(futures):
extracted_content.extend(future.result())
tasks = [
asyncio.create_task(self.extract(url, section, **kwargs))
for section in sections
]
results = await asyncio.gather(*tasks)
for result in results:
extracted_content.extend(result)
return extracted_content
@@ -100,19 +101,18 @@ class NoExtractionStrategy(ExtractionStrategy):
A strategy that does not extract any meaningful content from the HTML. It simply returns the entire HTML as a single block.
"""
def extract(self, url: str, html: str, *q, **kwargs) -> List[Dict[str, Any]]:
async def extract(self, url: str, html: str, *q, **kwargs) -> List[Dict[str, Any]]:
"""
Extract meaningful blocks or chunks from the given HTML.
"""
return [{"index": 0, "content": html}]
def run(self, url: str, sections: List[str], *q, **kwargs) -> List[Dict[str, Any]]:
async def run(self, url: str, sections: List[str], *q, **kwargs) -> List[Dict[str, Any]]:
return [
{"index": i, "tags": [], "content": section}
for i, section in enumerate(sections)
]
#######################################################
# Strategies using clustering for text data extraction #
#######################################################
@@ -386,7 +386,7 @@ class CosineStrategy(ExtractionStrategy):
return filtered_clusters
def extract(self, url: str, html: str, *q, **kwargs) -> List[Dict[str, Any]]:
async def extract(self, url: str, html: str, *q, **kwargs) -> List[Dict[str, Any]]:
"""
Extract clusters from HTML content using hierarchical clustering.
@@ -458,7 +458,7 @@ class CosineStrategy(ExtractionStrategy):
return cluster_list
def run(self, url: str, sections: List[str], *q, **kwargs) -> List[Dict[str, Any]]:
async def run(self, url: str, sections: List[str], *q, **kwargs) -> List[Dict[str, Any]]:
"""
Process sections using hierarchical clustering.
@@ -584,7 +584,7 @@ class LLMExtractionStrategy(ExtractionStrategy):
super().__setattr__(name, value)
def extract(self, url: str, ix: int, html: str) -> List[Dict[str, Any]]:
async def extract(self, url: str, ix: int, html: str) -> List[Dict[str, Any]]:
"""
Extract meaningful blocks or chunks from the given HTML using an LLM.
@@ -628,7 +628,7 @@ class LLMExtractionStrategy(ExtractionStrategy):
)
try:
response = perform_completion_with_backoff(
response = await aperform_completion_with_backoff(
self.llm_config.provider,
prompt_with_variables,
self.llm_config.api_token,
@@ -723,7 +723,7 @@ class LLMExtractionStrategy(ExtractionStrategy):
)
return sections
def run(self, url: str, sections: List[str]) -> List[Dict[str, Any]]:
async def run(self, url: str, sections: List[str]) -> List[Dict[str, Any]]:
"""
Process sections sequentially with a delay for rate limiting issues, specifically for LLMExtractionStrategy.
@@ -748,35 +748,11 @@ class LLMExtractionStrategy(ExtractionStrategy):
extracted_content.extend(
extract_func(ix, sanitize_input_encode(section))
)
time.sleep(0.5) # 500 ms delay between each processing
await asyncio.sleep(0.5) # 500 ms delay between each processing
else:
# Parallel processing using ThreadPoolExecutor
# extract_func = partial(self.extract, url)
# for ix, section in enumerate(merged_sections):
# extracted_content.append(extract_func(ix, section))
with ThreadPoolExecutor(max_workers=4) as executor:
extract_func = partial(self.extract, url)
futures = [
executor.submit(extract_func, ix, sanitize_input_encode(section))
for ix, section in enumerate(merged_sections)
]
for future in as_completed(futures):
try:
extracted_content.extend(future.result())
except Exception as e:
if self.verbose:
print(f"Error in thread execution: {e}")
# Add error information to extracted_content
extracted_content.append(
{
"index": 0,
"error": True,
"tags": ["error"],
"content": str(e),
}
)
extract_func = partial(self.extract, url)
extracted_content = await asyncio.gather(*[extract_func(ix, sanitize_input_encode(section)) for ix, section in enumerate(merged_sections)])
return extracted_content
@@ -797,7 +773,6 @@ class LLMExtractionStrategy(ExtractionStrategy):
f"{i:<10} {usage.completion_tokens:>12,} {usage.prompt_tokens:>12,} {usage.total_tokens:>12,}"
)
#######################################################
# New extraction strategies for JSON-based extraction #
#######################################################
@@ -846,7 +821,7 @@ class JsonElementExtractionStrategy(ExtractionStrategy):
self.schema = schema
self.verbose = kwargs.get("verbose", False)
def extract(
async def extract(
self, url: str, html_content: str, *q, **kwargs
) -> List[Dict[str, Any]]:
"""
@@ -1044,7 +1019,7 @@ class JsonElementExtractionStrategy(ExtractionStrategy):
print(f"Error computing field {field['name']}: {str(e)}")
return field.get("default")
def run(self, url: str, sections: List[str], *q, **kwargs) -> List[Dict[str, Any]]:
async def run(self, url: str, sections: List[str], *q, **kwargs) -> List[Dict[str, Any]]:
"""
Run the extraction strategy on a combined HTML content.
@@ -1063,7 +1038,7 @@ class JsonElementExtractionStrategy(ExtractionStrategy):
"""
combined_html = self.DEL.join(sections)
return self.extract(url, combined_html, **kwargs)
return await self.extract(url, combined_html, **kwargs)
@abstractmethod
def _get_element_text(self, element) -> str:
@@ -1086,7 +1061,7 @@ class JsonElementExtractionStrategy(ExtractionStrategy):
}
@staticmethod
def generate_schema(
async def generate_schema(
html: str,
schema_type: str = "CSS", # or XPATH
query: str = None,
@@ -1112,7 +1087,7 @@ class JsonElementExtractionStrategy(ExtractionStrategy):
dict: Generated schema following the JsonElementExtractionStrategy format
"""
from .prompts import JSON_SCHEMA_BUILDER
from .utils import perform_completion_with_backoff
from .utils import aperform_completion_with_backoff
for name, message in JsonElementExtractionStrategy._GENERATE_SCHEMA_UNWANTED_PROPS.items():
if locals()[name] is not None:
raise AttributeError(f"Setting '{name}' is deprecated. {message}")
@@ -1179,7 +1154,7 @@ In this scenario, use your best judgment to generate the schema. You need to exa
try:
# Call LLM with backoff handling
response = perform_completion_with_backoff(
response = await aperform_completion_with_backoff(
provider=llm_config.provider,
prompt_with_variables="\n\n".join([system_message["content"], user_message["content"]]),
json_response = True,
@@ -1858,7 +1833,7 @@ class RegexExtractionStrategy(ExtractionStrategy):
# ------------------------------------------------------------------ #
# Extraction
# ------------------------------------------------------------------ #
def extract(self, url: str, content: str, *q, **kw) -> List[Dict[str, Any]]:
async def extract(self, url: str, content: str, *q, **kw) -> List[Dict[str, Any]]:
# text = self._plain_text(html)
out: List[Dict[str, Any]] = []
@@ -1889,7 +1864,7 @@ class RegexExtractionStrategy(ExtractionStrategy):
# LLM-assisted one-off pattern builder
# ------------------------------------------------------------------ #
@staticmethod
def generate_pattern(
async def generate_pattern(
label: str,
html: str,
*,
@@ -1946,7 +1921,7 @@ class RegexExtractionStrategy(ExtractionStrategy):
user_msg = "\n\n".join(user_parts)
# ── LLM call (with retry/backoff)
resp = perform_completion_with_backoff(
resp = await aperform_completion_with_backoff(
provider=llm_config.provider,
prompt_with_variables="\n\n".join([system_msg, user_msg]),
json_response=True,

View File

@@ -15,10 +15,9 @@ from .html2text import html2text, CustomHTML2Text
from .config import MIN_WORD_THRESHOLD, IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD, IMAGE_SCORE_THRESHOLD, DEFAULT_PROVIDER, PROVIDER_MODELS
import httpx
from socket import gaierror
from pathlib import Path , PurePath
from pathlib import Path
from typing import Dict, Any, List, Optional, Callable
from urllib.parse import urljoin
import requests
from requests.exceptions import InvalidSchema
import xxhash
@@ -1673,7 +1672,7 @@ def extract_xml_data(tags, string):
return data
def perform_completion_with_backoff(
async def aperform_completion_with_backoff(
provider,
prompt_with_variables,
api_token,
@@ -1701,7 +1700,7 @@ def perform_completion_with_backoff(
dict: The API response or an error message after all retries.
"""
from litellm import completion
from litellm import acompletion
from litellm.exceptions import RateLimitError
max_attempts = 3
@@ -1716,7 +1715,7 @@ def perform_completion_with_backoff(
for attempt in range(max_attempts):
try:
response = completion(
response = await acompletion(
model=provider,
messages=[{"role": "user", "content": prompt_with_variables}],
**extra_args,
@@ -1755,7 +1754,7 @@ def perform_completion_with_backoff(
# ]
def extract_blocks(url, html, provider=DEFAULT_PROVIDER, api_token=None, base_url=None):
async def extract_blocks(url, html, provider=DEFAULT_PROVIDER, api_token=None, base_url=None):
"""
Extract content blocks from website HTML using an AI provider.
@@ -1789,7 +1788,7 @@ def extract_blocks(url, html, provider=DEFAULT_PROVIDER, api_token=None, base_ur
"{" + variable + "}", variable_values[variable]
)
response = perform_completion_with_backoff(
response = await aperform_completion_with_backoff(
provider, prompt_with_variables, api_token, base_url=base_url
)
@@ -2057,29 +2056,18 @@ def fast_format_html(html_string):
def normalize_url(href, base_url):
"""Normalize URLs to ensure consistent format"""
from urllib.parse import urljoin, urlparse
if href is None:
return None
href_str = str(href).strip()
if not href_str:
# Empty href, conventionally resolves to the base URL itself.
return base_url
# Parse base URL to get components
parsed_href = urlparse(href_str)
if parsed_href.scheme and parsed_href.scheme.lower() in ["mailto", "tel", "javascript", "data", "file"]:
# If href is already a full URL, return it as is
return href_str
parsed_base = urlparse(base_url)
if not parsed_base.scheme or not parsed_base.netloc:
raise ValueError(f"Invalid base URL format: {base_url}")
# # Ensure base_url ends with a trailing slash if it's a directory path
# if not base_url.endswith('/'):
# base_url = base_url + '/'
# Ensure base_url ends with a trailing slash if it's a directory path
if not base_url.endswith('/'):
base_url = base_url + '/'
# Use urljoin to handle all cases
normalized = urljoin(base_url, href_str)
normalized = urljoin(base_url, href.strip())
return normalized
@@ -2092,7 +2080,7 @@ def normalize_url_for_deep_crawl(href, base_url):
return None
# Use urljoin to handle relative URLs
full_url = urljoin(base_url, str(href).strip())
full_url = urljoin(base_url, href.strip())
# Parse the URL for normalization
parsed = urlparse(full_url)
@@ -2122,7 +2110,7 @@ def normalize_url_for_deep_crawl(href, base_url):
normalized = urlunparse((
parsed.scheme,
netloc,
str(PurePath(parsed.path)).rstrip('/'), # Normalize path to remove duplicate slashes
parsed.path.rstrip('/'), # Normalize trailing slash
parsed.params,
query,
fragment
@@ -2139,7 +2127,7 @@ def efficient_normalize_url_for_deep_crawl(href, base_url):
return None
# Resolve relative URLs
full_url = urljoin(base_url, str(href).strip())
full_url = urljoin(base_url, href.strip())
# Use proper URL parsing
parsed = urlparse(full_url)
@@ -2147,51 +2135,52 @@ def efficient_normalize_url_for_deep_crawl(href, base_url):
# Only perform the most critical normalizations
# 1. Lowercase hostname
# 2. Remove fragment
path = parsed.path
if len(path) > 1 and path.endswith('/'):
path = path.rstrip('/')
normalized = urlunparse((
parsed.scheme,
parsed.netloc.lower(),
parsed.path.rstrip('/'),
parsed.params,
parsed.query,
'' # Remove fragment
))
return normalized
# def normalize_url_tmp(href, base_url):
# """Normalize URLs to ensure consistent format"""
# # Extract protocol and domain from base URL
# try:
# base_parts = base_url.split("/")
# protocol = base_parts[0]
# domain = base_parts[2]
# except IndexError:
# raise ValueError(f"Invalid base URL format: {base_url}")
def normalize_url_tmp(href, base_url):
"""Normalize URLs to ensure consistent format"""
# Extract protocol and domain from base URL
try:
base_parts = base_url.split("/")
protocol = base_parts[0]
domain = base_parts[2]
except IndexError:
raise ValueError(f"Invalid base URL format: {base_url}")
# # Handle special protocols
# special_protocols = {"mailto:", "tel:", "ftp:", "file:", "data:", "javascript:"}
# if any(href.lower().startswith(proto) for proto in special_protocols):
# return href.strip()
# Handle special protocols
special_protocols = {"mailto:", "tel:", "ftp:", "file:", "data:", "javascript:"}
if any(href.lower().startswith(proto) for proto in special_protocols):
return href.strip()
# # Handle anchor links
# if href.startswith("#"):
# return f"{base_url}{href}"
# Handle anchor links
if href.startswith("#"):
return f"{base_url}{href}"
# # Handle protocol-relative URLs
# if href.startswith("//"):
# return f"{protocol}{href}"
# Handle protocol-relative URLs
if href.startswith("//"):
return f"{protocol}{href}"
# # Handle root-relative URLs
# if href.startswith("/"):
# return f"{protocol}//{domain}{href}"
# Handle root-relative URLs
if href.startswith("/"):
return f"{protocol}//{domain}{href}"
# # Handle relative URLs
# if not href.startswith(("http://", "https://")):
# # Remove leading './' if present
# href = href.lstrip("./")
# return f"{protocol}//{domain}/{href}"
# Handle relative URLs
if not href.startswith(("http://", "https://")):
# Remove leading './' if present
href = href.lstrip("./")
return f"{protocol}//{domain}/{href}"
# return href.strip()
return href.strip()
def get_base_domain(url: str) -> str:

View File

@@ -24,7 +24,7 @@ from crawl4ai import (
RateLimiter,
LLMConfig
)
from crawl4ai.utils import perform_completion_with_backoff
from crawl4ai.utils import aperform_completion_with_backoff
from crawl4ai.content_filter_strategy import (
PruningContentFilter,
BM25ContentFilter,
@@ -88,7 +88,7 @@ async def handle_llm_qa(
Answer:"""
response = perform_completion_with_backoff(
response = await aperform_completion_with_backoff(
provider=config["llm"]["provider"],
prompt_with_variables=prompt,
api_token=os.environ.get(config["llm"].get("api_key_env", ""))

View File

@@ -3553,7 +3553,7 @@ from .utils import * # noqa: F403
from .utils import (
sanitize_html,
escape_json_string,
perform_completion_with_backoff,
aperform_completion_with_backoff,
extract_xml_data,
split_and_parse_json_objects,
sanitize_input_encode,
@@ -4162,7 +4162,7 @@ class LLMExtractionStrategy(ExtractionStrategy):
)
try:
response = perform_completion_with_backoff(
response = await aperform_completion_with_backoff(
self.llm_config.provider,
prompt_with_variables,
self.llm_config.api_token,
@@ -4646,7 +4646,7 @@ class JsonElementExtractionStrategy(ExtractionStrategy):
dict: Generated schema following the JsonElementExtractionStrategy format
"""
from .prompts import JSON_SCHEMA_BUILDER
from .utils import perform_completion_with_backoff
from .utils import aperform_completion_with_backoff
for name, message in JsonElementExtractionStrategy._GENERATE_SCHEMA_UNWANTED_PROPS.items():
if locals()[name] is not None:
raise AttributeError(f"Setting '{name}' is deprecated. {message}")
@@ -4709,7 +4709,7 @@ In this scenario, use your best judgment to generate the schema. You need to exa
try:
# Call LLM with backoff handling
response = perform_completion_with_backoff(
response = await aperform_completion_with_backoff(
provider=llm_config.provider,
prompt_with_variables="\n\n".join([system_message["content"], user_message["content"]]),
json_response = True,
@@ -5597,7 +5597,7 @@ from bs4 import NavigableString, Comment
from .utils import (
clean_tokens,
perform_completion_with_backoff,
aperform_completion_with_backoff,
escape_json_string,
sanitize_html,
get_home_folder,
@@ -6556,7 +6556,7 @@ class LLMContentFilter(RelevantContentFilter):
tag="CHUNK",
params={"chunk_num": i + 1},
)
return perform_completion_with_backoff(
return await aperform_completion_with_backoff(
provider,
prompt,
api_token,

View File

@@ -332,7 +332,7 @@ The `clone()` method:
### Key fields to note
1. **`provider`**:
- Which LLM provider to use.
- Which LLM provoder to use.
- Possible values are `"ollama/llama3","groq/llama3-70b-8192","groq/llama3-8b-8192", "openai/gpt-4o-mini" ,"openai/gpt-4o","openai/o1-mini","openai/o1-preview","openai/o3-mini","openai/o3-mini-high","anthropic/claude-3-haiku-20240307","anthropic/claude-3-opus-20240229","anthropic/claude-3-sonnet-20240229","anthropic/claude-3-5-sonnet-20240620","gemini/gemini-pro","gemini/gemini-1.5-pro","gemini/gemini-2.0-flash","gemini/gemini-2.0-flash-exp","gemini/gemini-2.0-flash-lite-preview-02-05","deepseek/deepseek-chat"`<br/>*(default: `"openai/gpt-4o-mini"`)*
2. **`api_token`**:

View File

@@ -1,11 +1,7 @@
# Crawl4AIProspectWizard stepbystep guide
[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/drive/10nRCwmfxPjVrRUHyJsYlX7BH5bvPoGpx?usp=sharing)
A threestage demo that goes from **LinkedIn scraping****LLM reasoning****graph visualisation**.
**Try it in Google Colab!** Click the badge above to run this demo in a cloud environment with zero setup required.
```
prospectwizard/
├─ c4ai_discover.py # Stage 1 scrape companies + people

View File

@@ -1,55 +1,43 @@
from crawl4ai import LLMConfig
from crawl4ai import AsyncWebCrawler, LLMExtractionStrategy
import asyncio
from pydantic import BaseModel, Field
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig, LLMConfig, BrowserConfig, CacheMode
from crawl4ai.extraction_strategy import LLMExtractionStrategy
from typing import Dict
import os
import json
from pydantic import BaseModel, Field
url = "https://openai.com/api/pricing/"
class OpenAIModelFee(BaseModel):
model_name: str = Field(..., description="Name of the OpenAI model.")
input_fee: str = Field(..., description="Fee for input token for the OpenAI model.")
output_fee: str = Field(..., description="Fee for output token for the OpenAI model.")
async def extract_structured_data_using_llm(provider: str, api_token: str = None, extra_headers: Dict[str, str] = None):
print(f"\n--- Extracting Structured Data with {provider} ---")
if api_token is None and provider != "ollama":
print(f"API token is required for {provider}. Skipping this example.")
return
browser_config = BrowserConfig(headless=True)
extra_args = {"temperature": 0, "top_p": 0.9, "max_tokens": 2000}
if extra_headers:
extra_args["extra_headers"] = extra_headers
crawler_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
word_count_threshold=1,
page_timeout=80000,
extraction_strategy=LLMExtractionStrategy(
llm_config=LLMConfig(provider=provider, api_token=api_token),
schema=OpenAIModelFee.model_json_schema(),
extraction_type="schema",
instruction="""From the crawled content, extract all mentioned model names along with their fees for input and output tokens.
Do not miss any models in the entire content.""",
extra_args=extra_args,
),
output_fee: str = Field(
..., description="Fee for output token for the OpenAI model."
)
async with AsyncWebCrawler(config=browser_config) as crawler:
async def main():
# Use AsyncWebCrawler
async with AsyncWebCrawler() as crawler:
result = await crawler.arun(
url="https://openai.com/api/pricing/",
config=crawler_config
url=url,
word_count_threshold=1,
extraction_strategy=LLMExtractionStrategy(
# provider= "openai/gpt-4o", api_token = os.getenv('OPENAI_API_KEY'),
llm_config=LLMConfig(provider="groq/llama-3.1-70b-versatile", api_token=os.getenv("GROQ_API_KEY")),
schema=OpenAIModelFee.model_json_schema(),
extraction_type="schema",
instruction="From the crawled content, extract all mentioned model names along with their "
"fees for input and output tokens. Make sure not to miss anything in the entire content. "
"One extracted model JSON format should look like this: "
'{ "model_name": "GPT-4", "input_fee": "US$10.00 / 1M tokens", "output_fee": "US$30.00 / 1M tokens" }',
),
)
print(result.extracted_content)
print("Success:", result.success)
model_fees = json.loads(result.extracted_content)
print(len(model_fees))
with open(".data/data.json", "w", encoding="utf-8") as f:
f.write(result.extracted_content)
if __name__ == "__main__":
asyncio.run(
extract_structured_data_using_llm(
provider="openai/gpt-4o", api_token=os.getenv("OPENAI_API_KEY")
)
)
asyncio.run(main())

View File

@@ -1,204 +0,0 @@
Okay, here is the Markdown documentation for `PDFCrawlerStrategy` and `PDFContentScrapingStrategy`, formatted for an MkDocs site.
# PDF Processing Strategies
Crawl4AI provides specialized strategies for handling and extracting content from PDF files. These strategies allow you to seamlessly integrate PDF processing into your crawling workflows, whether the PDFs are hosted online or stored locally.
## `PDFCrawlerStrategy`
### Overview
`PDFCrawlerStrategy` is an implementation of `AsyncCrawlerStrategy` designed specifically for PDF documents. Instead of interpreting the input URL as an HTML webpage, this strategy treats it as a pointer to a PDF file. It doesn't perform deep crawling or HTML parsing itself but rather prepares the PDF source for a dedicated PDF scraping strategy. Its primary role is to identify the PDF source (web URL or local file) and pass it along the processing pipeline in a way that `AsyncWebCrawler` can handle.
### When to Use
Use `PDFCrawlerStrategy` when you need to:
- Process PDF files using the `AsyncWebCrawler`.
- Handle PDFs from both web URLs (e.g., `https://example.com/document.pdf`) and local file paths (e.g., `file:///path/to/your/document.pdf`).
- Integrate PDF content extraction into a unified `CrawlResult` object, allowing consistent handling of PDF data alongside web page data.
### Key Methods and Their Behavior
- **`__init__(self, logger: AsyncLogger = None)`**:
- Initializes the strategy.
- `logger`: An optional `AsyncLogger` instance (from `crawl4ai.async_logger`) for logging purposes.
- **`async crawl(self, url: str, **kwargs) -> AsyncCrawlResponse`**:
- This method is called by the `AsyncWebCrawler` during the `arun` process.
- It takes the `url` (which should point to a PDF) and creates a minimal `AsyncCrawlResponse`.
- The `html` attribute of this response is typically empty or a placeholder, as the actual PDF content processing is deferred to the `PDFContentScrapingStrategy` (or a similar PDF-aware scraping strategy).
- It sets `response_headers` to indicate "application/pdf" and `status_code` to 200.
- **`async close(self)`**:
- A method for cleaning up any resources used by the strategy. For `PDFCrawlerStrategy`, this is usually minimal.
- **`async __aenter__(self)` / `async __aexit__(self, exc_type, exc_val, exc_tb)`**:
- Enables asynchronous context management for the strategy, allowing it to be used with `async with`.
### Example Usage
```python
import asyncio
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig
from crawl4ai.processors.pdf import PDFCrawlerStrategy, PDFContentScrapingStrategy
async def main():
# Initialize the PDF crawler strategy
pdf_crawler_strategy = PDFCrawlerStrategy()
# PDFCrawlerStrategy is typically used in conjunction with PDFContentScrapingStrategy
# The scraping strategy handles the actual PDF content extraction
pdf_scraping_strategy = PDFContentScrapingStrategy()
run_config = CrawlerRunConfig(scraping_strategy=pdf_scraping_strategy)
async with AsyncWebCrawler(crawler_strategy=pdf_crawler_strategy) as crawler:
# Example with a remote PDF URL
pdf_url = "https://arxiv.org/pdf/2310.06825.pdf" # A public PDF from arXiv
print(f"Attempting to process PDF: {pdf_url}")
result = await crawler.arun(url=pdf_url, config=run_config)
if result.success:
print(f"Successfully processed PDF: {result.url}")
print(f"Metadata Title: {result.metadata.get('title', 'N/A')}")
# Further processing of result.markdown, result.media, etc.
# would be done here, based on what PDFContentScrapingStrategy extracts.
if result.markdown and hasattr(result.markdown, 'raw_markdown'):
print(f"Extracted text (first 200 chars): {result.markdown.raw_markdown[:200]}...")
else:
print("No markdown (text) content extracted.")
else:
print(f"Failed to process PDF: {result.error_message}")
if __name__ == "__main__":
asyncio.run(main())
```
### Pros and Cons
**Pros:**
- Enables `AsyncWebCrawler` to handle PDF sources directly using familiar `arun` calls.
- Provides a consistent interface for specifying PDF sources (URLs or local paths).
- Abstracts the source handling, allowing a separate scraping strategy to focus on PDF content parsing.
**Cons:**
- Does not perform any PDF data extraction itself; it strictly relies on a compatible scraping strategy (like `PDFContentScrapingStrategy`) to process the PDF.
- Has limited utility on its own; most of its value comes from being paired with a PDF-specific content scraping strategy.
---
## `PDFContentScrapingStrategy`
### Overview
`PDFContentScrapingStrategy` is an implementation of `ContentScrapingStrategy` designed to extract text, metadata, and optionally images from PDF documents. It is intended to be used in conjunction with a crawler strategy that can provide it with a PDF source, such as `PDFCrawlerStrategy`. This strategy uses the `NaivePDFProcessorStrategy` internally to perform the low-level PDF parsing.
### When to Use
Use `PDFContentScrapingStrategy` when your `AsyncWebCrawler` (often configured with `PDFCrawlerStrategy`) needs to:
- Extract textual content page by page from a PDF document.
- Retrieve standard metadata embedded within the PDF (e.g., title, author, subject, creation date, page count).
- Optionally, extract images contained within the PDF pages. These images can be saved to a local directory or made available for further processing.
- Produce a `ScrapingResult` that can be converted into a `CrawlResult`, making PDF content accessible in a manner similar to HTML web content (e.g., text in `result.markdown`, metadata in `result.metadata`).
### Key Configuration Attributes
When initializing `PDFContentScrapingStrategy`, you can configure its behavior using the following attributes:
- **`extract_images: bool = False`**: If `True`, the strategy will attempt to extract images from the PDF.
- **`save_images_locally: bool = False`**: If `True` (and `extract_images` is also `True`), extracted images will be saved to disk in the `image_save_dir`. If `False`, image data might be available in another form (e.g., base64, depending on the underlying processor) but not saved as separate files by this strategy.
- **`image_save_dir: str = None`**: Specifies the directory where extracted images should be saved if `save_images_locally` is `True`. If `None`, a default or temporary directory might be used.
- **`batch_size: int = 4`**: Defines how many PDF pages are processed in a single batch. This can be useful for managing memory when dealing with very large PDF documents.
- **`logger: AsyncLogger = None`**: An optional `AsyncLogger` instance for logging.
### Key Methods and Their Behavior
- **`__init__(self, save_images_locally: bool = False, extract_images: bool = False, image_save_dir: str = None, batch_size: int = 4, logger: AsyncLogger = None)`**:
- Initializes the strategy with configurations for image handling, batch processing, and logging. It sets up an internal `NaivePDFProcessorStrategy` instance which performs the actual PDF parsing.
- **`scrap(self, url: str, html: str, **params) -> ScrapingResult`**:
- This is the primary synchronous method called by the crawler (via `ascrap`) to process the PDF.
- `url`: The path or URL to the PDF file (provided by `PDFCrawlerStrategy` or similar).
- `html`: Typically an empty string when used with `PDFCrawlerStrategy`, as the content is a PDF, not HTML.
- It first ensures the PDF is accessible locally (downloads it to a temporary file if `url` is remote).
- It then uses its internal PDF processor to extract text, metadata, and images (if configured).
- The extracted information is compiled into a `ScrapingResult` object:
- `cleaned_html`: Contains an HTML-like representation of the PDF, where each page's content is often wrapped in a `<div>` with page number information.
- `media`: A dictionary where `media["images"]` will contain information about extracted images if `extract_images` was `True`.
- `links`: A dictionary where `links["urls"]` can contain URLs found within the PDF content.
- `metadata`: A dictionary holding PDF metadata (e.g., title, author, num_pages).
- **`async ascrap(self, url: str, html: str, **kwargs) -> ScrapingResult`**:
- The asynchronous version of `scrap`. Under the hood, it typically runs the synchronous `scrap` method in a separate thread using `asyncio.to_thread` to avoid blocking the event loop.
- **`_get_pdf_path(self, url: str) -> str`**:
- A private helper method to manage PDF file access. If the `url` is remote (http/https), it downloads the PDF to a temporary local file and returns its path. If `url` indicates a local file (`file://` or a direct path), it resolves and returns the local path.
### Example Usage
```python
import asyncio
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig
from crawl4ai.processors.pdf import PDFCrawlerStrategy, PDFContentScrapingStrategy
import os # For creating image directory
async def main():
# Define the directory for saving extracted images
image_output_dir = "./my_pdf_images"
os.makedirs(image_output_dir, exist_ok=True)
# Configure the PDF content scraping strategy
# Enable image extraction and specify where to save them
pdf_scraping_cfg = PDFContentScrapingStrategy(
extract_images=True,
save_images_locally=True,
image_save_dir=image_output_dir,
batch_size=2 # Process 2 pages at a time for demonstration
)
# The PDFCrawlerStrategy is needed to tell AsyncWebCrawler how to "crawl" a PDF
pdf_crawler_cfg = PDFCrawlerStrategy()
# Configure the overall crawl run
run_cfg = CrawlerRunConfig(
scraping_strategy=pdf_scraping_cfg # Use our PDF scraping strategy
)
# Initialize the crawler with the PDF-specific crawler strategy
async with AsyncWebCrawler(crawler_strategy=pdf_crawler_cfg) as crawler:
pdf_url = "https://arxiv.org/pdf/2310.06825.pdf" # Example PDF
print(f"Starting PDF processing for: {pdf_url}")
result = await crawler.arun(url=pdf_url, config=run_cfg)
if result.success:
print("\n--- PDF Processing Successful ---")
print(f"Processed URL: {result.url}")
print("\n--- Metadata ---")
for key, value in result.metadata.items():
print(f" {key.replace('_', ' ').title()}: {value}")
if result.markdown and hasattr(result.markdown, 'raw_markdown'):
print(f"\n--- Extracted Text (Markdown Snippet) ---")
print(result.markdown.raw_markdown[:500].strip() + "...")
else:
print("\nNo text (markdown) content extracted.")
if result.media and result.media.get("images"):
print(f"\n--- Image Extraction ---")
print(f"Extracted {len(result.media['images'])} image(s).")
for i, img_info in enumerate(result.media["images"][:2]): # Show info for first 2 images
print(f" Image {i+1}:")
print(f" Page: {img_info.get('page')}")
print(f" Format: {img_info.get('format', 'N/A')}")
if img_info.get('path'):
print(f" Saved at: {img_info.get('path')}")
else:
print("\nNo images were extracted (or extract_images was False).")
else:
print(f"\n--- PDF Processing Failed ---")
print(f"Error: {result.error_message}")
if __name__ == "__main__":
asyncio.run(main())
```
### Pros and Cons
**Pros:**
- Provides a comprehensive way to extract text, metadata, and (optionally) images from PDF documents.
- Handles both remote PDFs (via URL) and local PDF files.
- Configurable image extraction allows saving images to disk or accessing their data.
- Integrates smoothly with the `CrawlResult` object structure, making PDF-derived data accessible in a way consistent with web-scraped data.
- The `batch_size` parameter can help in managing memory consumption when processing large or numerous PDF pages.
**Cons:**
- Extraction quality and performance can vary significantly depending on the PDF's complexity, encoding, and whether it's image-based (scanned) or text-based.
- Image extraction can be resource-intensive (both CPU and disk space if `save_images_locally` is true).
- Relies on `NaivePDFProcessorStrategy` internally, which might have limitations with very complex layouts, encrypted PDFs, or forms compared to more sophisticated PDF parsing libraries. Scanned PDFs will not yield text unless an OCR step is performed (which is not part of this strategy by default).
- Link extraction from PDFs can be basic and depends on how hyperlinks are embedded in the document.

View File

@@ -259,7 +259,7 @@ LLMConfig is useful to pass LLM provider config to strategies and functions that
## 3.1 Parameters
| **Parameter** | **Type / Default** | **What It Does** |
|-----------------------|----------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------|
| **`provider`** | `"ollama/llama3","groq/llama3-70b-8192","groq/llama3-8b-8192", "openai/gpt-4o-mini" ,"openai/gpt-4o","openai/o1-mini","openai/o1-preview","openai/o3-mini","openai/o3-mini-high","anthropic/claude-3-haiku-20240307","anthropic/claude-3-opus-20240229","anthropic/claude-3-sonnet-20240229","anthropic/claude-3-5-sonnet-20240620","gemini/gemini-pro","gemini/gemini-1.5-pro","gemini/gemini-2.0-flash","gemini/gemini-2.0-flash-exp","gemini/gemini-2.0-flash-lite-preview-02-05","deepseek/deepseek-chat"`<br/>*(default: `"openai/gpt-4o-mini"`)* | Which LLM provider to use.
| **`provider`** | `"ollama/llama3","groq/llama3-70b-8192","groq/llama3-8b-8192", "openai/gpt-4o-mini" ,"openai/gpt-4o","openai/o1-mini","openai/o1-preview","openai/o3-mini","openai/o3-mini-high","anthropic/claude-3-haiku-20240307","anthropic/claude-3-opus-20240229","anthropic/claude-3-sonnet-20240229","anthropic/claude-3-5-sonnet-20240620","gemini/gemini-pro","gemini/gemini-1.5-pro","gemini/gemini-2.0-flash","gemini/gemini-2.0-flash-exp","gemini/gemini-2.0-flash-lite-preview-02-05","deepseek/deepseek-chat"`<br/>*(default: `"openai/gpt-4o-mini"`)* | Which LLM provoder to use.
| **`api_token`** |1.Optional. When not provided explicitly, api_token will be read from environment variables based on provider. For example: If a gemini model is passed as provider then,`"GEMINI_API_KEY"` will be read from environment variables <br/> 2. API token of LLM provider <br/> eg: `api_token = "gsk_1ClHGGJ7Lpn4WGybR7vNWGdyb3FY7zXEw3SCiy0BAVM9lL8CQv"` <br/> 3. Environment variable - use with prefix "env:" <br/> eg:`api_token = "env: GROQ_API_KEY"` | API token to use for the given provider
| **`base_url`** |Optional. Custom API endpoint | If your provider has a custom endpoint

View File

@@ -252,7 +252,7 @@ The `clone()` method:
### Key fields to note
1. **`provider`**:
- Which LLM provider to use.
- Which LLM provoder to use.
- Possible values are `"ollama/llama3","groq/llama3-70b-8192","groq/llama3-8b-8192", "openai/gpt-4o-mini" ,"openai/gpt-4o","openai/o1-mini","openai/o1-preview","openai/o3-mini","openai/o3-mini-high","anthropic/claude-3-haiku-20240307","anthropic/claude-3-opus-20240229","anthropic/claude-3-sonnet-20240229","anthropic/claude-3-5-sonnet-20240620","gemini/gemini-pro","gemini/gemini-1.5-pro","gemini/gemini-2.0-flash","gemini/gemini-2.0-flash-exp","gemini/gemini-2.0-flash-lite-preview-02-05","deepseek/deepseek-chat"`<br/>*(default: `"openai/gpt-4o-mini"`)*
2. **`api_token`**:

View File

@@ -218,7 +218,7 @@ import json
import asyncio
from typing import List
from pydantic import BaseModel, Field
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode, LLMConfig
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode
from crawl4ai.extraction_strategy import LLMExtractionStrategy
class Entity(BaseModel):
@@ -238,8 +238,8 @@ class KnowledgeGraph(BaseModel):
async def main():
# LLM extraction strategy
llm_strat = LLMExtractionStrategy(
llmConfig = LLMConfig(provider="openai/gpt-4", api_token=os.getenv('OPENAI_API_KEY')),
schema=KnowledgeGraph.model_json_schema(),
llmConfig = LlmConfig(provider="openai/gpt-4", api_token=os.getenv('OPENAI_API_KEY')),
schema=KnowledgeGraph.schema_json(),
extraction_type="schema",
instruction="Extract entities and relationships from the content. Return valid JSON.",
chunk_token_threshold=1400,
@@ -258,10 +258,6 @@ async def main():
url = "https://www.nbcnews.com/business"
result = await crawler.arun(url=url, config=crawl_config)
print("--- LLM RAW RESPONSE ---")
print(result.extracted_content)
print("--- END LLM RAW RESPONSE ---")
if result.success:
with open("kb_result.json", "w", encoding="utf-8") as f:
f.write(result.extracted_content)

View File

@@ -43,7 +43,6 @@ nav:
- "Identity Based Crawling": "advanced/identity-based-crawling.md"
- "SSL Certificate": "advanced/ssl-certificate.md"
- "Network & Console Capture": "advanced/network-console-capture.md"
- "PDF Parsing": "advanced/pdf-parsing.md"
- Extraction:
- "LLM-Free Strategies": "extraction/no-llm-strategies.md"
- "LLM Strategies": "extraction/llm-strategies.md"

View File

@@ -1,75 +0,0 @@
# // File: tests/deep_crawling/test_filters.py
import pytest
from urllib.parse import urlparse
from crawl4ai import ContentTypeFilter, URLFilter
# Minimal URLFilter base class stub if not already importable directly for tests
# In a real scenario, this would be imported from the library
if not hasattr(URLFilter, '_update_stats'): # Check if it's a basic stub
class URLFilter: # Basic stub for testing if needed
def __init__(self, name=None): self.name = name
def apply(self, url: str) -> bool: raise NotImplementedError
def _update_stats(self, passed: bool): pass # Mock implementation
# Assume ContentTypeFilter is structured as discussed. If its definition is not fully
# available for direct import in the test environment, a more elaborate stub or direct
# instantiation of the real class (if possible) would be needed.
# For this example, we assume ContentTypeFilter can be imported and used.
class TestContentTypeFilter:
@pytest.mark.parametrize(
"url, allowed_types, expected",
[
# Existing tests (examples)
("http://example.com/page.html", ["text/html"], True),
("http://example.com/page.json", ["application/json"], True),
("http://example.com/image.png", ["text/html"], False),
("http://example.com/document.pdf", ["application/pdf"], True),
("http://example.com/page", ["text/html"], True), # No extension, allowed
("http://example.com/page", ["text/html"], False), # No extension, disallowed
("http://example.com/page.unknown", ["text/html"], False), # Unknown extension
# Tests for PHP extensions
("http://example.com/index.php", ["application/x-httpd-php"], True),
("http://example.com/script.php3", ["application/x-httpd-php"], True),
("http://example.com/legacy.php4", ["application/x-httpd-php"], True),
("http://example.com/main.php5", ["application/x-httpd-php"], True),
("http://example.com/api.php7", ["application/x-httpd-php"], True),
("http://example.com/index.phtml", ["application/x-httpd-php"], True),
("http://example.com/source.phps", ["application/x-httpd-php-source"], True),
# Test rejection of PHP extensions
("http://example.com/index.php", ["text/html"], False),
("http://example.com/script.php3", ["text/plain"], False),
("http://example.com/source.phps", ["application/x-httpd-php"], False), # Mismatch MIME
("http://example.com/source.php", ["application/x-httpd-php-source"], False), # Mismatch MIME for .php
# Test case-insensitivity of extensions in URL
("http://example.com/PAGE.HTML", ["text/html"], True),
("http://example.com/INDEX.PHP", ["application/x-httpd-php"], True),
("http://example.com/SOURCE.PHPS", ["application/x-httpd-php-source"], True),
# Test case-insensitivity of allowed_types
("http://example.com/index.php", ["APPLICATION/X-HTTPD-PHP"], True),
],
)
def test_apply(self, url, allowed_types, expected):
content_filter = ContentTypeFilter(
allowed_types=allowed_types
)
assert content_filter.apply(url) == expected
@pytest.mark.parametrize(
"url, expected_extension",
[
("http://example.com/file.html", "html"),
("http://example.com/file.tar.gz", "gz"),
("http://example.com/path/", ""),
("http://example.com/nodot", ""),
("http://example.com/.config", "config"), # hidden file with extension
("http://example.com/path/to/archive.BIG.zip", "zip"), # Case test
]
)
def test_extract_extension(self, url, expected_extension):
# Test the static method directly
assert ContentTypeFilter._extract_extension(url) == expected_extension

View File

@@ -15,24 +15,6 @@ CRAWL4AI_HOME_DIR = Path(os.path.expanduser("~")).joinpath(".crawl4ai")
if not CRAWL4AI_HOME_DIR.joinpath("profiles", "test_profile").exists():
CRAWL4AI_HOME_DIR.joinpath("profiles", "test_profile").mkdir(parents=True)
@pytest.fixture
def basic_html():
return """
<html lang="en">
<head>
<title>Basic HTML</title>
</head>
<body>
<h1>Main Heading</h1>
<main>
<div class="container">
<p>Basic HTML document for testing purposes.</p>
</div>
</main>
</body>
</html>
"""
# Test Config Files
@pytest.fixture
def basic_browser_config():
@@ -343,13 +325,6 @@ async def test_stealth_mode(crawler_strategy):
)
assert response.status_code == 200
@pytest.mark.asyncio
@pytest.mark.parametrize("prefix", ("raw:", "raw://"))
async def test_raw_urls(crawler_strategy, basic_html, prefix):
url = f"{prefix}{basic_html}"
response = await crawler_strategy.crawl(url, CrawlerRunConfig())
assert response.html == basic_html
# Error Handling Tests
@pytest.mark.asyncio
async def test_invalid_url():

View File

@@ -1,34 +0,0 @@
import asyncio
from crawl4ai import CrawlerRunConfig, AsyncWebCrawler, BrowserConfig
from pathlib import Path
import os
async def test_basic_download():
# Custom folder (otherwise defaults to ~/.crawl4ai/downloads)
downloads_path = os.path.join(Path.home(), ".crawl4ai", "downloads")
os.makedirs(downloads_path, exist_ok=True)
browser_config = BrowserConfig(
accept_downloads=True,
downloads_path=downloads_path
)
async with AsyncWebCrawler(config=browser_config) as crawler:
run_config = CrawlerRunConfig(
js_code="""
const link = document.querySelector('a[href$=".exe"]');
if (link) { link.click(); }
""",
delay_before_return_html=5
)
result = await crawler.arun("https://www.python.org/downloads/", config=run_config)
if result.downloaded_files:
print("Downloaded files:")
for file_path in result.downloaded_files:
print("", file_path)
else:
print("No files downloaded.")
if __name__ == "__main__":
asyncio.run(test_basic_download())

View File

@@ -1,115 +0,0 @@
"""
Sample script to test the max_scroll_steps parameter implementation
"""
import asyncio
import os
import sys
# Get the grandparent directory
grandparent_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(grandparent_dir)
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
from crawl4ai import AsyncWebCrawler
from crawl4ai.async_configs import CrawlerRunConfig
async def test_max_scroll_steps():
"""
Test the max_scroll_steps parameter with different configurations
"""
print("🚀 Testing max_scroll_steps parameter implementation")
print("=" * 60)
async with AsyncWebCrawler(verbose=True) as crawler:
# Test 1: Without max_scroll_steps (unlimited scrolling)
print("\\n📋 Test 1: Unlimited scrolling (max_scroll_steps=None)")
config1 = CrawlerRunConfig(
scan_full_page=True,
scroll_delay=0.1,
max_scroll_steps=None, # Default behavior
verbose=True
)
print(f"Config: scan_full_page={config1.scan_full_page}, max_scroll_steps={config1.max_scroll_steps}")
try:
result1 = await crawler.arun(
url="https://example.com", # Simple page for testing
config=config1
)
print(f"✅ Test 1 Success: Crawled {len(result1.markdown)} characters")
except Exception as e:
print(f"❌ Test 1 Failed: {e}")
# Test 2: With limited scroll steps
print("\\n📋 Test 2: Limited scrolling (max_scroll_steps=3)")
config2 = CrawlerRunConfig(
scan_full_page=True,
scroll_delay=0.1,
max_scroll_steps=3, # Limit to 3 scroll steps
verbose=True
)
print(f"Config: scan_full_page={config2.scan_full_page}, max_scroll_steps={config2.max_scroll_steps}")
try:
result2 = await crawler.arun(
url="https://techcrunch.com/", # Another test page
config=config2
)
print(f"✅ Test 2 Success: Crawled {len(result2.markdown)} characters")
except Exception as e:
print(f"❌ Test 2 Failed: {e}")
# Test 3: Test serialization/deserialization
print("\\n📋 Test 3: Configuration serialization test")
config3 = CrawlerRunConfig(
scan_full_page=True,
max_scroll_steps=5,
scroll_delay=0.2
)
# Test to_dict
config_dict = config3.to_dict()
print(f"Serialized max_scroll_steps: {config_dict.get('max_scroll_steps')}")
# Test from_kwargs
config4 = CrawlerRunConfig.from_kwargs({
'scan_full_page': True,
'max_scroll_steps': 7,
'scroll_delay': 0.3
})
print(f"Deserialized max_scroll_steps: {config4.max_scroll_steps}")
print("✅ Test 3 Success: Serialization works correctly")
# Test 4: Edge case - max_scroll_steps = 0
print("\\n📋 Test 4: Edge case (max_scroll_steps=0)")
config5 = CrawlerRunConfig(
scan_full_page=True,
max_scroll_steps=0, # Should not scroll at all
verbose=True
)
try:
result5 = await crawler.arun(
url="https://techcrunch.com/",
config=config5
)
print(f"✅ Test 4 Success: No scrolling performed, crawled {len(result5.markdown)} characters")
except Exception as e:
print(f"❌ Test 4 Failed: {e}")
print("\\n" + "=" * 60)
print("🎉 All tests completed!")
print("\\nThe max_scroll_steps parameter is working correctly:")
print("- None: Unlimited scrolling (default behavior)")
print("- Positive integer: Limits scroll steps to that number")
print("- 0: No scrolling performed")
print("- Properly serializes/deserializes in config")
if __name__ == "__main__":
print("Starting max_scroll_steps test...")
asyncio.run(test_max_scroll_steps())

View File

@@ -1,85 +0,0 @@
import sys
import os
# Get the grandparent directory
grandparent_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(grandparent_dir)
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
import asyncio
from crawl4ai.deep_crawling.filters import URLPatternFilter
def test_prefix_boundary_matching():
"""Test that prefix patterns respect path boundaries"""
print("=== Testing URLPatternFilter Prefix Boundary Fix ===")
filter_obj = URLPatternFilter(patterns=['https://langchain-ai.github.io/langgraph/*'])
test_cases = [
('https://langchain-ai.github.io/langgraph/', True),
('https://langchain-ai.github.io/langgraph/concepts/', True),
('https://langchain-ai.github.io/langgraph/tutorials/', True),
('https://langchain-ai.github.io/langgraph?param=1', True),
('https://langchain-ai.github.io/langgraph#section', True),
('https://langchain-ai.github.io/langgraphjs/', False),
('https://langchain-ai.github.io/langgraphjs/concepts/', False),
('https://other-site.com/langgraph/', False),
]
all_passed = True
for url, expected in test_cases:
result = filter_obj.apply(url)
status = "PASS" if result == expected else "FAIL"
if result != expected:
all_passed = False
print(f"{status:4} | Expected: {expected:5} | Got: {result:5} | {url}")
return all_passed
def test_edge_cases():
"""Test edge cases for path boundary matching"""
print("\n=== Testing Edge Cases ===")
test_patterns = [
('/api/*', [
('/api/', True),
('/api/v1', True),
('/api?param=1', True),
('/apiv2/', False),
('/api_old/', False),
]),
('*/docs/*', [
('example.com/docs/', True),
('example.com/docs/guide', True),
('example.com/documentation/', False),
('example.com/docs_old/', False),
]),
]
all_passed = True
for pattern, test_cases in test_patterns:
print(f"\nPattern: {pattern}")
filter_obj = URLPatternFilter(patterns=[pattern])
for url, expected in test_cases:
result = filter_obj.apply(url)
status = "PASS" if result == expected else "FAIL"
if result != expected:
all_passed = False
print(f" {status:4} | Expected: {expected:5} | Got: {result:5} | {url}")
return all_passed
if __name__ == "__main__":
test1_passed = test_prefix_boundary_matching()
test2_passed = test_edge_cases()
if test1_passed and test2_passed:
print("\n✅ All tests passed!")
sys.exit(0)
else:
print("\n❌ Some tests failed!")
sys.exit(1)