diff --git a/crawl4ai/async_url_seeder.py b/crawl4ai/async_url_seeder.py index b9dce91a..c3931b07 100644 --- a/crawl4ai/async_url_seeder.py +++ b/crawl4ai/async_url_seeder.py @@ -14,7 +14,16 @@ Features """ from __future__ import annotations -import aiofiles, asyncio, gzip, hashlib, io, json, os, pathlib, re, time +import aiofiles +import asyncio +import gzip +import hashlib +import io +import json +import os +import pathlib +import re +import time from datetime import timedelta from pathlib import Path from typing import Any, Dict, Iterable, List, Optional, Sequence, Union @@ -42,7 +51,8 @@ except ImportError: # Import AsyncLoggerBase from crawl4ai's logger module # Assuming crawl4ai/async_logger.py defines AsyncLoggerBase # You might need to adjust this import based on your exact file structure -from .async_logger import AsyncLoggerBase, AsyncLogger # Import AsyncLogger for default if needed +# Import AsyncLogger for default if needed +from .async_logger import AsyncLoggerBase, AsyncLogger # Import SeedingConfig for type hints from typing import TYPE_CHECKING @@ -55,16 +65,19 @@ COLLINFO_URL = "https://index.commoncrawl.org/collinfo.json" # CACHE_DIR = pathlib.Path("~/.crawl4ai").expanduser() # REMOVED: now managed by __init__ # CACHE_DIR.mkdir(exist_ok=True) # REMOVED: now managed by __init__ # INDEX_CACHE = CACHE_DIR / "latest_cc_index.txt" # REMOVED: now managed by __init__ -TTL = timedelta(days=7) # Keeping this constant as it's a seeder-specific TTL +TTL = timedelta(days=7) # Keeping this constant as it's a seeder-specific TTL _meta_rx = re.compile( r']*?(?:name|property|http-equiv)\s*=\s*["\']?([^"\' >]+)[^>]*?content\s*=\s*["\']?([^"\' >]+)[^>]*?)\/?>', re.I) _charset_rx = re.compile(r']*charset=["\']?([^"\' >]+)', re.I) -_title_rx = re.compile(r'(.*?)', re.I|re.S) -_link_rx = re.compile(r']*rel=["\']?([^"\' >]+)[^>]*href=["\']?([^"\' >]+)', re.I) +_title_rx = re.compile(r'(.*?)', re.I | re.S) +_link_rx = re.compile( + r']*rel=["\']?([^"\' 
>]+)[^>]*href=["\']?([^"\' >]+)', re.I) # ────────────────────────────────────────────────────────────────────────── helpers + + def _match(url: str, pattern: str) -> bool: if fnmatch.fnmatch(url, pattern): return True @@ -72,11 +85,13 @@ def _match(url: str, pattern: str) -> bool: return (fnmatch.fnmatch(canon, pattern) or (canon.startswith("www.") and fnmatch.fnmatch(canon[4:], pattern))) + def _parse_head(src: str) -> Dict[str, Any]: if LXML: try: if isinstance(src, str): - src = src.encode("utf-8", "replace") # strip Unicode, let lxml decode + # strip Unicode, let lxml decode + src = src.encode("utf-8", "replace") doc = lxml_html.fromstring(src) except (ValueError, etree.ParserError): return {} # malformed, bail gracefully @@ -87,13 +102,18 @@ def _parse_head(src: str) -> Dict[str, Any]: "meta": {}, "link": {}, "jsonld": [] } for el in doc.xpath(".//meta"): - k = el.attrib.get("name") or el.attrib.get("property") or el.attrib.get("http-equiv") - if k: info["meta"][k.lower()] = el.attrib.get("content", "") - elif "charset" in el.attrib: info["charset"] = el.attrib["charset"].lower() + k = el.attrib.get("name") or el.attrib.get( + "property") or el.attrib.get("http-equiv") + if k: + info["meta"][k.lower()] = el.attrib.get("content", "") + elif "charset" in el.attrib: + info["charset"] = el.attrib["charset"].lower() for el in doc.xpath(".//link"): rel = " ".join(el.attrib.get("rel", [])).lower() - if not rel: continue - entry = {a: el.attrib[a] for a in ("href","as","type","hreflang") if a in el.attrib} + if not rel: + continue + entry = {a: el.attrib[a] for a in ( + "href", "as", "type", "hreflang") if a in el.attrib} info["link"].setdefault(rel, []).append(entry) # Extract JSON-LD structured data for script in doc.xpath('.//script[@type="application/ld+json"]'): @@ -109,14 +129,19 @@ def _parse_head(src: str) -> Dict[str, Any]: info["lang"] = html_elem.attrib.get("lang", "") return info # regex fallback - info: Dict[str,Any] = 
{"title":None,"charset":None,"meta":{},"link":{},"jsonld":[],"lang":""} - m=_title_rx.search(src); info["title"]=m.group(1).strip() if m else None - for k,v in _meta_rx.findall(src): info["meta"][k.lower()]=v - m=_charset_rx.search(src); info["charset"]=m.group(1).lower() if m else None - for rel,href in _link_rx.findall(src): - info["link"].setdefault(rel.lower(),[]).append({"href":href}) + info: Dict[str, Any] = {"title": None, "charset": None, + "meta": {}, "link": {}, "jsonld": [], "lang": ""} + m = _title_rx.search(src) + info["title"] = m.group(1).strip() if m else None + for k, v in _meta_rx.findall(src): + info["meta"][k.lower()] = v + m = _charset_rx.search(src) + info["charset"] = m.group(1).lower() if m else None + for rel, href in _link_rx.findall(src): + info["link"].setdefault(rel.lower(), []).append({"href": href}) # Try to extract JSON-LD with regex - jsonld_pattern = re.compile(r']*type=["\']application/ld\+json["\'][^>]*>(.*?)', re.I|re.S) + jsonld_pattern = re.compile( + r']*type=["\']application/ld\+json["\'][^>]*>(.*?)', re.I | re.S) for match in jsonld_pattern.findall(src): try: jsonld_data = json.loads(match.strip()) @@ -130,41 +155,72 @@ def _parse_head(src: str) -> Dict[str, Any]: return info # ────────────────────────────────────────────────────────────────────────── class + + class AsyncUrlSeeder: """ Async version of UrlSeeder. Call pattern is await/async for / async with. - Public coroutine - ---------------- + Public coroutines + ----------------- await seed.urls(...) returns List[Dict[str,Any]] (url, status, head_data) + await seed.many_urls(...) 
+ returns Dict[str, List[Dict[str,Any]]] + await seed.close() + closes the HTTP client if owned by seeder + + Usage examples + -------------- + # Manual cleanup: + seeder = AsyncUrlSeeder() + try: + urls = await seeder.urls("example.com", config) + finally: + await seeder.close() + + # Using async context manager (recommended): + async with AsyncUrlSeeder() as seeder: + urls = await seeder.urls("example.com", config) + + # Reusing existing client: + client = httpx.AsyncClient() + seeder = AsyncUrlSeeder(client=client) + urls = await seeder.urls("example.com", config) + # No need to close seeder, as it doesn't own the client """ def __init__( self, ttl: timedelta = TTL, - client: Optional[httpx.AsyncClient]=None, - logger: Optional[AsyncLoggerBase] = None, # NEW: Add logger parameter - base_directory: Optional[Union[str, pathlib.Path]] = None, # NEW: Add base_directory + client: Optional[httpx.AsyncClient] = None, + logger: Optional[AsyncLoggerBase] = None, # NEW: Add logger parameter + # NEW: Add base_directory + base_directory: Optional[Union[str, pathlib.Path]] = None, cache_root: Optional[Union[str, Path]] = None, ): self.ttl = ttl + self._owns_client = client is None # Track if we created the client self.client = client or httpx.AsyncClient(http2=True, timeout=20, headers={ - "User-Agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) +AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36" + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) +AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36" }) - self.logger = logger # Store the logger instance - self.base_directory = pathlib.Path(base_directory or os.getenv("CRAWL4_AI_BASE_DIRECTORY", Path.home())) # Resolve base_directory - self.cache_dir = self.base_directory / ".crawl4ai" / "seeder_cache" # NEW: Specific cache dir for seeder - self.cache_dir.mkdir(parents=True, exist_ok=True) # Ensure it exists - self.index_cache_path = self.cache_dir / "latest_cc_index.txt" # NEW: 
Index cache path + self.logger = logger # Store the logger instance + self.base_directory = pathlib.Path(base_directory or os.getenv( + "CRAWL4_AI_BASE_DIRECTORY", Path.home())) # Resolve base_directory + self.cache_dir = self.base_directory / ".crawl4ai" / \ + "seeder_cache" # NEW: Specific cache dir for seeder + self.cache_dir.mkdir(parents=True, exist_ok=True) # Ensure it exists + self.index_cache_path = self.cache_dir / \ + "latest_cc_index.txt" # NEW: Index cache path # defer – grabbing the index inside an active loop blows up self.index_id: Optional[str] = None self._rate_sem: Optional[asyncio.Semaphore] = None # ───────── cache dirs ───────── - self.cache_root = Path(os.path.expanduser(cache_root or "~/.cache/url_seeder")) + self.cache_root = Path(os.path.expanduser( + cache_root or "~/.cache/url_seeder")) (self.cache_root / "live").mkdir(parents=True, exist_ok=True) (self.cache_root / "head").mkdir(exist_ok=True) @@ -173,7 +229,8 @@ class AsyncUrlSeeder: if self.logger: log_method = getattr(self.logger, level, None) if log_method: - log_method(message=message, tag=tag, params=kwargs.get('params', {})) + log_method(message=message, tag=tag, + params=kwargs.get('params', {})) # else: # Fallback for unknown level, should not happen with AsyncLoggerBase # print(f"[{tag}] {level.upper()}: {message.format(**kwargs)}") @@ -182,35 +239,34 @@ class AsyncUrlSeeder: h = hashlib.sha1(url.encode()).hexdigest() return self.cache_root / kind / f"{h}.json" - def _cache_get(self, kind: str, url: str) -> Optional[Dict[str, Any]]: + async def _cache_get(self, kind: str, url: str) -> Optional[Dict[str, Any]]: p = self._cache_path(kind, url) if not p.exists(): return None - # TTL check - if time.time() - p.stat().st_mtime > self.ttl.total_seconds(): + if time.time()-p.stat().st_mtime > self.ttl.total_seconds(): return None try: - return json.loads(p.read_text()) + async with aiofiles.open(p, "r") as f: + return json.loads(await f.read()) except Exception: return None - def 
_cache_set(self, kind: str, url: str, data: Dict[str, Any]) -> None: + async def _cache_set(self, kind: str, url: str, data: Dict[str, Any]) -> None: try: - self._cache_path(kind, url).write_text( - json.dumps(data, separators=(",", ":")) - ) + async with aiofiles.open(self._cache_path(kind, url), "w") as f: + await f.write(json.dumps(data, separators=(",", ":"))) except Exception: pass - # ─────────────────────────────── discovery entry + async def urls(self, - domain: str, - config: "SeedingConfig", - ) -> List[Dict[str,Any]]: + domain: str, + config: "SeedingConfig", + ) -> List[Dict[str, Any]]: """ Fetch URLs for a domain using configuration from SeedingConfig. - + Parameters ---------- domain : str @@ -228,37 +284,40 @@ class AsyncUrlSeeder: hits_per_sec = config.hits_per_sec self.force = config.force # Store force flag as instance attribute force = config.force - verbose = config.verbose if config.verbose is not None else (self.logger.verbose if self.logger else False) + verbose = config.verbose if config.verbose is not None else ( + self.logger.verbose if self.logger else False) max_urls = config.max_urls if config.max_urls is not None else -1 query = config.query score_threshold = config.score_threshold - scoring_method = config.scoring_method - + scoring_method = config.scoring_method + # Ensure seeder's logger verbose matches the config's verbose if it's set if self.logger and hasattr(self.logger, 'verbose') and config.verbose is not None: self.logger.verbose = config.verbose # ensure we have the latest CC collection id if self.index_id is None: - self.index_id = await self._latest_index() + self.index_id = await self._latest_index() # Parse source parameter - split by '+' to get list of sources sources = source.split('+') valid_sources = {"cc", "sitemap"} for s in sources: if s not in valid_sources: - raise ValueError(f"Invalid source '{s}'. Valid sources are: {', '.join(valid_sources)}") - + raise ValueError( + f"Invalid source '{s}'. 
Valid sources are: {', '.join(valid_sources)}") + if hits_per_sec: if hits_per_sec <= 0: - self._log("warning", "hits_per_sec must be positive. Disabling rate limiting.", tag="URL_SEED") + self._log( + "warning", "hits_per_sec must be positive. Disabling rate limiting.", tag="URL_SEED") self._rate_sem = None else: self._rate_sem = asyncio.Semaphore(hits_per_sec) else: - self._rate_sem = None # Ensure it's None if no rate limiting + self._rate_sem = None # Ensure it's None if no rate limiting - self._log("info", "Starting URL seeding for {domain} with source={source}", + self._log("info", "Starting URL seeding for {domain} with source={source}", params={"domain": domain, "source": source}, tag="URL_SEED") # choose stream @@ -268,44 +327,51 @@ class AsyncUrlSeeder: async for u in self._from_sitemaps(domain, pattern, force): yield u if "cc" in sources: - self._log("debug", "Fetching from Common Crawl...", tag="URL_SEED") + self._log("debug", "Fetching from Common Crawl...", + tag="URL_SEED") async for u in self._from_cc(domain, pattern, force): yield u - queue = asyncio.Queue() + # Use bounded queue to prevent RAM spikes with large domains + queue_size = min(10000, max(1000, concurrency * 100)) # Dynamic size based on concurrency + queue = asyncio.Queue(maxsize=queue_size) producer_done = asyncio.Event() - stop_event = asyncio.Event() + stop_event = asyncio.Event() seen: set[str] = set() async def producer(): try: async for u in gen(): if u in seen: - self._log("debug", "Skipping duplicate URL: {url}", + self._log("debug", "Skipping duplicate URL: {url}", params={"url": u}, tag="URL_SEED") continue if stop_event.is_set(): - self._log("info", "Producer stopping due to max_urls limit.", tag="URL_SEED") + self._log( + "info", "Producer stopping due to max_urls limit.", tag="URL_SEED") break - await queue.put(u) + seen.add(u) + await queue.put(u) # Will block if queue is full, providing backpressure except Exception as e: - self._log("error", "Producer encountered an 
error: {error}", params={"error": str(e)}, tag="URL_SEED") + self._log("error", "Producer encountered an error: {error}", params={ + "error": str(e)}, tag="URL_SEED") finally: producer_done.set() self._log("debug", "Producer finished.", tag="URL_SEED") - - async def worker(res_list: List[Dict[str,Any]]): + async def worker(res_list: List[Dict[str, Any]]): while True: if queue.empty() and producer_done.is_set(): # self._log("debug", "Worker exiting: queue empty and producer done.", tag="URL_SEED") break try: - url = await asyncio.wait_for(queue.get(), 5) # Increased timeout slightly + # Increased timeout slightly + url = await asyncio.wait_for(queue.get(), 5) except asyncio.TimeoutError: - continue # Keep checking queue and producer_done status + continue # Keep checking queue and producer_done status except Exception as e: - self._log("error", "Worker failed to get URL from queue: {error}", params={"error": str(e)}, tag="URL_SEED") + self._log("error", "Worker failed to get URL from queue: {error}", params={ + "error": str(e)}, tag="URL_SEED") continue if max_urls > 0 and len(res_list) >= max_urls: @@ -332,70 +398,34 @@ class AsyncUrlSeeder: if self._rate_sem: # global QPS control async with self._rate_sem: await self._validate(url, res_list, live_check, extract_head, - head_timeout, verbose) + head_timeout, verbose, query, score_threshold, scoring_method) else: await self._validate(url, res_list, live_check, extract_head, - head_timeout, verbose) - queue.task_done() # Mark task as done for queue.join() if ever used + head_timeout, verbose, query, score_threshold, scoring_method) + queue.task_done() # Mark task as done for queue.join() if ever used # launch - results: List[Dict[str,Any]] = [] + results: List[Dict[str, Any]] = [] prod_task = asyncio.create_task(producer()) - workers = [asyncio.create_task(worker(results)) for _ in range(concurrency)] + workers = [asyncio.create_task(worker(results)) + for _ in range(concurrency)] # Wait for all workers to finish 
await asyncio.gather(prod_task, *workers) - await queue.join() # Ensure all queued items are processed + await queue.join() # Ensure all queued items are processed - self._log("info", "Finished URL seeding for {domain}. Total URLs: {count}", + self._log("info", "Finished URL seeding for {domain}. Total URLs: {count}", params={"domain": domain, "count": len(results)}, tag="URL_SEED") - # Apply BM25 scoring if query is provided and extract_head is enabled + # Sort by relevance score if query was provided if query and extract_head and scoring_method == "bm25": - self._log("info", "Applying BM25 scoring for query: '{query}'", - params={"query": query}, tag="URL_SEED") - - # Extract text contexts from all results - documents = [] - valid_indices = [] - for i, result in enumerate(results): - if result.get("head_data"): - text_context = self._extract_text_context(result["head_data"]) - if text_context: # Only include non-empty contexts - documents.append(text_context) - valid_indices.append(i) - - if documents: - # Calculate BM25 scores - scores = self._calculate_bm25_score(query, documents) - - # Add scores to results - for idx, score in zip(valid_indices, scores): - results[idx]["relevance_score"] = float(score) - - # Add zero scores to results without head_data - for i, result in enumerate(results): - if i not in valid_indices: - result["relevance_score"] = 0.0 - - # Filter by score threshold if specified - if score_threshold is not None: - original_count = len(results) - results = [r for r in results if r.get("relevance_score", 0.0) >= score_threshold] - self._log("info", "Filtered {filtered} URLs below score threshold {threshold}. 
Remaining: {remaining}", - params={"filtered": original_count - len(results), - "threshold": score_threshold, - "remaining": len(results)}, tag="URL_SEED") - - # Sort by relevance score (highest first) - results.sort(key=lambda x: x.get("relevance_score", 0.0), reverse=True) - else: - self._log("warning", "No valid head data found for BM25 scoring.", tag="URL_SEED") - # Add zero scores to all results - for result in results: - result["relevance_score"] = 0.0 + results.sort(key=lambda x: x.get( + "relevance_score", 0.0), reverse=True) + self._log("info", "Sorted {count} URLs by relevance score for query: '{query}'", + params={"count": len(results), "query": query}, tag="URL_SEED") elif query and not extract_head: - self._log("warning", "Query provided but extract_head is False. Enable extract_head for relevance scoring.", tag="URL_SEED") + self._log( + "warning", "Query provided but extract_head is False. Enable extract_head for relevance scoring.", tag="URL_SEED") return results[:max_urls] if max_urls > 0 else results @@ -403,7 +433,7 @@ class AsyncUrlSeeder: self, domains: Sequence[str], config: "SeedingConfig", - ) -> Dict[str, List[Dict[str,Any]]]: + ) -> Dict[str, List[Dict[str, Any]]]: """ Fetch URLs for many domains in parallel. @@ -416,9 +446,9 @@ class AsyncUrlSeeder: Returns a {domain: urls-list} dict. 
""" - self._log("info", "Starting URL seeding for {count} domains...", + self._log("info", "Starting URL seeding for {count} domains...", params={"count": len(domains)}, tag="URL_SEED") - + # Ensure seeder's logger verbose matches the config's verbose if it's set if self.logger and hasattr(self.logger, 'verbose') and config.verbose is not None: self.logger.verbose = config.verbose @@ -428,9 +458,10 @@ class AsyncUrlSeeder: for domain in domains ] results = await asyncio.gather(*tasks) - + final_results = dict(zip(domains, results)) - self._log("info", "Finished URL seeding for multiple domains.", tag="URL_SEED") + self._log( + "info", "Finished URL seeding for multiple domains.", tag="URL_SEED") return final_results async def _resolve_head(self, url: str) -> Optional[str]: @@ -459,66 +490,67 @@ class AsyncUrlSeeder: except Exception as e: self._log("debug", "HEAD {url} failed: {err}", - params={"url": url, "err": str(e)}, tag="URL_SEED") + params={"url": url, "err": str(e)}, tag="URL_SEED") return None - # ─────────────────────────────── CC - async def _from_cc(self, domain:str, pattern:str, force:bool): + async def _from_cc(self, domain: str, pattern: str, force: bool): import re digest = hashlib.md5(pattern.encode()).hexdigest()[:8] # ── normalise for CC (strip scheme, query, fragment) - raw = re.sub(r'^https?://', '', domain).split('#', 1)[0].split('?', 1)[0].lstrip('.') + raw = re.sub(r'^https?://', '', domain).split('#', + 1)[0].split('?', 1)[0].lstrip('.') # ── sanitize only for cache-file name safe = re.sub('[/?#]+', '_', raw) path = self.cache_dir / f"{self.index_id}_{safe}_{digest}.jsonl" if path.exists() and not force: - self._log("info", "Loading CC URLs for {domain} from cache: {path}", + self._log("info", "Loading CC URLs for {domain} from cache: {path}", params={"domain": domain, "path": path}, tag="URL_SEED") - async with aiofiles.open(path,"r") as fp: + async with aiofiles.open(path, "r") as fp: async for line in fp: - url=line.strip() - if 
_match(url,pattern): yield url + url = line.strip() + if _match(url, pattern): + yield url return # build CC glob – if a path is present keep it, else add trailing /* glob = f"*.{raw}*" if '/' in raw else f"*.{raw}/*" url = f"https://index.commoncrawl.org/{self.index_id}-index?url={quote(glob, safe='*')}&output=json" - retries=(1,3,7) - self._log("info", "Fetching CC URLs for {domain} from Common Crawl index: {url}", + retries = (1, 3, 7) + self._log("info", "Fetching CC URLs for {domain} from Common Crawl index: {url}", params={"domain": domain, "url": url}, tag="URL_SEED") - for i,d in enumerate(retries+(-1,)): # last -1 means don't retry + for i, d in enumerate(retries+(-1,)): # last -1 means don't retry try: async with self.client.stream("GET", url) as r: r.raise_for_status() - async with aiofiles.open(path,"w") as fp: + async with aiofiles.open(path, "w") as fp: async for line in r.aiter_lines(): rec = json.loads(line) u = rec["url"] await fp.write(u+"\n") - if _match(u,pattern): yield u + if _match(u, pattern): + yield u return except httpx.HTTPStatusError as e: - if e.response.status_code==503 and i or presence of elements + is_sitemap_index = False + sub_sitemaps = [] + regular_urls = [] + # Use lxml for XML parsing if available, as it's generally more robust if LXML: try: # Use XML parser for sitemaps, not HTML parser parser = etree.XMLParser(recover=True) root = etree.fromstring(data, parser=parser) - + # Define namespace for sitemap ns = {'s': 'http://www.sitemaps.org/schemas/sitemap/0.9'} - - # First check if this is a sitemap index - for sitemap_elem in root.xpath('//s:sitemap/s:loc', namespaces=ns): - loc = sitemap_elem.text.strip() if sitemap_elem.text else "" - if loc: - self._log("debug", "Found nested sitemap: {loc}", params={"loc": loc}, tag="URL_SEED") - async for u in self._iter_sitemap(loc): - yield u - - # Then check for regular URLs - for loc_elem in root.xpath('//s:url/s:loc', namespaces=ns): - loc = loc_elem.text.strip() if loc_elem.text 
else "" - if loc: - yield loc + + # Check for sitemap index entries + sitemap_locs = root.xpath('//s:sitemap/s:loc', namespaces=ns) + if sitemap_locs: + is_sitemap_index = True + for sitemap_elem in sitemap_locs: + loc = sitemap_elem.text.strip() if sitemap_elem.text else "" + if loc: + sub_sitemaps.append(loc) + + # If not a sitemap index, get regular URLs + if not is_sitemap_index: + for loc_elem in root.xpath('//s:url/s:loc', namespaces=ns): + loc = loc_elem.text.strip() if loc_elem.text else "" + if loc: + regular_urls.append(loc) except Exception as e: - self._log("error", "LXML parsing error for sitemap {url}: {error}", + self._log("error", "LXML parsing error for sitemap {url}: {error}", params={"url": url, "error": str(e)}, tag="URL_SEED") return - else: # Fallback to xml.etree.ElementTree + else: # Fallback to xml.etree.ElementTree import xml.etree.ElementTree as ET try: # Parse the XML @@ -634,89 +677,180 @@ class AsyncUrlSeeder: for elem in root.iter(): if '}' in elem.tag: elem.tag = elem.tag.split('}')[1] - + # Check for sitemap index entries - for sitemap in root.findall('.//sitemap'): - loc_elem = sitemap.find('loc') - if loc_elem is not None and loc_elem.text: - loc = loc_elem.text.strip() - self._log("debug", "Found nested sitemap: {loc}", params={"loc": loc}, tag="URL_SEED") - async for u in self._iter_sitemap(loc): - yield u - - # Check for regular URL entries - for url in root.findall('.//url'): - loc_elem = url.find('loc') - if loc_elem is not None and loc_elem.text: - yield loc_elem.text.strip() + sitemaps = root.findall('.//sitemap') + if sitemaps: + is_sitemap_index = True + for sitemap in sitemaps: + loc_elem = sitemap.find('loc') + if loc_elem is not None and loc_elem.text: + sub_sitemaps.append(loc_elem.text.strip()) + + # If not a sitemap index, get regular URLs + if not is_sitemap_index: + for url_elem in root.findall('.//url'): + loc_elem = url_elem.find('loc') + if loc_elem is not None and loc_elem.text: + 
regular_urls.append(loc_elem.text.strip()) except Exception as e: - self._log("error", "ElementTree parsing error for sitemap {url}: {error}", + self._log("error", "ElementTree parsing error for sitemap {url}: {error}", params={"url": url, "error": str(e)}, tag="URL_SEED") return + # Process based on type + if is_sitemap_index and sub_sitemaps: + self._log("info", "Processing sitemap index with {count} sub-sitemaps in parallel", + params={"count": len(sub_sitemaps)}, tag="URL_SEED") + + # Create a bounded queue for results to prevent RAM issues + # For sitemap indexes, use a larger queue as we expect many URLs + queue_size = min(50000, len(sub_sitemaps) * 1000) # Estimate 1000 URLs per sitemap + result_queue = asyncio.Queue(maxsize=queue_size) + completed_count = 0 + total_sitemaps = len(sub_sitemaps) + + async def process_subsitemap(sitemap_url: str): + try: + self._log( + "debug", "Processing sub-sitemap: {url}", params={"url": sitemap_url}, tag="URL_SEED") + # Recursively process sub-sitemap + async for u in self._iter_sitemap(sitemap_url): + await result_queue.put(u) # Will block if queue is full + except Exception as e: + self._log("error", "Error processing sub-sitemap {url}: {error}", + params={"url": sitemap_url, "error": str(e)}, tag="URL_SEED") + finally: + # Put sentinel to signal completion + await result_queue.put(None) + + # Start all tasks + tasks = [asyncio.create_task(process_subsitemap(sm)) + for sm in sub_sitemaps] + + # Yield results as they come in + while completed_count < total_sitemaps: + item = await result_queue.get() + if item is None: + completed_count += 1 + else: + yield item + + # Ensure all tasks are done + await asyncio.gather(*tasks, return_exceptions=True) + else: + # Regular sitemap - yield URLs directly + for u in regular_urls: + yield u # ─────────────────────────────── validate helpers - async def _validate(self, url:str, res_list:List[Dict[str,Any]], live:bool, - extract:bool, timeout:int, verbose:bool): + async def 
_validate(self, url: str, res_list: List[Dict[str, Any]], live: bool, + extract: bool, timeout: int, verbose: bool, query: Optional[str] = None, + score_threshold: Optional[float] = None, scoring_method: str = "bm25"): # Local verbose parameter for this function is used to decide if intermediate logs should be printed # The main logger's verbose status should be controlled by the caller. - + cache_kind = "head" if extract else "live" # ---------- try cache ---------- - if (live or extract) and not (hasattr(self, 'force') and self.force): - cached = self._cache_get(cache_kind, url) + if not (hasattr(self, 'force') and self.force): + cached = await self._cache_get(cache_kind, url) if cached: res_list.append(cached) return if extract: - self._log("debug", "Fetching head for {url}", params={"url": url}, tag="URL_SEED") - ok,html,final = await self._fetch_head(url,timeout) - status="valid" if ok else "not_valid" - self._log("info" if ok else "warning", "HEAD {status} for {final_url}", + self._log("debug", "Fetching head for {url}", params={ + "url": url}, tag="URL_SEED") + ok, html, final = await self._fetch_head(url, timeout) + status = "valid" if ok else "not_valid" + self._log("info" if ok else "warning", "HEAD {status} for {final_url}", params={"status": status.upper(), "final_url": final or url}, tag="URL_SEED") + # head_data = _parse_head(html) if ok else {} + head_data = await asyncio.to_thread(_parse_head, html) if ok else {} entry = { "url": final or url, "status": status, - "head_data": _parse_head(html) if ok else {}, + "head_data": head_data, } - if live or extract: - self._cache_set(cache_kind, url, entry) - res_list.append(entry) + + # Apply BM25 scoring if query is provided and head data exists + if query and ok and scoring_method == "bm25" and head_data: + text_context = self._extract_text_context(head_data) + if text_context: + # Calculate BM25 score for this single document + # scores = self._calculate_bm25_score(query, [text_context]) + scores = await 
asyncio.to_thread(self._calculate_bm25_score, query, [text_context]) + relevance_score = scores[0] if scores else 0.0 + entry["relevance_score"] = float(relevance_score) + else: + # No text context, use URL-based scoring as fallback + relevance_score = self._calculate_url_relevance_score( + query, entry["url"]) + entry["relevance_score"] = float(relevance_score) + elif query: + # Query provided but no head data - we reject this entry + self._log("debug", "No head data for {url}, using URL-based scoring", + params={"url": url}, tag="URL_SEED") + return + # relevance_score = self._calculate_url_relevance_score(query, entry["url"]) + # entry["relevance_score"] = float(relevance_score) + elif live: - self._log("debug", "Performing live check for {url}", params={"url": url}, tag="URL_SEED") - ok=await self._resolve_head(url) - status="valid" if ok else "not_valid" - self._log("info" if ok else "warning", "LIVE CHECK {status} for {url}", + self._log("debug", "Performing live check for {url}", params={ + "url": url}, tag="URL_SEED") + ok = await self._resolve_head(url) + status = "valid" if ok else "not_valid" + self._log("info" if ok else "warning", "LIVE CHECK {status} for {url}", params={"status": status.upper(), "url": url}, tag="URL_SEED") entry = {"url": url, "status": status, "head_data": {}} - if live or extract: - self._cache_set(cache_kind, url, entry) - res_list.append(entry) + + # Apply URL-based scoring if query is provided + if query: + relevance_score = self._calculate_url_relevance_score( + query, url) + entry["relevance_score"] = float(relevance_score) + else: entry = {"url": url, "status": "unknown", "head_data": {}} + + # Apply URL-based scoring if query is provided + if query: + relevance_score = self._calculate_url_relevance_score( + query, url) + entry["relevance_score"] = float(relevance_score) + + # Now decide whether to add the entry based on score threshold + if query and "relevance_score" in entry: + if score_threshold is None or 
entry["relevance_score"] >= score_threshold: + if live or extract: + await self._cache_set(cache_kind, url, entry) + res_list.append(entry) + else: + self._log("debug", "URL {url} filtered out with score {score} < {threshold}", + params={"url": url, "score": entry["relevance_score"], "threshold": score_threshold}, tag="URL_SEED") + else: + # No query or no scoring - add as usual if live or extract: - self._cache_set(cache_kind, url, entry) + await self._cache_set(cache_kind, url, entry) res_list.append(entry) - - async def _head_ok(self, url:str, timeout:int)->bool: + async def _head_ok(self, url: str, timeout: int) -> bool: try: - r=await self.client.head(url, timeout=timeout, - headers={"Range":"bytes=0-0","Accept-Encoding":"identity"}) - r.raise_for_status() # Raise for bad status codes (4xx, 5xx) + r = await self.client.head(url, timeout=timeout, + headers={"Range": "bytes=0-0", "Accept-Encoding": "identity"}) + r.raise_for_status() # Raise for bad status codes (4xx, 5xx) return True except httpx.RequestError as e: - self._log("debug", "HEAD check network error for {url}: {error}", + self._log("debug", "HEAD check network error for {url}: {error}", params={"url": url, "error": str(e)}, tag="URL_SEED") return False except httpx.HTTPStatusError as e: - self._log("debug", "HEAD check HTTP status error for {url}: {status_code}", + self._log("debug", "HEAD check HTTP status error for {url}: {status_code}", params={"url": url, "status_code": e.response.status_code}, tag="URL_SEED") return False except Exception as e: - self._log("error", "Unexpected error during HEAD check for {url}: {error}", + self._log("error", "Unexpected error during HEAD check for {url}: {error}", params={"url": url, "error": str(e)}, tag="URL_SEED") return False @@ -726,7 +860,7 @@ class AsyncUrlSeeder: timeout: int, max_redirects: int = 5, max_bytes: int = 65_536, # stop after 64 kB even if never comes - chunk_size: int = 4096, # how much we read per await + chunk_size: int = 4096, # how much 
we read per await ): for _ in range(max_redirects+1): try: @@ -742,22 +876,24 @@ class AsyncUrlSeeder: }, follow_redirects=False, ) as r: - - if r.status_code in (301,302,303,307,308): + + if r.status_code in (301, 302, 303, 307, 308): location = r.headers.get("Location") if location: url = urljoin(url, location) - self._log("debug", "Redirecting from {original_url} to {new_url}", + self._log("debug", "Redirecting from {original_url} to {new_url}", params={"original_url": r.url, "new_url": url}, tag="URL_SEED") continue else: - self._log("warning", "Redirect status {status_code} but no Location header for {url}", + self._log("warning", "Redirect status {status_code} but no Location header for {url}", params={"status_code": r.status_code, "url": r.url}, tag="URL_SEED") - return False, "", str(r.url) # Return original URL if no new location + # Return original URL if no new location + return False, "", str(r.url) # For 2xx or other non-redirect codes, proceed to read content - if not (200 <= r.status_code < 400): # Only allow successful codes, or continue - self._log("warning", "Non-success status {status_code} when fetching head for {url}", + # Only allow successful codes, or continue + if not (200 <= r.status_code < 400): + self._log("warning", "Non-success status {status_code} when fetching head for {url}", params={"status_code": r.status_code, "url": r.url}, tag="URL_SEED") return False, "", str(r.url) @@ -768,7 +904,7 @@ class AsyncUrlSeeder: if b"" in low or len(buf) >= max_bytes: await r.aclose() break - + enc = r.headers.get("Content-Encoding", "").lower() try: if enc == "gzip" and buf[:2] == b"\x1f\x8b": @@ -779,28 +915,30 @@ class AsyncUrlSeeder: # Header says “gzip” or “br” but payload is plain – ignore self._log( "debug", - "Skipping bogus {encoding} for {url}", + "Skipping bogus {encoding} for {url}", params={"encoding": enc, "url": r.url}, tag="URL_SEED", ) except Exception as e: self._log( - "warning", + "warning", "Decompression error for {url} 
({encoding}): {error}", - params={"url": r.url, "encoding": enc, "error": str(e)}, + params={"url": r.url, + "encoding": enc, "error": str(e)}, tag="URL_SEED", - ) + ) # fall through with raw buf - + # Find the tag case-insensitively and decode idx = buf.lower().find(b"") - if idx==-1: - self._log("debug", "No tag found in initial bytes of {url}", + if idx == -1: + self._log("debug", "No tag found in initial bytes of {url}", params={"url": r.url}, tag="URL_SEED") # If no is found, take a reasonable chunk or all if small - html_bytes = buf if len(buf) < 10240 else buf[:10240] # Take max 10KB if no head tag + # Take max 10KB if no head tag + html_bytes = buf if len(buf) < 10240 else buf[:10240] else: - html_bytes = buf[:idx+7] # Include tag + html_bytes = buf[:idx+7] # Include tag try: html = html_bytes.decode("utf-8", "replace") @@ -813,49 +951,50 @@ class AsyncUrlSeeder: ) html = html_bytes.decode("latin-1", "replace") - return True,html,str(r.url) # Return the actual URL after redirects - + # Return the actual URL after redirects + return True, html, str(r.url) + except httpx.RequestError as e: - self._log("debug", "Fetch head network error for {url}: {error}", + self._log("debug", "Fetch head network error for {url}: {error}", params={"url": url, "error": str(e)}, tag="URL_SEED") - return False,"",url - + return False, "", url + # If loop finishes without returning (e.g. 
too many redirects) - self._log("warning", "Exceeded max redirects ({max_redirects}) for {url}", + self._log("warning", "Exceeded max redirects ({max_redirects}) for {url}", params={"max_redirects": max_redirects, "url": url}, tag="URL_SEED") - return False,"",url + return False, "", url # ─────────────────────────────── BM25 scoring helpers def _extract_text_context(self, head_data: Dict[str, Any]) -> str: """Extract all relevant text from head metadata for scoring.""" # Priority fields with their weights (for future enhancement) text_parts = [] - + # Title if head_data.get("title"): text_parts.append(head_data["title"]) - + # Standard meta tags meta = head_data.get("meta", {}) for key in ["description", "keywords", "author", "subject", "summary", "abstract"]: if meta.get(key): text_parts.append(meta[key]) - + # Open Graph tags for key in ["og:title", "og:description", "og:site_name", "article:tag"]: if meta.get(key): text_parts.append(meta[key]) - + # Twitter Card tags for key in ["twitter:title", "twitter:description", "twitter:image:alt"]: if meta.get(key): text_parts.append(meta[key]) - + # Dublin Core tags for key in ["dc.title", "dc.description", "dc.subject", "dc.creator"]: if meta.get(key): text_parts.append(meta[key]) - + # JSON-LD structured data for jsonld in head_data.get("jsonld", []): if isinstance(jsonld, dict): @@ -865,8 +1004,9 @@ class AsyncUrlSeeder: if isinstance(jsonld[field], str): text_parts.append(jsonld[field]) elif isinstance(jsonld[field], list): - text_parts.extend(str(item) for item in jsonld[field] if item) - + text_parts.extend(str(item) + for item in jsonld[field] if item) + # Handle @graph structures if "@graph" in jsonld and isinstance(jsonld["@graph"], list): for item in jsonld["@graph"]: @@ -874,71 +1014,176 @@ class AsyncUrlSeeder: for field in ["name", "headline", "description"]: if field in item and isinstance(item[field], str): text_parts.append(item[field]) - + # Combine all text parts return " ".join(filter(None, 
text_parts)) - + + def _calculate_url_relevance_score(self, query: str, url: str) -> float: + """Calculate relevance score between query and URL using string matching.""" + # Normalize inputs + query_lower = query.lower() + url_lower = url.lower() + + # Extract URL components + from urllib.parse import urlparse + parsed = urlparse(url) + domain = parsed.netloc.replace('www.', '') + path = parsed.path.strip('/') + + # Create searchable text from URL + # Split domain by dots and path by slashes + domain_parts = domain.split('.') + path_parts = [p for p in path.split('/') if p] + + # Include query parameters if any + query_params = parsed.query + param_parts = [] + if query_params: + for param in query_params.split('&'): + if '=' in param: + key, value = param.split('=', 1) + param_parts.extend([key, value]) + + # Combine all parts + all_parts = domain_parts + path_parts + param_parts + + # Calculate scores + scores = [] + query_tokens = query_lower.split() + + # 1. Exact match in any part (highest score) + for part in all_parts: + part_lower = part.lower() + if query_lower in part_lower: + scores.append(1.0) + elif part_lower in query_lower: + scores.append(0.9) + + # 2. Token matching + for token in query_tokens: + token_scores = [] + for part in all_parts: + part_lower = part.lower() + if token in part_lower: + # Score based on how much of the part the token covers + coverage = len(token) / len(part_lower) + token_scores.append(0.7 * coverage) + elif part_lower in token: + coverage = len(part_lower) / len(token) + token_scores.append(0.6 * coverage) + + if token_scores: + scores.append(max(token_scores)) + + # 3. 
Character n-gram similarity (for fuzzy matching) + def get_ngrams(text, n=3): + return set(text[i:i+n] for i in range(len(text)-n+1)) + + # Combine all URL parts into one string for n-gram comparison + url_text = ' '.join(all_parts).lower() + if len(query_lower) >= 3 and len(url_text) >= 3: + query_ngrams = get_ngrams(query_lower) + url_ngrams = get_ngrams(url_text) + if query_ngrams and url_ngrams: + intersection = len(query_ngrams & url_ngrams) + union = len(query_ngrams | url_ngrams) + jaccard = intersection / union if union > 0 else 0 + scores.append(0.5 * jaccard) + + # Calculate final score + if not scores: + return 0.0 + + # Weighted average with bias towards higher scores + scores.sort(reverse=True) + weighted_score = 0 + total_weight = 0 + for i, score in enumerate(scores): + weight = 1 / (i + 1) # Higher weight for better matches + weighted_score += score * weight + total_weight += weight + + final_score = weighted_score / total_weight if total_weight > 0 else 0 + return min(final_score, 1.0) # Cap at 1.0 + def _calculate_bm25_score(self, query: str, documents: List[str]) -> List[float]: """Calculate BM25 scores for documents against a query.""" if not HAS_BM25: - self._log("warning", "rank_bm25 not installed. Returning zero scores.", tag="URL_SEED") + self._log( + "warning", "rank_bm25 not installed. 
Returning zero scores.", tag="URL_SEED") return [0.0] * len(documents) - + if not query or not documents: return [0.0] * len(documents) - + # Tokenize query and documents (simple whitespace tokenization) # For production, consider using a proper tokenizer query_tokens = query.lower().split() tokenized_docs = [doc.lower().split() for doc in documents] - + # Handle edge case where all documents are empty if all(len(doc) == 0 for doc in tokenized_docs): return [0.0] * len(documents) - + # Create BM25 instance and calculate scores try: from rank_bm25 import BM25Okapi bm25 = BM25Okapi(tokenized_docs) scores = bm25.get_scores(query_tokens) - + # Normalize scores to 0-1 range max_score = max(scores) if max(scores) > 0 else 1.0 normalized_scores = [score / max_score for score in scores] - + return normalized_scores except Exception as e: - self._log("error", "Error calculating BM25 scores: {error}", + self._log("error", "Error calculating BM25 scores: {error}", params={"error": str(e)}, tag="URL_SEED") return [0.0] * len(documents) + # ─────────────────────────────── cleanup methods + async def close(self): + """Close the HTTP client if we own it.""" + if self._owns_client and self.client: + await self.client.aclose() + self._log("debug", "Closed HTTP client", tag="URL_SEED") + + async def __aenter__(self): + """Async context manager entry.""" + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + """Async context manager exit.""" + await self.close() + return False + # ─────────────────────────────── index helper - async def _latest_index(self)->str: - if self.index_cache_path.exists() and (time.time()-self.index_cache_path.stat().st_mtime) str: + if self.index_cache_path.exists() and (time.time()-self.index_cache_path.stat().st_mtime) < self.ttl.total_seconds(): + self._log("info", "Loading latest CC index from cache: {path}", params={"path": self.index_cache_path}, tag="URL_SEED") return self.index_cache_path.read_text().strip() - - self._log("info", 
"Fetching latest Common Crawl index from {url}", + + self._log("info", "Fetching latest Common Crawl index from {url}", params={"url": COLLINFO_URL}, tag="URL_SEED") try: async with httpx.AsyncClient() as c: - j=await c.get(COLLINFO_URL,timeout=10) - j.raise_for_status() # Raise an exception for bad status codes - idx=j.json()[0]["id"] + j = await c.get(COLLINFO_URL, timeout=10) + j.raise_for_status() # Raise an exception for bad status codes + idx = j.json()[0]["id"] self.index_cache_path.write_text(idx) - self._log("success", "Successfully fetched and cached CC index: {index_id}", + self._log("success", "Successfully fetched and cached CC index: {index_id}", params={"index_id": idx}, tag="URL_SEED") return idx except httpx.RequestError as e: - self._log("error", "Network error fetching CC index info: {error}", + self._log("error", "Network error fetching CC index info: {error}", params={"error": str(e)}, tag="URL_SEED") raise except httpx.HTTPStatusError as e: - self._log("error", "HTTP error fetching CC index info: {status_code}", + self._log("error", "HTTP error fetching CC index info: {status_code}", params={"status_code": e.response.status_code}, tag="URL_SEED") raise except Exception as e: - self._log("error", "Unexpected error fetching CC index info: {error}", + self._log("error", "Unexpected error fetching CC index info: {error}", params={"error": str(e)}, tag="URL_SEED") - raise \ No newline at end of file + raise diff --git a/docs/examples/url_seeder/bbc_sport_research_assistant.py b/docs/examples/url_seeder/bbc_sport_research_assistant.py new file mode 100644 index 00000000..c9104f2f --- /dev/null +++ b/docs/examples/url_seeder/bbc_sport_research_assistant.py @@ -0,0 +1,806 @@ +""" +BBC Sport Research Assistant Pipeline +===================================== + +This example demonstrates how URLSeeder helps create an efficient research pipeline: +1. Discover all available URLs without crawling +2. Filter and rank them based on relevance +3. 
def get_cache_key(prefix: str, *args) -> str:
    """Return the MD5 hex digest used as the cache key for an entry.

    The digest is computed over ``"<prefix>:<arg1>|<arg2>|..."`` where each
    argument is converted with ``str``.
    """
    joined_args = "|".join(map(str, args))
    hasher = hashlib.md5()
    hasher.update("{}:{}".format(prefix, joined_args).encode())
    return hasher.hexdigest()
