Merge branch '2025-MAY-2' of https://github.com/unclecode/crawl4ai into 2025-MAY-2

This commit is contained in:
ntohidi
2025-06-13 11:26:17 +02:00
78 changed files with 104266 additions and 192 deletions

View File

@@ -66,6 +66,11 @@ from .deep_crawling import (
DeepCrawlDecorator,
)
from .utils import (
start_colab_display_server,
setup_colab_environment
)
__all__ = [
"AsyncLoggerBase",
"AsyncLogger",
@@ -124,7 +129,9 @@ __all__ = [
"Crawl4aiDockerClient",
"ProxyRotationStrategy",
"RoundRobinProxyStrategy",
"ProxyConfig"
"ProxyConfig",
"start_colab_display_server",
"setup_colab_environment",
]

View File

@@ -764,6 +764,9 @@ class CrawlerRunConfig():
Default: 60000 (60 seconds).
wait_for (str or None): A CSS selector or JS condition to wait for before extracting content.
Default: None.
wait_for_timeout (int or None): Specific timeout in ms for the wait_for condition.
If None, uses page_timeout instead.
Default: None.
wait_for_images (bool): If True, wait for images to load before extracting content.
Default: False.
delay_before_return_html (float): Delay in seconds before retrieving final HTML.
@@ -904,6 +907,7 @@ class CrawlerRunConfig():
wait_until: str = "domcontentloaded",
page_timeout: int = PAGE_TIMEOUT,
wait_for: str = None,
wait_for_timeout: int = None,
wait_for_images: bool = False,
delay_before_return_html: float = 0.1,
mean_delay: float = 0.1,
@@ -1000,6 +1004,7 @@ class CrawlerRunConfig():
self.wait_until = wait_until
self.page_timeout = page_timeout
self.wait_for = wait_for
self.wait_for_timeout = wait_for_timeout
self.wait_for_images = wait_for_images
self.delay_before_return_html = delay_before_return_html
self.mean_delay = mean_delay
@@ -1141,6 +1146,7 @@ class CrawlerRunConfig():
wait_until=kwargs.get("wait_until", "domcontentloaded"),
page_timeout=kwargs.get("page_timeout", 60000),
wait_for=kwargs.get("wait_for"),
wait_for_timeout=kwargs.get("wait_for_timeout"),
wait_for_images=kwargs.get("wait_for_images", False),
delay_before_return_html=kwargs.get("delay_before_return_html", 0.1),
mean_delay=kwargs.get("mean_delay", 0.1),
@@ -1250,6 +1256,7 @@ class CrawlerRunConfig():
"wait_until": self.wait_until,
"page_timeout": self.page_timeout,
"wait_for": self.wait_for,
"wait_for_timeout": self.wait_for_timeout,
"wait_for_images": self.wait_for_images,
"delay_before_return_html": self.delay_before_return_html,
"mean_delay": self.mean_delay,
@@ -1329,7 +1336,7 @@ class LLMConfig:
provider: str = DEFAULT_PROVIDER,
api_token: Optional[str] = None,
base_url: Optional[str] = None,
temprature: Optional[float] = None,
temperature: Optional[float] = None,
max_tokens: Optional[int] = None,
top_p: Optional[float] = None,
frequency_penalty: Optional[float] = None,
@@ -1357,7 +1364,7 @@ class LLMConfig:
self.provider = DEFAULT_PROVIDER
self.api_token = os.getenv(DEFAULT_PROVIDER_API_KEY)
self.base_url = base_url
self.temprature = temprature
self.temperature = temperature
self.max_tokens = max_tokens
self.top_p = top_p
self.frequency_penalty = frequency_penalty
@@ -1371,7 +1378,7 @@ class LLMConfig:
provider=kwargs.get("provider", DEFAULT_PROVIDER),
api_token=kwargs.get("api_token"),
base_url=kwargs.get("base_url"),
temprature=kwargs.get("temprature"),
temperature=kwargs.get("temperature"),
max_tokens=kwargs.get("max_tokens"),
top_p=kwargs.get("top_p"),
frequency_penalty=kwargs.get("frequency_penalty"),
@@ -1385,7 +1392,7 @@ class LLMConfig:
"provider": self.provider,
"api_token": self.api_token,
"base_url": self.base_url,
"temprature": self.temprature,
"temperature": self.temperature,
"max_tokens": self.max_tokens,
"top_p": self.top_p,
"frequency_penalty": self.frequency_penalty,

View File

@@ -744,18 +744,49 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
)
redirected_url = page.url
except Error as e:
raise RuntimeError(f"Failed on navigating ACS-GOTO:\n{str(e)}")
# Allow navigation to be aborted when downloading files
# This is expected behavior for downloads in some browser engines
if 'net::ERR_ABORTED' in str(e) and self.browser_config.accept_downloads:
self.logger.info(
message=f"Navigation aborted, likely due to file download: {url}",
tag="GOTO",
params={"url": url},
)
response = None
else:
raise RuntimeError(f"Failed on navigating ACS-GOTO:\n{str(e)}")
await self.execute_hook(
"after_goto", page, context=context, url=url, response=response, config=config
)
# ──────────────────────────────────────────────────────────────
# Walk the redirect chain. Playwright returns only the last
# hop, so we trace the `request.redirected_from` links until the
# first response that differs from the final one and surface its
# status-code.
# ──────────────────────────────────────────────────────────────
if response is None:
status_code = 200
response_headers = {}
else:
status_code = response.status
response_headers = response.headers
first_resp = response
req = response.request
while req and req.redirected_from:
prev_req = req.redirected_from
prev_resp = await prev_req.response()
if prev_resp: # keep earliest
first_resp = prev_resp
req = prev_req
status_code = first_resp.status
response_headers = first_resp.headers
# if response is None:
# status_code = 200
# response_headers = {}
# else:
# status_code = response.status
# response_headers = response.headers
else:
status_code = 200
@@ -940,8 +971,10 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
if config.wait_for:
try:
# Use wait_for_timeout if specified, otherwise fall back to page_timeout
timeout = config.wait_for_timeout if config.wait_for_timeout is not None else config.page_timeout
await self.smart_wait(
page, config.wait_for, timeout=config.page_timeout
page, config.wait_for, timeout=timeout
)
except Exception as e:
raise RuntimeError(f"Wait condition failed: {str(e)}")
@@ -1066,7 +1099,13 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
finally:
# If no session_id is given we should close the page
if not config.session_id:
all_contexts = page.context.browser.contexts
total_pages = sum(len(context.pages) for context in all_contexts)
if config.session_id:
pass
elif total_pages <= 1 and (self.browser_config.use_managed_browser or self.browser_config.headless):
pass
else:
# Detach listeners before closing to prevent potential errors during close
if config.capture_network_requests:
page.remove_listener("request", handle_request_capture)
@@ -1076,6 +1115,7 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
page.remove_listener("console", handle_console_capture)
page.remove_listener("pageerror", handle_pageerror_capture)
# Close the page
await page.close()
async def _handle_full_page_scan(self, page: Page, scroll_delay: float = 0.1):
@@ -1435,12 +1475,32 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
num_segments = (page_height // viewport_height) + 1
for i in range(num_segments):
y_offset = i * viewport_height
# Special handling for the last segment
if i == num_segments - 1:
last_part_height = page_height % viewport_height
# If page_height is an exact multiple of viewport_height,
# we don't need an extra segment
if last_part_height == 0:
# Skip last segment if page height is exact multiple of viewport
break
# Adjust viewport to exactly match the remaining content height
await page.set_viewport_size({"width": page_width, "height": last_part_height})
await page.evaluate(f"window.scrollTo(0, {y_offset})")
await asyncio.sleep(0.01) # wait for render
seg_shot = await page.screenshot(full_page=False)
# Capture the current segment
# Note: Using compression options (format, quality) would go here
seg_shot = await page.screenshot(full_page=False, type="jpeg", quality=85)
# seg_shot = await page.screenshot(full_page=False)
img = Image.open(BytesIO(seg_shot)).convert("RGB")
segments.append(img)
# Reset viewport to original size after capturing segments
await page.set_viewport_size({"width": page_width, "height": viewport_height})
total_height = sum(img.height for img in segments)
stitched = Image.new("RGB", (segments[0].width, total_height))
offset = 0

View File

@@ -360,7 +360,7 @@ class AsyncWebCrawler:
pdf_data=pdf_data,
verbose=config.verbose,
is_raw_html=True if url.startswith("raw:") else False,
redirected_url=async_response.redirected_url,
redirected_url=async_response.redirected_url,
**kwargs,
)
@@ -503,7 +503,7 @@ class AsyncWebCrawler:
tables = media.pop("tables", [])
links = result.links.model_dump()
metadata = result.metadata
fit_html = preprocess_html_for_schema(html_content=html, text_threshold= 500, max_size= 300_000)
################################
@@ -585,11 +585,13 @@ class AsyncWebCrawler:
# Choose content based on input_format
content_format = config.extraction_strategy.input_format
if content_format == "fit_markdown" and not markdown_result.fit_markdown:
self.logger.warning(
message="Fit markdown requested but not available. Falling back to raw markdown.",
tag="EXTRACT",
params={"url": _url},
)
self.logger.url_status(
url=_url,
success=bool(html),
timing=time.perf_counter() - t1,
tag="EXTRACT",
)
content_format = "markdown"
content = {
@@ -613,11 +615,12 @@ class AsyncWebCrawler:
)
# Log extraction completion
self.logger.info(
message="Completed for {url:.50}... | Time: {timing}s",
tag="EXTRACT",
params={"url": _url, "timing": time.perf_counter() - t1},
)
self.logger.url_status(
url=_url,
success=bool(html),
timing=time.perf_counter() - t1,
tag="EXTRACT",
)
# Apply HTML formatting if requested
if config.prettiify:

