From 3d78001c30c37a9a2eed29c1ea9300052d783d76 Mon Sep 17 00:00:00 2001
From: unclecode
Date: Tue, 30 Dec 2025 01:59:09 +0000
Subject: [PATCH] Add smart TTL cache for sitemap URL seeder

- Add cache_ttl_hours and validate_sitemap_lastmod params to SeedingConfig
- New JSON cache format with metadata (version, created_at, lastmod, url_count)
- Cache validation by TTL expiry and sitemap lastmod comparison
- Auto-migration from old .jsonl to new .json format
- Fixes bug where incomplete cache was used indefinitely
---
 crawl4ai/async_configs.py    |  12 +-
 crawl4ai/async_url_seeder.py | 349 ++++++++++++++++++++++++++++++-----
 2 files changed, 312 insertions(+), 49 deletions(-)

diff --git a/crawl4ai/async_configs.py b/crawl4ai/async_configs.py
index b287b544..aa5745fb 100644
--- a/crawl4ai/async_configs.py
+++ b/crawl4ai/async_configs.py
@@ -1996,6 +1996,8 @@ class SeedingConfig:
         score_threshold: Optional[float] = None,
         scoring_method: str = "bm25",
         filter_nonsense_urls: bool = True,
+        cache_ttl_hours: int = 24,
+        validate_sitemap_lastmod: bool = True,
     ):
         """
         Initialize URL seeding configuration.
@@ -2027,10 +2029,14 @@ class SeedingConfig:
                          Requires extract_head=True. Default: None
             score_threshold: Minimum relevance score (0.0-1.0) to include URL.
                            Only applies when query is provided. Default: None
-            scoring_method: Scoring algorithm to use. Currently only "bm25" is supported.
+            scoring_method: Scoring algorithm to use. Currently only "bm25" is supported. 
                           Future: "semantic". Default: "bm25"
-            filter_nonsense_urls: Filter out utility URLs like robots.txt, sitemap.xml,
+            filter_nonsense_urls: Filter out utility URLs like robots.txt, sitemap.xml, 
                                  ads.txt, favicon.ico, etc. Default: True
+            cache_ttl_hours: Hours before sitemap cache expires. Set to 0 to disable TTL
+                           (only lastmod validation). Default: 24
+            validate_sitemap_lastmod: If True, compares the sitemap's lastmod with the cache
+                                    timestamp and refetches if the sitemap is newer. Default: True
         """
         self.source = source
         self.pattern = pattern
@@ -2047,6 +2053,8 @@ class SeedingConfig:
         self.score_threshold = score_threshold
         self.scoring_method = scoring_method
         self.filter_nonsense_urls = filter_nonsense_urls
+        self.cache_ttl_hours = cache_ttl_hours
+        self.validate_sitemap_lastmod = validate_sitemap_lastmod
 
     # Add to_dict, from_kwargs, and clone methods for consistency
     def to_dict(self) -> Dict[str, Any]:
diff --git a/crawl4ai/async_url_seeder.py b/crawl4ai/async_url_seeder.py
index 91f61837..29fb4b50 100644
--- a/crawl4ai/async_url_seeder.py
+++ b/crawl4ai/async_url_seeder.py
@@ -24,7 +24,7 @@ import os
 import pathlib
 import re
 import time
-from datetime import timedelta
+from datetime import datetime, timedelta, timezone
 from pathlib import Path
 from typing import Any, Dict, Iterable, List, Optional, Sequence, Union
 from urllib.parse import quote, urljoin
@@ -78,6 +78,103 @@ _link_rx = re.compile(
 # ────────────────────────────────────────────────────────────────────────── helpers
 
 
+def _parse_sitemap_lastmod(xml_content: bytes) -> Optional[str]:
+    """Extract the most recent lastmod from sitemap XML."""
+    try:
+        if LXML:
+            root = etree.fromstring(xml_content)
+            # Get all lastmod elements (namespace-agnostic)
+            lastmods = root.xpath("//*[local-name()='lastmod']/text()")
+            if lastmods:
+                # Return the most recent one
+                return max(lastmods)
+    except Exception:
+        pass
+    return None
+
+
+def _is_cache_valid(
+    cache_path: pathlib.Path,
+    ttl_hours: int,
+    validate_lastmod: bool,
+    current_lastmod: Optional[str] = None
+) -> bool:
+    """
+    Check if sitemap cache is still valid.
+
+    Returns False (invalid) if:
+    - File doesn't exist
+    - File is corrupted/unreadable
+    - TTL expired (if ttl_hours > 0)
+    - Sitemap lastmod is newer than cache (if validate_lastmod=True)
+    """
+    if not cache_path.exists():
+        return False
+
+    try:
+        with open(cache_path, "r") as f:
+            data = json.load(f)
+
+        # Check version
+        if data.get("version") != 1:
+            return False
+
+        # Check TTL
+        if ttl_hours > 0:
+            created_at = datetime.fromisoformat(data["created_at"].replace("Z", "+00:00"))
+            age_hours = (datetime.now(timezone.utc) - created_at).total_seconds() / 3600
+            if age_hours > ttl_hours:
+                return False
+
+        # Check lastmod
+        if validate_lastmod and current_lastmod:
+            cached_lastmod = data.get("sitemap_lastmod")
+            if cached_lastmod and current_lastmod > cached_lastmod:
+                return False
+
+        # Check URL count (sanity check - if 0, likely corrupted)
+        if data.get("url_count", 0) == 0:
+            return False
+
+        return True
+
+    except (json.JSONDecodeError, KeyError, ValueError, IOError):
+        # Corrupted cache - return False to trigger refetch
+        return False
+
+
+def _read_cache(cache_path: pathlib.Path) -> List[str]:
+    """Read URLs from cache file. Returns empty list on error."""
+    try:
+        with open(cache_path, "r") as f:
+            data = json.load(f)
+        return data.get("urls", [])
+    except Exception:
+        return []
+
+
+def _write_cache(
+    cache_path: pathlib.Path,
+    urls: List[str],
+    sitemap_url: str,
+    sitemap_lastmod: Optional[str]
+) -> None:
+    """Write URLs to cache with metadata."""
+    data = {
+        "version": 1,
+        "created_at": datetime.now(timezone.utc).isoformat(),
+        "sitemap_lastmod": sitemap_lastmod,
+        "sitemap_url": sitemap_url,
+        "url_count": len(urls),
+        "urls": urls
+    }
+    try:
+        with open(cache_path, "w") as f:
+            json.dump(data, f)
+    except Exception:
+        pass  # Fail silently - cache is optional
+
+
 def _match(url: str, pattern: str) -> bool:
     if fnmatch.fnmatch(url, pattern):
         return True
@@ -295,6 +392,10 @@ class AsyncUrlSeeder:
         score_threshold = config.score_threshold
         scoring_method = config.scoring_method
 
+        # Store cache config for use in _from_sitemaps
+        self._cache_ttl_hours = getattr(config, 'cache_ttl_hours', 24)
+        self._validate_sitemap_lastmod = getattr(config, 'validate_sitemap_lastmod', True)
+
         # Ensure seeder's logger verbose matches the config's verbose if it's set
         if self.logger and hasattr(self.logger, 'verbose') and config.verbose is not None:
             self.logger.verbose = config.verbose
@@ -764,68 +865,222 @@ class AsyncUrlSeeder:
     # ─────────────────────────────── Sitemaps
     async def _from_sitemaps(self, domain: str, pattern: str, force: bool = False):
         """
-        1. Probe default sitemap locations.
-        2. If none exist, parse robots.txt for alternative sitemap URLs.
-        3. Yield only URLs that match `pattern`.
+        Discover URLs from sitemaps with smart TTL-based caching.
+
+        1. Check cache validity (TTL + lastmod)
+        2. If valid, yield from cache
+        3. If invalid or force=True, fetch fresh and update cache
+        4. FALLBACK: If anything fails, bypass cache and fetch directly
         """
+        # Get config values (passed via self during urls() call)
+        cache_ttl_hours = getattr(self, '_cache_ttl_hours', 24)
+        validate_lastmod = getattr(self, '_validate_sitemap_lastmod', True)
 
-        # ── cache file (same logic as _from_cc)
+        # Cache file path (new format: .json instead of .jsonl)
         host = re.sub(r'^https?://', '', domain).rstrip('/')
-        host = re.sub('[/?#]+', '_', domain)
+        host_safe = re.sub('[/?#]+', '_', host)
         digest = hashlib.md5(pattern.encode()).hexdigest()[:8]
-        path = self.cache_dir / f"sitemap_{host}_{digest}.jsonl"
+        cache_path = self.cache_dir / f"sitemap_{host_safe}_{digest}.json"
 
-        if path.exists() and not force:
-            self._log("info", "Loading sitemap URLs for {d} from cache: {p}",
-                      params={"d": host, "p": str(path)}, tag="URL_SEED")
-            async with aiofiles.open(path, "r") as fp:
-                async for line in fp:
-                    url = line.strip()
-                    if _match(url, pattern):
-                        yield url
-            return
+        # Check for old .jsonl format and delete it
+        old_cache_path = self.cache_dir / f"sitemap_{host_safe}_{digest}.jsonl"
+        if old_cache_path.exists():
+            try:
+                old_cache_path.unlink()
+                self._log("info", "Deleted old cache format: {p}",
+                          params={"p": str(old_cache_path)}, tag="URL_SEED")
+            except Exception:
+                pass
 
-        # 1️⃣ direct sitemap probe
-        #    strip any scheme so we can handle https → http fallback
-        host = re.sub(r'^https?://', '', domain).rstrip('/')
+        # Step 1: Find sitemap URL and get lastmod (needed for validation)
+        sitemap_url = None
+        sitemap_lastmod = None
+        sitemap_content = None
 
-        schemes = ('https', 'http')          # prefer TLS, downgrade if needed
+        schemes = ('https', 'http')
         for scheme in schemes:
             for suffix in ("/sitemap.xml", "/sitemap_index.xml"):
                 sm = f"{scheme}://{host}{suffix}"
-                sm = await self._resolve_head(sm)
-                if sm:
-                    self._log("info", "Found sitemap at {url}", params={
-                              "url": sm}, tag="URL_SEED")
-                    async with aiofiles.open(path, "w") as fp:
+                resolved = await self._resolve_head(sm)
+                if resolved:
+                    sitemap_url = resolved
+                    # Fetch sitemap content to get lastmod
+                    try:
+                        r = await self.client.get(sitemap_url, timeout=15, follow_redirects=True)
+                        if 200 <= r.status_code < 300:
+                            sitemap_content = r.content
+                            sitemap_lastmod = _parse_sitemap_lastmod(sitemap_content)
+                    except Exception:
+                        pass
+                    break
+            if sitemap_url:
+                break
+
+        # Step 2: Check cache validity (skip if force=True)
+        if not force and cache_path.exists():
+            if _is_cache_valid(cache_path, cache_ttl_hours, validate_lastmod, sitemap_lastmod):
+                self._log("info", "Loading sitemap URLs from valid cache: {p}",
+                          params={"p": str(cache_path)}, tag="URL_SEED")
+                cached_urls = _read_cache(cache_path)
+                for url in cached_urls:
+                    if _match(url, pattern):
+                        yield url
+                return
+            else:
+                self._log("info", "Cache invalid/expired, refetching sitemap for {d}",
+                          params={"d": domain}, tag="URL_SEED")
+
+        # Step 3: Fetch fresh URLs
+        discovered_urls = []
+
+        if sitemap_url and sitemap_content:
+            self._log("info", "Found sitemap at {url}", params={"url": sitemap_url}, tag="URL_SEED")
+
+            # Parse sitemap (reuse content we already fetched)
+            async for u in self._iter_sitemap_content(sitemap_url, sitemap_content):
+                discovered_urls.append(u)
+                if _match(u, pattern):
+                    yield u
+        elif sitemap_url:
+            # We have a sitemap URL but no content (fetch failed earlier), try again
+            self._log("info", "Found sitemap at {url}", params={"url": sitemap_url}, tag="URL_SEED")
+            async for u in self._iter_sitemap(sitemap_url):
+                discovered_urls.append(u)
+                if _match(u, pattern):
+                    yield u
+        else:
+            # Fallback: robots.txt
+            robots = f"https://{host}/robots.txt"
+            try:
+                r = await self.client.get(robots, timeout=10, follow_redirects=True)
+                if 200 <= r.status_code < 300:
+                    sitemap_lines = [l.split(":", 1)[1].strip()
+                                     for l in r.text.splitlines()
+                                     if l.lower().startswith("sitemap:")]
+                    for sm in sitemap_lines:
                         async for u in self._iter_sitemap(sm):
-                    await fp.write(u + "\n")
+                            discovered_urls.append(u)
                             if _match(u, pattern):
                                 yield u
+                else:
+                    self._log("warning", "robots.txt unavailable for {d} HTTP{c}",
+                              params={"d": domain, "c": r.status_code}, tag="URL_SEED")
                     return
-
-        # 2️⃣ robots.txt fallback
-        robots = f"https://{domain.rstrip('/')}/robots.txt"
-        try:
-            r = await self.client.get(robots, timeout=10, follow_redirects=True)
-            if not 200 <= r.status_code < 300:
-                self._log("warning", "robots.txt unavailable for {d} HTTP{c}", params={
-                          "d": domain, "c": r.status_code}, tag="URL_SEED")
+            except Exception as e:
+                self._log("warning", "Failed to fetch robots.txt for {d}: {e}",
+                          params={"d": domain, "e": str(e)}, tag="URL_SEED")
                 return
-            sitemap_lines = [l.split(":", 1)[1].strip(
-            ) for l in r.text.splitlines() if l.lower().startswith("sitemap:")]
-        except Exception as e:
-            self._log("warning", "Failed to fetch robots.txt for {d}: {e}", params={
-                      "d": domain, "e": str(e)}, tag="URL_SEED")
-            return
-        if sitemap_lines:
-            async with aiofiles.open(path, "w") as fp:
-                for sm in sitemap_lines:
-                    async for u in self._iter_sitemap(sm):
-                        await fp.write(u + "\n")
-                        if _match(u, pattern):
-                            yield u
+        # Step 4: Write to cache (FALLBACK: if write fails, URLs still yielded above)
+        if discovered_urls:
+            _write_cache(cache_path, discovered_urls, sitemap_url or "", sitemap_lastmod)
+            self._log("info", "Cached {count} URLs for {d}",
+                      params={"count": len(discovered_urls), "d": domain}, tag="URL_SEED")
+
+    async def _iter_sitemap_content(self, url: str, content: bytes):
+        """Parse sitemap from already-fetched content."""
+        data = gzip.decompress(content) if url.endswith(".gz") else content
+        base_url = url
+
+        def _normalize_loc(raw: Optional[str]) -> Optional[str]:
+            if not raw:
+                return None
+            normalized = urljoin(base_url, raw.strip())
+            if not normalized:
+                return None
+            return normalized
+
+        # Detect if this is a sitemap index
+        is_sitemap_index = False
+        sub_sitemaps = []
+        regular_urls = []
+
+        if LXML:
+            try:
+                parser = etree.XMLParser(recover=True)
+                root = etree.fromstring(data, parser=parser)
+                sitemap_loc_nodes = root.xpath("//*[local-name()='sitemap']/*[local-name()='loc']")
+                url_loc_nodes = root.xpath("//*[local-name()='url']/*[local-name()='loc']")
+
+                if sitemap_loc_nodes:
+                    is_sitemap_index = True
+                    for sitemap_elem in sitemap_loc_nodes:
+                        loc = _normalize_loc(sitemap_elem.text)
+                        if loc:
+                            sub_sitemaps.append(loc)
+
+                if not is_sitemap_index:
+                    for loc_elem in url_loc_nodes:
+                        loc = _normalize_loc(loc_elem.text)
+                        if loc:
+                            regular_urls.append(loc)
+            except Exception as e:
+                self._log("error", "LXML parsing error for sitemap {url}: {error}",
+                          params={"url": url, "error": str(e)}, tag="URL_SEED")
+                return
+        else:
+            import xml.etree.ElementTree as ET
+            try:
+                root = ET.fromstring(data)
+                for elem in root.iter():
+                    if '}' in elem.tag:
+                        elem.tag = elem.tag.split('}')[1]
+
+                sitemaps = root.findall('.//sitemap')
+                url_entries = root.findall('.//url')
+
+                if sitemaps:
+                    is_sitemap_index = True
+                    for sitemap in sitemaps:
+                        loc_elem = sitemap.find('loc')
+                        loc = _normalize_loc(loc_elem.text if loc_elem is not None else None)
+                        if loc:
+                            sub_sitemaps.append(loc)
+
+                if not is_sitemap_index:
+                    for url_elem in url_entries:
+                        loc_elem = url_elem.find('loc')
+                        loc = _normalize_loc(loc_elem.text if loc_elem is not None else None)
+                        if loc:
+                            regular_urls.append(loc)
+            except Exception as e:
+                self._log("error", "ElementTree parsing error for sitemap {url}: {error}",
+                          params={"url": url, "error": str(e)}, tag="URL_SEED")
+                return
+
+        # Process based on type
+        if is_sitemap_index and sub_sitemaps:
+            self._log("info", "Processing sitemap index with {count} sub-sitemaps",
+                      params={"count": len(sub_sitemaps)}, tag="URL_SEED")
+
+            queue_size = min(50000, len(sub_sitemaps) * 1000)
+            result_queue = asyncio.Queue(maxsize=queue_size)
+            completed_count = 0
+            total_sitemaps = len(sub_sitemaps)
+
+            async def process_subsitemap(sitemap_url: str):
+                try:
+                    async for u in self._iter_sitemap(sitemap_url):
+                        await result_queue.put(u)
+                except Exception as e:
+                    self._log("error", "Error processing sub-sitemap {url}: {error}",
+                              params={"url": sitemap_url, "error": str(e)}, tag="URL_SEED")
+                finally:
+                    await result_queue.put(None)
+
+            tasks = [asyncio.create_task(process_subsitemap(sm)) for sm in sub_sitemaps]
+
+            while completed_count < total_sitemaps:
+                item = await result_queue.get()
+                if item is None:
+                    completed_count += 1
+                else:
+                    yield item
+
+            await asyncio.gather(*tasks, return_exceptions=True)
+        else:
+            for u in regular_urls:
+                yield u
 
     async def _iter_sitemap(self, url: str):
         try:
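# ---------------------------------------------------------------------------
# Illustrative usage sketch (not part of the patch above). It shows how the new
# cache_ttl_hours / validate_sitemap_lastmod knobs would be driven from the
# public API, assuming SeedingConfig and AsyncUrlSeeder are importable from the
# top-level crawl4ai package and that AsyncUrlSeeder.urls(domain, config) is the
# entry point that reaches _from_sitemaps; names and values here are examples.
import asyncio

from crawl4ai import AsyncUrlSeeder, SeedingConfig


async def main():
    config = SeedingConfig(
        source="sitemap",               # discover URLs from the site's sitemap
        pattern="*/blog/*",             # only yield URLs matching this glob
        cache_ttl_hours=12,             # treat the cached URL list as stale after 12 hours
        validate_sitemap_lastmod=True,  # also refetch when the sitemap reports a newer lastmod
    )
    seeder = AsyncUrlSeeder()
    urls = await seeder.urls("example.com", config)
    print(f"Discovered {len(urls)} URLs")


asyncio.run(main())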