Commit Message:
Enhance Crawl4AI with CLI and documentation updates:
- Implemented a Command-Line Interface (CLI) in `crawl4ai/cli.py`.
- Added chunking strategies and their documentation in `llm.txt`.
This commit is contained in:
95
crawl4ai/cli.py
Normal file
95
crawl4ai/cli.py
Normal file
@@ -0,0 +1,95 @@
|
||||
import click
|
||||
import sys
|
||||
import asyncio
|
||||
from pathlib import Path
|
||||
from typing import List, Optional
|
||||
from .docs_manager import DocsManager
|
||||
from .async_logger import AsyncLogger
|
||||
|
||||
# Module-level singletons shared by every CLI command defined below.
logger = AsyncLogger(verbose=True)
docs_manager = DocsManager(logger=logger)
|
||||
|
||||
def print_table(headers: List[str], rows: List[List[str]], padding: int = 2):
    """Print a bordered ASCII table via ``click.echo``.

    Args:
        headers: Column titles; one entry per column.
        rows: Table rows; each row is zipped against the column widths.
        padding: Number of spaces placed on each side of every cell.
    """
    # Transpose header + rows into columns to find the widest cell per column.
    col_widths = [max(len(str(cell)) for cell in col) for col in zip(headers, *rows)]
    pad = ' ' * padding
    border = '+' + '+'.join('-' * (width + 2 * padding) for width in col_widths) + '+'

    def format_row(row):
        # Each cell is left-aligned to its column width and wrapped in the
        # same padding the border accounts for, so rows and borders line up.
        return '|' + '|'.join(
            f"{pad}{str(cell):<{width}}{pad}" for cell, width in zip(row, col_widths)
        ) + '|'

    click.echo(border)
    click.echo(format_row(headers))
    click.echo(border)
    for row in rows:
        click.echo(format_row(row))
    click.echo(border)
|
||||
|
||||
@click.group()
def cli():
    """Crawl4AI Command Line Interface"""
    # Root command group; it has no behavior of its own. Sub-groups and
    # commands attach to it through the @cli.group() decorator.
|
||||
|
||||
@cli.group()
def docs():
    """Documentation and LLM text operations"""
    # Container group only: the actual work lives in the @docs.command()
    # functions registered on it.
|
||||
|
||||
@docs.command()
@click.argument('sections', nargs=-1)
@click.option('--mode', type=click.Choice(['extended', 'condensed']), default='extended',
              help='Documentation detail level')
def combine(sections: tuple, mode: str):
    """Combine documentation sections.

    If no sections are specified, combines all available sections.
    """
    try:
        # Download the docs first if the local cache directory is empty.
        asyncio.run(docs_manager.ensure_docs_exist())

        if not sections:
            # Honor the documented default: an empty argument list means
            # "every section", ordered by the numeric file prefix.
            file_map = docs_manager.get_file_map()
            sections = sorted(file_map, key=lambda name: int(file_map[name]))

        result = docs_manager.concatenate_docs(list(sections), mode)
        click.echo(result)
    except Exception as e:
        logger.error(str(e), tag="ERROR")
        sys.exit(1)
|
||||
|
||||
@docs.command()
@click.argument('query')
@click.option('--top-k', '-k', default=5, type=click.IntRange(min=1),
              help='Number of top results to return')
def search(query: str, top_k: int):
    """Search through documentation questions"""
    # top_k is restricted to >= 1: zero or negative values would be used as a
    # slice bound by the search and silently drop or mis-select results.
    try:
        # Same precondition as `combine`: fetch the docs if the cache is empty.
        asyncio.run(docs_manager.ensure_docs_exist())
        results = docs_manager.search_questions(query, top_k)
        click.echo(results)
    except Exception as e:
        click.echo(f"Error: {str(e)}", err=True)
        sys.exit(1)
|
||||
|
||||
@docs.command()
def list():
    """List available documentation sections"""
    # NOTE: the function name shadows the builtin `list` for the rest of this
    # module; it is kept because click derives the command name from it.
    try:
        # Same precondition as `combine`: fetch the docs if the cache is empty.
        asyncio.run(docs_manager.ensure_docs_exist())
        file_map = docs_manager.get_file_map()
        # file_map maps section name -> number string; show rows in numeric order.
        rows = sorted(
            ([num, name] for name, num in file_map.items()),
            key=lambda row: int(row[0]),
        )
        print_table(['Number', 'Section Name'], rows)
    except Exception as e:
        click.echo(f"Error: {str(e)}", err=True)
        sys.exit(1)
|
||||
|
||||
@docs.command()
def update():
    """Update local documentation cache from GitHub"""
    try:
        # update_docs() is a coroutine function: it must be driven by an event
        # loop, otherwise nothing is downloaded and success is reported falsely.
        asyncio.run(docs_manager.update_docs())
        click.echo("Documentation updated successfully")
    except Exception as e:
        click.echo(f"Error: {str(e)}", err=True)
        sys.exit(1)