View File

@@ -255,6 +255,13 @@ class ManagedBrowser:
preexec_fn=os.setpgrp # Start in a new process group
)
# If verbose is True print args used to run the process
if self.logger and self.browser_config.verbose:
self.logger.debug(
f"Starting browser with args: {' '.join(args)}",
tag="BROWSER"
)
# We'll monitor for a short time to make sure it starts properly, but won't keep monitoring
await asyncio.sleep(0.5) # Give browser time to start
await self._initial_startup_check()
@@ -511,6 +518,56 @@ class ManagedBrowser:
return profiler.delete_profile(profile_name_or_path)
async def clone_runtime_state(
src: BrowserContext,
dst: BrowserContext,
crawlerRunConfig: CrawlerRunConfig | None = None,
browserConfig: BrowserConfig | None = None,
) -> None:
"""
Bring everything that *can* be changed at runtime from `src` → `dst`.
1. Cookies
2. localStorage (and sessionStorage, same API)
3. Extra headers, permissions, geolocation if supplied in configs
"""
# ── 1. cookies ────────────────────────────────────────────────────────────
cookies = await src.cookies()
if cookies:
await dst.add_cookies(cookies)
# ── 2. localStorage / sessionStorage ──────────────────────────────────────
state = await src.storage_state()
for origin in state.get("origins", []):
url = origin["origin"]
kvs = origin.get("localStorage", [])
if not kvs:
continue
page = dst.pages[0] if dst.pages else await dst.new_page()
await page.goto(url, wait_until="domcontentloaded")
for k, v in kvs:
await page.evaluate("(k,v)=>localStorage.setItem(k,v)", k, v)
# ── 3. runtime-mutable extras from configs ────────────────────────────────
# headers
if browserConfig and browserConfig.headers:
await dst.set_extra_http_headers(browserConfig.headers)
# geolocation
if crawlerRunConfig and crawlerRunConfig.geolocation:
await dst.grant_permissions(["geolocation"])
await dst.set_geolocation(
{
"latitude": crawlerRunConfig.geolocation.latitude,
"longitude": crawlerRunConfig.geolocation.longitude,
"accuracy": crawlerRunConfig.geolocation.accuracy,
}
)
return dst
class BrowserManager:
@@ -960,11 +1017,17 @@ class BrowserManager:
# If using a managed browser, just grab the shared default_context
if self.config.use_managed_browser:
context = self.default_context
pages = context.pages
page = next((p for p in pages if p.url == crawlerRunConfig.url), None)
if not page:
page = context.pages[0] # await context.new_page()
if self.config.storage_state:
context = await self.create_browser_context(crawlerRunConfig)
ctx = self.default_context # default context, one window only
ctx = await clone_runtime_state(context, ctx, crawlerRunConfig, self.config)
page = await ctx.new_page()
else:
context = self.default_context
pages = context.pages
page = next((p for p in pages if p.url == crawlerRunConfig.url), None)
if not page:
page = context.pages[0] # await context.new_page()
else:
# Otherwise, check if we have an existing context for this config
config_signature = self._make_config_signature(crawlerRunConfig)

