Enhance Crawl4AI with new features and documentation

- Fix crawler text mode for improved performance; cover missing `srcset` and `data_srcset` attributes in image tags.
  - Introduced Managed Browsers for enhanced crawling experience.
  - Updated documentation for clearer navigation on configuration.
  - Changed 'text_only' to 'text_mode' in configuration and methods.
  - Improved performance and relevance in content filtering strategies.
This commit is contained in:
UncleCode
2024-12-19 21:02:29 +08:00
parent 393bb911c0
commit 849765712f
23 changed files with 1825 additions and 1721 deletions

View File

@@ -57,6 +57,11 @@ class NoExtractionStrategy(ExtractionStrategy):
def run(self, url: str, sections: List[str], *q, **kwargs) -> List[Dict[str, Any]]:
return [{"index": i, "tags": [], "content": section} for i, section in enumerate(sections)]
#######################################################
# Strategies using LLM-based extraction for text data #
#######################################################
class LLMExtractionStrategy(ExtractionStrategy):
def __init__(self,
provider: str = DEFAULT_PROVIDER, api_token: Optional[str] = None,
@@ -234,6 +239,11 @@ class LLMExtractionStrategy(ExtractionStrategy):
return extracted_content
#######################################################
# Strategies using clustering for text data extraction #
#######################################################
class CosineStrategy(ExtractionStrategy):
def __init__(self, semantic_filter = None, word_count_threshold=10, max_dist=0.2, linkage_method='ward', top_k=3, model_name = 'sentence-transformers/all-MiniLM-L6-v2', sim_threshold = 0.3, **kwargs):
"""
@@ -514,6 +524,11 @@ class CosineStrategy(ExtractionStrategy):
return self.extract(url, self.DEL.join(sections), **kwargs)
#######################################################
# Strategies based on the extraction of specific types #
#######################################################
class TopicExtractionStrategy(ExtractionStrategy):
def __init__(self, num_keywords: int = 3, **kwargs):
"""
@@ -637,7 +652,222 @@ class ContentSummarizationStrategy(ExtractionStrategy):
summaries.sort(key=lambda x: x[0])
return [summary for _, summary in summaries]
class JsonCssExtractionStrategy(ExtractionStrategy):
#######################################################
# New extraction strategies for JSON-based extraction #
#######################################################
class JsonElementExtractionStrategy(ExtractionStrategy):
DEL = '\n'
def __init__(self, schema: Dict[str, Any], **kwargs):
super().__init__(**kwargs)
self.schema = schema
self.verbose = kwargs.get('verbose', False)
def extract(self, url: str, html_content: str, *q, **kwargs) -> List[Dict[str, Any]]:
parsed_html = self._parse_html(html_content)
base_elements = self._get_base_elements(parsed_html, self.schema['baseSelector'])
results = []
for element in base_elements:
# Extract base element attributes
item = {}
if 'baseFields' in self.schema:
for field in self.schema['baseFields']:
value = self._extract_single_field(element, field)
if value is not None:
item[field['name']] = value
# Extract child fields
field_data = self._extract_item(element, self.schema['fields'])
item.update(field_data)
if item:
results.append(item)
return results
@abstractmethod
def _parse_html(self, html_content: str):
"""Parse HTML content into appropriate format"""
pass
@abstractmethod
def _get_base_elements(self, parsed_html, selector: str):
"""Get all base elements using the selector"""
pass
@abstractmethod
def _get_elements(self, element, selector: str):
"""Get child elements using the selector"""
pass
def _extract_field(self, element, field):
try:
if field['type'] == 'nested':
nested_elements = self._get_elements(element, field['selector'])
nested_element = nested_elements[0] if nested_elements else None
return self._extract_item(nested_element, field['fields']) if nested_element else {}
if field['type'] == 'list':
elements = self._get_elements(element, field['selector'])
return [self._extract_list_item(el, field['fields']) for el in elements]
if field['type'] == 'nested_list':
elements = self._get_elements(element, field['selector'])
return [self._extract_item(el, field['fields']) for el in elements]
return self._extract_single_field(element, field)
except Exception as e:
if self.verbose:
print(f"Error extracting field {field['name']}: {str(e)}")
return field.get('default')
def _extract_single_field(self, element, field):
if 'selector' in field:
selected = self._get_elements(element, field['selector'])
if not selected:
return field.get('default')
selected = selected[0]
else:
selected = element
value = None
if field['type'] == 'text':
value = self._get_element_text(selected)
elif field['type'] == 'attribute':
value = self._get_element_attribute(selected, field['attribute'])
elif field['type'] == 'html':
value = self._get_element_html(selected)
elif field['type'] == 'regex':
text = self._get_element_text(selected)
match = re.search(field['pattern'], text)
value = match.group(1) if match else None
if 'transform' in field:
value = self._apply_transform(value, field['transform'])
return value if value is not None else field.get('default')
def _extract_list_item(self, element, fields):
item = {}
for field in fields:
value = self._extract_single_field(element, field)
if value is not None:
item[field['name']] = value
return item
def _extract_item(self, element, fields):
item = {}
for field in fields:
if field['type'] == 'computed':
value = self._compute_field(item, field)
else:
value = self._extract_field(element, field)
if value is not None:
item[field['name']] = value
return item
def _apply_transform(self, value, transform):
if transform == 'lowercase':
return value.lower()
elif transform == 'uppercase':
return value.upper()
elif transform == 'strip':
return value.strip()
return value
def _compute_field(self, item, field):
try:
if 'expression' in field:
return eval(field['expression'], {}, item)
elif 'function' in field:
return field['function'](item)
except Exception as e:
if self.verbose:
print(f"Error computing field {field['name']}: {str(e)}")
return field.get('default')
def run(self, url: str, sections: List[str], *q, **kwargs) -> List[Dict[str, Any]]:
combined_html = self.DEL.join(sections)
return self.extract(url, combined_html, **kwargs)
@abstractmethod
def _get_element_text(self, element) -> str:
"""Get text content from element"""
pass
@abstractmethod
def _get_element_html(self, element) -> str:
"""Get HTML content from element"""
pass
@abstractmethod
def _get_element_attribute(self, element, attribute: str):
"""Get attribute value from element"""
pass
class JsonCssExtractionStrategy(JsonElementExtractionStrategy):
def _parse_html(self, html_content: str):
return BeautifulSoup(html_content, 'html.parser')
def _get_base_elements(self, parsed_html, selector: str):
return parsed_html.select(selector)
def _get_elements(self, element, selector: str):
selected = element.select_one(selector)
return [selected] if selected else []
def _get_element_text(self, element) -> str:
return element.get_text(strip=True)
def _get_element_html(self, element) -> str:
return str(element)
def _get_element_attribute(self, element, attribute: str):
return element.get(attribute)
class JsonXPathExtractionStrategy(JsonElementExtractionStrategy):
def _parse_html(self, html_content: str):
return html.fromstring(html_content)
def _get_base_elements(self, parsed_html, selector: str):
return parsed_html.xpath(selector)
def _css_to_xpath(self, css_selector: str) -> str:
"""Convert CSS selector to XPath if needed"""
if '/' in css_selector: # Already an XPath
return css_selector
return self._basic_css_to_xpath(css_selector)
def _basic_css_to_xpath(self, css_selector: str) -> str:
"""Basic CSS to XPath conversion for common cases"""
if ' > ' in css_selector:
parts = css_selector.split(' > ')
return '//' + '/'.join(parts)
if ' ' in css_selector:
parts = css_selector.split(' ')
return '//' + '//'.join(parts)
return '//' + css_selector
def _get_elements(self, element, selector: str):
xpath = self._css_to_xpath(selector)
if not xpath.startswith('.'):
xpath = '.' + xpath
return element.xpath(xpath)
def _get_element_text(self, element) -> str:
return ''.join(element.xpath('.//text()')).strip()
def _get_element_html(self, element) -> str:
return etree.tostring(element, encoding='unicode')
def _get_element_attribute(self, element, attribute: str):
return element.get(attribute)
class _JsonCssExtractionStrategy(ExtractionStrategy):
def __init__(self, schema: Dict[str, Any], **kwargs):
super().__init__(**kwargs)
self.schema = schema
@@ -648,14 +878,22 @@ class JsonCssExtractionStrategy(ExtractionStrategy):
results = []
for element in base_elements:
item = self._extract_item(element, self.schema['fields'])
if item:
results.append(item)
# Extract base element attributes first
item = {}
if 'baseFields' in self.schema:
for field in self.schema['baseFields']:
value = self._extract_single_field(element, field)
if value is not None:
item[field['name']] = value
# Then extract child fields
field_data = self._extract_item(element, self.schema['fields'])
item.update(field_data)
results.append(item)
return results
def _extract_field(self, element, field):
try:
if field['type'] == 'nested':
@@ -743,7 +981,7 @@ class JsonCssExtractionStrategy(ExtractionStrategy):
def run(self, url: str, sections: List[str], *q, **kwargs) -> List[Dict[str, Any]]:
combined_html = self.DEL.join(sections)
return self.extract(url, combined_html, **kwargs)
class JsonXPathExtractionStrategy(ExtractionStrategy):
class _JsonXPathExtractionStrategy(ExtractionStrategy):
def __init__(self, schema: Dict[str, Any], **kwargs):
super().__init__(**kwargs)
self.schema = schema
@@ -755,9 +993,19 @@ class JsonXPathExtractionStrategy(ExtractionStrategy):
results = []
for element in base_elements:
item = self._extract_item(element, self.schema['fields'])
if item:
results.append(item)
# Extract base element attributes first
item = {}
if 'baseFields' in self.schema:
for field in self.schema['baseFields']:
value = self._extract_single_field(element, field)
if value is not None:
item[field['name']] = value
# Then extract child fields
field_data = self._extract_item(element, self.schema['fields'])
item.update(field_data)
results.append(item)
return results