@dataclass
class ResearchConfig:
    """Configuration for the research pipeline.

    When ``test_mode`` is true the discovery/crawl limits are capped to
    small values in ``__post_init__``; limits that were explicitly set
    below those caps are kept. ``output_dir`` defaults to a
    ``research_results`` folder next to the script.
    """
    # Core settings
    domain: str = "www.bbc.com/sport"
    max_urls_discovery: int = 100
    max_urls_to_crawl: int = 10
    top_k_urls: int = 10

    # Scoring and filtering
    score_threshold: float = 0.1
    scoring_method: str = "bm25"

    # Processing options
    use_llm_enhancement: bool = True
    extract_head_metadata: bool = True
    live_check: bool = True
    force_refresh: bool = False

    # Crawler settings
    max_concurrent_crawls: int = 5
    timeout: int = 30000
    headless: bool = True

    # Output settings
    save_json: bool = True
    save_markdown: bool = True
    output_dir: Optional[str] = None  # Resolved in __post_init__ when None

    # Development settings
    test_mode: bool = False
    interactive_mode: bool = False
    verbose: bool = True

    def __post_init__(self):
        """Apply test-mode caps and resolve the default output directory."""
        if self.test_mode:
            # Cap rather than overwrite: a caller asking for fewer URLs than
            # the test-mode ceiling should not have the limit raised.
            self.max_urls_discovery = min(self.max_urls_discovery, 50)
            self.max_urls_to_crawl = min(self.max_urls_to_crawl, 3)
            self.top_k_urls = min(self.top_k_urls, 5)

        # Set default output directory relative to script location
        if self.output_dir is None:
            self.output_dir = str(SCRIPT_DIR / "research_results")