|
||||
|
||||
|
||||
|
||||
# Invoke the root command group when this module is executed as a script.
if __name__ == '__main__':
    cli()
|
||||
59
crawl4ai/docs_manager.py
Normal file
59
crawl4ai/docs_manager.py
Normal file
@@ -0,0 +1,59 @@
|
||||
import os
|
||||
import requests
|
||||
from pathlib import Path
|
||||
from typing import Optional, List
|
||||
from .async_logger import AsyncLogger
|
||||
from .llmtxt import LLMTextManager
|
||||
|
||||
class DocsManager:
    """Manage the local documentation cache in ``~/.crawl4ai/docs``.

    Downloads the files listed in the remote ``files.json`` manifest under
    ``BASE_URL`` and delegates reading/searching to ``LLMTextManager``.
    """

    BASE_URL = "https://raw.githubusercontent.com/unclecode/crawl4ai/main/docs/llm.txt"
    # Seconds before an HTTP request is abandoned; without a timeout
    # `requests` may wait indefinitely on an unresponsive server.
    REQUEST_TIMEOUT = 30

    def __init__(self, logger: Optional[AsyncLogger] = None):
        """Create the cache directory if needed and wire up the helpers.

        Args:
            logger: Logger to use; a verbose ``AsyncLogger`` is created when omitted.
        """
        self.docs_dir = Path.home() / ".crawl4ai" / "docs"
        self.docs_dir.mkdir(parents=True, exist_ok=True)
        self.logger = logger or AsyncLogger(verbose=True)
        self.llm_text = LLMTextManager(self.docs_dir, self.logger)

    async def ensure_docs_exist(self):
        """Ensure docs are downloaded, fetch if not present"""
        # Any entry in the directory counts as "present".
        if not any(self.docs_dir.iterdir()):
            self.logger.info("Documentation not found, downloading...", tag="DOCS")
            await self.update_docs()

    async def update_docs(self) -> bool:
        """Always fetch latest docs.

        Returns:
            True when the manifest and all listed files were processed.

        Raises:
            Exception: Any download or write error is logged and re-raised.
        """
        try:
            self.logger.info("Fetching documentation files...", tag="DOCS")

            # Get file list
            response = requests.get(
                f"{self.BASE_URL}/files.json", timeout=self.REQUEST_TIMEOUT
            )
            response.raise_for_status()
            files = response.json()["files"]

            # Download each file
            for file in files:
                # The manifest is remote data: accept bare file names only so
                # an entry like "../x" or "/etc/x" cannot escape docs_dir.
                if file in ("", ".", "..") or Path(file).name != file:
                    self.logger.warning(f"Skipping unsafe file name in manifest: {file!r}")
                    continue

                response = requests.get(
                    f"{self.BASE_URL}/{file}", timeout=self.REQUEST_TIMEOUT
                )
                response.raise_for_status()

                file_path = self.docs_dir / file
                with open(file_path, 'w', encoding='utf-8') as f:
                    f.write(response.text)

                self.logger.debug(f"Downloaded {file}", tag="DOCS")

            self.logger.success("Documentation updated successfully", tag="DOCS")
            return True

        except Exception as e:
            self.logger.error(f"Failed to update documentation: {str(e)}", tag="ERROR")
            raise

    # Delegate LLM text operations to LLMTextManager
    def get_file_map(self) -> dict:
        """Return the section-name -> number mapping from ``LLMTextManager``."""
        return self.llm_text.get_file_map()

    def concatenate_docs(self, sections: List[str], mode: str) -> str:
        """Return the concatenated text of ``sections`` in the given ``mode``."""
        return self.llm_text.concatenate_docs(sections, mode)

    def search_questions(self, query: str, top_k: int = 5) -> str:
        """Return the formatted question-search results for ``query``."""
        return self.llm_text.search_questions(query, top_k)
