feat: update version to 0.3.741 and enhance content filtering with heuristic strategy. Fixes the issue where HTML passed to the BM25 content filter contains no HTML elements (e.g. a fragment without a body tag).

This commit is contained in:
UncleCode
2024-11-23 19:45:41 +08:00
parent d729aa7d5e
commit 829a1f7992
3 changed files with 189 additions and 9 deletions

View File

@@ -1,2 +1,2 @@
# crawl4ai/_version.py # crawl4ai/_version.py
__version__ = "0.3.74" __version__ = "0.3.741"

View File

@@ -10,6 +10,13 @@ from abc import ABC, abstractmethod
from snowballstemmer import stemmer from snowballstemmer import stemmer
# import regex
# def tokenize_text(text):
# # Regular expression to match words or CJK (Chinese, Japanese, Korean) characters
# pattern = r'\p{L}+|\p{N}+|[\p{Script=Han}\p{Script=Hiragana}\p{Script=Katakana}ー]|[\p{P}]'
# return regex.findall(pattern, text)
# from nltk.stem import PorterStemmer # from nltk.stem import PorterStemmer
# ps = PorterStemmer() # ps = PorterStemmer()
class RelevantContentFilter(ABC): class RelevantContentFilter(ABC):
@@ -57,9 +64,14 @@ class RelevantContentFilter(ABC):
query_parts = [] query_parts = []
# Title # Title
if soup.title: try:
query_parts.append(soup.title.string) title = soup.title.string
elif soup.find('h1'): if title:
query_parts.append(title)
except Exception:
pass
if soup.find('h1'):
query_parts.append(soup.find('h1').get_text()) query_parts.append(soup.find('h1').get_text())
# Meta tags # Meta tags
@@ -81,7 +93,7 @@ class RelevantContentFilter(ABC):
return ' '.join(filter(None, query_parts)) return ' '.join(filter(None, query_parts))
def extract_text_chunks(self, body: Tag) -> List[Tuple[str, str]]: def extract_text_chunks(self, body: Tag, min_word_threshold: int = None) -> List[Tuple[str, str]]:
""" """
Extracts text chunks from a BeautifulSoup body element while preserving order. Extracts text chunks from a BeautifulSoup body element while preserving order.
Returns list of tuples (text, tag_name) for classification. Returns list of tuples (text, tag_name) for classification.
@@ -155,6 +167,9 @@ class RelevantContentFilter(ABC):
if text: if text:
chunks.append((chunk_index, text, 'content', body)) chunks.append((chunk_index, text, 'content', body))
if min_word_threshold:
chunks = [chunk for chunk in chunks if len(chunk[1].split()) >= min_word_threshold]
return chunks return chunks
@@ -274,15 +289,26 @@ class BM25ContentFilter(RelevantContentFilter):
} }
self.stemmer = stemmer(language) self.stemmer = stemmer(language)
def filter_content(self, html: str) -> List[str]: def filter_content(self, html: str, min_word_threshold: int = None) -> List[str]:
"""Implements content filtering using BM25 algorithm with priority tag handling""" """Implements content filtering using BM25 algorithm with priority tag handling"""
if not html or not isinstance(html, str): if not html or not isinstance(html, str):
return [] return []
soup = BeautifulSoup(html, 'lxml') soup = BeautifulSoup(html, 'lxml')
# Check if body is present
if not soup.body:
# Wrap in body tag if missing
soup = BeautifulSoup(f'<body>{html}</body>', 'lxml')
body = soup.find('body') body = soup.find('body')
query = self.extract_page_query(soup.find('head'), body)
candidates = self.extract_text_chunks(body) query = self.extract_page_query(soup, body)
if not query:
return []
# return [self.clean_element(soup)]
candidates = self.extract_text_chunks(body, min_word_threshold)
if not candidates: if not candidates:
return [] return []
@@ -299,6 +325,10 @@ class BM25ContentFilter(RelevantContentFilter):
for _, chunk, _, _ in candidates] for _, chunk, _, _ in candidates]
tokenized_query = [self.stemmer.stemWord(word) for word in query.lower().split()] tokenized_query = [self.stemmer.stemWord(word) for word in query.lower().split()]
# tokenized_corpus = [[self.stemmer.stemWord(word) for word in tokenize_text(chunk.lower())]
# for _, chunk, _, _ in candidates]
# tokenized_query = [self.stemmer.stemWord(word) for word in tokenize_text(query.lower())]
# Clean from stop words and noise # Clean from stop words and noise
tokenized_corpus = [clean_tokens(tokens) for tokens in tokenized_corpus] tokenized_corpus = [clean_tokens(tokens) for tokens in tokenized_corpus]
tokenized_query = clean_tokens(tokenized_query) tokenized_query = clean_tokens(tokenized_query)
@@ -326,3 +356,147 @@ class BM25ContentFilter(RelevantContentFilter):
selected_candidates.sort(key=lambda x: x[0]) selected_candidates.sort(key=lambda x: x[0])
return [self.clean_element(tag) for _, _, tag in selected_candidates] return [self.clean_element(tag) for _, _, tag in selected_candidates]
class HeuristicContentFilter(RelevantContentFilter):
    """Query-free content filter that ranks DOM elements by structural heuristics.

    Unlike BM25ContentFilter, no page query is required: each candidate
    element is scored by a combination of text density, link density,
    tag importance and depth below <body>, and the top-k elements are
    returned in their original document order.
    """

    def __init__(self, k: int = 5):
        """
        Args:
            k: Maximum number of top-scoring elements to return. Defaults to 5
               (the value previously hard-coded in filter_content).
        """
        super().__init__()
        # Relative importance of container tags: semantic containers
        # (article/main) outrank generic ones (div/span).
        self.tag_weights = {
            'article': 10,
            'main': 8,
            'section': 5,
            'div': 3,
            'p': 2,
            'pre': 2,
            'code': 2,
            'blockquote': 2,
            'li': 1,
            'span': 1,
        }
        self.max_depth = 5  # Maximum depth from body to consider
        self.k = k  # Number of top candidates to keep

    def filter_content(self, html: str, min_word_threshold: int = None) -> List[str]:
        """Implements heuristic content filtering without relying on a query.

        Args:
            html: Raw HTML to filter; empty or non-string input yields [].
            min_word_threshold: Optional minimum word count per chunk. Added
                for signature consistency with BM25ContentFilter.filter_content.

        Returns:
            Cleaned HTML strings of the top-k scoring elements, in document order.
        """
        if not html or not isinstance(html, str):
            return []

        soup = BeautifulSoup(html, 'lxml')
        # Wrap fragments that lack a <body> so downstream lookups succeed
        # (same fix as applied to BM25ContentFilter for element-less input).
        if not soup.body:
            soup = BeautifulSoup(f'<body>{html}</body>', 'lxml')
        body = soup.body

        # Extract candidate text chunks
        candidates = self.extract_text_chunks(body, min_word_threshold)
        if not candidates:
            return []

        # Score each candidate; zero scores (excluded/empty elements) are dropped.
        scored_candidates = []
        for index, text, tag_type, tag in candidates:
            score = self.score_element(tag, text)
            if score > 0:
                scored_candidates.append((score, index, text, tag))

        # Highest score first; ties broken by document order.
        scored_candidates.sort(key=lambda x: (-x[0], x[1]))
        top_candidates = scored_candidates[:self.k]
        # Restore original document order before emitting.
        top_candidates.sort(key=lambda x: x[1])

        # Clean and return the content
        return [self.clean_element(tag) for _, _, _, tag in top_candidates]

    def score_element(self, tag: Tag, text: str) -> float:
        """Compute a heuristic relevance score for an element.

        Higher text density, heavier tag weight and shallower depth raise the
        score; a high proportion of link text lowers it.

        Args:
            tag: The element being scored.
            text: Its extracted text content.

        Returns:
            A non-negative float; 0 means "exclude".
        """
        if not text or not tag:
            return 0
        # Exclude unwanted tags
        if self.is_excluded(tag):
            return 0

        # Text density: share of the serialized element that is visible text.
        text_length = len(text.strip())
        html_length = len(str(tag))
        text_density = text_length / html_length if html_length > 0 else 0

        # Link density: share of the text that sits inside anchors
        # (navigation/menus score high here and are penalized).
        link_text_length = sum(len(a.get_text().strip()) for a in tag.find_all('a'))
        link_density = link_text_length / text_length if text_length > 0 else 0

        # Tag weight (unknown tags default to 1)
        tag_weight = self.tag_weights.get(tag.name, 1)

        # Depth factor: prefer elements closer to the body tag.
        depth = self.get_depth(tag)
        depth_weight = max(self.max_depth - depth, 1) / self.max_depth

        # Combine: density scaled by tag/depth weight, damped by link density.
        score = (text_density * tag_weight * depth_weight) / (1 + link_density)
        return score

    def get_depth(self, tag: Tag) -> int:
        """Return the number of ancestors between *tag* and the <body> tag."""
        depth = 0
        current = tag
        # In bs4 a parent's contents always include the child, so the
        # original `current != current.parent` guard can never trigger;
        # walking until None (document root) or <body> is sufficient.
        while current is not None and current.name != 'body':
            current = current.parent
            depth += 1
        return depth

    def extract_text_chunks(self, body: Tag, min_word_threshold: int = None) -> List[Tuple[int, str, str, Tag]]:
        """Extracts text chunks from the body element while preserving order.

        Args:
            body: The <body> tag to walk.
            min_word_threshold: Optional minimum words per chunk; when given
                and stricter than self.min_word_count, it takes precedence
                (matches the base-class extract_text_chunks signature).

        Returns:
            List of tuples (index, text, tag_type, tag) for scoring.
        """
        chunks = []
        index = 0
        # Effective word floor: honour the caller's threshold if stricter.
        min_words = max(self.min_word_count, min_word_threshold or 0)

        def traverse(element):
            nonlocal index
            if isinstance(element, NavigableString):
                return
            if not isinstance(element, Tag):
                return
            if self.is_excluded(element):
                return
            # Only consider included tags
            if element.name in self.included_tags:
                text = element.get_text(separator=' ', strip=True)
                if len(text.split()) >= min_words:
                    tag_type = 'header' if element.name in self.header_tags else 'content'
                    chunks.append((index, text, tag_type, element))
                    index += 1
                # Do not traverse children of this element to prevent duplication
                return
            for child in element.children:
                traverse(child)

        traverse(body)
        return chunks

    def is_excluded(self, tag: Tag) -> bool:
        """Determine if a tag should be excluded based on heuristics.

        A tag is excluded when its name is blacklisted, its class/id matches
        the negative patterns, or more than half of its text sits in links
        (e.g. navigation menus).
        """
        if tag.name in self.excluded_tags:
            return True
        class_id = ' '.join(filter(None, [
            ' '.join(tag.get('class', [])),
            tag.get('id', '')
        ]))
        if self.negative_patterns.search(class_id):
            return True
        # Exclude tags with high link density (e.g., navigation menus)
        text = tag.get_text(separator=' ', strip=True)
        link_text_length = sum(len(a.get_text(strip=True)) for a in tag.find_all('a'))
        text_length = len(text)
        if text_length > 0 and (link_text_length / text_length) > 0.5:
            return True
        return False

