From 393bb911c0144b9cbf4ba69703f15a56f5ec0586 Mon Sep 17 00:00:00 2001
From: UncleCode
Date: Tue, 17 Dec 2024 22:40:10 +0800
Subject: [PATCH] Enhance crawler strategies with new features

- Reimplemented JsonXPathExtractionStrategy for enhanced JSON data extraction.
- Updated existing extraction strategies for better performance.
- Improved handling of response status codes during crawls.
---
 CHANGELOG.md                       |  2 +-
 crawl4ai/async_crawler_strategy.py |  9 +++-
 crawl4ai/async_webcrawler.py       |  3 +-
 crawl4ai/extraction_strategy.py    | 75 +++++++++++++++---------------
 4 files changed, 48 insertions(+), 41 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 58dacf81..829b2cc5 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -709,7 +709,7 @@ This commit introduces several key enhancements, including improved error handli
 - Improved `AsyncPlaywrightCrawlerStrategy.close()` method to use a shorter sleep time (0.5 seconds instead of 500), significantly reducing wait time when closing the crawler.
 - Enhanced flexibility in `CosineStrategy`:
   - Now uses a more generic `load_HF_embedding_model` function, allowing for easier swapping of embedding models.
-- Updated `JsonCssExtractionStrategy` and `JsonXPATHExtractionStrategy` for better JSON-based extraction.
+- Updated `JsonCssExtractionStrategy` and `JsonXPathExtractionStrategy` for better JSON-based extraction.
 
 ### Fixed
 - Addressed potential issues with the sliding window chunking strategy to ensure all text is properly chunked.

diff --git a/crawl4ai/async_crawler_strategy.py b/crawl4ai/async_crawler_strategy.py
index 4f9a1f56..6a1d8d49 100644
--- a/crawl4ai/async_crawler_strategy.py
+++ b/crawl4ai/async_crawler_strategy.py
@@ -795,9 +795,14 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
                     raise RuntimeError(f"Failed on navigating ACS-GOTO:\n{str(e)}")
 
                 await self.execute_hook('after_goto', page, context=context)
+
+                if response is None:
+                    status_code = 200
+                    response_headers = {}
+                else:
+                    status_code = response.status
+                    response_headers = response.headers
 
-                status_code = response.status
-                response_headers = response.headers
             else:
                 status_code = 200
                 response_headers = {}

diff --git a/crawl4ai/async_webcrawler.py b/crawl4ai/async_webcrawler.py
index 9b968158..2036f56f 100644
--- a/crawl4ai/async_webcrawler.py
+++ b/crawl4ai/async_webcrawler.py
@@ -274,6 +274,7 @@ class AsyncWebCrawler:
             if cached_result:
                 html = sanitize_input_encode(cached_result.html)
                 extracted_content = sanitize_input_encode(cached_result.extracted_content or "")
+                extracted_content = None if not extracted_content or extracted_content == "[]" else extracted_content
                 # If screenshot is requested but its not in cache, then set cache_result to None
                 screenshot_data = cached_result.screenshot
                 pdf_data = cached_result.pdf
@@ -476,7 +477,7 @@ class AsyncWebCrawler:
                     t1 = time.perf_counter()
 
                     # Handle different extraction strategy types
-                    if isinstance(config.extraction_strategy, (JsonCssExtractionStrategy, JsonCssExtractionStrategy)):
+                    if isinstance(config.extraction_strategy, (JsonCssExtractionStrategy, JsonXPathExtractionStrategy)):
                         config.extraction_strategy.verbose = verbose
                         extracted_content = config.extraction_strategy.run(url, [html])
                         extracted_content = json.dumps(extracted_content, indent=4, default=str, ensure_ascii=False)
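A note on the `response is None` guard introduced above: Playwright types `Page.goto()` as returning an optional response, and it does return `None` in practice (for example on same-document navigations), so reading `response.status` unguarded can raise `AttributeError`. A minimal sketch of the pattern in isolation, assuming an existing async Playwright `page` and a `url` (not the repo's code verbatim):

```python
# Minimal sketch, assuming `page` is an async Playwright Page and `url` a str.
response = await page.goto(url)

if response is None:
    # No Response object came back (e.g. same-document navigation):
    # assume success instead of crashing on `response.status`.
    status_code = 200
    response_headers = {}
else:
    status_code = response.status        # e.g. 200
    response_headers = response.headers  # dict of header name -> value
```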
diff --git a/crawl4ai/extraction_strategy.py b/crawl4ai/extraction_strategy.py
index 50e5da36..6043d04c 100644
--- a/crawl4ai/extraction_strategy.py
+++ b/crawl4ai/extraction_strategy.py
@@ -10,7 +10,9 @@ from functools import partial
 from .model_loader import *
 import math
 import numpy as np
-from lxml import etree
+import re
+from bs4 import BeautifulSoup
+from lxml import html, etree
 
 class ExtractionStrategy(ABC):
     """
@@ -741,28 +743,15 @@ class JsonCssExtractionStrategy(ExtractionStrategy):
     def run(self, url: str, sections: List[str], *q, **kwargs) -> List[Dict[str, Any]]:
         combined_html = self.DEL.join(sections)
         return self.extract(url, combined_html, **kwargs)
-
-class JsonXPATHExtractionStrategy(ExtractionStrategy):
+class JsonXPathExtractionStrategy(ExtractionStrategy):
     def __init__(self, schema: Dict[str, Any], **kwargs):
         super().__init__(**kwargs)
         self.schema = schema
-        self.use_cssselect = self._check_cssselect()
 
-    def _check_cssselect(self):
-        try:
-            import cssselect
-            return True
-        except ImportError:
-            print("Warning: cssselect is not installed. Falling back to XPath for all selectors.")
-            return False
-
-    def extract(self, url: str, html: str, *q, **kwargs) -> List[Dict[str, Any]]:
-        self.soup = BeautifulSoup(html, 'lxml')
-        self.tree = etree.HTML(str(self.soup))
-
-        selector_type = 'xpath' if not self.use_cssselect else self.schema.get('selectorType', 'css')
-        base_selector = self.schema.get('baseXPath' if selector_type == 'xpath' else 'baseSelector')
-        base_elements = self._select_elements(base_selector, selector_type)
+    def extract(self, url: str, html_content: str, *q, **kwargs) -> List[Dict[str, Any]]:
+        tree = html.fromstring(html_content)
+        base_xpath = self.schema['baseSelector']
+        base_elements = tree.xpath(base_xpath)
 
         results = []
         for element in base_elements:
@@ -772,27 +761,40 @@ class JsonXPATHExtractionStrategy(ExtractionStrategy):
 
         return results
 
-    def _select_elements(self, selector, selector_type, element=None):
-        if selector_type == 'xpath' or not self.use_cssselect:
-            return self.tree.xpath(selector) if element is None else element.xpath(selector)
-        else:  # CSS
-            return self.tree.cssselect(selector) if element is None else element.cssselect(selector)
+    def _css_to_xpath(self, css_selector: str) -> str:
+        """Convert CSS selector to XPath if needed"""
+        if '/' in css_selector:  # Already an XPath
+            return css_selector
+        else:
+            # Fallback to basic conversion for common cases
+            return self._basic_css_to_xpath(css_selector)
+
+    def _basic_css_to_xpath(self, css_selector: str) -> str:
+        """Basic CSS to XPath conversion for common cases"""
+        # Handle basic cases
+        if ' > ' in css_selector:
+            parts = css_selector.split(' > ')
+            return '//' + '/'.join(parts)
+        if ' ' in css_selector:
+            parts = css_selector.split(' ')
+            return '//' + '//'.join(parts)
+        return '//' + css_selector
 
     def _extract_field(self, element, field):
         try:
-            selector_type = 'xpath' if not self.use_cssselect else field.get('selectorType', 'css')
-            selector = field.get('xpathSelector' if selector_type == 'xpath' else 'selector')
-
             if field['type'] == 'nested':
-                nested_element = self._select_elements(selector, selector_type, element)
-                return self._extract_item(nested_element[0], field['fields']) if nested_element else {}
+                xpath = self._css_to_xpath(field['selector'])
+                nested_element = element.xpath(xpath)[0] if element.xpath(xpath) else None
+                return self._extract_item(nested_element, field['fields']) if nested_element is not None else {}
             if field['type'] == 'list':
-                elements = self._select_elements(selector, selector_type, element)
+                xpath = self._css_to_xpath(field['selector'])
+                elements = element.xpath(xpath)
                 return [self._extract_list_item(el, field['fields']) for el in elements]
             if field['type'] == 'nested_list':
-                elements = self._select_elements(selector, selector_type, element)
+                xpath = self._css_to_xpath(field['selector'])
+                elements = element.xpath(xpath)
                 return [self._extract_item(el, field['fields']) for el in elements]
             return self._extract_single_field(element, field)
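The `_basic_css_to_xpath` fallback added above only understands bare tag names plus the child (`>`) and descendant (space) combinators. A standalone copy of its logic (hypothetical function name, same body) makes the supported cases, and the gaps, concrete:

```python
# Standalone rerun of the patch's conversion logic, for illustration only.
def basic_css_to_xpath(css_selector: str) -> str:
    if ' > ' in css_selector:  # child combinator
        return '//' + '/'.join(css_selector.split(' > '))
    if ' ' in css_selector:    # descendant combinator
        return '//' + '//'.join(css_selector.split(' '))
    return '//' + css_selector  # bare tag name

assert basic_css_to_xpath('div > span') == '//div/span'
assert basic_css_to_xpath('ul li') == '//ul//li'
assert basic_css_to_xpath('h2') == '//h2'

# Classes, ids and attributes are not translated: '.post' would become the
# invalid XPath '//.post'. Schemas should supply real XPath (anything
# containing a '/') for such fields; _css_to_xpath passes those through
# untouched.
```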
@@ -810,10 +812,9 @@ class JsonXPATHExtractionStrategy(ExtractionStrategy):
         return item
 
     def _extract_single_field(self, element, field):
-        selector_type = field.get('selectorType', 'css')
-
         if 'selector' in field:
-            selected = self._select_elements(field['selector'], selector_type, element)
+            xpath = self._css_to_xpath(field['selector'])
+            selected = element.xpath(xpath)
             if not selected:
                 return field.get('default')
             selected = selected[0]
@@ -822,13 +823,13 @@ class JsonXPATHExtractionStrategy(ExtractionStrategy):
         value = None
 
         if field['type'] == 'text':
-            value = selected.text_content().strip() if hasattr(selected, 'text_content') else selected.text.strip()
+            value = ''.join(selected.xpath('.//text()')).strip()
         elif field['type'] == 'attribute':
             value = selected.get(field['attribute'])
         elif field['type'] == 'html':
             value = etree.tostring(selected, encoding='unicode')
         elif field['type'] == 'regex':
-            text = selected.text_content().strip() if hasattr(selected, 'text_content') else selected.text.strip()
+            text = ''.join(selected.xpath('.//text()')).strip()
             match = re.search(field['pattern'], text)
             value = match.group(1) if match else None
 
@@ -870,4 +871,4 @@ class JsonXPATHExtractionStrategy(ExtractionStrategy):
 
     def run(self, url: str, sections: List[str], *q, **kwargs) -> List[Dict[str, Any]]:
         combined_html = self.DEL.join(sections)
-        return self.extract(url, combined_html, **kwargs)
+        return self.extract(url, combined_html, **kwargs)
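For reference, a hedged usage sketch of the reworked strategy. The parts of the class not shown in this patch (how `fields` entries map names to values via `_extract_item`) are assumed to mirror `JsonCssExtractionStrategy`; the schema, URL, and HTML below are illustrative only:

```python
# Usage sketch under the assumptions stated above.
from crawl4ai.extraction_strategy import JsonXPathExtractionStrategy

schema = {
    "name": "posts",
    "baseSelector": "//article[@class='post']",  # contains '/': used as-is
    "fields": [
        # 'text' joins the selected element's .//text() nodes and strips them
        {"name": "title", "selector": ".//h2", "type": "text"},
        # 'attribute' reads the named attribute off the first match
        {"name": "link", "selector": ".//a", "type": "attribute", "attribute": "href"},
    ],
}

html_doc = """
<html><body>
  <article class="post"><h2>First post</h2><a href="/a">read</a></article>
  <article class="post"><h2>Second post</h2><a href="/b">read</a></article>
</body></html>
"""

strategy = JsonXPathExtractionStrategy(schema)
print(strategy.run("https://example.com", [html_doc]))
# Expected: [{'title': 'First post', 'link': '/a'},
#            {'title': 'Second post', 'link': '/b'}]
```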