|
||||
@@ -2,6 +2,7 @@ import subprocess
|
||||
import sys
|
||||
import asyncio
|
||||
from .async_logger import AsyncLogger, LogLevel
|
||||
from .docs_manager import DocsManager
|
||||
|
||||
# Initialize logger
|
||||
logger = AsyncLogger(log_level=LogLevel.DEBUG, verbose=True)
|
||||
@@ -11,6 +12,7 @@ def post_install():
|
||||
logger.info("Running post-installation setup...", tag="INIT")
|
||||
install_playwright()
|
||||
run_migration()
|
||||
asyncio.run(setup_docs())
|
||||
logger.success("Post-installation setup completed!", tag="COMPLETE")
|
||||
|
||||
def install_playwright():
|
||||
@@ -41,4 +43,9 @@ def run_migration():
|
||||
logger.warning("Database module not found. Will initialize on first use.")
|
||||
except Exception as e:
|
||||
logger.warning(f"Database initialization failed: {e}")
|
||||
logger.warning("Database will be initialized on first use")
|
||||
logger.warning("Database will be initialized on first use")
|
||||
|
||||
async def setup_docs():
    """Download documentation files (best effort).

    A failed download is reported as a warning instead of propagating, so the
    remaining post-installation steps still complete.
    """
    docs_manager = DocsManager(logger)
    try:
        await docs_manager.update_docs()
    except Exception as e:
        logger.warning(f"Documentation download failed: {e}")
        logger.warning("Documentation will be downloaded on first use")
|
||||
196
crawl4ai/llmtxt.py
Normal file
196
crawl4ai/llmtxt.py
Normal file
@@ -0,0 +1,196 @@
|
||||
import os
|
||||
from pathlib import Path
|
||||
from rank_bm25 import BM25Okapi
|
||||
import re
|
||||
from typing import List, Literal
|
||||
|
||||
from nltk.tokenize import word_tokenize
|
||||
from nltk.corpus import stopwords
|
||||
from nltk.stem import WordNetLemmatizer
|
||||
import nltk
|
||||
|
||||
|
||||
BASE_PATH = Path(__file__).resolve().parent
|
||||
|
||||
class LLMTextManager:
    """Read, concatenate and search numbered Markdown docs in a directory.

    Files are expected to be named ``<number>_<name>[.ex|.xs|.sm|.q].md``.
    """

    # The variant suffix is optional but the ".md" extension is mandatory, so
    # both "6_x.md" and "6_x.ex.md" yield the section name "x".
    _DOC_FILE_RE = re.compile(r'^(\d+)_(.+?)(?:\.(?:ex|xs|sm|q))?\.md$')

    # The annotation is a string on purpose: neither `Optional` nor
    # `AsyncLogger` is imported by this module, and an evaluated annotation
    # would raise NameError at import time.
    def __init__(self, docs_dir: Path, logger: "Optional[AsyncLogger]" = None):
        """Store the docs directory and an optional logger."""
        self.docs_dir = docs_dir
        self.logger = logger

    def get_file_map(self) -> dict:
        """Scan ``docs_dir`` and map each section name to its number string.

        Example: ``6_chunking_strategies.ex.md`` -> ``{"chunking_strategies": "6"}``.
        When several files share a name, the first one seen wins.
        """
        file_map = {}
        for file in os.listdir(self.docs_dir):
            match = self._DOC_FILE_RE.match(file)
            if match:
                num, name = match.groups()
                file_map.setdefault(name, num)
        return file_map

    def concatenate_docs(self, file_names: List[str], mode: str) -> str:
        """Concatenate documentation files based on names and mode.

        Args:
            file_names: Section names as returned by ``get_file_map``; unknown
                names are skipped.
            mode: ``"extended"`` prefers ``.ex.md``; any other value prefers
                ``.xs.md`` then ``.sm.md``. The plain ``.md`` file is the
                fallback in both cases.

        Returns:
            The file contents joined by a ``---`` separator.
        """
        file_map = self.get_file_map()
        suffixes = [".ex.md"] if mode == "extended" else [".xs.md", ".sm.md"]
        parts = []

        for name in file_names:
            num = file_map.get(name)
            if num is None:
                continue

            stem = f"{num}_{name}"
            candidates = [self.docs_dir / f"{stem}{suffix}" for suffix in suffixes]
            candidates.append(self.docs_dir / f"{stem}.md")

            # First existing candidate wins; a section with none is skipped.
            file_path = next((path for path in candidates if path.exists()), None)
            if file_path is not None:
                parts.append(file_path.read_text(encoding='utf-8'))

        return "\n\n---\n\n".join(parts)

    def search_questions(self, query: str, top_k: int = 5) -> str:
        """Search through Q files using BM25 ranking and return top K matches.

        Questions are extracted from every ``*.q.md`` file, scored against
        ``query``, and those scoring above 40% of the best score are grouped
        by file. Files are ranked by match count, then total score.

        Returns:
            A formatted report for at most ``top_k`` files, or a message when
            there are no questions or no matches.
        """
        # Sorted so that ties in the ranking resolve deterministically.
        q_files = sorted(
            self.docs_dir / f for f in os.listdir(self.docs_dir) if f.endswith(".q.md")
        )

        documents = []
        # Index-aligned with `documents`. Keying metadata by question text
        # would let identical questions in different files overwrite each other.
        sources = []

        for file in q_files:
            with open(file, 'r', encoding='utf-8') as f:
                content = f.read()
            for category, question, _full_section in extract_questions(content):
                documents.append(question)
                sources.append((file, category))

        if not documents:
            return "No questions found in documentation."

        tokenized_docs = [preprocess_text(doc) for doc in documents]
        tokenized_query = preprocess_text(query)

        bm25 = BM25Okapi(tokenized_docs)
        doc_scores = bm25.get_scores(tokenized_query)

        score_threshold = max(doc_scores) * 0.4

        # Aggregate scores by file
        file_data = {}
        for idx, score in enumerate(doc_scores):
            if score > score_threshold:
                file, category = sources[idx]
                entry = file_data.setdefault(
                    file, {'total_score': 0, 'match_count': 0, 'questions': []}
                )
                entry['total_score'] += score
                entry['match_count'] += 1
                entry['questions'].append({
                    'category': category,
                    'question': documents[idx],
                    'score': score
                })

        # Sort files by match count and total score
        ranked_files = sorted(
            file_data.items(),
            key=lambda x: (x[1]['match_count'], x[1]['total_score']),
            reverse=True
        )[:top_k]

        # Format results by file
        results = []
        for file, data in ranked_files:
            questions_summary = "\n".join(
                f"- [{q['category']}] {q['question']} (score: {q['score']:.2f})"
                for q in sorted(data['questions'], key=lambda x: x['score'], reverse=True)
            )

            results.append(
                f"File: {file}\n"
                f"Match Count: {data['match_count']}\n"
                f"Total Score: {data['total_score']:.2f}\n\n"
                f"Matching Questions:\n{questions_summary}"
            )

        return "\n\n---\n\n".join(results) if results else "No relevant matches found."