View File

@@ -9,7 +9,7 @@ from bs4 import element, NavigableString, Comment
from urllib.parse import urljoin from urllib.parse import urljoin
from requests.exceptions import InvalidSchema from requests.exceptions import InvalidSchema
# from .content_cleaning_strategy import ContentCleaningStrategy # from .content_cleaning_strategy import ContentCleaningStrategy
from .content_filter_strategy import RelevantContentFilter, BM25ContentFilter from .content_filter_strategy import RelevantContentFilter, BM25ContentFilter#, HeuristicContentFilter
from .markdown_generation_strategy import MarkdownGenerationStrategy, DefaultMarkdownGenerationStrategy from .markdown_generation_strategy import MarkdownGenerationStrategy, DefaultMarkdownGenerationStrategy
from .models import MarkdownGenerationResult from .models import MarkdownGenerationResult
from .utils import ( from .utils import (
@@ -129,6 +129,12 @@ class WebScrapingStrategy(ContentScrapingStrategy):
params={"error": str(e)} params={"error": str(e)}
) )
markdown_generator = None markdown_generator = None
return {
'markdown': f"Error using new markdown generation strategy: {str(e)}",
'fit_markdown': "Set flag 'fit_markdown' to True to get cleaned HTML content.",
'fit_html': "Set flag 'fit_markdown' to True to get cleaned HTML content.",
'markdown_v2': None
}
# Legacy method # Legacy method
h = CustomHTML2Text() h = CustomHTML2Text()