def tokenize_query_to_patterns(query: str) -> List[str]:
    """
    Convert a query into ``*keyword*`` URL patterns for URLSeeder.

    Words are lower-cased, stripped of surrounding punctuation, and kept
    only when they are longer than two characters and not a stop word.
    Duplicates are removed (first occurrence wins) and at most eight
    patterns are returned.

    Example: "AI startups funding?" -> ["*startups*", "*funding*"]
    ("ai" is dropped by the length filter).
    """
    import string

    stop_words = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'that'}

    # Strip punctuation at word edges: characters such as "?" or "*" are
    # wildcards once the word is wrapped into a glob pattern.
    words = (w.strip(string.punctuation) for w in query.lower().split())
    keywords = [w for w in words if len(w) > 2 and w not in stop_words]

    # dict.fromkeys de-duplicates while preserving order, so repeated words
    # do not use up the limited pattern slots.
    unique_keywords = list(dict.fromkeys(keywords))

    return [f"*{keyword}*" for keyword in unique_keywords[:8]]  # Limit to 8 patterns
async def crawl_selected_urls(urls: List[Dict], query: str, config: ResearchConfig) -> List[Dict]:
    """
    Crawl the discovered URLs and return their filtered markdown content.

    Args:
        urls: Discovery results; each item is a dict and only items with a
            'url' key are used.
        query: Research query, used as part of the per-URL cache key.
        config: Pipeline settings (crawl limit, cache refresh, browser and
            concurrency options).

    Returns:
        One dict per successfully obtained page with the keys 'url',
        'title', 'markdown', 'raw_length', 'fit_length' and 'metadata'.
        Cached pages come first, followed by freshly crawled ones.
    """
    # Extract just URLs from the discovery results
    url_list = [u['url'] for u in urls if 'url' in u][:config.max_urls_to_crawl]

    if not url_list:
        console.print("[red]❌ No URLs to crawl[/red]")
        return []

    console.print(f"\n[cyan]🕷️ Crawling {len(url_list)} URLs...[/cyan]")

    # Split the URLs into cache hits and pages that still need crawling
    crawled_results = []
    urls_to_crawl = []

    for url in url_list:
        cached_content = None
        if not config.force_refresh:
            # Only touch the cache when its content may actually be used.
            cached_content = load_from_cache(get_cache_key("crawled_content", url, query))
        if cached_content:
            crawled_results.append(cached_content)
        else:
            urls_to_crawl.append(url)

    if urls_to_crawl:
        console.print(f"[cyan]📥 Crawling {len(urls_to_crawl)} new URLs (cached: {len(crawled_results)})[/cyan]")

        # Configure markdown generator with content filter
        md_generator = DefaultMarkdownGenerator(
            content_filter=PruningContentFilter(
                threshold=0.48,
                threshold_type="dynamic",
                min_word_threshold=10
            ),
        )

        # Configure crawler
        crawler_config = CrawlerRunConfig(
            markdown_generator=md_generator,
            exclude_external_links=True,
            excluded_tags=['nav', 'header', 'footer', 'aside'],
        )

        # Create crawler with browser config
        async with AsyncWebCrawler(
            config=BrowserConfig(
                headless=config.headless,
                verbose=config.verbose
            )
        ) as crawler:
            results = await crawler.arun_many(
                urls_to_crawl,
                config=crawler_config,
                max_concurrent=config.max_concurrent_crawls
            )

            # NOTE(review): results are paired with the requested URLs by
            # position; confirm arun_many preserves input order.
            for url, result in zip(urls_to_crawl, results):
                if not result.success:
                    # NOTE(review): the failure text is presumably exposed as
                    # `error_message`; `error` is kept as a fallback so a
                    # missing attribute cannot abort the whole crawl.
                    error = (getattr(result, "error_message", None)
                             or getattr(result, "error", None)
                             or "unknown error")
                    console.print(f" [red]❌ Failed: {url[:50]}... - {error}[/red]")
                    continue

                raw_markdown = result.markdown.raw_markdown
                # Prefer the filtered markdown, fall back to the raw text
                best_markdown = result.markdown.fit_markdown or raw_markdown
                # Guard against missing metadata / a missing title
                metadata = result.metadata or {}

                content_data = {
                    'url': url,
                    'title': metadata.get('title') or '',
                    'markdown': best_markdown,
                    'raw_length': len(raw_markdown),
                    'fit_length': len(best_markdown),
                    'metadata': result.metadata
                }
                crawled_results.append(content_data)

                # Cache the result
                save_to_cache(get_cache_key("crawled_content", url, query), content_data)

    console.print(f"[green]✅ Successfully crawled {len(crawled_results)} URLs[/green]")
    return crawled_results