View File

@@ -218,8 +218,18 @@ class BrowserProfiler:
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
try:
from playwright.async_api import async_playwright
# Start the browser
await managed_browser.start()
# await managed_browser.start()
# 1. ── Start the browser ─────────────────────────────────────────
cdp_url = await managed_browser.start()
# 2. ── Attach Playwright to that running Chrome ──────────────────
pw = await async_playwright().start()
browser = await pw.chromium.connect_over_cdp(cdp_url)
# Grab the existing default context (there is always one)
context = browser.contexts[0]
# Check if browser started successfully
browser_process = managed_browser.browser_process
@@ -244,6 +254,18 @@ class BrowserProfiler:
except asyncio.CancelledError:
pass
# 3. ── Persist storage state *before* we kill Chrome ─────────────
state_file = os.path.join(profile_path, "storage_state.json")
try:
await context.storage_state(path=state_file)
self.logger.info(f"[PROFILE].i storage_state saved → {state_file}", tag="PROFILE")
except Exception as e:
self.logger.warning(f"[PROFILE].w failed to save storage_state: {e}", tag="PROFILE")
# 4. ── Close everything cleanly ──────────────────────────────────
await browser.close()
await pw.stop()
# If the browser is still running and the user pressed 'q', terminate it
if browser_process.poll() is None and user_done_event.is_set():
self.logger.info("Terminating browser process...", tag="PROFILE")
@@ -615,9 +637,18 @@ class BrowserProfiler:
self.logger.info(f"Debugging port: {debugging_port}", tag="CDP")
self.logger.info(f"Headless mode: {headless}", tag="CDP")
# create browser config
browser_config = BrowserConfig(
browser_type=browser_type,
headless=headless,
user_data_dir=profile_path,
debugging_port=debugging_port,
verbose=True
)
# Create managed browser instance
managed_browser = ManagedBrowser(
browser_type=browser_type,
browser_config=browser_config,
user_data_dir=profile_path,
headless=headless,
logger=self.logger,

View File

@@ -718,13 +718,18 @@ class WebScrapingStrategy(ContentScrapingStrategy):
# Check flag if we should remove external images
if kwargs.get("exclude_external_images", False):
element.decompose()
return False
# src_url_base = src.split('/')[2]
# url_base = url.split('/')[2]
# if url_base not in src_url_base:
# element.decompose()
# return False
# Handle relative URLs (which are always from the same domain)
if not src.startswith('http') and not src.startswith('//'):
return True # Keep relative URLs
# For absolute URLs, compare the base domains using the existing function
src_base_domain = get_base_domain(src)
url_base_domain = get_base_domain(url)
# If the domains don't match and both are valid, the image is external
if src_base_domain and url_base_domain and src_base_domain != url_base_domain:
element.decompose()
return False
# if kwargs.get('exclude_social_media_links', False):
# if image_src_base_domain in exclude_social_media_domains:

View File

@@ -150,6 +150,14 @@ class BestFirstCrawlingStrategy(DeepCrawlStrategy):
self.logger.info(f"Max pages limit ({self.max_pages}) reached, stopping crawl")
break
# Calculate how many more URLs we can process in this batch
remaining = self.max_pages - self._pages_crawled
batch_size = min(BATCH_SIZE, remaining)
if batch_size <= 0:
# No more pages to crawl
self.logger.info(f"Max pages limit ({self.max_pages}) reached, stopping crawl")
break
batch: List[Tuple[float, int, str, Optional[str]]] = []
# Retrieve up to BATCH_SIZE items from the priority queue.
for _ in range(BATCH_SIZE):
@@ -184,6 +192,10 @@ class BestFirstCrawlingStrategy(DeepCrawlStrategy):
# Count only successful crawls toward max_pages limit
if result.success:
self._pages_crawled += 1
# Check if we've reached the limit during batch processing
if self._pages_crawled >= self.max_pages:
self.logger.info(f"Max pages limit ({self.max_pages}) reached during batch, stopping crawl")
break # Exit the generator
yield result

View File

@@ -157,6 +157,11 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy):
results: List[CrawlResult] = []
while current_level and not self._cancel_event.is_set():
# Check if we've already reached max_pages before starting a new level
if self._pages_crawled >= self.max_pages:
self.logger.info(f"Max pages limit ({self.max_pages}) reached, stopping crawl")
break
next_level: List[Tuple[str, Optional[str]]] = []
urls = [url for url, _ in current_level]
@@ -221,6 +226,10 @@ class BFSDeepCrawlStrategy(DeepCrawlStrategy):
# Count only successful crawls
if result.success:
self._pages_crawled += 1
# Check if we've reached the limit during batch processing
if self._pages_crawled >= self.max_pages:
self.logger.info(f"Max pages limit ({self.max_pages}) reached during batch, stopping crawl")
break # Exit the generator
results_count += 1
yield result

