[v0.3.72] Enhance content extraction and proxy support

- Add ContentCleaningStrategy for improved content extraction
- Implement advanced proxy configuration with authentication
- Enhance image source detection and handling
- Add fit_markdown and fit_html for refined content output
- Improve external link and image handling flexibility
This commit is contained in:
UncleCode
2024-10-22 20:19:22 +08:00
parent 04d16e6d2b
commit 60ba131ac8
6 changed files with 260 additions and 3 deletions

View File

@@ -1,5 +1,39 @@
# Changelog
## [v0.3.72] - 2024-10-22
### Added
- New `ContentCleaningStrategy` class:
- Smart content extraction based on text density and element scoring
- Automatic removal of boilerplate content
- DOM tree analysis for better content identification
- Configurable thresholds for content detection
- Advanced proxy support:
- Added `proxy_config` option for authenticated proxy connections
- Support for username/password in proxy configuration
- New content output formats:
- `fit_markdown`: Optimized markdown output with main content focus
- `fit_html`: Clean HTML with only essential content
### Enhanced
- Image source detection:
- Support for multiple image source attributes (`src`, `data-src`, `srcset`, etc.)
- Automatic fallback through potential source attributes
- Smart handling of srcset attribute
- External content handling:
- Made external link exclusion optional (disabled by default)
- Improved detection and handling of social media links
- Better control over external image filtering
### Fixed
- Image extraction reliability with multiple source attribute checks
- External link and image handling logic for better accuracy
### Developer Notes
- The new `ContentCleaningStrategy` uses configurable thresholds for customization
- Proxy configuration now supports more complex authentication scenarios
- Content extraction process now provides both regular and optimized outputs
## [v0.3.72] - 2024-10-20
### Fixed

View File

@@ -71,6 +71,7 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
"(KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
)
self.proxy = kwargs.get("proxy")
self.proxy_config = kwargs.get("proxy_config")
self.headless = kwargs.get("headless", True)
self.browser_type = kwargs.get("browser_type", "chromium")
self.headers = kwargs.get("headers", {})
@@ -121,6 +122,9 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
if self.proxy:
proxy_settings = ProxySettings(server=self.proxy)
browser_args["proxy"] = proxy_settings
elif self.proxy_config:
proxy_settings = ProxySettings(server=self.proxy_config.get("server"), username=self.proxy_config.get("username"), password=self.proxy_config.get("password"))
browser_args["proxy"] = proxy_settings
# Select the appropriate browser based on the browser_type
if self.browser_type == "firefox":

View File

@@ -212,6 +212,8 @@ class AsyncWebCrawler:
cleaned_html = sanitize_input_encode(result.get("cleaned_html", ""))
markdown = sanitize_input_encode(result.get("markdown", ""))
fit_markdown = sanitize_input_encode(result.get("fit_markdown", ""))
fit_html = sanitize_input_encode(result.get("fit_html", ""))
media = result.get("media", [])
links = result.get("links", [])
metadata = result.get("metadata", {})
@@ -258,6 +260,8 @@ class AsyncWebCrawler:
html=html,
cleaned_html=format_html(cleaned_html),
markdown=markdown,
fit_markdown=fit_markdown,
fit_html= fit_html,
media=media,
links=links,
metadata=metadata,

View File

