Add smart TTL cache for sitemap URL seeder
- Add cache_ttl_hours and validate_sitemap_lastmod params to SeedingConfig
- New JSON cache format with metadata (version, created_at, lastmod, url_count)
- Cache validation by TTL expiry and sitemap lastmod comparison
- Auto-migration from old .jsonl to new .json format
- Fixes bug where incomplete cache was used indefinitely
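A minimal usage sketch of the new knobs (illustrative, not part of this
commit; the AsyncUrlSeeder context manager and urls() call follow the
library's existing seeder API, and example.com is a placeholder):

    import asyncio

    from crawl4ai import AsyncUrlSeeder, SeedingConfig

    async def main():
        config = SeedingConfig(
            source="sitemap",
            pattern="*/blog/*",
            cache_ttl_hours=6,             # expire the cached URL list after 6 hours
            validate_sitemap_lastmod=True, # refetch early if <lastmod> is newer than the cache
        )
        async with AsyncUrlSeeder() as seeder:
            urls = await seeder.urls("example.com", config)
            print(f"discovered {len(urls)} URLs")

    asyncio.run(main())

Setting cache_ttl_hours=0 keeps the cache until the sitemap's <lastmod>
changes: TTL expiry is skipped and only lastmod validation runs.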
@@ -1996,6 +1996,8 @@ class SeedingConfig:
         score_threshold: Optional[float] = None,
         scoring_method: str = "bm25",
         filter_nonsense_urls: bool = True,
+        cache_ttl_hours: int = 24,
+        validate_sitemap_lastmod: bool = True,
     ):
         """
         Initialize URL seeding configuration.
@@ -2027,10 +2029,14 @@ class SeedingConfig:
                 Requires extract_head=True. Default: None
             score_threshold: Minimum relevance score (0.0-1.0) to include URL.
                 Only applies when query is provided. Default: None
             scoring_method: Scoring algorithm to use. Currently only "bm25" is supported.
                 Future: "semantic". Default: "bm25"
             filter_nonsense_urls: Filter out utility URLs like robots.txt, sitemap.xml,
                 ads.txt, favicon.ico, etc. Default: True
+            cache_ttl_hours: Hours before sitemap cache expires. Set to 0 to disable TTL
+                (only lastmod validation). Default: 24
+            validate_sitemap_lastmod: If True, compares sitemap's <lastmod> with cache
+                timestamp and refetches if sitemap is newer. Default: True
         """
         self.source = source
         self.pattern = pattern
@@ -2047,6 +2053,8 @@ class SeedingConfig:
         self.score_threshold = score_threshold
         self.scoring_method = scoring_method
         self.filter_nonsense_urls = filter_nonsense_urls
+        self.cache_ttl_hours = cache_ttl_hours
+        self.validate_sitemap_lastmod = validate_sitemap_lastmod
 
     # Add to_dict, from_kwargs, and clone methods for consistency
     def to_dict(self) -> Dict[str, Any]:
@@ -24,7 +24,7 @@ import os
 import pathlib
 import re
 import time
-from datetime import timedelta
+from datetime import datetime, timedelta, timezone
 from pathlib import Path
 from typing import Any, Dict, Iterable, List, Optional, Sequence, Union
 from urllib.parse import quote, urljoin
@@ -78,6 +78,103 @@ _link_rx = re.compile(
 # ────────────────────────────────────────────────────────────────────────── helpers
 
 
+def _parse_sitemap_lastmod(xml_content: bytes) -> Optional[str]:
+    """Extract the most recent lastmod from sitemap XML."""
+    try:
+        if LXML:
+            root = etree.fromstring(xml_content)
+            # Get all lastmod elements (namespace-agnostic)
+            lastmods = root.xpath("//*[local-name()='lastmod']/text()")
+            if lastmods:
+                # Return the most recent one
+                return max(lastmods)
+    except Exception:
+        pass
+    return None
+
+
+def _is_cache_valid(
+    cache_path: pathlib.Path,
+    ttl_hours: int,
+    validate_lastmod: bool,
+    current_lastmod: Optional[str] = None
+) -> bool:
+    """
+    Check if sitemap cache is still valid.
+
+    Returns False (invalid) if:
+    - File doesn't exist
+    - File is corrupted/unreadable
+    - TTL expired (if ttl_hours > 0)
+    - Sitemap lastmod is newer than cache (if validate_lastmod=True)
+    """
+    if not cache_path.exists():
+        return False
+
+    try:
+        with open(cache_path, "r") as f:
+            data = json.load(f)
+
+        # Check version
+        if data.get("version") != 1:
+            return False
+
+        # Check TTL
+        if ttl_hours > 0:
+            created_at = datetime.fromisoformat(data["created_at"].replace("Z", "+00:00"))
+            age_hours = (datetime.now(timezone.utc) - created_at).total_seconds() / 3600
+            if age_hours > ttl_hours:
+                return False
+
+        # Check lastmod
+        if validate_lastmod and current_lastmod:
+            cached_lastmod = data.get("sitemap_lastmod")
+            if cached_lastmod and current_lastmod > cached_lastmod:
+                return False
+
+        # Check URL count (sanity check - if 0, likely corrupted)
+        if data.get("url_count", 0) == 0:
+            return False
+
+        return True
+
+    except (json.JSONDecodeError, KeyError, ValueError, IOError):
+        # Corrupted cache - return False to trigger refetch
+        return False
+
+
+def _read_cache(cache_path: pathlib.Path) -> List[str]:
+    """Read URLs from cache file. Returns empty list on error."""
+    try:
+        with open(cache_path, "r") as f:
+            data = json.load(f)
+        return data.get("urls", [])
+    except Exception:
+        return []
+
+
+def _write_cache(
+    cache_path: pathlib.Path,
+    urls: List[str],
+    sitemap_url: str,
+    sitemap_lastmod: Optional[str]
+) -> None:
+    """Write URLs to cache with metadata."""
+    data = {
+        "version": 1,
+        "created_at": datetime.now(timezone.utc).isoformat(),
+        "sitemap_lastmod": sitemap_lastmod,
+        "sitemap_url": sitemap_url,
+        "url_count": len(urls),
+        "urls": urls
+    }
+    try:
+        with open(cache_path, "w") as f:
+            json.dump(data, f)
+    except Exception:
+        pass  # Fail silently - cache is optional
+
+
 def _match(url: str, pattern: str) -> bool:
     if fnmatch.fnmatch(url, pattern):
         return True
@@ -295,6 +392,10 @@ class AsyncUrlSeeder:
         score_threshold = config.score_threshold
         scoring_method = config.scoring_method
 
+        # Store cache config for use in _from_sitemaps
+        self._cache_ttl_hours = getattr(config, 'cache_ttl_hours', 24)
+        self._validate_sitemap_lastmod = getattr(config, 'validate_sitemap_lastmod', True)
+
         # Ensure seeder's logger verbose matches the config's verbose if it's set
         if self.logger and hasattr(self.logger, 'verbose') and config.verbose is not None:
             self.logger.verbose = config.verbose
@@ -764,68 +865,222 @@ class AsyncUrlSeeder:
     # ─────────────────────────────── Sitemaps
     async def _from_sitemaps(self, domain: str, pattern: str, force: bool = False):
         """
-        1. Probe default sitemap locations.
-        2. If none exist, parse robots.txt for alternative sitemap URLs.
-        3. Yield only URLs that match `pattern`.
+        Discover URLs from sitemaps with smart TTL-based caching.
+
+        1. Check cache validity (TTL + lastmod)
+        2. If valid, yield from cache
+        3. If invalid or force=True, fetch fresh and update cache
+        4. FALLBACK: If anything fails, bypass cache and fetch directly
         """
+        # Get config values (passed via self during urls() call)
+        cache_ttl_hours = getattr(self, '_cache_ttl_hours', 24)
+        validate_lastmod = getattr(self, '_validate_sitemap_lastmod', True)
+
-        # ── cache file (same logic as _from_cc)
-        host = re.sub('[/?#]+', '_', domain)
+        # Cache file path (new format: .json instead of .jsonl)
+        host = re.sub(r'^https?://', '', domain).rstrip('/')
+        host_safe = re.sub('[/?#]+', '_', host)
         digest = hashlib.md5(pattern.encode()).hexdigest()[:8]
-        path = self.cache_dir / f"sitemap_{host}_{digest}.jsonl"
+        cache_path = self.cache_dir / f"sitemap_{host_safe}_{digest}.json"
 
-        if path.exists() and not force:
-            self._log("info", "Loading sitemap URLs for {d} from cache: {p}",
-                      params={"d": host, "p": str(path)}, tag="URL_SEED")
-            async with aiofiles.open(path, "r") as fp:
-                async for line in fp:
-                    url = line.strip()
-                    if _match(url, pattern):
-                        yield url
-            return
+        # Check for old .jsonl format and delete it
+        old_cache_path = self.cache_dir / f"sitemap_{host_safe}_{digest}.jsonl"
+        if old_cache_path.exists():
+            try:
+                old_cache_path.unlink()
+                self._log("info", "Deleted old cache format: {p}",
+                          params={"p": str(old_cache_path)}, tag="URL_SEED")
+            except Exception:
+                pass
 
-        # 1️⃣ direct sitemap probe
-        # strip any scheme so we can handle https → http fallback
-        host = re.sub(r'^https?://', '', domain).rstrip('/')
+        # Step 1: Find sitemap URL and get lastmod (needed for validation)
+        sitemap_url = None
+        sitemap_lastmod = None
+        sitemap_content = None
 
-        schemes = ('https', 'http')  # prefer TLS, downgrade if needed
+        schemes = ('https', 'http')
         for scheme in schemes:
             for suffix in ("/sitemap.xml", "/sitemap_index.xml"):
                 sm = f"{scheme}://{host}{suffix}"
-                sm = await self._resolve_head(sm)
-                if sm:
-                    self._log("info", "Found sitemap at {url}", params={
-                        "url": sm}, tag="URL_SEED")
-                    async with aiofiles.open(path, "w") as fp:
-                        async for u in self._iter_sitemap(sm):
-                            await fp.write(u + "\n")
-                            if _match(u, pattern):
-                                yield u
-                    return
+                resolved = await self._resolve_head(sm)
+                if resolved:
+                    sitemap_url = resolved
+                    # Fetch sitemap content to get lastmod
+                    try:
+                        r = await self.client.get(sitemap_url, timeout=15, follow_redirects=True)
+                        if 200 <= r.status_code < 300:
+                            sitemap_content = r.content
+                            sitemap_lastmod = _parse_sitemap_lastmod(sitemap_content)
+                    except Exception:
+                        pass
+                    break
+            if sitemap_url:
+                break
+
+        # Step 2: Check cache validity (skip if force=True)
+        if not force and cache_path.exists():
+            if _is_cache_valid(cache_path, cache_ttl_hours, validate_lastmod, sitemap_lastmod):
+                self._log("info", "Loading sitemap URLs from valid cache: {p}",
+                          params={"p": str(cache_path)}, tag="URL_SEED")
+                cached_urls = _read_cache(cache_path)
+                for url in cached_urls:
+                    if _match(url, pattern):
+                        yield url
+                return
+            else:
+                self._log("info", "Cache invalid/expired, refetching sitemap for {d}",
+                          params={"d": domain}, tag="URL_SEED")
+
+        # Step 3: Fetch fresh URLs
+        discovered_urls = []
+
+        if sitemap_url and sitemap_content:
+            self._log("info", "Found sitemap at {url}", params={"url": sitemap_url}, tag="URL_SEED")
+
+            # Parse sitemap (reuse content we already fetched)
+            async for u in self._iter_sitemap_content(sitemap_url, sitemap_content):
+                discovered_urls.append(u)
+                if _match(u, pattern):
+                    yield u
+        elif sitemap_url:
+            # We have a sitemap URL but no content (fetch failed earlier), try again
+            self._log("info", "Found sitemap at {url}", params={"url": sitemap_url}, tag="URL_SEED")
+            async for u in self._iter_sitemap(sitemap_url):
+                discovered_urls.append(u)
+                if _match(u, pattern):
+                    yield u
+        else:
+            # Fallback: robots.txt
+            robots = f"https://{host}/robots.txt"
+            try:
+                r = await self.client.get(robots, timeout=10, follow_redirects=True)
+                if 200 <= r.status_code < 300:
+                    sitemap_lines = [l.split(":", 1)[1].strip()
+                                     for l in r.text.splitlines()
+                                     if l.lower().startswith("sitemap:")]
+                    for sm in sitemap_lines:
+                        async for u in self._iter_sitemap(sm):
+                            discovered_urls.append(u)
+                            if _match(u, pattern):
+                                yield u
+                else:
+                    self._log("warning", "robots.txt unavailable for {d} HTTP{c}",
+                              params={"d": domain, "c": r.status_code}, tag="URL_SEED")
+                    return
+            except Exception as e:
+                self._log("warning", "Failed to fetch robots.txt for {d}: {e}",
+                          params={"d": domain, "e": str(e)}, tag="URL_SEED")
+                return
 
-        # 2️⃣ robots.txt fallback
-        robots = f"https://{domain.rstrip('/')}/robots.txt"
-        try:
-            r = await self.client.get(robots, timeout=10, follow_redirects=True)
-            if not 200 <= r.status_code < 300:
-                self._log("warning", "robots.txt unavailable for {d} HTTP{c}", params={
-                    "d": domain, "c": r.status_code}, tag="URL_SEED")
-                return
-            sitemap_lines = [l.split(":", 1)[1].strip(
-            ) for l in r.text.splitlines() if l.lower().startswith("sitemap:")]
-        except Exception as e:
-            self._log("warning", "Failed to fetch robots.txt for {d}: {e}", params={
-                "d": domain, "e": str(e)}, tag="URL_SEED")
-            return
-
-        if sitemap_lines:
-            async with aiofiles.open(path, "w") as fp:
-                for sm in sitemap_lines:
-                    async for u in self._iter_sitemap(sm):
-                        await fp.write(u + "\n")
-                        if _match(u, pattern):
-                            yield u
+        # Step 4: Write to cache (FALLBACK: if write fails, URLs still yielded above)
+        if discovered_urls:
+            _write_cache(cache_path, discovered_urls, sitemap_url or "", sitemap_lastmod)
+            self._log("info", "Cached {count} URLs for {d}",
+                      params={"count": len(discovered_urls), "d": domain}, tag="URL_SEED")
+
+    async def _iter_sitemap_content(self, url: str, content: bytes):
+        """Parse sitemap from already-fetched content."""
+        data = gzip.decompress(content) if url.endswith(".gz") else content
+        base_url = url
+
+        def _normalize_loc(raw: Optional[str]) -> Optional[str]:
+            if not raw:
+                return None
+            normalized = urljoin(base_url, raw.strip())
+            if not normalized:
+                return None
+            return normalized
+
+        # Detect if this is a sitemap index
+        is_sitemap_index = False
+        sub_sitemaps = []
+        regular_urls = []
+
+        if LXML:
+            try:
+                parser = etree.XMLParser(recover=True)
+                root = etree.fromstring(data, parser=parser)
+                sitemap_loc_nodes = root.xpath("//*[local-name()='sitemap']/*[local-name()='loc']")
+                url_loc_nodes = root.xpath("//*[local-name()='url']/*[local-name()='loc']")
+
+                if sitemap_loc_nodes:
+                    is_sitemap_index = True
+                    for sitemap_elem in sitemap_loc_nodes:
+                        loc = _normalize_loc(sitemap_elem.text)
+                        if loc:
+                            sub_sitemaps.append(loc)
+
+                if not is_sitemap_index:
+                    for loc_elem in url_loc_nodes:
+                        loc = _normalize_loc(loc_elem.text)
+                        if loc:
+                            regular_urls.append(loc)
+            except Exception as e:
+                self._log("error", "LXML parsing error for sitemap {url}: {error}",
+                          params={"url": url, "error": str(e)}, tag="URL_SEED")
+                return
+        else:
+            import xml.etree.ElementTree as ET
+            try:
+                root = ET.fromstring(data)
+                for elem in root.iter():
+                    if '}' in elem.tag:
+                        elem.tag = elem.tag.split('}')[1]
+
+                sitemaps = root.findall('.//sitemap')
+                url_entries = root.findall('.//url')
+
+                if sitemaps:
+                    is_sitemap_index = True
+                    for sitemap in sitemaps:
+                        loc_elem = sitemap.find('loc')
+                        loc = _normalize_loc(loc_elem.text if loc_elem is not None else None)
+                        if loc:
+                            sub_sitemaps.append(loc)
+
+                if not is_sitemap_index:
+                    for url_elem in url_entries:
+                        loc_elem = url_elem.find('loc')
+                        loc = _normalize_loc(loc_elem.text if loc_elem is not None else None)
+                        if loc:
+                            regular_urls.append(loc)
+            except Exception as e:
+                self._log("error", "ElementTree parsing error for sitemap {url}: {error}",
+                          params={"url": url, "error": str(e)}, tag="URL_SEED")
+                return
+
+        # Process based on type
+        if is_sitemap_index and sub_sitemaps:
+            self._log("info", "Processing sitemap index with {count} sub-sitemaps",
+                      params={"count": len(sub_sitemaps)}, tag="URL_SEED")
+
+            queue_size = min(50000, len(sub_sitemaps) * 1000)
+            result_queue = asyncio.Queue(maxsize=queue_size)
+            completed_count = 0
+            total_sitemaps = len(sub_sitemaps)
+
+            async def process_subsitemap(sitemap_url: str):
+                try:
+                    async for u in self._iter_sitemap(sitemap_url):
+                        await result_queue.put(u)
+                except Exception as e:
+                    self._log("error", "Error processing sub-sitemap {url}: {error}",
+                              params={"url": sitemap_url, "error": str(e)}, tag="URL_SEED")
+                finally:
+                    await result_queue.put(None)
+
+            tasks = [asyncio.create_task(process_subsitemap(sm)) for sm in sub_sitemaps]
+
+            while completed_count < total_sitemaps:
+                item = await result_queue.get()
+                if item is None:
+                    completed_count += 1
+                else:
+                    yield item
+
+            await asyncio.gather(*tasks, return_exceptions=True)
+        else:
+            for u in regular_urls:
+                yield u
 
     async def _iter_sitemap(self, url: str):
         try:
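For reference, a self-contained sketch of the version-1 cache format and the
validity rules the new helpers enforce (file name and dates are invented;
this mirrors _write_cache/_is_cache_valid above, with the version check
omitted for brevity):

    import json
    import pathlib
    from datetime import datetime, timedelta, timezone

    cache_path = pathlib.Path("sitemap_example.com_1a2b3c4d.json")

    # The shape _write_cache persists: metadata plus the URL list.
    cache_path.write_text(json.dumps({
        "version": 1,
        "created_at": datetime.now(timezone.utc).isoformat(),
        "sitemap_lastmod": "2024-01-15",
        "sitemap_url": "https://example.com/sitemap.xml",
        "url_count": 2,
        "urls": ["https://example.com/a", "https://example.com/b"],
    }))

    data = json.loads(cache_path.read_text())
    created_at = datetime.fromisoformat(data["created_at"].replace("Z", "+00:00"))

    # TTL check: a cache older than cache_ttl_hours is stale.
    ttl_expired = datetime.now(timezone.utc) - created_at > timedelta(hours=24)

    # lastmod check: ISO-8601 dates order correctly as strings, which is why
    # _is_cache_valid can compare them with plain string comparison.
    live_lastmod = "2024-01-20"  # most recent <lastmod> in the live sitemap
    sitemap_newer = bool(data["sitemap_lastmod"]) and live_lastmod > data["sitemap_lastmod"]

    # Sanity check: an empty URL list means the cache is useless.
    looks_corrupt = data.get("url_count", 0) == 0

    print("cache valid:", not (ttl_expired or sitemap_newer or looks_corrupt))
    # -> cache valid: False (the live sitemap is newer, so a refetch is triggered)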