Enhance crawler strategies with new features
- ReImplemented JsonXPathExtractionStrategy for enhanced JSON data extraction. - Updated existing extraction strategies for better performance. - Improved handling of response status codes during crawls.
This commit is contained in:
@@ -10,7 +10,9 @@ from functools import partial
|
||||
from .model_loader import *
|
||||
import math
|
||||
import numpy as np
|
||||
from lxml import etree
|
||||
import re
|
||||
from bs4 import BeautifulSoup
|
||||
from lxml import html, etree
|
||||
|
||||
class ExtractionStrategy(ABC):
|
||||
"""
|
||||
@@ -741,28 +743,15 @@ class JsonCssExtractionStrategy(ExtractionStrategy):
|
||||
def run(self, url: str, sections: List[str], *q, **kwargs) -> List[Dict[str, Any]]:
|
||||
combined_html = self.DEL.join(sections)
|
||||
return self.extract(url, combined_html, **kwargs)
|
||||
|
||||
class JsonXPATHExtractionStrategy(ExtractionStrategy):
|
||||
class JsonXPathExtractionStrategy(ExtractionStrategy):
|
||||
def __init__(self, schema: Dict[str, Any], **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self.schema = schema
|
||||
self.use_cssselect = self._check_cssselect()
|
||||
|
||||
def _check_cssselect(self):
|
||||
try:
|
||||
import cssselect
|
||||
return True
|
||||
except ImportError:
|
||||
print("Warning: cssselect is not installed. Falling back to XPath for all selectors.")
|
||||
return False
|
||||
|
||||
def extract(self, url: str, html: str, *q, **kwargs) -> List[Dict[str, Any]]:
|
||||
self.soup = BeautifulSoup(html, 'lxml')
|
||||
self.tree = etree.HTML(str(self.soup))
|
||||
|
||||
selector_type = 'xpath' if not self.use_cssselect else self.schema.get('selectorType', 'css')
|
||||
base_selector = self.schema.get('baseXPath' if selector_type == 'xpath' else 'baseSelector')
|
||||
base_elements = self._select_elements(base_selector, selector_type)
|
||||
def extract(self, url: str, html_content: str, *q, **kwargs) -> List[Dict[str, Any]]:
|
||||
tree = html.fromstring(html_content)
|
||||
base_xpath = self.schema['baseSelector']
|
||||
base_elements = tree.xpath(base_xpath)
|
||||
|
||||
results = []
|
||||
for element in base_elements:
|
||||
@@ -772,27 +761,40 @@ class JsonXPATHExtractionStrategy(ExtractionStrategy):
|
||||
|
||||
return results
|
||||
|
||||
def _select_elements(self, selector, selector_type, element=None):
|
||||
if selector_type == 'xpath' or not self.use_cssselect:
|
||||
return self.tree.xpath(selector) if element is None else element.xpath(selector)
|
||||
else: # CSS
|
||||
return self.tree.cssselect(selector) if element is None else element.cssselect(selector)
|
||||
def _css_to_xpath(self, css_selector: str) -> str:
|
||||
"""Convert CSS selector to XPath if needed"""
|
||||
if '/' in css_selector: # Already an XPath
|
||||
return css_selector
|
||||
else:
|
||||
# Fallback to basic conversion for common cases
|
||||
return self._basic_css_to_xpath(css_selector)
|
||||
|
||||
def _basic_css_to_xpath(self, css_selector: str) -> str:
|
||||
"""Basic CSS to XPath conversion for common cases"""
|
||||
# Handle basic cases
|
||||
if ' > ' in css_selector:
|
||||
parts = css_selector.split(' > ')
|
||||
return '//' + '/'.join(parts)
|
||||
if ' ' in css_selector:
|
||||
parts = css_selector.split(' ')
|
||||
return '//' + '//'.join(parts)
|
||||
return '//' + css_selector
|
||||
|
||||
def _extract_field(self, element, field):
|
||||
try:
|
||||
selector_type = 'xpath' if not self.use_cssselect else field.get('selectorType', 'css')
|
||||
selector = field.get('xpathSelector' if selector_type == 'xpath' else 'selector')
|
||||
|
||||
if field['type'] == 'nested':
|
||||
nested_element = self._select_elements(selector, selector_type, element)
|
||||
return self._extract_item(nested_element[0], field['fields']) if nested_element else {}
|
||||
xpath = self._css_to_xpath(field['selector'])
|
||||
nested_element = element.xpath(xpath)[0] if element.xpath(xpath) else None
|
||||
return self._extract_item(nested_element, field['fields']) if nested_element is not None else {}
|
||||
|
||||
if field['type'] == 'list':
|
||||
elements = self._select_elements(selector, selector_type, element)
|
||||
xpath = self._css_to_xpath(field['selector'])
|
||||
elements = element.xpath(xpath)
|
||||
return [self._extract_list_item(el, field['fields']) for el in elements]
|
||||
|
||||
if field['type'] == 'nested_list':
|
||||
elements = self._select_elements(selector, selector_type, element)
|
||||
xpath = self._css_to_xpath(field['selector'])
|
||||
elements = element.xpath(xpath)
|
||||
return [self._extract_item(el, field['fields']) for el in elements]
|
||||
|
||||
return self._extract_single_field(element, field)
|
||||
@@ -810,10 +812,9 @@ class JsonXPATHExtractionStrategy(ExtractionStrategy):
|
||||
return item
|
||||
|
||||
def _extract_single_field(self, element, field):
|
||||
selector_type = field.get('selectorType', 'css')
|
||||
|
||||
if 'selector' in field:
|
||||
selected = self._select_elements(field['selector'], selector_type, element)
|
||||
xpath = self._css_to_xpath(field['selector'])
|
||||
selected = element.xpath(xpath)
|
||||
if not selected:
|
||||
return field.get('default')
|
||||
selected = selected[0]
|
||||
@@ -822,13 +823,13 @@ class JsonXPATHExtractionStrategy(ExtractionStrategy):
|
||||
|
||||
value = None
|
||||
if field['type'] == 'text':
|
||||
value = selected.text_content().strip() if hasattr(selected, 'text_content') else selected.text.strip()
|
||||
value = ''.join(selected.xpath('.//text()')).strip()
|
||||
elif field['type'] == 'attribute':
|
||||
value = selected.get(field['attribute'])
|
||||
elif field['type'] == 'html':
|
||||
value = etree.tostring(selected, encoding='unicode')
|
||||
elif field['type'] == 'regex':
|
||||
text = selected.text_content().strip() if hasattr(selected, 'text_content') else selected.text.strip()
|
||||
text = ''.join(selected.xpath('.//text()')).strip()
|
||||
match = re.search(field['pattern'], text)
|
||||
value = match.group(1) if match else None
|
||||
|
||||
@@ -870,4 +871,4 @@ class JsonXPATHExtractionStrategy(ExtractionStrategy):
|
||||
|
||||
def run(self, url: str, sections: List[str], *q, **kwargs) -> List[Dict[str, Any]]:
|
||||
combined_html = self.DEL.join(sections)
|
||||
return self.extract(url, combined_html, **kwargs)
|
||||
return self.extract(url, combined_html, **kwargs)
|
||||
|
||||
Reference in New Issue
Block a user