diff --git a/crawl4ai/content_scraping_strategy.py b/crawl4ai/content_scraping_strategy.py index de8894b7..970c40f0 100644 --- a/crawl4ai/content_scraping_strategy.py +++ b/crawl4ai/content_scraping_strategy.py @@ -6,10 +6,11 @@ from concurrent.futures import ThreadPoolExecutor import asyncio, requests, re, os from .config import * from bs4 import element, NavigableString, Comment +from bs4 import PageElement, Tag from urllib.parse import urljoin from requests.exceptions import InvalidSchema # from .content_cleaning_strategy import ContentCleaningStrategy -from .content_filter_strategy import RelevantContentFilter, BM25ContentFilter, PruningContentFilter +from .content_filter_strategy import RelevantContentFilter, BM25ContentFilter#, HeuristicContentFilter from .markdown_generation_strategy import MarkdownGenerationStrategy, DefaultMarkdownGenerator from .models import MarkdownGenerationResult from .utils import ( @@ -80,45 +81,21 @@ class WebScrapingStrategy(ContentScrapingStrategy): async def ascrap(self, url: str, html: str, **kwargs) -> Dict[str, Any]: return await asyncio.to_thread(self._get_content_of_website_optimized, url, html, **kwargs) - def _generate_markdown_content(self, cleaned_html: str, html: str, url: str, success: bool, **kwargs) -> Dict[str, Any]: - """Generate markdown content using either new strategy or legacy method. - - Args: - cleaned_html: Sanitized HTML content - html: Original HTML content - url: Base URL of the page - success: Whether scraping was successful - **kwargs: Additional options including: - - markdown_generator: Optional[MarkdownGenerationStrategy] - - html2text: Dict[str, Any] options for HTML2Text - - content_filter: Optional[RelevantContentFilter] - - fit_markdown: bool - - fit_markdown_user_query: Optional[str] - - fit_markdown_bm25_threshold: float - - Returns: - Dict containing markdown content in various formats - """ markdown_generator: Optional[MarkdownGenerationStrategy] = kwargs.get('markdown_generator', DefaultMarkdownGenerator()) if markdown_generator: try: if kwargs.get('fit_markdown', False) and not markdown_generator.content_filter: - markdown_generator.content_filter = PruningContentFilter( - threshold_type=kwargs.get('fit_markdown_treshold_type', 'fixed'), - threshold=kwargs.get('fit_markdown_treshold', 0.48), - min_word_threshold=kwargs.get('fit_markdown_min_word_threshold', ), + markdown_generator.content_filter = BM25ContentFilter( + user_query=kwargs.get('fit_markdown_user_query', None), + bm25_threshold=kwargs.get('fit_markdown_bm25_threshold', 1.0) ) - # markdown_generator.content_filter = BM25ContentFilter( - # user_query=kwargs.get('fit_markdown_user_query', None), - # bm25_threshold=kwargs.get('fit_markdown_bm25_threshold', 1.0) - # ) markdown_result: MarkdownGenerationResult = markdown_generator.generate_markdown( cleaned_html=cleaned_html, @@ -182,13 +159,335 @@ class WebScrapingStrategy(ContentScrapingStrategy): 'markdown_v2' : markdown_v2 } + def flatten_nested_elements(self, node): + if isinstance(node, NavigableString): + return node + if len(node.contents) == 1 and isinstance(node.contents[0], Tag) and node.contents[0].name == node.name: + return self.flatten_nested_elements(node.contents[0]) + node.contents = [self.flatten_nested_elements(child) for child in node.contents] + return node + + def find_closest_parent_with_useful_text(self, tag, **kwargs): + image_description_min_word_threshold = kwargs.get('image_description_min_word_threshold', IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD) + current_tag = tag + while current_tag: + current_tag = current_tag.parent + # Get the text content of the parent tag + if current_tag: + text_content = current_tag.get_text(separator=' ',strip=True) + # Check if the text content has at least word_count_threshold + if len(text_content.split()) >= image_description_min_word_threshold: + return text_content + return None + + def remove_unwanted_attributes(self, element, important_attrs, keep_data_attributes=False): + attrs_to_remove = [] + for attr in element.attrs: + if attr not in important_attrs: + if keep_data_attributes: + if not attr.startswith('data-'): + attrs_to_remove.append(attr) + else: + attrs_to_remove.append(attr) + + for attr in attrs_to_remove: + del element[attr] + + def process_image(self, img, url, index, total_images, **kwargs): + parse_srcset = lambda s: [{'url': u.strip().split()[0], 'width': u.strip().split()[-1].rstrip('w') + if ' ' in u else None} + for u in [f"http{p}" for p in s.split("http") if p]] + + # Constants for checks + classes_to_check = frozenset(['button', 'icon', 'logo']) + tags_to_check = frozenset(['button', 'input']) + + # Pre-fetch commonly used attributes + style = img.get('style', '') + alt = img.get('alt', '') + src = img.get('src', '') + data_src = img.get('data-src', '') + width = img.get('width') + height = img.get('height') + parent = img.parent + parent_classes = parent.get('class', []) + + # Quick validation checks + if ('display:none' in style or + parent.name in tags_to_check or + any(c in cls for c in parent_classes for cls in classes_to_check) or + any(c in src for c in classes_to_check) or + any(c in alt for c in classes_to_check)): + return None + + # Quick score calculation + score = 0 + if width and width.isdigit(): + width_val = int(width) + score += 1 if width_val > 150 else 0 + if height and height.isdigit(): + height_val = int(height) + score += 1 if height_val > 150 else 0 + if alt: + score += 1 + score += index/total_images < 0.5 + + image_format = '' + if "data:image/" in src: + image_format = src.split(',')[0].split(';')[0].split('/')[1].split(';')[0] + else: + image_format = os.path.splitext(src)[1].lower().strip('.').split('?')[0] + + if image_format in ('jpg', 'png', 'webp', 'avif'): + score += 1 + + if score <= kwargs.get('image_score_threshold', IMAGE_SCORE_THRESHOLD): + return None + + # Use set for deduplication + unique_urls = set() + image_variants = [] + + # Generate a unique group ID for this set of variants + group_id = index + + # Base image info template + image_description_min_word_threshold = kwargs.get('image_description_min_word_threshold', IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD) + base_info = { + 'alt': alt, + 'desc': self.find_closest_parent_with_useful_text(img, **kwargs), + 'score': score, + 'type': 'image', + 'group_id': group_id # Group ID for this set of variants + } + + # Inline function for adding variants + def add_variant(src, width=None): + if src and not src.startswith('data:') and src not in unique_urls: + unique_urls.add(src) + image_variants.append({**base_info, 'src': src, 'width': width}) + + # Process all sources + add_variant(src) + add_variant(data_src) + + # Handle srcset and data-srcset in one pass + for attr in ('srcset', 'data-srcset'): + if value := img.get(attr): + for source in parse_srcset(value): + add_variant(source['url'], source['width']) + + # Quick picture element check + if picture := img.find_parent('picture'): + for source in picture.find_all('source'): + if srcset := source.get('srcset'): + for src in parse_srcset(srcset): + add_variant(src['url'], src['width']) + + # Framework-specific attributes in one pass + for attr, value in img.attrs.items(): + if attr.startswith('data-') and ('src' in attr or 'srcset' in attr) and 'http' in value: + add_variant(value) + + return image_variants if image_variants else None + + + def process_element(self, url, element: PageElement, **kwargs) -> Dict[str, Any]: + media = {'images': [], 'videos': [], 'audios': []} + internal_links_dict = {} + external_links_dict = {} + self._process_element( + url, + element, + media, + internal_links_dict, + external_links_dict, + **kwargs + ) + return { + 'media': media, + 'internal_links_dict': internal_links_dict, + 'external_links_dict': external_links_dict + } + + def _process_element(self, url, element: PageElement, media: Dict[str, Any], internal_links_dict: Dict[str, Any], external_links_dict: Dict[str, Any], **kwargs) -> bool: + try: + if isinstance(element, NavigableString): + if isinstance(element, Comment): + element.extract() + return False + + # if element.name == 'img': + # process_image(element, url, 0, 1) + # return True + + if element.name in ['script', 'style', 'link', 'meta', 'noscript']: + element.decompose() + return False + + keep_element = False + + exclude_social_media_domains = SOCIAL_MEDIA_DOMAINS + kwargs.get('exclude_social_media_domains', []) + exclude_social_media_domains = list(set(exclude_social_media_domains)) + + try: + if element.name == 'a' and element.get('href'): + href = element.get('href', '').strip() + if not href: # Skip empty hrefs + return False + + url_base = url.split('/')[2] + + # Normalize the URL + try: + normalized_href = normalize_url(href, url) + except ValueError as e: + # logging.warning(f"Invalid URL format: {href}, Error: {str(e)}") + return False + + link_data = { + 'href': normalized_href, + 'text': element.get_text().strip(), + 'title': element.get('title', '').strip() + } + + # Check for duplicates and add to appropriate dictionary + is_external = is_external_url(normalized_href, url_base) + if is_external: + if normalized_href not in external_links_dict: + external_links_dict[normalized_href] = link_data + else: + if normalized_href not in internal_links_dict: + internal_links_dict[normalized_href] = link_data + + keep_element = True + + # Handle external link exclusions + if is_external: + if kwargs.get('exclude_external_links', False): + element.decompose() + return False + elif kwargs.get('exclude_social_media_links', False): + if any(domain in normalized_href.lower() for domain in exclude_social_media_domains): + element.decompose() + return False + elif kwargs.get('exclude_domains', []): + if any(domain in normalized_href.lower() for domain in kwargs.get('exclude_domains', [])): + element.decompose() + return False + + except Exception as e: + raise Exception(f"Error processing links: {str(e)}") + + try: + if element.name == 'img': + potential_sources = ['src', 'data-src', 'srcset' 'data-lazy-src', 'data-original'] + src = element.get('src', '') + while not src and potential_sources: + src = element.get(potential_sources.pop(0), '') + if not src: + element.decompose() + return False + + # If it is srcset pick up the first image + if 'srcset' in element.attrs: + src = element.attrs['srcset'].split(',')[0].split(' ')[0] + + # Check flag if we should remove external images + if kwargs.get('exclude_external_images', False): + src_url_base = src.split('/')[2] + url_base = url.split('/')[2] + if url_base not in src_url_base: + element.decompose() + return False + + if not kwargs.get('exclude_external_images', False) and kwargs.get('exclude_social_media_links', False): + src_url_base = src.split('/')[2] + url_base = url.split('/')[2] + if any(domain in src for domain in exclude_social_media_domains): + element.decompose() + return False + + # Handle exclude domains + if kwargs.get('exclude_domains', []): + if any(domain in src for domain in kwargs.get('exclude_domains', [])): + element.decompose() + return False + + return True # Always keep image elements + except Exception as e: + raise "Error processing images" + + + # Check if flag to remove all forms is set + if kwargs.get('remove_forms', False) and element.name == 'form': + element.decompose() + return False + + if element.name in ['video', 'audio']: + media[f"{element.name}s"].append({ + 'src': element.get('src'), + 'alt': element.get('alt'), + 'type': element.name, + 'description': self.find_closest_parent_with_useful_text(element, **kwargs) + }) + source_tags = element.find_all('source') + for source_tag in source_tags: + media[f"{element.name}s"].append({ + 'src': source_tag.get('src'), + 'alt': element.get('alt'), + 'type': element.name, + 'description': self.find_closest_parent_with_useful_text(element, **kwargs) + }) + return True # Always keep video and audio elements + + if element.name in ONLY_TEXT_ELIGIBLE_TAGS: + if kwargs.get('only_text', False): + element.replace_with(element.get_text()) + + try: + self.remove_unwanted_attributes(element, IMPORTANT_ATTRS, kwargs.get('keep_data_attributes', False)) + except Exception as e: + # print('Error removing unwanted attributes:', str(e)) + self._log('error', + message="Error removing unwanted attributes: {error}", + tag="SCRAPE", + params={"error": str(e)} + ) + # Process children + for child in list(element.children): + if isinstance(child, NavigableString) and not isinstance(child, Comment): + if len(child.strip()) > 0: + keep_element = True + else: + if self._process_element(url, child, media, internal_links_dict, external_links_dict, **kwargs): + keep_element = True + + + # Check word count + word_count_threshold = kwargs.get('word_count_threshold', MIN_WORD_THRESHOLD) + if not keep_element: + word_count = len(element.get_text(strip=True).split()) + keep_element = word_count >= word_count_threshold + + if not keep_element: + element.decompose() + + return keep_element + except Exception as e: + # print('Error processing element:', str(e)) + self._log('error', + message="Error processing element: {error}", + tag="SCRAPE", + params={"error": str(e)} + ) + return False def _get_content_of_website_optimized(self, url: str, html: str, word_count_threshold: int = MIN_WORD_THRESHOLD, css_selector: str = None, **kwargs) -> Dict[str, Any]: success = True if not html: return None - # soup = BeautifulSoup(html, 'html.parser') soup = BeautifulSoup(html, 'lxml') body = soup.body @@ -200,15 +499,24 @@ class WebScrapingStrategy(ContentScrapingStrategy): tag="SCRAPE", params={"error": str(e)} ) - # print('Error extracting metadata:', str(e)) meta = {} + # Handle tag-based removal first - faster than CSS selection + excluded_tags = set(kwargs.get('excluded_tags', []) or []) + if excluded_tags: + for element in body.find_all(lambda tag: tag.name in excluded_tags): + element.extract() - image_description_min_word_threshold = kwargs.get('image_description_min_word_threshold', IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD) - - for tag in kwargs.get('excluded_tags', []) or []: - for el in body.select(tag): - el.decompose() + # Handle CSS selector-based removal + excluded_selector = kwargs.get('excluded_selector', '') + if excluded_selector: + is_single_selector = ',' not in excluded_selector and ' ' not in excluded_selector + if is_single_selector: + while element := body.select_one(excluded_selector): + element.extract() + else: + for element in body.select(excluded_selector): + element.extract() if css_selector: selected_elements = body.select(css_selector) @@ -227,384 +535,17 @@ class WebScrapingStrategy(ContentScrapingStrategy): for el in selected_elements: body.append(el) - links = {'internal': [], 'external': []} - media = {'images': [], 'videos': [], 'audios': []} - internal_links_dict = {} - external_links_dict = {} - - # Extract meaningful text for media files from closest parent - def find_closest_parent_with_useful_text(tag): - current_tag = tag - while current_tag: - current_tag = current_tag.parent - # Get the text content of the parent tag - if current_tag: - text_content = current_tag.get_text(separator=' ',strip=True) - # Check if the text content has at least word_count_threshold - if len(text_content.split()) >= image_description_min_word_threshold: - return text_content - return None - - def process_image_old(img, url, index, total_images): - - - #Check if an image has valid display and inside undesired html elements - def is_valid_image(img, parent, parent_classes): - style = img.get('style', '') - src = img.get('src', '') - classes_to_check = ['button', 'icon', 'logo'] - tags_to_check = ['button', 'input'] - return all([ - 'display:none' not in style, - src, - not any(s in var for var in [src, img.get('alt', ''), *parent_classes] for s in classes_to_check), - parent.name not in tags_to_check - ]) - - #Score an image for it's usefulness - def score_image_for_usefulness(img, base_url, index, images_count): - image_height = img.get('height') - height_value, height_unit = parse_dimension(image_height) - image_width = img.get('width') - width_value, width_unit = parse_dimension(image_width) - image_size = 0 #int(fetch_image_file_size(img,base_url) or 0) - image_src = img.get('src','') - if "data:image/" in image_src: - image_format = image_src.split(',')[0].split(';')[0].split('/')[1] - else: - image_format = os.path.splitext(img.get('src',''))[1].lower() - # Remove . from format - image_format = image_format.strip('.').split('?')[0] - score = 0 - if height_value: - if height_unit == 'px' and height_value > 150: - score += 1 - if height_unit in ['%','vh','vmin','vmax'] and height_value >30: - score += 1 - if width_value: - if width_unit == 'px' and width_value > 150: - score += 1 - if width_unit in ['%','vh','vmin','vmax'] and width_value >30: - score += 1 - if image_size > 10000: - score += 1 - if img.get('alt') != '': - score+=1 - if any(image_format==format for format in ['jpg','png','webp']): - score+=1 - if index/images_count<0.5: - score+=1 - return score - - if not is_valid_image(img, img.parent, img.parent.get('class', [])): - return None - - score = score_image_for_usefulness(img, url, index, total_images) - if score <= kwargs.get('image_score_threshold', IMAGE_SCORE_THRESHOLD): - return None - - base_result = { - 'src': img.get('src', ''), - 'data-src': img.get('data-src', ''), - 'alt': img.get('alt', ''), - 'desc': find_closest_parent_with_useful_text(img), - 'score': score, - 'type': 'image' - } - - sources = [] - srcset = img.get('srcset', '') - if srcset: - sources = parse_srcset(srcset) - if sources: - return [dict(base_result, src=source['url'], width=source['width']) - for source in sources] - - return [base_result] # Always return a list - - def process_image(img, url, index, total_images): - parse_srcset = lambda s: [{'url': u.strip().split()[0], 'width': u.strip().split()[-1].rstrip('w') - if ' ' in u else None} - for u in [f"http{p}" for p in s.split("http") if p]] - - # Constants for checks - classes_to_check = frozenset(['button', 'icon', 'logo']) - tags_to_check = frozenset(['button', 'input']) - - # Pre-fetch commonly used attributes - style = img.get('style', '') - alt = img.get('alt', '') - src = img.get('src', '') - data_src = img.get('data-src', '') - width = img.get('width') - height = img.get('height') - parent = img.parent - parent_classes = parent.get('class', []) - - # Quick validation checks - if ('display:none' in style or - parent.name in tags_to_check or - any(c in cls for c in parent_classes for cls in classes_to_check) or - any(c in src for c in classes_to_check) or - any(c in alt for c in classes_to_check)): - return None - - # Quick score calculation - score = 0 - if width and width.isdigit(): - width_val = int(width) - score += 1 if width_val > 150 else 0 - if height and height.isdigit(): - height_val = int(height) - score += 1 if height_val > 150 else 0 - if alt: - score += 1 - score += index/total_images < 0.5 - - image_format = '' - if "data:image/" in src: - image_format = src.split(',')[0].split(';')[0].split('/')[1].split(';')[0] - else: - image_format = os.path.splitext(src)[1].lower().strip('.').split('?')[0] - - if image_format in ('jpg', 'png', 'webp', 'avif'): - score += 1 - - if score <= kwargs.get('image_score_threshold', IMAGE_SCORE_THRESHOLD): - return None - - # Use set for deduplication - unique_urls = set() - image_variants = [] - - # Generate a unique group ID for this set of variants - group_id = index - - # Base image info template - base_info = { - 'alt': alt, - 'desc': find_closest_parent_with_useful_text(img), - 'score': score, - 'type': 'image', - 'group_id': group_id # Group ID for this set of variants - } - - # Inline function for adding variants - def add_variant(src, width=None): - if src and not src.startswith('data:') and src not in unique_urls: - unique_urls.add(src) - image_variants.append({**base_info, 'src': src, 'width': width}) - - # Process all sources - add_variant(src) - add_variant(data_src) - - # Handle srcset and data-srcset in one pass - for attr in ('srcset', 'data-srcset'): - if value := img.get(attr): - for source in parse_srcset(value): - add_variant(source['url'], source['width']) - - # Quick picture element check - if picture := img.find_parent('picture'): - for source in picture.find_all('source'): - if srcset := source.get('srcset'): - for src in parse_srcset(srcset): - add_variant(src['url'], src['width']) - - # Framework-specific attributes in one pass - for attr, value in img.attrs.items(): - if attr.startswith('data-') and ('src' in attr or 'srcset' in attr) and 'http' in value: - add_variant(value) - - return image_variants if image_variants else None - - def remove_unwanted_attributes(element, important_attrs, keep_data_attributes=False): - attrs_to_remove = [] - for attr in element.attrs: - if attr not in important_attrs: - if keep_data_attributes: - if not attr.startswith('data-'): - attrs_to_remove.append(attr) - else: - attrs_to_remove.append(attr) - - for attr in attrs_to_remove: - del element[attr] + result_obj = self.process_element( + url, + body, + word_count_threshold = word_count_threshold, + **kwargs + ) - def process_element(element: element.PageElement) -> bool: - try: - if isinstance(element, NavigableString): - if isinstance(element, Comment): - element.extract() - return False - - # if element.name == 'img': - # process_image(element, url, 0, 1) - # return True - - if element.name in ['script', 'style', 'link', 'meta', 'noscript']: - element.decompose() - return False - - keep_element = False - - exclude_social_media_domains = SOCIAL_MEDIA_DOMAINS + kwargs.get('exclude_social_media_domains', []) - exclude_social_media_domains = list(set(exclude_social_media_domains)) - - try: - if element.name == 'a' and element.get('href'): - href = element.get('href', '').strip() - if not href: # Skip empty hrefs - return False - - url_base = url.split('/')[2] - - # Normalize the URL - try: - normalized_href = normalize_url(href, url) - except ValueError as e: - # logging.warning(f"Invalid URL format: {href}, Error: {str(e)}") - return False - - link_data = { - 'href': normalized_href, - 'text': element.get_text().strip(), - 'title': element.get('title', '').strip() - } - - # Check for duplicates and add to appropriate dictionary - is_external = is_external_url(normalized_href, url_base) - if is_external: - if normalized_href not in external_links_dict: - external_links_dict[normalized_href] = link_data - else: - if normalized_href not in internal_links_dict: - internal_links_dict[normalized_href] = link_data - - keep_element = True - - # Handle external link exclusions - if is_external: - if kwargs.get('exclude_external_links', False): - element.decompose() - return False - elif kwargs.get('exclude_social_media_links', False): - if any(domain in normalized_href.lower() for domain in exclude_social_media_domains): - element.decompose() - return False - elif kwargs.get('exclude_domains', []): - if any(domain in normalized_href.lower() for domain in kwargs.get('exclude_domains', [])): - element.decompose() - return False - - except Exception as e: - raise Exception(f"Error processing links: {str(e)}") - - try: - if element.name == 'img': - potential_sources = ['src', 'data-src', 'srcset' 'data-lazy-src', 'data-original'] - src = element.get('src', '') - while not src and potential_sources: - src = element.get(potential_sources.pop(0), '') - if not src: - element.decompose() - return False - - # If it is srcset pick up the first image - if 'srcset' in element.attrs: - src = element.attrs['srcset'].split(',')[0].split(' ')[0] - - # Check flag if we should remove external images - if kwargs.get('exclude_external_images', False): - src_url_base = src.split('/')[2] - url_base = url.split('/')[2] - if url_base not in src_url_base: - element.decompose() - return False - - if not kwargs.get('exclude_external_images', False) and kwargs.get('exclude_social_media_links', False): - src_url_base = src.split('/')[2] - url_base = url.split('/')[2] - if any(domain in src for domain in exclude_social_media_domains): - element.decompose() - return False - - # Handle exclude domains - if kwargs.get('exclude_domains', []): - if any(domain in src for domain in kwargs.get('exclude_domains', [])): - element.decompose() - return False - - return True # Always keep image elements - except Exception as e: - raise "Error processing images" - - - # Check if flag to remove all forms is set - if kwargs.get('remove_forms', False) and element.name == 'form': - element.decompose() - return False - - if element.name in ['video', 'audio']: - media[f"{element.name}s"].append({ - 'src': element.get('src'), - 'alt': element.get('alt'), - 'type': element.name, - 'description': find_closest_parent_with_useful_text(element) - }) - source_tags = element.find_all('source') - for source_tag in source_tags: - media[f"{element.name}s"].append({ - 'src': source_tag.get('src'), - 'alt': element.get('alt'), - 'type': element.name, - 'description': find_closest_parent_with_useful_text(element) - }) - return True # Always keep video and audio elements - - if element.name in ONLY_TEXT_ELIGIBLE_TAGS: - if kwargs.get('only_text', False): - element.replace_with(element.get_text()) - - try: - remove_unwanted_attributes(element, IMPORTANT_ATTRS, kwargs.get('keep_data_attributes', False)) - except Exception as e: - # print('Error removing unwanted attributes:', str(e)) - self._log('error', - message="Error removing unwanted attributes: {error}", - tag="SCRAPE", - params={"error": str(e)} - ) - # Process children - for child in list(element.children): - if isinstance(child, NavigableString) and not isinstance(child, Comment): - if len(child.strip()) > 0: - keep_element = True - else: - if process_element(child): - keep_element = True - - - # Check word count - if not keep_element: - word_count = len(element.get_text(strip=True).split()) - keep_element = word_count >= word_count_threshold - - if not keep_element: - element.decompose() - - return keep_element - except Exception as e: - # print('Error processing element:', str(e)) - self._log('error', - message="Error processing element: {error}", - tag="SCRAPE", - params={"error": str(e)} - ) - return False - - process_element(body) + links = {'internal': [], 'external': []} + media = result_obj['media'] + internal_links_dict = result_obj['internal_links_dict'] + external_links_dict = result_obj['external_links_dict'] # Update the links dictionary with unique links links['internal'] = list(internal_links_dict.values()) @@ -613,23 +554,14 @@ class WebScrapingStrategy(ContentScrapingStrategy): # # Process images using ThreadPoolExecutor imgs = body.find_all('img') - # For test we use for loop instead of thread media['images'] = [ - img for result in (process_image(img, url, i, len(imgs)) + img for result in (self.process_image(img, url, i, len(imgs)) for i, img in enumerate(imgs)) if result is not None for img in result ] - def flatten_nested_elements(node): - if isinstance(node, NavigableString): - return node - if len(node.contents) == 1 and isinstance(node.contents[0], element.Tag) and node.contents[0].name == node.name: - return flatten_nested_elements(node.contents[0]) - node.contents = [flatten_nested_elements(child) for child in node.contents] - return node - - body = flatten_nested_elements(body) + body = self.flatten_nested_elements(body) base64_pattern = re.compile(r'data:image/[^;]+;base64,([^"]+)') for img in imgs: src = img.get('src', '') diff --git a/crawl4ai/utils.py b/crawl4ai/utils.py index 253ec079..0a9e6f56 100644 --- a/crawl4ai/utils.py +++ b/crawl4ai/utils.py @@ -22,7 +22,7 @@ import textwrap from .html2text import HTML2Text class CustomHTML2Text(HTML2Text): - def __init__(self, *args, **kwargs): + def __init__(self, *args, handle_code_in_pre=False, **kwargs): super().__init__(*args, **kwargs) self.inside_pre = False self.inside_code = False @@ -30,6 +30,7 @@ class CustomHTML2Text(HTML2Text): self.current_preserved_tag = None self.preserved_content = [] self.preserve_depth = 0 + self.handle_code_in_pre = handle_code_in_pre # Configuration options self.skip_internal_links = False @@ -50,6 +51,8 @@ class CustomHTML2Text(HTML2Text): for key, value in kwargs.items(): if key == 'preserve_tags': self.preserve_tags = set(value) + elif key == 'handle_code_in_pre': + self.handle_code_in_pre = value else: setattr(self, key, value) @@ -88,13 +91,21 @@ class CustomHTML2Text(HTML2Text): # Handle pre tags if tag == 'pre': if start: - self.o('```\n') + self.o('```\n') # Markdown code block start self.inside_pre = True else: - self.o('\n```') + self.o('\n```\n') # Markdown code block end self.inside_pre = False - # elif tag in ["h1", "h2", "h3", "h4", "h5", "h6"]: - # pass + elif tag == 'code': + if self.inside_pre and not self.handle_code_in_pre: + # Ignore code tags inside pre blocks if handle_code_in_pre is False + return + if start: + self.o('`') # Markdown inline code start + self.inside_code = True + else: + self.o('`') # Markdown inline code end + self.inside_code = False else: super().handle_tag(tag, attrs, start) @@ -103,7 +114,39 @@ class CustomHTML2Text(HTML2Text): if self.preserve_depth > 0: self.preserved_content.append(data) return + + if self.inside_pre: + # Output the raw content for pre blocks, including content inside code tags + self.o(data) # Directly output the data as-is (preserve newlines) + return + if self.inside_code: + # Inline code: no newlines allowed + self.o(data.replace('\n', ' ')) + return + + # Default behavior for other tags super().handle_data(data, entity_char) + + + # # Handle pre tags + # if tag == 'pre': + # if start: + # self.o('```\n') + # self.inside_pre = True + # else: + # self.o('\n```') + # self.inside_pre = False + # # elif tag in ["h1", "h2", "h3", "h4", "h5", "h6"]: + # # pass + # else: + # super().handle_tag(tag, attrs, start) + + # def handle_data(self, data, entity_char=False): + # """Override handle_data to capture content within preserved tags.""" + # if self.preserve_depth > 0: + # self.preserved_content.append(data) + # return + # super().handle_data(data, entity_char) class InvalidCSSSelectorError(Exception): pass diff --git a/crawl4ai/utils.scraping.py b/crawl4ai/utils.scraping.py new file mode 100644 index 00000000..e69de29b diff --git a/docs/examples/quickstart_async.py b/docs/examples/quickstart_async.py index 176b0ba7..9d97dabd 100644 --- a/docs/examples/quickstart_async.py +++ b/docs/examples/quickstart_async.py @@ -547,6 +547,7 @@ async def generate_knowledge_graph(): f.write(result.extracted_content) async def fit_markdown_remove_overlay(): + async with AsyncWebCrawler( headless=True, # Set to False to see what is happening verbose=True, @@ -560,13 +561,15 @@ async def fit_markdown_remove_overlay(): url='https://www.kidocode.com/degrees/technology', cache_mode=CacheMode.BYPASS, markdown_generator=DefaultMarkdownGenerator( - content_filter=PruningContentFilter(threshold=0.48, threshold_type="fixed", min_word_threshold=0), + content_filter=PruningContentFilter( + threshold=0.48, threshold_type="fixed", min_word_threshold=0 + ), options={ "ignore_links": True } ), # markdown_generator=DefaultMarkdownGenerator( - # content_filter=BM25ContentFilter(user_query=None, bm25_threshold=1.0), + # content_filter=BM25ContentFilter(user_query="", bm25_threshold=1.0), # options={ # "ignore_links": True # }