- Fix JsonCssExtractionStrategy._get_elements to return all matching elements instead of just one
- Add robust error handling to page_need_scroll with a default fallback
- Improve JSON extraction strategies documentation
- Refactor content scraping strategy
- Update version to 0.4.247
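
For reference, a minimal sketch of the shape of the first two fixes. This is illustrative only — the real signatures and call sites live in the extraction-strategy and crawler-strategy modules, and the bodies below are assumptions, not the actual diff:

    # JsonCssExtractionStrategy._get_elements: return every match, not just the first.
    # Assumes the selector backend is BeautifulSoup-style.
    def _get_elements(self, element, selector: str):
        return element.select(selector)  # previously returned only a single element

    # page_need_scroll: never let a page-evaluation error abort the crawl;
    # fall back to assuming the page needs scrolling.
    async def page_need_scroll(self, page) -> bool:
        try:
            return await page.evaluate(
                "() => document.documentElement.scrollHeight > window.innerHeight"
            )
        except Exception as e:
            self.logger.warning(f"page_need_scroll check failed: {e}; defaulting to True")
            return True
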
import re  # Point 1: Pre-Compile Regular Expressions
import time
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor
import asyncio, requests, re, os
from .config import *
from bs4 import element, NavigableString, Comment
from bs4 import PageElement, Tag
from urllib.parse import urljoin
from requests.exceptions import InvalidSchema
# from .content_cleaning_strategy import ContentCleaningStrategy
from .content_filter_strategy import RelevantContentFilter, BM25ContentFilter  # , HeuristicContentFilter
from .markdown_generation_strategy import MarkdownGenerationStrategy, DefaultMarkdownGenerator
from .models import MarkdownGenerationResult
from .utils import (
    extract_metadata,
    normalize_url,
    is_external_url,
    get_base_domain,
)


# Pre-compile regular expressions for Open Graph and Twitter metadata
OG_REGEX = re.compile(r'^og:')
TWITTER_REGEX = re.compile(r'^twitter:')
DIMENSION_REGEX = re.compile(r"(\d+)(\D*)")


# Function to parse image height/width value and units
def parse_dimension(dimension):
    if dimension:
        # match = re.match(r"(\d+)(\D*)", dimension)
        match = DIMENSION_REGEX.match(dimension)
        if match:
            number = int(match.group(1))
            unit = match.group(2) or 'px'  # Default unit is 'px' if not specified
            return number, unit
    return None, None
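
# Illustrative behavior of parse_dimension (derived from DIMENSION_REGEX above,
# not from separate documentation):
#   parse_dimension("300px") -> (300, "px")
#   parse_dimension("150")   -> (150, "px")   # unit defaults to 'px'
#   parse_dimension(None)    -> (None, None)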

# Fetch image file metadata to extract size and extension
def fetch_image_file_size(img, base_url):
    # If src is a relative path, construct the full URL; otherwise it may already be a CDN URL
    img_url = urljoin(base_url, img.get('src'))
    try:
        response = requests.head(img_url)
        if response.status_code == 200:
            return response.headers.get('Content-Length', None)
        else:
            print(f"Failed to retrieve file size for {img_url}")
            return None
    except InvalidSchema:
        return None


class ContentScrapingStrategy(ABC):
    @abstractmethod
    def scrap(self, url: str, html: str, **kwargs) -> Dict[str, Any]:
        pass

    @abstractmethod
    async def ascrap(self, url: str, html: str, **kwargs) -> Dict[str, Any]:
        pass


class WebScrapingStrategy(ContentScrapingStrategy):
    """
    Class for web content scraping. Perhaps the most important class.

    How it works:
    1. Extract content from HTML using BeautifulSoup.
    2. Clean the extracted content using a content cleaning strategy.
    3. Filter the cleaned content using a content filtering strategy.
    4. Generate markdown content from the filtered content.
    5. Return the markdown content.
    """

    def __init__(self, logger=None):
        self.logger = logger

    def _log(self, level, message, tag="SCRAPE", **kwargs):
        """Helper method to safely use logger."""
        if self.logger:
            log_method = getattr(self.logger, level)
            log_method(message=message, tag=tag, **kwargs)
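
    # The logger passed to __init__ is assumed to expose level methods such as
    # .error(...) and .warning(...) that accept message=..., tag=..., and an
    # optional params=... dict used to fill placeholders in the message. This is
    # inferred from the _log call sites in this file, e.g.
    #   self._log('error', message="... {error}", tag="SCRAPE", params={"error": str(e)})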

    def scrap(self, url: str, html: str, **kwargs) -> Dict[str, Any]:
        """
        Main entry point for content scraping.

        Args:
            url (str): The URL of the page to scrape.
            html (str): The HTML content of the page.
            **kwargs: Additional keyword arguments.

        Returns:
            Dict[str, Any]: A dictionary containing the scraped content. This dictionary contains the following keys:

            - 'markdown': The generated markdown content. Currently a str; it will soon become a MarkdownGenerationResult, accessed via 'markdown.raw_markdown'.
            - 'fit_markdown': The generated markdown content with relevant content filtered. This key will be removed soon and become available as 'markdown.fit_markdown'.
            - 'fit_html': The HTML content with relevant content filtered. This key will be removed soon and become available as 'markdown.fit_html'.
            - 'markdown_v2': The generated markdown content with relevant content filtered. This key is temporary and will be replaced by 'markdown'.
        """
        return self._scrap(url, html, is_async=False, **kwargs)

    async def ascrap(self, url: str, html: str, **kwargs) -> Dict[str, Any]:
        """
        Main entry point for asynchronous content scraping.

        Args:
            url (str): The URL of the page to scrape.
            html (str): The HTML content of the page.
            **kwargs: Additional keyword arguments.

        Returns:
            Dict[str, Any]: A dictionary containing the scraped content. This dictionary contains the following keys:

            - 'markdown': The generated markdown content. Currently a str; it will soon become a MarkdownGenerationResult, accessed via 'markdown.raw_markdown'.
            - 'fit_markdown': The generated markdown content with relevant content filtered. This key will be removed soon and become available as 'markdown.fit_markdown'.
            - 'fit_html': The HTML content with relevant content filtered. This key will be removed soon and become available as 'markdown.fit_html'.
            - 'markdown_v2': The generated markdown content with relevant content filtered. This key is temporary and will be replaced by 'markdown'.
        """
        return await asyncio.to_thread(self._scrap, url, html, **kwargs)

    def flatten_nested_elements(self, node):
        """
        Flatten nested elements in an HTML tree.

        Args:
            node (Tag): The root node of the HTML tree.

        Returns:
            Tag: The flattened HTML tree.
        """
        if isinstance(node, NavigableString):
            return node
        if len(node.contents) == 1 and isinstance(node.contents[0], Tag) and node.contents[0].name == node.name:
            return self.flatten_nested_elements(node.contents[0])
        node.contents = [self.flatten_nested_elements(child) for child in node.contents]
        return node

    def find_closest_parent_with_useful_text(self, tag, **kwargs):
        """
        Find the closest parent with useful text.

        Args:
            tag (Tag): The starting tag to search from.
            **kwargs: Additional keyword arguments.

        Returns:
            str: The text of the closest parent with useful text, or None if not found.
        """
        image_description_min_word_threshold = kwargs.get('image_description_min_word_threshold', IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD)
        current_tag = tag
        while current_tag:
            current_tag = current_tag.parent
            # Get the text content of the parent tag
            if current_tag:
                text_content = current_tag.get_text(separator=' ', strip=True)
                # Check if the text content has at least the minimum number of words
                if len(text_content.split()) >= image_description_min_word_threshold:
                    return text_content
        return None

    def remove_unwanted_attributes(self, element, important_attrs, keep_data_attributes=False):
        """
        Remove unwanted attributes from an HTML element.

        Args:
            element (Tag): The HTML element to remove attributes from.
            important_attrs (list): List of important attributes to keep.
            keep_data_attributes (bool): Whether to keep data attributes.

        Returns:
            None
        """
        attrs_to_remove = []
        for attr in element.attrs:
            if attr not in important_attrs:
                if keep_data_attributes:
                    if not attr.startswith('data-'):
                        attrs_to_remove.append(attr)
                else:
                    attrs_to_remove.append(attr)

        for attr in attrs_to_remove:
            del element[attr]

    def process_image(self, img, url, index, total_images, **kwargs):
        """
        Process an image element.

        How it works:
        1. Check that the image is actually displayed and not inside an undesired HTML element.
        2. Score the image for its usefulness.
        3. Detect the image format from its available sources.
        4. Generate a dictionary with the processed image information.
        5. Return the processed image information.

        Args:
            img (Tag): The image element to process.
            url (str): The URL of the page containing the image.
            index (int): The index of the image in the list of images.
            total_images (int): The total number of images in the list.
            **kwargs: Additional keyword arguments.

        Returns:
            dict: A dictionary containing the processed image information.
        """
        # Split a srcset value into {'url', 'width'} entries by slicing on "http" boundaries;
        # the width descriptor is kept only when the candidate contains a space.
        parse_srcset = lambda s: [{'url': u.strip().split()[0], 'width': u.strip().split()[-1].rstrip('w')
                                  if ' ' in u else None}
                                  for u in [f"http{p}" for p in s.split("http") if p]]

        # Constants for checks
        classes_to_check = frozenset(['button', 'icon', 'logo'])
        tags_to_check = frozenset(['button', 'input'])
        image_formats = frozenset(['jpg', 'jpeg', 'png', 'webp', 'avif', 'gif'])

        # Pre-fetch commonly used attributes
        style = img.get('style', '')
        alt = img.get('alt', '')
        src = img.get('src', '')
        data_src = img.get('data-src', '')
        srcset = img.get('srcset', '')
        data_srcset = img.get('data-srcset', '')
        width = img.get('width')
        height = img.get('height')
        parent = img.parent
        parent_classes = parent.get('class', [])

        # Quick validation checks
        if ('display:none' in style or
                parent.name in tags_to_check or
                any(c in cls for c in parent_classes for cls in classes_to_check) or
                any(c in src for c in classes_to_check) or
                any(c in alt for c in classes_to_check)):
            return None

        # Quick score calculation
        score = 0
        if width and width.isdigit():
            width_val = int(width)
            score += 1 if width_val > 150 else 0
        if height and height.isdigit():
            height_val = int(height)
            score += 1 if height_val > 150 else 0
        if alt:
            score += 1
        score += index/total_images < 0.5

        # image_format = ''
        # if "data:image/" in src:
        #     image_format = src.split(',')[0].split(';')[0].split('/')[1].split(';')[0]
        # else:
        #     image_format = os.path.splitext(src)[1].lower().strip('.').split('?')[0]

        # if image_format in ('jpg', 'png', 'webp', 'avif'):
        #     score += 1

        # Check for image format in all possible sources
        def has_image_format(url):
            return any(fmt in url.lower() for fmt in image_formats)

        # Score for having proper image sources
        if any(has_image_format(url) for url in [src, data_src, srcset, data_srcset]):
            score += 1
        if srcset or data_srcset:
            score += 1
        if img.find_parent('picture'):
            score += 1

        # Detect format from any available source
        detected_format = None
        for url in [src, data_src, srcset, data_srcset]:
            if url:
                format_matches = [fmt for fmt in image_formats if fmt in url.lower()]
                if format_matches:
                    detected_format = format_matches[0]
                    break

        if score <= kwargs.get('image_score_threshold', IMAGE_SCORE_THRESHOLD):
            return None

        # Use a set for deduplication
        unique_urls = set()
        image_variants = []

        # Generate a unique group ID for this set of variants
        group_id = index

        # Base image info template
        image_description_min_word_threshold = kwargs.get('image_description_min_word_threshold', IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD)
        base_info = {
            'alt': alt,
            'desc': self.find_closest_parent_with_useful_text(img, **kwargs),
            'score': score,
            'type': 'image',
            'group_id': group_id,  # Group ID for this set of variants
            'format': detected_format,
        }

        # Inline function for adding variants
        def add_variant(src, width=None):
            if src and not src.startswith('data:') and src not in unique_urls:
                unique_urls.add(src)
                image_variants.append({**base_info, 'src': src, 'width': width})

        # Process all sources
        add_variant(src)
        add_variant(data_src)

        # Handle srcset and data-srcset in one pass
        for attr in ('srcset', 'data-srcset'):
            if value := img.get(attr):
                for source in parse_srcset(value):
                    add_variant(source['url'], source['width'])

        # Quick picture element check
        if picture := img.find_parent('picture'):
            for source in picture.find_all('source'):
                if srcset := source.get('srcset'):
                    for src in parse_srcset(srcset):
                        add_variant(src['url'], src['width'])

        # Framework-specific attributes in one pass
        for attr, value in img.attrs.items():
            if attr.startswith('data-') and ('src' in attr or 'srcset' in attr) and 'http' in value:
                add_variant(value)

        return image_variants if image_variants else None
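
    # Illustrative shape of the return value (derived from base_info and add_variant
    # above, not from separate documentation): a list of variant dicts such as
    #   {'alt': 'Team photo', 'desc': '...surrounding text...', 'score': 5,
    #    'type': 'image', 'group_id': 3, 'format': 'png',
    #    'src': 'https://example.com/img/team-800.png', 'width': '800'}
    # or None when the image is filtered out or does not beat image_score_threshold.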

    def process_element(self, url, element: PageElement, **kwargs) -> Dict[str, Any]:
        """
        Process an HTML element.

        How it works:
        1. Check if the element is an image, video, or audio.
        2. Extract the element's attributes and content.
        3. Process the element based on its type.
        4. Return the processed element information.

        Args:
            url (str): The URL of the page containing the element.
            element (Tag): The HTML element to process.
            **kwargs: Additional keyword arguments.

        Returns:
            dict: A dictionary containing the processed element information.
        """
        media = {'images': [], 'videos': [], 'audios': []}
        internal_links_dict = {}
        external_links_dict = {}
        self._process_element(
            url,
            element,
            media,
            internal_links_dict,
            external_links_dict,
            **kwargs
        )
        return {
            'media': media,
            'internal_links_dict': internal_links_dict,
            'external_links_dict': external_links_dict
        }

    def _process_element(self, url, element: PageElement, media: Dict[str, Any], internal_links_dict: Dict[str, Any], external_links_dict: Dict[str, Any], **kwargs) -> bool:
        """
        Process an HTML element.
        """
        try:
            if isinstance(element, NavigableString):
                if isinstance(element, Comment):
                    element.extract()
                return False

            # if element.name == 'img':
            #     process_image(element, url, 0, 1)
            #     return True
            base_domain = kwargs.get("base_domain", get_base_domain(url))

            if element.name in ['script', 'style', 'link', 'meta', 'noscript']:
                element.decompose()
                return False

            keep_element = False

            exclude_domains = kwargs.get('exclude_domains', [])
            # exclude_social_media_domains = kwargs.get('exclude_social_media_domains', set(SOCIAL_MEDIA_DOMAINS))
            # exclude_social_media_domains = SOCIAL_MEDIA_DOMAINS + kwargs.get('exclude_social_media_domains', [])
            # exclude_social_media_domains = list(set(exclude_social_media_domains))

            try:
                if element.name == 'a' and element.get('href'):
                    href = element.get('href', '').strip()
                    if not href:  # Skip empty hrefs
                        return False

                    url_base = url.split('/')[2]

                    # Normalize the URL
                    try:
                        normalized_href = normalize_url(href, url)
                    except ValueError as e:
                        # logging.warning(f"Invalid URL format: {href}, Error: {str(e)}")
                        return False

                    link_data = {
                        'href': normalized_href,
                        'text': element.get_text().strip(),
                        'title': element.get('title', '').strip(),
                        'base_domain': base_domain
                    }

                    is_external = is_external_url(normalized_href, base_domain)

                    keep_element = True

                    # Handle external link exclusions
                    if is_external:
                        link_base_domain = get_base_domain(normalized_href)
                        link_data['base_domain'] = link_base_domain
                        if kwargs.get('exclude_external_links', False):
                            element.decompose()
                            return False
                        # elif kwargs.get('exclude_social_media_links', False):
                        #     if link_base_domain in exclude_social_media_domains:
                        #         element.decompose()
                        #         return False
                        #     if any(domain in normalized_href.lower() for domain in exclude_social_media_domains):
                        #         element.decompose()
                        #         return False
                        elif exclude_domains:
                            if link_base_domain in exclude_domains:
                                element.decompose()
                                return False
                            # if any(domain in normalized_href.lower() for domain in kwargs.get('exclude_domains', [])):
                            #     element.decompose()
                            #     return False

                    if is_external:
                        if normalized_href not in external_links_dict:
                            external_links_dict[normalized_href] = link_data
                    else:
                        if normalized_href not in internal_links_dict:
                            internal_links_dict[normalized_href] = link_data

            except Exception as e:
                raise Exception(f"Error processing links: {str(e)}")

            try:
                if element.name == 'img':
                    potential_sources = ['src', 'data-src', 'srcset', 'data-lazy-src', 'data-original']
                    src = element.get('src', '')
                    while not src and potential_sources:
                        src = element.get(potential_sources.pop(0), '')
                    if not src:
                        element.decompose()
                        return False

                    # If it is srcset pick up the first image
                    if 'srcset' in element.attrs:
                        src = element.attrs['srcset'].split(',')[0].split(' ')[0]

                    # If the image src is internal, skip the external checks
                    if not is_external_url(src, base_domain):
                        return True

                    image_src_base_domain = get_base_domain(src)

                    # Check flag if we should remove external images
                    if kwargs.get('exclude_external_images', False):
                        element.decompose()
                        return False
                    # src_url_base = src.split('/')[2]
                    # url_base = url.split('/')[2]
                    # if url_base not in src_url_base:
                    #     element.decompose()
                    #     return False

                    # if kwargs.get('exclude_social_media_links', False):
                    #     if image_src_base_domain in exclude_social_media_domains:
                    #         element.decompose()
                    #         return False
                    #     src_url_base = src.split('/')[2]
                    #     url_base = url.split('/')[2]
                    #     if any(domain in src for domain in exclude_social_media_domains):
                    #         element.decompose()
                    #         return False

                    # Handle exclude domains
                    if exclude_domains:
                        if image_src_base_domain in exclude_domains:
                            element.decompose()
                            return False
                    # if any(domain in src for domain in kwargs.get('exclude_domains', [])):
                    #     element.decompose()
                    #     return False

                    return True  # Always keep image elements
            except Exception as e:
                raise Exception(f"Error processing images: {str(e)}")

            # Check if the flag to remove all forms is set
            if kwargs.get('remove_forms', False) and element.name == 'form':
                element.decompose()
                return False

            if element.name in ['video', 'audio']:
                media[f"{element.name}s"].append({
                    'src': element.get('src'),
                    'alt': element.get('alt'),
                    'type': element.name,
                    'description': self.find_closest_parent_with_useful_text(element, **kwargs)
                })
                source_tags = element.find_all('source')
                for source_tag in source_tags:
                    media[f"{element.name}s"].append({
                        'src': source_tag.get('src'),
                        'alt': element.get('alt'),
                        'type': element.name,
                        'description': self.find_closest_parent_with_useful_text(element, **kwargs)
                    })
                return True  # Always keep video and audio elements

            if element.name in ONLY_TEXT_ELIGIBLE_TAGS:
                if kwargs.get('only_text', False):
                    element.replace_with(element.get_text())

            try:
                self.remove_unwanted_attributes(element, IMPORTANT_ATTRS, kwargs.get('keep_data_attributes', False))
            except Exception as e:
                # print('Error removing unwanted attributes:', str(e))
                self._log('error',
                          message="Error removing unwanted attributes: {error}",
                          tag="SCRAPE",
                          params={"error": str(e)}
                          )
            # Process children
            for child in list(element.children):
                if isinstance(child, NavigableString) and not isinstance(child, Comment):
                    if len(child.strip()) > 0:
                        keep_element = True
                else:
                    if self._process_element(url, child, media, internal_links_dict, external_links_dict, **kwargs):
                        keep_element = True

            # Check word count
            word_count_threshold = kwargs.get('word_count_threshold', MIN_WORD_THRESHOLD)
            if not keep_element:
                word_count = len(element.get_text(strip=True).split())
                keep_element = word_count >= word_count_threshold

            if not keep_element:
                element.decompose()

            return keep_element
        except Exception as e:
            # print('Error processing element:', str(e))
            self._log('error',
                      message="Error processing element: {error}",
                      tag="SCRAPE",
                      params={"error": str(e)}
                      )
            return False

    def _scrap(self, url: str, html: str, word_count_threshold: int = MIN_WORD_THRESHOLD, css_selector: str = None, **kwargs) -> Dict[str, Any]:
        """
        Extract content from HTML using BeautifulSoup.

        Args:
            url (str): The URL of the page to scrape.
            html (str): The HTML content of the page to scrape.
            word_count_threshold (int): The minimum word count threshold for content extraction.
            css_selector (str): The CSS selector to use for content extraction.
            **kwargs: Additional keyword arguments.

        Returns:
            dict: A dictionary containing the extracted content.
        """
        success = True
        if not html:
            return None

        parser_type = kwargs.get('parser', 'lxml')
        soup = BeautifulSoup(html, parser_type)
        body = soup.body
        base_domain = get_base_domain(url)

        try:
            meta = extract_metadata("", soup)
        except Exception as e:
            self._log('error',
                      message="Error extracting metadata: {error}",
                      tag="SCRAPE",
                      params={"error": str(e)}
                      )
            meta = {}

        # Handle tag-based removal first - faster than CSS selection
        excluded_tags = set(kwargs.get('excluded_tags', []) or [])
        if excluded_tags:
            for element in body.find_all(lambda tag: tag.name in excluded_tags):
                element.extract()

        # Handle CSS selector-based removal
        excluded_selector = kwargs.get('excluded_selector', '')
        if excluded_selector:
            is_single_selector = ',' not in excluded_selector and ' ' not in excluded_selector
            if is_single_selector:
                while element := body.select_one(excluded_selector):
                    element.extract()
            else:
                for element in body.select(excluded_selector):
                    element.extract()

        if css_selector:
            selected_elements = body.select(css_selector)
            if not selected_elements:
                return {
                    'markdown': '',
                    'cleaned_html': '',
                    'success': True,
                    'media': {'images': [], 'videos': [], 'audios': []},
                    'links': {'internal': [], 'external': []},
                    'metadata': {},
                    'message': f"No elements found for CSS selector: {css_selector}"
                }
                # raise InvalidCSSSelectorError(f"Invalid CSS selector, No elements found for CSS selector: {css_selector}")
            body = soup.new_tag('div')
            for el in selected_elements:
                body.append(el)

        kwargs['exclude_social_media_domains'] = set(kwargs.get('exclude_social_media_domains', []) + SOCIAL_MEDIA_DOMAINS)
        kwargs['exclude_domains'] = set(kwargs.get('exclude_domains', []))
        if kwargs.get('exclude_social_media_links', False):
            kwargs['exclude_domains'] = kwargs['exclude_domains'].union(kwargs['exclude_social_media_domains'])

        result_obj = self.process_element(
            url,
            body,
            word_count_threshold=word_count_threshold,
            base_domain=base_domain,
            **kwargs
        )

        links = {'internal': [], 'external': []}
        media = result_obj['media']
        internal_links_dict = result_obj['internal_links_dict']
        external_links_dict = result_obj['external_links_dict']

        # Update the links dictionary with unique links
        links['internal'] = list(internal_links_dict.values())
        links['external'] = list(external_links_dict.values())

        # # Process images using ThreadPoolExecutor
        imgs = body.find_all('img')

        media['images'] = [
            img for result in (self.process_image(img, url, i, len(imgs))
                               for i, img in enumerate(imgs))
            if result is not None
            for img in result
        ]

        body = self.flatten_nested_elements(body)
        base64_pattern = re.compile(r'data:image/[^;]+;base64,([^"]+)')
        for img in imgs:
            src = img.get('src', '')
            if base64_pattern.match(src):
                # Replace base64 data with an empty string
                img['src'] = base64_pattern.sub('', src)

        str_body = ""
        try:
            str_body = body.encode_contents().decode('utf-8')
        except Exception as e:
            # Reset body to the original HTML
            success = False
            body = BeautifulSoup(html, 'html.parser')

            # Create a new div with a special ID
            error_div = body.new_tag('div', id='crawl4ai_error_message')
            error_div.string = '''
            Crawl4AI Error: This page is not fully supported.

            Possible reasons:
            1. The page may have restrictions that prevent crawling.
            2. The page might not be fully loaded.

            Suggestions:
            - Try calling the crawl function with these parameters:
            magic=True,
            - Set headless=False to visualize what's happening on the page.

            If the issue persists, please check the page's structure and any potential anti-crawling measures.
            '''

            # Append the error div to the body
            body.body.append(error_div)
            str_body = body.encode_contents().decode('utf-8')

            print(f"[LOG] 😧 Error: After processing the crawled HTML and removing irrelevant tags, nothing was left in the page. Check the markdown for further details.")
            self._log('error',
                      message="After processing the crawled HTML and removing irrelevant tags, nothing was left in the page. Check the markdown for further details.",
                      tag="SCRAPE"
                      )

        cleaned_html = str_body.replace('\n\n', '\n').replace('  ', ' ')

        return {
            # **markdown_content,
            'cleaned_html': cleaned_html,
            'success': success,
            'media': media,
            'links': links,
            'metadata': meta
        }
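

# --- Usage sketch (illustrative, not part of the library) -------------------
# Assumptions: this file lives in the crawl4ai package (so the relative imports
# resolve when run via `python -m crawl4ai.content_scraping_strategy`), and
# bs4/lxml are installed. The keys printed below are the ones _scrap returns:
# 'cleaned_html', 'success', 'media', 'links', 'metadata'.
if __name__ == "__main__":
    sample_html = """
    <html><head><title>Demo</title></head>
    <body>
      <article>
        <h1>Hello</h1>
        <p>A short demo paragraph with enough words to clear the word count threshold.</p>
        <a href="/about" title="About">About us</a>
        <a href="https://other-site.example/page">External link</a>
        <img src="https://example.com/img/banner.png" alt="Banner" width="800" height="400"/>
      </article>
    </body></html>
    """

    scraper = WebScrapingStrategy()

    # Synchronous entry point
    result = scraper.scrap("https://example.com/demo", sample_html, word_count_threshold=3)
    print("success:", result["success"])
    print("internal links:", [link["href"] for link in result["links"]["internal"]])
    print("external links:", [link["href"] for link in result["links"]["external"]])
    print("images:", [image["src"] for image in result["media"]["images"]])

    # Asynchronous entry point (runs _scrap in a worker thread)
    async_result = asyncio.run(scraper.ascrap("https://example.com/demo", sample_html, word_count_threshold=3))
    print("async success:", async_result["success"])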