feat(content-filter): add LLMContentFilter for intelligent markdown generation

Add new LLMContentFilter class that uses LLMs to generate high-quality markdown content:
- Implement intelligent content filtering with customizable instructions
- Add chunk processing for handling large documents
- Support parallel processing of content chunks
- Include caching mechanism for filtered results
- Add usage tracking and statistics
- Update documentation with examples and use cases

Also includes minor changes:
- Disable Pydantic warnings in __init__.py
- Add new prompt template for content filtering
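A minimal usage sketch of the new filter (provider, token, and `html` are placeholders):

```python
from crawl4ai.content_filter_strategy import LLMContentFilter

llm_filter = LLMContentFilter(
    provider="openai/gpt-4o",      # placeholder provider string
    api_token="your-api-token",    # or rely on the OPENAI_API_KEY env var
    instruction="Keep the main article content; drop navigation and ads.",
    chunk_token_threshold=4096,    # smaller values enable parallel chunking
    verbose=True,
)
blocks = llm_filter.filter_content(html)  # html: page source fetched elsewhere
llm_filter.show_usage()                   # print token usage statistics
```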
Author: UncleCode
Date: 2025-01-18 19:31:07 +08:00
Parent: 2d6b19e1a2
Commit: 3d09b6a221
5 changed files with 495 additions and 5 deletions

View File: crawl4ai/__init__.py

@@ -76,3 +76,10 @@ else:
WebCrawler = None
# import warnings
# print("Warning: Synchronous WebCrawler is not available. Install crawl4ai[sync] for synchronous support. However, please note that the synchronous version will be deprecated soon.")
import warnings
from pydantic import warnings as pydantic_warnings
# Disable all Pydantic warnings
warnings.filterwarnings("ignore", module="pydantic")
# pydantic_warnings.filter_warnings()
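# Note: module="pydantic" silences every warning raised from the pydantic
# module for this process, not just deprecation notices.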

View File: crawl4ai/content_filter_strategy.py

@@ -1,14 +1,24 @@
import re
import time
from bs4 import BeautifulSoup, Tag
from typing import List, Tuple, Dict, Optional
from rank_bm25 import BM25Okapi
from collections import deque
from bs4 import NavigableString, Comment
from .utils import clean_tokens, perform_completion_with_backoff, escape_json_string, sanitize_html, get_home_folder, extract_xml_data
from abc import ABC, abstractmethod
import math
from snowballstemmer import stemmer
from .config import DEFAULT_PROVIDER, PROVIDER_MODELS, OVERLAP_RATE, WORD_TOKEN_RATE
from .models import TokenUsage
from .prompts import PROMPT_FILTER_CONTENT
import os
import json
import hashlib
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from .async_logger import AsyncLogger, LogLevel
from colorama import Fore, Style, init
class RelevantContentFilter(ABC):
"""Abstract base class for content filtering strategies"""
@@ -343,7 +353,6 @@ class RelevantContentFilter(ABC):
except Exception:
return str(tag) # Fallback to original if anything fails
class BM25ContentFilter(RelevantContentFilter):
"""
Content filtering using BM25 algorithm with priority tag handling.
@@ -486,7 +495,6 @@ class BM25ContentFilter(RelevantContentFilter):
return [self.clean_element(tag) for _, _, tag in selected_candidates]
class PruningContentFilter(RelevantContentFilter):
"""
Content filtering using pruning algorithm with dynamic threshold.
@@ -732,3 +740,260 @@ class PruningContentFilter(RelevantContentFilter):
if self.negative_patterns.match(element_id):
class_id_score -= 0.5
return class_id_score
class LLMContentFilter(RelevantContentFilter):
"""Content filtering using LLMs to generate relevant markdown."""
def __init__(
self,
provider: str = DEFAULT_PROVIDER,
api_token: Optional[str] = None,
        instruction: Optional[str] = None,
chunk_token_threshold: int = int(1e9),
overlap_rate: float = OVERLAP_RATE,
word_token_rate: float = WORD_TOKEN_RATE,
base_url: Optional[str] = None,
api_base: Optional[str] = None,
        extra_args: Optional[Dict] = None,
verbose: bool = False,
logger: Optional[AsyncLogger] = None,
):
super().__init__(None)
self.provider = provider
        # Resolve token: explicit arg, provider default, env var, then sentinel
        self.api_token = (
            api_token
            or PROVIDER_MODELS.get(provider)
            or os.getenv("OPENAI_API_KEY")
            or "no-token"
        )
self.instruction = instruction
self.chunk_token_threshold = chunk_token_threshold
self.overlap_rate = overlap_rate
self.word_token_rate = word_token_rate
self.base_url = base_url
self.api_base = api_base or base_url
self.extra_args = extra_args or {}
self.verbose = verbose
# Setup logger with custom styling for LLM operations
if logger:
self.logger = logger
elif verbose:
self.logger = AsyncLogger(
verbose=True,
icons={
**AsyncLogger.DEFAULT_ICONS,
"LLM": "", # Star for LLM operations
"CHUNK": "", # Diamond for chunks
"CACHE": "", # Lightning for cache operations
},
colors={
**AsyncLogger.DEFAULT_COLORS,
LogLevel.INFO: Fore.MAGENTA + Style.DIM, # Dimmed purple for LLM ops
}
)
else:
self.logger = None
self.usages = []
self.total_usage = TokenUsage()
def _get_cache_key(self, html: str, instruction: str) -> str:
"""Generate a unique cache key based on HTML and instruction"""
content = f"{html}{instruction}"
return hashlib.md5(content.encode()).hexdigest()
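    # Keys name files in the on-disk cache used by filter_content():
    # <home>/llm_cache/content_filter/<key>.json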
def _merge_chunks(self, text: str) -> List[str]:
"""Split text into chunks with overlap"""
# Calculate tokens and sections
total_tokens = len(text.split()) * self.word_token_rate
        # ceil keeps every section at or below the configured threshold
        num_sections = max(1, math.ceil(total_tokens / self.chunk_token_threshold))
adjusted_chunk_threshold = total_tokens / num_sections
# Split into words
words = text.split()
chunks = []
current_chunk = []
current_token_count = 0
for word in words:
            word_tokens = self.word_token_rate  # per-word estimate, matching the total above
if current_token_count + word_tokens <= adjusted_chunk_threshold:
current_chunk.append(word)
current_token_count += word_tokens
            else:
                chunks.append(" ".join(current_chunk))
                # Seed the next chunk with the tail of this one as overlap
                overlap_size = int(len(current_chunk) * self.overlap_rate)
                if overlap_size > 0:
                    current_chunk = current_chunk[-overlap_size:] + [word]
                    current_token_count = (overlap_size + 1) * self.word_token_rate
                else:
                    current_chunk = [word]
                    current_token_count = word_tokens
if current_chunk:
chunks.append(" ".join(current_chunk))
return chunks
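    # Worked example (assumed rates): with word_token_rate = 0.75 and
    # chunk_token_threshold = 2048, a 10,000-word document is estimated at
    # 7,500 tokens and split into ceil(7500 / 2048) = 4 chunks of roughly
    # 1,875 tokens each, plus the configured overlap between neighbours.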
def filter_content(self, html: str, ignore_cache: bool = False) -> List[str]:
if not html or not isinstance(html, str):
return []
if self.logger:
self.logger.info(
"Starting LLM content filtering process",
tag="LLM",
params={"provider": self.provider},
colors={"provider": Fore.CYAN}
)
# Cache handling
cache_dir = Path(get_home_folder()) / "llm_cache" / "content_filter"
cache_dir.mkdir(parents=True, exist_ok=True)
cache_key = self._get_cache_key(html, self.instruction or "")
cache_file = cache_dir / f"{cache_key}.json"
if not ignore_cache and cache_file.exists():
if self.logger:
self.logger.info("Found cached result", tag="CACHE")
try:
with cache_file.open('r') as f:
cached_data = json.load(f)
usage = TokenUsage(**cached_data['usage'])
self.usages.append(usage)
self.total_usage.completion_tokens += usage.completion_tokens
self.total_usage.prompt_tokens += usage.prompt_tokens
self.total_usage.total_tokens += usage.total_tokens
return cached_data['blocks']
except Exception as e:
if self.logger:
self.logger.error(f"Cache read error: {str(e)}", tag="CACHE")
# Split into chunks
html_chunks = self._merge_chunks(html)
if self.logger:
self.logger.info(
"Split content into {chunk_count} chunks",
tag="CHUNK",
params={"chunk_count": len(html_chunks)},
colors={"chunk_count": Fore.YELLOW}
)
extracted_content = []
start_time = time.time()
# Process chunks in parallel
with ThreadPoolExecutor(max_workers=4) as executor:
futures = []
for i, chunk in enumerate(html_chunks):
if self.logger:
self.logger.debug(
"Processing chunk {chunk_num}/{total_chunks}",
tag="CHUNK",
params={
"chunk_num": i + 1,
"total_chunks": len(html_chunks)
}
)
prompt_variables = {
"HTML": escape_json_string(sanitize_html(chunk)),
"REQUEST": self.instruction or "Convert this HTML into clean, relevant markdown, removing any noise or irrelevant content."
}
prompt = PROMPT_FILTER_CONTENT
for var, value in prompt_variables.items():
prompt = prompt.replace("{" + var + "}", value)
future = executor.submit(
perform_completion_with_backoff,
self.provider,
prompt,
self.api_token,
base_url=self.api_base,
extra_args=self.extra_args
)
futures.append((i, future))
# Collect results in order
ordered_results = []
for i, future in sorted(futures):
try:
response = future.result()
# Track usage
usage = TokenUsage(
completion_tokens=response.usage.completion_tokens,
prompt_tokens=response.usage.prompt_tokens,
total_tokens=response.usage.total_tokens,
completion_tokens_details=response.usage.completion_tokens_details.__dict__
if response.usage.completion_tokens_details else {},
prompt_tokens_details=response.usage.prompt_tokens_details.__dict__
if response.usage.prompt_tokens_details else {},
)
self.usages.append(usage)
self.total_usage.completion_tokens += usage.completion_tokens
self.total_usage.prompt_tokens += usage.prompt_tokens
self.total_usage.total_tokens += usage.total_tokens
blocks = extract_xml_data(["content"], response.choices[0].message.content)["content"]
if blocks:
ordered_results.append(blocks)
if self.logger:
self.logger.success(
"Successfully processed chunk {chunk_num}",
tag="CHUNK",
params={"chunk_num": i + 1}
)
except Exception as e:
if self.logger:
self.logger.error(
"Error processing chunk {chunk_num}: {error}",
tag="CHUNK",
params={
"chunk_num": i + 1,
"error": str(e)
}
)
end_time = time.time()
if self.logger:
self.logger.success(
"Completed processing in {time:.2f}s",
tag="LLM",
params={"time": end_time - start_time},
colors={"time": Fore.YELLOW}
)
result = ordered_results if ordered_results else []
# Cache the final result
cache_data = {
'blocks': result,
'usage': self.total_usage.__dict__
}
with cache_file.open('w') as f:
json.dump(cache_data, f)
if self.logger:
self.logger.info("Cached results for future use", tag="CACHE")
return result
def show_usage(self) -> None:
"""Print usage statistics"""
print("\n=== Token Usage Summary ===")
print(f"{'Type':<15} {'Count':>12}")
print("-" * 30)
print(f"{'Completion':<15} {self.total_usage.completion_tokens:>12,}")
print(f"{'Prompt':<15} {self.total_usage.prompt_tokens:>12,}")
print(f"{'Total':<15} {self.total_usage.total_tokens:>12,}")
if self.usages:
print("\n=== Usage History ===")
print(f"{'Request #':<10} {'Completion':>12} {'Prompt':>12} {'Total':>12}")
print("-" * 48)
for i, usage in enumerate(self.usages, 1):
print(
f"{i:<10} {usage.completion_tokens:>12,} "
f"{usage.prompt_tokens:>12,} {usage.total_tokens:>12,}"
)

View File: crawl4ai/prompts.py

@@ -202,3 +202,58 @@ Avoid Common Mistakes:
Result
Output the final list of JSON objects, wrapped in <blocks>...</blocks> XML tags. Make sure to close the tag properly."""
PROMPT_FILTER_CONTENT = """Your task is to filter and convert HTML content into clean, focused markdown that's optimized for use with LLMs and information retrieval systems.
INPUT HTML:
<|HTML_CONTENT_START|>
{HTML}
<|HTML_CONTENT_END|>
SPECIFIC INSTRUCTION:
<|USER_INSTRUCTION_START|>
{REQUEST}
<|USER_INSTRUCTION_END|>
TASK DETAILS:
1. Content Selection
- DO: Keep essential information, main content, key details
- DO: Preserve hierarchical structure using markdown headers
- DO: Keep code blocks, tables, key lists
- DON'T: Include navigation menus, ads, footers, cookie notices
- DON'T: Keep social media widgets, sidebars, related content
2. Content Transformation
- DO: Use proper markdown syntax (#, ##, **, `, etc)
- DO: Convert tables to markdown tables
- DO: Preserve code formatting with ```language blocks
- DO: Maintain link texts but remove tracking parameters
- DON'T: Include HTML tags in output
- DON'T: Keep class names, ids, or other HTML attributes
3. Content Organization
- DO: Maintain logical flow of information
- DO: Group related content under appropriate headers
- DO: Use consistent header levels
- DON'T: Fragment related content
- DON'T: Duplicate information
Example Input:
<div class="main-content"><h1>Setup Guide</h1><p>Follow these steps...</p></div>
<div class="sidebar">Related articles...</div>
Example Output:
# Setup Guide
Follow these steps...
IMPORTANT: If a specific instruction is provided above, prioritize its requirements over these general guidelines.
OUTPUT FORMAT:
Wrap your response in <content> tags. Use proper markdown throughout.
<content>
[Your markdown content here]
</content>
Begin filtering now."""

View File: documentation (Fit Markdown guide)

@@ -170,6 +170,82 @@ prune_filter = PruningContentFilter(
- You want a broad cleanup without a user query.
- The page has lots of repeated sidebars, footers, or disclaimers that hamper text extraction.
### 4.3 LLMContentFilter
For intelligent content filtering and high-quality markdown generation, you can use the **LLMContentFilter**. This filter leverages LLMs to generate relevant markdown while preserving the original content's meaning and structure:
```python
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig
from crawl4ai.content_filter_strategy import LLMContentFilter
async def main():
# Initialize LLM filter with specific instruction
filter = LLMContentFilter(
provider="openai/gpt-4", # or your preferred provider
api_token="your-api-token", # or use environment variable
instruction="""
Focus on extracting the core educational content.
Include:
- Key concepts and explanations
- Important code examples
- Essential technical details
Exclude:
- Navigation elements
- Sidebars
- Footer content
Format the output as clean markdown with proper code blocks and headers.
""",
chunk_token_threshold=4096, # Adjust based on your needs
verbose=True
)
config = CrawlerRunConfig(
content_filter=filter
)
async with AsyncWebCrawler() as crawler:
result = await crawler.arun("https://example.com", config=config)
print(result.fit_markdown) # Filtered markdown content
```
**Key Features:**
- **Intelligent Filtering**: Uses LLMs to understand and extract relevant content while maintaining context
- **Customizable Instructions**: Tailor the filtering process with specific instructions
- **Chunk Processing**: Handles large documents by processing them in chunks (controlled by `chunk_token_threshold`)
- **Parallel Processing**: For better performance, use a smaller `chunk_token_threshold` (e.g., 2048 or 4096) to enable parallel processing of content chunks (see the sketch below)
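A minimal sketch of a parallel-friendly configuration (provider and token are placeholders; `html` is page source fetched elsewhere):

```python
from crawl4ai.content_filter_strategy import LLMContentFilter

llm_filter = LLMContentFilter(
    provider="openai/gpt-4o",     # placeholder provider string
    api_token="your-api-token",
    chunk_token_threshold=2048,   # small chunks are processed on a thread pool
    verbose=True,
)
blocks = llm_filter.filter_content(html)  # markdown blocks, in document order
print("\n".join(blocks))
```

Internally the filter fans chunks out to a small thread pool and reassembles the results in their original order.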
**Two Common Use Cases:**
1. **Exact Content Preservation**:
```python
filter = LLMContentFilter(
instruction="""
Extract the main educational content while preserving its original wording and substance completely.
1. Maintain the exact language and terminology
2. Keep all technical explanations and examples intact
3. Preserve the original flow and structure
4. Remove only clearly irrelevant elements like navigation menus and ads
""",
chunk_token_threshold=4096
)
```
2. **Focused Content Extraction**:
```python
filter = LLMContentFilter(
instruction="""
Focus on extracting specific types of content:
- Technical documentation
- Code examples
- API references
Reformat the content into clear, well-structured markdown
""",
chunk_token_threshold=4096
)
```
> **Performance Tip**: Set a smaller `chunk_token_threshold` (e.g., 2048 or 4096) to enable parallel processing of content chunks. The default is effectively unbounded (`int(1e9)` tokens), so the entire document is processed as a single chunk.
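Results are also cached on disk, keyed by an MD5 hash of the HTML plus the instruction, so repeating a run on the same page reuses the earlier result. A short sketch of bypassing the cache and inspecting token usage (`filter` and `html` as in the examples above):

```python
# Force a fresh LLM pass even if a cached result exists
blocks = filter.filter_content(html, ignore_cache=True)

# Print aggregate and per-request token statistics
filter.show_usage()
```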
---
## 5. Using Fit Markdown

View File: new example script (test_llm_filter)

@@ -0,0 +1,87 @@
import os
import asyncio
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode
from crawl4ai.content_filter_strategy import LLMContentFilter
async def test_llm_filter():
# Create an HTML source that needs intelligent filtering
url = "https://docs.python.org/3/tutorial/classes.html"
browser_config = BrowserConfig(
headless=True,
verbose=True
)
# run_config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS)
run_config = CrawlerRunConfig(cache_mode=CacheMode.ENABLED)
async with AsyncWebCrawler(config=browser_config) as crawler:
# First get the raw HTML
result = await crawler.arun(url, config=run_config)
html = result.cleaned_html
        # First LLM filter configuration: focused extraction instruction
        # (note: it is immediately overridden by the second configuration below)
filter = LLMContentFilter(
provider="openai/gpt-4o",
api_token=os.getenv('OPENAI_API_KEY'),
instruction="""
Focus on extracting the core educational content about Python classes.
Include:
- Key concepts and their explanations
- Important code examples
- Essential technical details
Exclude:
- Navigation elements
- Sidebars
- Footer content
- Version information
- Any non-essential UI elements
Format the output as clean markdown with proper code blocks and headers.
""",
verbose=True
)
        # Second configuration (the one actually used): preserve original
        # wording, with chunking enabled to exercise parallel processing
        filter = LLMContentFilter(
provider="openai/gpt-4o",
api_token=os.getenv('OPENAI_API_KEY'),
            chunk_token_threshold=2 ** 12 * 2,  # 4096 * 2 = 8192 tokens
instruction="""
Extract the main educational content while preserving its original wording and substance completely. Your task is to:
1. Maintain the exact language and terminology used in the main content
2. Keep all technical explanations, examples, and educational content intact
3. Preserve the original flow and structure of the core content
4. Remove only clearly irrelevant elements like:
- Navigation menus
- Advertisement sections
- Cookie notices
- Footers with site information
- Sidebars with external links
- Any UI elements that don't contribute to learning
The goal is to create a clean markdown version that reads exactly like the original article,
keeping all valuable content but free from distracting elements. Imagine you're creating
a perfect reading experience where nothing valuable is lost, but all noise is removed.
""",
verbose=True
)
# Apply filtering
        filtered_content = filter.filter_content(html, ignore_cache=True)
# Show results
print("\nFiltered Content Length:", len(filtered_content))
print("\nFirst 500 chars of filtered content:")
if filtered_content:
print(filtered_content[0][:500])
# Save on disc the markdown version
with open("filtered_content.md", "w", encoding="utf-8") as f:
f.write("\n".join(filtered_content))
# Show token usage
filter.show_usage()
if __name__ == "__main__":
asyncio.run(test_llm_filter())