|
||||
|
||||
def extract_questions(content: str) -> List[tuple[str, str, str]]:
    """
    Parse a Q file and return ``(category, question, full_section)`` tuples.

    Only text following a ``### ... Questions`` heading is considered. Inside
    it, ``N. **Category**`` lines open a category and ``- *"question"*`` lines
    contribute the questions of that category.
    """
    section_heading = re.compile(r'^###\s+.*Questions\s*$', flags=re.MULTILINE)
    category_heading = re.compile(r'^\d+\.\s+\*\*([^*]+)\*\*\s*$', flags=re.MULTILINE)
    question_line = re.compile(r'^\s*-\s*\*"([^"]+)"\*\s*$', flags=re.MULTILINE)

    extracted = []
    # Element 0 of the split is the text before the first heading; drop it.
    for body in section_heading.split(content)[1:]:
        # Splitting on a pattern with one capture group alternates
        # [preamble, name, text, name, text, ...].
        pieces = category_heading.split(body.strip())
        for heading, block in zip(pieces[1::2], pieces[2::2]):
            label = heading.strip()
            for text in question_line.findall(block.strip()):
                extracted.append((label, text, f"Category: {label}\nQuestion: {text}"))

    return extracted
|
||||
|
||||
# NLTK data packages used by the tokenizer, stopword list and lemmatizer.
_NLTK_RESOURCES = ('punkt', 'punkt_tab', 'stopwords', 'wordnet')


def _tokenize_and_lemmatize(text: str) -> List[str]:
    """Lowercase, tokenize, drop stopwords (except question words), lemmatize."""
    tokens = word_tokenize(text.lower())

    # Remove stopwords but keep question words
    stop_words = set(stopwords.words('english')) - {'how', 'what', 'when', 'where', 'why', 'which'}
    lemmatizer = WordNetLemmatizer()

    return [lemmatizer.lemmatize(token) for token in tokens if token not in stop_words]


def preprocess_text(text: str) -> List[str]:
    """Preprocess text for better semantic matching.

    Args:
        text: Raw query or question text.

    Returns:
        Lowercased, stopword-filtered, lemmatized tokens.

    Raises:
        LookupError: If the NLTK data is still unavailable after a download attempt.
    """
    try:
        return _tokenize_and_lemmatize(text)
    except LookupError:
        # NLTK raises LookupError when its data files are not installed.
        # Fetch them once and retry instead of failing on first use.
        for resource in _NLTK_RESOURCES:
            nltk.download(resource, quiet=True)
        return _tokenize_and_lemmatize(text)
|
||||
|
||||
# Ad-hoc demo, run only when the module is executed directly.
if __name__ == "__main__":
    manager = LLMTextManager(BASE_PATH)

    # Example 1: concatenate two sections in extended mode
    combined = manager.concatenate_docs(["chunking_strategies", "content_selection"], "extended")
    print("Concatenated docs:", combined[:200], "...\n")

    # Example 2: rank question files against a query, keeping the top 3
    matches = manager.search_questions("How do I execute JS script on the page?", 3)
    print("Search results:", matches[:200], "...")
|
||||
Reference in New Issue
Block a user