diff --git a/.claude/settings.local.json b/.claude/settings.local.json
new file mode 100644
index 00000000..e8f289d7
--- /dev/null
+++ b/.claude/settings.local.json
@@ -0,0 +1,3 @@
+{
+ "enableAllProjectMcpServers": false
+}
\ No newline at end of file
diff --git a/crawl4ai/__init__.py b/crawl4ai/__init__.py
index 2c6c2f29..c0d2b424 100644
--- a/crawl4ai/__init__.py
+++ b/crawl4ai/__init__.py
@@ -2,7 +2,8 @@
import warnings
from .async_webcrawler import AsyncWebCrawler, CacheMode
-from .async_configs import BrowserConfig, CrawlerRunConfig, HTTPCrawlerConfig, LLMConfig, ProxyConfig, GeolocationConfig
+# MODIFIED: Add SeedingConfig here
+from .async_configs import BrowserConfig, CrawlerRunConfig, HTTPCrawlerConfig, LLMConfig, ProxyConfig, GeolocationConfig, SeedingConfig
from .content_scraping_strategy import (
ContentScrapingStrategy,
@@ -65,6 +66,8 @@ from .deep_crawling import (
DFSDeepCrawlStrategy,
DeepCrawlDecorator,
)
+# NEW: Import AsyncUrlSeeder
+from .async_url_seeder import AsyncUrlSeeder
from .utils import (
start_colab_display_server,
@@ -78,6 +81,10 @@ __all__ = [
"BrowserProfiler",
"LLMConfig",
"GeolocationConfig",
+ # NEW: Add SeedingConfig
+ "SeedingConfig",
+ # NEW: Add AsyncUrlSeeder
+ "AsyncUrlSeeder",
"DeepCrawlStrategy",
"BFSDeepCrawlStrategy",
"BestFirstCrawlingStrategy",
@@ -160,4 +167,4 @@ __all__ = [
# Disable all Pydantic warnings
warnings.filterwarnings("ignore", module="pydantic")
-# pydantic_warnings.filter_warnings()
+# pydantic_warnings.filter_warnings()
\ No newline at end of file
diff --git a/crawl4ai/async_configs.py b/crawl4ai/async_configs.py
index 3fcd9911..4eb116c6 100644
--- a/crawl4ai/async_configs.py
+++ b/crawl4ai/async_configs.py
@@ -207,7 +207,6 @@ class GeolocationConfig:
config_dict.update(kwargs)
return GeolocationConfig.from_dict(config_dict)
-
class ProxyConfig:
def __init__(
self,
@@ -318,8 +317,6 @@ class ProxyConfig:
config_dict.update(kwargs)
return ProxyConfig.from_dict(config_dict)
-
-
class BrowserConfig:
"""
Configuration class for setting up a browser instance and its context in AsyncPlaywrightCrawlerStrategy.
@@ -597,7 +594,6 @@ class BrowserConfig:
return config
return BrowserConfig.from_kwargs(config)
-
class HTTPCrawlerConfig:
"""HTTP-specific crawler configuration"""
@@ -1329,7 +1325,6 @@ class CrawlerRunConfig():
config_dict.update(kwargs)
return CrawlerRunConfig.from_kwargs(config_dict)
-
class LLMConfig:
def __init__(
self,
@@ -1414,4 +1409,51 @@ class LLMConfig:
config_dict.update(kwargs)
return LLMConfig.from_kwargs(config_dict)
+class SeedingConfig:
+    """
+    Configuration class for URL discovery and pre-validation via AsyncUrlSeeder.
+
+    Every constructor argument is stored unchanged on the instance under the
+    same name; no validation is performed here.
+    """
+    def __init__(
+        self,
+        source: str = "sitemap+cc",  # Options: "sitemap", "cc", "sitemap+cc"
+        pattern: Optional[str] = "*",  # URL pattern to filter discovered URLs (e.g., "*example.com/blog/*")
+        live_check: bool = False,  # Whether to perform HEAD requests to verify URL liveness
+        extract_head: bool = False,  # Whether to fetch and parse the <head> section for metadata
+        max_urls: int = -1,  # Maximum number of URLs to discover (default: -1 for no limit)
+        concurrency: int = 1000,  # Maximum concurrent requests for live checks/head extraction
+        hits_per_sec: int = 5,  # Rate limit in requests per second
+        force: bool = False,  # If True, bypasses the AsyncUrlSeeder's internal .jsonl cache
+        base_directory: Optional[str] = None,  # Base directory for UrlSeeder's cache files (.jsonl)
+        llm_config: Optional[LLMConfig] = None,  # Forward LLM config for future use (e.g., relevance scoring)
+        verbose: Optional[bool] = None,  # Override crawler's general verbose setting
+        query: Optional[str] = None,  # Search query for relevance scoring
+        score_threshold: Optional[float] = None,  # Minimum relevance score to include URL (0.0-1.0)
+        scoring_method: str = "bm25",  # Scoring method: "bm25" (default), future: "semantic"
+    ):
+        self.source = source
+        self.pattern = pattern
+        self.live_check = live_check
+        self.extract_head = extract_head
+        self.max_urls = max_urls
+        self.concurrency = concurrency
+        self.hits_per_sec = hits_per_sec
+        self.force = force
+        self.base_directory = base_directory
+        self.llm_config = llm_config
+        self.verbose = verbose
+        self.query = query
+        self.score_threshold = score_threshold
+        self.scoring_method = scoring_method
+
+    # to_dict, from_kwargs, and clone mirror the other config classes in this module
+    def to_dict(self) -> Dict[str, Any]:
+        """Return the instance attributes as a dict, omitting `llm_config` when it is None."""
+        return {k: v for k, v in self.__dict__.items() if k != 'llm_config' or v is not None}
+
+    @staticmethod
+    def from_kwargs(kwargs: Dict[str, Any]) -> 'SeedingConfig':
+        """Build a SeedingConfig from a dict of constructor keyword arguments."""
+        return SeedingConfig(**kwargs)
+
+    def clone(self, **kwargs: Any) -> 'SeedingConfig':
+        """Return a new SeedingConfig with this instance's values, overridden by `kwargs`."""
+        config_dict = self.to_dict()
+        config_dict.update(kwargs)
+        return SeedingConfig.from_kwargs(config_dict)
diff --git a/crawl4ai/async_logger.py b/crawl4ai/async_logger.py
index 49c7ee6f..e203b6c9 100644
--- a/crawl4ai/async_logger.py
+++ b/crawl4ai/async_logger.py
@@ -29,7 +29,7 @@ class LogLevel(Enum):
class LogColor(str, Enum):
"""Enum for log colors."""
- DEBUG = "lightblack"
+ DEBUG = "bright_black"
INFO = "cyan"
SUCCESS = "green"
WARNING = "yellow"
diff --git a/crawl4ai/async_url_seeder.py b/crawl4ai/async_url_seeder.py
new file mode 100644
index 00000000..b9dce91a
--- /dev/null
+++ b/crawl4ai/async_url_seeder.py
@@ -0,0 +1,944 @@
+"""
+async_url_seeder.py
+Fast async URL discovery for Crawl4AI
+
+Features
+--------
+* Common-Crawl streaming via httpx.AsyncClient (HTTP/2, keep-alive)
+* robots.txt → sitemap chain (.gz + nested indexes) via async httpx
+* Per-domain CDX result cache on disk (~/.crawl4ai/seeder_cache/<index>_<domain>_<hash>.jsonl)
+* Optional HEAD-only liveness check
+* Optional partial <head> download + meta parsing
+* Global hits-per-second rate-limit via asyncio.Semaphore
+* Concurrency in the thousands — fine on a single event-loop
+"""
+
+from __future__ import annotations
+import aiofiles, asyncio, gzip, hashlib, io, json, os, pathlib, re, time
+from datetime import timedelta
+from pathlib import Path
+from typing import Any, Dict, Iterable, List, Optional, Sequence, Union
+from urllib.parse import quote, urljoin
+
+import httpx
+import fnmatch
+try:
+ from lxml import html as lxml_html
+ from lxml import etree
+ LXML = True
+except ImportError:
+ LXML = False
+try:
+ import brotli
+ HAS_BROTLI = True
+except ImportError:
+ HAS_BROTLI = False
+try:
+ import rank_bm25
+ HAS_BM25 = True
+except ImportError:
+ HAS_BM25 = False
+
+# Import AsyncLoggerBase from crawl4ai's logger module
+# Assuming crawl4ai/async_logger.py defines AsyncLoggerBase
+# You might need to adjust this import based on your exact file structure
+from .async_logger import AsyncLoggerBase, AsyncLogger # Import AsyncLogger for default if needed
+
+# Import SeedingConfig for type hints
+from typing import TYPE_CHECKING
+if TYPE_CHECKING:
+ from .async_configs import SeedingConfig
+
+
+# ────────────────────────────────────────────────────────────────────────── consts
+COLLINFO_URL = "https://index.commoncrawl.org/collinfo.json"
+# CACHE_DIR = pathlib.Path("~/.crawl4ai").expanduser() # REMOVED: now managed by __init__
+# CACHE_DIR.mkdir(exist_ok=True) # REMOVED: now managed by __init__
+# INDEX_CACHE = CACHE_DIR / "latest_cc_index.txt" # REMOVED: now managed by __init__
+TTL = timedelta(days=7) # Keeping this constant as it's a seeder-specific TTL
+
+# Regex fallbacks used by _parse_head when lxml is unavailable.
+# Each pattern is anchored on its opening tag; values are captured up to the
+# first quote, space or '>' (so multi-word attribute values are truncated).
+_meta_rx = re.compile(
+    r'<meta\s+(?:[^>]*?(?:name|property|http-equiv)\s*=\s*["\']?([^"\' >]+)[^>]*?content\s*=\s*["\']?([^"\' >]+)[^>]*?)\/?>',
+    re.I)
+_charset_rx = re.compile(r'<meta\s+[^>]*charset=["\']?([^"\' >]+)', re.I)
+_title_rx = re.compile(r'<title>(.*?)</title>', re.I|re.S)
+_link_rx = re.compile(r'<link\s+[^>]*rel=["\']?([^"\' >]+)[^>]*href=["\']?([^"\' >]+)', re.I)
+
+# ────────────────────────────────────────────────────────────────────────── helpers
+def _match(url: str, pattern: str) -> bool:
+    """
+    Tell whether *url* satisfies the fnmatch-style *pattern*.
+
+    Three forms of the URL are tried in turn: the URL as given, the URL
+    without its scheme, and (when it starts with "www.") the scheme-less URL
+    without that prefix.
+    """
+    if fnmatch.fnmatch(url, pattern):
+        return True
+    # Everything after the first "://"; unchanged when there is no scheme.
+    schemeless = url.split("://", 1)[-1]
+    if fnmatch.fnmatch(schemeless, pattern):
+        return True
+    if not schemeless.startswith("www."):
+        return False
+    return fnmatch.fnmatch(schemeless[4:], pattern)
+
+def _parse_head(src: str) -> Dict[str, Any]:
+    """
+    Parse HTML head markup into a metadata dictionary.
+
+    Uses lxml when it was importable, otherwise falls back to regexes.
+
+    Returns a dict with keys "title", "charset", "meta" (lower-cased
+    name/property/http-equiv -> content), "link" (rel -> list of attribute
+    dicts), "jsonld" (decoded JSON-LD payloads) and "lang".  On the lxml path
+    an unparsable document yields an empty dict.
+    """
+    if LXML:
+        try:
+            if isinstance(src, str):
+                src = src.encode("utf-8", "replace")  # hand lxml bytes and let it decode
+            doc = lxml_html.fromstring(src)
+        except (ValueError, etree.ParserError):
+            return {}  # malformed, bail gracefully
+        title_el = doc.find(".//title")
+        info: Dict[str, Any] = {
+            "title": (title_el.text or "").strip() if title_el is not None else None,
+            "charset": None,
+            "meta": {}, "link": {}, "jsonld": [], "lang": "",
+        }
+        for el in doc.xpath(".//meta"):
+            k = el.attrib.get("name") or el.attrib.get("property") or el.attrib.get("http-equiv")
+            if k:
+                info["meta"][k.lower()] = el.attrib.get("content", "")
+            elif "charset" in el.attrib:
+                info["charset"] = el.attrib["charset"].lower()
+        for el in doc.xpath(".//link"):
+            # lxml returns the rel attribute as one string; joining it directly
+            # would interleave spaces between characters.  Split/join only
+            # normalises the whitespace between multiple rel tokens.
+            rel = " ".join(el.attrib.get("rel", "").split()).lower()
+            if not rel:
+                continue
+            entry = {a: el.attrib[a] for a in ("href", "as", "type", "hreflang") if a in el.attrib}
+            info["link"].setdefault(rel, []).append(entry)
+        # Extract JSON-LD structured data
+        for script in doc.xpath('.//script[@type="application/ld+json"]'):
+            if script.text:
+                try:
+                    jsonld_data = json.loads(script.text.strip())
+                    info["jsonld"].append(jsonld_data)
+                except json.JSONDecodeError:
+                    pass  # invalid JSON-LD is ignored on purpose
+        # Extract html lang attribute.  find(".//html") only searches
+        # descendants, so the root element has to be checked explicitly.
+        html_elem = doc if doc.tag == "html" else doc.find(".//html")
+        if html_elem is not None:
+            info["lang"] = html_elem.attrib.get("lang", "")
+        return info
+    # regex fallback
+    info = {"title": None, "charset": None, "meta": {}, "link": {}, "jsonld": [], "lang": ""}
+    m = _title_rx.search(src)
+    info["title"] = m.group(1).strip() if m else None
+    for k, v in _meta_rx.findall(src):
+        info["meta"][k.lower()] = v
+    m = _charset_rx.search(src)
+    info["charset"] = m.group(1).lower() if m else None
+    for rel, href in _link_rx.findall(src):
+        info["link"].setdefault(rel.lower(), []).append({"href": href})
+    # Try to extract JSON-LD with regex
+    jsonld_pattern = re.compile(
+        r'<script[^>]*type=["\']application/ld\+json["\'][^>]*>(.*?)</script>', re.I|re.S)
+    for match in jsonld_pattern.findall(src):
+        try:
+            jsonld_data = json.loads(match.strip())
+            info["jsonld"].append(jsonld_data)
+        except json.JSONDecodeError:
+            pass  # invalid JSON-LD is ignored on purpose
+    # Try to extract lang attribute
+    lang_match = re.search(r'<html[^>]*lang=["\']?([^"\' >]+)', src, re.I)
+    if lang_match:
+        info["lang"] = lang_match.group(1)
+    return info
+
+# ────────────────────────────────────────────────────────────────────────── class
+class AsyncUrlSeeder:
+ """
+ Async version of UrlSeeder.
+ Call pattern is await/async for / async with.
+
+ Public coroutine
+ ----------------
+ await seed.urls(...)
+ returns List[Dict[str,Any]] (url, status, head_data)
+ """
+
+    def __init__(
+        self,
+        ttl: timedelta = TTL,
+        client: Optional[httpx.AsyncClient]=None,
+        logger: Optional[AsyncLoggerBase] = None,
+        base_directory: Optional[Union[str, pathlib.Path]] = None,
+        cache_root: Optional[Union[str, Path]] = None,
+    ):
+        """
+        Set up the HTTP client, logger and on-disk cache directories.
+
+        Parameters
+        ----------
+        ttl : maximum age of cached entries.
+        client : httpx client to reuse; one is created when omitted.
+        logger : optional logger used by `_log`.
+        base_directory : parent of ".crawl4ai/seeder_cache"; falls back to the
+            CRAWL4_AI_BASE_DIRECTORY environment variable, then the home directory.
+        cache_root : directory holding the "live" and "head" per-URL caches;
+            defaults to "~/.cache/url_seeder".
+        """
+        self.ttl = ttl
+        if client:
+            self.client = client
+        else:
+            # NOTE(review): the "+AppleWebKit" below is reproduced as found; confirm it is intended.
+            default_headers = {
+                "User-Agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) +AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36"
+            }
+            self.client = httpx.AsyncClient(http2=True, timeout=20, headers=default_headers)
+        self.logger = logger
+
+        # Seeder cache lives under <base>/.crawl4ai/seeder_cache
+        resolved_base = base_directory or os.getenv("CRAWL4_AI_BASE_DIRECTORY", Path.home())
+        self.base_directory = pathlib.Path(resolved_base)
+        self.cache_dir = self.base_directory / ".crawl4ai" / "seeder_cache"
+        self.cache_dir.mkdir(parents=True, exist_ok=True)
+        self.index_cache_path = self.cache_dir / "latest_cc_index.txt"
+
+        # Filled in lazily by urls(): the index id needs a running event loop
+        self.index_id: Optional[str] = None
+        self._rate_sem: Optional[asyncio.Semaphore] = None
+
+        # Per-URL caches for liveness and head results
+        self.cache_root = Path(os.path.expanduser(cache_root or "~/.cache/url_seeder"))
+        live_dir = self.cache_root / "live"
+        head_dir = self.cache_root / "head"
+        live_dir.mkdir(parents=True, exist_ok=True)
+        head_dir.mkdir(exist_ok=True)
+
+    def _log(self, level: str, message: str, tag: str = "URL_SEED", **kwargs: Any):
+        """
+        Forward a message to the configured logger, if there is one.
+
+        `level` is the name of the logger method to call; unknown levels and a
+        missing logger are silently ignored.  Only the `params` keyword is
+        forwarded (defaulting to an empty dict).
+        """
+        if not self.logger:
+            return
+        emit = getattr(self.logger, level, None)
+        if not emit:
+            return
+        emit(message=message, tag=tag, params=kwargs.get('params', {}))
+
+ # ───────── cache helpers ─────────
+    def _cache_path(self, kind: str, url: str) -> Path:
+        """Return the cache file for *url* inside the *kind* sub-directory (named by the URL's SHA-1 hex digest)."""
+        digest = hashlib.sha1(url.encode()).hexdigest()
+        filename = f"{digest}.json"
+        return self.cache_root / kind / filename
+
+    def _cache_get(self, kind: str, url: str) -> Optional[Dict[str, Any]]:
+        """
+        Load the cached JSON entry for *url*, or None.
+
+        None is returned when the file is missing, older than `self.ttl`
+        (judged by its modification time), or cannot be read/decoded.
+        """
+        entry = self._cache_path(kind, url)
+        if not entry.exists():
+            return None
+        age_seconds = time.time() - entry.stat().st_mtime
+        if age_seconds > self.ttl.total_seconds():
+            return None  # expired
+        try:
+            raw = entry.read_text()
+            return json.loads(raw)
+        except Exception:
+            return None
+
+    def _cache_set(self, kind: str, url: str, data: Dict[str, Any]) -> None:
+        """Write *data* as compact JSON to the cache file for *url*; any failure is ignored (best effort)."""
+        try:
+            target = self._cache_path(kind, url)
+            payload = json.dumps(data, separators=(",", ":"))
+            target.write_text(payload)
+        except Exception:
+            pass
+
+
+ # ─────────────────────────────── discovery entry
+    async def urls(self,
+                   domain: str,
+                   config: "SeedingConfig",
+                   ) -> List[Dict[str,Any]]:
+        """
+        Fetch URLs for a domain using configuration from SeedingConfig.
+
+        Parameters
+        ----------
+        domain : str
+            The domain to fetch URLs for (e.g., "example.com")
+        config : SeedingConfig
+            Configuration object containing all seeding parameters
+
+        Returns
+        -------
+        The list of result dicts appended by `_validate`, truncated to
+        `config.max_urls` when that is positive.  When a query is given with
+        `extract_head` and BM25 scoring, each result also carries a
+        "relevance_score" and the list is sorted by it (highest first).
+
+        Raises
+        ------
+        ValueError
+            If `config.source` contains anything other than "cc" / "sitemap".
+        """
+        # Extract parameters from config
+        pattern = config.pattern or "*"
+        source = config.source
+        live_check = config.live_check
+        extract_head = config.extract_head
+        concurrency = config.concurrency
+        head_timeout = 5  # Default timeout for HEAD requests
+        hits_per_sec = config.hits_per_sec
+        self.force = config.force  # Store force flag as instance attribute
+        force = config.force
+        verbose = config.verbose if config.verbose is not None else (self.logger.verbose if self.logger else False)
+        max_urls = config.max_urls if config.max_urls is not None else -1
+        query = config.query
+        score_threshold = config.score_threshold
+        scoring_method = config.scoring_method
+
+        # Ensure seeder's logger verbose matches the config's verbose if it's set
+        if self.logger and hasattr(self.logger, 'verbose') and config.verbose is not None:
+            self.logger.verbose = config.verbose
+
+        # ensure we have the latest CC collection id
+        if self.index_id is None:
+            self.index_id = await self._latest_index()
+
+        # Parse source parameter - split by '+' to get list of sources
+        sources = source.split('+')
+        valid_sources = {"cc", "sitemap"}
+        for s in sources:
+            if s not in valid_sources:
+                raise ValueError(f"Invalid source '{s}'. Valid sources are: {', '.join(valid_sources)}")
+
+        if hits_per_sec:
+            if hits_per_sec <= 0:
+                self._log("warning", "hits_per_sec must be positive. Disabling rate limiting.", tag="URL_SEED")
+                self._rate_sem = None
+            else:
+                self._rate_sem = asyncio.Semaphore(hits_per_sec)
+        else:
+            self._rate_sem = None  # Ensure it's None if no rate limiting
+
+        self._log("info", "Starting URL seeding for {domain} with source={source}",
+                  params={"domain": domain, "source": source}, tag="URL_SEED")
+
+        # choose stream
+        async def gen():
+            if "sitemap" in sources:
+                self._log("debug", "Fetching from sitemaps...", tag="URL_SEED")
+                async for u in self._from_sitemaps(domain, pattern, force):
+                    yield u
+            if "cc" in sources:
+                self._log("debug", "Fetching from Common Crawl...", tag="URL_SEED")
+                async for u in self._from_cc(domain, pattern, force):
+                    yield u
+
+        queue = asyncio.Queue()
+        producer_done = asyncio.Event()
+        stop_event = asyncio.Event()
+        seen: set[str] = set()
+
+        async def producer():
+            try:
+                async for u in gen():
+                    if u in seen:
+                        self._log("debug", "Skipping duplicate URL: {url}",
+                                  params={"url": u}, tag="URL_SEED")
+                        continue
+                    if stop_event.is_set():
+                        self._log("info", "Producer stopping due to max_urls limit.", tag="URL_SEED")
+                        break
+                    # Record the URL so the same address coming from another
+                    # source (sitemap and CC) is not validated twice.
+                    seen.add(u)
+                    await queue.put(u)
+            except Exception as e:
+                self._log("error", "Producer encountered an error: {error}", params={"error": str(e)}, tag="URL_SEED")
+            finally:
+                producer_done.set()
+                self._log("debug", "Producer finished.", tag="URL_SEED")
+
+        async def worker(res_list: List[Dict[str,Any]]):
+            while True:
+                if queue.empty() and producer_done.is_set():
+                    break
+                try:
+                    url = await asyncio.wait_for(queue.get(), 5)
+                except asyncio.TimeoutError:
+                    continue  # Keep checking queue and producer_done status
+                except Exception as e:
+                    self._log("error", "Worker failed to get URL from queue: {error}", params={"error": str(e)}, tag="URL_SEED")
+                    continue
+
+                if max_urls > 0 and len(res_list) >= max_urls:
+                    self._log(
+                        "info",
+                        "Worker stopping due to max_urls limit.",
+                        tag="URL_SEED",
+                    )
+                    stop_event.set()
+
+                    # mark the current item done
+                    queue.task_done()
+
+                    # flush whatever is still sitting in the queue so
+                    # queue.join() can finish cleanly
+                    while not queue.empty():
+                        try:
+                            queue.get_nowait()
+                            queue.task_done()
+                        except asyncio.QueueEmpty:
+                            break
+                    break
+
+                if self._rate_sem:  # global QPS control
+                    async with self._rate_sem:
+                        await self._validate(url, res_list, live_check, extract_head,
+                                             head_timeout, verbose)
+                else:
+                    await self._validate(url, res_list, live_check, extract_head,
+                                         head_timeout, verbose)
+                queue.task_done()  # Mark task as done for queue.join()
+
+        # launch
+        results: List[Dict[str,Any]] = []
+        prod_task = asyncio.create_task(producer())
+        workers = [asyncio.create_task(worker(results)) for _ in range(concurrency)]
+
+        # Wait for all workers to finish
+        await asyncio.gather(prod_task, *workers)
+        await queue.join()  # Ensure all queued items are processed
+
+        self._log("info", "Finished URL seeding for {domain}. Total URLs: {count}",
+                  params={"domain": domain, "count": len(results)}, tag="URL_SEED")
+
+        # Apply BM25 scoring if query is provided and extract_head is enabled
+        if query and extract_head and scoring_method == "bm25":
+            self._log("info", "Applying BM25 scoring for query: '{query}'",
+                      params={"query": query}, tag="URL_SEED")
+
+            # Extract text contexts from all results
+            documents = []
+            valid_indices = []
+            for i, result in enumerate(results):
+                if result.get("head_data"):
+                    text_context = self._extract_text_context(result["head_data"])
+                    if text_context:  # Only include non-empty contexts
+                        documents.append(text_context)
+                        valid_indices.append(i)
+
+            if documents:
+                # Calculate BM25 scores
+                scores = self._calculate_bm25_score(query, documents)
+
+                # Add scores to results
+                for idx, score in zip(valid_indices, scores):
+                    results[idx]["relevance_score"] = float(score)
+
+                # Add zero scores to results without usable head data
+                # (set lookup keeps this pass linear in the number of results)
+                scored = set(valid_indices)
+                for i, result in enumerate(results):
+                    if i not in scored:
+                        result["relevance_score"] = 0.0
+
+                # Filter by score threshold if specified
+                if score_threshold is not None:
+                    original_count = len(results)
+                    results = [r for r in results if r.get("relevance_score", 0.0) >= score_threshold]
+                    self._log("info", "Filtered {filtered} URLs below score threshold {threshold}. Remaining: {remaining}",
+                              params={"filtered": original_count - len(results),
+                                      "threshold": score_threshold,
+                                      "remaining": len(results)}, tag="URL_SEED")
+
+                # Sort by relevance score (highest first)
+                results.sort(key=lambda x: x.get("relevance_score", 0.0), reverse=True)
+            else:
+                self._log("warning", "No valid head data found for BM25 scoring.", tag="URL_SEED")
+                # Add zero scores to all results
+                for result in results:
+                    result["relevance_score"] = 0.0
+        elif query and not extract_head:
+            self._log("warning", "Query provided but extract_head is False. Enable extract_head for relevance scoring.", tag="URL_SEED")
+
+        return results[:max_urls] if max_urls > 0 else results
+
+    async def many_urls(
+        self,
+        domains: Sequence[str],
+        config: "SeedingConfig",
+    ) -> Dict[str, List[Dict[str,Any]]]:
+        """
+        Run `urls` for several domains concurrently with one shared config.
+
+        Parameters
+        ----------
+        domains : Sequence[str]
+            Domains to seed.
+        config : SeedingConfig
+            Configuration applied to every domain.
+
+        Returns a {domain: urls-list} dict, paired in the order of `domains`.
+        """
+        self._log("info", "Starting URL seeding for {count} domains...",
+                  params={"count": len(domains)}, tag="URL_SEED")
+
+        # An explicit verbose setting in the config overrides the logger's own
+        verbose_override = config.verbose
+        if self.logger and hasattr(self.logger, 'verbose') and verbose_override is not None:
+            self.logger.verbose = verbose_override
+
+        pending = [self.urls(name, config) for name in domains]
+        per_domain = await asyncio.gather(*pending)
+
+        final_results = {name: found for name, found in zip(domains, per_domain)}
+        self._log("info", "Finished URL seeding for multiple domains.", tag="URL_SEED")
+        return final_results
+
+    async def _resolve_head(self, url: str) -> Optional[str]:
+        """
+        Probe *url* with a single HEAD request (redirects are not followed).
+
+        Returns:
+            * the response URL as a string on a 2xx answer,
+            * the Location target resolved against *url* on 301/302/303/307/308,
+            * None for every other status, a redirect without Location, or any error.
+        """
+        try:
+            resp = await self.client.head(url, timeout=10, follow_redirects=False)
+            status = resp.status_code
+
+            if 200 <= status < 300:
+                # direct hit
+                return str(resp.url)
+
+            if status in (301, 302, 303, 307, 308):
+                # single level redirect
+                target = resp.headers.get("location")
+                if target:
+                    return urljoin(url, target)
+
+            return None
+
+        except Exception as exc:
+            self._log("debug", "HEAD {url} failed: {err}",
+                      params={"url": url, "err": str(exc)}, tag="URL_SEED")
+            return None
+
+
+ # ─────────────────────────────── CC
+ async def _from_cc(self, domain:str, pattern:str, force:bool):
+ import re
+ digest = hashlib.md5(pattern.encode()).hexdigest()[:8]
+
+ # ── normalise for CC (strip scheme, query, fragment)
+ raw = re.sub(r'^https?://', '', domain).split('#', 1)[0].split('?', 1)[0].lstrip('.')
+
+ # ── sanitize only for cache-file name
+ safe = re.sub('[/?#]+', '_', raw)
+ path = self.cache_dir / f"{self.index_id}_{safe}_{digest}.jsonl"
+
+ if path.exists() and not force:
+ self._log("info", "Loading CC URLs for {domain} from cache: {path}",
+ params={"domain": domain, "path": path}, tag="URL_SEED")
+ async with aiofiles.open(path,"r") as fp:
+ async for line in fp:
+ url=line.strip()
+ if _match(url,pattern): yield url
+ return
+
+ # build CC glob – if a path is present keep it, else add trailing /*
+ glob = f"*.{raw}*" if '/' in raw else f"*.{raw}/*"
+ url = f"https://index.commoncrawl.org/{self.index_id}-index?url={quote(glob, safe='*')}&output=json"
+
+ retries=(1,3,7)
+ self._log("info", "Fetching CC URLs for {domain} from Common Crawl index: {url}",
+ params={"domain": domain, "url": url}, tag="URL_SEED")
+ for i,d in enumerate(retries+(-1,)): # last -1 means don't retry
+ try:
+ async with self.client.stream("GET", url) as r:
+ r.raise_for_status()
+ async with aiofiles.open(path,"w") as fp:
+ async for line in r.aiter_lines():
+ rec = json.loads(line)
+ u = rec["url"]
+ await fp.write(u+"\n")
+ if _match(u,pattern): yield u
+ return
+ except httpx.HTTPStatusError as e:
+ if e.response.status_code==503 and ibool:
+ try:
+ r=await self.client.head(url, timeout=timeout,
+ headers={"Range":"bytes=0-0","Accept-Encoding":"identity"})
+ r.raise_for_status() # Raise for bad status codes (4xx, 5xx)
+ return True
+ except httpx.RequestError as e:
+ self._log("debug", "HEAD check network error for {url}: {error}",
+ params={"url": url, "error": str(e)}, tag="URL_SEED")
+ return False
+ except httpx.HTTPStatusError as e:
+ self._log("debug", "HEAD check HTTP status error for {url}: {status_code}",
+ params={"url": url, "status_code": e.response.status_code}, tag="URL_SEED")
+ return False
+ except Exception as e:
+ self._log("error", "Unexpected error during HEAD check for {url}: {error}",
+ params={"url": url, "error": str(e)}, tag="URL_SEED")
+ return False
+
+    async def _fetch_head(
+        self,
+        url: str,
+        timeout: int,
+        max_redirects: int = 5,
+        max_bytes: int = 65_536,  # stop after 64 kB even if </head> never comes
+        chunk_size: int = 4096,  # how much we read per await
+    ):
+        """
+        Stream the start of *url* and return the markup up to the closing </head> tag.
+
+        Redirects (301/302/303/307/308) are followed manually, at most
+        `max_redirects` times.  Reading stops as soon as "</head>" has been
+        seen or `max_bytes` have been buffered.
+
+        Returns a tuple `(ok, html, final_url)`; on failure `ok` is False and
+        `html` is the empty string.
+        """
+        for _ in range(max_redirects+1):
+            try:
+                # force plain text to avoid partial-gzip decode headaches;
+                # no Range header is sent, max_bytes is the upper bound instead
+                async with self.client.stream(
+                    "GET",
+                    url,
+                    timeout=timeout,
+                    headers={
+                        "Accept-Encoding": "identity",
+                    },
+                    follow_redirects=False,
+                ) as r:
+
+                    if r.status_code in (301,302,303,307,308):
+                        location = r.headers.get("Location")
+                        if location:
+                            url = urljoin(url, location)
+                            self._log("debug", "Redirecting from {original_url} to {new_url}",
+                                      params={"original_url": r.url, "new_url": url}, tag="URL_SEED")
+                            continue
+                        else:
+                            self._log("warning", "Redirect status {status_code} but no Location header for {url}",
+                                      params={"status_code": r.status_code, "url": r.url}, tag="URL_SEED")
+                            return False, "", str(r.url)  # Return original URL if no new location
+
+                    # Redirects were handled above; anything else outside 2xx/3xx is a failure
+                    if not (200 <= r.status_code < 400):
+                        self._log("warning", "Non-success status {status_code} when fetching head for {url}",
+                                  params={"status_code": r.status_code, "url": r.url}, tag="URL_SEED")
+                        return False, "", str(r.url)
+
+                    buf = bytearray()
+                    async for chunk in r.aiter_bytes(chunk_size):
+                        buf.extend(chunk)
+                        low = buf.lower()
+                        # Stop once the head section is complete or the cap is hit
+                        if b"</head>" in low or len(buf) >= max_bytes:
+                            await r.aclose()
+                            break
+
+                    enc = r.headers.get("Content-Encoding", "").lower()
+                    try:
+                        if enc == "gzip" and buf[:2] == b"\x1f\x8b":
+                            buf = gzip.decompress(buf)
+                        elif enc == "br" and HAS_BROTLI and buf[:4] == b"\x8b\x6c\x0a\x1a":
+                            buf = brotli.decompress(buf)
+                        elif enc in {"gzip", "br"}:
+                            # Header says “gzip” or “br” but payload is plain – ignore
+                            self._log(
+                                "debug",
+                                "Skipping bogus {encoding} for {url}",
+                                params={"encoding": enc, "url": r.url},
+                                tag="URL_SEED",
+                            )
+                    except Exception as e:
+                        self._log(
+                            "warning",
+                            "Decompression error for {url} ({encoding}): {error}",
+                            params={"url": r.url, "encoding": enc, "error": str(e)},
+                            tag="URL_SEED",
+                        )
+                        # fall through with raw buf
+
+                    # Find the </head> tag case-insensitively and decode
+                    idx = buf.lower().find(b"</head>")
+                    if idx==-1:
+                        self._log("debug", "No </head> tag found in initial bytes of {url}",
+                                  params={"url": r.url}, tag="URL_SEED")
+                        # If no </head> is found, take a reasonable chunk or all if small
+                        html_bytes = buf if len(buf) < 10240 else buf[:10240]  # Take max 10KB if no head tag
+                    else:
+                        html_bytes = buf[:idx+7]  # Include the 7-byte </head> tag
+
+                    try:
+                        html = html_bytes.decode("utf-8", "replace")
+                    except Exception as e:
+                        self._log(
+                            "warning",
+                            "Failed to decode head content for {url}: {error}",
+                            params={"url": r.url, "error": str(e)},
+                            tag="URL_SEED",
+                        )
+                        html = html_bytes.decode("latin-1", "replace")
+
+                    return True,html,str(r.url)  # Return the actual URL after redirects
+
+            except httpx.RequestError as e:
+                self._log("debug", "Fetch head network error for {url}: {error}",
+                          params={"url": url, "error": str(e)}, tag="URL_SEED")
+                return False,"",url
+
+        # If loop finishes without returning (e.g. too many redirects)
+        self._log("warning", "Exceeded max redirects ({max_redirects}) for {url}",
+                  params={"max_redirects": max_redirects, "url": url}, tag="URL_SEED")
+        return False,"",url
+
+ # ─────────────────────────────── BM25 scoring helpers
+    def _extract_text_context(self, head_data: Dict[str, Any]) -> str:
+        """
+        Flatten the textual parts of parsed head metadata into one string for scoring.
+
+        Collected in order: the title, selected meta tags (standard, Open
+        Graph, Twitter Card, Dublin Core), then selected JSON-LD fields
+        including items of a top-level "@graph" list.  Parts are joined with
+        single spaces; empty parts are dropped.
+        """
+        meta_keys = (
+            # Standard meta tags
+            "description", "keywords", "author", "subject", "summary", "abstract",
+            # Open Graph tags
+            "og:title", "og:description", "og:site_name", "article:tag",
+            # Twitter Card tags
+            "twitter:title", "twitter:description", "twitter:image:alt",
+            # Dublin Core tags
+            "dc.title", "dc.description", "dc.subject", "dc.creator",
+        )
+        collected = []
+
+        title = head_data.get("title")
+        if title:
+            collected.append(title)
+
+        meta = head_data.get("meta", {})
+        collected.extend(meta[name] for name in meta_keys if meta.get(name))
+
+        # JSON-LD structured data
+        for node in head_data.get("jsonld", []):
+            if not isinstance(node, dict):
+                continue
+            for field in ("name", "headline", "description", "abstract", "keywords"):
+                if field not in node:
+                    continue
+                value = node[field]
+                if isinstance(value, str):
+                    collected.append(value)
+                elif isinstance(value, list):
+                    collected.extend(str(item) for item in value if item)
+
+            # Handle @graph structures
+            graph = node.get("@graph")
+            if not isinstance(graph, list):
+                continue
+            for member in graph:
+                if not isinstance(member, dict):
+                    continue
+                for field in ("name", "headline", "description"):
+                    if field in member and isinstance(member[field], str):
+                        collected.append(member[field])
+
+        return " ".join(part for part in collected if part)
+
+    def _calculate_bm25_score(self, query: str, documents: List[str]) -> List[float]:
+        """
+        Score each document against *query* with BM25Okapi.
+
+        Text is lower-cased and split on whitespace.  Scores are divided by
+        the highest score when that is positive.  A list of zeros (one per
+        document) is returned when rank_bm25 is unavailable, the query or the
+        document list is empty, every document tokenizes to nothing, or
+        scoring raises.
+        """
+        zeros = [0.0] * len(documents)
+
+        if not HAS_BM25:
+            self._log("warning", "rank_bm25 not installed. Returning zero scores.", tag="URL_SEED")
+            return zeros
+
+        if not query or not documents:
+            return zeros
+
+        # Simple whitespace tokenization of query and documents
+        query_terms = query.lower().split()
+        corpus = [text.lower().split() for text in documents]
+
+        # Nothing to score when no document produced a token
+        if not any(corpus):
+            return zeros
+
+        try:
+            from rank_bm25 import BM25Okapi
+            raw_scores = BM25Okapi(corpus).get_scores(query_terms)
+
+            # Normalize by the best score, leaving values untouched when it is not positive
+            top = max(raw_scores)
+            divisor = top if top > 0 else 1.0
+            return [value / divisor for value in raw_scores]
+        except Exception as exc:
+            self._log("error", "Error calculating BM25 scores: {error}",
+                      params={"error": str(exc)}, tag="URL_SEED")
+            return zeros
+
+ # ─────────────────────────────── index helper
+ async def _latest_index(self)->str:
+ if self.index_cache_path.exists() and (time.time()-self.index_cache_path.stat().st_mtime) Union[List[str], Dict[str, List[Union[str, Dict[str, Any]]]]]:
+ """
+ Discovers, filters, and optionally validates URLs for a given domain(s)
+ using sitemaps and Common Crawl archives.
+
+ Args:
+ domain_or_domains: A single domain string (e.g., "iana.org") or a list of domains.
+ config: A SeedingConfig object to control the seeding process.
+ Parameters passed directly via kwargs will override those in 'config'.
+ **kwargs: Additional parameters (e.g., `source`, `live_check`, `extract_head`,
+ `pattern`, `concurrency`, `hits_per_sec`, `force_refresh`, `verbose`)
+ that will be used to construct or update the SeedingConfig.
+
+ Returns:
+ If `extract_head` is False:
+ - For a single domain: `List[str]` of discovered URLs.
+ - For multiple domains: `Dict[str, List[str]]` mapping each domain to its URLs.
+ If `extract_head` is True:
+ - For a single domain: `List[Dict[str, Any]]` where each dict contains 'url'
+ and 'head_data' (parsed metadata).
+ - For multiple domains: `Dict[str, List[Dict[str, Any]]]` mapping each domain
+ to a list of URL data dictionaries.
+
+ Raises:
+ ValueError: If `domain_or_domains` is not a string or a list of strings.
+ Exception: Any underlying exceptions from AsyncUrlSeeder or network operations.
+
+ Example:
+ >>> # Discover URLs from sitemap with live check for 'example.com'
+ >>> result = await crawler.aseed_urls("example.com", source="sitemap", live_check=True, hits_per_sec=10)
+
+ >>> # Discover URLs from Common Crawl, extract head data for 'example.com' and 'python.org'
+ >>> multi_domain_result = await crawler.aseed_urls(
+ >>> ["example.com", "python.org"],
+ >>> source="cc", extract_head=True, concurrency=200, hits_per_sec=50
+ >>> )
+ """
+ # Initialize AsyncUrlSeeder here if it hasn't been already
+ if not self.url_seeder:
+ # Pass the crawler's base_directory for seeder's cache management
+ # Pass the crawler's logger for consistent logging
+ self.url_seeder = AsyncUrlSeeder(
+ base_directory=self.crawl4ai_folder,
+ logger=self.logger
+ )
+
+ # Merge config object with direct kwargs, giving kwargs precedence
+ seeding_config = config.clone(**kwargs) if config else SeedingConfig.from_kwargs(kwargs)
+
+ # Ensure base_directory is set for the seeder's cache
+ seeding_config.base_directory = seeding_config.base_directory or self.crawl4ai_folder
+ # Ensure the seeder uses the crawler's logger (if not already set)
+ if not self.url_seeder.logger:
+ self.url_seeder.logger = self.logger
+
+ # Pass verbose setting if explicitly provided in SeedingConfig or kwargs
+ if seeding_config.verbose is not None:
+ self.url_seeder.logger.verbose = seeding_config.verbose
+ else: # Default to crawler's verbose setting
+ self.url_seeder.logger.verbose = self.logger.verbose
+
+
+ if isinstance(domain_or_domains, str):
+ self.logger.info(
+ message="Starting URL seeding for domain: {domain}",
+ tag="SEED",
+ params={"domain": domain_or_domains}
+ )
+ return await self.url_seeder.urls(
+ domain_or_domains,
+ seeding_config
+ )
+ elif isinstance(domain_or_domains, (list, tuple)):
+ self.logger.info(
+ message="Starting URL seeding for {count} domains",
+ tag="SEED",
+ params={"count": len(domain_or_domains)}
+ )
+ # AsyncUrlSeeder.many_urls directly accepts a list of domains and individual params.
+ return await self.url_seeder.many_urls(
+ domain_or_domains,
+ seeding_config
+ )
+ else:
+ raise ValueError("`domain_or_domains` must be a string or a list of strings.")
\ No newline at end of file
diff --git a/crawl4ai/types.py b/crawl4ai/types.py
index 63fd45ba..2b044ebd 100644
--- a/crawl4ai/types.py
+++ b/crawl4ai/types.py
@@ -10,12 +10,16 @@ CacheMode = Union['CacheModeType']
CrawlResult = Union['CrawlResultType']
CrawlerHub = Union['CrawlerHubType']
BrowserProfiler = Union['BrowserProfilerType']
+# NEW: Add AsyncUrlSeederType
+AsyncUrlSeeder = Union['AsyncUrlSeederType']
# Configuration types
BrowserConfig = Union['BrowserConfigType']
CrawlerRunConfig = Union['CrawlerRunConfigType']
HTTPCrawlerConfig = Union['HTTPCrawlerConfigType']
LLMConfig = Union['LLMConfigType']
+# NEW: Add SeedingConfigType
+SeedingConfig = Union['SeedingConfigType']
# Content scraping types
ContentScrapingStrategy = Union['ContentScrapingStrategyType']
@@ -94,6 +98,8 @@ if TYPE_CHECKING:
from .models import CrawlResult as CrawlResultType
from .hub import CrawlerHub as CrawlerHubType
from .browser_profiler import BrowserProfiler as BrowserProfilerType
+ # NEW: Import AsyncUrlSeeder for type checking
+ from .async_url_seeder import AsyncUrlSeeder as AsyncUrlSeederType
# Configuration imports
from .async_configs import (
@@ -101,6 +107,8 @@ if TYPE_CHECKING:
CrawlerRunConfig as CrawlerRunConfigType,
HTTPCrawlerConfig as HTTPCrawlerConfigType,
LLMConfig as LLMConfigType,
+ # NEW: Import SeedingConfig for type checking
+ SeedingConfig as SeedingConfigType,
)
# Content scraping imports
@@ -184,4 +192,4 @@ if TYPE_CHECKING:
def create_llm_config(*args, **kwargs) -> 'LLMConfigType':
from .async_configs import LLMConfig
- return LLMConfig(*args, **kwargs)
+ return LLMConfig(*args, **kwargs)
\ No newline at end of file
diff --git a/docs/examples/url_seeder/url_seeder_demo.py b/docs/examples/url_seeder/url_seeder_demo.py
new file mode 100644
index 00000000..faf730f9
--- /dev/null
+++ b/docs/examples/url_seeder/url_seeder_demo.py
@@ -0,0 +1,261 @@
+"""
+URL Seeder Demo - Interactive showcase of Crawl4AI's URL discovery capabilities
+
+This demo shows:
+1. Basic URL discovery from sitemaps and Common Crawl
+2. Cache management and forced refresh
+3. Live URL validation and metadata extraction
+4. BM25 relevance scoring for intelligent filtering
+5. Integration with AsyncWebCrawler for the complete pipeline
+"""
+
+import asyncio
+import time
+from datetime import datetime
+from rich.console import Console
+from rich.table import Table
+from rich.panel import Panel
+from rich.progress import Progress, SpinnerColumn, BarColumn, TimeElapsedColumn
+from rich.prompt import Prompt, Confirm
+from crawl4ai import (
+ AsyncWebCrawler,
+ CrawlerRunConfig,
+ AsyncUrlSeeder,
+ SeedingConfig
+)
+
+console = Console()
+
+console.rule("[bold green]🌐 Crawl4AI URL Seeder: Interactive Demo")
+
+DOMAIN = "crawl4ai.com"
+
+# Utils
+
+def print_head_info(head_data):
+ table = Table(title=" Metadata", expand=True)
+ table.add_column("Key", style="cyan", no_wrap=True)
+ table.add_column("Value", style="magenta")
+
+ if not head_data:
+ console.print("[yellow]No head data found.")
+ return
+
+ if head_data.get("title"):
+ table.add_row("title", head_data["title"])
+ if head_data.get("charset"):
+ table.add_row("charset", head_data["charset"])
+ for k, v in head_data.get("meta", {}).items():
+ table.add_row(f"meta:{k}", v)
+ for rel, items in head_data.get("link", {}).items():
+ for item in items:
+ table.add_row(f"link:{rel}", item.get("href", ""))
+ console.print(table)
+
+
+async def section_1_basic_exploration(seed: AsyncUrlSeeder):
+ console.rule("[bold cyan]1. Basic Seeding")
+ cfg = SeedingConfig(source="cc+sitemap", pattern="*", verbose=True)
+
+ start_time = time.time()
+ with Progress(SpinnerColumn(), "[progress.description]{task.description}") as p:
+ p.add_task(description="Fetching from Common Crawl + Sitemap...", total=None)
+ urls = await seed.urls(DOMAIN, cfg)
+ elapsed = time.time() - start_time
+
+ console.print(f"[green]✓ Fetched {len(urls)} URLs in {elapsed:.2f} seconds")
+ console.print(f"[dim] Speed: {len(urls)/elapsed:.0f} URLs/second[/dim]\n")
+
+ console.print("[bold]Sample URLs:[/bold]")
+ for u in urls[:5]:
+ console.print(f" • {u['url']}")
+
+
+async def section_2_cache_demo(seed: AsyncUrlSeeder):
+ console.rule("[bold cyan]2. Caching Demonstration")
+ console.print("[yellow]Using `force=True` to bypass cache and fetch fresh data.[/yellow]")
+ cfg = SeedingConfig(source="cc", pattern="*crawl4ai.com/core/*", verbose=False, force = True)
+ await seed.urls(DOMAIN, cfg)
+
+async def section_3_live_head(seed: AsyncUrlSeeder):
+ console.rule("[bold cyan]3. Live Check + Head Extraction")
+ cfg = SeedingConfig(
+ extract_head=True,
+ concurrency=10,
+ hits_per_sec=5,
+ pattern="*crawl4ai.com/*",
+ max_urls=10,
+ verbose=False,
+ )
+ urls = await seed.urls(DOMAIN, cfg)
+
+ valid = [u for u in urls if u["status"] == "valid"]
+ console.print(f"[green]Valid: {len(valid)} / {len(urls)}")
+ if valid:
+ print_head_info(valid[0]["head_data"])
+
+
+async def section_4_bm25_scoring(seed: AsyncUrlSeeder):
+ console.rule("[bold cyan]4. BM25 Relevance Scoring")
+ console.print("[yellow]Using AI-powered relevance scoring to find the most relevant content[/yellow]")
+
+ query = "markdown generation extraction strategies"
+ cfg = SeedingConfig(
+ source="sitemap",
+ extract_head=True,
+ query=query,
+ scoring_method="bm25",
+ score_threshold=0.3, # Only URLs with >30% relevance
+ max_urls=20,
+ verbose=False
+ )
+
+ with Progress(SpinnerColumn(), "[progress.description]{task.description}") as p:
+ p.add_task(description=f"Searching for: '{query}'", total=None)
+ urls = await seed.urls(DOMAIN, cfg)
+
+ console.print(f"[green]Found {len(urls)} relevant URLs (score > 0.3)")
+
+ # Show top results with scores
+ table = Table(title="Top 5 Most Relevant Pages", expand=True)
+ table.add_column("Score", style="cyan", width=8)
+ table.add_column("Title", style="magenta")
+ table.add_column("URL", style="blue", overflow="fold")
+
+ for url in urls[:5]:
+ score = f"{url['relevance_score']:.2f}"
+ title = url['head_data'].get('title', 'No title')[:60] + "..."
+ table.add_row(score, title, url['url'])
+
+ console.print(table)
+
+ async def section_5_keyword_filter_to_agent(seed: AsyncUrlSeeder):  # Discover URLs, keep keyword matches, then crawl them with AsyncWebCrawler.
+ console.rule("[bold cyan]5. Complete Pipeline: Discover → Filter → Crawl")
+ cfg = SeedingConfig(
+ extract_head=True,
+ concurrency=20,
+ hits_per_sec=10,
+ max_urls=10,
+ pattern="*crawl4ai.com/*",
+ force=True,
+ )
+ urls = await seed.urls(DOMAIN, cfg)
+
+ keywords = ["deep crawling", "markdown", "llm"]
+ selected = [u for u in urls if any(k in str(u["head_data"]).lower() for k in keywords)]  # substring match against the lower-cased, stringified head data
+
+ console.print(f"[cyan]Selected {len(selected)} URLs with relevant keywords:")
+ for u in selected[:10]:
+ console.print("•", u["url"])
+
+ console.print("\n[yellow]Passing above URLs to arun_many() LLM agent for crawling...")
+ async with AsyncWebCrawler(verbose=True) as crawler:
+ crawl_run_config = CrawlerRunConfig(
+ # Example crawl settings for these URLs:
+ only_text=True, # Just get text content
+ screenshot=False,
+ pdf=False,
+ word_count_threshold=50, # Only process pages with at least 50 words
+ stream=True,
+ verbose=False # Keep logs clean for arun_many in this demo
+ )
+
+ # Extract just the URLs from the selected results
+ urls_to_crawl = [u["url"] for u in selected]
+
+ # We'll stream results for large lists, but collect them here for demonstration
+ crawled_results_stream = await crawler.arun_many(urls_to_crawl, config=crawl_run_config)  # consumed with "async for" below; stream=True is set in the config above
+ final_crawled_data = []
+ async for result in crawled_results_stream:
+ final_crawled_data.append(result)
+ if len(final_crawled_data) % 5 == 0:  # progress line after every fifth collected result
+ print(f" Processed {len(final_crawled_data)}/{len(urls_to_crawl)} URLs...")
+
+ print(f"\n Successfully crawled {len(final_crawled_data)} URLs.")  # counts every collected result, whether or not result.success is set
+ if final_crawled_data:
+ print("\n Example of a crawled result's URL and Markdown (first successful one):")
+ for result in final_crawled_data:
+ if result.success and result.markdown.raw_markdown:
+ print(f" URL: {result.url}")
+ print(f" Markdown snippet: {result.markdown.raw_markdown[:200]}...")
+ break  # only the first qualifying result is shown
+ else:
+ print(" No successful crawls with markdown found.")
+ else:
+ print(" No successful crawls found.")
+
+
+async def section_6_multi_domain(seed: AsyncUrlSeeder):
+ console.rule("[bold cyan]6. Multi-Domain Discovery")
+ console.print("[yellow]Discovering Python tutorials across multiple educational sites[/yellow]\n")
+
+ domains = ["docs.python.org", "realpython.com", "docs.crawl4ai.com"]
+ cfg = SeedingConfig(
+ source="sitemap",
+ extract_head=True,
+ query="python tutorial guide",
+ scoring_method="bm25",
+ score_threshold=0.2,
+ max_urls=5 # Per domain
+ )
+
+ start_time = time.time()
+ with Progress(SpinnerColumn(), "[progress.description]{task.description}") as p:
+ task = p.add_task(description="Discovering across domains...", total=None)
+ results = await seed.many_urls(domains, cfg)
+ elapsed = time.time() - start_time
+
+ total_urls = sum(len(urls) for urls in results.values())
+ console.print(f"[green]✓ Found {total_urls} relevant URLs across {len(domains)} domains in {elapsed:.2f}s\n")
+
+ # Show results per domain
+ for domain, urls in results.items():
+ console.print(f"[bold]{domain}:[/bold] {len(urls)} relevant pages")
+ if urls:
+ top = urls[0]
+ console.print(f" Top result: [{top['relevance_score']:.2f}] {top['head_data'].get('title', 'No title')}")
+
+
+ async def main():  # Show the demo menu, then run the chosen demo (or all of them) with one shared seeder.
+ seed = AsyncUrlSeeder()
+
+ # Interactive menu
+ sections = {  # menu key -> (title, coroutine function taking the seeder)
+ "1": ("Basic URL Discovery", section_1_basic_exploration),
+ "2": ("Cache Management Demo", section_2_cache_demo),
+ "3": ("Live Check & Metadata Extraction", section_3_live_head),
+ "4": ("BM25 Relevance Scoring", section_4_bm25_scoring),
+ "5": ("Complete Pipeline (Discover → Filter → Crawl)", section_5_keyword_filter_to_agent),
+ "6": ("Multi-Domain Discovery", section_6_multi_domain),
+ "7": ("Run All Demos", None)  # no function: handled as a special case below
+ }
+
+ console.print("\n[bold]Available Demos:[/bold]")
+ for key, (title, _) in sections.items():
+ console.print(f" {key}. {title}")
+
+ choice = Prompt.ask("\n[cyan]Which demo would you like to run?[/cyan]",
+ choices=list(sections.keys()),
+ default="7")
+
+ console.print()
+
+ if choice == "7":
+ # Run all demos
+ for key, (title, func) in sections.items():
+ if key != "7" and func:  # skip the "Run All" entry itself
+ await func(seed)
+ if key != "6": # Don't pause after the last demo
+ if not Confirm.ask("\n[yellow]Continue to next demo?[/yellow]", default=True):
+ break  # user declined: stop running further demos
+ console.print()
+ else:
+ # Run selected demo
+ _, func = sections[choice]
+ await func(seed)
+
+ console.rule("[bold green]Demo Complete ✔︎")
+
+
+if __name__ == "__main__":
+ asyncio.run(main())
diff --git a/docs/examples/url_seeder/url_seeder_quick_demo.py b/docs/examples/url_seeder/url_seeder_quick_demo.py
new file mode 100644
index 00000000..3dc58b98
--- /dev/null
+++ b/docs/examples/url_seeder/url_seeder_quick_demo.py
@@ -0,0 +1,128 @@
+"""
+🚀 URL Seeder + AsyncWebCrawler = Magic!
+Quick demo showing discovery → filter → crawl pipeline
+"""
+import asyncio
+from crawl4ai import AsyncUrlSeeder, AsyncWebCrawler, SeedingConfig, CrawlerRunConfig, AsyncLogger, DefaultMarkdownGenerator
+from crawl4ai.content_filter_strategy import PruningContentFilter
+
+# 🔍 Example 1: Discover ALL → Filter → Crawl
+async def discover_and_crawl():
+ """Find Python module tutorials & extract them all!"""
+ seeder = AsyncUrlSeeder(
+ logger=AsyncLogger() # Log everything
+ )
+
+ # Step 1: See how many URLs exist (spoiler: A LOT!)
+ print("📊 Let's see what RealPython has...")
+ all_urls = await seeder.urls("realpython.com",
+ SeedingConfig(source="sitemap"))
+ print(f"😱 Found {len(all_urls)} total URLs!")
+
+ # Step 2: Filter for Python modules (perfect size ~13)
+ print("\n🎯 Filtering for 'python-modules' tutorials...")
+ module_urls = await seeder.urls("realpython.com",
+ SeedingConfig(
+ source="sitemap",
+ pattern="*python-modules*",
+ live_check=True # Make sure they're alive!
+ ))
+
+ print(f"✨ Found {len(module_urls)} module tutorials")
+ for url in module_urls[:3]: # Show first 3
+ status = "✅" if url["status"] == "valid" else "❌"
+ print(f"{status} {url['url']}")
+
+ # Step 3: Crawl them all with pruning (keep it lean!)
+ print("\n🕷️ Crawling all module tutorials...")
+ async with AsyncWebCrawler() as crawler:
+ config = CrawlerRunConfig(
+ markdown_generator=DefaultMarkdownGenerator(
+ content_filter=PruningContentFilter( # Smart filtering!
+ threshold=0.48, # Remove fluff
+ threshold_type="fixed",
+ ),
+ ),
+ only_text=True,
+ stream=True,
+ )
+
+ # Extract just the URLs from the seeder results
+ urls_to_crawl = [u["url"] for u in module_urls[:5]]
+ results = await crawler.arun_many(urls_to_crawl, config=config)
+
+ # Process & save
+ saved = 0
+ async for result in results:
+ if result.success:
+ # Save each tutorial (name from URL)
+ name = result.url.split("/")[-2] + ".md"
+ with open(name, "w") as f:
+ f.write(result.markdown.fit_markdown)
+ saved += 1
+ print(f"💾 Saved: {name}")
+
+ print(f"\n🎉 Successfully saved {saved} tutorials!")
+
+# 🔍 Example 2: Beautiful Soup articles with metadata peek
+async def explore_beautifulsoup():
+ """Discover BeautifulSoup content & peek at metadata"""
+ seeder = AsyncUrlSeeder(logger=AsyncLogger() )
+
+ print("🍲 Looking for Beautiful Soup articles...")
+ soup_urls = await seeder.urls("realpython.com",
+ SeedingConfig(
+ source="sitemap",
+ pattern="*beautiful-soup*",
+ extract_head=True # Get the metadata!
+ ))
+
+ print(f"\n📚 Found {len(soup_urls)} Beautiful Soup articles:\n")
+
+ # Show what we discovered
+ for i, url in enumerate(soup_urls, 1):
+ meta = url["head_data"]["meta"]
+
+ print(f"{i}. {url['head_data']['title']}")
+ print(f" 📝 {meta.get('description', 'No description')[:60]}...")
+ print(f" 👤 By: {meta.get('author', 'Unknown')}")
+ print(f" 🔗 {url['url']}\n")
+
+# 🔍 Example 3: Smart search with BM25 relevance scoring
+async def smart_search_with_bm25():
+ """Use AI-powered relevance scoring to find the best content"""
+ seeder = AsyncUrlSeeder(logger=AsyncLogger() )
+
+ print("🧠 Smart search: 'web scraping tutorial quiz'")
+
+ # Search with BM25 scoring - AI finds the best matches!
+ results = await seeder.urls("realpython.com",
+ SeedingConfig(
+ source="sitemap",
+ pattern="*beautiful-soup*",
+ extract_head=True,
+ query="web scraping tutorial quiz", # Our search
+ scoring_method="bm25",
+ score_threshold=0.2 # Quality filter
+ ))
+
+ print(f"\n🎯 Top {len(results)} most relevant results:\n")
+
+ # Show ranked results with relevance scores
+ for i, result in enumerate(results[:3], 1):
+ print(f"{i}. [{result['relevance_score']:.2f}] {result['head_data']['title']}")
+ print(f" 🔗 {result['url'][:60]}...")
+
+ print("\n✨ BM25 automatically ranked by relevance!")
+
+# 🎬 Run the show!
+ async def main():  # Entry point: only Example 3 is awaited; the calls for Examples 1 and 2 are commented out below.
+ # print("=" * 60)
+ # await discover_and_crawl()
+ # print("\n" + "=" * 60 + "\n")
+ # await explore_beautifulsoup()
+ # print("\n" + "=" * 60 + "\n")
+ await smart_search_with_bm25()
+
+if __name__ == "__main__":
+ asyncio.run(main())
\ No newline at end of file
diff --git a/docs/md_v2/assets/styles.css b/docs/md_v2/assets/styles.css
index 46b90ab0..fcd56b7a 100644
--- a/docs/md_v2/assets/styles.css
+++ b/docs/md_v2/assets/styles.css
@@ -16,22 +16,10 @@
--mono-font-stack: Menlo, Monaco, Lucida Console, Liberation Mono, DejaVu Sans Mono, Bitstream Vera Sans Mono,
Courier New, monospace, serif;
- --background-color: #151515; /* Dark background */
- --font-color: #eaeaea; /* Light font color for contrast */
- --invert-font-color: #151515; /* Dark color for inverted elements */
- --primary-color: #1a95e0; /* Primary color can remain the same or be adjusted for better contrast */
- --secondary-color: #727578; /* Secondary color for less important text */
--secondary-dimmed-color: #8b857a; /* Dimmed secondary color */
- --error-color: #ff5555; /* Bright color for errors */
- --progress-bar-background: #444; /* Darker background for progress bar */
- --progress-bar-fill: #1a95e0; /* Bright color for progress bar fill */
- --code-bg-color: #1e1e1e; /* Darker background for code blocks */
- --input-style: solid; /* Keeping input style solid */
--block-background-color: #202020; /* Darker background for block elements */
--global-font-color: #eaeaea; /* Light font color for global elements */
- --background-color: #222225;
-
--background-color: #070708;
--page-width: 70em;
--font-color: #e8e9ed;
@@ -40,7 +28,7 @@
--secondary-color: #d5cec0;
--tertiary-color: #a3abba;
--primary-dimmed-color: #09b5a5; /* Updated to the brand color */
- --primary-color: #50ffff; /* Updated to the brand color */
+ --primary-color: #0fbbaa; /* Updated to the brand color */
--accent-color: rgb(243, 128, 245);
--error-color: #ff3c74;
--progress-bar-background: #3f3f44;
diff --git a/docs/md_v2/core/url-seeding.md b/docs/md_v2/core/url-seeding.md
new file mode 100644
index 00000000..7ffe9ea7
--- /dev/null
+++ b/docs/md_v2/core/url-seeding.md
@@ -0,0 +1,1000 @@
+# URL Seeding: The Smart Way to Crawl at Scale
+
+## Why URL Seeding?
+
+Web crawling comes in different flavors, each with its own strengths. Let's understand when to use URL seeding versus deep crawling.
+
+### Deep Crawling: Real-Time Discovery
+
+Deep crawling is perfect when you need:
+- **Fresh, real-time data** - discovering pages as they're created
+- **Dynamic exploration** - following links based on content
+- **Selective extraction** - stopping when you find what you need
+
+```python
+# Deep crawling example: Explore a website dynamically
+import asyncio
+from crawl4ai import AsyncWebCrawler, CrawlerRunConfig
+from crawl4ai.deep_crawling import BFSDeepCrawlStrategy
+
+async def deep_crawl_example():
+ # Configure a 2-level deep crawl
+ config = CrawlerRunConfig(
+ deep_crawl_strategy=BFSDeepCrawlStrategy(
+ max_depth=2, # Crawl 2 levels deep
+ include_external=False, # Stay within domain
+ max_pages=50 # Limit for efficiency
+ ),
+ verbose=True
+ )
+
+ async with AsyncWebCrawler() as crawler:
+ # Start crawling and follow links dynamically
+ results = await crawler.arun("https://example.com", config=config)
+
+ print(f"Discovered and crawled {len(results)} pages")
+ for result in results[:3]:
+ print(f"Found: {result.url} at depth {result.metadata.get('depth', 0)}")
+
+asyncio.run(deep_crawl_example())
+```
+
+### URL Seeding: Bulk Discovery
+
+URL seeding shines when you want:
+- **Comprehensive coverage** - get thousands of URLs in seconds
+- **Bulk processing** - filter before crawling
+- **Resource efficiency** - know exactly what you'll crawl
+
+```python
+# URL seeding example: Analyze all documentation
+from crawl4ai import AsyncUrlSeeder, SeedingConfig
+
+seeder = AsyncUrlSeeder()
+config = SeedingConfig(
+ source="sitemap",
+ extract_head=True,
+ pattern="*/docs/*"
+)
+
+# Get ALL documentation URLs instantly
+urls = await seeder.urls("example.com", config)
+# 1000+ URLs discovered in seconds!
+```
+
+### The Trade-offs
+
+| Aspect | Deep Crawling | URL Seeding |
+|--------|---------------|-------------|
+| **Coverage** | Discovers pages dynamically | Gets most existing URLs instantly |
+| **Freshness** | Finds brand new pages | May miss very recent pages |
+| **Speed** | Slower, page by page | Extremely fast bulk discovery |
+| **Resource Usage** | Higher - crawls to discover | Lower - discovers then crawls |
+| **Control** | Can stop mid-process | Pre-filters before crawling |
+
+### When to Use Each
+
+**Choose Deep Crawling when:**
+- You need the absolute latest content
+- You're searching for specific information
+- The site structure is unknown or dynamic
+- You want to stop as soon as you find what you need
+
+**Choose URL Seeding when:**
+- You need to analyze large portions of a site
+- You want to filter URLs before crawling
+- You're doing comparative analysis
+- You need to optimize resource usage
+
+The magic happens when you understand both approaches and choose the right tool for your task. Sometimes, you might even combine them - use URL seeding for bulk discovery, then deep crawl specific sections for the latest updates.
+
+## Your First URL Seeding Adventure
+
+Let's see the magic in action. We'll discover blog posts about Python, filter for tutorials, and crawl only those pages.
+
+```python
+import asyncio
+from crawl4ai import AsyncUrlSeeder, AsyncWebCrawler, SeedingConfig, CrawlerRunConfig
+
+async def smart_blog_crawler():
+ # Step 1: Create our URL discoverer
+ seeder = AsyncUrlSeeder()
+
+ # Step 2: Configure discovery - let's find all blog posts
+ config = SeedingConfig(
+ source="sitemap", # Use the website's sitemap
+ pattern="*/blog/*.html", # Only blog posts
+ extract_head=True, # Get page metadata
+ max_urls=100 # Limit for this example
+ )
+
+ # Step 3: Discover URLs from the Python blog
+ print("🔍 Discovering blog posts...")
+ urls = await seeder.urls("realpython.com", config)
+ print(f"✅ Found {len(urls)} blog posts")
+
+ # Step 4: Filter for Python tutorials (using metadata!)
+ tutorials = [
+ url for url in urls
+ if url["status"] == "valid" and
+ any(keyword in str(url["head_data"]).lower()
+ for keyword in ["tutorial", "guide", "how to"])
+ ]
+ print(f"📚 Filtered to {len(tutorials)} tutorials")
+
+ # Step 5: Show what we found
+ print("\n🎯 Found these tutorials:")
+ for tutorial in tutorials[:5]: # First 5
+ title = tutorial["head_data"].get("title", "No title")
+ print(f" - {title}")
+ print(f" {tutorial['url']}")
+
+ # Step 6: Now crawl ONLY these relevant pages
+ print("\n🚀 Crawling tutorials...")
+ async with AsyncWebCrawler() as crawler:
+        config = CrawlerRunConfig(
+            only_text=True,
+            word_count_threshold=300,  # Only substantial articles
+            stream=True  # Needed so arun_many() yields results for the `async for` below
+        )
+        # Extract URLs and stream results as they come
+        tutorial_urls = [t["url"] for t in tutorials[:10]]
+        results = await crawler.arun_many(tutorial_urls, config=config)
+
+ successful = 0
+ async for result in results:
+ if result.success:
+ successful += 1
+ print(f"✓ Crawled: {result.url[:60]}...")
+
+ print(f"\n✨ Successfully crawled {successful} tutorials!")
+
+# Run it!
+asyncio.run(smart_blog_crawler())
+```
+
+**What just happened?**
+
+1. We discovered all blog URLs from the sitemap
+2. We filtered using metadata (no crawling needed!)
+3. We crawled only the relevant tutorials
+4. We saved tons of time and bandwidth
+
+This is the power of URL seeding - you see everything before you crawl anything.
+
+## Understanding the URL Seeder
+
+Now that you've seen the magic, let's understand how it works.
+
+### Basic Usage
+
+Creating a URL seeder is simple:
+
+```python
+from crawl4ai import AsyncUrlSeeder, SeedingConfig
+
+# Create a seeder instance
+seeder = AsyncUrlSeeder()
+
+# Discover URLs from a domain
+config = SeedingConfig(source="sitemap")
+urls = await seeder.urls("example.com", config)
+```
+
+The seeder can discover URLs from two powerful sources:
+
+#### 1. Sitemaps (Fastest)
+
+```python
+# Discover from sitemap
+config = SeedingConfig(source="sitemap")
+urls = await seeder.urls("example.com", config)
+```
+
+Sitemaps are XML files that websites create specifically to list all their URLs. It's like getting a menu at a restaurant - everything is listed upfront.
+
+#### 2. Common Crawl (Most Comprehensive)
+
+```python
+# Discover from Common Crawl
+config = SeedingConfig(source="cc")
+urls = await seeder.urls("example.com", config)
+```
+
+Common Crawl is a massive public dataset that regularly crawls the entire web. It's like having access to a pre-built index of the internet.
+
+#### 3. Both Sources (Maximum Coverage)
+
+```python
+# Use both sources
+config = SeedingConfig(source="cc+sitemap")
+urls = await seeder.urls("example.com", config)
+```
+
+### Configuration Magic: SeedingConfig
+
+The `SeedingConfig` object is your control panel. Here's everything you can configure:
+
+| Parameter | Type | Default | Description |
+|-----------|------|---------|-------------|
+| `source` | str | "cc" | URL source: "cc" (Common Crawl), "sitemap", or "cc+sitemap" |
+| `pattern` | str | "*" | URL pattern filter (e.g., "*/blog/*", "*.html") |
+| `extract_head` | bool | False | Extract metadata from the page `<head>` |
+| `live_check` | bool | False | Verify URLs are accessible |
+| `max_urls` | int | -1 | Maximum URLs to return (-1 = unlimited) |
+| `concurrency` | int | 10 | Parallel workers for fetching |
+| `hits_per_sec` | int | None | Rate limit for requests |
+| `force` | bool | False | Bypass cache, fetch fresh data |
+| `verbose` | bool | False | Show detailed progress |
+| `query` | str | None | Search query for BM25 scoring |
+| `scoring_method` | str | None | Scoring method (currently "bm25") |
+| `score_threshold` | float | None | Minimum score to include URL |
+
+#### Pattern Matching Examples
+
+```python
+# Match all blog posts
+config = SeedingConfig(pattern="*/blog/*")
+
+# Match only HTML files
+config = SeedingConfig(pattern="*.html")
+
+# Match product pages
+config = SeedingConfig(pattern="*/product/*")
+
+# Match everything except admin pages
+config = SeedingConfig(pattern="*")
+# Then filter: urls = [u for u in urls if "/admin/" not in u["url"]]
+```
+
+### URL Validation: Live Checking
+
+Sometimes you need to know if URLs are actually accessible. That's where live checking comes in:
+
+```python
+config = SeedingConfig(
+ source="sitemap",
+ live_check=True, # Verify each URL is accessible
+ concurrency=20 # Check 20 URLs in parallel
+)
+
+urls = await seeder.urls("example.com", config)
+
+# Now you can filter by status
+live_urls = [u for u in urls if u["status"] == "valid"]
+dead_urls = [u for u in urls if u["status"] == "not_valid"]
+
+print(f"Live URLs: {len(live_urls)}")
+print(f"Dead URLs: {len(dead_urls)}")
+```
+
+**When to use live checking:**
+- Before a large crawling operation
+- When working with older sitemaps
+- When data freshness is critical
+
+**When to skip it:**
+- Quick explorations
+- When you trust the source
+- When speed is more important than accuracy
+
+### The Power of Metadata: Head Extraction
+
+This is where URL seeding gets really powerful. Instead of crawling entire pages, you can extract just the metadata:
+
+```python
+config = SeedingConfig(
+    extract_head=True  # Extract metadata from the <head> section
+)
+
+urls = await seeder.urls("example.com", config)
+
+# Now each URL has rich metadata
+for url in urls[:3]:
+ print(f"\nURL: {url['url']}")
+ print(f"Title: {url['head_data'].get('title')}")
+
+ meta = url['head_data'].get('meta', {})
+ print(f"Description: {meta.get('description')}")
+ print(f"Keywords: {meta.get('keywords')}")
+
+ # Even Open Graph data!
+ print(f"OG Image: {meta.get('og:image')}")
+```
+
+#### What Can We Extract?
+
+The head extraction gives you a treasure trove of information:
+
+```python
+# Example of extracted head_data
+{
+ "title": "10 Python Tips for Beginners",
+ "charset": "utf-8",
+ "lang": "en",
+ "meta": {
+ "description": "Learn essential Python tips...",
+ "keywords": "python, programming, tutorial",
+ "author": "Jane Developer",
+ "viewport": "width=device-width, initial-scale=1",
+
+ # Open Graph tags
+ "og:title": "10 Python Tips for Beginners",
+ "og:description": "Essential Python tips for new programmers",
+ "og:image": "https://example.com/python-tips.jpg",
+ "og:type": "article",
+
+ # Twitter Card tags
+ "twitter:card": "summary_large_image",
+ "twitter:title": "10 Python Tips",
+
+ # Dublin Core metadata
+ "dc.creator": "Jane Developer",
+ "dc.date": "2024-01-15"
+ },
+ "link": {
+ "canonical": [{"href": "https://example.com/blog/python-tips"}],
+ "alternate": [{"href": "/feed.xml", "type": "application/rss+xml"}]
+ },
+ "jsonld": [
+ {
+ "@type": "Article",
+ "headline": "10 Python Tips for Beginners",
+ "datePublished": "2024-01-15",
+ "author": {"@type": "Person", "name": "Jane Developer"}
+ }
+ ]
+}
+```
+
+This metadata is gold for filtering! You can find exactly what you need without crawling a single page.
+
+### Understanding Results
+
+Each URL in the results has this structure:
+
+```python
+{
+ "url": "https://example.com/blog/python-tips.html",
+ "status": "valid", # "valid", "not_valid", or "unknown"
+ "head_data": { # Only if extract_head=True
+ "title": "Page Title",
+ "meta": {...},
+ "link": {...},
+ "jsonld": [...]
+ },
+ "relevance_score": 0.85 # Only if using BM25 scoring
+}
+```
+
+Let's see a real example:
+
+```python
+config = SeedingConfig(
+ source="sitemap",
+ extract_head=True,
+ live_check=True
+)
+
+urls = await seeder.urls("blog.example.com", config)
+
+# Analyze the results
+for url in urls[:5]:
+ print(f"\n{'='*60}")
+ print(f"URL: {url['url']}")
+ print(f"Status: {url['status']}")
+
+ if url['head_data']:
+ data = url['head_data']
+ print(f"Title: {data.get('title', 'No title')}")
+
+ # Check content type
+ meta = data.get('meta', {})
+ content_type = meta.get('og:type', 'unknown')
+ print(f"Content Type: {content_type}")
+
+ # Publication date
+ pub_date = None
+ for jsonld in data.get('jsonld', []):
+ if isinstance(jsonld, dict):
+ pub_date = jsonld.get('datePublished')
+ if pub_date:
+ break
+
+ if pub_date:
+ print(f"Published: {pub_date}")
+
+ # Word count (if available)
+ word_count = meta.get('word_count')
+ if word_count:
+ print(f"Word Count: {word_count}")
+```
+
+## Smart Filtering with BM25 Scoring
+
+Now for the really cool part - intelligent filtering based on relevance!
+
+### Introduction to Relevance Scoring
+
+BM25 is a ranking algorithm that scores how relevant a document is to a search query. With URL seeding, we can score URLs based on their metadata *before* crawling them.
+
+Think of it like this:
+- Traditional way: Read every book in the library to find ones about Python
+- Smart way: Check the titles and descriptions, score them, read only the most relevant
+
+### Query-Based Discovery
+
+Here's how to use BM25 scoring:
+
+```python
+config = SeedingConfig(
+ source="sitemap",
+ extract_head=True, # Required for scoring
+ query="python async tutorial", # What we're looking for
+ scoring_method="bm25", # Use BM25 algorithm
+ score_threshold=0.3 # Minimum relevance score
+)
+
+urls = await seeder.urls("realpython.com", config)
+
+# Results are automatically sorted by relevance!
+for url in urls[:5]:
+ print(f"Score: {url['relevance_score']:.2f} - {url['url']}")
+ print(f" Title: {url['head_data']['title']}")
+```
+
+### Real Examples
+
+#### Finding Documentation Pages
+
+```python
+# Find API documentation
+config = SeedingConfig(
+ source="sitemap",
+ extract_head=True,
+ query="API reference documentation endpoints",
+ scoring_method="bm25",
+ score_threshold=0.5,
+ max_urls=20
+)
+
+urls = await seeder.urls("docs.example.com", config)
+
+# The highest scoring URLs will be API docs!
+```
+
+#### Discovering Product Pages
+
+```python
+# Find specific products
+config = SeedingConfig(
+ source="cc+sitemap", # Use both sources
+ extract_head=True,
+ query="wireless headphones noise canceling",
+ scoring_method="bm25",
+ score_threshold=0.4,
+ pattern="*/product/*" # Combine with pattern matching
+)
+
+urls = await seeder.urls("shop.example.com", config)
+
+# Filter further by price (from metadata)
+affordable = [
+ u for u in urls
+ if float(u['head_data'].get('meta', {}).get('product:price', '0')) < 200
+]
+```
+
+#### Filtering News Articles
+
+```python
+# Find recent news about AI
+config = SeedingConfig(
+ source="sitemap",
+ extract_head=True,
+ query="artificial intelligence machine learning breakthrough",
+ scoring_method="bm25",
+ score_threshold=0.35
+)
+
+urls = await seeder.urls("technews.com", config)
+
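+# Filter by date (compare timezone-aware values; naive vs. aware raises TypeError)
+from datetime import datetime, timedelta, timezone
+recent = []
+cutoff = datetime.now(timezone.utc) - timedelta(days=7)
+for url in urls:
+    # Check JSON-LD for publication date
+    for jsonld in url['head_data'].get('jsonld', []):
+        if 'datePublished' in jsonld:
+            pub_date = datetime.fromisoformat(jsonld['datePublished'].replace('Z', '+00:00'))
+            if pub_date.tzinfo is None:  # date-only or naive timestamps: assume UTC
+                pub_date = pub_date.replace(tzinfo=timezone.utc)
+            if pub_date > cutoff:
+                recent.append(url)
+            break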
+```
+
+#### Complex Query Patterns
+
+```python
+# Multi-concept queries
+queries = [
+ "python async await concurrency tutorial",
+ "data science pandas numpy visualization",
+ "web scraping beautifulsoup selenium automation",
+ "machine learning tensorflow keras deep learning"
+]
+
+all_tutorials = []
+
+for query in queries:
+ config = SeedingConfig(
+ source="sitemap",
+ extract_head=True,
+ query=query,
+ scoring_method="bm25",
+ score_threshold=0.4,
+ max_urls=10 # Top 10 per topic
+ )
+
+ urls = await seeder.urls("learning-platform.com", config)
+ all_tutorials.extend(urls)
+
+# Remove duplicates while preserving order
+seen = set()
+unique_tutorials = []
+for url in all_tutorials:
+ if url['url'] not in seen:
+ seen.add(url['url'])
+ unique_tutorials.append(url)
+
+print(f"Found {len(unique_tutorials)} unique tutorials across all topics")
+```
+
+## Scaling Up: Multiple Domains
+
+When you need to discover URLs across multiple websites, URL seeding really shines.
+
+### The `many_urls` Method
+
+```python
+# Discover URLs from multiple domains in parallel
+domains = ["site1.com", "site2.com", "site3.com"]
+
+config = SeedingConfig(
+ source="sitemap",
+ extract_head=True,
+ query="python tutorial",
+ scoring_method="bm25",
+ score_threshold=0.3
+)
+
+# Returns a dictionary: {domain: [urls]}
+results = await seeder.many_urls(domains, config)
+
+# Process results
+for domain, urls in results.items():
+ print(f"\n{domain}: Found {len(urls)} relevant URLs")
+ if urls:
+ top = urls[0] # Highest scoring
+ print(f" Top result: {top['url']}")
+ print(f" Score: {top['relevance_score']:.2f}")
+```
+
+### Cross-Domain Examples
+
+#### Competitor Analysis
+
+```python
+# Analyze content strategies across competitors
+competitors = [
+ "competitor1.com",
+ "competitor2.com",
+ "competitor3.com"
+]
+
+config = SeedingConfig(
+ source="sitemap",
+ extract_head=True,
+ pattern="*/blog/*",
+ max_urls=100
+)
+
+results = await seeder.many_urls(competitors, config)
+
+# Analyze content types
+for domain, urls in results.items():
+ content_types = {}
+
+ for url in urls:
+ # Extract content type from metadata
+ og_type = url['head_data'].get('meta', {}).get('og:type', 'unknown')
+ content_types[og_type] = content_types.get(og_type, 0) + 1
+
+ print(f"\n{domain} content distribution:")
+ for ctype, count in sorted(content_types.items(), key=lambda x: x[1], reverse=True):
+ print(f" {ctype}: {count}")
+```
+
+#### Industry Research
+
+```python
+# Research Python tutorials across educational sites
+educational_sites = [
+ "realpython.com",
+ "pythontutorial.net",
+ "learnpython.org",
+ "python.org"
+]
+
+config = SeedingConfig(
+ source="sitemap",
+ extract_head=True,
+ query="beginner python tutorial basics",
+ scoring_method="bm25",
+ score_threshold=0.3,
+ max_urls=20 # Per site
+)
+
+results = await seeder.many_urls(educational_sites, config)
+
+# Find the best beginner tutorials
+all_tutorials = []
+for domain, urls in results.items():
+ for url in urls:
+ url['domain'] = domain # Add domain info
+ all_tutorials.append(url)
+
+# Sort by relevance across all domains
+all_tutorials.sort(key=lambda x: x['relevance_score'], reverse=True)
+
+print("Top 10 Python tutorials for beginners across all sites:")
+for i, tutorial in enumerate(all_tutorials[:10], 1):
+ print(f"{i}. [{tutorial['relevance_score']:.2f}] {tutorial['head_data']['title']}")
+ print(f" {tutorial['url']}")
+ print(f" From: {tutorial['domain']}")
+```
+
+#### Multi-Site Monitoring
+
+```python
+# Monitor news about your company across multiple sources
+news_sites = [
+ "techcrunch.com",
+ "theverge.com",
+ "wired.com",
+ "arstechnica.com"
+]
+
+company_name = "YourCompany"
+
+config = SeedingConfig(
+ source="cc", # Common Crawl for recent content
+ extract_head=True,
+ query=f"{company_name} announcement news",
+ scoring_method="bm25",
+ score_threshold=0.5, # High threshold for relevance
+ max_urls=10
+)
+
+results = await seeder.many_urls(news_sites, config)
+
+# Collect all mentions
+mentions = []
+for domain, urls in results.items():
+ mentions.extend(urls)
+
+if mentions:
+ print(f"Found {len(mentions)} mentions of {company_name}:")
+ for mention in mentions:
+ print(f"\n- {mention['head_data']['title']}")
+ print(f" {mention['url']}")
+ print(f" Score: {mention['relevance_score']:.2f}")
+else:
+ print(f"No recent mentions of {company_name} found")
+```
+
+## Advanced Integration Patterns
+
+Let's put everything together in a real-world example.
+
+### Building a Research Assistant
+
+Here's a complete example that discovers, scores, filters, and crawls intelligently:
+
+```python
+import asyncio
+from datetime import datetime
+from crawl4ai import AsyncUrlSeeder, AsyncWebCrawler, SeedingConfig, CrawlerRunConfig
+
+class ResearchAssistant:
+ def __init__(self):
+ self.seeder = AsyncUrlSeeder()
+
+ async def research_topic(self, topic, domains, max_articles=20):
+ """Research a topic across multiple domains."""
+
+ print(f"🔬 Researching '{topic}' across {len(domains)} domains...")
+
+ # Step 1: Discover relevant URLs
+ config = SeedingConfig(
+ source="cc+sitemap", # Maximum coverage
+ extract_head=True, # Get metadata
+ query=topic, # Research topic
+ scoring_method="bm25", # Smart scoring
+ score_threshold=0.4, # Quality threshold
+ max_urls=10, # Per domain
+ concurrency=20, # Fast discovery
+ verbose=True
+ )
+
+ # Discover across all domains
+ discoveries = await self.seeder.many_urls(domains, config)
+
+ # Step 2: Collect and rank all articles
+ all_articles = []
+ for domain, urls in discoveries.items():
+ for url in urls:
+ url['domain'] = domain
+ all_articles.append(url)
+
+ # Sort by relevance
+ all_articles.sort(key=lambda x: x['relevance_score'], reverse=True)
+
+ # Take top articles
+ top_articles = all_articles[:max_articles]
+
+ print(f"\n📊 Found {len(all_articles)} relevant articles")
+ print(f"📌 Selected top {len(top_articles)} for deep analysis")
+
+ # Step 3: Show what we're about to crawl
+ print("\n🎯 Articles to analyze:")
+ for i, article in enumerate(top_articles[:5], 1):
+ print(f"\n{i}. {article['head_data']['title']}")
+ print(f" Score: {article['relevance_score']:.2f}")
+ print(f" Source: {article['domain']}")
+ print(f" URL: {article['url'][:60]}...")
+
+ # Step 4: Crawl the selected articles
+ print(f"\n🚀 Deep crawling {len(top_articles)} articles...")
+
+ async with AsyncWebCrawler() as crawler:
+ config = CrawlerRunConfig(
+ only_text=True,
+ word_count_threshold=200, # Substantial content only
+ stream=True
+ )
+
+ # Extract URLs and crawl all articles
+ article_urls = [article['url'] for article in top_articles]
+ results = []
+ async for result in await crawler.arun_many(article_urls, config=config):
+ if result.success:
+ results.append({
+ 'url': result.url,
+ 'title': result.metadata.get('title', 'No title'),
+ 'content': result.markdown.raw_markdown,
+ 'domain': next(a['domain'] for a in top_articles if a['url'] == result.url),
+ 'score': next(a['relevance_score'] for a in top_articles if a['url'] == result.url)
+ })
+ print(f"✓ Crawled: {result.url[:60]}...")
+
+ # Step 5: Analyze and summarize
+ print(f"\n📝 Analysis complete! Crawled {len(results)} articles")
+
+ return self.create_research_summary(topic, results)
+
+ def create_research_summary(self, topic, articles):
+ """Create a research summary from crawled articles."""
+
+ summary = {
+ 'topic': topic,
+ 'timestamp': datetime.now().isoformat(),
+ 'total_articles': len(articles),
+ 'sources': {}
+ }
+
+ # Group by domain
+ for article in articles:
+ domain = article['domain']
+ if domain not in summary['sources']:
+ summary['sources'][domain] = []
+
+ summary['sources'][domain].append({
+ 'title': article['title'],
+ 'url': article['url'],
+ 'score': article['score'],
+ 'excerpt': article['content'][:500] + '...' if len(article['content']) > 500 else article['content']
+ })
+
+ return summary
+
+# Use the research assistant
+async def main():
+ assistant = ResearchAssistant()
+
+ # Research Python async programming across multiple sources
+ topic = "python asyncio best practices performance optimization"
+ domains = [
+ "realpython.com",
+ "python.org",
+ "stackoverflow.com",
+ "medium.com"
+ ]
+
+ summary = await assistant.research_topic(topic, domains, max_articles=15)
+
+ # Display results
+ print("\n" + "="*60)
+ print("RESEARCH SUMMARY")
+ print("="*60)
+ print(f"Topic: {summary['topic']}")
+ print(f"Date: {summary['timestamp']}")
+ print(f"Total Articles Analyzed: {summary['total_articles']}")
+
+ print("\nKey Findings by Source:")
+ for domain, articles in summary['sources'].items():
+ print(f"\n📚 {domain} ({len(articles)} articles)")
+ for article in articles[:2]: # Top 2 per domain
+ print(f"\n Title: {article['title']}")
+ print(f" Relevance: {article['score']:.2f}")
+ print(f" Preview: {article['excerpt'][:200]}...")
+
+asyncio.run(main())
+```
+
+### Performance Optimization Tips
+
+1. **Use caching wisely**
+```python
+# First run - populate cache
+config = SeedingConfig(source="sitemap", extract_head=True, force=True)
+urls = await seeder.urls("example.com", config)
+
+# Subsequent runs - use cache (much faster)
+config = SeedingConfig(source="sitemap", extract_head=True, force=False)
+urls = await seeder.urls("example.com", config)
+```
+
+2. **Optimize concurrency**
+```python
+# For many small requests (like HEAD checks)
+config = SeedingConfig(concurrency=50, hits_per_sec=20)
+
+# For fewer large requests (like full head extraction)
+config = SeedingConfig(concurrency=10, hits_per_sec=5)
+```
+
+3. **Stream large result sets**
+```python
+# When crawling many URLs
+async with AsyncWebCrawler() as crawler:
+ # Assuming urls is a list of URL strings and config is a CrawlerRunConfig(stream=True)
+ results = await crawler.arun_many(urls, config=config)
+
+ # Process as they arrive
+ async for result in results:
+ process_immediately(result) # Don't wait for all
+```
+
+## Best Practices & Tips
+
+### Cache Management
+
+The seeder automatically caches results to speed up repeated operations:
+
+- **Common Crawl cache**: `~/.crawl4ai/seeder_cache/[index]_[domain]_[hash].jsonl`
+- **Sitemap cache**: `~/.crawl4ai/seeder_cache/sitemap_[domain]_[hash].jsonl`
+- **HEAD data cache**: `~/.cache/url_seeder/head/[hash].json`
+
+Cache expires after 7 days by default. Use `force=True` to refresh.
+
+### Pattern Matching Strategies
+
+```python
+# Be specific when possible
+good_pattern = "*/blog/2024/*.html" # Specific
+bad_pattern = "*" # Too broad
+
+# Combine patterns with metadata filtering
+config = SeedingConfig(
+ pattern="*/articles/*",
+ extract_head=True
+)
+urls = await seeder.urls("news.com", config)
+
+# Further filter by publish date, author, category, etc.
+recent = [u for u in urls if is_recent(u['head_data'])]
+```
+
+### Rate Limiting Considerations
+
+```python
+# Be respectful of servers
+config = SeedingConfig(
+ hits_per_sec=10, # Max 10 requests per second
+ concurrency=20 # But use 20 workers
+)
+
+# For your own servers
+config = SeedingConfig(
+ hits_per_sec=None, # No limit
+ concurrency=100 # Go fast
+)
+```
+
+## Quick Reference
+
+### Common Patterns
+
+```python
+# Blog post discovery
+config = SeedingConfig(
+ source="sitemap",
+ pattern="*/blog/*",
+ extract_head=True,
+ query="your topic",
+ scoring_method="bm25"
+)
+
+# E-commerce product discovery
+config = SeedingConfig(
+ source="cc+sitemap",
+ pattern="*/product/*",
+ extract_head=True,
+ live_check=True
+)
+
+# Documentation search
+config = SeedingConfig(
+ source="sitemap",
+ pattern="*/docs/*",
+ extract_head=True,
+ query="API reference",
+ scoring_method="bm25",
+ score_threshold=0.5
+)
+
+# News monitoring
+config = SeedingConfig(
+ source="cc",
+ extract_head=True,
+ query="company name",
+ scoring_method="bm25",
+ max_urls=50
+)
+```
+
+### Troubleshooting Guide
+
+| Issue | Solution |
+|-------|----------|
+| No URLs found | Try `source="cc+sitemap"`, check domain spelling |
+| Slow discovery | Increase `concurrency`, raise or remove the `hits_per_sec` limit, reuse the cache (`force=False`) |
+| Missing metadata | Ensure `extract_head=True` |
+| Low relevance scores | Refine query, lower `score_threshold` |
+| Rate limit errors | Reduce `hits_per_sec` and `concurrency` |
+
+### Performance Benchmarks
+
+Typical performance on a standard connection:
+
+- **Sitemap discovery**: 100-1,000 URLs/second
+- **Common Crawl discovery**: 50-500 URLs/second
+- **HEAD checking**: 10-50 URLs/second
+- **Head extraction**: 5-20 URLs/second
+- **BM25 scoring**: 10,000+ URLs/second
+
+## Conclusion
+
+URL seeding transforms web crawling from a blind expedition into a surgical strike. By discovering and analyzing URLs before crawling, you can:
+
+- Save hours of crawling time
+- Reduce bandwidth usage by 90%+
+- Find exactly what you need
+- Scale across multiple domains effortlessly
+
+Whether you're building a research tool, monitoring competitors, or creating a content aggregator, URL seeding gives you the intelligence to crawl smarter, not harder.
+
+Now go forth and seed intelligently! 🌱🚀
\ No newline at end of file
diff --git a/tests/general/test_async_url_seeder_bm25.py b/tests/general/test_async_url_seeder_bm25.py
new file mode 100644
index 00000000..31d6cff0
--- /dev/null
+++ b/tests/general/test_async_url_seeder_bm25.py
@@ -0,0 +1,711 @@
+"""
+Comprehensive test cases for AsyncUrlSeeder with BM25 scoring functionality.
+Tests cover all features including query-based scoring, metadata extraction,
+edge cases, and integration scenarios.
+"""
+
+import asyncio
+import pytest
+from typing import List, Dict, Any
+from crawl4ai import AsyncUrlSeeder, SeedingConfig, AsyncLogger
+import json
+from datetime import datetime
+
+# Test domain: BBC Sport, matching the football-themed queries used below.
+# NOTE(review): the value includes a path ("/sport"), not just a hostname;
+# confirm AsyncUrlSeeder handles that form as intended.
+TEST_DOMAIN = "www.bbc.com/sport"
+
+
+class TestAsyncUrlSeederBM25:
+ """Comprehensive test suite for AsyncUrlSeeder with BM25 scoring."""
+
+ async def create_seeder(self):
+     """Create an AsyncUrlSeeder for the standalone runner.
+
+     Returns:
+         A new AsyncUrlSeeder wired to a fresh AsyncLogger.
+     """
+     return AsyncUrlSeeder(logger=AsyncLogger())
+
+ @pytest.fixture
+ def seeder(self):
+     """Supply the ``seeder`` argument that every test method requests.
+
+     pytest resolves test parameters by fixture name; this module defined
+     no fixture called ``seeder``, so the tests could not be collected
+     unless one was provided elsewhere.
+     NOTE(review): a class-level fixture takes precedence over a conftest
+     fixture of the same name - confirm that is acceptable.
+     """
+     return AsyncUrlSeeder(logger=AsyncLogger())
+
+ # ============================================
+ # Basic BM25 Scoring Tests
+ # ============================================
+
+ @pytest.mark.asyncio
+ async def test_basic_bm25_scoring(self, seeder):
+     """Score sitemap URLs against a simple query and check the score contract.
+
+     Asserts that every result carries a ``relevance_score``, that each
+     score lies in [0, 1], and that results arrive in descending score order.
+     """
+     query = "premier league highlights"
+     config = SeedingConfig(
+         source="sitemap",
+         extract_head=True,
+         query=query,
+         scoring_method="bm25",
+         max_urls=200,
+         verbose=True,
+         force=True  # Force fresh fetch
+     )
+
+     results = await seeder.urls(TEST_DOMAIN, config)
+
+     # Verify results have relevance scores
+     assert all("relevance_score" in r for r in results)
+
+     # Verify scores are normalized between 0 and 1
+     scores = [r["relevance_score"] for r in results]
+     assert all(0.0 <= s <= 1.0 for s in scores)
+
+     # Verify results are sorted by relevance (descending)
+     assert scores == sorted(scores, reverse=True)
+
+     # Print top 5 results for manual verification. The label is built from
+     # the query so it cannot drift from what was actually searched.
+     print(f"\nTop 5 results for '{query}':")
+     for i, r in enumerate(results[:5], 1):
+         print(f"{i}. Score: {r['relevance_score']:.3f} - {r['url']}")
+
+ @pytest.mark.asyncio
+ async def test_query_variations(self, seeder):
+     """Check that several different queries each yield scored results."""
+     for query in (
+         "VAR controversy",
+         "player ratings",
+         "live score update",
+         "transfer rumours",
+         "post match analysis",
+         "injury news",
+     ):
+         cfg = SeedingConfig(
+             source="sitemap",
+             extract_head=True,
+             query=query,
+             scoring_method="bm25",
+             max_urls=100,
+         )
+         found = await seeder.urls(TEST_DOMAIN, cfg)
+
+         # Every query must return at least one URL, and each must be scored.
+         assert len(found) > 0
+         assert all("relevance_score" in entry for entry in found)
+
+         print(f"\nTop result for '{query}':")
+         if found:
+             best = found[0]
+             print(f" Score: {best['relevance_score']:.3f} - {best['url']}")
+
+ # ============================================
+ # Score Threshold Tests
+ # ============================================
+
+ @pytest.mark.asyncio
+ async def test_score_threshold_filtering(self, seeder):
+ """Check that every returned URL scores at or above ``score_threshold``.
+
+ The same query runs once per threshold; an empty result list is
+ accepted, so only URLs that were actually returned are checked.
+ """
+ thresholds = [0.1, 0.3, 0.5, 0.7]
+
+ for threshold in thresholds:
+ config = SeedingConfig(
+ source="sitemap",
+ extract_head=True,
+ query="league standings",
+ score_threshold=threshold,
+ scoring_method="bm25",
+ max_urls=50
+ )
+
+ results = await seeder.urls(TEST_DOMAIN, config)
+
+ # Verify all results meet threshold (skipped when nothing came back)
+ if results:
+ assert all(r["relevance_score"] >= threshold for r in results)
+
+ print(f"\nThreshold {threshold}: {len(results)} URLs passed")
+
+ @pytest.mark.asyncio
+ async def test_extreme_thresholds(self, seeder):
+     """Compare result counts at a near-zero and a near-one score threshold.
+
+     The 0.001 threshold must admit at least as many URLs as 0.99.
+     """
+     async def fetch(threshold):
+         # Same source and query each time; only the threshold varies.
+         cfg = SeedingConfig(
+             source="sitemap",
+             extract_head=True,
+             query="match",
+             score_threshold=threshold,
+             scoring_method="bm25",
+         )
+         return await seeder.urls(TEST_DOMAIN, cfg)
+
+     results_low = await fetch(0.001)  # permissive: should return many results
+     results_high = await fetch(0.99)  # strict: may return few or none
+
+     low_count, high_count = len(results_low), len(results_high)
+     assert low_count >= high_count
+     print(f"\nLow threshold (0.001): {low_count} results")
+     print(f"High threshold (0.99): {high_count} results")
+
+ # ============================================
+ # Metadata Extraction Tests
+ # ============================================
+
+ @pytest.mark.asyncio
+ async def test_comprehensive_metadata_extraction(self, seeder):
+ """Print the head metadata extracted for up to five scored URLs.
+
+ Covers title, charset, lang, selected meta tags, Open Graph tags and
+ the number of JSON-LD entries. Output is for manual inspection; the
+ test makes no assertions.
+ """
+ config = SeedingConfig(
+ source="sitemap",
+ extract_head=True,
+ query="match report",
+ scoring_method="bm25",
+ max_urls=5,
+ verbose=True
+ )
+
+ results = await seeder.urls(TEST_DOMAIN, config)
+
+ for result in results:
+ head_data = result.get("head_data", {})
+
+ # Top-level fields, with 'N/A' shown for anything missing
+ print(f"\nMetadata for {result['url']}:")
+ print(f" Title: {head_data.get('title', 'N/A')}")
+ print(f" Charset: {head_data.get('charset', 'N/A')}")
+ print(f" Lang: {head_data.get('lang', 'N/A')}")
+
+ # Selected meta tags, each value truncated to 50 characters
+ meta = head_data.get("meta", {})
+ if meta:
+ print(" Meta tags found:")
+ for key in ["description", "keywords", "author", "viewport"]:
+ if key in meta:
+ print(f" {key}: {meta[key][:50]}...")
+
+ # Open Graph tags are the meta keys prefixed with "og:"; show at most three
+ og_tags = {k: v for k, v in meta.items() if k.startswith("og:")}
+ if og_tags:
+ print(" Open Graph tags found:")
+ for k, v in list(og_tags.items())[:3]:
+ print(f" {k}: {v[:50]}...")
+
+ # JSON-LD: report only how many entries were extracted
+ if head_data.get("jsonld"):
+ print(f" JSON-LD schemas found: {len(head_data['jsonld'])}")
+
+ @pytest.mark.asyncio
+ async def test_jsonld_extraction_scoring(self, seeder):
+     """Report scores and JSON-LD types for results that carry JSON-LD.
+
+     Findings are printed for manual inspection; the test makes no
+     assertion about how JSON-LD affects the score.
+     """
+     cfg = SeedingConfig(
+         source="sitemap",
+         extract_head=True,
+         query="Premier League match report highlights",
+         scoring_method="bm25",
+         max_urls=20,
+     )
+     found = await seeder.urls(TEST_DOMAIN, cfg)
+
+     # Keep only entries whose head data has a non-empty "jsonld" value.
+     with_jsonld = []
+     for entry in found:
+         if entry.get("head_data", {}).get("jsonld"):
+             with_jsonld.append(entry)
+
+     if not with_jsonld:
+         return
+
+     print(f"\nFound {len(with_jsonld)} URLs with JSON-LD data")
+     for entry in with_jsonld[:3]:
+         print(f" Score: {entry['relevance_score']:.3f} - {entry['url']}")
+         schema_types = [
+             item.get('@type', 'Unknown')
+             for item in entry["head_data"]["jsonld"]
+             if isinstance(item, dict)
+         ]
+         print(f" JSON-LD types: {schema_types}")
+
+ # ============================================
+ # Edge Cases and Error Handling
+ # ============================================
+
+ @pytest.mark.asyncio
+ async def test_empty_query(self, seeder):
+     """Check that an empty query still returns URLs, none with a non-zero score."""
+     cfg = SeedingConfig(
+         source="sitemap",
+         extract_head=True,
+         query="",
+         scoring_method="bm25",
+         max_urls=10,
+     )
+     found = await seeder.urls(TEST_DOMAIN, cfg)
+
+     # URLs are expected; a missing score is treated the same as zero.
+     assert len(found) > 0
+     for entry in found:
+         assert entry.get("relevance_score", 0) == 0
+
+ @pytest.mark.asyncio
+ async def test_query_without_extract_head(self, seeder):
+     """Check that no result is scored when ``extract_head`` is False."""
+     cfg = SeedingConfig(
+         source="sitemap",
+         extract_head=False,  # This should trigger a warning
+         query="Premier League match report highlights",
+         scoring_method="bm25",
+         max_urls=10,
+     )
+     found = await seeder.urls(TEST_DOMAIN, cfg)
+
+     # No entry may carry a relevance score.
+     scored = [entry for entry in found if "relevance_score" in entry]
+     assert not scored
+     print("\nVerified: No scores added when extract_head=False")
+
+ @pytest.mark.asyncio
+ async def test_special_characters_in_query(self, seeder):
+     """Check that queries containing punctuation and symbols do not raise.
+
+     Any exception, including a failed type assertion, is reported through
+     ``pytest.fail`` together with the offending query.
+     """
+     for query in (
+         "premier league + analytics",
+         "injury/rehab routines",
+         "AI-powered scouting",
+         "match stats & xG",
+         "tactical@breakdown",
+         "transfer-window.yml",
+     ):
+         cfg = SeedingConfig(
+             source="sitemap",
+             extract_head=True,
+             query=query,
+             scoring_method="bm25",
+             max_urls=5,
+         )
+
+         try:
+             found = await seeder.urls(TEST_DOMAIN, cfg)
+             assert isinstance(found, list)
+             print(f"\n✓ Query '{query}' processed successfully")
+         except Exception as exc:
+             pytest.fail(f"Failed on query '{query}': {str(exc)}")
+
+ @pytest.mark.asyncio
+ async def test_unicode_query(self, seeder):
+     """Check that queries containing non-ASCII text do not raise.
+
+     A query that raises, or that does not return a list, fails the test
+     through ``pytest.fail`` with the offending query in the message.
+     """
+     unicode_queries = [
+         "网页爬虫",  # Chinese
+         "веб-краулер",  # Russian
+         "🚀 crawl4ai",  # Emoji
+         "naïve implementation",  # Accented characters
+     ]
+
+     for query in unicode_queries:
+         config = SeedingConfig(
+             source="sitemap",
+             extract_head=True,
+             query=query,
+             scoring_method="bm25",
+             max_urls=5
+         )
+
+         try:
+             results = await seeder.urls(TEST_DOMAIN, config)
+             assert isinstance(results, list)
+             print(f"\n✓ Unicode query '{query}' processed successfully")
+         except Exception as e:
+             # Printing alone would let the test pass on every error.
+             pytest.fail(f"Unicode query '{query}' failed: {str(e)}")
+
+ # ============================================
+ # Performance and Scalability Tests
+ # ============================================
+
+ @pytest.mark.asyncio
+ async def test_large_scale_scoring(self, seeder):
+     """Time BM25 scoring over up to 100 URLs drawn from both sources.
+
+     Fails with a clear message when nothing is returned, rather than
+     crashing on the per-URL average or the score statistics.
+     """
+     config = SeedingConfig(
+         source="cc+sitemap",  # Use both sources for more URLs
+         extract_head=True,
+         query="world cup group standings",
+         scoring_method="bm25",
+         max_urls=100,
+         concurrency=20,
+         hits_per_sec=10
+     )
+
+     loop = asyncio.get_running_loop()
+     start_time = loop.time()
+     results = await seeder.urls(TEST_DOMAIN, config)
+     elapsed = loop.time() - start_time
+
+     # Everything below divides by len(results) or takes min/max of the scores.
+     assert results, "large-scale query returned no URLs"
+
+     print(f"\nProcessed {len(results)} URLs in {elapsed:.2f} seconds")
+     print(f"Average time per URL: {elapsed/len(results)*1000:.1f}ms")
+
+     # Verify scoring worked at scale
+     assert all("relevance_score" in r for r in results)
+
+     # Check score distribution
+     scores = [r["relevance_score"] for r in results]
+     print("Score distribution:")
+     print(f" Min: {min(scores):.3f}")
+     print(f" Max: {max(scores):.3f}")
+     print(f" Avg: {sum(scores)/len(scores):.3f}")
+
+ @pytest.mark.asyncio
+ async def test_concurrent_scoring_consistency(self, seeder):
+     """Check that repeated runs of one query give each URL the same score.
+
+     The query runs three times, one run after another; a URL seen in more
+     than one run must score within 0.001 of its first recorded value.
+     """
+     cfg = SeedingConfig(
+         source="sitemap",
+         extract_head=True,
+         query="live score update",
+         scoring_method="bm25",
+         max_urls=20,
+         concurrency=10,
+     )
+
+     runs = [await seeder.urls(TEST_DOMAIN, cfg) for _ in range(3)]
+
+     first_seen = {}
+     for run in runs:
+         for entry in run:
+             url, score = entry["url"], entry["relevance_score"]
+             if url not in first_seen:
+                 first_seen[url] = score
+                 continue
+             # Allow for tiny floating-point differences between runs.
+             assert abs(first_seen[url] - score) < 0.001
+
+     print(f"\n✓ Consistent scores across {len(runs)} runs")
+
+ # ============================================
+ # Multi-Domain Tests
+ # ============================================
+
+ @pytest.mark.asyncio
+ async def test_many_urls_with_scoring(self, seeder):
+     """Run one scored query across several domains with ``many_urls``.
+
+     Prints the hit count and top result per domain; nothing is asserted.
+     """
+     domains = [TEST_DOMAIN, "docs.crawl4ai.com", "example.com"]
+     cfg = SeedingConfig(
+         source="sitemap",
+         extract_head=True,
+         query="fixture list",
+         scoring_method="bm25",
+         score_threshold=0.2,
+         max_urls=10,
+         force=True,  # Force fresh fetch
+     )
+
+     per_domain = await seeder.many_urls(domains, cfg)
+
+     for domain, found in per_domain.items():
+         print(f"\nDomain: {domain}")
+         print(f" Found {len(found)} URLs above threshold")
+         if not found:
+             continue
+         best = found[0]
+         print(f" Top result: {best['relevance_score']:.3f} - {best['url']}")
+
+ # ============================================
+ # Complex Query Tests
+ # ============================================
+
+ @pytest.mark.asyncio
+ async def test_multi_word_complex_queries(self, seeder):
+     """Run long multi-word queries and print each top match.
+
+     For the best result of every query, the query words that also occur
+     in its title or meta description are listed. Nothing is asserted.
+     """
+     for query in (
+         "how to follow live match commentary",
+         "extract expected goals stats from match data",
+         "premier league match report analysis",
+         "transfer rumours and confirmed signings tracker",
+         "tactical breakdown of high press strategy",
+     ):
+         cfg = SeedingConfig(
+             source="sitemap",
+             extract_head=True,
+             query=query,
+             scoring_method="bm25",
+             max_urls=5,
+         )
+         found = await seeder.urls(TEST_DOMAIN, cfg)
+         if not found:
+             continue
+
+         best = found[0]
+         print(f"\nQuery: '{query}'")
+         print(f"Top match: {best['relevance_score']:.3f} - {best['url']}")
+
+         # Plain whitespace tokenisation; enough for a visual sanity check.
+         head = best.get("head_data", {})
+         title = head.get("title", "")
+         description = head.get("meta", {}).get("description", "")
+         title_words = set(title.lower().split())
+         desc_words = set(description.lower().split())
+
+         overlap = set(query.lower().split()) & (title_words | desc_words)
+         if overlap:
+             print(f"Matched terms: {', '.join(overlap)}")
+
+ # ============================================
+ # Cache and Force Tests
+ # ============================================
+
+ @pytest.mark.asyncio
+ async def test_scoring_with_cache(self, seeder):
+     """Check that two identical ``force=False`` runs return the same scored URLs.
+
+     Both runs must agree on length, URL order and, within 0.001, on score.
+     """
+     cfg = SeedingConfig(
+         source="sitemap",
+         extract_head=True,
+         query="injury update timeline",
+         scoring_method="bm25",
+         max_urls=10,
+         force=False,  # Use cache
+     )
+
+     first = await seeder.urls(TEST_DOMAIN, cfg)   # populates the cache
+     second = await seeder.urls(TEST_DOMAIN, cfg)  # should be served from it
+
+     assert len(first) == len(second)
+     for before, after in zip(first, second):
+         assert before["url"] == after["url"]
+         assert abs(before["relevance_score"] - after["relevance_score"]) < 0.001
+
+     print("\n✓ Cache produces consistent scores")
+
+ @pytest.mark.asyncio
+ async def test_force_refresh_scoring(self, seeder):
+     """Run one query with ``force=False`` and then ``force=True``; both must be scored.
+
+     Each run is timed and the durations printed; the timings are
+     informational only and are not asserted.
+     """
+     shared = dict(
+         source="sitemap",
+         extract_head=True,
+         query="transfer window",
+         scoring_method="bm25",
+         max_urls=5,
+     )
+     config_cached = SeedingConfig(force=False, **shared)
+     config_forced = SeedingConfig(force=True, **shared)
+
+     now = asyncio.get_event_loop().time
+
+     # Cached run first, then the forced refresh (expected to be slower).
+     began = now()
+     results1 = await seeder.urls(TEST_DOMAIN, config_cached)
+     time1 = now() - began
+
+     began = now()
+     results2 = await seeder.urls(TEST_DOMAIN, config_forced)
+     time2 = now() - began
+
+     print(f"\nCached run: {time1:.2f}s")
+     print(f"Forced run: {time2:.2f}s")
+
+     for batch in (results1, results2):
+         assert all("relevance_score" in entry for entry in batch)
+
+ # ============================================
+ # Source Combination Tests
+ # ============================================
+
+ @pytest.mark.asyncio
+ async def test_scoring_with_multiple_sources(self, seeder):
+     """Score URLs from the combined ``cc+sitemap`` source against a 0.3 threshold.
+
+     Prints the hit count and the number of distinct paths, then asserts
+     that every returned URL scores at least 0.3.
+     """
+     cfg = SeedingConfig(
+         source="cc+sitemap",
+         extract_head=True,
+         query="match highlights video",
+         scoring_method="bm25",
+         score_threshold=0.3,
+         max_urls=30,
+         concurrency=15,
+     )
+     found = await seeder.urls(TEST_DOMAIN, cfg)
+
+     print(f"\nCombined sources returned {len(found)} URLs above threshold")
+
+     def path_of(url):
+         # Drop the scheme, keep what follows the first "/" (or the whole
+         # string when there is none), then cut off any query string.
+         bare = url.replace("https://", "").replace("http://", "")
+         return bare.split("/", 1)[-1].split("?")[0]
+
+     unique_paths = {path_of(entry["url"]) for entry in found}
+     print(f"Unique paths found: {len(unique_paths)}")
+
+     assert all(entry["relevance_score"] >= 0.3 for entry in found)
+
+ # ============================================
+ # Integration Tests
+ # ============================================
+
+ @pytest.mark.asyncio
+ async def test_full_workflow_integration(self, seeder):
+ """Walk the discover -> score -> filter -> use workflow end to end.
+
+ Prints the top three results with their metadata and asserts that
+ each of them has ``status == "valid"``.
+ """
+ # Step 1: Discover and score URLs, keeping those scoring 0.4 or more
+ config = SeedingConfig(
+ source="sitemap",
+ extract_head=True,
+ query="premier league opening fixtures",
+ scoring_method="bm25",
+ score_threshold=0.4,
+ max_urls=10,
+ verbose=True
+ )
+
+ results = await seeder.urls(TEST_DOMAIN, config)
+
+ print(f"\nStep 1: Found {len(results)} relevant URLs")
+
+ # Step 2: Show score, URL and title for the first three results
+ if results:
+ top_urls = results[:3]
+ print("\nStep 2: Top 3 URLs for crawling:")
+ for i, r in enumerate(top_urls):
+ print(f"{i+1}. Score: {r['relevance_score']:.3f}")
+ print(f" URL: {r['url']}")
+ print(f" Title: {r['head_data'].get('title', 'N/A')}")
+
+ # Show the first 80 characters of the description when one exists
+ meta = r['head_data'].get('meta', {})
+ if 'description' in meta:
+ print(f" Description: {meta['description'][:80]}...")
+
+ # Step 3: Require status == "valid" on the first three results
+ assert all(r["status"] == "valid" for r in results[:3])
+ print("\nStep 3: All top URLs are valid for crawling ✓")
+
+ # ============================================
+ # Report Generation
+ # ============================================
+
+ @pytest.mark.asyncio
+ async def test_generate_scoring_report(self, seeder):
+     """Build and print a per-category summary of BM25 scores.
+
+     For each labelled query the report records the result count, the top
+     three URLs with score and title, and the min/max/average score (all
+     zero when nothing is returned). Nothing is asserted.
+     """
+     queries = {
+         "beginner": "match schedule",
+         "advanced": "tactical analysis pressing",
+         "api": "VAR decision explanation",
+         "deployment": "fixture changes due to weather",
+         "extraction": "expected goals statistics",
+     }
+
+     report = {
+         "timestamp": datetime.now().isoformat(),
+         "domain": TEST_DOMAIN,
+         "results": {},
+     }
+
+     for category, query in queries.items():
+         cfg = SeedingConfig(
+             source="sitemap",
+             extract_head=True,
+             query=query,
+             scoring_method="bm25",
+             max_urls=10,
+         )
+         found = await seeder.urls(TEST_DOMAIN, cfg)
+
+         top_three = []
+         for entry in found[:3]:
+             top_three.append({
+                 "url": entry["url"],
+                 "score": entry["relevance_score"],
+                 "title": entry["head_data"].get("title", ""),
+             })
+
+         # An empty result set reports zeros instead of calling min()/max().
+         if found:
+             scores = [entry["relevance_score"] for entry in found]
+             distribution = {
+                 "min": min(scores),
+                 "max": max(scores),
+                 "avg": sum(scores) / len(found),
+             }
+         else:
+             distribution = {"min": 0, "max": 0, "avg": 0}
+
+         report["results"][category] = {
+             "query": query,
+             "total_results": len(found),
+             "top_results": top_three,
+             "score_distribution": distribution,
+         }
+
+     # Print report
+     banner = "="*60
+     print("\n" + banner)
+     print("BM25 SCORING EFFECTIVENESS REPORT")
+     print(banner)
+     print(f"Domain: {report['domain']}")
+     print(f"Timestamp: {report['timestamp']}")
+     print("\nResults by Category:")
+
+     for category, data in report["results"].items():
+         spread = data['score_distribution']
+         print(f"\n{category.upper()}: '{data['query']}'")
+         print(f" Total results: {data['total_results']}")
+         print(f" Score range: {spread['min']:.3f} - {spread['max']:.3f}")
+         print(f" Average score: {spread['avg']:.3f}")
+         print(" Top matches:")
+         for rank, hit in enumerate(data['top_results'], 1):
+             print(f" {rank}. [{hit['score']:.3f}] {hit['title']}")
+
+
+# ============================================
+# Standalone test runner
+# ============================================
+
+async def run_all_tests():
+    """Run the enabled BM25 seeder tests one by one as a plain script (no pytest)."""
+    import traceback
+    rule = "=" * 60
+    print("Running AsyncUrlSeeder BM25 Tests...")
+    print(rule)
+
+    suite = TestAsyncUrlSeederBM25()
+    seeder = await suite.create_seeder()
+
+    enabled_tests = [  # commented-out entries are skipped by this runner
+        # suite.test_basic_bm25_scoring,
+        # suite.test_query_variations,
+        # suite.test_score_threshold_filtering,
+        # suite.test_extreme_thresholds,
+        # suite.test_comprehensive_metadata_extraction,
+        # suite.test_jsonld_extraction_scoring,
+        # suite.test_empty_query,
+        # suite.test_query_without_extract_head,
+        # suite.test_special_characters_in_query,
+        # suite.test_unicode_query,
+        # suite.test_large_scale_scoring,
+        # suite.test_concurrent_scoring_consistency,
+        # suite.test_many_urls_with_scoring,
+        suite.test_multi_word_complex_queries,
+        suite.test_scoring_with_cache,
+        suite.test_force_refresh_scoring,
+        suite.test_scoring_with_multiple_sources,
+        suite.test_full_workflow_integration,
+        suite.test_generate_scoring_report,
+    ]
+
+    for test_case in enabled_tests:
+        try:
+            print(f"\nRunning {test_case.__name__}...")
+            await test_case(seeder)
+            print(f"✓ {test_case.__name__} passed")
+        except Exception as exc:
+            print(f"✗ {test_case.__name__} failed: {exc!s}")
+            print(f" Error type: {type(exc).__name__}")
+            traceback.print_exc()
+
+    print("\n" + rule)
+    print("Test suite completed!")
+
+
+if __name__ == "__main__":
+ # Script entry point: drive the standalone async runner to completion with asyncio.run.
+ asyncio.run(run_all_tests())
\ No newline at end of file