View File

@@ -49,6 +49,10 @@ class DFSDeepCrawlStrategy(BFSDeepCrawlStrategy):
# Count only successful crawls toward max_pages limit
if result.success:
self._pages_crawled += 1
# Check if we've reached the limit during batch processing
if self._pages_crawled >= self.max_pages:
self.logger.info(f"Max pages limit ({self.max_pages}) reached during batch, stopping crawl")
break # Exit the generator
# Only discover links from successful crawls
new_links: List[Tuple[str, Optional[str]]] = []
@@ -94,6 +98,10 @@ class DFSDeepCrawlStrategy(BFSDeepCrawlStrategy):
# and only discover links from successful crawls
if result.success:
self._pages_crawled += 1
# Check if we've reached the limit during batch processing
if self._pages_crawled >= self.max_pages:
self.logger.info(f"Max pages limit ({self.max_pages}) reached during batch, stopping crawl")
break # Exit the generator
new_links: List[Tuple[str, Optional[str]]] = []
await self.link_discovery(result, url, depth, visited, new_links, depths)

View File

@@ -541,7 +541,7 @@ class LLMExtractionStrategy(ExtractionStrategy):
api_token: The API token for the provider.
base_url: The base URL for the API request.
api_base: The base URL for the API request.
extra_args: Additional arguments for the API request, such as temprature, max_tokens, etc.
extra_args: Additional arguments for the API request, such as temperature, max_tokens, etc.
"""
super().__init__( input_format=input_format, **kwargs)
self.llm_config = llm_config
@@ -1168,7 +1168,11 @@ In this scenario, use your best judgment to generate the schema. You need to exa
elif not query and not target_json_example:
user_message["content"] += """IMPORTANT: Since we neither have a query nor an example, it is crucial to rely solely on the HTML content provided. Leverage your expertise to determine the schema based on the repetitive patterns observed in the content."""
user_message["content"] += """IMPORTANT: Ensure your schema remains reliable by avoiding selectors that appear to generate dynamically and are not dependable. You want a reliable schema, as it consistently returns the same data even after many page reloads.
user_message["content"] += """IMPORTANT:
0/ Ensure your schema remains reliable by avoiding selectors that appear to generate dynamically and are not dependable. You want a reliable schema, as it consistently returns the same data even after many page reloads.
1/ DO NOT USE use base64 kind of classes, they are temporary and not reliable.
2/ Every selector must refer to only one unique element. You should ensure your selector points to a single element and is unique to the place that contains the information. You have to use available techniques based on CSS or XPATH requested schema to make sure your selector is unique and also not fragile, meaning if we reload the page now or in the future, the selector should remain reliable.
3/ Do not use Regex as much as possible.
Analyze the HTML and generate a JSON schema that follows the specified format. Only output valid JSON schema, nothing else.
"""