@@ -0,0 +1,196 @@
from bs4 import BeautifulSoup, Tag
import re
from typing import Optional
class ContentCleaningStrategy:
def __init__(self):
# Precompile regex patterns for performance
self.negative_patterns = re.compile(r'nav|footer|header|sidebar|ads|comment', re.I)
self.positive_patterns = re.compile(r'content|article|main|post', re.I)
self.priority_tags = {'article', 'main', 'section', 'div'}
self.non_content_tags = {'nav', 'footer', 'header', 'aside'}
# Thresholds
self.text_density_threshold = 9.0
self.min_word_count = 50
self.link_density_threshold = 0.2
self.max_dom_depth = 10 # To prevent excessive DOM traversal
def clean(self, clean_html: str) -> str:
"""
Main function that takes cleaned HTML and returns super cleaned HTML.
Args:
clean_html (str): The cleaned HTML content.
Returns:
str: The super cleaned HTML containing only the main content.
"""
try:
if not clean_html or not isinstance(clean_html, str):
return ''
soup = BeautifulSoup(clean_html, 'html.parser')
main_content = self.extract_main_content(soup)
if main_content:
super_clean_element = self.clean_element(main_content)
return str(super_clean_element)
else:
return ''
except Exception:
# Handle exceptions silently or log them as needed
return ''
def extract_main_content(self, soup: BeautifulSoup) -> Optional[Tag]:
"""
Identifies and extracts the main content element from the HTML.
Args:
soup (BeautifulSoup): The parsed HTML soup.
Returns:
Optional[Tag]: The Tag object containing the main content, or None if not found.
"""
candidates = []
for element in soup.find_all(self.priority_tags):
if self.is_non_content_tag(element):
continue
if self.has_negative_class_id(element):
continue
score = self.calculate_content_score(element)
candidates.append((score, element))
if not candidates:
return None
# Sort candidates by score in descending order
candidates.sort(key=lambda x: x[0], reverse=True)
# Select the element with the highest score
best_element = candidates[0][1]
return best_element
def calculate_content_score(self, element: Tag) -> float:
"""
Calculates a score for an element based on various heuristics.
Args:
element (Tag): The HTML element to score.
Returns:
float: The content score of the element.
"""
score = 0.0
if self.is_priority_tag(element):
score += 5.0
if self.has_positive_class_id(element):
score += 3.0
if self.has_negative_class_id(element):
score -= 3.0
if self.is_high_text_density(element):
score += 2.0
if self.is_low_link_density(element):
score += 2.0
if self.has_sufficient_content(element):
score += 2.0
if self.has_headings(element):
score += 3.0
dom_depth = self.calculate_dom_depth(element)
score += min(dom_depth, self.max_dom_depth) * 0.5 # Adjust weight as needed
return score
def is_priority_tag(self, element: Tag) -> bool:
"""Checks if the element is a priority tag."""
return element.name in self.priority_tags
def is_non_content_tag(self, element: Tag) -> bool:
"""Checks if the element is a non-content tag."""
return element.name in self.non_content_tags
def has_negative_class_id(self, element: Tag) -> bool:
"""Checks if the element has negative indicators in its class or id."""
class_id = ' '.join(filter(None, [
self.get_attr_str(element.get('class')),
element.get('id', '')
]))
return bool(self.negative_patterns.search(class_id))
def has_positive_class_id(self, element: Tag) -> bool:
"""Checks if the element has positive indicators in its class or id."""
class_id = ' '.join(filter(None, [
self.get_attr_str(element.get('class')),
element.get('id', '')
]))
return bool(self.positive_patterns.search(class_id))
@staticmethod
def get_attr_str(attr) -> str:
"""Converts an attribute value to a string."""
if isinstance(attr, list):
return ' '.join(attr)
elif isinstance(attr, str):
return attr
else:
return ''
def is_high_text_density(self, element: Tag) -> bool:
"""Determines if the element has high text density."""
text_density = self.calculate_text_density(element)
return text_density > self.text_density_threshold
def calculate_text_density(self, element: Tag) -> float:
"""Calculates the text density of an element."""
text_length = len(element.get_text(strip=True))
tag_count = len(element.find_all())
tag_count = tag_count or 1 # Prevent division by zero
return text_length / tag_count
def is_low_link_density(self, element: Tag) -> bool:
"""Determines if the element has low link density."""
link_density = self.calculate_link_density(element)
return link_density < self.link_density_threshold
def calculate_link_density(self, element: Tag) -> float:
"""Calculates the link density of an element."""
text = element.get_text(strip=True)
if not text:
return 0.0
link_text = ' '.join(a.get_text(strip=True) for a in element.find_all('a'))
return len(link_text) / len(text) if text else 0.0
def has_sufficient_content(self, element: Tag) -> bool:
"""Checks if the element has sufficient word count."""
word_count = len(element.get_text(strip=True).split())
return word_count >= self.min_word_count
def calculate_dom_depth(self, element: Tag) -> int:
"""Calculates the depth of an element in the DOM tree."""
depth = 0
current_element = element
while current_element.parent and depth < self.max_dom_depth:
depth += 1
current_element = current_element.parent
return depth
def has_headings(self, element: Tag) -> bool:
"""Checks if the element contains heading tags."""
return bool(element.find(['h1', 'h2', 'h3']))
def clean_element(self, element: Tag) -> Tag:
"""
Cleans the selected element by removing unnecessary attributes and nested non-content elements.
Args:
element (Tag): The HTML element to clean.
Returns:
Tag: The cleaned HTML element.
"""
for tag in element.find_all(['script', 'style', 'aside']):
tag.decompose()
for tag in element.find_all():
attrs = dict(tag.attrs)
for attr in attrs:
if attr in ['style', 'onclick', 'onmouseover', 'align', 'bgcolor']:
del tag.attrs[attr]
return element

