Refactored web scraping components

- Enhanced the web scraping strategy with new methods for optimized media handling.
  - Added new utility functions for better content processing.
  - Refined existing features for improved accuracy and efficiency in scraping tasks.
  - Introduced more robust filtering criteria for media elements.
This commit is contained in:
UncleCode
2024-12-05 22:33:47 +08:00
parent 486db3a771
commit 8c611dcb4b
4 changed files with 408 additions and 430 deletions

View File

@@ -6,10 +6,11 @@ from concurrent.futures import ThreadPoolExecutor
import asyncio, requests, re, os
from .config import *
from bs4 import element, NavigableString, Comment
from bs4 import PageElement, Tag
from urllib.parse import urljoin
from requests.exceptions import InvalidSchema
# from .content_cleaning_strategy import ContentCleaningStrategy
from .content_filter_strategy import RelevantContentFilter, BM25ContentFilter, PruningContentFilter
from .content_filter_strategy import RelevantContentFilter, BM25ContentFilter#, HeuristicContentFilter
from .markdown_generation_strategy import MarkdownGenerationStrategy, DefaultMarkdownGenerator
from .models import MarkdownGenerationResult
from .utils import (
@@ -80,45 +81,21 @@ class WebScrapingStrategy(ContentScrapingStrategy):
async def ascrap(self, url: str, html: str, **kwargs) -> Dict[str, Any]:
return await asyncio.to_thread(self._get_content_of_website_optimized, url, html, **kwargs)
def _generate_markdown_content(self,
cleaned_html: str,
html: str,
url: str,
success: bool,
**kwargs) -> Dict[str, Any]:
"""Generate markdown content using either new strategy or legacy method.
Args:
cleaned_html: Sanitized HTML content
html: Original HTML content
url: Base URL of the page
success: Whether scraping was successful
**kwargs: Additional options including:
- markdown_generator: Optional[MarkdownGenerationStrategy]
- html2text: Dict[str, Any] options for HTML2Text
- content_filter: Optional[RelevantContentFilter]
- fit_markdown: bool
- fit_markdown_user_query: Optional[str]
- fit_markdown_bm25_threshold: float
Returns:
Dict containing markdown content in various formats
"""
markdown_generator: Optional[MarkdownGenerationStrategy] = kwargs.get('markdown_generator', DefaultMarkdownGenerator())
if markdown_generator:
try:
if kwargs.get('fit_markdown', False) and not markdown_generator.content_filter:
markdown_generator.content_filter = PruningContentFilter(
threshold_type=kwargs.get('fit_markdown_treshold_type', 'fixed'),
threshold=kwargs.get('fit_markdown_treshold', 0.48),
min_word_threshold=kwargs.get('fit_markdown_min_word_threshold', ),
markdown_generator.content_filter = BM25ContentFilter(
user_query=kwargs.get('fit_markdown_user_query', None),
bm25_threshold=kwargs.get('fit_markdown_bm25_threshold', 1.0)
)
# markdown_generator.content_filter = BM25ContentFilter(
# user_query=kwargs.get('fit_markdown_user_query', None),
# bm25_threshold=kwargs.get('fit_markdown_bm25_threshold', 1.0)
# )
markdown_result: MarkdownGenerationResult = markdown_generator.generate_markdown(
cleaned_html=cleaned_html,
@@ -182,58 +159,16 @@ class WebScrapingStrategy(ContentScrapingStrategy):
'markdown_v2' : markdown_v2
}
def flatten_nested_elements(self, node):
if isinstance(node, NavigableString):
return node
if len(node.contents) == 1 and isinstance(node.contents[0], Tag) and node.contents[0].name == node.name:
return self.flatten_nested_elements(node.contents[0])
node.contents = [self.flatten_nested_elements(child) for child in node.contents]
return node
def _get_content_of_website_optimized(self, url: str, html: str, word_count_threshold: int = MIN_WORD_THRESHOLD, css_selector: str = None, **kwargs) -> Dict[str, Any]:
success = True
if not html:
return None
# soup = BeautifulSoup(html, 'html.parser')
soup = BeautifulSoup(html, 'lxml')
body = soup.body
try:
meta = extract_metadata("", soup)
except Exception as e:
self._log('error',
message="Error extracting metadata: {error}",
tag="SCRAPE",
params={"error": str(e)}
)
# print('Error extracting metadata:', str(e))
meta = {}
def find_closest_parent_with_useful_text(self, tag, **kwargs):
image_description_min_word_threshold = kwargs.get('image_description_min_word_threshold', IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD)
for tag in kwargs.get('excluded_tags', []) or []:
for el in body.select(tag):
el.decompose()
if css_selector:
selected_elements = body.select(css_selector)
if not selected_elements:
return {
'markdown': '',
'cleaned_html': '',
'success': True,
'media': {'images': [], 'videos': [], 'audios': []},
'links': {'internal': [], 'external': []},
'metadata': {},
'message': f"No elements found for CSS selector: {css_selector}"
}
# raise InvalidCSSSelectorError(f"Invalid CSS selector, No elements found for CSS selector: {css_selector}")
body = soup.new_tag('div')
for el in selected_elements:
body.append(el)
links = {'internal': [], 'external': []}
media = {'images': [], 'videos': [], 'audios': []}
internal_links_dict = {}
external_links_dict = {}
# Extract meaningful text for media files from closest parent
def find_closest_parent_with_useful_text(tag):
current_tag = tag
while current_tag:
current_tag = current_tag.parent
@@ -245,84 +180,20 @@ class WebScrapingStrategy(ContentScrapingStrategy):
return text_content
return None
def process_image_old(img, url, index, total_images):
#Check if an image has valid display and inside undesired html elements
def is_valid_image(img, parent, parent_classes):
style = img.get('style', '')
src = img.get('src', '')
classes_to_check = ['button', 'icon', 'logo']
tags_to_check = ['button', 'input']
return all([
'display:none' not in style,
src,
not any(s in var for var in [src, img.get('alt', ''), *parent_classes] for s in classes_to_check),
parent.name not in tags_to_check
])
#Score an image for it's usefulness
def score_image_for_usefulness(img, base_url, index, images_count):
image_height = img.get('height')
height_value, height_unit = parse_dimension(image_height)
image_width = img.get('width')
width_value, width_unit = parse_dimension(image_width)
image_size = 0 #int(fetch_image_file_size(img,base_url) or 0)
image_src = img.get('src','')
if "data:image/" in image_src:
image_format = image_src.split(',')[0].split(';')[0].split('/')[1]
def remove_unwanted_attributes(self, element, important_attrs, keep_data_attributes=False):
attrs_to_remove = []
for attr in element.attrs:
if attr not in important_attrs:
if keep_data_attributes:
if not attr.startswith('data-'):
attrs_to_remove.append(attr)
else:
image_format = os.path.splitext(img.get('src',''))[1].lower()
# Remove . from format
image_format = image_format.strip('.').split('?')[0]
score = 0
if height_value:
if height_unit == 'px' and height_value > 150:
score += 1
if height_unit in ['%','vh','vmin','vmax'] and height_value >30:
score += 1
if width_value:
if width_unit == 'px' and width_value > 150:
score += 1
if width_unit in ['%','vh','vmin','vmax'] and width_value >30:
score += 1
if image_size > 10000:
score += 1
if img.get('alt') != '':
score+=1
if any(image_format==format for format in ['jpg','png','webp']):
score+=1
if index/images_count<0.5:
score+=1
return score
attrs_to_remove.append(attr)
if not is_valid_image(img, img.parent, img.parent.get('class', [])):
return None
for attr in attrs_to_remove:
del element[attr]
score = score_image_for_usefulness(img, url, index, total_images)
if score <= kwargs.get('image_score_threshold', IMAGE_SCORE_THRESHOLD):
return None
base_result = {
'src': img.get('src', ''),
'data-src': img.get('data-src', ''),
'alt': img.get('alt', ''),
'desc': find_closest_parent_with_useful_text(img),
'score': score,
'type': 'image'
}
sources = []
srcset = img.get('srcset', '')
if srcset:
sources = parse_srcset(srcset)
if sources:
return [dict(base_result, src=source['url'], width=source['width'])
for source in sources]
return [base_result] # Always return a list
def process_image(img, url, index, total_images):
def process_image(self, img, url, index, total_images, **kwargs):
parse_srcset = lambda s: [{'url': u.strip().split()[0], 'width': u.strip().split()[-1].rstrip('w')
if ' ' in u else None}
for u in [f"http{p}" for p in s.split("http") if p]]
@@ -381,9 +252,10 @@ class WebScrapingStrategy(ContentScrapingStrategy):
group_id = index
# Base image info template
image_description_min_word_threshold = kwargs.get('image_description_min_word_threshold', IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD)
base_info = {
'alt': alt,
'desc': find_closest_parent_with_useful_text(img),
'desc': self.find_closest_parent_with_useful_text(img, **kwargs),
'score': score,
'type': 'image',
'group_id': group_id # Group ID for this set of variants
@@ -419,20 +291,26 @@ class WebScrapingStrategy(ContentScrapingStrategy):
return image_variants if image_variants else None
def remove_unwanted_attributes(element, important_attrs, keep_data_attributes=False):
attrs_to_remove = []
for attr in element.attrs:
if attr not in important_attrs:
if keep_data_attributes:
if not attr.startswith('data-'):
attrs_to_remove.append(attr)
else:
attrs_to_remove.append(attr)
for attr in attrs_to_remove:
del element[attr]
def process_element(self, url, element: PageElement, **kwargs) -> Dict[str, Any]:
media = {'images': [], 'videos': [], 'audios': []}
internal_links_dict = {}
external_links_dict = {}
self._process_element(
url,
element,
media,
internal_links_dict,
external_links_dict,
**kwargs
)
return {
'media': media,
'internal_links_dict': internal_links_dict,
'external_links_dict': external_links_dict
}
def process_element(element: element.PageElement) -> bool:
def _process_element(self, url, element: PageElement, media: Dict[str, Any], internal_links_dict: Dict[str, Any], external_links_dict: Dict[str, Any], **kwargs) -> bool:
try:
if isinstance(element, NavigableString):
if isinstance(element, Comment):
@@ -551,7 +429,7 @@ class WebScrapingStrategy(ContentScrapingStrategy):
'src': element.get('src'),
'alt': element.get('alt'),
'type': element.name,
'description': find_closest_parent_with_useful_text(element)
'description': self.find_closest_parent_with_useful_text(element, **kwargs)
})
source_tags = element.find_all('source')
for source_tag in source_tags:
@@ -559,7 +437,7 @@ class WebScrapingStrategy(ContentScrapingStrategy):
'src': source_tag.get('src'),
'alt': element.get('alt'),
'type': element.name,
'description': find_closest_parent_with_useful_text(element)
'description': self.find_closest_parent_with_useful_text(element, **kwargs)
})
return True # Always keep video and audio elements
@@ -568,7 +446,7 @@ class WebScrapingStrategy(ContentScrapingStrategy):
element.replace_with(element.get_text())
try:
remove_unwanted_attributes(element, IMPORTANT_ATTRS, kwargs.get('keep_data_attributes', False))
self.remove_unwanted_attributes(element, IMPORTANT_ATTRS, kwargs.get('keep_data_attributes', False))
except Exception as e:
# print('Error removing unwanted attributes:', str(e))
self._log('error',
@@ -582,11 +460,12 @@ class WebScrapingStrategy(ContentScrapingStrategy):
if len(child.strip()) > 0:
keep_element = True
else:
if process_element(child):
if self._process_element(url, child, media, internal_links_dict, external_links_dict, **kwargs):
keep_element = True
# Check word count
word_count_threshold = kwargs.get('word_count_threshold', MIN_WORD_THRESHOLD)
if not keep_element:
word_count = len(element.get_text(strip=True).split())
keep_element = word_count >= word_count_threshold
@@ -604,7 +483,69 @@ class WebScrapingStrategy(ContentScrapingStrategy):
)
return False
process_element(body)
def _get_content_of_website_optimized(self, url: str, html: str, word_count_threshold: int = MIN_WORD_THRESHOLD, css_selector: str = None, **kwargs) -> Dict[str, Any]:
success = True
if not html:
return None
soup = BeautifulSoup(html, 'lxml')
body = soup.body
try:
meta = extract_metadata("", soup)
except Exception as e:
self._log('error',
message="Error extracting metadata: {error}",
tag="SCRAPE",
params={"error": str(e)}
)
meta = {}
# Handle tag-based removal first - faster than CSS selection
excluded_tags = set(kwargs.get('excluded_tags', []) or [])
if excluded_tags:
for element in body.find_all(lambda tag: tag.name in excluded_tags):
element.extract()
# Handle CSS selector-based removal
excluded_selector = kwargs.get('excluded_selector', '')
if excluded_selector:
is_single_selector = ',' not in excluded_selector and ' ' not in excluded_selector
if is_single_selector:
while element := body.select_one(excluded_selector):
element.extract()
else:
for element in body.select(excluded_selector):
element.extract()
if css_selector:
selected_elements = body.select(css_selector)
if not selected_elements:
return {
'markdown': '',
'cleaned_html': '',
'success': True,
'media': {'images': [], 'videos': [], 'audios': []},
'links': {'internal': [], 'external': []},
'metadata': {},
'message': f"No elements found for CSS selector: {css_selector}"
}
# raise InvalidCSSSelectorError(f"Invalid CSS selector, No elements found for CSS selector: {css_selector}")
body = soup.new_tag('div')
for el in selected_elements:
body.append(el)
result_obj = self.process_element(
url,
body,
word_count_threshold = word_count_threshold,
**kwargs
)
links = {'internal': [], 'external': []}
media = result_obj['media']
internal_links_dict = result_obj['internal_links_dict']
external_links_dict = result_obj['external_links_dict']
# Update the links dictionary with unique links
links['internal'] = list(internal_links_dict.values())
@@ -613,23 +554,14 @@ class WebScrapingStrategy(ContentScrapingStrategy):
# # Process images using ThreadPoolExecutor
imgs = body.find_all('img')
# For test we use for loop instead of thread
media['images'] = [
img for result in (process_image(img, url, i, len(imgs))
img for result in (self.process_image(img, url, i, len(imgs))
for i, img in enumerate(imgs))
if result is not None
for img in result
]
def flatten_nested_elements(node):
if isinstance(node, NavigableString):
return node
if len(node.contents) == 1 and isinstance(node.contents[0], element.Tag) and node.contents[0].name == node.name:
return flatten_nested_elements(node.contents[0])
node.contents = [flatten_nested_elements(child) for child in node.contents]
return node
body = flatten_nested_elements(body)
body = self.flatten_nested_elements(body)
base64_pattern = re.compile(r'data:image/[^;]+;base64,([^"]+)')
for img in imgs:
src = img.get('src', '')

View File

@@ -22,7 +22,7 @@ import textwrap
from .html2text import HTML2Text
class CustomHTML2Text(HTML2Text):
def __init__(self, *args, **kwargs):
def __init__(self, *args, handle_code_in_pre=False, **kwargs):
super().__init__(*args, **kwargs)
self.inside_pre = False
self.inside_code = False
@@ -30,6 +30,7 @@ class CustomHTML2Text(HTML2Text):
self.current_preserved_tag = None
self.preserved_content = []
self.preserve_depth = 0
self.handle_code_in_pre = handle_code_in_pre
# Configuration options
self.skip_internal_links = False
@@ -50,6 +51,8 @@ class CustomHTML2Text(HTML2Text):
for key, value in kwargs.items():
if key == 'preserve_tags':
self.preserve_tags = set(value)
elif key == 'handle_code_in_pre':
self.handle_code_in_pre = value
else:
setattr(self, key, value)
@@ -88,13 +91,21 @@ class CustomHTML2Text(HTML2Text):
# Handle pre tags
if tag == 'pre':
if start:
self.o('```\n')
self.o('```\n') # Markdown code block start
self.inside_pre = True
else:
self.o('\n```')
self.o('\n```\n') # Markdown code block end
self.inside_pre = False
# elif tag in ["h1", "h2", "h3", "h4", "h5", "h6"]:
# pass
elif tag == 'code':
if self.inside_pre and not self.handle_code_in_pre:
# Ignore code tags inside pre blocks if handle_code_in_pre is False
return
if start:
self.o('`') # Markdown inline code start
self.inside_code = True
else:
self.o('`') # Markdown inline code end
self.inside_code = False
else:
super().handle_tag(tag, attrs, start)
@@ -103,7 +114,39 @@ class CustomHTML2Text(HTML2Text):
if self.preserve_depth > 0:
self.preserved_content.append(data)
return
if self.inside_pre:
# Output the raw content for pre blocks, including content inside code tags
self.o(data) # Directly output the data as-is (preserve newlines)
return
if self.inside_code:
# Inline code: no newlines allowed
self.o(data.replace('\n', ' '))
return
# Default behavior for other tags
super().handle_data(data, entity_char)
# # Handle pre tags
# if tag == 'pre':
# if start:
# self.o('```\n')
# self.inside_pre = True
# else:
# self.o('\n```')
# self.inside_pre = False
# # elif tag in ["h1", "h2", "h3", "h4", "h5", "h6"]:
# # pass
# else:
# super().handle_tag(tag, attrs, start)
# def handle_data(self, data, entity_char=False):
# """Override handle_data to capture content within preserved tags."""
# if self.preserve_depth > 0:
# self.preserved_content.append(data)
# return
# super().handle_data(data, entity_char)
class InvalidCSSSelectorError(Exception):
pass

View File

View File

@@ -547,6 +547,7 @@ async def generate_knowledge_graph():
f.write(result.extracted_content)
async def fit_markdown_remove_overlay():
async with AsyncWebCrawler(
headless=True, # Set to False to see what is happening
verbose=True,
@@ -560,13 +561,15 @@ async def fit_markdown_remove_overlay():
url='https://www.kidocode.com/degrees/technology',
cache_mode=CacheMode.BYPASS,
markdown_generator=DefaultMarkdownGenerator(
content_filter=PruningContentFilter(threshold=0.48, threshold_type="fixed", min_word_threshold=0),
content_filter=PruningContentFilter(
threshold=0.48, threshold_type="fixed", min_word_threshold=0
),
options={
"ignore_links": True
}
),
# markdown_generator=DefaultMarkdownGenerator(
# content_filter=BM25ContentFilter(user_query=None, bm25_threshold=1.0),
# content_filter=BM25ContentFilter(user_query="", bm25_threshold=1.0),
# options={
# "ignore_links": True
# }