merge:from next branch

Aravind Karnam
2025-05-06 15:16:47 +05:30
24 changed files with 1511 additions and 335 deletions

.gitignore (vendored)

@@ -263,4 +263,5 @@ tests/**/test_site
 tests/**/reports
 tests/**/benchmark_reports
 docs/**/data
+.codecat/

CHANGELOG.md

@@ -5,6 +5,21 @@ All notable changes to Crawl4AI will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [0.6.2] - 2025-05-02
+
+### Added
+- New `RegexExtractionStrategy` for fast pattern-based extraction without requiring LLM
+- Built-in patterns for emails, URLs, phone numbers, dates, and more
+- Support for custom regex patterns
+- `generate_pattern` utility for LLM-assisted pattern creation (one-time use)
+- Added `fit_html` as a top-level field in `CrawlResult` for optimized HTML extraction
+- Added support for network response body capture in network request tracking
+
+### Changed
+- Updated documentation for no-LLM extraction strategies
+- Enhanced API reference to include RegexExtractionStrategy examples and usage
+- Improved HTML preprocessing with optimized performance for extraction strategies
+
 ## [0.6.1] - 2025-04-24
 ### Added

crawl4ai/__init__.py

@@ -23,7 +23,8 @@ from .extraction_strategy import (
     CosineStrategy,
     JsonCssExtractionStrategy,
     JsonXPathExtractionStrategy,
-    JsonLxmlExtractionStrategy
+    JsonLxmlExtractionStrategy,
+    RegexExtractionStrategy
 )
 from .chunking_strategy import ChunkingStrategy, RegexChunking
 from .markdown_generation_strategy import DefaultMarkdownGenerator
@@ -105,6 +106,7 @@ __all__ = [
     "JsonCssExtractionStrategy",
     "JsonXPathExtractionStrategy",
     "JsonLxmlExtractionStrategy",
+    "RegexExtractionStrategy",
     "ChunkingStrategy",
     "RegexChunking",
     "DefaultMarkdownGenerator",

crawl4ai/async_crawler_strategy.py

@@ -571,6 +571,14 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
             async def handle_response_capture(response):
                 try:
+                    try:
+                        # body = await response.body()
+                        # json_body = await response.json()
+                        text_body = await response.text()
+                    except Exception as e:
+                        body = None
+                        # json_body = None
+                        # text_body = None
                     captured_requests.append({
                         "event_type": "response",
                         "url": response.url,
@@ -579,7 +587,12 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
                         "headers": dict(response.headers),  # Convert Header dict
                         "from_service_worker": response.from_service_worker,
                         "request_timing": response.request.timing,  # Detailed timing info
-                        "timestamp": time.time()
+                        "timestamp": time.time(),
+                        "body" : {
+                            # "raw": body,
+                            # "json": json_body,
+                            "text": text_body
+                        }
                     })
                 except Exception as e:
                     if self.logger:
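For callers, the captured bodies surface through the existing network-capture flow. A hedged sketch, assuming `CrawlerRunConfig.capture_network_requests` and `CrawlResult.network_requests` behave as in current releases:

```python
import asyncio
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig

async def main():
    config = CrawlerRunConfig(capture_network_requests=True)
    async with AsyncWebCrawler() as crawler:
        result = await crawler.arun("https://example.com", config=config)
        for event in result.network_requests or []:
            # "body" is the new field populated by handle_response_capture above
            if event.get("event_type") == "response" and event.get("body"):
                print(event["url"], len(event["body"].get("text") or ""))

asyncio.run(main())
```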

crawl4ai/async_database.py

@@ -171,7 +171,10 @@
                     f"Code context:\n{error_context['code_context']}"
                 )
                 self.logger.error(
-                    message=create_box_message(error_message, type="error"),
+                    message="{error}",
+                    tag="ERROR",
+                    params={"error": str(error_message)},
+                    boxes=["error"],
                 )
                 raise
@@ -189,7 +192,10 @@
                     f"Code context:\n{error_context['code_context']}"
                 )
                 self.logger.error(
-                    message=create_box_message(error_message, type="error"),
+                    message="{error}",
+                    tag="ERROR",
+                    params={"error": str(error_message)},
+                    boxes=["error"],
                 )
                 raise
             finally:

crawl4ai/async_logger.py

@@ -1,10 +1,12 @@
 from abc import ABC, abstractmethod
 from enum import Enum
-from typing import Optional, Dict, Any
-from colorama import Fore, Style, init
+from typing import Optional, Dict, Any, List
 import os
 from datetime import datetime
 from urllib.parse import unquote
+from rich.console import Console
+from rich.text import Text
+from .utils import create_box_message
 
 class LogLevel(Enum):
@@ -21,6 +23,26 @@ class LogLevel(Enum):
FATAL = 10 FATAL = 10
def __str__(self):
return self.name.lower()
class LogColor(str, Enum):
"""Enum for log colors."""
DEBUG = "lightblack"
INFO = "cyan"
SUCCESS = "green"
WARNING = "yellow"
ERROR = "red"
CYAN = "cyan"
GREEN = "green"
YELLOW = "yellow"
MAGENTA = "magenta"
DIM_MAGENTA = "dim magenta"
def __str__(self):
"""Automatically convert rich color to string."""
return self.value
class AsyncLoggerBase(ABC): class AsyncLoggerBase(ABC):
@@ -52,6 +74,7 @@ class AsyncLoggerBase(ABC):
def error_status(self, url: str, error: str, tag: str = "ERROR", url_length: int = 100): def error_status(self, url: str, error: str, tag: str = "ERROR", url_length: int = 100):
pass pass
class AsyncLogger(AsyncLoggerBase): class AsyncLogger(AsyncLoggerBase):
""" """
Asynchronous logger with support for colored console output and file logging. Asynchronous logger with support for colored console output and file logging.
@@ -79,17 +102,11 @@
     }
 
     DEFAULT_COLORS = {
-        LogLevel.DEBUG: Fore.LIGHTBLACK_EX,
-        LogLevel.INFO: Fore.CYAN,
-        LogLevel.SUCCESS: Fore.GREEN,
-        LogLevel.WARNING: Fore.YELLOW,
-        LogLevel.ERROR: Fore.RED,
-        LogLevel.CRITICAL: Fore.RED + Style.BRIGHT,
-        LogLevel.ALERT: Fore.RED + Style.BRIGHT,
-        LogLevel.NOTICE: Fore.BLUE,
-        LogLevel.EXCEPTION: Fore.RED + Style.BRIGHT,
-        LogLevel.FATAL: Fore.RED + Style.BRIGHT,
-        LogLevel.DEFAULT: Fore.WHITE,
+        LogLevel.DEBUG: LogColor.DEBUG,
+        LogLevel.INFO: LogColor.INFO,
+        LogLevel.SUCCESS: LogColor.SUCCESS,
+        LogLevel.WARNING: LogColor.WARNING,
+        LogLevel.ERROR: LogColor.ERROR,
     }
 
     def __init__(
@@ -98,7 +115,7 @@
         log_level: LogLevel = LogLevel.DEBUG,
         tag_width: int = 10,
         icons: Optional[Dict[str, str]] = None,
-        colors: Optional[Dict[LogLevel, str]] = None,
+        colors: Optional[Dict[LogLevel, LogColor]] = None,
         verbose: bool = True,
     ):
         """
@@ -112,13 +129,13 @@
             colors: Custom colors for different log levels
             verbose: Whether to output to console
         """
-        init()  # Initialize colorama
         self.log_file = log_file
         self.log_level = log_level
         self.tag_width = tag_width
         self.icons = icons or self.DEFAULT_ICONS
         self.colors = colors or self.DEFAULT_COLORS
         self.verbose = verbose
+        self.console = Console()
 
         # Create log file directory if needed
         if log_file:
@@ -143,16 +160,11 @@
     def _write_to_file(self, message: str):
         """Write a message to the log file if configured."""
         if self.log_file:
+            text = Text.from_markup(message)
+            plain_text = text.plain
             timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
             with open(self.log_file, "a", encoding="utf-8") as f:
-                # Strip ANSI color codes for file output
-                clean_message = message.replace(Fore.RESET, "").replace(
-                    Style.RESET_ALL, ""
-                )
-                for color in vars(Fore).values():
-                    if isinstance(color, str):
-                        clean_message = clean_message.replace(color, "")
-                f.write(f"[{timestamp}] {clean_message}\n")
+                f.write(f"[{timestamp}] {plain_text}\n")
 
     def _log(
         self,
@@ -160,8 +172,9 @@
         message: str,
         tag: str,
         params: Optional[Dict[str, Any]] = None,
-        colors: Optional[Dict[str, str]] = None,
-        base_color: Optional[str] = None,
+        colors: Optional[Dict[str, LogColor]] = None,
+        boxes: Optional[List[str]] = None,
+        base_color: Optional[LogColor] = None,
         **kwargs,
     ):
         """
@@ -173,55 +186,44 @@
             tag: Tag for the message
             params: Parameters to format into the message
             colors: Color overrides for specific parameters
+            boxes: Box overrides for specific parameters
             base_color: Base color for the entire message
         """
         if level.value < self.log_level.value:
             return
 
-        # Format the message with parameters if provided
+        # avoid conflict with rich formatting
+        parsed_message = message.replace("[", "[[").replace("]", "]]")
+
         if params:
-            try:
-                # First format the message with raw parameters
-                formatted_message = message.format(**params)
-
-                # Then apply colors if specified
-                color_map = {
-                    "green": Fore.GREEN,
-                    "red": Fore.RED,
-                    "yellow": Fore.YELLOW,
-                    "blue": Fore.BLUE,
-                    "cyan": Fore.CYAN,
-                    "magenta": Fore.MAGENTA,
-                    "white": Fore.WHITE,
-                    "black": Fore.BLACK,
-                    "reset": Style.RESET_ALL,
-                }
-                if colors:
-                    for key, color in colors.items():
-                        # Find the formatted value in the message and wrap it with color
-                        if color in color_map:
-                            color = color_map[color]
-                        if key in params:
-                            value_str = str(params[key])
-                            formatted_message = formatted_message.replace(
-                                value_str, f"{color}{value_str}{Style.RESET_ALL}"
-                            )
-            except KeyError as e:
-                formatted_message = (
-                    f"LOGGING ERROR: Missing parameter {e} in message template"
-                )
-                level = LogLevel.ERROR
+            # FIXME: If there are formatting strings in floating point format,
+            # this may result in colors and boxes not being applied properly.
+            # such as {value:.2f}, the value is 0.23333 format it to 0.23,
+            # but we replace("0.23333", "[color]0.23333[/color]")
+            formatted_message = parsed_message.format(**params)
+            for key, value in params.items():
+                # value_str may discard `[` and `]`, so we need to replace it.
+                value_str = str(value).replace("[", "[[").replace("]", "]]")
+                # check is need apply color
+                if colors and key in colors:
+                    color_str = f"[{colors[key]}]{value_str}[/{colors[key]}]"
+                    formatted_message = formatted_message.replace(value_str, color_str)
+                    value_str = color_str
+                # check is need apply box
+                if boxes and key in boxes:
+                    formatted_message = formatted_message.replace(value_str,
+                        create_box_message(value_str, type=str(level)))
         else:
-            formatted_message = message
+            formatted_message = parsed_message
 
         # Construct the full log line
-        color = base_color or self.colors[level]
-        log_line = f"{color}{self._format_tag(tag)} {self._get_icon(tag)} {formatted_message}{Style.RESET_ALL}"
+        color: LogColor = base_color or self.colors[level]
+        log_line = f"[{color}]{self._format_tag(tag)} {self._get_icon(tag)} {formatted_message} [/{color}]"
 
         # Output to console if verbose
         if self.verbose or kwargs.get("force_verbose", False):
-            print(log_line)
+            self.console.print(log_line)
 
         # Write to file if configured
         self._write_to_file(log_line)
@@ -292,8 +294,8 @@
                 "timing": timing,
             },
             colors={
-                "status": Fore.GREEN if success else Fore.RED,
-                "timing": Fore.YELLOW,
+                "status": LogColor.SUCCESS if success else LogColor.ERROR,
+                "timing": LogColor.WARNING,
             },
         )
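The logger rework above replaces colorama with rich markup: callers pass plain `{param}` templates plus `params`, then opt into colour or boxed rendering per parameter. A minimal usage sketch, assuming `AsyncLogger` keeps its `log_file=None` default:

```python
from crawl4ai.async_logger import AsyncLogger, LogColor

logger = AsyncLogger(verbose=True)

# Per-parameter colouring via rich markup instead of embedded ANSI codes.
logger.info(
    "Fetched {url} in {timing}s",
    tag="FETCH",
    params={"url": "https://example.com", "timing": 0.42},
    colors={"url": LogColor.CYAN, "timing": LogColor.WARNING},
)

# Boxed rendering of a single parameter, as async_database.py now does for errors.
logger.error(
    "{error}",
    tag="ERROR",
    params={"error": "Connection reset by peer"},
    boxes=["error"],
)
```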

crawl4ai/async_webcrawler.py

@@ -2,7 +2,6 @@ from .__version__ import __version__ as crawl4ai_version
 import os
 import sys
 import time
-from colorama import Fore
 from pathlib import Path
 from typing import Optional, List
 import json
@@ -44,7 +43,6 @@ from .utils import (
     sanitize_input_encode,
     InvalidCSSSelectorError,
     fast_format_html,
-    create_box_message,
     get_error_context,
     RobotsParser,
     preprocess_html_for_schema,
@@ -419,7 +417,7 @@ class AsyncWebCrawler:
                 self.logger.error_status(
                     url=url,
-                    error=create_box_message(error_message, type="error"),
+                    error=error_message,
                     tag="ERROR",
                 )
@@ -505,6 +503,8 @@
             tables = media.pop("tables", [])
             links = result.links.model_dump()
             metadata = result.metadata
+            fit_html = preprocess_html_for_schema(html_content=html, text_threshold= 500, max_size= 300_000)
 
             ################################
             # Generate Markdown           #
@@ -521,7 +521,7 @@
             html_source_selector = {
                 "raw_html": lambda: html,  # The original raw HTML
                 "cleaned_html": lambda: cleaned_html,  # The HTML after scraping strategy
-                "fit_html": lambda: preprocess_html_for_schema(html_content=html),  # Preprocessed raw HTML
+                "fit_html": lambda: fit_html,  # The HTML after preprocessing for schema
             }
             markdown_input_html = cleaned_html  # Default to cleaned_html
@@ -595,6 +595,7 @@
                 content = {
                     "markdown": markdown_result.raw_markdown,
                     "html": html,
+                    "fit_html": fit_html,
                     "cleaned_html": cleaned_html,
                     "fit_markdown": markdown_result.fit_markdown,
                 }.get(content_format, markdown_result.raw_markdown)
@@ -602,7 +603,7 @@
                 # Use IdentityChunking for HTML input, otherwise use provided chunking strategy
                 chunking = (
                     IdentityChunking()
-                    if content_format in ["html", "cleaned_html"]
+                    if content_format in ["html", "cleaned_html", "fit_html"]
                     else config.chunking_strategy
                 )
                 sections = chunking.chunk(content)
@@ -626,6 +627,7 @@
             return CrawlResult(
                 url=url,
                 html=html,
+                fit_html=fit_html,
                 cleaned_html=cleaned_html,
                 markdown=markdown_result,
                 media=media,
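With `fit_html` now computed once per crawl and returned on the result, a quick way to compare it against the raw and cleaned HTML (sketch, default config assumed):

```python
import asyncio
from crawl4ai import AsyncWebCrawler

async def main():
    async with AsyncWebCrawler() as crawler:
        result = await crawler.arun("https://example.com")
        # fit_html is the schema-oriented preprocessed HTML (capped at ~300 KB,
        # long text nodes truncated), returned alongside html / cleaned_html.
        print(len(result.html), len(result.cleaned_html or ""), len(result.fit_html or ""))

asyncio.run(main())
```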

crawl4ai/browser_profiler.py

@@ -15,12 +15,12 @@ import shutil
import json import json
import subprocess import subprocess
import time import time
from typing import List, Dict, Optional, Any, Tuple from typing import List, Dict, Optional, Any
from colorama import Fore, Style, init from rich.console import Console
from .async_configs import BrowserConfig from .async_configs import BrowserConfig
from .browser_manager import ManagedBrowser from .browser_manager import ManagedBrowser
from .async_logger import AsyncLogger, AsyncLoggerBase from .async_logger import AsyncLogger, AsyncLoggerBase, LogColor
from .utils import get_home_folder from .utils import get_home_folder
@@ -45,8 +45,8 @@ class BrowserProfiler:
logger (AsyncLoggerBase, optional): Logger for outputting messages. logger (AsyncLoggerBase, optional): Logger for outputting messages.
If None, a default AsyncLogger will be created. If None, a default AsyncLogger will be created.
""" """
# Initialize colorama for colorful terminal output # Initialize rich console for colorful input prompts
init() self.console = Console()
# Create a logger if not provided # Create a logger if not provided
if logger is None: if logger is None:
@@ -127,18 +127,18 @@ class BrowserProfiler:
profile_path = os.path.join(self.profiles_dir, profile_name) profile_path = os.path.join(self.profiles_dir, profile_name)
os.makedirs(profile_path, exist_ok=True) os.makedirs(profile_path, exist_ok=True)
# Print instructions for the user with colorama formatting # Print instructions for the user with rich formatting
border = f"{Fore.CYAN}{'='*80}{Style.RESET_ALL}" border = "{'='*80}"
self.logger.info(f"\n{border}", tag="PROFILE") self.logger.info("{border}", tag="PROFILE", params={"border": f"\n{border}"}, colors={"border": LogColor.CYAN})
self.logger.info(f"Creating browser profile: {Fore.GREEN}{profile_name}{Style.RESET_ALL}", tag="PROFILE") self.logger.info("Creating browser profile: {profile_name}", tag="PROFILE", params={"profile_name": profile_name}, colors={"profile_name": LogColor.GREEN})
self.logger.info(f"Profile directory: {Fore.YELLOW}{profile_path}{Style.RESET_ALL}", tag="PROFILE") self.logger.info("Profile directory: {profile_path}", tag="PROFILE", params={"profile_path": profile_path}, colors={"profile_path": LogColor.YELLOW})
self.logger.info("\nInstructions:", tag="PROFILE") self.logger.info("\nInstructions:", tag="PROFILE")
self.logger.info("1. A browser window will open for you to set up your profile.", tag="PROFILE") self.logger.info("1. A browser window will open for you to set up your profile.", tag="PROFILE")
self.logger.info(f"2. {Fore.CYAN}Log in to websites{Style.RESET_ALL}, configure settings, etc. as needed.", tag="PROFILE") self.logger.info("{segment}, configure settings, etc. as needed.", tag="PROFILE", params={"segment": "2. Log in to websites"}, colors={"segment": LogColor.CYAN})
self.logger.info(f"3. When you're done, {Fore.YELLOW}press 'q' in this terminal{Style.RESET_ALL} to close the browser.", tag="PROFILE") self.logger.info("3. When you're done, {segment} to close the browser.", tag="PROFILE", params={"segment": "press 'q' in this terminal"}, colors={"segment": LogColor.YELLOW})
self.logger.info("4. The profile will be saved and ready to use with Crawl4AI.", tag="PROFILE") self.logger.info("4. The profile will be saved and ready to use with Crawl4AI.", tag="PROFILE")
self.logger.info(f"{border}\n", tag="PROFILE") self.logger.info("{border}", tag="PROFILE", params={"border": f"{border}\n"}, colors={"border": LogColor.CYAN})
browser_config.headless = False browser_config.headless = False
browser_config.user_data_dir = profile_path browser_config.user_data_dir = profile_path
@@ -185,7 +185,7 @@ class BrowserProfiler:
import select import select
# First output the prompt # First output the prompt
self.logger.info(f"{Fore.CYAN}Press '{Fore.WHITE}q{Fore.CYAN}' when you've finished using the browser...{Style.RESET_ALL}", tag="PROFILE") self.logger.info("Press 'q' when you've finished using the browser...", tag="PROFILE")
# Save original terminal settings # Save original terminal settings
fd = sys.stdin.fileno() fd = sys.stdin.fileno()
@@ -201,7 +201,7 @@ class BrowserProfiler:
if readable: if readable:
key = sys.stdin.read(1) key = sys.stdin.read(1)
if key.lower() == 'q': if key.lower() == 'q':
self.logger.info(f"{Fore.GREEN}Closing browser and saving profile...{Style.RESET_ALL}", tag="PROFILE") self.logger.info("Closing browser and saving profile...", tag="PROFILE", base_color=LogColor.GREEN)
user_done_event.set() user_done_event.set()
return return
@@ -227,7 +227,7 @@ class BrowserProfiler:
self.logger.error("Failed to start browser process.", tag="PROFILE") self.logger.error("Failed to start browser process.", tag="PROFILE")
return None return None
self.logger.info(f"Browser launched. {Fore.CYAN}Waiting for you to finish...{Style.RESET_ALL}", tag="PROFILE") self.logger.info("Browser launched. Waiting for you to finish...", tag="PROFILE")
# Start listening for keyboard input # Start listening for keyboard input
listener_task = asyncio.create_task(listen_for_quit_command()) listener_task = asyncio.create_task(listen_for_quit_command())
@@ -249,10 +249,10 @@ class BrowserProfiler:
self.logger.info("Terminating browser process...", tag="PROFILE") self.logger.info("Terminating browser process...", tag="PROFILE")
await managed_browser.cleanup() await managed_browser.cleanup()
self.logger.success(f"Browser closed. Profile saved at: {Fore.GREEN}{profile_path}{Style.RESET_ALL}", tag="PROFILE") self.logger.success(f"Browser closed. Profile saved at: {profile_path}", tag="PROFILE")
except Exception as e: except Exception as e:
self.logger.error(f"Error creating profile: {str(e)}", tag="PROFILE") self.logger.error(f"Error creating profile: {e!s}", tag="PROFILE")
await managed_browser.cleanup() await managed_browser.cleanup()
return None return None
finally: finally:
@@ -444,25 +444,27 @@ class BrowserProfiler:
``` ```
""" """
while True: while True:
self.logger.info(f"\n{Fore.CYAN}Profile Management Options:{Style.RESET_ALL}", tag="MENU") self.logger.info("\nProfile Management Options:", tag="MENU")
self.logger.info(f"1. {Fore.GREEN}Create a new profile{Style.RESET_ALL}", tag="MENU") self.logger.info("1. Create a new profile", tag="MENU", base_color=LogColor.GREEN)
self.logger.info(f"2. {Fore.YELLOW}List available profiles{Style.RESET_ALL}", tag="MENU") self.logger.info("2. List available profiles", tag="MENU", base_color=LogColor.YELLOW)
self.logger.info(f"3. {Fore.RED}Delete a profile{Style.RESET_ALL}", tag="MENU") self.logger.info("3. Delete a profile", tag="MENU", base_color=LogColor.RED)
# Only show crawl option if callback provided # Only show crawl option if callback provided
if crawl_callback: if crawl_callback:
self.logger.info(f"4. {Fore.CYAN}Use a profile to crawl a website{Style.RESET_ALL}", tag="MENU") self.logger.info("4. Use a profile to crawl a website", tag="MENU", base_color=LogColor.CYAN)
self.logger.info(f"5. {Fore.MAGENTA}Exit{Style.RESET_ALL}", tag="MENU") self.logger.info("5. Exit", tag="MENU", base_color=LogColor.MAGENTA)
exit_option = "5" exit_option = "5"
else: else:
self.logger.info(f"4. {Fore.MAGENTA}Exit{Style.RESET_ALL}", tag="MENU") self.logger.info("4. Exit", tag="MENU", base_color=LogColor.MAGENTA)
exit_option = "4" exit_option = "4"
choice = input(f"\n{Fore.CYAN}Enter your choice (1-{exit_option}): {Style.RESET_ALL}") self.logger.print(f"\n[cyan]Enter your choice (1-{exit_option}): [/cyan]", end="")
choice = input()
if choice == "1": if choice == "1":
# Create new profile # Create new profile
name = input(f"{Fore.GREEN}Enter a name for the new profile (or press Enter for auto-generated name): {Style.RESET_ALL}") self.console.print("[green]Enter a name for the new profile (or press Enter for auto-generated name): [/green]", end="")
name = input()
await self.create_profile(name or None) await self.create_profile(name or None)
elif choice == "2": elif choice == "2":
@@ -473,11 +475,11 @@ class BrowserProfiler:
self.logger.warning(" No profiles found. Create one first with option 1.", tag="PROFILES") self.logger.warning(" No profiles found. Create one first with option 1.", tag="PROFILES")
continue continue
# Print profile information with colorama formatting # Print profile information
self.logger.info("\nAvailable profiles:", tag="PROFILES") self.logger.info("\nAvailable profiles:", tag="PROFILES")
for i, profile in enumerate(profiles): for i, profile in enumerate(profiles):
self.logger.info(f"[{i+1}] {Fore.CYAN}{profile['name']}{Style.RESET_ALL}", tag="PROFILES") self.logger.info(f"[{i+1}] {profile['name']}", tag="PROFILES")
self.logger.info(f" Path: {Fore.YELLOW}{profile['path']}{Style.RESET_ALL}", tag="PROFILES") self.logger.info(f" Path: {profile['path']}", tag="PROFILES", base_color=LogColor.YELLOW)
self.logger.info(f" Created: {profile['created'].strftime('%Y-%m-%d %H:%M:%S')}", tag="PROFILES") self.logger.info(f" Created: {profile['created'].strftime('%Y-%m-%d %H:%M:%S')}", tag="PROFILES")
self.logger.info(f" Browser type: {profile['type']}", tag="PROFILES") self.logger.info(f" Browser type: {profile['type']}", tag="PROFILES")
self.logger.info("", tag="PROFILES") # Empty line for spacing self.logger.info("", tag="PROFILES") # Empty line for spacing
@@ -490,12 +492,13 @@ class BrowserProfiler:
continue continue
# Display numbered list # Display numbered list
self.logger.info(f"\n{Fore.YELLOW}Available profiles:{Style.RESET_ALL}", tag="PROFILES") self.logger.info("\nAvailable profiles:", tag="PROFILES", base_color=LogColor.YELLOW)
for i, profile in enumerate(profiles): for i, profile in enumerate(profiles):
self.logger.info(f"[{i+1}] {profile['name']}", tag="PROFILES") self.logger.info(f"[{i+1}] {profile['name']}", tag="PROFILES")
# Get profile to delete # Get profile to delete
profile_idx = input(f"{Fore.RED}Enter the number of the profile to delete (or 'c' to cancel): {Style.RESET_ALL}") self.console.print("[red]Enter the number of the profile to delete (or 'c' to cancel): [/red]", end="")
profile_idx = input()
if profile_idx.lower() == 'c': if profile_idx.lower() == 'c':
continue continue
@@ -503,17 +506,18 @@ class BrowserProfiler:
idx = int(profile_idx) - 1 idx = int(profile_idx) - 1
if 0 <= idx < len(profiles): if 0 <= idx < len(profiles):
profile_name = profiles[idx]["name"] profile_name = profiles[idx]["name"]
self.logger.info(f"Deleting profile: {Fore.YELLOW}{profile_name}{Style.RESET_ALL}", tag="PROFILES") self.logger.info(f"Deleting profile: [yellow]{profile_name}[/yellow]", tag="PROFILES")
# Confirm deletion # Confirm deletion
confirm = input(f"{Fore.RED}Are you sure you want to delete this profile? (y/n): {Style.RESET_ALL}") self.console.print("[red]Are you sure you want to delete this profile? (y/n): [/red]", end="")
confirm = input()
if confirm.lower() == 'y': if confirm.lower() == 'y':
success = self.delete_profile(profiles[idx]["path"]) success = self.delete_profile(profiles[idx]["path"])
if success: if success:
self.logger.success(f"Profile {Fore.GREEN}{profile_name}{Style.RESET_ALL} deleted successfully", tag="PROFILES") self.logger.success(f"Profile {profile_name} deleted successfully", tag="PROFILES")
else: else:
self.logger.error(f"Failed to delete profile {Fore.RED}{profile_name}{Style.RESET_ALL}", tag="PROFILES") self.logger.error(f"Failed to delete profile {profile_name}", tag="PROFILES")
else: else:
self.logger.error("Invalid profile number", tag="PROFILES") self.logger.error("Invalid profile number", tag="PROFILES")
except ValueError: except ValueError:
@@ -527,12 +531,13 @@ class BrowserProfiler:
continue continue
# Display numbered list # Display numbered list
self.logger.info(f"\n{Fore.YELLOW}Available profiles:{Style.RESET_ALL}", tag="PROFILES") self.logger.info("\nAvailable profiles:", tag="PROFILES", base_color=LogColor.YELLOW)
for i, profile in enumerate(profiles): for i, profile in enumerate(profiles):
self.logger.info(f"[{i+1}] {profile['name']}", tag="PROFILES") self.logger.info(f"[{i+1}] {profile['name']}", tag="PROFILES")
# Get profile to use # Get profile to use
profile_idx = input(f"{Fore.CYAN}Enter the number of the profile to use (or 'c' to cancel): {Style.RESET_ALL}") self.console.print("[cyan]Enter the number of the profile to use (or 'c' to cancel): [/cyan]", end="")
profile_idx = input()
if profile_idx.lower() == 'c': if profile_idx.lower() == 'c':
continue continue
@@ -540,7 +545,8 @@ class BrowserProfiler:
idx = int(profile_idx) - 1 idx = int(profile_idx) - 1
if 0 <= idx < len(profiles): if 0 <= idx < len(profiles):
profile_path = profiles[idx]["path"] profile_path = profiles[idx]["path"]
url = input(f"{Fore.CYAN}Enter the URL to crawl: {Style.RESET_ALL}") self.console.print("[cyan]Enter the URL to crawl: [/cyan]", end="")
url = input()
if url: if url:
# Call the provided crawl callback # Call the provided crawl callback
await crawl_callback(profile_path, url) await crawl_callback(profile_path, url)
@@ -603,11 +609,11 @@ class BrowserProfiler:
# Print initial information # Print initial information
border = f"{Fore.CYAN}{'='*80}{Style.RESET_ALL}" border = f"{Fore.CYAN}{'='*80}{Style.RESET_ALL}"
self.logger.info(f"\n{border}", tag="CDP") self.logger.info(f"\n{border}", tag="CDP")
self.logger.info(f"Launching standalone browser with CDP debugging", tag="CDP") self.logger.info("Launching standalone browser with CDP debugging", tag="CDP")
self.logger.info(f"Browser type: {Fore.GREEN}{browser_type}{Style.RESET_ALL}", tag="CDP") self.logger.info("Browser type: {browser_type}", tag="CDP", params={"browser_type": browser_type}, colors={"browser_type": LogColor.CYAN})
self.logger.info(f"Profile path: {Fore.YELLOW}{profile_path}{Style.RESET_ALL}", tag="CDP") self.logger.info("Profile path: {profile_path}", tag="CDP", params={"profile_path": profile_path}, colors={"profile_path": LogColor.YELLOW})
self.logger.info(f"Debugging port: {Fore.CYAN}{debugging_port}{Style.RESET_ALL}", tag="CDP") self.logger.info(f"Debugging port: {debugging_port}", tag="CDP")
self.logger.info(f"Headless mode: {Fore.CYAN}{headless}{Style.RESET_ALL}", tag="CDP") self.logger.info(f"Headless mode: {headless}", tag="CDP")
# Create managed browser instance # Create managed browser instance
managed_browser = ManagedBrowser( managed_browser = ManagedBrowser(
@@ -650,7 +656,7 @@ class BrowserProfiler:
import select import select
# First output the prompt # First output the prompt
self.logger.info(f"{Fore.CYAN}Press '{Fore.WHITE}q{Fore.CYAN}' to stop the browser and exit...{Style.RESET_ALL}", tag="CDP") self.logger.info("Press 'q' to stop the browser and exit...", tag="CDP")
# Save original terminal settings # Save original terminal settings
fd = sys.stdin.fileno() fd = sys.stdin.fileno()
@@ -666,7 +672,7 @@ class BrowserProfiler:
if readable: if readable:
key = sys.stdin.read(1) key = sys.stdin.read(1)
if key.lower() == 'q': if key.lower() == 'q':
self.logger.info(f"{Fore.GREEN}Closing browser...{Style.RESET_ALL}", tag="CDP") self.logger.info("Closing browser...", tag="CDP")
user_done_event.set() user_done_event.set()
return return
@@ -720,20 +726,20 @@ class BrowserProfiler:
self.logger.error("Failed to start browser process.", tag="CDP") self.logger.error("Failed to start browser process.", tag="CDP")
return None return None
self.logger.info(f"Browser launched successfully. Retrieving CDP information...", tag="CDP") self.logger.info("Browser launched successfully. Retrieving CDP information...", tag="CDP")
# Get CDP URL and JSON config # Get CDP URL and JSON config
cdp_url, config_json = await get_cdp_json(debugging_port) cdp_url, config_json = await get_cdp_json(debugging_port)
if cdp_url: if cdp_url:
self.logger.success(f"CDP URL: {Fore.GREEN}{cdp_url}{Style.RESET_ALL}", tag="CDP") self.logger.success(f"CDP URL: {cdp_url}", tag="CDP")
if config_json: if config_json:
# Display relevant CDP information # Display relevant CDP information
self.logger.info(f"Browser: {Fore.CYAN}{config_json.get('Browser', 'Unknown')}{Style.RESET_ALL}", tag="CDP") self.logger.info(f"Browser: {config_json.get('Browser', 'Unknown')}", tag="CDP", colors={"Browser": LogColor.CYAN})
self.logger.info(f"Protocol Version: {config_json.get('Protocol-Version', 'Unknown')}", tag="CDP") self.logger.info(f"Protocol Version: {config_json.get('Protocol-Version', 'Unknown')}", tag="CDP", colors={"Protocol-Version": LogColor.CYAN})
if 'webSocketDebuggerUrl' in config_json: if 'webSocketDebuggerUrl' in config_json:
self.logger.info(f"WebSocket URL: {Fore.GREEN}{config_json['webSocketDebuggerUrl']}{Style.RESET_ALL}", tag="CDP") self.logger.info("WebSocket URL: {webSocketDebuggerUrl}", tag="CDP", params={"webSocketDebuggerUrl": config_json['webSocketDebuggerUrl']}, colors={"webSocketDebuggerUrl": LogColor.GREEN})
else: else:
self.logger.warning("Could not retrieve CDP configuration JSON", tag="CDP") self.logger.warning("Could not retrieve CDP configuration JSON", tag="CDP")
else: else:
@@ -761,7 +767,7 @@ class BrowserProfiler:
self.logger.info("Terminating browser process...", tag="CDP") self.logger.info("Terminating browser process...", tag="CDP")
await managed_browser.cleanup() await managed_browser.cleanup()
self.logger.success(f"Browser closed.", tag="CDP") self.logger.success("Browser closed.", tag="CDP")
except Exception as e: except Exception as e:
self.logger.error(f"Error launching standalone browser: {str(e)}", tag="CDP") self.logger.error(f"Error launching standalone browser: {str(e)}", tag="CDP")

crawl4ai/content_filter_strategy.py

@@ -27,8 +27,7 @@ import json
 import hashlib
 from pathlib import Path
 from concurrent.futures import ThreadPoolExecutor
-from .async_logger import AsyncLogger, LogLevel
-from colorama import Fore, Style
+from .async_logger import AsyncLogger, LogLevel, LogColor
 
 class RelevantContentFilter(ABC):
@@ -846,8 +845,7 @@ class LLMContentFilter(RelevantContentFilter):
                 },
                 colors={
                     **AsyncLogger.DEFAULT_COLORS,
-                    LogLevel.INFO: Fore.MAGENTA
-                    + Style.DIM,  # Dimmed purple for LLM ops
+                    LogLevel.INFO: LogColor.DIM_MAGENTA  # Dimmed purple for LLM ops
                 },
             )
         else:
@@ -892,7 +890,7 @@
             "Starting LLM markdown content filtering process",
             tag="LLM",
             params={"provider": self.llm_config.provider},
-            colors={"provider": Fore.CYAN},
+            colors={"provider": LogColor.CYAN},
         )
 
         # Cache handling
@@ -929,7 +927,7 @@
             "LLM markdown: Split content into {chunk_count} chunks",
             tag="CHUNK",
             params={"chunk_count": len(html_chunks)},
-            colors={"chunk_count": Fore.YELLOW},
+            colors={"chunk_count": LogColor.YELLOW},
         )
 
         start_time = time.time()
@@ -1038,7 +1036,7 @@
             "LLM markdown: Completed processing in {time:.2f}s",
             tag="LLM",
             params={"time": end_time - start_time},
-            colors={"time": Fore.YELLOW},
+            colors={"time": LogColor.YELLOW},
        )
 
         result = ordered_results if ordered_results else []

crawl4ai/extraction_strategy.py

@@ -1,9 +1,10 @@
 from abc import ABC, abstractmethod
 import inspect
-from typing import Any, List, Dict, Optional
+from typing import Any, List, Dict, Optional, Tuple, Pattern, Union
 from concurrent.futures import ThreadPoolExecutor, as_completed
 import json
 import time
+from enum import IntFlag, auto
 from .prompts import PROMPT_EXTRACT_BLOCKS, PROMPT_EXTRACT_BLOCKS_WITH_INSTRUCTION, PROMPT_EXTRACT_SCHEMA_WITH_INSTRUCTION, JSON_SCHEMA_BUILDER_XPATH, PROMPT_EXTRACT_INFERRED_SCHEMA
 from .config import (
@@ -1668,3 +1669,303 @@ class JsonXPathExtractionStrategy(JsonElementExtractionStrategy):
     def _get_element_attribute(self, element, attribute: str):
         return element.get(attribute)
"""
RegexExtractionStrategy
Fast, zero-LLM extraction of common entities via regular expressions.
"""
_CTRL = {c: rf"\x{ord(c):02x}" for c in map(chr, range(32)) if c not in "\t\n\r"}
_WB_FIX = re.compile(r"\x08") # stray back-space → word-boundary
_NEEDS_ESCAPE = re.compile(r"(?<!\\)\\(?![\\u])") # lone backslash
def _sanitize_schema(schema: Dict[str, str]) -> Dict[str, str]:
"""Fix common JSON-escape goofs coming from LLMs or manual edits."""
safe = {}
for label, pat in schema.items():
# 1⃣ replace accidental control chars (inc. the infamous back-space)
pat = _WB_FIX.sub(r"\\b", pat).translate(_CTRL)
# 2⃣ double any single backslash that JSON kept single
pat = _NEEDS_ESCAPE.sub(r"\\\\", pat)
# 3⃣ quick sanity compile
try:
re.compile(pat)
except re.error as e:
raise ValueError(f"Regex for '{label}' wont compile after fix: {e}") from None
safe[label] = pat
return safe
class RegexExtractionStrategy(ExtractionStrategy):
"""
A lean strategy that finds e-mails, phones, URLs, dates, money, etc.,
using nothing but pre-compiled regular expressions.
Extraction returns::
{
"url": "<page-url>",
"label": "<pattern-label>",
"value": "<matched-string>",
"span": [start, end]
}
Only `generate_schema()` touches an LLM, extraction itself is pure Python.
"""
# -------------------------------------------------------------- #
# Built-in patterns exposed as IntFlag so callers can bit-OR them
# -------------------------------------------------------------- #
class _B(IntFlag):
EMAIL = auto()
PHONE_INTL = auto()
PHONE_US = auto()
URL = auto()
IPV4 = auto()
IPV6 = auto()
UUID = auto()
CURRENCY = auto()
PERCENTAGE = auto()
NUMBER = auto()
DATE_ISO = auto()
DATE_US = auto()
TIME_24H = auto()
POSTAL_US = auto()
POSTAL_UK = auto()
HTML_COLOR_HEX = auto()
TWITTER_HANDLE = auto()
HASHTAG = auto()
MAC_ADDR = auto()
IBAN = auto()
CREDIT_CARD = auto()
NOTHING = auto()
ALL = (
EMAIL | PHONE_INTL | PHONE_US | URL | IPV4 | IPV6 | UUID
| CURRENCY | PERCENTAGE | NUMBER | DATE_ISO | DATE_US | TIME_24H
| POSTAL_US | POSTAL_UK | HTML_COLOR_HEX | TWITTER_HANDLE
| HASHTAG | MAC_ADDR | IBAN | CREDIT_CARD
)
# user-friendly aliases (RegexExtractionStrategy.Email, .IPv4, …)
Email = _B.EMAIL
PhoneIntl = _B.PHONE_INTL
PhoneUS = _B.PHONE_US
Url = _B.URL
IPv4 = _B.IPV4
IPv6 = _B.IPV6
Uuid = _B.UUID
Currency = _B.CURRENCY
Percentage = _B.PERCENTAGE
Number = _B.NUMBER
DateIso = _B.DATE_ISO
DateUS = _B.DATE_US
Time24h = _B.TIME_24H
PostalUS = _B.POSTAL_US
PostalUK = _B.POSTAL_UK
HexColor = _B.HTML_COLOR_HEX
TwitterHandle = _B.TWITTER_HANDLE
Hashtag = _B.HASHTAG
MacAddr = _B.MAC_ADDR
Iban = _B.IBAN
CreditCard = _B.CREDIT_CARD
All = _B.ALL
Nothing = _B(0) # no patterns
# ------------------------------------------------------------------ #
# Built-in pattern catalog
# ------------------------------------------------------------------ #
DEFAULT_PATTERNS: Dict[str, str] = {
# Communication
"email": r"[\w.+-]+@[\w-]+\.[\w.-]+",
"phone_intl": r"\+?\d[\d .()-]{7,}\d",
"phone_us": r"\(?\d{3}\)?[ -. ]?\d{3}[ -. ]?\d{4}",
# Web
"url": r"https?://[^\s\"'<>]+",
"ipv4": r"(?:\d{1,3}\.){3}\d{1,3}",
"ipv6": r"[A-F0-9]{1,4}(?::[A-F0-9]{1,4}){7}",
# IDs
"uuid": r"[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}",
# Money / numbers
"currency": r"(?:USD|EUR|RM|\$|€|£)\s?\d+(?:[.,]\d{2})?",
"percentage": r"\d+(?:\.\d+)?%",
"number": r"\b\d{1,3}(?:[,.\s]\d{3})*(?:\.\d+)?\b",
# Dates / Times
"date_iso": r"\d{4}-\d{2}-\d{2}",
"date_us": r"\d{1,2}/\d{1,2}/\d{2,4}",
"time_24h": r"\b(?:[01]?\d|2[0-3]):[0-5]\d(?:[:.][0-5]\d)?\b",
# Misc
"postal_us": r"\b\d{5}(?:-\d{4})?\b",
"postal_uk": r"\b[A-Z]{1,2}\d[A-Z\d]? ?\d[A-Z]{2}\b",
"html_color_hex": r"#[0-9A-Fa-f]{6}\b",
"twitter_handle": r"@[\w]{1,15}",
"hashtag": r"#[\w-]+",
"mac_addr": r"(?:[0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2}",
"iban": r"[A-Z]{2}\d{2}[A-Z0-9]{11,30}",
"credit_card": r"\b(?:4\d{12}(?:\d{3})?|5[1-5]\d{14}|3[47]\d{13}|6(?:011|5\d{2})\d{12})\b",
}
_FLAGS = re.IGNORECASE | re.MULTILINE
_UNWANTED_PROPS = {
"provider": "Use llm_config instead",
"api_token": "Use llm_config instead",
}
# ------------------------------------------------------------------ #
# Construction
# ------------------------------------------------------------------ #
def __init__(
self,
pattern: "_B" = _B.NOTHING,
*,
custom: Optional[Union[Dict[str, str], List[Tuple[str, str]]]] = None,
input_format: str = "fit_html",
**kwargs,
) -> None:
"""
Args:
patterns: Custom patterns overriding or extending defaults.
Dict[label, regex] or list[tuple(label, regex)].
input_format: "html", "markdown" or "text".
**kwargs: Forwarded to ExtractionStrategy.
"""
super().__init__(input_format=input_format, **kwargs)
# 1⃣ take only the requested built-ins
merged: Dict[str, str] = {
key: rx
for key, rx in self.DEFAULT_PATTERNS.items()
if getattr(self._B, key.upper()).value & pattern
}
# 2⃣ apply user overrides / additions
if custom:
if isinstance(custom, dict):
merged.update(custom)
else: # iterable of (label, regex)
merged.update({lbl: rx for lbl, rx in custom})
self._compiled: Dict[str, Pattern] = {
lbl: re.compile(rx, self._FLAGS) for lbl, rx in merged.items()
}
# ------------------------------------------------------------------ #
# Extraction
# ------------------------------------------------------------------ #
def extract(self, url: str, content: str, *q, **kw) -> List[Dict[str, Any]]:
# text = self._plain_text(html)
out: List[Dict[str, Any]] = []
for label, cre in self._compiled.items():
for m in cre.finditer(content):
out.append(
{
"url": url,
"label": label,
"value": m.group(0),
"span": [m.start(), m.end()],
}
)
return out
# ------------------------------------------------------------------ #
# Helpers
# ------------------------------------------------------------------ #
def _plain_text(self, content: str) -> str:
if self.input_format == "text":
return content
return BeautifulSoup(content, "lxml").get_text(" ", strip=True)
# ------------------------------------------------------------------ #
# LLM-assisted pattern generator
# ------------------------------------------------------------------ #
# ------------------------------------------------------------------ #
# LLM-assisted one-off pattern builder
# ------------------------------------------------------------------ #
@staticmethod
def generate_pattern(
label: str,
html: str,
*,
query: Optional[str] = None,
examples: Optional[List[str]] = None,
llm_config: Optional[LLMConfig] = None,
**kwargs,
) -> Dict[str, str]:
"""
Ask an LLM for a single page-specific regex and return
{label: pattern} ── ready for RegexExtractionStrategy(custom=…)
"""
# ── guard deprecated kwargs
for k in RegexExtractionStrategy._UNWANTED_PROPS:
if k in kwargs:
raise AttributeError(
f"{k} is deprecated, {RegexExtractionStrategy._UNWANTED_PROPS[k]}"
)
# ── default LLM config
if llm_config is None:
llm_config = create_llm_config()
# ── system prompt hardened
system_msg = (
"You are an expert Python-regex engineer.\n"
f"Return **one** JSON object whose single key is exactly \"{label}\", "
"and whose value is a raw-string regex pattern that works with "
"the standard `re` module in Python.\n\n"
"Strict rules (obey every bullet):\n"
"• If a *user query* is supplied, treat it as the precise semantic target and optimise the "
" pattern to capture ONLY text that answers that query. If the query conflicts with the "
" sample HTML, the HTML wins.\n"
"• Tailor the pattern to the *sample HTML* reproduce its exact punctuation, spacing, "
" symbols, capitalisation, etc. Do **NOT** invent a generic form.\n"
"• Keep it minimal and fast: avoid unnecessary capturing, prefer non-capturing `(?: … )`, "
" and guard against catastrophic backtracking.\n"
"• Anchor with `^`, `$`, or `\\b` only when it genuinely improves precision.\n"
"• Use inline flags like `(?i)` when needed; no verbose flag comments.\n"
"• Output must be valid JSON no markdown, code fences, comments, or extra keys.\n"
"• The regex value must be a Python string literal: **double every backslash** "
"(e.g. `\\\\b`, `\\\\d`, `\\\\\\\\`).\n\n"
"Example valid output:\n"
f"{{\"{label}\": \"(?:RM|rm)\\\\s?\\\\d{{1,3}}(?:,\\\\d{{3}})*(?:\\\\.\\\\d{{2}})?\"}}"
)
# ── user message: cropped HTML + optional hints
user_parts = ["```html", html[:5000], "```"] # protect token budget
if query:
user_parts.append(f"\n\n## Query\n{query.strip()}")
if examples:
user_parts.append("## Examples\n" + "\n".join(examples[:20]))
user_msg = "\n\n".join(user_parts)
# ── LLM call (with retry/backoff)
resp = perform_completion_with_backoff(
provider=llm_config.provider,
prompt_with_variables="\n\n".join([system_msg, user_msg]),
json_response=True,
api_token=llm_config.api_token,
base_url=llm_config.base_url,
extra_args=kwargs,
)
# ── clean & load JSON (fix common escape mistakes *before* json.loads)
raw = resp.choices[0].message.content
raw = raw.replace("\x08", "\\b") # stray back-space → \b
raw = re.sub(r'(?<!\\)\\(?![\\u"])', r"\\\\", raw) # lone \ → \\
try:
pattern_dict = json.loads(raw)
except Exception as exc:
raise ValueError(f"LLM did not return valid JSON: {raw}") from exc
# quick sanity-compile
for lbl, pat in pattern_dict.items():
try:
re.compile(pat)
except re.error as e:
raise ValueError(f"Invalid regex for '{lbl}': {e}") from None
return pattern_dict
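A usage sketch for the strategy defined above: the built-in flags are OR-able, `custom` adds page-specific labels (the `ticket_id` pattern here is hypothetical), and only `generate_pattern` ever calls an LLM.

```python
from crawl4ai import RegexExtractionStrategy

strategy = RegexExtractionStrategy(
    pattern=RegexExtractionStrategy.Email | RegexExtractionStrategy.Url,
    custom={"ticket_id": r"TKT-\d{6}"},  # hypothetical extra label
)

sample = "Write to support@example.com or open https://example.com/help (ref TKT-123456)."
for hit in strategy.extract("https://example.com", sample):
    print(hit["label"], hit["value"], hit["span"])

# One-off LLM-assisted pattern building (the only LLM touchpoint):
# pattern = RegexExtractionStrategy.generate_pattern(
#     "price", sample_html, query="product price", llm_config=my_llm_config)
# strategy = RegexExtractionStrategy(custom=pattern)
```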

crawl4ai/models.py

@@ -129,6 +129,7 @@ class MarkdownGenerationResult(BaseModel):
 class CrawlResult(BaseModel):
     url: str
     html: str
+    fit_html: Optional[str] = None
     success: bool
     cleaned_html: Optional[str] = None
     media: Dict[str, List[Dict]] = {}

crawl4ai/utils.py

@@ -20,7 +20,6 @@ from urllib.parse import urljoin
 import requests
 from requests.exceptions import InvalidSchema
 import xxhash
-from colorama import Fore, Style, init
 import textwrap
 import cProfile
 import pstats
@@ -441,14 +440,13 @@ def create_box_message(
         str: A formatted string containing the styled message box.
     """
-    init()
 
     # Define border and text colors for different types
     styles = {
-        "warning": (Fore.YELLOW, Fore.LIGHTYELLOW_EX, ""),
-        "info": (Fore.BLUE, Fore.LIGHTBLUE_EX, ""),
-        "success": (Fore.GREEN, Fore.LIGHTGREEN_EX, ""),
-        "error": (Fore.RED, Fore.LIGHTRED_EX, "×"),
+        "warning": ("yellow", "bright_yellow", ""),
+        "info": ("blue", "bright_blue", ""),
+        "debug": ("lightblack", "bright_black", ""),
+        "success": ("green", "bright_green", ""),
+        "error": ("red", "bright_red", "×"),
     }
 
     border_color, text_color, prefix = styles.get(type.lower(), styles["info"])
@@ -480,12 +478,12 @@
     # Create the box with colored borders and lighter text
     horizontal_line = h_line * (width - 1)
     box = [
-        f"{border_color}{tl}{horizontal_line}{tr}",
+        f"[{border_color}]{tl}{horizontal_line}{tr}[/{border_color}]",
         *[
-            f"{border_color}{v_line}{text_color} {line:<{width-2}}{border_color}{v_line}"
+            f"[{border_color}]{v_line}[{text_color}] {line:<{width-2}}[/{text_color}][{border_color}]{v_line}[/{border_color}]"
             for line in formatted_lines
         ],
-        f"{border_color}{bl}{horizontal_line}{br}{Style.RESET_ALL}",
+        f"[{border_color}]{bl}{horizontal_line}{br}[/{border_color}]",
     ]
 
     result = "\n".join(box)
@@ -2737,33 +2735,67 @@ def preprocess_html_for_schema(html_content, text_threshold=100, attr_value_thre
                 # Also truncate tail text if present
                 if element.tail and len(element.tail.strip()) > text_threshold:
                     element.tail = element.tail.strip()[:text_threshold] + '...'
 
-        # 4. Find repeated patterns and keep only a few examples
-        # This is a simplistic approach - more sophisticated pattern detection could be implemented
-        pattern_elements = {}
-        for element in tree.xpath('//*[contains(@class, "")]'):
-            parent = element.getparent()
+        # 4. Detect duplicates and drop them in a single pass
+        seen: dict[tuple, None] = {}
+        for el in list(tree.xpath('//*[@class]')):       # snapshot once, XPath is fast
+            parent = el.getparent()
             if parent is None:
                 continue
 
-            # Create a signature based on tag and classes
-            classes = element.get('class', '')
-            if not classes:
+            cls = el.get('class')
+            if not cls:
                 continue
 
-            signature = f"{element.tag}.{classes}"
-            if signature in pattern_elements:
-                pattern_elements[signature].append(element)
-            else:
-                pattern_elements[signature] = [element]
-
-        # Keep only 3 examples of each repeating pattern
-        for signature, elements in pattern_elements.items():
-            if len(elements) > 3:
-                # Keep the first 2 and last elements
-                for element in elements[2:-1]:
-                    if element.getparent() is not None:
-                        element.getparent().remove(element)
+            # ── build signature ───────────────────────────────────────────
+            h = xxhash.xxh64()                    # stream, no big join()
+            for txt in el.itertext():
+                h.update(txt)
+            sig = (el.tag, cls, h.intdigest())    # tuple cheaper & hashable
+
+            # ── first seen?  keep  else drop ─────────────
+            if sig in seen and parent is not None:
+                parent.remove(el)                 # duplicate
+            else:
+                seen[sig] = None
+
+        # # 4. Find repeated patterns and keep only a few examples
+        # # This is a simplistic approach - more sophisticated pattern detection could be implemented
+        # pattern_elements = {}
+        # for element in tree.xpath('//*[contains(@class, "")]'):
+        #     parent = element.getparent()
+        #     if parent is None:
+        #         continue
+        #     # Create a signature based on tag and classes
+        #     classes = element.get('class', '')
+        #     if not classes:
+        #         continue
+        #     innert_text = ''.join(element.xpath('.//text()'))
+        #     innert_text_hash = xxhash.xxh64(innert_text.encode()).hexdigest()
+        #     signature = f"{element.tag}.{classes}.{innert_text_hash}"
+        #     if signature in pattern_elements:
+        #         pattern_elements[signature].append(element)
+        #     else:
+        #         pattern_elements[signature] = [element]
+        # # Keep only first examples of each repeating pattern
+        # for signature, elements in pattern_elements.items():
+        #     if len(elements) > 1:
+        #         # Keep the first element and remove the rest
+        #         for element in elements[1:]:
+        #             if element.getparent() is not None:
+        #                 element.getparent().remove(element)
+        # # Keep only 3 examples of each repeating pattern
+        # for signature, elements in pattern_elements.items():
+        #     if len(elements) > 3:
+        #         # Keep the first 2 and last elements
+        #         for element in elements[2:-1]:
+        #             if element.getparent() is not None:
+        #                 element.getparent().remove(element)
 
         # 5. Convert back to string
         result = etree.tostring(tree, encoding='unicode', method='html')
@@ -2778,4 +2810,3 @@ def preprocess_html_for_schema(html_content, text_threshold=100, attr_value_thre
 
         # Fallback for parsing errors
         return html_content[:max_size] if len(html_content) > max_size else html_content
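The rewritten step 4 drops exact repeats (same tag, class and text hash) instead of keeping three examples per class. A small sketch of the call AsyncWebCrawler now makes, assuming the helper stays importable from `crawl4ai.utils`:

```python
from crawl4ai.utils import preprocess_html_for_schema

raw_html = "<html><body>" + '<div class="card"><p>Same card</p></div>' * 10 + "</body></html>"
fit = preprocess_html_for_schema(html_content=raw_html, text_threshold=500, max_size=300_000)
# The ten identical cards collapse to a single representative element.
print(fit)
```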

deploy/docker/api.py

@@ -1,8 +1,10 @@
 import os
 import json
 import asyncio
-from typing import List, Tuple
+from typing import List, Tuple, Dict
 from functools import partial
+from uuid import uuid4
+from datetime import datetime
 import logging
 from typing import Optional, AsyncGenerator
@@ -272,7 +274,9 @@
 async def handle_task_status(
     redis: aioredis.Redis,
     task_id: str,
-    base_url: str
+    base_url: str,
+    *,
+    keep: bool = False
 ) -> JSONResponse:
     """Handle task status check requests."""
     task = await redis.hgetall(f"task:{task_id}")
@@ -286,7 +290,7 @@
     response = create_task_response(task, task_id, base_url)
 
     if task["status"] in [TaskStatus.COMPLETED, TaskStatus.FAILED]:
-        if should_cleanup_task(task["created_at"]):
+        if not keep and should_cleanup_task(task["created_at"]):
             await redis.delete(f"task:{task_id}")
 
     return JSONResponse(response)
@@ -520,4 +524,48 @@
         raise HTTPException(
             status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
             detail=str(e)
         )
async def handle_crawl_job(
redis,
background_tasks: BackgroundTasks,
urls: List[str],
browser_config: Dict,
crawler_config: Dict,
config: Dict,
) -> Dict:
"""
Fire-and-forget version of handle_crawl_request.
Creates a task in Redis, runs the heavy work in a background task,
lets /crawl/job/{task_id} polling fetch the result.
"""
task_id = f"crawl_{uuid4().hex[:8]}"
await redis.hset(f"task:{task_id}", mapping={
"status": TaskStatus.PROCESSING, # <-- keep enum values consistent
"created_at": datetime.utcnow().isoformat(),
"url": json.dumps(urls), # store list as JSON string
"result": "",
"error": "",
})
async def _runner():
try:
result = await handle_crawl_request(
urls=urls,
browser_config=browser_config,
crawler_config=crawler_config,
config=config,
)
await redis.hset(f"task:{task_id}", mapping={
"status": TaskStatus.COMPLETED,
"result": json.dumps(result),
})
await asyncio.sleep(5) # Give Redis time to process the update
except Exception as exc:
await redis.hset(f"task:{task_id}", mapping={
"status": TaskStatus.FAILED,
"error": str(exc),
})
background_tasks.add_task(_runner)
return {"task_id": task_id}

deploy/docker/config.yml

@@ -3,7 +3,7 @@ app:
   title: "Crawl4AI API"
   version: "1.0.0"
   host: "0.0.0.0"
-  port: 11235
+  port: 11234
   reload: False
   workers: 1
   timeout_keep_alive: 300

deploy/docker/job.py (new file, 99 lines)

@@ -0,0 +1,99 @@
"""
Job endpoints (enqueue + poll) for long-running LLM extraction and raw crawl.
Relies on the existing Redis task helpers in api.py
"""
from typing import Dict, Optional, Callable
from fastapi import APIRouter, BackgroundTasks, Depends, Request
from pydantic import BaseModel, HttpUrl
from api import (
handle_llm_request,
handle_crawl_job,
handle_task_status,
)
# ------------- dependency placeholders -------------
_redis = None # will be injected from server.py
_config = None
_token_dep: Callable = lambda: None # dummy until injected
# public router
router = APIRouter()
# === init hook called by server.py =========================================
def init_job_router(redis, config, token_dep) -> APIRouter:
"""Inject shared singletons and return the router for mounting."""
global _redis, _config, _token_dep
_redis, _config, _token_dep = redis, config, token_dep
return router
# ---------- payload models --------------------------------------------------
class LlmJobPayload(BaseModel):
url: HttpUrl
q: str
schema: Optional[str] = None
cache: bool = False
class CrawlJobPayload(BaseModel):
urls: list[HttpUrl]
browser_config: Dict = {}
crawler_config: Dict = {}
# ---------- LLM job ---------------------------------------------------------
@router.post("/llm/job", status_code=202)
async def llm_job_enqueue(
payload: LlmJobPayload,
background_tasks: BackgroundTasks,
request: Request,
_td: Dict = Depends(lambda: _token_dep()), # late-bound dep
):
return await handle_llm_request(
_redis,
background_tasks,
request,
str(payload.url),
query=payload.q,
schema=payload.schema,
cache=payload.cache,
config=_config,
)
@router.get("/llm/job/{task_id}")
async def llm_job_status(
request: Request,
task_id: str,
_td: Dict = Depends(lambda: _token_dep())
):
return await handle_task_status(_redis, task_id)
# ---------- CRAWL job -------------------------------------------------------
@router.post("/crawl/job", status_code=202)
async def crawl_job_enqueue(
payload: CrawlJobPayload,
background_tasks: BackgroundTasks,
_td: Dict = Depends(lambda: _token_dep()),
):
return await handle_crawl_job(
_redis,
background_tasks,
[str(u) for u in payload.urls],
payload.browser_config,
payload.crawler_config,
config=_config,
)
@router.get("/crawl/job/{task_id}")
async def crawl_job_status(
request: Request,
task_id: str,
_td: Dict = Depends(lambda: _token_dep())
):
return await handle_task_status(_redis, task_id, base_url=str(request.base_url))
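Client-side flow for the new job endpoints, as a hedged sketch: it assumes the router is mounted at the server root, that no auth token dependency is configured, and that `TaskStatus` serialises to the lowercase strings checked below.

```python
import time
import requests

BASE = "http://localhost:11234"  # port from deploy/docker/config.yml above

# Enqueue a crawl job; the server answers 202 with a task_id.
task = requests.post(f"{BASE}/crawl/job", json={
    "urls": ["https://example.com"],
    "browser_config": {},
    "crawler_config": {},
}).json()

# Poll until the background runner marks the task completed or failed.
while True:
    status = requests.get(f"{BASE}/crawl/job/{task['task_id']}").json()
    if status.get("status") in ("completed", "failed"):  # assumed TaskStatus values
        break
    time.sleep(2)

print(status)
```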

42
deploy/docker/schemas.py Normal file
View File

@@ -0,0 +1,42 @@
from typing import List, Optional, Dict
from enum import Enum
from pydantic import BaseModel, Field
from utils import FilterType
class CrawlRequest(BaseModel):
urls: List[str] = Field(min_length=1, max_length=100)
browser_config: Optional[Dict] = Field(default_factory=dict)
crawler_config: Optional[Dict] = Field(default_factory=dict)
class MarkdownRequest(BaseModel):
"""Request body for the /md endpoint."""
url: str = Field(..., description="Absolute http/https URL to fetch")
f: FilterType = Field(FilterType.FIT,
description="Contentfilter strategy: FIT, RAW, BM25, or LLM")
q: Optional[str] = Field(None, description="Query string used by BM25/LLM filters")
c: Optional[str] = Field("0", description="Cachebust / revision counter")
class RawCode(BaseModel):
code: str
class HTMLRequest(BaseModel):
url: str
class ScreenshotRequest(BaseModel):
url: str
screenshot_wait_for: Optional[float] = 2
output_path: Optional[str] = None
class PDFRequest(BaseModel):
url: str
output_path: Optional[str] = None
class JSEndpointRequest(BaseModel):
url: str
scripts: List[str] = Field(
...,
description="List of separated JavaScript snippets to execute"
)
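
For illustration, a hedged sketch of a client call whose body matches `MarkdownRequest`. The `/md` path comes from the docstring above; the host and port are assumed from `config.yml`, and the exact wire value for `FilterType` (e.g. `"fit"`) depends on `utils.FilterType`:

```python
import httpx

payload = {
    "url": "https://example.com",   # absolute http/https URL
    "f": "fit",                     # FilterType value; exact casing defined in utils.FilterType
    "q": "pricing",                 # optional query used by BM25/LLM filters
    "c": "0",                       # cache-bust / revision counter
}

# Assumed local deployment; adjust host/port to match config.yml.
resp = httpx.post("http://localhost:11234/md", json=payload)
print(resp.status_code)
print(resp.text[:300])
```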

View File

@@ -12,7 +12,7 @@ from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig
from auth import create_access_token, get_token_dependency, TokenRequest from auth import create_access_token, get_token_dependency, TokenRequest
from pydantic import BaseModel from pydantic import BaseModel
from typing import Optional, List, Dict from typing import Optional, List, Dict
from fastapi import Request, Depends from fastapi import Request, Depends
from fastapi.responses import FileResponse from fastapi.responses import FileResponse
import base64 import base64
import re import re
@@ -22,6 +22,16 @@ from api import (
handle_stream_crawl_request, handle_crawl_request, handle_stream_crawl_request, handle_crawl_request,
stream_results stream_results
) )
from schemas import (
CrawlRequest,
MarkdownRequest,
RawCode,
HTMLRequest,
ScreenshotRequest,
PDFRequest,
JSEndpointRequest,
)
from utils import ( from utils import (
FilterType, load_config, setup_logging, verify_email_domain FilterType, load_config, setup_logging, verify_email_domain
) )
@@ -37,23 +47,13 @@ from fastapi import (
FastAPI, HTTPException, Request, Path, Query, Depends FastAPI, HTTPException, Request, Path, Query, Depends
) )
from rank_bm25 import BM25Okapi from rank_bm25 import BM25Okapi
def chunk_code_functions(code: str) -> List[str]:
tree = ast.parse(code)
lines = code.splitlines()
chunks = []
for node in tree.body:
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
start = node.lineno - 1
end = getattr(node, 'end_lineno', start + 1)
chunks.append("\n".join(lines[start:end]))
return chunks
from fastapi.responses import ( from fastapi.responses import (
StreamingResponse, RedirectResponse, PlainTextResponse, JSONResponse StreamingResponse, RedirectResponse, PlainTextResponse, JSONResponse
) )
from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware from fastapi.middleware.trustedhost import TrustedHostMiddleware
from fastapi.staticfiles import StaticFiles from fastapi.staticfiles import StaticFiles
from job import init_job_router
from mcp_bridge import attach_mcp, mcp_resource, mcp_template, mcp_tool from mcp_bridge import attach_mcp, mcp_resource, mcp_template, mcp_tool
@@ -129,8 +129,6 @@ app.mount(
name="play", name="play",
) )
# Optional nice-to-have: opening the root shows the playground
@app.get("/") @app.get("/")
async def root(): async def root():
@@ -211,48 +209,10 @@ def _safe_eval_config(expr: str) -> dict:
return obj.dump() return obj.dump()
# ───────────────────────── Schemas ─────────────────────────── # ── job router ──────────────────────────────────────────────
class CrawlRequest(BaseModel): app.include_router(init_job_router(redis, config, token_dep))
urls: List[str] = Field(min_length=1, max_length=100)
browser_config: Optional[Dict] = Field(default_factory=dict)
crawler_config: Optional[Dict] = Field(default_factory=dict)
# ────────────── Schemas ──────────────
class MarkdownRequest(BaseModel):
"""Request body for the /md endpoint."""
url: str = Field(..., description="Absolute http/https URL to fetch")
f: FilterType = Field(FilterType.FIT,
description="Contentfilter strategy: FIT, RAW, BM25, or LLM")
q: Optional[str] = Field(None, description="Query string used by BM25/LLM filters")
c: Optional[str] = Field("0", description="Cache-bust / revision counter")
class RawCode(BaseModel):
code: str
class HTMLRequest(BaseModel):
url: str
class ScreenshotRequest(BaseModel):
url: str
screenshot_wait_for: Optional[float] = 2
output_path: Optional[str] = None
class PDFRequest(BaseModel):
url: str
output_path: Optional[str] = None
class JSEndpointRequest(BaseModel):
url: str
scripts: List[str] = Field(
...,
description="List of separated JavaScript snippets to execute"
)
# ──────────────────────── Endpoints ────────────────────────── # ──────────────────────── Endpoints ──────────────────────────
@app.post("/token") @app.post("/token")
async def get_token(req: TokenRequest): async def get_token(req: TokenRequest):
if not verify_email_domain(req.email): if not verify_email_domain(req.email):
@@ -278,7 +238,8 @@ async def get_markdown(
_td: Dict = Depends(token_dep), _td: Dict = Depends(token_dep),
): ):
if not body.url.startswith(("http://", "https://")): if not body.url.startswith(("http://", "https://")):
raise HTTPException(400, "URL must be absolute and start with http/https") raise HTTPException(
400, "URL must be absolute and start with http/https")
markdown = await handle_markdown_request( markdown = await handle_markdown_request(
body.url, body.f, body.q, body.c, config body.url, body.f, body.q, body.c, config
) )
@@ -314,12 +275,13 @@ async def generate_html(
# Screenshot endpoint # Screenshot endpoint
@app.post("/screenshot") @app.post("/screenshot")
@limiter.limit(config["rate_limiting"]["default_limit"]) @limiter.limit(config["rate_limiting"]["default_limit"])
@mcp_tool("screenshot") @mcp_tool("screenshot")
async def generate_screenshot( async def generate_screenshot(
request: Request, request: Request,
body: ScreenshotRequest, body: ScreenshotRequest,
_td: Dict = Depends(token_dep), _td: Dict = Depends(token_dep),
): ):
""" """
@@ -327,7 +289,8 @@ async def generate_screenshot(
Use when you need an image snapshot of the rendered page. It's recommended to provide an output path to save the screenshot. Use when you need an image snapshot of the rendered page. It's recommended to provide an output path to save the screenshot.
Then, in the result, you will get a path to the saved file instead of the screenshot data. Then, in the result, you will get a path to the saved file instead of the screenshot data.
""" """
cfg = CrawlerRunConfig(screenshot=True, screenshot_wait_for=body.screenshot_wait_for) cfg = CrawlerRunConfig(
screenshot=True, screenshot_wait_for=body.screenshot_wait_for)
async with AsyncWebCrawler(config=BrowserConfig()) as crawler: async with AsyncWebCrawler(config=BrowserConfig()) as crawler:
results = await crawler.arun(url=body.url, config=cfg) results = await crawler.arun(url=body.url, config=cfg)
screenshot_data = results[0].screenshot screenshot_data = results[0].screenshot
@@ -341,12 +304,13 @@ async def generate_screenshot(
# PDF endpoint # PDF endpoint
@app.post("/pdf") @app.post("/pdf")
@limiter.limit(config["rate_limiting"]["default_limit"]) @limiter.limit(config["rate_limiting"]["default_limit"])
@mcp_tool("pdf") @mcp_tool("pdf")
async def generate_pdf( async def generate_pdf(
request: Request, request: Request,
body: PDFRequest, body: PDFRequest,
_td: Dict = Depends(token_dep), _td: Dict = Depends(token_dep),
): ):
""" """
@@ -384,7 +348,7 @@ async def execute_js(
Your script will replace '{script}' and execute in the browser context. So provide either an IIFE or a sync/async function that returns a value. Your script will replace '{script}' and execute in the browser context. So provide either an IIFE or a sync/async function that returns a value.
Return Format: Return Format:
- The return result is an instance of CrawlResult, so you have access to markdown, links, and other stuff. If this is enough, you don't need to call again for other endpoints. - The return result is an instance of CrawlResult, so you have access to markdown, links, and other stuff. If this is enough, you don't need to call again for other endpoints.
```python ```python
class CrawlResult(BaseModel): class CrawlResult(BaseModel):
url: str url: str
@@ -418,7 +382,7 @@ async def execute_js(
fit_markdown: Optional[str] = None fit_markdown: Optional[str] = None
fit_html: Optional[str] = None fit_html: Optional[str] = None
``` ```
""" """
cfg = CrawlerRunConfig(js_code=body.scripts) cfg = CrawlerRunConfig(js_code=body.scripts)
async with AsyncWebCrawler(config=BrowserConfig()) as crawler: async with AsyncWebCrawler(config=BrowserConfig()) as crawler:
@@ -507,6 +471,7 @@ async def crawl_stream(
}, },
) )
def chunk_code_functions(code_md: str) -> List[str]: def chunk_code_functions(code_md: str) -> List[str]:
"""Extract each function/class from markdown code blocks per file.""" """Extract each function/class from markdown code blocks per file."""
pattern = re.compile( pattern = re.compile(
@@ -530,6 +495,7 @@ def chunk_code_functions(code_md: str) -> List[str]:
chunks.append(f"# File: {file_path}\n{snippet}") chunks.append(f"# File: {file_path}\n{snippet}")
return chunks return chunks
def chunk_doc_sections(doc: str) -> List[str]: def chunk_doc_sections(doc: str) -> List[str]:
lines = doc.splitlines(keepends=True) lines = doc.splitlines(keepends=True)
sections = [] sections = []
@@ -545,6 +511,7 @@ def chunk_doc_sections(doc: str) -> List[str]:
sections.append("".join(current)) sections.append("".join(current))
return sections return sections
@app.get("/ask") @app.get("/ask")
@limiter.limit(config["rate_limiting"]["default_limit"]) @limiter.limit(config["rate_limiting"]["default_limit"])
@mcp_tool("ask") @mcp_tool("ask")
@@ -552,21 +519,24 @@ async def get_context(
request: Request, request: Request,
_td: Dict = Depends(token_dep), _td: Dict = Depends(token_dep),
context_type: str = Query("all", regex="^(code|doc|all)$"), context_type: str = Query("all", regex="^(code|doc|all)$"),
query: Optional[str] = Query(None, description="search query to filter chunks"), query: Optional[str] = Query(
score_ratio: float = Query(0.5, ge=0.0, le=1.0, description="min score as fraction of max_score"), None, description="search query to filter chunks"),
max_results: int = Query(20, ge=1, description="absolute cap on returned chunks"), score_ratio: float = Query(
0.5, ge=0.0, le=1.0, description="min score as fraction of max_score"),
max_results: int = Query(
20, ge=1, description="absolute cap on returned chunks"),
): ):
""" """
This endpoint is designed for any questions about the Crawl4ai library. It returns plain-text markdown with extensive information about Crawl4ai. This endpoint is designed for any questions about the Crawl4ai library. It returns plain-text markdown with extensive information about Crawl4ai.
You can use this as context for any AI assistant. Use this endpoint for AI assistants to retrieve library context for decision-making or code-generation tasks. You can use this as context for any AI assistant. Use this endpoint for AI assistants to retrieve library context for decision-making or code-generation tasks.
It is always best practice to provide a query to filter the context; otherwise the length of the response will be very long. It is always best practice to provide a query to filter the context; otherwise the length of the response will be very long.
Parameters: Parameters:
- context_type: Specify "code" for code context, "doc" for documentation context, or "all" for both. - context_type: Specify "code" for code context, "doc" for documentation context, or "all" for both.
- query: RECOMMENDED search query to filter paragraphs using BM25. You can leave this empty to get all the context. - query: RECOMMENDED search query to filter paragraphs using BM25. You can leave this empty to get all the context.
- score_ratio: Minimum score as a fraction of the maximum score for filtering results. - score_ratio: Minimum score as a fraction of the maximum score for filtering results.
- max_results: Maximum number of results to return. Default is 20. - max_results: Maximum number of results to return. Default is 20.
Returns: Returns:
- JSON response with the requested context. - JSON response with the requested context.
- If "code" is specified, returns the code context. - If "code" is specified, returns the code context.
@@ -576,7 +546,7 @@ async def get_context(
# load contexts # load contexts
base = os.path.dirname(__file__) base = os.path.dirname(__file__)
code_path = os.path.join(base, "c4ai-code-context.md") code_path = os.path.join(base, "c4ai-code-context.md")
doc_path = os.path.join(base, "c4ai-doc-context.md") doc_path = os.path.join(base, "c4ai-doc-context.md")
if not os.path.exists(code_path) or not os.path.exists(doc_path): if not os.path.exists(code_path) or not os.path.exists(doc_path):
raise HTTPException(404, "Context files not found") raise HTTPException(404, "Context files not found")
@@ -626,7 +596,7 @@ async def get_context(
] ]
return JSONResponse(results) return JSONResponse(results)
# attach MCP layer (adds /mcp/ws, /mcp/sse, /mcp/schema) # attach MCP layer (adds /mcp/ws, /mcp/sse, /mcp/schema)
print(f"MCP server running on {config['app']['host']}:{config['app']['port']}") print(f"MCP server running on {config['app']['host']}:{config['app']['port']}")

View File

@@ -45,10 +45,10 @@ def datetime_handler(obj: any) -> Optional[str]:
return obj.isoformat() return obj.isoformat()
raise TypeError(f"Object of type {type(obj)} is not JSON serializable") raise TypeError(f"Object of type {type(obj)} is not JSON serializable")
def should_cleanup_task(created_at: str) -> bool: def should_cleanup_task(created_at: str, ttl_seconds: int = 3600) -> bool:
"""Check if task should be cleaned up based on creation time.""" """Check if task should be cleaned up based on creation time."""
created = datetime.fromisoformat(created_at) created = datetime.fromisoformat(created_at)
return (datetime.now() - created).total_seconds() > 3600 return (datetime.now() - created).total_seconds() > ttl_seconds
def decode_redis_hash(hash_data: Dict[bytes, bytes]) -> Dict[str, str]: def decode_redis_hash(hash_data: Dict[bytes, bytes]) -> Dict[str, str]:
"""Decode Redis hash data from bytes to strings.""" """Decode Redis hash data from bytes to strings."""

View File

@@ -0,0 +1,149 @@
#!/usr/bin/env python3
"""
demo_docker_polling.py
Quick sanity-check for the asynchronous crawl job endpoints:
• POST /crawl/job        → enqueue work, get task_id
• GET  /crawl/job/{id}   → poll status / fetch result
The style matches demo_docker_api.py (console.rule banners, helper
functions, coloured status lines). Adjust BASE_URL as needed.
Run: python demo_docker_polling.py
"""
import asyncio, json, os, time, urllib.parse
from typing import Dict, List
import httpx
from rich.console import Console
from rich.panel import Panel
from rich.syntax import Syntax
console = Console()
BASE_URL = os.getenv("BASE_URL", "http://localhost:11234")
SIMPLE_URL = "https://example.org"
LINKS_URL = "https://httpbin.org/links/10/1"
# --- helpers --------------------------------------------------------------
def print_payload(payload: Dict):
console.print(Panel(Syntax(json.dumps(payload, indent=2),
"json", theme="monokai", line_numbers=False),
title="Payload", border_style="cyan", expand=False))
async def check_server_health(client: httpx.AsyncClient) -> bool:
try:
resp = await client.get("/health")
if resp.is_success:
console.print("[green]Server healthy[/]")
return True
except Exception:
pass
console.print("[bold red]Server is not responding on /health[/]")
return False
async def poll_for_result(client: httpx.AsyncClient, task_id: str,
poll_interval: float = 1.5, timeout: float = 90.0):
"""Hit /crawl/job/{id} until COMPLETED/FAILED or timeout."""
start = time.time()
while True:
resp = await client.get(f"/crawl/job/{task_id}")
resp.raise_for_status()
data = resp.json()
status = data.get("status")
if status.upper() in ("COMPLETED", "FAILED"):
return data
if time.time() - start > timeout:
raise TimeoutError(f"Task {task_id} did not finish in {timeout}s")
await asyncio.sleep(poll_interval)
# --- demo functions -------------------------------------------------------
async def demo_poll_single_url(client: httpx.AsyncClient):
payload = {
"urls": [SIMPLE_URL],
"browser_config": {"type": "BrowserConfig",
"params": {"headless": True}},
"crawler_config": {"type": "CrawlerRunConfig",
"params": {"cache_mode": "BYPASS"}}
}
console.rule("[bold blue]Demo A: /crawl/job Single URL[/]", style="blue")
print_payload(payload)
# enqueue
resp = await client.post("/crawl/job", json=payload)
console.print(f"Enqueue status: [bold]{resp.status_code}[/]")
resp.raise_for_status()
task_id = resp.json()["task_id"]
console.print(f"Task ID: [yellow]{task_id}[/]")
# poll
console.print("Polling…")
result = await poll_for_result(client, task_id)
console.print(Panel(Syntax(json.dumps(result, indent=2),
"json", theme="fruity"),
title="Final result", border_style="green"))
if result["status"] == "COMPLETED":
console.print("[green]✅ Crawl succeeded[/]")
else:
console.print("[red]❌ Crawl failed[/]")
async def demo_poll_multi_url(client: httpx.AsyncClient):
payload = {
"urls": [SIMPLE_URL, LINKS_URL],
"browser_config": {"type": "BrowserConfig",
"params": {"headless": True}},
"crawler_config": {"type": "CrawlerRunConfig",
"params": {"cache_mode": "BYPASS"}}
}
console.rule("[bold magenta]Demo B: /crawl/job Multi-URL[/]",
style="magenta")
print_payload(payload)
resp = await client.post("/crawl/job", json=payload)
console.print(f"Enqueue status: [bold]{resp.status_code}[/]")
resp.raise_for_status()
task_id = resp.json()["task_id"]
console.print(f"Task ID: [yellow]{task_id}[/]")
console.print("Polling…")
result = await poll_for_result(client, task_id)
console.print(Panel(Syntax(json.dumps(result, indent=2),
"json", theme="fruity"),
title="Final result", border_style="green"))
if result["status"] == "COMPLETED":
console.print(
f"[green]✅ {len(json.loads(result['result'])['results'])} URLs crawled[/]")
else:
console.print("[red]❌ Crawl failed[/]")
# --- main runner ----------------------------------------------------------
async def main_demo():
async with httpx.AsyncClient(base_url=BASE_URL, timeout=300.0) as client:
if not await check_server_health(client):
return
await demo_poll_single_url(client)
await demo_poll_multi_url(client)
console.rule("[bold green]Polling demos complete[/]", style="green")
if __name__ == "__main__":
try:
asyncio.run(main_demo())
except KeyboardInterrupt:
console.print("\n[yellow]Interrupted by user[/]")
except Exception:
console.print_exception(show_locals=False)

View File

@@ -3,42 +3,19 @@ from crawl4ai import (
AsyncWebCrawler, AsyncWebCrawler,
BrowserConfig, BrowserConfig,
CrawlerRunConfig, CrawlerRunConfig,
CacheMode,
DefaultMarkdownGenerator, DefaultMarkdownGenerator,
PruningContentFilter, PruningContentFilter,
CrawlResult CrawlResult
) )
async def example_cdp():
browser_conf = BrowserConfig(
headless=False,
cdp_url="http://localhost:9223"
)
crawler_config = CrawlerRunConfig(
session_id="test",
js_code = """(() => { return {"result": "Hello World!"} })()""",
js_only=True
)
async with AsyncWebCrawler(
config=browser_conf,
verbose=True,
) as crawler:
result : CrawlResult = await crawler.arun(
url="https://www.helloworld.org",
config=crawler_config,
)
print(result.js_execution_result)
async def main(): async def main():
browser_config = BrowserConfig(headless=False, verbose=True) browser_config = BrowserConfig(headless=True, verbose=True)
async with AsyncWebCrawler(config=browser_config) as crawler: async with AsyncWebCrawler(config=browser_config) as crawler:
crawler_config = CrawlerRunConfig( crawler_config = CrawlerRunConfig(
cache_mode=CacheMode.BYPASS,
markdown_generator=DefaultMarkdownGenerator( markdown_generator=DefaultMarkdownGenerator(
content_filter=PruningContentFilter( content_filter=PruningContentFilter()
threshold=0.48, threshold_type="fixed", min_word_threshold=0
)
), ),
) )
result : CrawlResult = await crawler.arun( result : CrawlResult = await crawler.arun(

View File

@@ -0,0 +1,143 @@
# == File: regex_extraction_quickstart.py ==
"""
Mini quick-start for RegexExtractionStrategy
────────────────────────────────────────────
3 bite-sized demos that parallel the style of *quickstart_examples_set_1.py*:
1. **Default catalog** scrape a page and pull out e-mails / phones / URLs, etc.
2. **Custom pattern** add your own regex at instantiation time.
3. **LLM-assisted schema** ask the model to write a pattern, cache it, then
run extraction _without_ further LLM calls.
Run the whole thing with::
python regex_extraction_quickstart.py
"""
import os, json, asyncio
from pathlib import Path
from typing import List
from crawl4ai import (
AsyncWebCrawler,
CrawlerRunConfig,
CrawlResult,
RegexExtractionStrategy,
LLMConfig,
)
# ────────────────────────────────────────────────────────────────────────────
# 1. Default-catalog extraction
# ────────────────────────────────────────────────────────────────────────────
async def demo_regex_default() -> None:
print("\n=== 1. Regex extraction default patterns ===")
url = "https://www.iana.org/domains/example" # has e-mail + URLs
strategy = RegexExtractionStrategy(
pattern = RegexExtractionStrategy.Url | RegexExtractionStrategy.Currency
)
config = CrawlerRunConfig(extraction_strategy=strategy)
async with AsyncWebCrawler() as crawler:
result: CrawlResult = await crawler.arun(url, config=config)
print(f"Fetched {url} - success={result.success}")
if result.success:
data = json.loads(result.extracted_content)
for d in data[:10]:
print(f" {d['label']:<12} {d['value']}")
print(f"... total matches: {len(data)}")
else:
print(" !!! crawl failed")
# ────────────────────────────────────────────────────────────────────────────
# 2. Custom pattern override / extension
# ────────────────────────────────────────────────────────────────────────────
async def demo_regex_custom() -> None:
print("\n=== 2. Regex extraction custom price pattern ===")
url = "https://www.apple.com/shop/buy-mac/macbook-pro"
price_pattern = {"usd_price": r"\$\s?\d{1,3}(?:,\d{3})*(?:\.\d{2})?"}
strategy = RegexExtractionStrategy(custom = price_pattern)
config = CrawlerRunConfig(extraction_strategy=strategy)
async with AsyncWebCrawler() as crawler:
result: CrawlResult = await crawler.arun(url, config=config)
if result.success:
data = json.loads(result.extracted_content)
for d in data:
print(f" {d['value']}")
if not data:
print(" (No prices found - page layout may have changed)")
else:
print(" !!! crawl failed")
# ────────────────────────────────────────────────────────────────────────────
# 3. One-shot LLM pattern generation, then fast extraction
# ────────────────────────────────────────────────────────────────────────────
async def demo_regex_generate_pattern() -> None:
print("\n=== 3. generate_pattern → regex extraction ===")
cache_dir = Path(__file__).parent / "tmp"
cache_dir.mkdir(exist_ok=True)
pattern_file = cache_dir / "price_pattern.json"
url = "https://www.lazada.sg/tag/smartphone/"
# ── 3-A. build or load the cached pattern
if pattern_file.exists():
pattern = json.load(pattern_file.open(encoding="utf-8"))
print("Loaded cached pattern:", pattern)
else:
print("Generating pattern via LLM…")
llm_cfg = LLMConfig(
provider="openai/gpt-4o-mini",
api_token="env:OPENAI_API_KEY",
)
# pull one sample page as HTML context
async with AsyncWebCrawler() as crawler:
html = (await crawler.arun(url)).fit_html
pattern = RegexExtractionStrategy.generate_pattern(
label="price",
html=html,
query="Prices in Malaysian Ringgit (e.g. RM1,299.00 or RM200)",
llm_config=llm_cfg,
)
json.dump(pattern, pattern_file.open("w", encoding="utf-8"), indent=2)
print("Saved pattern:", pattern_file)
# ── 3-B. extraction pass - zero LLM calls
strategy = RegexExtractionStrategy(custom=pattern)
config = CrawlerRunConfig(extraction_strategy=strategy, delay_before_return_html=3)
async with AsyncWebCrawler() as crawler:
result: CrawlResult = await crawler.arun(url, config=config)
if result.success:
data = json.loads(result.extracted_content)
for d in data[:15]:
print(f" {d['value']}")
print(f"... total matches: {len(data)}")
else:
print(" !!! crawl failed")
# ────────────────────────────────────────────────────────────────────────────
# Entrypoint
# ────────────────────────────────────────────────────────────────────────────
async def main() -> None:
await demo_regex_default()
await demo_regex_custom()
await demo_regex_generate_pattern()
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -10,6 +10,7 @@ class CrawlResult(BaseModel):
html: str html: str
success: bool success: bool
cleaned_html: Optional[str] = None cleaned_html: Optional[str] = None
fit_html: Optional[str] = None # Preprocessed HTML optimized for extraction
media: Dict[str, List[Dict]] = {} media: Dict[str, List[Dict]] = {}
links: Dict[str, List[Dict]] = {} links: Dict[str, List[Dict]] = {}
downloaded_files: Optional[List[str]] = None downloaded_files: Optional[List[str]] = None
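
Since `fit_html` is now a top-level field on `CrawlResult`, a quick hedged check might look like this (whether it is populated depends on the scraping/extraction configuration):

```python
import asyncio
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig

async def check_fit_html():
    async with AsyncWebCrawler() as crawler:
        result = await crawler.arun("https://example.com", config=CrawlerRunConfig())
        # fit_html is Optional[str]; it may be None for some configurations
        print("fit_html length:", len(result.fit_html or ""))

asyncio.run(check_fit_html())
```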
@@ -50,7 +51,7 @@ if not result.success:
``` ```
### 1.3 **`status_code`** *(Optional[int])* ### 1.3 **`status_code`** *(Optional[int])*
**What**: The pages HTTP status code (e.g., 200, 404). **What**: The page's HTTP status code (e.g., 200, 404).
**Usage**: **Usage**:
```python ```python
if result.status_code == 404: if result.status_code == 404:
@@ -82,7 +83,7 @@ if result.response_headers:
``` ```
### 1.7 **`ssl_certificate`** *(Optional[SSLCertificate])* ### 1.7 **`ssl_certificate`** *(Optional[SSLCertificate])*
**What**: If `fetch_ssl_certificate=True` in your CrawlerRunConfig, **`result.ssl_certificate`** contains a [**`SSLCertificate`**](../advanced/ssl-certificate.md) object describing the sites certificate. You can export the cert in multiple formats (PEM/DER/JSON) or access its properties like `issuer`, **What**: If `fetch_ssl_certificate=True` in your CrawlerRunConfig, **`result.ssl_certificate`** contains a [**`SSLCertificate`**](../advanced/ssl-certificate.md) object describing the site's certificate. You can export the cert in multiple formats (PEM/DER/JSON) or access its properties like `issuer`,
`subject`, `valid_from`, `valid_until`, etc. `subject`, `valid_from`, `valid_until`, etc.
**Usage**: **Usage**:
```python ```python
@@ -109,14 +110,6 @@ print(len(result.html))
print(result.cleaned_html[:500]) # Show a snippet print(result.cleaned_html[:500]) # Show a snippet
``` ```
### 2.3 **`fit_html`** *(Optional[str])*
**What**: If a **content filter** or heuristic (e.g., Pruning/BM25) modifies the HTML, the “fit” or post-filter version.
**When**: This is **only** present if your `markdown_generator` or `content_filter` produces it.
**Usage**:
```python
if result.markdown.fit_html:
print("High-value HTML content:", result.markdown.fit_html[:300])
```
--- ---
@@ -135,7 +128,7 @@ Crawl4AI can convert HTML→Markdown, optionally including:
- **`raw_markdown`** *(str)*: The full HTML→Markdown conversion. - **`raw_markdown`** *(str)*: The full HTML→Markdown conversion.
- **`markdown_with_citations`** *(str)*: Same markdown, but with link references as academic-style citations. - **`markdown_with_citations`** *(str)*: Same markdown, but with link references as academic-style citations.
- **`references_markdown`** *(str)*: The reference list or footnotes at the end. - **`references_markdown`** *(str)*: The reference list or footnotes at the end.
- **`fit_markdown`** *(Optional[str])*: If content filtering (Pruning/BM25) was applied, the filtered fit text. - **`fit_markdown`** *(Optional[str])*: If content filtering (Pruning/BM25) was applied, the filtered "fit" text.
- **`fit_html`** *(Optional[str])*: The HTML that led to `fit_markdown`. - **`fit_html`** *(Optional[str])*: The HTML that led to `fit_markdown`.
**Usage**: **Usage**:
@@ -157,7 +150,7 @@ print(result.markdown.raw_markdown[:200])
print(result.markdown.fit_markdown) print(result.markdown.fit_markdown)
print(result.markdown.fit_html) print(result.markdown.fit_html)
``` ```
**Important**: Fit content (in `fit_markdown`/`fit_html`) exists in result.markdown, only if you used a **filter** (like **PruningContentFilter** or **BM25ContentFilter**) within a `MarkdownGenerationStrategy`. **Important**: "Fit" content (in `fit_markdown`/`fit_html`) exists in result.markdown, only if you used a **filter** (like **PruningContentFilter** or **BM25ContentFilter**) within a `MarkdownGenerationStrategy`.
--- ---
@@ -169,7 +162,7 @@ print(result.markdown.fit_html)
- `src` *(str)*: Media URL - `src` *(str)*: Media URL
- `alt` or `title` *(str)*: Descriptive text - `alt` or `title` *(str)*: Descriptive text
- `score` *(float)*: Relevance score if the crawlers heuristic found it important - `score` *(float)*: Relevance score if the crawler's heuristic found it "important"
- `desc` or `description` *(Optional[str])*: Additional context extracted from surrounding text - `desc` or `description` *(Optional[str])*: Additional context extracted from surrounding text
**Usage**: **Usage**:
@@ -263,7 +256,7 @@ A `DispatchResult` object providing additional concurrency and resource usage in
- **`task_id`**: A unique identifier for the parallel task. - **`task_id`**: A unique identifier for the parallel task.
- **`memory_usage`** (float): The memory (in MB) used at the time of completion. - **`memory_usage`** (float): The memory (in MB) used at the time of completion.
- **`peak_memory`** (float): The peak memory usage (in MB) recorded during the tasks execution. - **`peak_memory`** (float): The peak memory usage (in MB) recorded during the task's execution.
- **`start_time`** / **`end_time`** (datetime): Time range for this crawling task. - **`start_time`** / **`end_time`** (datetime): Time range for this crawling task.
- **`error_message`** (str): Any dispatcher- or concurrency-related error encountered. - **`error_message`** (str): Any dispatcher- or concurrency-related error encountered.
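
A short hedged sketch of reading these fields; `dispatch_result` is typically only populated for concurrent runs (e.g. `arun_many` with a dispatcher), so treat it as optional:

```python
dr = result.dispatch_result
if dr:
    print(f"task {dr.task_id}: peak memory {dr.peak_memory:.1f} MB")
    print(f"duration: {(dr.end_time - dr.start_time).total_seconds():.2f}s")
    if dr.error_message:
        print("dispatcher error:", dr.error_message)
```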
@@ -358,7 +351,7 @@ async def handle_result(result: CrawlResult):
# HTML # HTML
print("Original HTML size:", len(result.html)) print("Original HTML size:", len(result.html))
print("Cleaned HTML size:", len(result.cleaned_html or "")) print("Cleaned HTML size:", len(result.cleaned_html or ""))
# Markdown output # Markdown output
if result.markdown: if result.markdown:
print("Raw Markdown:", result.markdown.raw_markdown[:300]) print("Raw Markdown:", result.markdown.raw_markdown[:300])

View File

@@ -36,6 +36,45 @@ LLMExtractionStrategy(
) )
``` ```
### RegexExtractionStrategy
Used for fast pattern-based extraction of common entities using regular expressions.
```python
RegexExtractionStrategy(
# Pattern Configuration
pattern: IntFlag = RegexExtractionStrategy.Nothing, # Bit flags of built-in patterns to use
custom: Optional[Dict[str, str]] = None, # Custom pattern dictionary {label: regex}
# Input Format
input_format: str = "fit_html", # "html", "markdown", "text" or "fit_html"
)
# Built-in Patterns as Bit Flags
RegexExtractionStrategy.Email # Email addresses
RegexExtractionStrategy.PhoneIntl # International phone numbers
RegexExtractionStrategy.PhoneUS # US-format phone numbers
RegexExtractionStrategy.Url # HTTP/HTTPS URLs
RegexExtractionStrategy.IPv4 # IPv4 addresses
RegexExtractionStrategy.IPv6 # IPv6 addresses
RegexExtractionStrategy.Uuid # UUIDs
RegexExtractionStrategy.Currency # Currency values (USD, EUR, etc)
RegexExtractionStrategy.Percentage # Percentage values
RegexExtractionStrategy.Number # Numeric values
RegexExtractionStrategy.DateIso # ISO format dates
RegexExtractionStrategy.DateUS # US format dates
RegexExtractionStrategy.Time24h # 24-hour format times
RegexExtractionStrategy.PostalUS # US postal codes
RegexExtractionStrategy.PostalUK # UK postal codes
RegexExtractionStrategy.HexColor # HTML hex color codes
RegexExtractionStrategy.TwitterHandle # Twitter handles
RegexExtractionStrategy.Hashtag # Hashtags
RegexExtractionStrategy.MacAddr # MAC addresses
RegexExtractionStrategy.Iban # International bank account numbers
RegexExtractionStrategy.CreditCard # Credit card numbers
RegexExtractionStrategy.All # All available patterns
```
### CosineStrategy ### CosineStrategy
Used for content similarity-based extraction and clustering. Used for content similarity-based extraction and clustering.
@@ -156,6 +195,55 @@ result = await crawler.arun(
data = json.loads(result.extracted_content) data = json.loads(result.extracted_content)
``` ```
### Regex Extraction
```python
import json
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig, RegexExtractionStrategy
# Method 1: Use built-in patterns
strategy = RegexExtractionStrategy(
pattern = RegexExtractionStrategy.Email | RegexExtractionStrategy.Url
)
# Method 2: Use custom patterns
price_pattern = {"usd_price": r"\$\s?\d{1,3}(?:,\d{3})*(?:\.\d{2})?"}
strategy = RegexExtractionStrategy(custom=price_pattern)
# Method 3: Generate pattern with LLM assistance (one-time)
from crawl4ai import LLMConfig
async with AsyncWebCrawler() as crawler:
# Get sample HTML first
sample_result = await crawler.arun("https://example.com/products")
html = sample_result.fit_html
# Generate regex pattern once
pattern = RegexExtractionStrategy.generate_pattern(
label="price",
html=html,
query="Product prices in USD format",
llm_config=LLMConfig(provider="openai/gpt-4o-mini")
)
# Save pattern for reuse
import json
with open("price_pattern.json", "w") as f:
json.dump(pattern, f)
# Use pattern for extraction (no LLM calls)
strategy = RegexExtractionStrategy(custom=pattern)
result = await crawler.arun(
url="https://example.com/products",
config=CrawlerRunConfig(extraction_strategy=strategy)
)
# Process results
data = json.loads(result.extracted_content)
for item in data:
print(f"{item['label']}: {item['value']}")
```
### CSS Extraction ### CSS Extraction
```python ```python
@@ -220,12 +308,28 @@ result = await crawler.arun(
## Best Practices ## Best Practices
1. **Choose the Right Strategy** 1. **Choose the Right Strategy**
- Use `LLMExtractionStrategy` for complex, unstructured content - Use `RegexExtractionStrategy` for common data types like emails, phones, URLs, dates
- Use `JsonCssExtractionStrategy` for well-structured HTML - Use `JsonCssExtractionStrategy` for well-structured HTML with consistent patterns
- Use `LLMExtractionStrategy` for complex, unstructured content requiring reasoning
- Use `CosineStrategy` for content similarity and clustering - Use `CosineStrategy` for content similarity and clustering
2. **Optimize Chunking** 2. **Strategy Selection Guide**
```
Is the target data a common type (email/phone/date/URL)?
→ RegexExtractionStrategy
Does the page have consistent HTML structure?
→ JsonCssExtractionStrategy or JsonXPathExtractionStrategy
Is the data semantically complex or unstructured?
→ LLMExtractionStrategy
Need to find content similar to a specific topic?
→ CosineStrategy
```
3. **Optimize Chunking**
```python ```python
# For long documents # For long documents
strategy = LLMExtractionStrategy( strategy = LLMExtractionStrategy(
@@ -234,7 +338,26 @@ result = await crawler.arun(
) )
``` ```
3. **Handle Errors** 4. **Combine Strategies for Best Performance**
```python
# First pass: Extract structure with CSS
css_strategy = JsonCssExtractionStrategy(product_schema)
css_result = await crawler.arun(url, config=CrawlerRunConfig(extraction_strategy=css_strategy))
product_data = json.loads(css_result.extracted_content)
# Second pass: Extract specific fields with regex
descriptions = [product["description"] for product in product_data]
regex_strategy = RegexExtractionStrategy(
pattern=RegexExtractionStrategy.Email | RegexExtractionStrategy.PhoneUS,
custom={"dimension": r"\d+x\d+x\d+ (?:cm|in)"}
)
# Process descriptions with regex
for text in descriptions:
matches = regex_strategy.extract("", text) # Direct extraction
```
5. **Handle Errors**
```python ```python
try: try:
result = await crawler.arun( result = await crawler.arun(
@@ -247,11 +370,31 @@ result = await crawler.arun(
print(f"Extraction failed: {e}") print(f"Extraction failed: {e}")
``` ```
4. **Monitor Performance** 6. **Monitor Performance**
```python ```python
strategy = CosineStrategy( strategy = CosineStrategy(
verbose=True, # Enable logging verbose=True, # Enable logging
word_count_threshold=20, # Filter short content word_count_threshold=20, # Filter short content
top_k=5 # Limit results top_k=5 # Limit results
) )
```
7. **Cache Generated Patterns**
```python
# For RegexExtractionStrategy pattern generation
import json
from pathlib import Path
cache_dir = Path("./pattern_cache")
cache_dir.mkdir(exist_ok=True)
pattern_file = cache_dir / "product_pattern.json"
if pattern_file.exists():
with open(pattern_file) as f:
pattern = json.load(f)
else:
# Generate once with LLM
pattern = RegexExtractionStrategy.generate_pattern(...)
with open(pattern_file, "w") as f:
json.dump(pattern, f)
``` ```

View File

@@ -1,15 +1,20 @@
# Extracting JSON (No LLM) # Extracting JSON (No LLM)
One of Crawl4AIs **most powerful** features is extracting **structured JSON** from websites **without** relying on large language models. By defining a **schema** with CSS or XPath selectors, you can extract data instantly—even from complex or nested HTML structures—without the cost, latency, or environmental impact of an LLM. One of Crawl4AI's **most powerful** features is extracting **structured JSON** from websites **without** relying on large language models. Crawl4AI offers several strategies for LLM-free extraction:
1. **Schema-based extraction** with CSS or XPath selectors via `JsonCssExtractionStrategy` and `JsonXPathExtractionStrategy`
2. **Regular expression extraction** with `RegexExtractionStrategy` for fast pattern matching
These approaches let you extract data instantly—even from complex or nested HTML structures—without the cost, latency, or environmental impact of an LLM.
**Why avoid LLM for basic extractions?** **Why avoid LLM for basic extractions?**
1. **Faster & Cheaper**: No API calls or GPU overhead. 1. **Faster & Cheaper**: No API calls or GPU overhead.
2. **Lower Carbon Footprint**: LLM inference can be energy-intensive. A well-defined schema is practically carbon-free. 2. **Lower Carbon Footprint**: LLM inference can be energy-intensive. Pattern-based extraction is practically carbon-free.
3. **Precise & Repeatable**: CSS/XPath selectors do exactly what you specify. LLM outputs can vary or hallucinate. 3. **Precise & Repeatable**: CSS/XPath selectors and regex patterns do exactly what you specify. LLM outputs can vary or hallucinate.
4. **Scales Readily**: For thousands of pages, schema-based extraction runs quickly and in parallel. 4. **Scales Readily**: For thousands of pages, pattern-based extraction runs quickly and in parallel.
Below, well explore how to craft these schemas and use them with **JsonCssExtractionStrategy** (or **JsonXPathExtractionStrategy** if you prefer XPath). Well also highlight advanced features like **nested fields** and **base element attributes**. Below, we'll explore how to craft these schemas and use them with **JsonCssExtractionStrategy** (or **JsonXPathExtractionStrategy** if you prefer XPath). We'll also highlight advanced features like **nested fields** and **base element attributes**.
--- ---
@@ -17,17 +22,17 @@ Below, well explore how to craft these schemas and use them with **JsonCssExt
A schema defines: A schema defines:
1. A **base selector** that identifies each container element on the page (e.g., a product row, a blog post card). 1. A **base selector** that identifies each "container" element on the page (e.g., a product row, a blog post card).
2. **Fields** describing which CSS/XPath selectors to use for each piece of data you want to capture (text, attribute, HTML block, etc.). 2. **Fields** describing which CSS/XPath selectors to use for each piece of data you want to capture (text, attribute, HTML block, etc.).
3. **Nested** or **list** types for repeated or hierarchical structures. 3. **Nested** or **list** types for repeated or hierarchical structures.
For example, if you have a list of products, each one might have a name, price, reviews, and related products. This approach is faster and more reliable than an LLM for consistent, structured pages. For example, if you have a list of products, each one might have a name, price, reviews, and "related products." This approach is faster and more reliable than an LLM for consistent, structured pages.
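
Before the full examples below, a minimal sketch of such a schema (selectors and field names are illustrative):

```python
product_schema = {
    "name": "Products",
    "baseSelector": "div.product-card",   # one match per product container
    "fields": [
        {"name": "title", "selector": "h2.title",   "type": "text"},
        {"name": "price", "selector": "span.price", "type": "text"},
        {"name": "link",  "selector": "a",          "type": "attribute", "attribute": "href"},
    ],
}
```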
--- ---
## 2. Simple Example: Crypto Prices ## 2. Simple Example: Crypto Prices
Lets begin with a **simple** schema-based extraction using the `JsonCssExtractionStrategy`. Below is a snippet that extracts cryptocurrency prices from a site (similar to the legacy Coinbase example). Notice we **dont** call any LLM: Let's begin with a **simple** schema-based extraction using the `JsonCssExtractionStrategy`. Below is a snippet that extracts cryptocurrency prices from a site (similar to the legacy Coinbase example). Notice we **don't** call any LLM:
```python ```python
import json import json
@@ -87,7 +92,7 @@ asyncio.run(extract_crypto_prices())
**Highlights**: **Highlights**:
- **`baseSelector`**: Tells us where each item (crypto row) is. - **`baseSelector`**: Tells us where each "item" (crypto row) is.
- **`fields`**: Two fields (`coin_name`, `price`) using simple CSS selectors. - **`fields`**: Two fields (`coin_name`, `price`) using simple CSS selectors.
- Each field defines a **`type`** (e.g., `text`, `attribute`, `html`, `regex`, etc.). - Each field defines a **`type`** (e.g., `text`, `attribute`, `html`, `regex`, etc.).
@@ -97,7 +102,7 @@ No LLM is needed, and the performance is **near-instant** for hundreds or thousa
### **XPath Example with `raw://` HTML** ### **XPath Example with `raw://` HTML**
Below is a short example demonstrating **XPath** extraction plus the **`raw://`** scheme. Well pass a **dummy HTML** directly (no network request) and define the extraction strategy in `CrawlerRunConfig`. Below is a short example demonstrating **XPath** extraction plus the **`raw://`** scheme. We'll pass a **dummy HTML** directly (no network request) and define the extraction strategy in `CrawlerRunConfig`.
```python ```python
import json import json
@@ -168,12 +173,12 @@ asyncio.run(extract_crypto_prices_xpath())
**Key Points**: **Key Points**:
1. **`JsonXPathExtractionStrategy`** is used instead of `JsonCssExtractionStrategy`. 1. **`JsonXPathExtractionStrategy`** is used instead of `JsonCssExtractionStrategy`.
2. **`baseSelector`** and each fields `"selector"` use **XPath** instead of CSS. 2. **`baseSelector`** and each field's `"selector"` use **XPath** instead of CSS.
3. **`raw://`** lets us pass `dummy_html` with no real network request—handy for local testing. 3. **`raw://`** lets us pass `dummy_html` with no real network request—handy for local testing.
4. Everything (including the extraction strategy) is in **`CrawlerRunConfig`**. 4. Everything (including the extraction strategy) is in **`CrawlerRunConfig`**.
Thats how you keep the config self-contained, illustrate **XPath** usage, and demonstrate the **raw** scheme for direct HTML input—all while avoiding the old approach of passing `extraction_strategy` directly to `arun()`. That's how you keep the config self-contained, illustrate **XPath** usage, and demonstrate the **raw** scheme for direct HTML input—all while avoiding the old approach of passing `extraction_strategy` directly to `arun()`.
--- ---
@@ -187,7 +192,7 @@ We have a **sample e-commerce** HTML file on GitHub (example):
``` ```
https://gist.githubusercontent.com/githubusercontent/2d7b8ba3cd8ab6cf3c8da771ddb36878/raw/1ae2f90c6861ce7dd84cc50d3df9920dee5e1fd2/sample_ecommerce.html https://gist.githubusercontent.com/githubusercontent/2d7b8ba3cd8ab6cf3c8da771ddb36878/raw/1ae2f90c6861ce7dd84cc50d3df9920dee5e1fd2/sample_ecommerce.html
``` ```
This snippet includes categories, products, features, reviews, and related items. Lets see how to define a schema that fully captures that structure **without LLM**. This snippet includes categories, products, features, reviews, and related items. Let's see how to define a schema that fully captures that structure **without LLM**.
```python ```python
schema = { schema = {
@@ -333,24 +338,253 @@ async def extract_ecommerce_data():
asyncio.run(extract_ecommerce_data()) asyncio.run(extract_ecommerce_data())
``` ```
If all goes well, you get a **structured** JSON array with each category, containing an array of `products`. Each product includes `details`, `features`, `reviews`, etc. All of that **without** an LLM. If all goes well, you get a **structured** JSON array with each "category," containing an array of `products`. Each product includes `details`, `features`, `reviews`, etc. All of that **without** an LLM.
--- ---
## 4. Why “No LLM” Is Often Better ## 4. RegexExtractionStrategy - Fast Pattern-Based Extraction
1. **Zero Hallucination**: Schema-based extraction doesnt guess text. It either finds it or not. Crawl4AI now offers a powerful new zero-LLM extraction strategy: `RegexExtractionStrategy`. This strategy provides lightning-fast extraction of common data types like emails, phone numbers, URLs, dates, and more using pre-compiled regular expressions.
2. **Guaranteed Structure**: The same schema yields consistent JSON across many pages, so your downstream pipeline can rely on stable keys.
3. **Speed**: LLM-based extraction can be 10–1000x slower for large-scale crawling.
4. **Scalable**: Adding or updating a field is a matter of adjusting the schema, not re-tuning a model.
**When might you consider an LLM?** Possibly if the site is extremely unstructured or you want AI summarization. But always try a schema approach first for repeated or consistent data patterns. ### Key Features
- **Zero LLM Dependency**: Extracts data without any AI model calls
- **Blazing Fast**: Uses pre-compiled regex patterns for maximum performance
- **Built-in Patterns**: Includes ready-to-use patterns for common data types
- **Custom Patterns**: Add your own regex patterns for domain-specific extraction
- **LLM-Assisted Pattern Generation**: Optionally use an LLM once to generate optimized patterns, then reuse them without further LLM calls
### Simple Example: Extracting Common Entities
The easiest way to start is by using the built-in pattern catalog:
```python
import json
import asyncio
from crawl4ai import (
AsyncWebCrawler,
CrawlerRunConfig,
RegexExtractionStrategy
)
async def extract_with_regex():
# Create a strategy using built-in patterns for URLs and currencies
strategy = RegexExtractionStrategy(
pattern = RegexExtractionStrategy.Url | RegexExtractionStrategy.Currency
)
config = CrawlerRunConfig(extraction_strategy=strategy)
async with AsyncWebCrawler() as crawler:
result = await crawler.arun(
url="https://example.com",
config=config
)
if result.success:
data = json.loads(result.extracted_content)
for item in data[:5]: # Show first 5 matches
print(f"{item['label']}: {item['value']}")
print(f"Total matches: {len(data)}")
asyncio.run(extract_with_regex())
```
### Available Built-in Patterns
`RegexExtractionStrategy` provides these common patterns as IntFlag attributes for easy combining:
```python
# Use individual patterns
strategy = RegexExtractionStrategy(pattern=RegexExtractionStrategy.Email)
# Combine multiple patterns
strategy = RegexExtractionStrategy(
pattern = (
RegexExtractionStrategy.Email |
RegexExtractionStrategy.PhoneUS |
RegexExtractionStrategy.Url
)
)
# Use all available patterns
strategy = RegexExtractionStrategy(pattern=RegexExtractionStrategy.All)
```
Available patterns include:
- `Email` - Email addresses
- `PhoneIntl` - International phone numbers
- `PhoneUS` - US-format phone numbers
- `Url` - HTTP/HTTPS URLs
- `IPv4` - IPv4 addresses
- `IPv6` - IPv6 addresses
- `Uuid` - UUIDs
- `Currency` - Currency values (USD, EUR, etc.)
- `Percentage` - Percentage values
- `Number` - Numeric values
- `DateIso` - ISO format dates
- `DateUS` - US format dates
- `Time24h` - 24-hour format times
- `PostalUS` - US postal codes
- `PostalUK` - UK postal codes
- `HexColor` - HTML hex color codes
- `TwitterHandle` - Twitter handles
- `Hashtag` - Hashtags
- `MacAddr` - MAC addresses
- `Iban` - International bank account numbers
- `CreditCard` - Credit card numbers
### Custom Pattern Example
For more targeted extraction, you can provide custom patterns:
```python
import json
import asyncio
from crawl4ai import (
AsyncWebCrawler,
CrawlerRunConfig,
RegexExtractionStrategy
)
async def extract_prices():
# Define a custom pattern for US Dollar prices
price_pattern = {"usd_price": r"\$\s?\d{1,3}(?:,\d{3})*(?:\.\d{2})?"}
# Create strategy with custom pattern
strategy = RegexExtractionStrategy(custom=price_pattern)
config = CrawlerRunConfig(extraction_strategy=strategy)
async with AsyncWebCrawler() as crawler:
result = await crawler.arun(
url="https://www.example.com/products",
config=config
)
if result.success:
data = json.loads(result.extracted_content)
for item in data:
print(f"Found price: {item['value']}")
asyncio.run(extract_prices())
```
### LLM-Assisted Pattern Generation
For complex or site-specific patterns, you can use an LLM once to generate an optimized pattern, then save and reuse it without further LLM calls:
```python
import json
import asyncio
from pathlib import Path
from crawl4ai import (
AsyncWebCrawler,
CrawlerRunConfig,
RegexExtractionStrategy,
LLMConfig
)
async def extract_with_generated_pattern():
cache_dir = Path("./pattern_cache")
cache_dir.mkdir(exist_ok=True)
pattern_file = cache_dir / "price_pattern.json"
# 1. Generate or load pattern
if pattern_file.exists():
pattern = json.load(pattern_file.open())
print(f"Using cached pattern: {pattern}")
else:
print("Generating pattern via LLM...")
# Configure LLM
llm_config = LLMConfig(
provider="openai/gpt-4o-mini",
api_token="env:OPENAI_API_KEY",
)
# Get sample HTML for context
async with AsyncWebCrawler() as crawler:
result = await crawler.arun("https://example.com/products")
html = result.fit_html
# Generate pattern (one-time LLM usage)
pattern = RegexExtractionStrategy.generate_pattern(
label="price",
html=html,
query="Product prices in USD format",
llm_config=llm_config,
)
# Cache pattern for future use
json.dump(pattern, pattern_file.open("w"), indent=2)
# 2. Use pattern for extraction (no LLM calls)
strategy = RegexExtractionStrategy(custom=pattern)
config = CrawlerRunConfig(extraction_strategy=strategy)
async with AsyncWebCrawler() as crawler:
result = await crawler.arun(
url="https://example.com/products",
config=config
)
if result.success:
data = json.loads(result.extracted_content)
for item in data[:10]:
print(f"Extracted: {item['value']}")
print(f"Total matches: {len(data)}")
asyncio.run(extract_with_generated_pattern())
```
This pattern allows you to:
1. Use an LLM once to generate a highly optimized regex for your specific site
2. Save the pattern to disk for reuse
3. Extract data using only regex (no further LLM calls) in production
### Extraction Results Format
The `RegexExtractionStrategy` returns results in a consistent format:
```json
[
{
"url": "https://example.com",
"label": "email",
"value": "contact@example.com",
"span": [145, 163]
},
{
"url": "https://example.com",
"label": "url",
"value": "https://support.example.com",
"span": [210, 235]
}
]
```
Each match includes:
- `url`: The source URL
- `label`: The pattern name that matched (e.g., "email", "phone_us")
- `value`: The extracted text
- `span`: The start and end positions in the source content
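
A small hedged sketch of post-processing these matches, grouping values by their pattern label:

```python
import json
from collections import defaultdict

data = json.loads(result.extracted_content)  # list of match dicts as shown above

by_label = defaultdict(list)
for item in data:
    by_label[item["label"]].append(item["value"])

for label, values in by_label.items():
    print(f"{label}: {len(values)} matches, e.g. {values[0]}")
```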
--- ---
## 5. Base Element Attributes & Additional Fields ## 5. Why "No LLM" Is Often Better
Its easy to **extract attributes** (like `href`, `src`, or `data-xxx`) from your base or nested elements using: 1. **Zero Hallucination**: Pattern-based extraction doesn't guess text. It either finds it or not.
2. **Guaranteed Structure**: The same schema or regex yields consistent JSON across many pages, so your downstream pipeline can rely on stable keys.
3. **Speed**: LLM-based extraction can be 10–1000x slower for large-scale crawling.
4. **Scalable**: Adding or updating a field is a matter of adjusting the schema or regex, not re-tuning a model.
**When might you consider an LLM?** Possibly if the site is extremely unstructured or you want AI summarization. But always try a schema or regex approach first for repeated or consistent data patterns.
---
## 6. Base Element Attributes & Additional Fields
It's easy to **extract attributes** (like `href`, `src`, or `data-xxx`) from your base or nested elements using:
```json ```json
{ {
@@ -361,11 +595,11 @@ Its easy to **extract attributes** (like `href`, `src`, or `data-xxx`) from y
} }
``` ```
You can define them in **`baseFields`** (extracted from the main container element) or in each fields sub-lists. This is especially helpful if you need an items link or ID stored in the parent `<div>`. You can define them in **`baseFields`** (extracted from the main container element) or in each field's sub-lists. This is especially helpful if you need an item's link or ID stored in the parent `<div>`.
--- ---
## 6. Putting It All Together: Larger Example ## 7. Putting It All Together: Larger Example
Consider a blog site. We have a schema that extracts the **URL** from each post card (via `baseFields` with an `"attribute": "href"`), plus the title, date, summary, and author: Consider a blog site. We have a schema that extracts the **URL** from each post card (via `baseFields` with an `"attribute": "href"`), plus the title, date, summary, and author:
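
The full schema is elided from this hunk; a minimal sketch of what it might look like (selectors are illustrative) is:

```python
blog_schema = {
    "name": "Blog Posts",
    "baseSelector": "a.post-card",
    "baseFields": [
        {"name": "url", "type": "attribute", "attribute": "href"},
    ],
    "fields": [
        {"name": "title",   "selector": "h2",          "type": "text"},
        {"name": "date",    "selector": "time",        "type": "text"},
        {"name": "summary", "selector": "p.summary",   "type": "text"},
        {"name": "author",  "selector": "span.author", "type": "text"},
    ],
}
```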
@@ -389,19 +623,20 @@ Then run with `JsonCssExtractionStrategy(schema)` to get an array of blog post o
--- ---
## 7. Tips & Best Practices ## 8. Tips & Best Practices
1. **Inspect the DOM** in Chrome DevTools or Firefoxs Inspector to find stable selectors. 1. **Inspect the DOM** in Chrome DevTools or Firefox's Inspector to find stable selectors.
2. **Start Simple**: Verify you can extract a single field. Then add complexity like nested objects or lists. 2. **Start Simple**: Verify you can extract a single field. Then add complexity like nested objects or lists.
3. **Test** your schema on partial HTML or a test page before a big crawl. 3. **Test** your schema on partial HTML or a test page before a big crawl.
4. **Combine with JS Execution** if the site loads content dynamically. You can pass `js_code` or `wait_for` in `CrawlerRunConfig`. 4. **Combine with JS Execution** if the site loads content dynamically. You can pass `js_code` or `wait_for` in `CrawlerRunConfig`.
5. **Look at Logs** when `verbose=True`: if your selectors are off or your schema is malformed, itll often show warnings. 5. **Look at Logs** when `verbose=True`: if your selectors are off or your schema is malformed, it'll often show warnings.
6. **Use baseFields** if you need attributes from the container element (e.g., `href`, `data-id`), especially for the parent item. 6. **Use baseFields** if you need attributes from the container element (e.g., `href`, `data-id`), especially for the "parent" item.
7. **Performance**: For large pages, make sure your selectors are as narrow as possible. 7. **Performance**: For large pages, make sure your selectors are as narrow as possible.
8. **Consider Using Regex First**: For simple data types like emails, URLs, and dates, `RegexExtractionStrategy` is often the fastest approach.
--- ---
## 8. Schema Generation Utility ## 9. Schema Generation Utility
While manually crafting schemas is powerful and precise, Crawl4AI now offers a convenient utility to **automatically generate** extraction schemas using LLM. This is particularly useful when: While manually crafting schemas is powerful and precise, Crawl4AI now offers a convenient utility to **automatically generate** extraction schemas using LLM. This is particularly useful when:
@@ -481,27 +716,26 @@ strategy = JsonCssExtractionStrategy(css_schema)
- Use OpenAI for production-quality schemas - Use OpenAI for production-quality schemas
- Use Ollama for development, testing, or when you need a self-hosted solution - Use Ollama for development, testing, or when you need a self-hosted solution
That's it for **Extracting JSON (No LLM)**! You've seen how schema-based approaches (either CSS or XPath) can handle everything from simple lists to deeply nested product catalogs—instantly, with minimal overhead. Enjoy building robust scrapers that produce consistent, structured JSON for your data pipelines!
--- ---
## 9. Conclusion ## 10. Conclusion
With **JsonCssExtractionStrategy** (or **JsonXPathExtractionStrategy**), you can build powerful, **LLM-free** pipelines that: With Crawl4AI's LLM-free extraction strategies - `JsonCssExtractionStrategy`, `JsonXPathExtractionStrategy`, and now `RegexExtractionStrategy` - you can build powerful pipelines that:
- Scrape any consistent site for structured data. - Scrape any consistent site for structured data.
- Support nested objects, repeating lists, or advanced transformations. - Support nested objects, repeating lists, or pattern-based extraction.
- Scale to thousands of pages quickly and reliably. - Scale to thousands of pages quickly and reliably.
**Next Steps**: **Choosing the Right Strategy**:
- Combine your extracted JSON with advanced filtering or summarization in a second pass if needed. - Use **`RegexExtractionStrategy`** for fast extraction of common data types like emails, phones, URLs, dates, etc.
- For dynamic pages, combine strategies with `js_code` or infinite scroll hooking to ensure all content is loaded. - Use **`JsonCssExtractionStrategy`** or **`JsonXPathExtractionStrategy`** for structured data with clear HTML patterns
- If you need both: first extract structured data with JSON strategies, then use regex on specific fields
**Remember**: For repeated, structured data, you dont need to pay for or wait on an LLM. A well-crafted schema plus CSS or XPath gets you the data faster, cleaner, and cheaper—**the real power** of Crawl4AI. **Remember**: For repeated, structured data, you don't need to pay for or wait on an LLM. Well-crafted schemas and regex patterns get you the data faster, cleaner, and cheaper—**the real power** of Crawl4AI.
**Last Updated**: 2025-01-01 **Last Updated**: 2025-05-02
--- ---
Thats it for **Extracting JSON (No LLM)**! Youve seen how schema-based approaches (either CSS or XPath) can handle everything from simple lists to deeply nested product catalogs—instantly, with minimal overhead. Enjoy building robust scrapers that produce consistent, structured JSON for your data pipelines! That's it for **Extracting JSON (No LLM)**! You've seen how schema-based approaches (either CSS or XPath) and regex patterns can handle everything from simple lists to deeply nested product catalogs—instantly, with minimal overhead. Enjoy building robust scrapers that produce consistent, structured JSON for your data pipelines!