From 829a1f7992703064084826e0ebfeed819988c6e7 Mon Sep 17 00:00:00 2001 From: UncleCode Date: Sat, 23 Nov 2024 19:45:41 +0800 Subject: [PATCH] feat: update version to 0.3.741 and enhance content filtering with heuristic strategy. Fix the issue where the HTML passed to the BM25 content filter does not have any HTML elements. --- crawl4ai/__version__.py | 2 +- crawl4ai/content_filter_strategy.py | 188 +++++++++++++++++++++++++- crawl4ai/content_scraping_strategy.py | 8 +- 3 files changed, 189 insertions(+), 9 deletions(-) diff --git a/crawl4ai/__version__.py b/crawl4ai/__version__.py index 65ee6e73..05bfd336 100644 --- a/crawl4ai/__version__.py +++ b/crawl4ai/__version__.py @@ -1,2 +1,2 @@ # crawl4ai/_version.py -__version__ = "0.3.74" \ No newline at end of file +__version__ = "0.3.741" \ No newline at end of file diff --git a/crawl4ai/content_filter_strategy.py b/crawl4ai/content_filter_strategy.py index 88216f7f..e6891a3f 100644 --- a/crawl4ai/content_filter_strategy.py +++ b/crawl4ai/content_filter_strategy.py @@ -10,6 +10,13 @@ from abc import ABC, abstractmethod from snowballstemmer import stemmer + +# import regex +# def tokenize_text(text): +# # Regular expression to match words or CJK (Chinese, Japanese, Korean) characters +# pattern = r'\p{L}+|\p{N}+|[\p{Script=Han}\p{Script=Hiragana}\p{Script=Katakana}ー]|[\p{P}]' +# return regex.findall(pattern, text) + # from nltk.stem import PorterStemmer # ps = PorterStemmer() class RelevantContentFilter(ABC): @@ -57,9 +64,14 @@ class RelevantContentFilter(ABC): query_parts = [] # Title - if soup.title: - query_parts.append(soup.title.string) - elif soup.find('h1'): + try: + title = soup.title.string + if title: + query_parts.append(title) + except Exception: + pass + + if soup.find('h1'): query_parts.append(soup.find('h1').get_text()) # Meta tags @@ -81,7 +93,7 @@ class RelevantContentFilter(ABC): return ' '.join(filter(None, query_parts)) - def extract_text_chunks(self, body: Tag) -> List[Tuple[str, str]]: + def 
extract_text_chunks(self, body: Tag, min_word_threshold: int = None) -> List[Tuple[str, str]]: """ Extracts text chunks from a BeautifulSoup body element while preserving order. Returns list of tuples (text, tag_name) for classification. @@ -155,6 +167,9 @@ class RelevantContentFilter(ABC): if text: chunks.append((chunk_index, text, 'content', body)) + if min_word_threshold: + chunks = [chunk for chunk in chunks if len(chunk[1].split()) >= min_word_threshold] + return chunks @@ -274,15 +289,26 @@ class BM25ContentFilter(RelevantContentFilter): } self.stemmer = stemmer(language) - def filter_content(self, html: str) -> List[str]: + def filter_content(self, html: str, min_word_threshold: int = None) -> List[str]: """Implements content filtering using BM25 algorithm with priority tag handling""" if not html or not isinstance(html, str): return [] soup = BeautifulSoup(html, 'lxml') + + # Check if body is present + if not soup.body: + # Wrap in body tag if missing + soup = BeautifulSoup(f'{html}', 'lxml') body = soup.find('body') - query = self.extract_page_query(soup.find('head'), body) - candidates = self.extract_text_chunks(body) + + query = self.extract_page_query(soup, body) + + if not query: + return [] + # return [self.clean_element(soup)] + + candidates = self.extract_text_chunks(body, min_word_threshold) if not candidates: return [] @@ -299,6 +325,10 @@ class BM25ContentFilter(RelevantContentFilter): for _, chunk, _, _ in candidates] tokenized_query = [self.stemmer.stemWord(word) for word in query.lower().split()] + # tokenized_corpus = [[self.stemmer.stemWord(word) for word in tokenize_text(chunk.lower())] + # for _, chunk, _, _ in candidates] + # tokenized_query = [self.stemmer.stemWord(word) for word in tokenize_text(query.lower())] + # Clean from stop words and noise tokenized_corpus = [clean_tokens(tokens) for tokens in tokenized_corpus] tokenized_query = clean_tokens(tokenized_query) @@ -326,3 +356,147 @@ class BM25ContentFilter(RelevantContentFilter): 
selected_candidates.sort(key=lambda x: x[0]) return [self.clean_element(tag) for _, _, tag in selected_candidates] + + +class HeuristicContentFilter(RelevantContentFilter): + def __init__(self): + super().__init__() + # Weights for different heuristics + self.tag_weights = { + 'article': 10, + 'main': 8, + 'section': 5, + 'div': 3, + 'p': 2, + 'pre': 2, + 'code': 2, + 'blockquote': 2, + 'li': 1, + 'span': 1, + } + self.max_depth = 5 # Maximum depth from body to consider + + def filter_content(self, html: str) -> List[str]: + """Implements heuristic content filtering without relying on a query.""" + if not html or not isinstance(html, str): + return [] + + soup = BeautifulSoup(html, 'lxml') + + # Ensure there is a body tag + if not soup.body: + soup = BeautifulSoup(f'{html}', 'lxml') + body = soup.body + + # Extract candidate text chunks + candidates = self.extract_text_chunks(body) + + if not candidates: + return [] + + # Score each candidate + scored_candidates = [] + for index, text, tag_type, tag in candidates: + score = self.score_element(tag, text) + if score > 0: + scored_candidates.append((score, index, text, tag)) + + # Sort candidates by score and then by document order + scored_candidates.sort(key=lambda x: (-x[0], x[1])) + + # Extract the top candidates (e.g., top 5) + top_candidates = scored_candidates[:5] # Adjust the number as needed + + # Sort the top candidates back to their original document order + top_candidates.sort(key=lambda x: x[1]) + + # Clean and return the content + return [self.clean_element(tag) for _, _, _, tag in top_candidates] + + def score_element(self, tag: Tag, text: str) -> float: + """Compute a score for an element based on heuristics.""" + if not text or not tag: + return 0 + + # Exclude unwanted tags + if self.is_excluded(tag): + return 0 + + # Text density + text_length = len(text.strip()) + html_length = len(str(tag)) + text_density = text_length / html_length if html_length > 0 else 0 + + # Link density + link_text_length = 
sum(len(a.get_text().strip()) for a in tag.find_all('a')) + link_density = link_text_length / text_length if text_length > 0 else 0 + + # Tag weight + tag_weight = self.tag_weights.get(tag.name, 1) + + # Depth factor (prefer elements closer to the body tag) + depth = self.get_depth(tag) + depth_weight = max(self.max_depth - depth, 1) / self.max_depth + + # Compute the final score + score = (text_density * tag_weight * depth_weight) / (1 + link_density) + + return score + + def get_depth(self, tag: Tag) -> int: + """Compute the depth of the tag from the body tag.""" + depth = 0 + current = tag + while current and current != current.parent and current.name != 'body': + current = current.parent + depth += 1 + return depth + + def extract_text_chunks(self, body: Tag) -> List[Tuple[int, str, str, Tag]]: + """ + Extracts text chunks from the body element while preserving order. + Returns list of tuples (index, text, tag_type, tag) for scoring. + """ + chunks = [] + index = 0 + + def traverse(element): + nonlocal index + if isinstance(element, NavigableString): + return + if not isinstance(element, Tag): + return + if self.is_excluded(element): + return + # Only consider included tags + if element.name in self.included_tags: + text = element.get_text(separator=' ', strip=True) + if len(text.split()) >= self.min_word_count: + tag_type = 'header' if element.name in self.header_tags else 'content' + chunks.append((index, text, tag_type, element)) + index += 1 + # Do not traverse children of this element to prevent duplication + return + for child in element.children: + traverse(child) + + traverse(body) + return chunks + + def is_excluded(self, tag: Tag) -> bool: + """Determine if a tag should be excluded based on heuristics.""" + if tag.name in self.excluded_tags: + return True + class_id = ' '.join(filter(None, [ + ' '.join(tag.get('class', [])), + tag.get('id', '') + ])) + if self.negative_patterns.search(class_id): + return True + # Exclude tags with high link density 
(e.g., navigation menus) + text = tag.get_text(separator=' ', strip=True) + link_text_length = sum(len(a.get_text(strip=True)) for a in tag.find_all('a')) + text_length = len(text) + if text_length > 0 and (link_text_length / text_length) > 0.5: + return True + return False diff --git a/crawl4ai/content_scraping_strategy.py b/crawl4ai/content_scraping_strategy.py index 70a43240..ea6a2ef8 100644 --- a/crawl4ai/content_scraping_strategy.py +++ b/crawl4ai/content_scraping_strategy.py @@ -9,7 +9,7 @@ from bs4 import element, NavigableString, Comment from urllib.parse import urljoin from requests.exceptions import InvalidSchema # from .content_cleaning_strategy import ContentCleaningStrategy -from .content_filter_strategy import RelevantContentFilter, BM25ContentFilter +from .content_filter_strategy import RelevantContentFilter, BM25ContentFilter#, HeuristicContentFilter from .markdown_generation_strategy import MarkdownGenerationStrategy, DefaultMarkdownGenerationStrategy from .models import MarkdownGenerationResult from .utils import ( @@ -129,6 +129,12 @@ class WebScrapingStrategy(ContentScrapingStrategy): params={"error": str(e)} ) markdown_generator = None + return { + 'markdown': f"Error using new markdown generation strategy: {str(e)}", + 'fit_markdown': "Set flag 'fit_markdown' to True to get cleaned HTML content.", + 'fit_html': "Set flag 'fit_markdown' to True to get cleaned HTML content.", + 'markdown_v2': None + } # Legacy method h = CustomHTML2Text()