View File

@@ -14,7 +14,7 @@ class PDFCrawlerStrategy(AsyncCrawlerStrategy):
async def crawl(self, url: str, **kwargs) -> AsyncCrawlResponse:
# Just pass through with empty HTML - scraper will handle actual processing
return AsyncCrawlResponse(
html="", # Scraper will handle the real work
html="Scraper will handle the real work", # Scraper will handle the real work
response_headers={"Content-Type": "application/pdf"},
status_code=200
)
@@ -66,6 +66,7 @@ class PDFContentScrapingStrategy(ContentScrapingStrategy):
image_save_dir=image_save_dir,
batch_size=batch_size
)
self._temp_files = [] # Track temp files for cleanup
def scrap(self, url: str, html: str, **params) -> ScrapingResult:
"""
@@ -124,7 +125,13 @@ class PDFContentScrapingStrategy(ContentScrapingStrategy):
finally:
# Cleanup temp file if downloaded
if url.startswith(("http://", "https://")):
Path(pdf_path).unlink(missing_ok=True)
try:
Path(pdf_path).unlink(missing_ok=True)
if pdf_path in self._temp_files:
self._temp_files.remove(pdf_path)
except Exception as e:
if self.logger:
self.logger.warning(f"Failed to cleanup temp file {pdf_path}: {e}")
async def ascrap(self, url: str, html: str, **kwargs) -> ScrapingResult:
# For simple cases, you can use the sync version
@@ -138,22 +145,45 @@ class PDFContentScrapingStrategy(ContentScrapingStrategy):
# Create temp file with .pdf extension
temp_file = tempfile.NamedTemporaryFile(suffix='.pdf', delete=False)
self._temp_files.append(temp_file.name)
try:
# Download PDF with streaming
response = requests.get(url, stream=True)
if self.logger:
self.logger.info(f"Downloading PDF from {url}...")
# Download PDF with streaming and timeout
# Connection timeout: 10s, Read timeout: 300s (5 minutes for large PDFs)
response = requests.get(url, stream=True, timeout=(20, 60 * 10))
response.raise_for_status()
# Get file size if available
total_size = int(response.headers.get('content-length', 0))
downloaded = 0
# Write to temp file
with open(temp_file.name, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
downloaded += len(chunk)
if self.logger and total_size > 0:
progress = (downloaded / total_size) * 100
if progress % 10 < 0.1: # Log every 10%
self.logger.debug(f"PDF download progress: {progress:.0f}%")
if self.logger:
self.logger.info(f"PDF downloaded successfully: {temp_file.name}")
return temp_file.name
except requests.exceptions.Timeout as e:
# Clean up temp file if download fails
Path(temp_file.name).unlink(missing_ok=True)
self._temp_files.remove(temp_file.name)
raise RuntimeError(f"Timeout downloading PDF from {url}: {str(e)}")
except Exception as e:
# Clean up temp file if download fails
Path(temp_file.name).unlink(missing_ok=True)
self._temp_files.remove(temp_file.name)
raise RuntimeError(f"Failed to download PDF from {url}: {str(e)}")
elif url.startswith("file://"):

View File

@@ -6,6 +6,7 @@ import html
import lxml
import re
import os
import subprocess
import platform
from .prompts import PROMPT_EXTRACT_BLOCKS
from array import array
@@ -42,6 +43,29 @@ from itertools import chain
from collections import deque
from typing import Generator, Iterable
# Monkey patch to fix wildcard handling in urllib.robotparser
from urllib.robotparser import RuleLine
import re
original_applies_to = RuleLine.applies_to
def patched_applies_to(self, filename):
# Handle wildcards in paths
if '*' in self.path or '%2A' in self.path or self.path in ("*", "%2A"):
pattern = self.path.replace('%2A', '*')
pattern = re.escape(pattern).replace('\\*', '.*')
pattern = '^' + pattern
if pattern.endswith('\\$'):
pattern = pattern[:-2] + '$'
try:
return bool(re.match(pattern, filename))
except re.error:
return original_applies_to(self, filename)
return original_applies_to(self, filename)
RuleLine.applies_to = patched_applies_to
# Monkey patch ends
def chunk_documents(
documents: Iterable[str],
chunk_token_threshold: int,
@@ -135,13 +159,20 @@ def merge_chunks(
word_token_ratio: float = 1.0,
splitter: Callable = None
) -> List[str]:
"""Merges documents into chunks of specified token size.
"""
Merges a sequence of documents into chunks based on a target token count, with optional overlap.
Each document is split into tokens using the provided splitter function (defaults to str.split). Tokens are distributed into chunks aiming for the specified target size, with optional overlapping tokens between consecutive chunks. Returns a list of non-empty merged chunks as strings.
Args:
docs: Input documents
target_size: Desired token count per chunk
overlap: Number of tokens to overlap between chunks
word_token_ratio: Multiplier for word->token conversion
docs: Sequence of input document strings to be merged.
target_size: Target number of tokens per chunk.
overlap: Number of tokens to overlap between consecutive chunks.
word_token_ratio: Multiplier to estimate token count from word count.
splitter: Callable used to split each document into tokens.
Returns:
List of merged document chunks as strings, each not exceeding the target token size.
"""
# Pre-tokenize all docs and store token counts
splitter = splitter or str.split
@@ -150,7 +181,7 @@ def merge_chunks(
total_tokens = 0
for doc in docs:
tokens = doc.split()
tokens = splitter(doc)
count = int(len(tokens) * word_token_ratio)
if count: # Skip empty docs
token_counts.append(count)
@@ -303,7 +334,7 @@ class RobotsParser:
robots_url = f"{scheme}://{domain}/robots.txt"
async with aiohttp.ClientSession() as session:
async with session.get(robots_url, timeout=2) as response:
async with session.get(robots_url, timeout=2, ssl=False) as response:
if response.status == 200:
rules = await response.text()
self._cache_rules(domain, rules)
@@ -1109,6 +1140,23 @@ def get_content_of_website_optimized(
css_selector: str = None,
**kwargs,
) -> Dict[str, Any]:
"""
Extracts and cleans content from website HTML, optimizing for useful media and contextual information.
Parses the provided HTML to extract internal and external links, filters and scores images for usefulness, gathers contextual descriptions for media, removes unwanted or low-value elements, and converts the cleaned HTML to Markdown. Also extracts metadata and returns all structured content in a dictionary.
Args:
url: The URL of the website being processed.
html: The raw HTML content to extract from.
word_count_threshold: Minimum word count for elements to be retained.
css_selector: Optional CSS selector to restrict extraction to specific elements.
Returns:
A dictionary containing Markdown content, cleaned HTML, extraction success status, media and link lists, and metadata.
Raises:
InvalidCSSSelectorError: If a provided CSS selector does not match any elements.
"""
if not html:
return None
@@ -1151,6 +1199,20 @@ def get_content_of_website_optimized(
def process_image(img, url, index, total_images):
# Check if an image has valid display and inside undesired html elements
"""
Processes an HTML image element to determine its relevance and extract metadata.
Evaluates an image's visibility, context, and usefulness based on its attributes and parent elements. If the image passes validation and exceeds a usefulness score threshold, returns a dictionary with its source, alt text, contextual description, score, and type. Otherwise, returns None.
Args:
img: The BeautifulSoup image tag to process.
url: The base URL of the page containing the image.
index: The index of the image in the list of images on the page.
total_images: The total number of images on the page.
Returns:
A dictionary with image metadata if the image is considered useful, or None otherwise.
"""
def is_valid_image(img, parent, parent_classes):
style = img.get("style", "")
src = img.get("src", "")
@@ -1172,6 +1234,20 @@ def get_content_of_website_optimized(
# Score an image for it's usefulness
def score_image_for_usefulness(img, base_url, index, images_count):
# Function to parse image height/width value and units
"""
Scores an HTML image element for usefulness based on size, format, attributes, and position.
The function evaluates an image's dimensions, file format, alt text, and its position among all images on the page to assign a usefulness score. Higher scores indicate images that are likely more relevant or informative for content extraction or summarization.
Args:
img: The HTML image element to score.
base_url: The base URL used to resolve relative image sources.
index: The position of the image in the list of images on the page (zero-based).
images_count: The total number of images on the page.
Returns:
An integer usefulness score for the image.
"""
def parse_dimension(dimension):
if dimension:
match = re.match(r"(\d+)(\D*)", dimension)
@@ -1186,6 +1262,16 @@ def get_content_of_website_optimized(
# Fetch image file metadata to extract size and extension
def fetch_image_file_size(img, base_url):
# If src is relative path construct full URL, if not it may be CDN URL
"""
Fetches the file size of an image by sending a HEAD request to its URL.
Args:
img: A BeautifulSoup tag representing the image element.
base_url: The base URL to resolve relative image sources.
Returns:
The value of the "Content-Length" header as a string if available, otherwise None.
"""
img_url = urljoin(base_url, img.get("src"))
try:
response = requests.head(img_url)
@@ -1196,8 +1282,6 @@ def get_content_of_website_optimized(
return None
except InvalidSchema:
return None
finally:
return
image_height = img.get("height")
height_value, height_unit = parse_dimension(image_height)
@@ -2822,5 +2906,73 @@ def preprocess_html_for_schema(html_content, text_threshold=100, attr_value_thre
except Exception as e:
# Fallback for parsing errors
return html_content[:max_size] if len(html_content) > max_size else html_content
return html_content[:max_size] if len(html_content) > max_size else html_content
def start_colab_display_server():
"""
Start virtual display server in Google Colab.
Raises error if not running in Colab environment.
"""
# Check if running in Google Colab
try:
import google.colab
from google.colab import output
from IPython.display import IFrame, display
except ImportError:
raise RuntimeError("This function must be run in Google Colab environment.")
import os, time, subprocess
os.environ["DISPLAY"] = ":99"
# Xvfb
xvfb = subprocess.Popen(["Xvfb", ":99", "-screen", "0", "1280x720x24"])
time.sleep(2)
# minimal window manager
fluxbox = subprocess.Popen(["fluxbox"])
# VNC → X
x11vnc = subprocess.Popen(["x11vnc",
"-display", ":99",
"-nopw", "-forever", "-shared",
"-rfbport", "5900", "-quiet"])
# websockify → VNC
novnc = subprocess.Popen(["/opt/novnc/utils/websockify/run",
"6080", "localhost:5900",
"--web", "/opt/novnc"])
time.sleep(2) # give ports a moment
# Colab proxy url
url = output.eval_js("google.colab.kernel.proxyPort(6080)")
display(IFrame(f"{url}/vnc.html?autoconnect=true&resize=scale", width=1024, height=768))
def setup_colab_environment():
"""
Alternative setup using IPython magic commands
"""
from IPython import get_ipython
ipython = get_ipython()
print("🚀 Setting up Crawl4AI environment in Google Colab...")
# Run the bash commands
ipython.run_cell_magic('bash', '', '''
set -e
echo "📦 Installing system dependencies..."
apt-get update -y
apt-get install -y xvfb x11vnc fluxbox websockify git
echo "📥 Setting up virtual display..."
git clone https://github.com/novnc/noVNC /opt/novnc
git clone https://github.com/novnc/websockify /opt/novnc/utils/websockify
pip install -q nest_asyncio google-colab
echo "✅ Setup complete!"
''')