async def save_research_results(
    result: "ResearchResult",
    config: "ResearchConfig"
) -> Tuple[Optional[str], Optional[str]]:
    """
    Save research results in JSON and/or Markdown format.

    Files are written into ``config.output_dir`` (created if needed) and
    named ``<timestamp>_<query slug>`` with a ``.json`` / ``.md`` suffix.

    Returns:
        Tuple of (json_path, markdown_path); an element is None when the
        corresponding format is disabled in ``config``.
    """
    import re

    def _cell(text, limit):
        """Render *text* as one markdown table cell of at most *limit* chars."""
        text = " ".join(str(text).split()).replace("|", "\\|")
        return text if len(text) <= limit else text[:limit] + "..."

    # Create output directory
    output_dir = Path(config.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    # Generate filename based on query and timestamp. Every run of characters
    # that is not a word character, dot or dash becomes "_", so user-typed
    # queries ("who won?", "a:b") always give a valid file name.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    query_slug = re.sub(r"[^\w.-]+", "_", result.query.original_query[:50]).strip("._") or "query"
    base_filename = f"{timestamp}_{query_slug}"

    json_path = None
    md_path = None

    # Save JSON
    if config.save_json:
        json_path = output_dir / f"{base_filename}.json"
        # Explicit UTF-8: the platform default encoding may not be able to
        # represent everything the report contains.
        with open(json_path, 'w', encoding='utf-8') as f:
            json.dump(asdict(result), f, indent=2, default=str)
        console.print(f"\n[green]💾 JSON saved: {json_path}[/green]")

    # Save Markdown
    if config.save_markdown:
        md_path = output_dir / f"{base_filename}.md"

        # Create formatted markdown
        md_content = [
            f"# Research Report: {result.query.original_query}",
            f"\n**Generated on:** {result.metadata.get('timestamp', 'N/A')}",
            f"\n**Domain:** {result.metadata.get('domain', 'N/A')}",
            f"\n**Processing time:** {result.metadata.get('duration', 'N/A')}",
            "\n---\n",
            "## Query Information",
            f"- **Original Query:** {result.query.original_query}",
            f"- **Enhanced Query:** {result.query.enhanced_query or 'N/A'}",
            f"- **Search Patterns:** {', '.join(result.query.search_patterns or [])}",
            "\n## Statistics",
            f"- **URLs Discovered:** {len(result.discovered_urls)}",
            f"- **URLs Crawled:** {len(result.crawled_content)}",
            f"- **Sources Cited:** {len(result.citations)}",
            "\n## Research Synthesis\n",
            result.synthesis,
            "\n## Sources\n"
        ]

        # Add citations
        for citation in result.citations:
            md_content.append(f"### [{citation['source_id']}] {citation['title']}")
            md_content.append(f"- **URL:** [{citation['url']}]({citation['url']})")
            md_content.append("")

        # Add discovered URLs summary
        md_content.extend([
            "\n## Discovered URLs (Top 10)\n",
            "| Score | URL | Title |",
            "|-------|-----|-------|"
        ])

        for url_data in result.discovered_urls[:10]:
            score = url_data.get('relevance_score') or 0
            url = url_data.get('url') or ''
            # head_data or its title may be missing or None
            head_data = url_data.get('head_data') or {}
            title = head_data.get('title') or 'N/A'
            md_content.append(f"| {score:.3f} | {_cell(url, 50)} | {_cell(title, 60)} |")

        # Write markdown
        with open(md_path, 'w', encoding='utf-8') as f:
            f.write('\n'.join(md_content))

        console.print(f"[green]📄 Markdown saved: {md_path}[/green]")

    return str(json_path) if json_path else None, str(md_path) if md_path else None
async def main():
    """
    Interactive entry point for the BBC Sport Research Assistant.

    Prompts for a configuration preset and a query, runs the research
    pipeline, prints the formatted result and saves it when the chosen
    configuration enables JSON or Markdown output.
    """
    # Example queries
    example_queries = [
        "Premier League transfer news and rumors",
        "Champions League match results and analysis",
        "World Cup qualifying updates",
        "Football injury reports and return dates",
        "Tennis grand slam tournament results"
    ]

    # Display header
    console.print(Panel.fit(
        "[bold cyan]BBC Sport Research Assistant[/bold cyan]\n\n"
        "This tool demonstrates efficient research using URLSeeder:\n"
        "[dim]• Discover all URLs without crawling\n"
        "• Filter and rank by relevance\n"
        "• Crawl only the most relevant content\n"
        "• Generate AI-powered insights with citations[/dim]\n\n"
        f"[dim]📁 Working directory: {SCRIPT_DIR}[/dim]",
        title="🔬 Welcome",
        border_style="cyan"
    ))

    # Configuration options table
    config_table = Table(title="\n⚙️ Configuration Options", show_header=False, box=None)
    config_table.add_column(style="bold cyan", width=3)
    config_table.add_column()

    config_table.add_row("1", "Quick Test Mode (3 URLs, fast)")
    config_table.add_row("2", "Standard Mode (10 URLs, balanced)")
    config_table.add_row("3", "Comprehensive Mode (20 URLs, thorough)")
    config_table.add_row("4", "Custom Configuration")

    console.print(config_table)

    config_choice = input("\nSelect configuration (1-4): ").strip()

    # Create config based on choice
    if config_choice == "1":
        config = ResearchConfig(test_mode=True, interactive_mode=False)
    elif config_choice == "2":
        config = ResearchConfig(max_urls_to_crawl=10, top_k_urls=10)
    elif config_choice == "3":
        config = ResearchConfig(max_urls_to_crawl=20, top_k_urls=20, max_urls_discovery=200)
    else:
        # Custom configuration: collect every answer first and build the
        # config once, so __post_init__ sees the chosen test_mode. Setting
        # the attribute afterwards would never apply the test-mode limits.
        test_mode = input("\nTest mode? (y/n): ").strip().lower() == 'y'
        interactive_mode = input("Interactive mode (pause between steps)? (y/n): ").strip().lower() == 'y'
        use_llm_enhancement = input("Use AI to enhance queries? (y/n): ").strip().lower() == 'y'

        limits = {}
        if not test_mode:
            try:
                limits["max_urls_to_crawl"] = int(input("Max URLs to crawl (default 10): ") or "10")
                limits["top_k_urls"] = int(input("Top K URLs to select (default 10): ") or "10")
            except ValueError:
                # Values parsed before the bad input are kept; the rest
                # fall back to the dataclass defaults.
                console.print("[yellow]Using default values[/yellow]")

        config = ResearchConfig(
            test_mode=test_mode,
            interactive_mode=interactive_mode,
            use_llm_enhancement=use_llm_enhancement,
            **limits
        )

    # Display example queries
    query_table = Table(title="\n📋 Example Queries", show_header=False, box=None)
    query_table.add_column(style="bold cyan", width=3)
    query_table.add_column()

    for i, q in enumerate(example_queries, 1):
        query_table.add_row(str(i), q)

    console.print(query_table)

    query_input = input("\nSelect a query (1-5) or enter your own: ").strip()

    if query_input.isdigit() and 1 <= int(query_input) <= len(example_queries):
        query = example_queries[int(query_input) - 1]
    else:
        # Free-text query; an empty answer selects the first example
        query = query_input if query_input else example_queries[0]

    console.print(f"\n[bold cyan]📝 Selected Query:[/bold cyan] {query}")

    # Run the research pipeline
    result = await research_pipeline(query=query, config=config)

    # Display results
    formatted_output = format_research_output(result)
    console.print(Panel.fit(
        formatted_output,
        title="🔬 Research Results",
        border_style="green"
    ))

    # Save results
    if config.save_json or config.save_markdown:
        json_path, md_path = await save_research_results(result, config)
        if json_path:
            console.print(f"[green]JSON saved at:[/green] {json_path}")
        if md_path:
            console.print(f"[green]Markdown saved at:[/green] {md_path}")


if __name__ == "__main__":
    asyncio.run(main())