diff --git a/crawl4ai/__init__.py b/crawl4ai/__init__.py
index bcb6b3ef..beda64f8 100644
--- a/crawl4ai/__init__.py
+++ b/crawl4ai/__init__.py
@@ -76,3 +76,10 @@ else:
WebCrawler = None
# import warnings
# print("Warning: Synchronous WebCrawler is not available. Install crawl4ai[sync] for synchronous support. However, please note that the synchronous version will be deprecated soon.")
+
+import warnings
+
+# Silence warnings raised from within Pydantic and its submodules.
+# The "module" argument is matched as a regex against the name of the
+# module that triggered the warning, so "pydantic" also covers "pydantic.*".
+warnings.filterwarnings("ignore", module="pydantic")
\ No newline at end of file
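
The filter installed above can be checked in isolation. A minimal sketch; the `warn_explicit` call merely simulates a warning originating inside Pydantic, it is not how Pydantic itself emits warnings:

    import warnings

    with warnings.catch_warnings(record=True) as caught:
        # catch_warnings(record=True) swaps in an "always" filter list,
        # so the "ignore" filter has to be reinstalled inside the context
        warnings.filterwarnings("ignore", module="pydantic")
        warnings.warn_explicit(
            "deprecated field", DeprecationWarning,
            filename="pydantic/main.py", lineno=1, module="pydantic.main",
        )
    assert not caught  # the simulated Pydantic warning was suppressed
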
diff --git a/crawl4ai/content_filter_strategy.py b/crawl4ai/content_filter_strategy.py
index 11a33ac2..75702ec5 100644
--- a/crawl4ai/content_filter_strategy.py
+++ b/crawl4ai/content_filter_strategy.py
@@ -1,14 +1,24 @@
import re
+import time
from bs4 import BeautifulSoup, Tag
-from typing import List, Tuple
+from typing import List, Tuple, Dict, Optional
from rank_bm25 import BM25Okapi
from collections import deque
from bs4 import NavigableString, Comment
-from .utils import clean_tokens
+from .utils import clean_tokens, perform_completion_with_backoff, escape_json_string, sanitize_html, get_home_folder, extract_xml_data
from abc import ABC, abstractmethod
import math
from snowballstemmer import stemmer
-
+from .config import DEFAULT_PROVIDER, PROVIDER_MODELS, OVERLAP_RATE, WORD_TOKEN_RATE
+from .models import TokenUsage
+from .prompts import PROMPT_FILTER_CONTENT
+import os
+import json
+import hashlib
+from pathlib import Path
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from .async_logger import AsyncLogger, LogLevel
+from colorama import Fore, Style
class RelevantContentFilter(ABC):
"""Abstract base class for content filtering strategies"""
@@ -343,7 +353,6 @@ class RelevantContentFilter(ABC):
except Exception:
return str(tag) # Fallback to original if anything fails
-
class BM25ContentFilter(RelevantContentFilter):
"""
Content filtering using BM25 algorithm with priority tag handling.
@@ -486,7 +495,6 @@ class BM25ContentFilter(RelevantContentFilter):
return [self.clean_element(tag) for _, _, tag in selected_candidates]
-
class PruningContentFilter(RelevantContentFilter):
"""
Content filtering using pruning algorithm with dynamic threshold.
@@ -732,3 +740,260 @@ class PruningContentFilter(RelevantContentFilter):
if self.negative_patterns.match(element_id):
class_id_score -= 0.5
return class_id_score
+
+class LLMContentFilter(RelevantContentFilter):
+ """Content filtering using LLMs to generate relevant markdown."""
+
+ def __init__(
+ self,
+ provider: str = DEFAULT_PROVIDER,
+ api_token: Optional[str] = None,
+        instruction: Optional[str] = None,
+ chunk_token_threshold: int = int(1e9),
+ overlap_rate: float = OVERLAP_RATE,
+ word_token_rate: float = WORD_TOKEN_RATE,
+ base_url: Optional[str] = None,
+ api_base: Optional[str] = None,
+ extra_args: Dict = None,
+ verbose: bool = False,
+ logger: Optional[AsyncLogger] = None,
+ ):
+ super().__init__(None)
+ self.provider = provider
+ self.api_token = (
+ api_token
+ or PROVIDER_MODELS.get(provider, "no-token")
+ or os.getenv("OPENAI_API_KEY")
+ )
+ self.instruction = instruction
+ self.chunk_token_threshold = chunk_token_threshold
+ self.overlap_rate = overlap_rate
+ self.word_token_rate = word_token_rate
+ self.base_url = base_url
+ self.api_base = api_base or base_url
+ self.extra_args = extra_args or {}
+ self.verbose = verbose
+
+ # Setup logger with custom styling for LLM operations
+ if logger:
+ self.logger = logger
+ elif verbose:
+ self.logger = AsyncLogger(
+ verbose=True,
+ icons={
+ **AsyncLogger.DEFAULT_ICONS,
+ "LLM": "★", # Star for LLM operations
+ "CHUNK": "◈", # Diamond for chunks
+ "CACHE": "⚡", # Lightning for cache operations
+ },
+ colors={
+ **AsyncLogger.DEFAULT_COLORS,
+ LogLevel.INFO: Fore.MAGENTA + Style.DIM, # Dimmed purple for LLM ops
+ }
+ )
+ else:
+ self.logger = None
+
+ self.usages = []
+ self.total_usage = TokenUsage()
+
+ def _get_cache_key(self, html: str, instruction: str) -> str:
+ """Generate a unique cache key based on HTML and instruction"""
+ content = f"{html}{instruction}"
+ return hashlib.md5(content.encode()).hexdigest()
+
+ def _merge_chunks(self, text: str) -> List[str]:
+ """Split text into chunks with overlap"""
+        # Estimate total tokens; ceil keeps each section within the threshold
+        total_tokens = len(text.split()) * self.word_token_rate
+        num_sections = max(1, math.ceil(total_tokens / self.chunk_token_threshold))
+ adjusted_chunk_threshold = total_tokens / num_sections
+
+ # Split into words
+ words = text.split()
+ chunks = []
+ current_chunk = []
+ current_token_count = 0
+
+        for word in words:
+            # Approximate per-word token cost (word_token_rate ~ tokens/word)
+            word_tokens = self.word_token_rate
+            if current_token_count + word_tokens <= adjusted_chunk_threshold:
+                current_chunk.append(word)
+                current_token_count += word_tokens
+            else:
+                chunks.append(" ".join(current_chunk))
+                # Seed the next chunk with the tail of this one so that
+                # consecutive chunks overlap by overlap_rate
+                overlap_size = int(len(current_chunk) * self.overlap_rate)
+                current_chunk = current_chunk[-overlap_size:] if overlap_size else []
+                current_chunk.append(word)
+                current_token_count = len(current_chunk) * self.word_token_rate
+
+        if current_chunk:
+            chunks.append(" ".join(current_chunk))
+
+ return chunks
+
+ def filter_content(self, html: str, ignore_cache: bool = False) -> List[str]:
+ if not html or not isinstance(html, str):
+ return []
+
+ if self.logger:
+ self.logger.info(
+ "Starting LLM content filtering process",
+ tag="LLM",
+ params={"provider": self.provider},
+ colors={"provider": Fore.CYAN}
+ )
+
+ # Cache handling
+ cache_dir = Path(get_home_folder()) / "llm_cache" / "content_filter"
+ cache_dir.mkdir(parents=True, exist_ok=True)
+ cache_key = self._get_cache_key(html, self.instruction or "")
+ cache_file = cache_dir / f"{cache_key}.json"
+
+ if not ignore_cache and cache_file.exists():
+ if self.logger:
+ self.logger.info("Found cached result", tag="CACHE")
+ try:
+ with cache_file.open('r') as f:
+ cached_data = json.load(f)
+ usage = TokenUsage(**cached_data['usage'])
+ self.usages.append(usage)
+ self.total_usage.completion_tokens += usage.completion_tokens
+ self.total_usage.prompt_tokens += usage.prompt_tokens
+ self.total_usage.total_tokens += usage.total_tokens
+ return cached_data['blocks']
+ except Exception as e:
+ if self.logger:
+ self.logger.error(f"Cache read error: {str(e)}", tag="CACHE")
+
+ # Split into chunks
+ html_chunks = self._merge_chunks(html)
+ if self.logger:
+ self.logger.info(
+ "Split content into {chunk_count} chunks",
+ tag="CHUNK",
+ params={"chunk_count": len(html_chunks)},
+ colors={"chunk_count": Fore.YELLOW}
+ )
+
+ extracted_content = []
+ start_time = time.time()
+
+        # Process chunks in parallel; max_workers caps concurrent LLM calls
+ with ThreadPoolExecutor(max_workers=4) as executor:
+ futures = []
+ for i, chunk in enumerate(html_chunks):
+ if self.logger:
+ self.logger.debug(
+ "Processing chunk {chunk_num}/{total_chunks}",
+ tag="CHUNK",
+ params={
+ "chunk_num": i + 1,
+ "total_chunks": len(html_chunks)
+ }
+ )
+
+ prompt_variables = {
+ "HTML": escape_json_string(sanitize_html(chunk)),
+ "REQUEST": self.instruction or "Convert this HTML into clean, relevant markdown, removing any noise or irrelevant content."
+ }
+
+ prompt = PROMPT_FILTER_CONTENT
+ for var, value in prompt_variables.items():
+ prompt = prompt.replace("{" + var + "}", value)
+
+ future = executor.submit(
+ perform_completion_with_backoff,
+ self.provider,
+ prompt,
+ self.api_token,
+ base_url=self.api_base,
+ extra_args=self.extra_args
+ )
+ futures.append((i, future))
+
+            # Collect results in chunk order (futures are sorted by index i)
+ ordered_results = []
+ for i, future in sorted(futures):
+ try:
+ response = future.result()
+
+ # Track usage
+ usage = TokenUsage(
+ completion_tokens=response.usage.completion_tokens,
+ prompt_tokens=response.usage.prompt_tokens,
+ total_tokens=response.usage.total_tokens,
+ completion_tokens_details=response.usage.completion_tokens_details.__dict__
+ if response.usage.completion_tokens_details else {},
+ prompt_tokens_details=response.usage.prompt_tokens_details.__dict__
+ if response.usage.prompt_tokens_details else {},
+ )
+ self.usages.append(usage)
+ self.total_usage.completion_tokens += usage.completion_tokens
+ self.total_usage.prompt_tokens += usage.prompt_tokens
+ self.total_usage.total_tokens += usage.total_tokens
+
+ blocks = extract_xml_data(["content"], response.choices[0].message.content)["content"]
+ if blocks:
+ ordered_results.append(blocks)
+ if self.logger:
+ self.logger.success(
+ "Successfully processed chunk {chunk_num}",
+ tag="CHUNK",
+ params={"chunk_num": i + 1}
+ )
+ except Exception as e:
+ if self.logger:
+ self.logger.error(
+ "Error processing chunk {chunk_num}: {error}",
+ tag="CHUNK",
+ params={
+ "chunk_num": i + 1,
+ "error": str(e)
+ }
+ )
+
+ end_time = time.time()
+ if self.logger:
+ self.logger.success(
+ "Completed processing in {time:.2f}s",
+ tag="LLM",
+ params={"time": end_time - start_time},
+ colors={"time": Fore.YELLOW}
+ )
+
+        result = ordered_results
+        # Cache only non-empty results so a transient failure is not persisted
+        if result:
+            cache_data = {
+                'blocks': result,
+                'usage': self.total_usage.__dict__,
+            }
+            with cache_file.open('w') as f:
+                json.dump(cache_data, f)
+            if self.logger:
+                self.logger.info("Cached results for future use", tag="CACHE")
+
+ return result
+
+ def show_usage(self) -> None:
+ """Print usage statistics"""
+ print("\n=== Token Usage Summary ===")
+ print(f"{'Type':<15} {'Count':>12}")
+ print("-" * 30)
+ print(f"{'Completion':<15} {self.total_usage.completion_tokens:>12,}")
+ print(f"{'Prompt':<15} {self.total_usage.prompt_tokens:>12,}")
+ print(f"{'Total':<15} {self.total_usage.total_tokens:>12,}")
+
+ if self.usages:
+ print("\n=== Usage History ===")
+ print(f"{'Request #':<10} {'Completion':>12} {'Prompt':>12} {'Total':>12}")
+ print("-" * 48)
+ for i, usage in enumerate(self.usages, 1):
+ print(
+ f"{i:<10} {usage.completion_tokens:>12,} "
+ f"{usage.prompt_tokens:>12,} {usage.total_tokens:>12,}"
+ )
\ No newline at end of file
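
A usage sketch for the new `LLMContentFilter`; the provider string, token, and threshold values below are illustrative assumptions, not documented defaults:

    from crawl4ai.content_filter_strategy import LLMContentFilter

    html_string = "<html><body><article><h1>Title</h1><p>Body...</p></article></body></html>"

    content_filter = LLMContentFilter(
        provider="openai/gpt-4o-mini",      # assumed provider identifier
        api_token="sk-...",                 # or rely on the OPENAI_API_KEY env var
        instruction="Extract only the article body as clean markdown.",
        chunk_token_threshold=4096,         # small enough to force chunked, parallel calls
        verbose=True,
    )

    markdown_blocks = content_filter.filter_content(html_string)  # List[str]
    markdown = "\n\n".join(markdown_blocks)
    content_filter.show_usage()  # prints the token usage summary
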
diff --git a/crawl4ai/prompts.py b/crawl4ai/prompts.py
index 7a963e6d..abddd64f 100644
--- a/crawl4ai/prompts.py
+++ b/crawl4ai/prompts.py
@@ -202,3 +202,58 @@ Avoid Common Mistakes:
Result
Output the final list of JSON objects, wrapped in
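
Whatever the full `PROMPT_FILTER_CONTENT` template says, `filter_content` above fixes the response contract: the model must wrap its markdown in `<content>` tags, since that is what `extract_xml_data(["content"], ...)` pulls out of the reply. A minimal sketch of that contract, using a hypothetical regex stand-in for the real helper:

    import re
    from typing import List

    def extract_content_blocks(llm_reply: str) -> List[str]:
        """Hypothetical stand-in for extract_xml_data(["content"], reply)."""
        return re.findall(r"<content>(.*?)</content>", llm_reply, re.DOTALL)

    reply = "<content># Title\n\nClean, relevant markdown.</content>"
    print(extract_content_blocks(reply))  # ['# Title\n\nClean, relevant markdown.']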