View File

@@ -7,6 +7,7 @@ from .config import *
from bs4 import element, NavigableString, Comment
from urllib.parse import urljoin
from requests.exceptions import InvalidSchema
from .content_cleaning_strategy import ContentCleaningStrategy
from .utils import (
sanitize_input_encode,
@@ -215,7 +216,7 @@ class WebScrappingStrategy(ContentScrappingStrategy):
links['internal'].append(link_data)
keep_element = True
if kwargs.get('exclude_external_links', True):
if kwargs.get('exclude_external_links', False):
href_parts = href.split('/')
href_url_base = href_parts[2] if len(href_parts) > 2 else href
if url_base not in href_url_base:
@@ -231,9 +232,20 @@ class WebScrappingStrategy(ContentScrappingStrategy):
try:
if element.name == 'img':
potential_sources = ['src', 'data-src', 'srcset' 'data-lazy-src', 'data-original']
src = element.get('src', '')
while not src and potential_sources:
src = element.get(potential_sources.pop(0), '')
if not src:
element.decompose()
return False
# If it is srcset pick up the first image
if 'srcset' in element.attrs:
src = element.attrs['srcset'].split(',')[0].split(' ')[0]
# Check flag if we should remove external images
if kwargs.get('exclude_external_images', False):
src = element.get('src', '')
src_url_base = src.split('/')[2]
url_base = url.split('/')[2]
if url_base not in src_url_base:
@@ -241,7 +253,6 @@ class WebScrappingStrategy(ContentScrappingStrategy):
return False
if not kwargs.get('exclude_external_images', False) and kwargs.get('exclude_social_media_links', True):
src = element.get('src', '')
src_url_base = src.split('/')[2]
url_base = url.split('/')[2]
if any(domain in src for domain in social_media_domains):
@@ -386,10 +397,16 @@ class WebScrappingStrategy(ContentScrappingStrategy):
except Exception as e:
print('Error extracting metadata:', str(e))
meta = {}
cleaner = ContentCleaningStrategy()
fit_html = cleaner.clean(cleaned_html)
fit_markdown = h.handle(fit_html)
cleaned_html = sanitize_html(cleaned_html)
return {
'markdown': markdown,
'fit_markdown': fit_markdown,
'fit_html': fit_html,
'cleaned_html': cleaned_html,
'success': success,
'media': media,

View File

@@ -14,6 +14,8 @@ class CrawlResult(BaseModel):
links: Dict[str, List[Dict]] = {}
screenshot: Optional[str] = None
markdown: Optional[str] = None
fit_markdown: Optional[str] = None
fit_html: Optional[str] = None
extracted_content: Optional[str] = None
metadata: Optional[dict] = None
error_message: Optional[str] = None