feat: integrate last30days and daily-news-report skills

This commit is contained in:
sck_0
2026-01-26 19:05:37 +01:00
parent d2569f2107
commit c7f7f23bd7
45 changed files with 7632 additions and 0 deletions

View File

@@ -0,0 +1,521 @@
#!/usr/bin/env python3
"""
last30days - Research a topic from the last 30 days on Reddit + X.
Usage:
python3 last30days.py <topic> [options]
Options:
--mock Use fixtures instead of real API calls
--emit=MODE Output mode: compact|json|md|context|path (default: compact)
--sources=MODE Source selection: auto|reddit|x|both (default: auto)
--quick Faster research with fewer sources (8-12 each)
--deep Comprehensive research with more sources (50-70 Reddit, 40-60 X)
--debug Enable verbose debug logging
"""
import argparse
import json
import os
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone
from pathlib import Path
# Add lib to path
SCRIPT_DIR = Path(__file__).parent.resolve()
sys.path.insert(0, str(SCRIPT_DIR))
from lib import (
dates,
dedupe,
env,
http,
models,
normalize,
openai_reddit,
reddit_enrich,
render,
schema,
score,
ui,
websearch,
xai_x,
)
def load_fixture(name: str) -> dict:
"""Load a fixture file."""
fixture_path = SCRIPT_DIR.parent / "fixtures" / name
if fixture_path.exists():
with open(fixture_path) as f:
return json.load(f)
return {}
def _search_reddit(
topic: str,
config: dict,
selected_models: dict,
from_date: str,
to_date: str,
depth: str,
mock: bool,
) -> tuple:
"""Search Reddit via OpenAI (runs in thread).
Returns:
Tuple of (reddit_items, raw_openai, error)
"""
raw_openai = None
reddit_error = None
if mock:
raw_openai = load_fixture("openai_sample.json")
else:
try:
raw_openai = openai_reddit.search_reddit(
config["OPENAI_API_KEY"],
selected_models["openai"],
topic,
from_date,
to_date,
depth=depth,
)
except http.HTTPError as e:
raw_openai = {"error": str(e)}
reddit_error = f"API error: {e}"
except Exception as e:
raw_openai = {"error": str(e)}
reddit_error = f"{type(e).__name__}: {e}"
# Parse response
reddit_items = openai_reddit.parse_reddit_response(raw_openai or {})
# Quick retry with simpler query if few results
if len(reddit_items) < 5 and not mock and not reddit_error:
core = openai_reddit._extract_core_subject(topic)
if core.lower() != topic.lower():
try:
retry_raw = openai_reddit.search_reddit(
config["OPENAI_API_KEY"],
selected_models["openai"],
core,
from_date, to_date,
depth=depth,
)
retry_items = openai_reddit.parse_reddit_response(retry_raw)
# Add items not already found (by URL)
existing_urls = {item.get("url") for item in reddit_items}
for item in retry_items:
if item.get("url") not in existing_urls:
reddit_items.append(item)
except Exception:
pass
return reddit_items, raw_openai, reddit_error
def _search_x(
topic: str,
config: dict,
selected_models: dict,
from_date: str,
to_date: str,
depth: str,
mock: bool,
) -> tuple:
"""Search X via xAI (runs in thread).
Returns:
Tuple of (x_items, raw_xai, error)
"""
raw_xai = None
x_error = None
if mock:
raw_xai = load_fixture("xai_sample.json")
else:
try:
raw_xai = xai_x.search_x(
config["XAI_API_KEY"],
selected_models["xai"],
topic,
from_date,
to_date,
depth=depth,
)
except http.HTTPError as e:
raw_xai = {"error": str(e)}
x_error = f"API error: {e}"
except Exception as e:
raw_xai = {"error": str(e)}
x_error = f"{type(e).__name__}: {e}"
# Parse response
x_items = xai_x.parse_x_response(raw_xai or {})
return x_items, raw_xai, x_error
def run_research(
topic: str,
sources: str,
config: dict,
selected_models: dict,
from_date: str,
to_date: str,
depth: str = "default",
mock: bool = False,
progress: ui.ProgressDisplay = None,
) -> tuple:
"""Run the research pipeline.
Returns:
Tuple of (reddit_items, x_items, web_needed, raw_openai, raw_xai, raw_reddit_enriched, reddit_error, x_error)
Note: web_needed is True when WebSearch should be performed by Claude.
The script outputs a marker and Claude handles WebSearch in its session.
"""
reddit_items = []
x_items = []
raw_openai = None
raw_xai = None
raw_reddit_enriched = []
reddit_error = None
x_error = None
# Check if WebSearch is needed (always needed in web-only mode)
web_needed = sources in ("all", "web", "reddit-web", "x-web")
# Web-only mode: no API calls needed, Claude handles everything
if sources == "web":
if progress:
progress.start_web_only()
progress.end_web_only()
return reddit_items, x_items, True, raw_openai, raw_xai, raw_reddit_enriched, reddit_error, x_error
# Determine which searches to run
run_reddit = sources in ("both", "reddit", "all", "reddit-web")
run_x = sources in ("both", "x", "all", "x-web")
# Run Reddit and X searches in parallel
reddit_future = None
x_future = None
with ThreadPoolExecutor(max_workers=2) as executor:
# Submit both searches
if run_reddit:
if progress:
progress.start_reddit()
reddit_future = executor.submit(
_search_reddit, topic, config, selected_models,
from_date, to_date, depth, mock
)
if run_x:
if progress:
progress.start_x()
x_future = executor.submit(
_search_x, topic, config, selected_models,
from_date, to_date, depth, mock
)
# Collect results
if reddit_future:
try:
reddit_items, raw_openai, reddit_error = reddit_future.result()
if reddit_error and progress:
progress.show_error(f"Reddit error: {reddit_error}")
except Exception as e:
reddit_error = f"{type(e).__name__}: {e}"
if progress:
progress.show_error(f"Reddit error: {e}")
if progress:
progress.end_reddit(len(reddit_items))
if x_future:
try:
x_items, raw_xai, x_error = x_future.result()
if x_error and progress:
progress.show_error(f"X error: {x_error}")
except Exception as e:
x_error = f"{type(e).__name__}: {e}"
if progress:
progress.show_error(f"X error: {e}")
if progress:
progress.end_x(len(x_items))
# Enrich Reddit items with real data (sequential, but with error handling per-item)
if reddit_items:
if progress:
progress.start_reddit_enrich(1, len(reddit_items))
for i, item in enumerate(reddit_items):
if progress and i > 0:
progress.update_reddit_enrich(i + 1, len(reddit_items))
try:
if mock:
mock_thread = load_fixture("reddit_thread_sample.json")
reddit_items[i] = reddit_enrich.enrich_reddit_item(item, mock_thread)
else:
reddit_items[i] = reddit_enrich.enrich_reddit_item(item)
except Exception as e:
# Log but don't crash - keep the unenriched item
if progress:
progress.show_error(f"Enrich failed for {item.get('url', 'unknown')}: {e}")
raw_reddit_enriched.append(reddit_items[i])
if progress:
progress.end_reddit_enrich()
return reddit_items, x_items, web_needed, raw_openai, raw_xai, raw_reddit_enriched, reddit_error, x_error
def main():
parser = argparse.ArgumentParser(
description="Research a topic from the last 30 days on Reddit + X"
)
parser.add_argument("topic", nargs="?", help="Topic to research")
parser.add_argument("--mock", action="store_true", help="Use fixtures")
parser.add_argument(
"--emit",
choices=["compact", "json", "md", "context", "path"],
default="compact",
help="Output mode",
)
parser.add_argument(
"--sources",
choices=["auto", "reddit", "x", "both"],
default="auto",
help="Source selection",
)
parser.add_argument(
"--quick",
action="store_true",
help="Faster research with fewer sources (8-12 each)",
)
parser.add_argument(
"--deep",
action="store_true",
help="Comprehensive research with more sources (50-70 Reddit, 40-60 X)",
)
parser.add_argument(
"--debug",
action="store_true",
help="Enable verbose debug logging",
)
parser.add_argument(
"--include-web",
action="store_true",
help="Include general web search alongside Reddit/X (lower weighted)",
)
args = parser.parse_args()
# Enable debug logging if requested
if args.debug:
os.environ["LAST30DAYS_DEBUG"] = "1"
        # Set the debug flag on the shared http module (the env var only covers fresh imports)
from lib import http as http_module
http_module.DEBUG = True
# Determine depth
if args.quick and args.deep:
print("Error: Cannot use both --quick and --deep", file=sys.stderr)
sys.exit(1)
elif args.quick:
depth = "quick"
elif args.deep:
depth = "deep"
else:
depth = "default"
if not args.topic:
print("Error: Please provide a topic to research.", file=sys.stderr)
print("Usage: python3 last30days.py <topic> [options]", file=sys.stderr)
sys.exit(1)
# Load config
config = env.get_config()
# Check available sources
available = env.get_available_sources(config)
# Mock mode can work without keys
if args.mock:
if args.sources == "auto":
sources = "both"
else:
sources = args.sources
else:
# Validate requested sources against available
sources, error = env.validate_sources(args.sources, available, args.include_web)
if error:
# If it's a warning about WebSearch fallback, print but continue
if "WebSearch fallback" in error:
print(f"Note: {error}", file=sys.stderr)
else:
print(f"Error: {error}", file=sys.stderr)
sys.exit(1)
# Get date range
from_date, to_date = dates.get_date_range(30)
# Check what keys are missing for promo messaging
missing_keys = env.get_missing_keys(config)
# Initialize progress display
progress = ui.ProgressDisplay(args.topic, show_banner=True)
# Show promo for missing keys BEFORE research
if missing_keys != 'none':
progress.show_promo(missing_keys)
# Select models
if args.mock:
# Use mock models
mock_openai_models = load_fixture("models_openai_sample.json").get("data", [])
mock_xai_models = load_fixture("models_xai_sample.json").get("data", [])
selected_models = models.get_models(
{
"OPENAI_API_KEY": "mock",
"XAI_API_KEY": "mock",
**config,
},
mock_openai_models,
mock_xai_models,
)
else:
selected_models = models.get_models(config)
# Determine mode string
if sources == "all":
mode = "all" # reddit + x + web
elif sources == "both":
mode = "both" # reddit + x
elif sources == "reddit":
mode = "reddit-only"
elif sources == "reddit-web":
mode = "reddit-web"
elif sources == "x":
mode = "x-only"
elif sources == "x-web":
mode = "x-web"
elif sources == "web":
mode = "web-only"
else:
mode = sources
# Run research
reddit_items, x_items, web_needed, raw_openai, raw_xai, raw_reddit_enriched, reddit_error, x_error = run_research(
args.topic,
sources,
config,
selected_models,
from_date,
to_date,
depth,
args.mock,
progress,
)
# Processing phase
progress.start_processing()
# Normalize items
normalized_reddit = normalize.normalize_reddit_items(reddit_items, from_date, to_date)
normalized_x = normalize.normalize_x_items(x_items, from_date, to_date)
# Hard date filter: exclude items with verified dates outside the range
# This is the safety net - even if prompts let old content through, this filters it
filtered_reddit = normalize.filter_by_date_range(normalized_reddit, from_date, to_date)
filtered_x = normalize.filter_by_date_range(normalized_x, from_date, to_date)
# Score items
scored_reddit = score.score_reddit_items(filtered_reddit)
scored_x = score.score_x_items(filtered_x)
# Sort items
sorted_reddit = score.sort_items(scored_reddit)
sorted_x = score.sort_items(scored_x)
# Dedupe items
deduped_reddit = dedupe.dedupe_reddit(sorted_reddit)
deduped_x = dedupe.dedupe_x(sorted_x)
progress.end_processing()
# Create report
report = schema.create_report(
args.topic,
from_date,
to_date,
mode,
selected_models.get("openai"),
selected_models.get("xai"),
)
report.reddit = deduped_reddit
report.x = deduped_x
report.reddit_error = reddit_error
report.x_error = x_error
# Generate context snippet
report.context_snippet_md = render.render_context_snippet(report)
# Write outputs
render.write_outputs(report, raw_openai, raw_xai, raw_reddit_enriched)
# Show completion
if sources == "web":
progress.show_web_only_complete()
else:
progress.show_complete(len(deduped_reddit), len(deduped_x))
# Output result
output_result(report, args.emit, web_needed, args.topic, from_date, to_date, missing_keys)
def output_result(
report: schema.Report,
emit_mode: str,
web_needed: bool = False,
topic: str = "",
from_date: str = "",
to_date: str = "",
missing_keys: str = "none",
):
"""Output the result based on emit mode."""
if emit_mode == "compact":
print(render.render_compact(report, missing_keys=missing_keys))
elif emit_mode == "json":
print(json.dumps(report.to_dict(), indent=2))
elif emit_mode == "md":
print(render.render_full_report(report))
elif emit_mode == "context":
print(report.context_snippet_md)
elif emit_mode == "path":
print(render.get_context_path())
# Output WebSearch instructions if needed
if web_needed:
print("\n" + "="*60)
print("### WEBSEARCH REQUIRED ###")
print("="*60)
print(f"Topic: {topic}")
print(f"Date range: {from_date} to {to_date}")
print("")
print("Claude: Use your WebSearch tool to find 8-15 relevant web pages.")
print("EXCLUDE: reddit.com, x.com, twitter.com (already covered above)")
print("INCLUDE: blogs, docs, news, tutorials from the last 30 days")
print("")
print("After searching, synthesize WebSearch results WITH the Reddit/X")
print("results above. WebSearch items should rank LOWER than comparable")
print("Reddit/X items (they lack engagement metrics).")
print("="*60)
if __name__ == "__main__":
main()
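
Mock mode exercises the full pipeline without API keys; a minimal way to drive it programmatically is sketched below (illustrative only, not part of the commit; assumes the working directory contains last30days.py and its fixtures, and only prints raw stdout since the report's JSON keys are defined in the schema module).

# Illustrative only: run the CLI in mock mode and capture the JSON report it emits.
import subprocess
import sys

proc = subprocess.run(
    [sys.executable, "last30days.py", "claude code skills", "--mock", "--emit=json"],
    capture_output=True,
    text=True,
    check=True,
)
print(proc.stdout[:500])  # beginning of the report printed by output_result()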

View File

@@ -0,0 +1 @@
# last30days library modules

View File

@@ -0,0 +1,152 @@
"""Caching utilities for last30days skill."""
import hashlib
import json
import os
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional
CACHE_DIR = Path.home() / ".cache" / "last30days"
DEFAULT_TTL_HOURS = 24
MODEL_CACHE_TTL_DAYS = 7
def ensure_cache_dir():
"""Ensure cache directory exists."""
CACHE_DIR.mkdir(parents=True, exist_ok=True)
def get_cache_key(topic: str, from_date: str, to_date: str, sources: str) -> str:
"""Generate a cache key from query parameters."""
key_data = f"{topic}|{from_date}|{to_date}|{sources}"
return hashlib.sha256(key_data.encode()).hexdigest()[:16]
def get_cache_path(cache_key: str) -> Path:
"""Get path to cache file."""
return CACHE_DIR / f"{cache_key}.json"
def is_cache_valid(cache_path: Path, ttl_hours: int = DEFAULT_TTL_HOURS) -> bool:
"""Check if cache file exists and is within TTL."""
if not cache_path.exists():
return False
try:
stat = cache_path.stat()
mtime = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc)
now = datetime.now(timezone.utc)
age_hours = (now - mtime).total_seconds() / 3600
return age_hours < ttl_hours
except OSError:
return False
def load_cache(cache_key: str, ttl_hours: int = DEFAULT_TTL_HOURS) -> Optional[dict]:
"""Load data from cache if valid."""
cache_path = get_cache_path(cache_key)
if not is_cache_valid(cache_path, ttl_hours):
return None
try:
with open(cache_path, 'r') as f:
return json.load(f)
except (json.JSONDecodeError, OSError):
return None
def get_cache_age_hours(cache_path: Path) -> Optional[float]:
"""Get age of cache file in hours."""
if not cache_path.exists():
return None
try:
stat = cache_path.stat()
mtime = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc)
now = datetime.now(timezone.utc)
return (now - mtime).total_seconds() / 3600
except OSError:
return None
def load_cache_with_age(cache_key: str, ttl_hours: int = DEFAULT_TTL_HOURS) -> tuple:
"""Load data from cache with age info.
Returns:
Tuple of (data, age_hours) or (None, None) if invalid
"""
cache_path = get_cache_path(cache_key)
if not is_cache_valid(cache_path, ttl_hours):
return None, None
age = get_cache_age_hours(cache_path)
try:
with open(cache_path, 'r') as f:
return json.load(f), age
except (json.JSONDecodeError, OSError):
return None, None
def save_cache(cache_key: str, data: dict):
"""Save data to cache."""
ensure_cache_dir()
cache_path = get_cache_path(cache_key)
try:
with open(cache_path, 'w') as f:
json.dump(data, f)
except OSError:
pass # Silently fail on cache write errors
def clear_cache():
"""Clear all cache files."""
if CACHE_DIR.exists():
for f in CACHE_DIR.glob("*.json"):
try:
f.unlink()
except OSError:
pass
# Model selection cache (longer TTL)
MODEL_CACHE_FILE = CACHE_DIR / "model_selection.json"
def load_model_cache() -> dict:
"""Load model selection cache."""
if not is_cache_valid(MODEL_CACHE_FILE, MODEL_CACHE_TTL_DAYS * 24):
return {}
try:
with open(MODEL_CACHE_FILE, 'r') as f:
return json.load(f)
except (json.JSONDecodeError, OSError):
return {}
def save_model_cache(data: dict):
"""Save model selection cache."""
ensure_cache_dir()
try:
with open(MODEL_CACHE_FILE, 'w') as f:
json.dump(data, f)
except OSError:
pass
def get_cached_model(provider: str) -> Optional[str]:
"""Get cached model selection for a provider."""
cache = load_model_cache()
return cache.get(provider)
def set_cached_model(provider: str, model: str):
"""Cache model selection for a provider."""
cache = load_model_cache()
cache[provider] = model
cache['updated_at'] = datetime.now(timezone.utc).isoformat()
save_model_cache(cache)
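
A rough usage sketch for the cache helpers above (illustrative only, not part of the commit; assumes the package is importable as lib.cache and writes under ~/.cache/last30days):

# Illustrative only: cache a query result and read it back while it is still fresh.
from lib import cache

key = cache.get_cache_key("claude code skills", "2025-12-27", "2026-01-26", "both")
cache.save_cache(key, {"reddit": [], "x": []})

data, age_hours = cache.load_cache_with_age(key, ttl_hours=24)
if data is not None:
    print(f"cache hit, {age_hours:.2f}h old")
else:
    print("cache miss or expired")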

View File

@@ -0,0 +1,124 @@
"""Date utilities for last30days skill."""
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple
def get_date_range(days: int = 30) -> Tuple[str, str]:
"""Get the date range for the last N days.
Returns:
Tuple of (from_date, to_date) as YYYY-MM-DD strings
"""
today = datetime.now(timezone.utc).date()
from_date = today - timedelta(days=days)
return from_date.isoformat(), today.isoformat()
def parse_date(date_str: Optional[str]) -> Optional[datetime]:
"""Parse a date string in various formats.
Supports: YYYY-MM-DD, ISO 8601, Unix timestamp
"""
if not date_str:
return None
# Try Unix timestamp (from Reddit)
try:
ts = float(date_str)
return datetime.fromtimestamp(ts, tz=timezone.utc)
except (ValueError, TypeError):
pass
# Try ISO formats
formats = [
"%Y-%m-%d",
"%Y-%m-%dT%H:%M:%S",
"%Y-%m-%dT%H:%M:%SZ",
"%Y-%m-%dT%H:%M:%S%z",
"%Y-%m-%dT%H:%M:%S.%f%z",
]
for fmt in formats:
try:
            dt = datetime.strptime(date_str, fmt)
            # Keep an explicit UTC offset if the format carried one; only assume UTC for naive values
            return dt if dt.tzinfo is not None else dt.replace(tzinfo=timezone.utc)
except ValueError:
continue
return None
def timestamp_to_date(ts: Optional[float]) -> Optional[str]:
"""Convert Unix timestamp to YYYY-MM-DD string."""
if ts is None:
return None
try:
dt = datetime.fromtimestamp(ts, tz=timezone.utc)
return dt.date().isoformat()
except (ValueError, TypeError, OSError):
return None
def get_date_confidence(date_str: Optional[str], from_date: str, to_date: str) -> str:
"""Determine confidence level for a date.
Args:
date_str: The date to check (YYYY-MM-DD or None)
from_date: Start of valid range (YYYY-MM-DD)
to_date: End of valid range (YYYY-MM-DD)
Returns:
'high', 'med', or 'low'
"""
if not date_str:
return 'low'
try:
dt = datetime.strptime(date_str, "%Y-%m-%d").date()
start = datetime.strptime(from_date, "%Y-%m-%d").date()
end = datetime.strptime(to_date, "%Y-%m-%d").date()
if start <= dt <= end:
return 'high'
elif dt < start:
# Older than range
return 'low'
else:
# Future date (suspicious)
return 'low'
except ValueError:
return 'low'
def days_ago(date_str: Optional[str]) -> Optional[int]:
"""Calculate how many days ago a date is.
Returns None if date is invalid or missing.
"""
if not date_str:
return None
try:
dt = datetime.strptime(date_str, "%Y-%m-%d").date()
today = datetime.now(timezone.utc).date()
delta = today - dt
return delta.days
except ValueError:
return None
def recency_score(date_str: Optional[str], max_days: int = 30) -> int:
"""Calculate recency score (0-100).
0 days ago = 100, max_days ago = 0, clamped.
"""
age = days_ago(date_str)
if age is None:
return 0 # Unknown date gets worst score
if age < 0:
return 100 # Future date (treat as today)
if age >= max_days:
return 0
return int(100 * (1 - age / max_days))
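
A small sketch of how the date helpers compose (illustrative only; assumes lib.dates is importable, and the concrete values shift with the current date):

# Illustrative only: derive the 30-day window and grade dates against it.
from lib import dates

from_date, to_date = dates.get_date_range(30)
print(dates.get_date_confidence(to_date, from_date, to_date))       # 'high' (inside the window)
print(dates.get_date_confidence("2020-01-01", from_date, to_date))  # 'low' (older than the window)
print(dates.recency_score(to_date))  # 100 for today
print(dates.recency_score(None))     # 0 when the date is unknown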

View File

@@ -0,0 +1,120 @@
"""Near-duplicate detection for last30days skill."""
import re
from typing import List, Set, Tuple, Union
from . import schema
def normalize_text(text: str) -> str:
"""Normalize text for comparison.
- Lowercase
- Remove punctuation
- Collapse whitespace
"""
text = text.lower()
text = re.sub(r'[^\w\s]', ' ', text)
text = re.sub(r'\s+', ' ', text)
return text.strip()
def get_ngrams(text: str, n: int = 3) -> Set[str]:
"""Get character n-grams from text."""
text = normalize_text(text)
if len(text) < n:
return {text}
return {text[i:i+n] for i in range(len(text) - n + 1)}
def jaccard_similarity(set1: Set[str], set2: Set[str]) -> float:
"""Compute Jaccard similarity between two sets."""
if not set1 or not set2:
return 0.0
intersection = len(set1 & set2)
union = len(set1 | set2)
return intersection / union if union > 0 else 0.0
def get_item_text(item: Union[schema.RedditItem, schema.XItem]) -> str:
"""Get comparable text from an item."""
if isinstance(item, schema.RedditItem):
return item.title
else:
return item.text
def find_duplicates(
items: List[Union[schema.RedditItem, schema.XItem]],
threshold: float = 0.7,
) -> List[Tuple[int, int]]:
"""Find near-duplicate pairs in items.
Args:
items: List of items to check
threshold: Similarity threshold (0-1)
Returns:
List of (i, j) index pairs where i < j and items are similar
"""
duplicates = []
# Pre-compute n-grams
ngrams = [get_ngrams(get_item_text(item)) for item in items]
for i in range(len(items)):
for j in range(i + 1, len(items)):
similarity = jaccard_similarity(ngrams[i], ngrams[j])
if similarity >= threshold:
duplicates.append((i, j))
return duplicates
def dedupe_items(
items: List[Union[schema.RedditItem, schema.XItem]],
threshold: float = 0.7,
) -> List[Union[schema.RedditItem, schema.XItem]]:
"""Remove near-duplicates, keeping highest-scored item.
Args:
items: List of items (should be pre-sorted by score descending)
threshold: Similarity threshold
Returns:
Deduplicated items
"""
if len(items) <= 1:
return items
# Find duplicate pairs
dup_pairs = find_duplicates(items, threshold)
# Mark indices to remove (always remove the lower-scored one)
# Since items are pre-sorted by score, the second index is always lower
to_remove = set()
for i, j in dup_pairs:
# Keep the higher-scored one (lower index in sorted list)
if items[i].score >= items[j].score:
to_remove.add(j)
else:
to_remove.add(i)
# Return items not marked for removal
return [item for idx, item in enumerate(items) if idx not in to_remove]
def dedupe_reddit(
items: List[schema.RedditItem],
threshold: float = 0.7,
) -> List[schema.RedditItem]:
"""Dedupe Reddit items."""
return dedupe_items(items, threshold)
def dedupe_x(
items: List[schema.XItem],
threshold: float = 0.7,
) -> List[schema.XItem]:
"""Dedupe X items."""
return dedupe_items(items, threshold)
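
A quick sketch of the similarity primitive behind dedupe_items (illustrative only; real callers go through dedupe_reddit / dedupe_x with schema items):

# Illustrative only: character-trigram Jaccard similarity on two near-duplicate titles.
from lib import dedupe

a = dedupe.get_ngrams("Best Claude Code skills in 2026?")
b = dedupe.get_ngrams("Best Claude Code skills in 2026")
print(dedupe.jaccard_similarity(a, b))  # 1.0 here - above the 0.7 threshold, so the pair counts as duplicates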

View File

@@ -0,0 +1,149 @@
"""Environment and API key management for last30days skill."""
import os
from pathlib import Path
from typing import Optional, Dict, Any
CONFIG_DIR = Path.home() / ".config" / "last30days"
CONFIG_FILE = CONFIG_DIR / ".env"
def load_env_file(path: Path) -> Dict[str, str]:
"""Load environment variables from a file."""
env = {}
if not path.exists():
return env
with open(path, 'r') as f:
for line in f:
line = line.strip()
if not line or line.startswith('#'):
continue
if '=' in line:
key, _, value = line.partition('=')
key = key.strip()
value = value.strip()
# Remove quotes if present
if value and value[0] in ('"', "'") and value[-1] == value[0]:
value = value[1:-1]
if key and value:
env[key] = value
return env
def get_config() -> Dict[str, Any]:
"""Load configuration from ~/.config/last30days/.env and environment."""
# Load from config file first
file_env = load_env_file(CONFIG_FILE)
# Environment variables override file
config = {
'OPENAI_API_KEY': os.environ.get('OPENAI_API_KEY') or file_env.get('OPENAI_API_KEY'),
'XAI_API_KEY': os.environ.get('XAI_API_KEY') or file_env.get('XAI_API_KEY'),
'OPENAI_MODEL_POLICY': os.environ.get('OPENAI_MODEL_POLICY') or file_env.get('OPENAI_MODEL_POLICY', 'auto'),
'OPENAI_MODEL_PIN': os.environ.get('OPENAI_MODEL_PIN') or file_env.get('OPENAI_MODEL_PIN'),
'XAI_MODEL_POLICY': os.environ.get('XAI_MODEL_POLICY') or file_env.get('XAI_MODEL_POLICY', 'latest'),
'XAI_MODEL_PIN': os.environ.get('XAI_MODEL_PIN') or file_env.get('XAI_MODEL_PIN'),
}
return config
def config_exists() -> bool:
"""Check if configuration file exists."""
return CONFIG_FILE.exists()
def get_available_sources(config: Dict[str, Any]) -> str:
"""Determine which sources are available based on API keys.
Returns: 'both', 'reddit', 'x', or 'web' (fallback when no keys)
"""
has_openai = bool(config.get('OPENAI_API_KEY'))
has_xai = bool(config.get('XAI_API_KEY'))
if has_openai and has_xai:
return 'both'
elif has_openai:
return 'reddit'
elif has_xai:
return 'x'
else:
return 'web' # Fallback: WebSearch only (no API keys needed)
def get_missing_keys(config: Dict[str, Any]) -> str:
"""Determine which API keys are missing.
Returns: 'both', 'reddit', 'x', or 'none'
"""
has_openai = bool(config.get('OPENAI_API_KEY'))
has_xai = bool(config.get('XAI_API_KEY'))
if has_openai and has_xai:
return 'none'
elif has_openai:
return 'x' # Missing xAI key
elif has_xai:
return 'reddit' # Missing OpenAI key
else:
return 'both' # Missing both keys
def validate_sources(requested: str, available: str, include_web: bool = False) -> tuple[str, Optional[str]]:
"""Validate requested sources against available keys.
Args:
requested: 'auto', 'reddit', 'x', 'both', or 'web'
available: Result from get_available_sources()
include_web: If True, add WebSearch to available sources
Returns:
Tuple of (effective_sources, error_message)
"""
# WebSearch-only mode (no API keys)
if available == 'web':
if requested == 'auto':
return 'web', None
elif requested == 'web':
return 'web', None
else:
return 'web', f"No API keys configured. Using WebSearch fallback. Add keys to ~/.config/last30days/.env for Reddit/X."
if requested == 'auto':
# Add web to sources if include_web is set
if include_web:
if available == 'both':
return 'all', None # reddit + x + web
elif available == 'reddit':
return 'reddit-web', None
elif available == 'x':
return 'x-web', None
return available, None
if requested == 'web':
return 'web', None
if requested == 'both':
if available not in ('both',):
missing = 'xAI' if available == 'reddit' else 'OpenAI'
return 'none', f"Requested both sources but {missing} key is missing. Use --sources=auto to use available keys."
if include_web:
return 'all', None
return 'both', None
if requested == 'reddit':
if available == 'x':
return 'none', "Requested Reddit but only xAI key is available."
if include_web:
return 'reddit-web', None
return 'reddit', None
if requested == 'x':
if available == 'reddit':
return 'none', "Requested X but only OpenAI key is available."
if include_web:
return 'x-web', None
return 'x', None
return requested, None
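
A sketch of how key availability maps onto effective sources (illustrative only; the config values are fake):

# Illustrative only: resolve sources for a config that has only an OpenAI key.
from lib import env

config = {"OPENAI_API_KEY": "sk-fake", "XAI_API_KEY": None}
available = env.get_available_sources(config)  # 'reddit'
sources, error = env.validate_sources("auto", available, include_web=False)
print(available, sources, error)               # reddit reddit None
print(env.get_missing_keys(config))            # 'x' (xAI key missing)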

View File

@@ -0,0 +1,152 @@
"""HTTP utilities for last30days skill (stdlib only)."""
import json
import os
import sys
import time
import urllib.error
import urllib.request
from typing import Any, Dict, Optional
from urllib.parse import urlencode
DEFAULT_TIMEOUT = 30
DEBUG = os.environ.get("LAST30DAYS_DEBUG", "").lower() in ("1", "true", "yes")
def log(msg: str):
"""Log debug message to stderr."""
if DEBUG:
sys.stderr.write(f"[DEBUG] {msg}\n")
sys.stderr.flush()
MAX_RETRIES = 3
RETRY_DELAY = 1.0
USER_AGENT = "last30days-skill/1.0 (Claude Code Skill)"
class HTTPError(Exception):
"""HTTP request error with status code."""
def __init__(self, message: str, status_code: Optional[int] = None, body: Optional[str] = None):
super().__init__(message)
self.status_code = status_code
self.body = body
def request(
method: str,
url: str,
headers: Optional[Dict[str, str]] = None,
json_data: Optional[Dict[str, Any]] = None,
timeout: int = DEFAULT_TIMEOUT,
retries: int = MAX_RETRIES,
) -> Dict[str, Any]:
"""Make an HTTP request and return JSON response.
Args:
method: HTTP method (GET, POST, etc.)
url: Request URL
headers: Optional headers dict
json_data: Optional JSON body (for POST)
timeout: Request timeout in seconds
retries: Number of retries on failure
Returns:
Parsed JSON response
Raises:
HTTPError: On request failure
"""
headers = headers or {}
headers.setdefault("User-Agent", USER_AGENT)
data = None
if json_data is not None:
data = json.dumps(json_data).encode('utf-8')
headers.setdefault("Content-Type", "application/json")
req = urllib.request.Request(url, data=data, headers=headers, method=method)
log(f"{method} {url}")
if json_data:
log(f"Payload keys: {list(json_data.keys())}")
last_error = None
for attempt in range(retries):
try:
with urllib.request.urlopen(req, timeout=timeout) as response:
body = response.read().decode('utf-8')
log(f"Response: {response.status} ({len(body)} bytes)")
return json.loads(body) if body else {}
except urllib.error.HTTPError as e:
body = None
try:
body = e.read().decode('utf-8')
            except Exception:
pass
log(f"HTTP Error {e.code}: {e.reason}")
if body:
log(f"Error body: {body[:500]}")
last_error = HTTPError(f"HTTP {e.code}: {e.reason}", e.code, body)
# Don't retry client errors (4xx) except rate limits
if 400 <= e.code < 500 and e.code != 429:
raise last_error
if attempt < retries - 1:
time.sleep(RETRY_DELAY * (attempt + 1))
except urllib.error.URLError as e:
log(f"URL Error: {e.reason}")
last_error = HTTPError(f"URL Error: {e.reason}")
if attempt < retries - 1:
time.sleep(RETRY_DELAY * (attempt + 1))
except json.JSONDecodeError as e:
log(f"JSON decode error: {e}")
last_error = HTTPError(f"Invalid JSON response: {e}")
raise last_error
except (OSError, TimeoutError, ConnectionResetError) as e:
# Handle socket-level errors (connection reset, timeout, etc.)
log(f"Connection error: {type(e).__name__}: {e}")
last_error = HTTPError(f"Connection error: {type(e).__name__}: {e}")
if attempt < retries - 1:
time.sleep(RETRY_DELAY * (attempt + 1))
if last_error:
raise last_error
raise HTTPError("Request failed with no error details")
def get(url: str, headers: Optional[Dict[str, str]] = None, **kwargs) -> Dict[str, Any]:
"""Make a GET request."""
return request("GET", url, headers=headers, **kwargs)
def post(url: str, json_data: Dict[str, Any], headers: Optional[Dict[str, str]] = None, **kwargs) -> Dict[str, Any]:
"""Make a POST request with JSON body."""
return request("POST", url, headers=headers, json_data=json_data, **kwargs)
def get_reddit_json(path: str) -> Dict[str, Any]:
"""Fetch Reddit thread JSON.
Args:
path: Reddit path (e.g., /r/subreddit/comments/id/title)
Returns:
Parsed JSON response
"""
# Ensure path starts with /
if not path.startswith('/'):
path = '/' + path
# Remove trailing slash and add .json
path = path.rstrip('/')
if not path.endswith('.json'):
path = path + '.json'
url = f"https://www.reddit.com{path}?raw_json=1"
headers = {
"User-Agent": USER_AGENT,
"Accept": "application/json",
}
return get(url, headers=headers)
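
A minimal sketch of the Reddit JSON fetch path (illustrative only; it needs network access, and the permalink below is made up, so a real call would 404 and raise HTTPError):

# Illustrative only: fetch a thread listing with retries and the skill's User-Agent applied.
from lib import http

try:
    data = http.get_reddit_json("/r/programming/comments/abc123/some_thread/")
    print(type(data))  # Reddit comment pages come back as a 2-element list: [submission listing, comments listing]
except http.HTTPError as e:
    print("request failed:", e.status_code, e)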

View File

@@ -0,0 +1,175 @@
"""Model auto-selection for last30days skill."""
import re
from typing import Dict, List, Optional, Tuple
from . import cache, http
# OpenAI API
OPENAI_MODELS_URL = "https://api.openai.com/v1/models"
OPENAI_FALLBACK_MODELS = ["gpt-5.2", "gpt-5.1", "gpt-5", "gpt-4o"]
# xAI API - Agent Tools API requires grok-4 family
XAI_MODELS_URL = "https://api.x.ai/v1/models"
XAI_ALIASES = {
"latest": "grok-4-1-fast", # Required for x_search tool
"stable": "grok-4-1-fast",
}
def parse_version(model_id: str) -> Optional[Tuple[int, ...]]:
"""Parse semantic version from model ID.
Examples:
gpt-5 -> (5,)
gpt-5.2 -> (5, 2)
gpt-5.2.1 -> (5, 2, 1)
"""
match = re.search(r'(\d+(?:\.\d+)*)', model_id)
if match:
return tuple(int(x) for x in match.group(1).split('.'))
return None
def is_mainline_openai_model(model_id: str) -> bool:
"""Check if model is a mainline GPT model (not mini/nano/chat/codex/pro)."""
model_lower = model_id.lower()
# Must be gpt-5 series
if not re.match(r'^gpt-5(\.\d+)*$', model_lower):
return False
# Exclude variants
excludes = ['mini', 'nano', 'chat', 'codex', 'pro', 'preview', 'turbo']
for exc in excludes:
if exc in model_lower:
return False
return True
def select_openai_model(
api_key: str,
policy: str = "auto",
pin: Optional[str] = None,
mock_models: Optional[List[Dict]] = None,
) -> str:
"""Select the best OpenAI model based on policy.
Args:
api_key: OpenAI API key
policy: 'auto' or 'pinned'
pin: Model to use if policy is 'pinned'
mock_models: Mock model list for testing
Returns:
Selected model ID
"""
if policy == "pinned" and pin:
return pin
# Check cache first
cached = cache.get_cached_model("openai")
if cached:
return cached
# Fetch model list
if mock_models is not None:
models = mock_models
else:
try:
headers = {"Authorization": f"Bearer {api_key}"}
response = http.get(OPENAI_MODELS_URL, headers=headers)
models = response.get("data", [])
except http.HTTPError:
# Fall back to known models
return OPENAI_FALLBACK_MODELS[0]
# Filter to mainline models
candidates = [m for m in models if is_mainline_openai_model(m.get("id", ""))]
if not candidates:
# No gpt-5 models found, use fallback
return OPENAI_FALLBACK_MODELS[0]
# Sort by version (descending), then by created timestamp
def sort_key(m):
version = parse_version(m.get("id", "")) or (0,)
created = m.get("created", 0)
return (version, created)
candidates.sort(key=sort_key, reverse=True)
selected = candidates[0]["id"]
# Cache the selection
cache.set_cached_model("openai", selected)
return selected
def select_xai_model(
api_key: str,
policy: str = "latest",
pin: Optional[str] = None,
mock_models: Optional[List[Dict]] = None,
) -> str:
"""Select the best xAI model based on policy.
Args:
api_key: xAI API key
policy: 'latest', 'stable', or 'pinned'
pin: Model to use if policy is 'pinned'
mock_models: Mock model list for testing
Returns:
Selected model ID
"""
if policy == "pinned" and pin:
return pin
# Use alias system
if policy in XAI_ALIASES:
alias = XAI_ALIASES[policy]
# Check cache first
cached = cache.get_cached_model("xai")
if cached:
return cached
# Cache the alias
cache.set_cached_model("xai", alias)
return alias
# Default to latest
return XAI_ALIASES["latest"]
def get_models(
config: Dict,
mock_openai_models: Optional[List[Dict]] = None,
mock_xai_models: Optional[List[Dict]] = None,
) -> Dict[str, Optional[str]]:
"""Get selected models for both providers.
Returns:
Dict with 'openai' and 'xai' keys
"""
result = {"openai": None, "xai": None}
if config.get("OPENAI_API_KEY"):
result["openai"] = select_openai_model(
config["OPENAI_API_KEY"],
config.get("OPENAI_MODEL_POLICY", "auto"),
config.get("OPENAI_MODEL_PIN"),
mock_openai_models,
)
if config.get("XAI_API_KEY"):
result["xai"] = select_xai_model(
config["XAI_API_KEY"],
config.get("XAI_MODEL_POLICY", "latest"),
config.get("XAI_MODEL_PIN"),
mock_xai_models,
)
return result
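
A small sketch of the selection primitives behind the "auto" policy (illustrative only; uses a fake model listing instead of a live API call, and the final line assumes no prior selection is cached under ~/.cache/last30days):

# Illustrative only: version parsing and mainline filtering drive automatic selection.
from lib import models

print(models.parse_version("gpt-5.2"))                 # (5, 2)
print(models.is_mainline_openai_model("gpt-5.2"))      # True
print(models.is_mainline_openai_model("gpt-5-mini"))   # False (variant excluded)

fake_listing = [{"id": "gpt-5", "created": 1}, {"id": "gpt-5.2", "created": 2}]
print(models.select_openai_model("sk-fake", policy="auto", mock_models=fake_listing))  # gpt-5.2 (then cached)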

View File

@@ -0,0 +1,160 @@
"""Normalization of raw API data to canonical schema."""
from typing import Any, Dict, List, TypeVar, Union
from . import dates, schema
T = TypeVar("T", schema.RedditItem, schema.XItem, schema.WebSearchItem)
def filter_by_date_range(
items: List[T],
from_date: str,
to_date: str,
require_date: bool = False,
) -> List[T]:
"""Hard filter: Remove items outside the date range.
This is the safety net - even if the prompt lets old content through,
this filter will exclude it.
Args:
items: List of items to filter
from_date: Start date (YYYY-MM-DD) - exclude items before this
to_date: End date (YYYY-MM-DD) - exclude items after this
require_date: If True, also remove items with no date
Returns:
Filtered list with only items in range (or unknown dates if not required)
"""
result = []
for item in items:
if item.date is None:
if not require_date:
result.append(item) # Keep unknown dates (with scoring penalty)
continue
# Hard filter: if date is before from_date, exclude
if item.date < from_date:
continue # DROP - too old
# Hard filter: if date is after to_date, exclude (likely parsing error)
if item.date > to_date:
continue # DROP - future date
result.append(item)
return result
def normalize_reddit_items(
items: List[Dict[str, Any]],
from_date: str,
to_date: str,
) -> List[schema.RedditItem]:
"""Normalize raw Reddit items to schema.
Args:
items: Raw Reddit items from API
from_date: Start of date range
to_date: End of date range
Returns:
List of RedditItem objects
"""
normalized = []
for item in items:
# Parse engagement
engagement = None
eng_raw = item.get("engagement")
if isinstance(eng_raw, dict):
engagement = schema.Engagement(
score=eng_raw.get("score"),
num_comments=eng_raw.get("num_comments"),
upvote_ratio=eng_raw.get("upvote_ratio"),
)
# Parse comments
top_comments = []
for c in item.get("top_comments", []):
top_comments.append(schema.Comment(
score=c.get("score", 0),
date=c.get("date"),
author=c.get("author", ""),
excerpt=c.get("excerpt", ""),
url=c.get("url", ""),
))
# Determine date confidence
date_str = item.get("date")
date_confidence = dates.get_date_confidence(date_str, from_date, to_date)
normalized.append(schema.RedditItem(
id=item.get("id", ""),
title=item.get("title", ""),
url=item.get("url", ""),
subreddit=item.get("subreddit", ""),
date=date_str,
date_confidence=date_confidence,
engagement=engagement,
top_comments=top_comments,
comment_insights=item.get("comment_insights", []),
relevance=item.get("relevance", 0.5),
why_relevant=item.get("why_relevant", ""),
))
return normalized
def normalize_x_items(
items: List[Dict[str, Any]],
from_date: str,
to_date: str,
) -> List[schema.XItem]:
"""Normalize raw X items to schema.
Args:
items: Raw X items from API
from_date: Start of date range
to_date: End of date range
Returns:
List of XItem objects
"""
normalized = []
for item in items:
# Parse engagement
engagement = None
eng_raw = item.get("engagement")
if isinstance(eng_raw, dict):
engagement = schema.Engagement(
likes=eng_raw.get("likes"),
reposts=eng_raw.get("reposts"),
replies=eng_raw.get("replies"),
quotes=eng_raw.get("quotes"),
)
# Determine date confidence
date_str = item.get("date")
date_confidence = dates.get_date_confidence(date_str, from_date, to_date)
normalized.append(schema.XItem(
id=item.get("id", ""),
text=item.get("text", ""),
url=item.get("url", ""),
author_handle=item.get("author_handle", ""),
date=date_str,
date_confidence=date_confidence,
engagement=engagement,
relevance=item.get("relevance", 0.5),
why_relevant=item.get("why_relevant", ""),
))
return normalized
def items_to_dicts(items: List) -> List[Dict[str, Any]]:
"""Convert schema items to dicts for JSON serialization."""
return [item.to_dict() for item in items]
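
A sketch of the hard date filter applied to freshly normalized items (illustrative only; fields not supplied fall back to the defaults in normalize_reddit_items):

# Illustrative only: items outside the window are dropped, unknown dates survive by default.
from lib import normalize

raw = [
    {"title": "fresh thread", "url": "https://www.reddit.com/r/sub/comments/1/a/", "date": "2026-01-20"},
    {"title": "ancient thread", "url": "https://www.reddit.com/r/sub/comments/2/b/", "date": "2024-01-01"},
    {"title": "undated thread", "url": "https://www.reddit.com/r/sub/comments/3/c/", "date": None},
]
items = normalize.normalize_reddit_items(raw, "2025-12-27", "2026-01-26")
kept = normalize.filter_by_date_range(items, "2025-12-27", "2026-01-26")
print([i.title for i in kept])  # ['fresh thread', 'undated thread']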

View File

@@ -0,0 +1,230 @@
"""OpenAI Responses API client for Reddit discovery."""
import json
import re
import sys
from typing import Any, Dict, List, Optional
from . import http
def _log_error(msg: str):
"""Log error to stderr."""
sys.stderr.write(f"[REDDIT ERROR] {msg}\n")
sys.stderr.flush()
OPENAI_RESPONSES_URL = "https://api.openai.com/v1/responses"
# Depth configurations: (min, max) threads to request
# Request MORE than needed since many get filtered by date
DEPTH_CONFIG = {
"quick": (15, 25),
"default": (30, 50),
"deep": (70, 100),
}
REDDIT_SEARCH_PROMPT = """Find Reddit discussion threads about: {topic}
STEP 1: EXTRACT THE CORE SUBJECT
Get the MAIN NOUN/PRODUCT/TOPIC:
- "best nano banana prompting practices""nano banana"
- "killer features of clawdbot""clawdbot"
- "top Claude Code skills""Claude Code"
DO NOT include "best", "top", "tips", "practices", "features" in your search.
STEP 2: SEARCH BROADLY
Search for the core subject:
1. "[core subject] site:reddit.com"
2. "reddit [core subject]"
3. "[core subject] reddit"
Return as many relevant threads as you find. We filter by date server-side.
STEP 3: INCLUDE ALL MATCHES
- Include ALL threads about the core subject
- Set date to "YYYY-MM-DD" if you can determine it, otherwise null
- We verify dates and filter old content server-side
- DO NOT pre-filter aggressively - include anything relevant
REQUIRED: URLs must contain "/r/" AND "/comments/"
REJECT: developers.reddit.com, business.reddit.com
Find {min_items}-{max_items} threads. Return MORE rather than fewer.
Return JSON:
{{
"items": [
{{
"title": "Thread title",
"url": "https://www.reddit.com/r/sub/comments/xyz/title/",
"subreddit": "subreddit_name",
"date": "YYYY-MM-DD or null",
"why_relevant": "Why relevant",
"relevance": 0.85
}}
]
}}"""
def _extract_core_subject(topic: str) -> str:
"""Extract core subject from verbose query for retry."""
noise = ['best', 'top', 'how to', 'tips for', 'practices', 'features',
'killer', 'guide', 'tutorial', 'recommendations', 'advice',
'prompting', 'using', 'for', 'with', 'the', 'of', 'in', 'on']
words = topic.lower().split()
result = [w for w in words if w not in noise]
return ' '.join(result[:3]) or topic # Keep max 3 words
def search_reddit(
api_key: str,
model: str,
topic: str,
from_date: str,
to_date: str,
depth: str = "default",
mock_response: Optional[Dict] = None,
_retry: bool = False,
) -> Dict[str, Any]:
"""Search Reddit for relevant threads using OpenAI Responses API.
Args:
api_key: OpenAI API key
model: Model to use
topic: Search topic
from_date: Start date (YYYY-MM-DD) - only include threads after this
to_date: End date (YYYY-MM-DD) - only include threads before this
depth: Research depth - "quick", "default", or "deep"
mock_response: Mock response for testing
Returns:
Raw API response
"""
if mock_response is not None:
return mock_response
min_items, max_items = DEPTH_CONFIG.get(depth, DEPTH_CONFIG["default"])
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
}
# Adjust timeout based on depth (generous for OpenAI web_search which can be slow)
timeout = 90 if depth == "quick" else 120 if depth == "default" else 180
# Note: allowed_domains accepts base domain, not subdomains
# We rely on prompt to filter out developers.reddit.com, etc.
payload = {
"model": model,
"tools": [
{
"type": "web_search",
"filters": {
"allowed_domains": ["reddit.com"]
}
}
],
"include": ["web_search_call.action.sources"],
"input": REDDIT_SEARCH_PROMPT.format(
topic=topic,
from_date=from_date,
to_date=to_date,
min_items=min_items,
max_items=max_items,
),
}
return http.post(OPENAI_RESPONSES_URL, payload, headers=headers, timeout=timeout)
def parse_reddit_response(response: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Parse OpenAI response to extract Reddit items.
Args:
response: Raw API response
Returns:
List of item dicts
"""
items = []
# Check for API errors first
if "error" in response and response["error"]:
error = response["error"]
err_msg = error.get("message", str(error)) if isinstance(error, dict) else str(error)
_log_error(f"OpenAI API error: {err_msg}")
if http.DEBUG:
_log_error(f"Full error response: {json.dumps(response, indent=2)[:1000]}")
return items
# Try to find the output text
output_text = ""
if "output" in response:
output = response["output"]
if isinstance(output, str):
output_text = output
elif isinstance(output, list):
for item in output:
if isinstance(item, dict):
if item.get("type") == "message":
content = item.get("content", [])
for c in content:
if isinstance(c, dict) and c.get("type") == "output_text":
output_text = c.get("text", "")
break
elif "text" in item:
output_text = item["text"]
elif isinstance(item, str):
output_text = item
if output_text:
break
# Also check for choices (older format)
if not output_text and "choices" in response:
for choice in response["choices"]:
if "message" in choice:
output_text = choice["message"].get("content", "")
break
if not output_text:
print(f"[REDDIT WARNING] No output text found in OpenAI response. Keys present: {list(response.keys())}", flush=True)
return items
# Extract JSON from the response
json_match = re.search(r'\{[\s\S]*"items"[\s\S]*\}', output_text)
if json_match:
try:
data = json.loads(json_match.group())
items = data.get("items", [])
except json.JSONDecodeError:
pass
# Validate and clean items
clean_items = []
for i, item in enumerate(items):
if not isinstance(item, dict):
continue
url = item.get("url", "")
if not url or "reddit.com" not in url:
continue
clean_item = {
"id": f"R{i+1}",
"title": str(item.get("title", "")).strip(),
"url": url,
"subreddit": str(item.get("subreddit", "")).strip().lstrip("r/"),
"date": item.get("date"),
"why_relevant": str(item.get("why_relevant", "")).strip(),
"relevance": min(1.0, max(0.0, float(item.get("relevance", 0.5)))),
}
# Validate date format
if clean_item["date"]:
if not re.match(r'^\d{4}-\d{2}-\d{2}$', str(clean_item["date"])):
clean_item["date"] = None
clean_items.append(clean_item)
return clean_items
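
A sketch of the parser on a hand-built Responses-style payload (illustrative only; the nested shape mirrors what parse_reddit_response looks for, not a verbatim API response):

# Illustrative only: extract and clean items from an output_text blob.
import json
from lib import openai_reddit

fake_response = {
    "output": [{
        "type": "message",
        "content": [{
            "type": "output_text",
            "text": json.dumps({"items": [{
                "title": "Thread title",
                "url": "https://www.reddit.com/r/sub/comments/xyz/title/",
                "subreddit": "r/sub",
                "date": "2026-01-20",
                "why_relevant": "on topic",
                "relevance": 0.9,
            }]}),
        }],
    }]
}
items = openai_reddit.parse_reddit_response(fake_response)
print(items[0]["id"], items[0]["subreddit"], items[0]["date"])  # R1 sub 2026-01-20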

View File

@@ -0,0 +1,232 @@
"""Reddit thread enrichment with real engagement metrics."""
import re
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse
from . import http, dates
def extract_reddit_path(url: str) -> Optional[str]:
"""Extract the path from a Reddit URL.
Args:
url: Reddit URL
Returns:
Path component or None
"""
try:
parsed = urlparse(url)
if "reddit.com" not in parsed.netloc:
return None
return parsed.path
    except Exception:
return None
def fetch_thread_data(url: str, mock_data: Optional[Dict] = None) -> Optional[Dict[str, Any]]:
"""Fetch Reddit thread JSON data.
Args:
url: Reddit thread URL
mock_data: Mock data for testing
Returns:
Thread data dict or None on failure
"""
if mock_data is not None:
return mock_data
path = extract_reddit_path(url)
if not path:
return None
try:
data = http.get_reddit_json(path)
return data
except http.HTTPError:
return None
def parse_thread_data(data: Any) -> Dict[str, Any]:
"""Parse Reddit thread JSON into structured data.
Args:
data: Raw Reddit JSON response
Returns:
Dict with submission and comments data
"""
result = {
"submission": None,
"comments": [],
}
if not isinstance(data, list) or len(data) < 1:
return result
# First element is submission listing
submission_listing = data[0]
if isinstance(submission_listing, dict):
children = submission_listing.get("data", {}).get("children", [])
if children:
sub_data = children[0].get("data", {})
result["submission"] = {
"score": sub_data.get("score"),
"num_comments": sub_data.get("num_comments"),
"upvote_ratio": sub_data.get("upvote_ratio"),
"created_utc": sub_data.get("created_utc"),
"permalink": sub_data.get("permalink"),
"title": sub_data.get("title"),
"selftext": sub_data.get("selftext", "")[:500], # Truncate
}
# Second element is comments listing
if len(data) >= 2:
comments_listing = data[1]
if isinstance(comments_listing, dict):
children = comments_listing.get("data", {}).get("children", [])
for child in children:
if child.get("kind") != "t1": # t1 = comment
continue
c_data = child.get("data", {})
if not c_data.get("body"):
continue
comment = {
"score": c_data.get("score", 0),
"created_utc": c_data.get("created_utc"),
"author": c_data.get("author", "[deleted]"),
"body": c_data.get("body", "")[:300], # Truncate
"permalink": c_data.get("permalink"),
}
result["comments"].append(comment)
return result
def get_top_comments(comments: List[Dict], limit: int = 10) -> List[Dict[str, Any]]:
"""Get top comments sorted by score.
Args:
comments: List of comment dicts
limit: Maximum number to return
Returns:
Top comments sorted by score
"""
# Filter out deleted/removed
valid = [c for c in comments if c.get("author") not in ("[deleted]", "[removed]")]
# Sort by score descending
sorted_comments = sorted(valid, key=lambda c: c.get("score", 0), reverse=True)
return sorted_comments[:limit]
def extract_comment_insights(comments: List[Dict], limit: int = 7) -> List[str]:
"""Extract key insights from top comments.
Uses simple heuristics to identify valuable comments:
- Has substantive text
- Contains actionable information
- Not just agreement/disagreement
Args:
comments: Top comments
limit: Max insights to extract
Returns:
List of insight strings
"""
insights = []
for comment in comments[:limit * 2]: # Look at more comments than we need
body = comment.get("body", "").strip()
if not body or len(body) < 30:
continue
# Skip low-value patterns
skip_patterns = [
r'^(this|same|agreed|exactly|yep|nope|yes|no|thanks|thank you)\.?$',
r'^lol|lmao|haha',
r'^\[deleted\]',
r'^\[removed\]',
]
if any(re.match(p, body.lower()) for p in skip_patterns):
continue
# Truncate to first meaningful sentence or ~150 chars
insight = body[:150]
if len(body) > 150:
# Try to find a sentence boundary
for i, char in enumerate(insight):
if char in '.!?' and i > 50:
insight = insight[:i+1]
break
else:
insight = insight.rstrip() + "..."
insights.append(insight)
if len(insights) >= limit:
break
return insights
def enrich_reddit_item(
item: Dict[str, Any],
mock_thread_data: Optional[Dict] = None,
) -> Dict[str, Any]:
"""Enrich a Reddit item with real engagement data.
Args:
item: Reddit item dict
mock_thread_data: Mock data for testing
Returns:
Enriched item dict
"""
url = item.get("url", "")
# Fetch thread data
thread_data = fetch_thread_data(url, mock_thread_data)
if not thread_data:
return item
parsed = parse_thread_data(thread_data)
submission = parsed.get("submission")
comments = parsed.get("comments", [])
# Update engagement metrics
if submission:
item["engagement"] = {
"score": submission.get("score"),
"num_comments": submission.get("num_comments"),
"upvote_ratio": submission.get("upvote_ratio"),
}
# Update date from actual data
created_utc = submission.get("created_utc")
if created_utc:
item["date"] = dates.timestamp_to_date(created_utc)
# Get top comments
top_comments = get_top_comments(comments)
item["top_comments"] = []
for c in top_comments:
permalink = c.get("permalink", "")
comment_url = f"https://reddit.com{permalink}" if permalink else ""
item["top_comments"].append({
"score": c.get("score", 0),
"date": dates.timestamp_to_date(c.get("created_utc")),
"author": c.get("author", ""),
"excerpt": c.get("body", "")[:200],
"url": comment_url,
})
# Extract insights
item["comment_insights"] = extract_comment_insights(top_comments)
return item
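
A sketch of the parsing/insight path on a pared-down fake thread listing (illustrative only; a real enrich_reddit_item call fetches reddit.com, and only the fields parse_thread_data reads are included here):

# Illustrative only: parse a tiny thread listing and pull insights from its comments.
from lib import reddit_enrich

fake_thread = [
    {"data": {"children": [{"data": {
        "score": 412, "num_comments": 2, "upvote_ratio": 0.97, "created_utc": 1769000000,
        "permalink": "/r/sub/comments/xyz/title/", "title": "Thread title", "selftext": "body"}}]}},
    {"data": {"children": [
        {"kind": "t1", "data": {"score": 88, "created_utc": 1769001000, "author": "alice",
                                "body": "Pin the model version in config, otherwise results drift between runs.",
                                "permalink": "/r/sub/comments/xyz/title/c1/"}},
        {"kind": "t1", "data": {"score": 3, "created_utc": 1769002000, "author": "bob",
                                "body": "this", "permalink": "/r/sub/comments/xyz/title/c2/"}},
    ]}},
]
parsed = reddit_enrich.parse_thread_data(fake_thread)
top = reddit_enrich.get_top_comments(parsed["comments"])
print(reddit_enrich.extract_comment_insights(top))  # the short "this" reply is skipped as low-value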

View File

@@ -0,0 +1,383 @@
"""Output rendering for last30days skill."""
import json
from pathlib import Path
from typing import List, Optional
from . import schema
OUTPUT_DIR = Path.home() / ".local" / "share" / "last30days" / "out"
def ensure_output_dir():
"""Ensure output directory exists."""
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
def _assess_data_freshness(report: schema.Report) -> dict:
"""Assess how much data is actually from the last 30 days."""
reddit_recent = sum(1 for r in report.reddit if r.date and r.date >= report.range_from)
x_recent = sum(1 for x in report.x if x.date and x.date >= report.range_from)
web_recent = sum(1 for w in report.web if w.date and w.date >= report.range_from)
total_recent = reddit_recent + x_recent + web_recent
total_items = len(report.reddit) + len(report.x) + len(report.web)
return {
"reddit_recent": reddit_recent,
"x_recent": x_recent,
"web_recent": web_recent,
"total_recent": total_recent,
"total_items": total_items,
"is_sparse": total_recent < 5,
"mostly_evergreen": total_items > 0 and total_recent < total_items * 0.3,
}
def render_compact(report: schema.Report, limit: int = 15, missing_keys: str = "none") -> str:
"""Render compact output for Claude to synthesize.
Args:
report: Report data
limit: Max items per source
missing_keys: 'both', 'reddit', 'x', or 'none'
Returns:
Compact markdown string
"""
lines = []
# Header
lines.append(f"## Research Results: {report.topic}")
lines.append("")
# Assess data freshness and add honesty warning if needed
freshness = _assess_data_freshness(report)
if freshness["is_sparse"]:
lines.append("**⚠️ LIMITED RECENT DATA** - Few discussions from the last 30 days.")
lines.append(f"Only {freshness['total_recent']} item(s) confirmed from {report.range_from} to {report.range_to}.")
lines.append("Results below may include older/evergreen content. Be transparent with the user about this.")
lines.append("")
# Web-only mode banner (when no API keys)
if report.mode == "web-only":
lines.append("**🌐 WEB SEARCH MODE** - Claude will search blogs, docs & news")
lines.append("")
lines.append("---")
lines.append("**⚡ Want better results?** Add API keys to unlock Reddit & X data:")
lines.append("- `OPENAI_API_KEY` → Reddit threads with real upvotes & comments")
lines.append("- `XAI_API_KEY` → X posts with real likes & reposts")
lines.append("- Edit `~/.config/last30days/.env` to add keys")
lines.append("---")
lines.append("")
# Cache indicator
if report.from_cache:
age_str = f"{report.cache_age_hours:.1f}h old" if report.cache_age_hours else "cached"
lines.append(f"**⚡ CACHED RESULTS** ({age_str}) - use `--refresh` for fresh data")
lines.append("")
lines.append(f"**Date Range:** {report.range_from} to {report.range_to}")
lines.append(f"**Mode:** {report.mode}")
if report.openai_model_used:
lines.append(f"**OpenAI Model:** {report.openai_model_used}")
if report.xai_model_used:
lines.append(f"**xAI Model:** {report.xai_model_used}")
lines.append("")
# Coverage note for partial coverage
if report.mode == "reddit-only" and missing_keys == "x":
lines.append("*💡 Tip: Add XAI_API_KEY for X/Twitter data and better triangulation.*")
lines.append("")
elif report.mode == "x-only" and missing_keys == "reddit":
lines.append("*💡 Tip: Add OPENAI_API_KEY for Reddit data and better triangulation.*")
lines.append("")
# Reddit items
if report.reddit_error:
lines.append("### Reddit Threads")
lines.append("")
lines.append(f"**ERROR:** {report.reddit_error}")
lines.append("")
elif report.mode in ("both", "reddit-only") and not report.reddit:
lines.append("### Reddit Threads")
lines.append("")
lines.append("*No relevant Reddit threads found for this topic.*")
lines.append("")
elif report.reddit:
lines.append("### Reddit Threads")
lines.append("")
for item in report.reddit[:limit]:
eng_str = ""
if item.engagement:
eng = item.engagement
parts = []
if eng.score is not None:
parts.append(f"{eng.score}pts")
if eng.num_comments is not None:
parts.append(f"{eng.num_comments}cmt")
if parts:
eng_str = f" [{', '.join(parts)}]"
date_str = f" ({item.date})" if item.date else " (date unknown)"
conf_str = f" [date:{item.date_confidence}]" if item.date_confidence != "high" else ""
lines.append(f"**{item.id}** (score:{item.score}) r/{item.subreddit}{date_str}{conf_str}{eng_str}")
lines.append(f" {item.title}")
lines.append(f" {item.url}")
lines.append(f" *{item.why_relevant}*")
# Top comment insights
if item.comment_insights:
lines.append(f" Insights:")
for insight in item.comment_insights[:3]:
lines.append(f" - {insight}")
lines.append("")
# X items
if report.x_error:
lines.append("### X Posts")
lines.append("")
lines.append(f"**ERROR:** {report.x_error}")
lines.append("")
elif report.mode in ("both", "x-only", "all", "x-web") and not report.x:
lines.append("### X Posts")
lines.append("")
lines.append("*No relevant X posts found for this topic.*")
lines.append("")
elif report.x:
lines.append("### X Posts")
lines.append("")
for item in report.x[:limit]:
eng_str = ""
if item.engagement:
eng = item.engagement
parts = []
if eng.likes is not None:
parts.append(f"{eng.likes}likes")
if eng.reposts is not None:
parts.append(f"{eng.reposts}rt")
if parts:
eng_str = f" [{', '.join(parts)}]"
date_str = f" ({item.date})" if item.date else " (date unknown)"
conf_str = f" [date:{item.date_confidence}]" if item.date_confidence != "high" else ""
lines.append(f"**{item.id}** (score:{item.score}) @{item.author_handle}{date_str}{conf_str}{eng_str}")
lines.append(f" {item.text[:200]}...")
lines.append(f" {item.url}")
lines.append(f" *{item.why_relevant}*")
lines.append("")
# Web items (if any - populated by Claude)
if report.web_error:
lines.append("### Web Results")
lines.append("")
lines.append(f"**ERROR:** {report.web_error}")
lines.append("")
elif report.web:
lines.append("### Web Results")
lines.append("")
for item in report.web[:limit]:
date_str = f" ({item.date})" if item.date else " (date unknown)"
conf_str = f" [date:{item.date_confidence}]" if item.date_confidence != "high" else ""
lines.append(f"**{item.id}** [WEB] (score:{item.score}) {item.source_domain}{date_str}{conf_str}")
lines.append(f" {item.title}")
lines.append(f" {item.url}")
lines.append(f" {item.snippet[:150]}...")
lines.append(f" *{item.why_relevant}*")
lines.append("")
return "\n".join(lines)
def render_context_snippet(report: schema.Report) -> str:
"""Render reusable context snippet.
Args:
report: Report data
Returns:
Context markdown string
"""
lines = []
lines.append(f"# Context: {report.topic} (Last 30 Days)")
lines.append("")
lines.append(f"*Generated: {report.generated_at[:10]} | Sources: {report.mode}*")
lines.append("")
# Key sources summary
lines.append("## Key Sources")
lines.append("")
all_items = []
for item in report.reddit[:5]:
all_items.append((item.score, "Reddit", item.title, item.url))
for item in report.x[:5]:
all_items.append((item.score, "X", item.text[:50] + "...", item.url))
for item in report.web[:5]:
all_items.append((item.score, "Web", item.title[:50] + "...", item.url))
all_items.sort(key=lambda x: -x[0])
for score, source, text, url in all_items[:7]:
lines.append(f"- [{source}] {text}")
lines.append("")
lines.append("## Summary")
lines.append("")
lines.append("*See full report for best practices, prompt pack, and detailed sources.*")
lines.append("")
return "\n".join(lines)
def render_full_report(report: schema.Report) -> str:
"""Render full markdown report.
Args:
report: Report data
Returns:
Full report markdown
"""
lines = []
# Title
lines.append(f"# {report.topic} - Last 30 Days Research Report")
lines.append("")
lines.append(f"**Generated:** {report.generated_at}")
lines.append(f"**Date Range:** {report.range_from} to {report.range_to}")
lines.append(f"**Mode:** {report.mode}")
lines.append("")
# Models
lines.append("## Models Used")
lines.append("")
if report.openai_model_used:
lines.append(f"- **OpenAI:** {report.openai_model_used}")
if report.xai_model_used:
lines.append(f"- **xAI:** {report.xai_model_used}")
lines.append("")
# Reddit section
if report.reddit:
lines.append("## Reddit Threads")
lines.append("")
for item in report.reddit:
lines.append(f"### {item.id}: {item.title}")
lines.append("")
lines.append(f"- **Subreddit:** r/{item.subreddit}")
lines.append(f"- **URL:** {item.url}")
lines.append(f"- **Date:** {item.date or 'Unknown'} (confidence: {item.date_confidence})")
lines.append(f"- **Score:** {item.score}/100")
lines.append(f"- **Relevance:** {item.why_relevant}")
if item.engagement:
eng = item.engagement
lines.append(f"- **Engagement:** {eng.score or '?'} points, {eng.num_comments or '?'} comments")
if item.comment_insights:
lines.append("")
lines.append("**Key Insights from Comments:**")
for insight in item.comment_insights:
lines.append(f"- {insight}")
lines.append("")
# X section
if report.x:
lines.append("## X Posts")
lines.append("")
for item in report.x:
lines.append(f"### {item.id}: @{item.author_handle}")
lines.append("")
lines.append(f"- **URL:** {item.url}")
lines.append(f"- **Date:** {item.date or 'Unknown'} (confidence: {item.date_confidence})")
lines.append(f"- **Score:** {item.score}/100")
lines.append(f"- **Relevance:** {item.why_relevant}")
if item.engagement:
eng = item.engagement
lines.append(f"- **Engagement:** {eng.likes or '?'} likes, {eng.reposts or '?'} reposts")
lines.append("")
lines.append(f"> {item.text}")
lines.append("")
# Web section
if report.web:
lines.append("## Web Results")
lines.append("")
for item in report.web:
lines.append(f"### {item.id}: {item.title}")
lines.append("")
lines.append(f"- **Source:** {item.source_domain}")
lines.append(f"- **URL:** {item.url}")
lines.append(f"- **Date:** {item.date or 'Unknown'} (confidence: {item.date_confidence})")
lines.append(f"- **Score:** {item.score}/100")
lines.append(f"- **Relevance:** {item.why_relevant}")
lines.append("")
lines.append(f"> {item.snippet}")
lines.append("")
# Placeholders for Claude synthesis
lines.append("## Best Practices")
lines.append("")
lines.append("*To be synthesized by Claude*")
lines.append("")
lines.append("## Prompt Pack")
lines.append("")
lines.append("*To be synthesized by Claude*")
lines.append("")
return "\n".join(lines)
def write_outputs(
report: schema.Report,
raw_openai: Optional[dict] = None,
raw_xai: Optional[dict] = None,
raw_reddit_enriched: Optional[list] = None,
):
"""Write all output files.
Args:
report: Report data
raw_openai: Raw OpenAI API response
raw_xai: Raw xAI API response
raw_reddit_enriched: Raw enriched Reddit thread data
"""
ensure_output_dir()
# report.json
with open(OUTPUT_DIR / "report.json", 'w') as f:
json.dump(report.to_dict(), f, indent=2)
# report.md
with open(OUTPUT_DIR / "report.md", 'w') as f:
f.write(render_full_report(report))
# last30days.context.md
with open(OUTPUT_DIR / "last30days.context.md", 'w') as f:
f.write(render_context_snippet(report))
# Raw responses
if raw_openai:
with open(OUTPUT_DIR / "raw_openai.json", 'w') as f:
json.dump(raw_openai, f, indent=2)
if raw_xai:
with open(OUTPUT_DIR / "raw_xai.json", 'w') as f:
json.dump(raw_xai, f, indent=2)
if raw_reddit_enriched:
with open(OUTPUT_DIR / "raw_reddit_threads_enriched.json", 'w') as f:
json.dump(raw_reddit_enriched, f, indent=2)
def get_context_path() -> str:
"""Get path to context file."""
return str(OUTPUT_DIR / "last30days.context.md")
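# Illustrative sketch (not used by the skill): how the renderers above are meant to be
# called. The report content is made up; write_outputs() would additionally persist
# report.json/report.md under OUTPUT_DIR. Assumes `schema` is imported at the top of
# this module, as the signatures above already require.
def _example_render_preview() -> str:
    """Render the context snippet for a tiny, hand-built report."""
    report = schema.create_report(
        topic="example topic",
        from_date="2025-12-27",
        to_date="2026-01-26",
        mode="both",
    )
    return render_context_snippet(report)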

View File

@@ -0,0 +1,336 @@
"""Data schemas for last30days skill."""
from dataclasses import dataclass, field, asdict
from typing import Any, Dict, List, Optional
from datetime import datetime, timezone
@dataclass
class Engagement:
"""Engagement metrics."""
# Reddit fields
score: Optional[int] = None
num_comments: Optional[int] = None
upvote_ratio: Optional[float] = None
# X fields
likes: Optional[int] = None
reposts: Optional[int] = None
replies: Optional[int] = None
quotes: Optional[int] = None
    def to_dict(self) -> Optional[Dict[str, Any]]:
d = {}
if self.score is not None:
d['score'] = self.score
if self.num_comments is not None:
d['num_comments'] = self.num_comments
if self.upvote_ratio is not None:
d['upvote_ratio'] = self.upvote_ratio
if self.likes is not None:
d['likes'] = self.likes
if self.reposts is not None:
d['reposts'] = self.reposts
if self.replies is not None:
d['replies'] = self.replies
if self.quotes is not None:
d['quotes'] = self.quotes
return d if d else None
@dataclass
class Comment:
"""Reddit comment."""
score: int
date: Optional[str]
author: str
excerpt: str
url: str
def to_dict(self) -> Dict[str, Any]:
return {
'score': self.score,
'date': self.date,
'author': self.author,
'excerpt': self.excerpt,
'url': self.url,
}
@dataclass
class SubScores:
"""Component scores."""
relevance: int = 0
recency: int = 0
engagement: int = 0
def to_dict(self) -> Dict[str, int]:
return {
'relevance': self.relevance,
'recency': self.recency,
'engagement': self.engagement,
}
@dataclass
class RedditItem:
"""Normalized Reddit item."""
id: str
title: str
url: str
subreddit: str
date: Optional[str] = None
date_confidence: str = "low"
engagement: Optional[Engagement] = None
top_comments: List[Comment] = field(default_factory=list)
comment_insights: List[str] = field(default_factory=list)
relevance: float = 0.5
why_relevant: str = ""
subs: SubScores = field(default_factory=SubScores)
score: int = 0
def to_dict(self) -> Dict[str, Any]:
return {
'id': self.id,
'title': self.title,
'url': self.url,
'subreddit': self.subreddit,
'date': self.date,
'date_confidence': self.date_confidence,
'engagement': self.engagement.to_dict() if self.engagement else None,
'top_comments': [c.to_dict() for c in self.top_comments],
'comment_insights': self.comment_insights,
'relevance': self.relevance,
'why_relevant': self.why_relevant,
'subs': self.subs.to_dict(),
'score': self.score,
}
@dataclass
class XItem:
"""Normalized X item."""
id: str
text: str
url: str
author_handle: str
date: Optional[str] = None
date_confidence: str = "low"
engagement: Optional[Engagement] = None
relevance: float = 0.5
why_relevant: str = ""
subs: SubScores = field(default_factory=SubScores)
score: int = 0
def to_dict(self) -> Dict[str, Any]:
return {
'id': self.id,
'text': self.text,
'url': self.url,
'author_handle': self.author_handle,
'date': self.date,
'date_confidence': self.date_confidence,
'engagement': self.engagement.to_dict() if self.engagement else None,
'relevance': self.relevance,
'why_relevant': self.why_relevant,
'subs': self.subs.to_dict(),
'score': self.score,
}
@dataclass
class WebSearchItem:
"""Normalized web search item (no engagement metrics)."""
id: str
title: str
url: str
source_domain: str # e.g., "medium.com", "github.com"
snippet: str
date: Optional[str] = None
date_confidence: str = "low"
relevance: float = 0.5
why_relevant: str = ""
subs: SubScores = field(default_factory=SubScores)
score: int = 0
def to_dict(self) -> Dict[str, Any]:
return {
'id': self.id,
'title': self.title,
'url': self.url,
'source_domain': self.source_domain,
'snippet': self.snippet,
'date': self.date,
'date_confidence': self.date_confidence,
'relevance': self.relevance,
'why_relevant': self.why_relevant,
'subs': self.subs.to_dict(),
'score': self.score,
}
@dataclass
class Report:
"""Full research report."""
topic: str
range_from: str
range_to: str
generated_at: str
mode: str # 'reddit-only', 'x-only', 'both', 'web-only', etc.
openai_model_used: Optional[str] = None
xai_model_used: Optional[str] = None
reddit: List[RedditItem] = field(default_factory=list)
x: List[XItem] = field(default_factory=list)
web: List[WebSearchItem] = field(default_factory=list)
best_practices: List[str] = field(default_factory=list)
prompt_pack: List[str] = field(default_factory=list)
context_snippet_md: str = ""
# Status tracking
reddit_error: Optional[str] = None
x_error: Optional[str] = None
web_error: Optional[str] = None
# Cache info
from_cache: bool = False
cache_age_hours: Optional[float] = None
def to_dict(self) -> Dict[str, Any]:
d = {
'topic': self.topic,
'range': {
'from': self.range_from,
'to': self.range_to,
},
'generated_at': self.generated_at,
'mode': self.mode,
'openai_model_used': self.openai_model_used,
'xai_model_used': self.xai_model_used,
'reddit': [r.to_dict() for r in self.reddit],
'x': [x.to_dict() for x in self.x],
'web': [w.to_dict() for w in self.web],
'best_practices': self.best_practices,
'prompt_pack': self.prompt_pack,
'context_snippet_md': self.context_snippet_md,
}
if self.reddit_error:
d['reddit_error'] = self.reddit_error
if self.x_error:
d['x_error'] = self.x_error
if self.web_error:
d['web_error'] = self.web_error
if self.from_cache:
d['from_cache'] = self.from_cache
if self.cache_age_hours is not None:
d['cache_age_hours'] = self.cache_age_hours
return d
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "Report":
"""Create Report from serialized dict (handles cache format)."""
# Handle range field conversion
range_data = data.get('range', {})
range_from = range_data.get('from', data.get('range_from', ''))
range_to = range_data.get('to', data.get('range_to', ''))
# Reconstruct Reddit items
reddit_items = []
for r in data.get('reddit', []):
eng = None
if r.get('engagement'):
eng = Engagement(**r['engagement'])
comments = [Comment(**c) for c in r.get('top_comments', [])]
subs = SubScores(**r.get('subs', {})) if r.get('subs') else SubScores()
reddit_items.append(RedditItem(
id=r['id'],
title=r['title'],
url=r['url'],
subreddit=r['subreddit'],
date=r.get('date'),
date_confidence=r.get('date_confidence', 'low'),
engagement=eng,
top_comments=comments,
comment_insights=r.get('comment_insights', []),
relevance=r.get('relevance', 0.5),
why_relevant=r.get('why_relevant', ''),
subs=subs,
score=r.get('score', 0),
))
# Reconstruct X items
x_items = []
for x in data.get('x', []):
eng = None
if x.get('engagement'):
eng = Engagement(**x['engagement'])
subs = SubScores(**x.get('subs', {})) if x.get('subs') else SubScores()
x_items.append(XItem(
id=x['id'],
text=x['text'],
url=x['url'],
author_handle=x['author_handle'],
date=x.get('date'),
date_confidence=x.get('date_confidence', 'low'),
engagement=eng,
relevance=x.get('relevance', 0.5),
why_relevant=x.get('why_relevant', ''),
subs=subs,
score=x.get('score', 0),
))
# Reconstruct Web items
web_items = []
for w in data.get('web', []):
subs = SubScores(**w.get('subs', {})) if w.get('subs') else SubScores()
web_items.append(WebSearchItem(
id=w['id'],
title=w['title'],
url=w['url'],
source_domain=w.get('source_domain', ''),
snippet=w.get('snippet', ''),
date=w.get('date'),
date_confidence=w.get('date_confidence', 'low'),
relevance=w.get('relevance', 0.5),
why_relevant=w.get('why_relevant', ''),
subs=subs,
score=w.get('score', 0),
))
return cls(
topic=data['topic'],
range_from=range_from,
range_to=range_to,
generated_at=data['generated_at'],
mode=data['mode'],
openai_model_used=data.get('openai_model_used'),
xai_model_used=data.get('xai_model_used'),
reddit=reddit_items,
x=x_items,
web=web_items,
best_practices=data.get('best_practices', []),
prompt_pack=data.get('prompt_pack', []),
context_snippet_md=data.get('context_snippet_md', ''),
reddit_error=data.get('reddit_error'),
x_error=data.get('x_error'),
web_error=data.get('web_error'),
from_cache=data.get('from_cache', False),
cache_age_hours=data.get('cache_age_hours'),
)
def create_report(
topic: str,
from_date: str,
to_date: str,
mode: str,
openai_model: Optional[str] = None,
xai_model: Optional[str] = None,
) -> Report:
"""Create a new report with metadata."""
return Report(
topic=topic,
range_from=from_date,
range_to=to_date,
generated_at=datetime.now(timezone.utc).isoformat(),
mode=mode,
openai_model_used=openai_model,
xai_model_used=xai_model,
)
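# Illustrative sketch (not called by the skill): a to_dict()/from_dict() round trip with
# made-up values, showing the cache serialization format these dataclasses define.
def _example_report_roundtrip() -> Report:
    """Build a tiny report, serialize it, and rebuild it via from_dict."""
    report = create_report(
        topic="example topic",
        from_date="2025-12-27",
        to_date="2026-01-26",
        mode="reddit-only",
    )
    report.reddit.append(RedditItem(
        id="R1",
        title="Example thread",
        url="https://reddit.com/r/example/comments/abc123",
        subreddit="example",
        date="2026-01-20",
        date_confidence="high",
        engagement=Engagement(score=120, num_comments=45, upvote_ratio=0.97),
        relevance=0.8,
        why_relevant="Directly discusses the topic",
    ))
    return Report.from_dict(report.to_dict())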

View File

@@ -0,0 +1,311 @@
"""Popularity-aware scoring for last30days skill."""
import math
from typing import List, Optional, Union
from . import dates, schema
# Score weights for Reddit/X (has engagement)
WEIGHT_RELEVANCE = 0.45
WEIGHT_RECENCY = 0.25
WEIGHT_ENGAGEMENT = 0.30
# WebSearch weights (no engagement, reweighted to 100%)
WEBSEARCH_WEIGHT_RELEVANCE = 0.55
WEBSEARCH_WEIGHT_RECENCY = 0.45
WEBSEARCH_SOURCE_PENALTY = 15 # Points deducted for lacking engagement
# WebSearch date confidence adjustments
WEBSEARCH_VERIFIED_BONUS = 10 # Bonus for URL-verified recent date (high confidence)
WEBSEARCH_NO_DATE_PENALTY = 20 # Heavy penalty for no date signals (low confidence)
# Default engagement score for unknown
DEFAULT_ENGAGEMENT = 35
UNKNOWN_ENGAGEMENT_PENALTY = 10
def log1p_safe(x: Optional[int]) -> float:
"""Safe log1p that handles None and negative values."""
if x is None or x < 0:
return 0.0
return math.log1p(x)
def compute_reddit_engagement_raw(engagement: Optional[schema.Engagement]) -> Optional[float]:
"""Compute raw engagement score for Reddit item.
Formula: 0.55*log1p(score) + 0.40*log1p(num_comments) + 0.05*(upvote_ratio*10)
"""
if engagement is None:
return None
if engagement.score is None and engagement.num_comments is None:
return None
score = log1p_safe(engagement.score)
comments = log1p_safe(engagement.num_comments)
ratio = (engagement.upvote_ratio or 0.5) * 10
return 0.55 * score + 0.40 * comments + 0.05 * ratio
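# Worked example for compute_reddit_engagement_raw (numbers made up): a thread with
# score=100, num_comments=20, upvote_ratio=0.9 yields roughly
# 0.55*log1p(100) + 0.40*log1p(20) + 0.05*9 ≈ 2.54 + 1.22 + 0.45 ≈ 4.2,
# which is only meaningful relative to other items after normalize_to_100().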
def compute_x_engagement_raw(engagement: Optional[schema.Engagement]) -> Optional[float]:
"""Compute raw engagement score for X item.
Formula: 0.55*log1p(likes) + 0.25*log1p(reposts) + 0.15*log1p(replies) + 0.05*log1p(quotes)
"""
if engagement is None:
return None
if engagement.likes is None and engagement.reposts is None:
return None
likes = log1p_safe(engagement.likes)
reposts = log1p_safe(engagement.reposts)
replies = log1p_safe(engagement.replies)
quotes = log1p_safe(engagement.quotes)
return 0.55 * likes + 0.25 * reposts + 0.15 * replies + 0.05 * quotes
def normalize_to_100(values: List[Optional[float]], default: float = 50) -> List[Optional[float]]:
"""Normalize a list of values to 0-100 scale.
Args:
values: Raw values (None values are preserved)
default: Default value for None entries
Returns:
Normalized values
"""
# Filter out None
valid = [v for v in values if v is not None]
if not valid:
return [default if v is None else 50 for v in values]
min_val = min(valid)
max_val = max(valid)
range_val = max_val - min_val
if range_val == 0:
        return [50.0 for _ in values]
result = []
for v in values:
if v is None:
result.append(None)
else:
normalized = ((v - min_val) / range_val) * 100
result.append(normalized)
return result
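# Worked example for normalize_to_100 (values made up):
#   normalize_to_100([math.log1p(10), None, math.log1p(100)]) -> [0.0, None, 100.0]
# The minimum maps to 0, the maximum to 100, and None is preserved so callers can
# substitute DEFAULT_ENGAGEMENT for unknown engagement afterwards.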
def score_reddit_items(items: List[schema.RedditItem]) -> List[schema.RedditItem]:
"""Compute scores for Reddit items.
Args:
items: List of Reddit items
Returns:
Items with updated scores
"""
if not items:
return items
# Compute raw engagement scores
eng_raw = [compute_reddit_engagement_raw(item.engagement) for item in items]
# Normalize engagement to 0-100
eng_normalized = normalize_to_100(eng_raw)
for i, item in enumerate(items):
# Relevance subscore (model-provided, convert to 0-100)
rel_score = int(item.relevance * 100)
# Recency subscore
rec_score = dates.recency_score(item.date)
# Engagement subscore
if eng_normalized[i] is not None:
eng_score = int(eng_normalized[i])
else:
eng_score = DEFAULT_ENGAGEMENT
# Store subscores
item.subs = schema.SubScores(
relevance=rel_score,
recency=rec_score,
engagement=eng_score,
)
# Compute overall score
overall = (
WEIGHT_RELEVANCE * rel_score +
WEIGHT_RECENCY * rec_score +
WEIGHT_ENGAGEMENT * eng_score
)
# Apply penalty for unknown engagement
if eng_raw[i] is None:
overall -= UNKNOWN_ENGAGEMENT_PENALTY
# Apply penalty for low date confidence
if item.date_confidence == "low":
overall -= 10
elif item.date_confidence == "med":
overall -= 5
item.score = max(0, min(100, int(overall)))
return items
def score_x_items(items: List[schema.XItem]) -> List[schema.XItem]:
"""Compute scores for X items.
Args:
items: List of X items
Returns:
Items with updated scores
"""
if not items:
return items
# Compute raw engagement scores
eng_raw = [compute_x_engagement_raw(item.engagement) for item in items]
# Normalize engagement to 0-100
eng_normalized = normalize_to_100(eng_raw)
for i, item in enumerate(items):
# Relevance subscore (model-provided, convert to 0-100)
rel_score = int(item.relevance * 100)
# Recency subscore
rec_score = dates.recency_score(item.date)
# Engagement subscore
if eng_normalized[i] is not None:
eng_score = int(eng_normalized[i])
else:
eng_score = DEFAULT_ENGAGEMENT
# Store subscores
item.subs = schema.SubScores(
relevance=rel_score,
recency=rec_score,
engagement=eng_score,
)
# Compute overall score
overall = (
WEIGHT_RELEVANCE * rel_score +
WEIGHT_RECENCY * rec_score +
WEIGHT_ENGAGEMENT * eng_score
)
# Apply penalty for unknown engagement
if eng_raw[i] is None:
overall -= UNKNOWN_ENGAGEMENT_PENALTY
# Apply penalty for low date confidence
if item.date_confidence == "low":
overall -= 10
elif item.date_confidence == "med":
overall -= 5
item.score = max(0, min(100, int(overall)))
return items
def score_websearch_items(items: List[schema.WebSearchItem]) -> List[schema.WebSearchItem]:
"""Compute scores for WebSearch items WITHOUT engagement metrics.
Uses reweighted formula: 55% relevance + 45% recency - 15pt source penalty.
This ensures WebSearch items rank below comparable Reddit/X items.
Date confidence adjustments:
- High confidence (URL-verified date): +10 bonus
- Med confidence (snippet-extracted date): no change
- Low confidence (no date signals): -20 penalty
Args:
items: List of WebSearch items
Returns:
Items with updated scores
"""
if not items:
return items
for item in items:
# Relevance subscore (model-provided, convert to 0-100)
rel_score = int(item.relevance * 100)
# Recency subscore
rec_score = dates.recency_score(item.date)
# Store subscores (engagement is 0 for WebSearch - no data)
item.subs = schema.SubScores(
relevance=rel_score,
recency=rec_score,
engagement=0, # Explicitly zero - no engagement data available
)
# Compute overall score using WebSearch weights
overall = (
WEBSEARCH_WEIGHT_RELEVANCE * rel_score +
WEBSEARCH_WEIGHT_RECENCY * rec_score
)
# Apply source penalty (WebSearch < Reddit/X for same relevance/recency)
overall -= WEBSEARCH_SOURCE_PENALTY
# Apply date confidence adjustments
# High confidence (URL-verified): reward with bonus
# Med confidence (snippet-extracted): neutral
# Low confidence (no date signals): heavy penalty
if item.date_confidence == "high":
overall += WEBSEARCH_VERIFIED_BONUS # Reward verified recent dates
elif item.date_confidence == "low":
overall -= WEBSEARCH_NO_DATE_PENALTY # Heavy penalty for unknown
item.score = max(0, min(100, int(overall)))
return items
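# Worked example for score_websearch_items (numbers assumed for illustration): an item
# with relevance=0.8 (rel_score=80), a recency subscore of 90, and a URL-verified date
# (date_confidence="high") scores
#   0.55*80 + 0.45*90 - 15 + 10 = 79.5 -> int() -> 79,
# keeping it below a comparable Reddit/X item that also has strong engagement.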
def sort_items(items: List[Union[schema.RedditItem, schema.XItem, schema.WebSearchItem]]) -> List:
"""Sort items by score (descending), then date, then source priority.
Args:
items: List of items to sort
Returns:
Sorted items
"""
def sort_key(item):
# Primary: score descending (negate for descending)
score = -item.score
# Secondary: date descending (recent first)
date = item.date or "0000-00-00"
date_key = -int(date.replace("-", ""))
# Tertiary: source priority (Reddit > X > WebSearch)
if isinstance(item, schema.RedditItem):
source_priority = 0
elif isinstance(item, schema.XItem):
source_priority = 1
else: # WebSearchItem
source_priority = 2
# Quaternary: title/text for stability
text = getattr(item, "title", "") or getattr(item, "text", "")
return (score, date_key, source_priority, text)
return sorted(items, key=sort_key)

View File

@@ -0,0 +1,324 @@
"""Terminal UI utilities for last30days skill."""
import os
import sys
import time
import threading
import random
from typing import Optional
# Check if we're in a real terminal (not captured by Claude Code)
IS_TTY = sys.stderr.isatty()
# ANSI color codes
class Colors:
PURPLE = '\033[95m'
BLUE = '\033[94m'
CYAN = '\033[96m'
GREEN = '\033[92m'
YELLOW = '\033[93m'
RED = '\033[91m'
BOLD = '\033[1m'
DIM = '\033[2m'
RESET = '\033[0m'
BANNER = f"""{Colors.PURPLE}{Colors.BOLD}
██╗ █████╗ ███████╗████████╗██████╗ ██████╗ ██████╗ █████╗ ██╗ ██╗███████╗
██║ ██╔══██╗██╔════╝╚══██╔══╝╚════██╗██╔═████╗██╔══██╗██╔══██╗╚██╗ ██╔╝██╔════╝
██║ ███████║███████╗ ██║ █████╔╝██║██╔██║██║ ██║███████║ ╚████╔╝ ███████╗
██║ ██╔══██║╚════██║ ██║ ╚═══██╗████╔╝██║██║ ██║██╔══██║ ╚██╔╝ ╚════██║
███████╗██║ ██║███████║ ██║ ██████╔╝╚██████╔╝██████╔╝██║ ██║ ██║ ███████║
╚══════╝╚═╝ ╚═╝╚══════╝ ╚═╝ ╚═════╝ ╚═════╝ ╚═════╝ ╚═╝ ╚═╝ ╚═╝ ╚══════╝
{Colors.RESET}{Colors.DIM} 30 days of research. 30 seconds of work.{Colors.RESET}
"""
MINI_BANNER = f"""{Colors.PURPLE}{Colors.BOLD}/last30days{Colors.RESET} {Colors.DIM}· researching...{Colors.RESET}"""
# Fun status messages for each phase
REDDIT_MESSAGES = [
"Diving into Reddit threads...",
"Scanning subreddits for gold...",
"Reading what Redditors are saying...",
"Exploring the front page of the internet...",
"Finding the good discussions...",
"Upvoting mentally...",
"Scrolling through comments...",
]
X_MESSAGES = [
"Checking what X is buzzing about...",
"Reading the timeline...",
"Finding the hot takes...",
"Scanning tweets and threads...",
"Discovering trending insights...",
"Following the conversation...",
"Reading between the posts...",
]
ENRICHING_MESSAGES = [
"Getting the juicy details...",
"Fetching engagement metrics...",
"Reading top comments...",
"Extracting insights...",
"Analyzing discussions...",
]
PROCESSING_MESSAGES = [
"Crunching the data...",
"Scoring and ranking...",
"Finding patterns...",
"Removing duplicates...",
"Organizing findings...",
]
WEB_ONLY_MESSAGES = [
"Searching the web...",
"Finding blogs and docs...",
"Crawling news sites...",
"Discovering tutorials...",
]
# Promo message for users without API keys
PROMO_MESSAGE = f"""
{Colors.YELLOW}{Colors.BOLD}━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━{Colors.RESET}
{Colors.YELLOW}⚡ UNLOCK THE FULL POWER OF /last30days{Colors.RESET}
{Colors.DIM}Right now you're using web search only. Add API keys to unlock:{Colors.RESET}
{Colors.YELLOW}🟠 Reddit{Colors.RESET} - Real upvotes, comments, and community insights
└─ Add OPENAI_API_KEY (uses OpenAI's web_search for Reddit)
{Colors.CYAN}🔵 X (Twitter){Colors.RESET} - Real-time posts, likes, reposts from creators
└─ Add XAI_API_KEY (uses xAI's live X search)
{Colors.DIM}Setup:{Colors.RESET} Edit {Colors.BOLD}~/.config/last30days/.env{Colors.RESET}
{Colors.YELLOW}{Colors.BOLD}━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━{Colors.RESET}
"""
PROMO_MESSAGE_PLAIN = """
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
⚡ UNLOCK THE FULL POWER OF /last30days
Right now you're using web search only. Add API keys to unlock:
🟠 Reddit - Real upvotes, comments, and community insights
└─ Add OPENAI_API_KEY (uses OpenAI's web_search for Reddit)
🔵 X (Twitter) - Real-time posts, likes, reposts from creators
└─ Add XAI_API_KEY (uses xAI's live X search)
Setup: Edit ~/.config/last30days/.env
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
"""
# Shorter promo for single missing key
PROMO_SINGLE_KEY = {
"reddit": f"""
{Colors.DIM}💡 Tip: Add {Colors.YELLOW}OPENAI_API_KEY{Colors.RESET}{Colors.DIM} to ~/.config/last30days/.env for Reddit data with real engagement metrics!{Colors.RESET}
""",
"x": f"""
{Colors.DIM}💡 Tip: Add {Colors.CYAN}XAI_API_KEY{Colors.RESET}{Colors.DIM} to ~/.config/last30days/.env for X/Twitter data with real likes & reposts!{Colors.RESET}
""",
}
PROMO_SINGLE_KEY_PLAIN = {
"reddit": "\n💡 Tip: Add OPENAI_API_KEY to ~/.config/last30days/.env for Reddit data with real engagement metrics!\n",
"x": "\n💡 Tip: Add XAI_API_KEY to ~/.config/last30days/.env for X/Twitter data with real likes & reposts!\n",
}
# Spinner frames
SPINNER_FRAMES = ['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏']
DOTS_FRAMES = [' ', '. ', '.. ', '...']
class Spinner:
"""Animated spinner for long-running operations."""
def __init__(self, message: str = "Working", color: str = Colors.CYAN):
self.message = message
self.color = color
self.running = False
self.thread: Optional[threading.Thread] = None
self.frame_idx = 0
self.shown_static = False
def _spin(self):
while self.running:
frame = SPINNER_FRAMES[self.frame_idx % len(SPINNER_FRAMES)]
sys.stderr.write(f"\r{self.color}{frame}{Colors.RESET} {self.message} ")
sys.stderr.flush()
self.frame_idx += 1
time.sleep(0.08)
def start(self):
self.running = True
if IS_TTY:
# Real terminal - animate
self.thread = threading.Thread(target=self._spin, daemon=True)
self.thread.start()
else:
# Not a TTY (Claude Code) - just print once
if not self.shown_static:
sys.stderr.write(f"{self.message}\n")
sys.stderr.flush()
self.shown_static = True
def update(self, message: str):
self.message = message
if not IS_TTY and not self.shown_static:
# Print update in non-TTY mode
sys.stderr.write(f"{message}\n")
sys.stderr.flush()
def stop(self, final_message: str = ""):
self.running = False
if self.thread:
self.thread.join(timeout=0.2)
if IS_TTY:
# Clear the line in real terminal
sys.stderr.write("\r" + " " * 80 + "\r")
if final_message:
sys.stderr.write(f"{final_message}\n")
sys.stderr.flush()
class ProgressDisplay:
"""Progress display for research phases."""
def __init__(self, topic: str, show_banner: bool = True):
self.topic = topic
self.spinner: Optional[Spinner] = None
self.start_time = time.time()
if show_banner:
self._show_banner()
def _show_banner(self):
if IS_TTY:
sys.stderr.write(MINI_BANNER + "\n")
sys.stderr.write(f"{Colors.DIM}Topic: {Colors.RESET}{Colors.BOLD}{self.topic}{Colors.RESET}\n\n")
else:
# Simple text for non-TTY
sys.stderr.write(f"/last30days · researching: {self.topic}\n")
sys.stderr.flush()
def start_reddit(self):
msg = random.choice(REDDIT_MESSAGES)
self.spinner = Spinner(f"{Colors.YELLOW}Reddit{Colors.RESET} {msg}", Colors.YELLOW)
self.spinner.start()
def end_reddit(self, count: int):
if self.spinner:
self.spinner.stop(f"{Colors.YELLOW}Reddit{Colors.RESET} Found {count} threads")
def start_reddit_enrich(self, current: int, total: int):
if self.spinner:
self.spinner.stop()
msg = random.choice(ENRICHING_MESSAGES)
self.spinner = Spinner(f"{Colors.YELLOW}Reddit{Colors.RESET} [{current}/{total}] {msg}", Colors.YELLOW)
self.spinner.start()
def update_reddit_enrich(self, current: int, total: int):
if self.spinner:
msg = random.choice(ENRICHING_MESSAGES)
self.spinner.update(f"{Colors.YELLOW}Reddit{Colors.RESET} [{current}/{total}] {msg}")
def end_reddit_enrich(self):
if self.spinner:
self.spinner.stop(f"{Colors.YELLOW}Reddit{Colors.RESET} Enriched with engagement data")
def start_x(self):
msg = random.choice(X_MESSAGES)
self.spinner = Spinner(f"{Colors.CYAN}X{Colors.RESET} {msg}", Colors.CYAN)
self.spinner.start()
def end_x(self, count: int):
if self.spinner:
self.spinner.stop(f"{Colors.CYAN}X{Colors.RESET} Found {count} posts")
def start_processing(self):
msg = random.choice(PROCESSING_MESSAGES)
self.spinner = Spinner(f"{Colors.PURPLE}Processing{Colors.RESET} {msg}", Colors.PURPLE)
self.spinner.start()
def end_processing(self):
if self.spinner:
self.spinner.stop()
def show_complete(self, reddit_count: int, x_count: int):
elapsed = time.time() - self.start_time
if IS_TTY:
sys.stderr.write(f"\n{Colors.GREEN}{Colors.BOLD}✓ Research complete{Colors.RESET} ")
sys.stderr.write(f"{Colors.DIM}({elapsed:.1f}s){Colors.RESET}\n")
sys.stderr.write(f" {Colors.YELLOW}Reddit:{Colors.RESET} {reddit_count} threads ")
sys.stderr.write(f"{Colors.CYAN}X:{Colors.RESET} {x_count} posts\n\n")
else:
sys.stderr.write(f"✓ Research complete ({elapsed:.1f}s) - Reddit: {reddit_count} threads, X: {x_count} posts\n")
sys.stderr.flush()
def show_cached(self, age_hours: float = None):
if age_hours is not None:
age_str = f" ({age_hours:.1f}h old)"
else:
age_str = ""
sys.stderr.write(f"{Colors.GREEN}{Colors.RESET} {Colors.DIM}Using cached results{age_str} - use --refresh for fresh data{Colors.RESET}\n\n")
sys.stderr.flush()
def show_error(self, message: str):
sys.stderr.write(f"{Colors.RED}✗ Error:{Colors.RESET} {message}\n")
sys.stderr.flush()
def start_web_only(self):
"""Show web-only mode indicator."""
msg = random.choice(WEB_ONLY_MESSAGES)
self.spinner = Spinner(f"{Colors.GREEN}Web{Colors.RESET} {msg}", Colors.GREEN)
self.spinner.start()
def end_web_only(self):
"""End web-only spinner."""
if self.spinner:
self.spinner.stop(f"{Colors.GREEN}Web{Colors.RESET} Claude will search the web")
def show_web_only_complete(self):
"""Show completion for web-only mode."""
elapsed = time.time() - self.start_time
if IS_TTY:
sys.stderr.write(f"\n{Colors.GREEN}{Colors.BOLD}✓ Ready for web search{Colors.RESET} ")
sys.stderr.write(f"{Colors.DIM}({elapsed:.1f}s){Colors.RESET}\n")
sys.stderr.write(f" {Colors.GREEN}Web:{Colors.RESET} Claude will search blogs, docs & news\n\n")
else:
sys.stderr.write(f"✓ Ready for web search ({elapsed:.1f}s)\n")
sys.stderr.flush()
def show_promo(self, missing: str = "both"):
"""Show promotional message for missing API keys.
Args:
missing: 'both', 'reddit', or 'x' - which keys are missing
"""
if missing == "both":
if IS_TTY:
sys.stderr.write(PROMO_MESSAGE)
else:
sys.stderr.write(PROMO_MESSAGE_PLAIN)
elif missing in PROMO_SINGLE_KEY:
if IS_TTY:
sys.stderr.write(PROMO_SINGLE_KEY[missing])
else:
sys.stderr.write(PROMO_SINGLE_KEY_PLAIN[missing])
sys.stderr.flush()
def print_phase(phase: str, message: str):
"""Print a phase message."""
colors = {
"reddit": Colors.YELLOW,
"x": Colors.CYAN,
"process": Colors.PURPLE,
"done": Colors.GREEN,
"error": Colors.RED,
}
color = colors.get(phase, Colors.RESET)
sys.stderr.write(f"{color}{Colors.RESET} {message}\n")
sys.stderr.flush()
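if __name__ == "__main__":
    # Minimal manual demo (illustrative only): exercises the spinner and progress
    # display against stderr. Safe to run directly since this module only uses the
    # standard library; the counts below are made up.
    _demo = ProgressDisplay("demo topic")
    _demo.start_reddit()
    time.sleep(0.5)
    _demo.end_reddit(3)
    _demo.start_x()
    time.sleep(0.5)
    _demo.end_x(5)
    _demo.show_complete(3, 5)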

View File

@@ -0,0 +1,401 @@
"""WebSearch module for last30days skill.
NOTE: WebSearch uses Claude's built-in WebSearch tool, which runs INSIDE Claude Code.
Unlike Reddit/X which use external APIs, WebSearch results are obtained by Claude
directly and passed to this module for normalization and scoring.
The typical flow is:
1. Claude invokes WebSearch tool with the topic
2. Claude passes results to parse_websearch_results()
3. Results are normalized into WebSearchItem objects
"""
import re
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import urlparse
from . import schema
# Month name mappings for date parsing
MONTH_MAP = {
"jan": 1, "january": 1,
"feb": 2, "february": 2,
"mar": 3, "march": 3,
"apr": 4, "april": 4,
"may": 5,
"jun": 6, "june": 6,
"jul": 7, "july": 7,
"aug": 8, "august": 8,
"sep": 9, "sept": 9, "september": 9,
"oct": 10, "october": 10,
"nov": 11, "november": 11,
"dec": 12, "december": 12,
}
def extract_date_from_url(url: str) -> Optional[str]:
"""Try to extract a date from URL path.
Many sites embed dates in URLs like:
- /2026/01/24/article-title
- /2026-01-24/article
- /blog/20260124/title
Args:
url: URL to parse
Returns:
Date string in YYYY-MM-DD format, or None
"""
# Pattern 1: /YYYY/MM/DD/ (most common)
match = re.search(r'/(\d{4})/(\d{2})/(\d{2})/', url)
if match:
year, month, day = match.groups()
if 2020 <= int(year) <= 2030 and 1 <= int(month) <= 12 and 1 <= int(day) <= 31:
return f"{year}-{month}-{day}"
# Pattern 2: /YYYY-MM-DD/ or /YYYY-MM-DD-
match = re.search(r'/(\d{4})-(\d{2})-(\d{2})[-/]', url)
if match:
year, month, day = match.groups()
if 2020 <= int(year) <= 2030 and 1 <= int(month) <= 12 and 1 <= int(day) <= 31:
return f"{year}-{month}-{day}"
# Pattern 3: /YYYYMMDD/ (compact)
match = re.search(r'/(\d{4})(\d{2})(\d{2})/', url)
if match:
year, month, day = match.groups()
if 2020 <= int(year) <= 2030 and 1 <= int(month) <= 12 and 1 <= int(day) <= 31:
return f"{year}-{month}-{day}"
return None
def extract_date_from_snippet(text: str) -> Optional[str]:
"""Try to extract a date from text snippet or title.
Looks for patterns like:
- January 24, 2026 or Jan 24, 2026
- 24 January 2026
- 2026-01-24
- "3 days ago", "yesterday", "last week"
Args:
text: Text to parse
Returns:
Date string in YYYY-MM-DD format, or None
"""
if not text:
return None
text_lower = text.lower()
# Pattern 1: Month DD, YYYY (e.g., "January 24, 2026")
match = re.search(
r'\b(jan(?:uary)?|feb(?:ruary)?|mar(?:ch)?|apr(?:il)?|may|jun(?:e)?|'
r'jul(?:y)?|aug(?:ust)?|sep(?:t(?:ember)?)?|oct(?:ober)?|nov(?:ember)?|dec(?:ember)?)'
r'\s+(\d{1,2})(?:st|nd|rd|th)?,?\s*(\d{4})\b',
text_lower
)
if match:
month_str, day, year = match.groups()
month = MONTH_MAP.get(month_str[:3])
if month and 2020 <= int(year) <= 2030 and 1 <= int(day) <= 31:
return f"{year}-{month:02d}-{int(day):02d}"
# Pattern 2: DD Month YYYY (e.g., "24 January 2026")
match = re.search(
r'\b(\d{1,2})(?:st|nd|rd|th)?\s+'
r'(jan(?:uary)?|feb(?:ruary)?|mar(?:ch)?|apr(?:il)?|may|jun(?:e)?|'
r'jul(?:y)?|aug(?:ust)?|sep(?:t(?:ember)?)?|oct(?:ober)?|nov(?:ember)?|dec(?:ember)?)'
r'\s+(\d{4})\b',
text_lower
)
if match:
day, month_str, year = match.groups()
month = MONTH_MAP.get(month_str[:3])
if month and 2020 <= int(year) <= 2030 and 1 <= int(day) <= 31:
return f"{year}-{month:02d}-{int(day):02d}"
# Pattern 3: YYYY-MM-DD (ISO format)
match = re.search(r'\b(\d{4})-(\d{2})-(\d{2})\b', text)
if match:
year, month, day = match.groups()
if 2020 <= int(year) <= 2030 and 1 <= int(month) <= 12 and 1 <= int(day) <= 31:
return f"{year}-{month}-{day}"
# Pattern 4: Relative dates ("3 days ago", "yesterday", etc.)
today = datetime.now()
if "yesterday" in text_lower:
date = today - timedelta(days=1)
return date.strftime("%Y-%m-%d")
if "today" in text_lower:
return today.strftime("%Y-%m-%d")
# "N days ago"
match = re.search(r'\b(\d+)\s*days?\s*ago\b', text_lower)
if match:
days = int(match.group(1))
if days <= 60: # Reasonable range
date = today - timedelta(days=days)
return date.strftime("%Y-%m-%d")
# "N hours ago" -> today
match = re.search(r'\b(\d+)\s*hours?\s*ago\b', text_lower)
if match:
return today.strftime("%Y-%m-%d")
# "last week" -> ~7 days ago
if "last week" in text_lower:
date = today - timedelta(days=7)
return date.strftime("%Y-%m-%d")
# "this week" -> ~3 days ago (middle of week)
if "this week" in text_lower:
date = today - timedelta(days=3)
return date.strftime("%Y-%m-%d")
return None
def extract_date_signals(
url: str,
snippet: str,
title: str,
) -> Tuple[Optional[str], str]:
"""Extract date from any available signal.
Tries URL first (most reliable), then snippet, then title.
Args:
url: Page URL
snippet: Page snippet/description
title: Page title
Returns:
Tuple of (date_string, confidence)
- date from URL: 'high' confidence
- date from snippet/title: 'med' confidence
- no date found: None, 'low' confidence
"""
# Try URL first (most reliable)
url_date = extract_date_from_url(url)
if url_date:
return url_date, "high"
# Try snippet
snippet_date = extract_date_from_snippet(snippet)
if snippet_date:
return snippet_date, "med"
# Try title
title_date = extract_date_from_snippet(title)
if title_date:
return title_date, "med"
return None, "low"
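# Examples for extract_date_signals (illustrative URLs/snippets):
#   ("https://example.com/2026/01/24/post", "", "")              -> ("2026-01-24", "high")
#   ("https://example.com/post", "Posted January 24, 2026", "")  -> ("2026-01-24", "med")
#   ("https://example.com/post", "no date signals here", "")     -> (None, "low")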
# Domains to exclude (Reddit and X are handled separately)
EXCLUDED_DOMAINS = {
"reddit.com",
"www.reddit.com",
"old.reddit.com",
"twitter.com",
"www.twitter.com",
"x.com",
"www.x.com",
"mobile.twitter.com",
}
def extract_domain(url: str) -> str:
"""Extract the domain from a URL.
Args:
url: Full URL
Returns:
Domain string (e.g., "medium.com")
"""
try:
parsed = urlparse(url)
domain = parsed.netloc.lower()
# Remove www. prefix for cleaner display
if domain.startswith("www."):
domain = domain[4:]
return domain
except Exception:
return ""
def is_excluded_domain(url: str) -> bool:
"""Check if URL is from an excluded domain (Reddit/X).
Args:
url: URL to check
Returns:
True if URL should be excluded
"""
try:
parsed = urlparse(url)
domain = parsed.netloc.lower()
return domain in EXCLUDED_DOMAINS
except Exception:
return False
def parse_websearch_results(
results: List[Dict[str, Any]],
topic: str,
from_date: str = "",
to_date: str = "",
) -> List[Dict[str, Any]]:
"""Parse WebSearch results into normalized format.
This function expects results from Claude's WebSearch tool.
Each result should have: title, url, snippet, and optionally date/relevance.
Uses "Date Detective" approach:
1. Extract dates from URLs (high confidence)
2. Extract dates from snippets/titles (med confidence)
3. Hard filter: exclude items with verified old dates
4. Keep items with no date signals (with low confidence penalty)
Args:
results: List of WebSearch result dicts
topic: Original search topic (for context)
from_date: Start date for filtering (YYYY-MM-DD)
to_date: End date for filtering (YYYY-MM-DD)
Returns:
List of normalized item dicts ready for WebSearchItem creation
"""
items = []
for i, result in enumerate(results):
if not isinstance(result, dict):
continue
url = result.get("url", "")
if not url:
continue
# Skip Reddit/X URLs (handled separately)
if is_excluded_domain(url):
continue
title = str(result.get("title", "")).strip()
snippet = str(result.get("snippet", result.get("description", ""))).strip()
if not title and not snippet:
continue
# Use Date Detective to extract date signals
date = result.get("date") # Use provided date if available
date_confidence = "low"
if date and re.match(r'^\d{4}-\d{2}-\d{2}$', str(date)):
# Provided date is valid
date_confidence = "med"
else:
# Try to extract date from URL/snippet/title
extracted_date, confidence = extract_date_signals(url, snippet, title)
if extracted_date:
date = extracted_date
date_confidence = confidence
# Hard filter: if we found a date and it's too old, skip
if date and from_date and date < from_date:
continue # DROP - verified old content
# Hard filter: if date is in the future, skip (parsing error)
if date and to_date and date > to_date:
continue # DROP - future date
# Get relevance if provided, default to 0.5
relevance = result.get("relevance", 0.5)
try:
relevance = min(1.0, max(0.0, float(relevance)))
except (TypeError, ValueError):
relevance = 0.5
item = {
"id": f"W{i+1}",
"title": title[:200], # Truncate long titles
"url": url,
"source_domain": extract_domain(url),
"snippet": snippet[:500], # Truncate long snippets
"date": date,
"date_confidence": date_confidence,
"relevance": relevance,
"why_relevant": str(result.get("why_relevant", "")).strip(),
}
items.append(item)
return items
def normalize_websearch_items(
items: List[Dict[str, Any]],
from_date: str,
to_date: str,
) -> List[schema.WebSearchItem]:
"""Convert parsed dicts to WebSearchItem objects.
Args:
items: List of parsed item dicts
from_date: Start of date range (YYYY-MM-DD)
to_date: End of date range (YYYY-MM-DD)
Returns:
List of WebSearchItem objects
"""
result = []
for item in items:
web_item = schema.WebSearchItem(
id=item["id"],
title=item["title"],
url=item["url"],
source_domain=item["source_domain"],
snippet=item["snippet"],
date=item.get("date"),
date_confidence=item.get("date_confidence", "low"),
relevance=item.get("relevance", 0.5),
why_relevant=item.get("why_relevant", ""),
)
result.append(web_item)
return result
def dedupe_websearch(items: List[schema.WebSearchItem]) -> List[schema.WebSearchItem]:
"""Remove duplicate WebSearch items.
Deduplication is based on URL.
Args:
items: List of WebSearchItem objects
Returns:
Deduplicated list
"""
seen_urls = set()
result = []
for item in items:
# Normalize URL for comparison
url_key = item.url.lower().rstrip("/")
if url_key not in seen_urls:
seen_urls.add(url_key)
result.append(item)
return result
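# Illustrative sketch (not called by the pipeline): the intended hand-off once Claude's
# WebSearch tool has produced result dicts. The dicts below are made-up placeholders.
def _example_websearch_flow() -> List[schema.WebSearchItem]:
    """Parse, normalize, and dedupe a pair of fake WebSearch results."""
    raw = [
        {
            "title": "Example write-up",
            "url": "https://example.com/2026/01/20/write-up",
            "snippet": "Posted January 20, 2026 - notes on the topic.",
            "relevance": 0.7,
        },
        {   # Same article with a trailing slash - removed by dedupe_websearch()
            "title": "Example write-up",
            "url": "https://example.com/2026/01/20/write-up/",
            "snippet": "Duplicate listing of the same article.",
        },
    ]
    parsed = parse_websearch_results(raw, topic="example topic",
                                     from_date="2026-01-01", to_date="2026-01-31")
    items = normalize_websearch_items(parsed, "2026-01-01", "2026-01-31")
    return dedupe_websearch(items)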

View File

@@ -0,0 +1,217 @@
"""xAI API client for X (Twitter) discovery."""
import json
import re
import sys
from typing import Any, Dict, List, Optional
from . import http
def _log_error(msg: str):
"""Log error to stderr."""
sys.stderr.write(f"[X ERROR] {msg}\n")
sys.stderr.flush()
# xAI uses responses endpoint with Agent Tools API
XAI_RESPONSES_URL = "https://api.x.ai/v1/responses"
# Depth configurations: (min, max) posts to request
DEPTH_CONFIG = {
"quick": (8, 12),
"default": (20, 30),
"deep": (40, 60),
}
X_SEARCH_PROMPT = """You have access to real-time X (Twitter) data. Search for posts about: {topic}
Focus on posts from {from_date} to {to_date}. Find {min_items}-{max_items} high-quality, relevant posts.
IMPORTANT: Return ONLY valid JSON in this exact format, no other text:
{{
"items": [
{{
"text": "Post text content (truncated if long)",
"url": "https://x.com/user/status/...",
"author_handle": "username",
"date": "YYYY-MM-DD or null if unknown",
"engagement": {{
"likes": 100,
"reposts": 25,
"replies": 15,
"quotes": 5
}},
"why_relevant": "Brief explanation of relevance",
"relevance": 0.85
}}
]
}}
Rules:
- relevance is 0.0 to 1.0 (1.0 = highly relevant)
- date must be YYYY-MM-DD format or null
- engagement can be null if unknown
- Include diverse voices/accounts if applicable
- Prefer posts with substantive content, not just links"""
def search_x(
api_key: str,
model: str,
topic: str,
from_date: str,
to_date: str,
depth: str = "default",
mock_response: Optional[Dict] = None,
) -> Dict[str, Any]:
"""Search X for relevant posts using xAI API with live search.
Args:
api_key: xAI API key
model: Model to use
topic: Search topic
from_date: Start date (YYYY-MM-DD)
to_date: End date (YYYY-MM-DD)
depth: Research depth - "quick", "default", or "deep"
mock_response: Mock response for testing
Returns:
Raw API response
"""
if mock_response is not None:
return mock_response
min_items, max_items = DEPTH_CONFIG.get(depth, DEPTH_CONFIG["default"])
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
}
# Adjust timeout based on depth (generous for API response time)
timeout = 90 if depth == "quick" else 120 if depth == "default" else 180
# Use Agent Tools API with x_search tool
payload = {
"model": model,
"tools": [
{"type": "x_search"}
],
"input": [
{
"role": "user",
"content": X_SEARCH_PROMPT.format(
topic=topic,
from_date=from_date,
to_date=to_date,
min_items=min_items,
max_items=max_items,
),
}
],
}
return http.post(XAI_RESPONSES_URL, payload, headers=headers, timeout=timeout)
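# Example call (illustrative; the api_key and model come from env/model selection
# elsewhere in the skill, and the dates are placeholders):
#   raw = search_x(api_key, model, "claude code skills",
#                  "2025-12-27", "2026-01-26", depth="quick")
#   items = parse_x_response(raw)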
def parse_x_response(response: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Parse xAI response to extract X items.
Args:
response: Raw API response
Returns:
List of item dicts
"""
items = []
# Check for API errors first
if "error" in response and response["error"]:
error = response["error"]
err_msg = error.get("message", str(error)) if isinstance(error, dict) else str(error)
_log_error(f"xAI API error: {err_msg}")
if http.DEBUG:
_log_error(f"Full error response: {json.dumps(response, indent=2)[:1000]}")
return items
# Try to find the output text
output_text = ""
if "output" in response:
output = response["output"]
if isinstance(output, str):
output_text = output
elif isinstance(output, list):
for item in output:
if isinstance(item, dict):
if item.get("type") == "message":
content = item.get("content", [])
for c in content:
if isinstance(c, dict) and c.get("type") == "output_text":
output_text = c.get("text", "")
break
elif "text" in item:
output_text = item["text"]
elif isinstance(item, str):
output_text = item
if output_text:
break
# Also check for choices (older format)
if not output_text and "choices" in response:
for choice in response["choices"]:
if "message" in choice:
output_text = choice["message"].get("content", "")
break
if not output_text:
return items
# Extract JSON from the response
json_match = re.search(r'\{[\s\S]*"items"[\s\S]*\}', output_text)
if json_match:
try:
data = json.loads(json_match.group())
items = data.get("items", [])
except json.JSONDecodeError:
pass
# Validate and clean items
clean_items = []
for i, item in enumerate(items):
if not isinstance(item, dict):
continue
url = item.get("url", "")
if not url:
continue
# Parse engagement
engagement = None
eng_raw = item.get("engagement")
if isinstance(eng_raw, dict):
engagement = {
"likes": int(eng_raw.get("likes", 0)) if eng_raw.get("likes") else None,
"reposts": int(eng_raw.get("reposts", 0)) if eng_raw.get("reposts") else None,
"replies": int(eng_raw.get("replies", 0)) if eng_raw.get("replies") else None,
"quotes": int(eng_raw.get("quotes", 0)) if eng_raw.get("quotes") else None,
}
        # Guard relevance parsing so one malformed item can't abort the whole parse
        try:
            relevance = min(1.0, max(0.0, float(item.get("relevance", 0.5))))
        except (TypeError, ValueError):
            relevance = 0.5
        clean_item = {
            "id": f"X{i+1}",
            "text": str(item.get("text", "")).strip()[:500],  # Truncate long text
            "url": url,
            "author_handle": str(item.get("author_handle", "")).strip().lstrip("@"),
            "date": item.get("date"),
            "engagement": engagement,
            "why_relevant": str(item.get("why_relevant", "")).strip(),
            "relevance": relevance,
        }
# Validate date format
if clean_item["date"]:
if not re.match(r'^\d{4}-\d{2}-\d{2}$', str(clean_item["date"])):
clean_item["date"] = None
clean_items.append(clean_item)
return clean_items
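# Illustrative sketch (for local testing only): feed a hand-written mock through
# parse_x_response(). The response shape mirrors what the parser above expects from
# the Agent Tools API; all values are made up.
def _example_parse_mock_response() -> List[Dict[str, Any]]:
    """Parse one fake X post out of a minimal mock response."""
    mock = {
        "output": [
            {
                "type": "message",
                "content": [
                    {
                        "type": "output_text",
                        "text": json.dumps({
                            "items": [
                                {
                                    "text": "Example post about the topic",
                                    "url": "https://x.com/someone/status/1",
                                    "author_handle": "@someone",
                                    "date": "2026-01-20",
                                    "engagement": {"likes": 12, "reposts": 3},
                                    "why_relevant": "Direct discussion of the topic",
                                    "relevance": 0.9,
                                }
                            ]
                        }),
                    }
                ],
            }
        ]
    }
    return parse_x_response(mock)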