feat(crawler): enhance JavaScript execution and PDF processing

Add JavaScript execution result handling and improve PDF processing capabilities:
- Add js_execution_result to CrawlResult and AsyncCrawlResponse models
- Implement execution result capture in AsyncPlaywrightCrawlerStrategy
- Add batch processing for PDF pages with configurable batch size
- Enhance JsonElementExtractionStrategy with better schema generation
- Add HTML optimization utilities

BREAKING CHANGE: PDF processing now uses batch processing by default
This commit is contained in:
UncleCode
2025-01-29 21:03:39 +08:00
parent f8fd9d9eff
commit 31938fb922
7 changed files with 150 additions and 20 deletions

View File

@@ -872,6 +872,7 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
"on_page_context_created": None,
"on_user_agent_updated": None,
"on_execution_started": None,
"on_execution_ended": None,
"before_goto": None,
"after_goto": None,
"before_return_html": None,
@@ -1529,6 +1530,7 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
)
await self.execute_hook("on_execution_started", page, context=context, config=config)
await self.execute_hook("on_execution_ended", page, context=context, config=config, result=execution_result)
# Handle user simulation
if config.simulate_user or config.magic:
@@ -1621,6 +1623,7 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
return AsyncCrawlResponse(
html=html,
response_headers=response_headers,
js_execution_result=execution_result,
status_code=status_code,
screenshot=screenshot_data,
pdf_data=pdf_data,
@@ -2028,8 +2031,8 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
f"""
(async () => {{
try {{
{script}
return {{ success: true }};
const script_result = {script};
return {{ success: true, result: script_result }};
}} catch (err) {{
return {{ success: false, error: err.toString(), stack: err.stack }};
}}

View File

@@ -425,6 +425,7 @@ class AsyncWebCrawler:
html = sanitize_input_encode(async_response.html)
screenshot_data = async_response.screenshot
pdf_data = async_response.pdf_data
js_execution_result = async_response.js_execution_result
t2 = time.perf_counter()
self.logger.url_status(
@@ -453,6 +454,7 @@ class AsyncWebCrawler:
crawl_result.redirected_url = async_response.redirected_url or url
crawl_result.response_headers = async_response.response_headers
crawl_result.downloaded_files = async_response.downloaded_files
crawl_result.js_execution_result = js_execution_result
crawl_result.ssl_certificate = (
async_response.ssl_certificate
) # Add SSL certificate
@@ -646,7 +648,7 @@ class AsyncWebCrawler:
# Use IdentityChunking for HTML input, otherwise use provided chunking strategy
chunking = (
IdentityChunking()
if content_format == "html"
if content_format in ["html", "cleaned_html"]
else config.chunking_strategy
)
sections = chunking.chunk(content)

View File

@@ -1065,6 +1065,7 @@ class JsonElementExtractionStrategy(ExtractionStrategy):
html: str,
schema_type: str = "CSS", # or XPATH
query: str = None,
target_json_example: str = None,
provider: str = "gpt-4o",
api_token: str = os.getenv("OPENAI_API_KEY"),
**kwargs
@@ -1092,7 +1093,26 @@ class JsonElementExtractionStrategy(ExtractionStrategy):
# Build the prompt
system_message = {
"role": "system",
"content": "You are a specialized HTML schema generator. Analyze the HTML and generate a JSON schema that follows the specified format. Only output valid JSON schema, nothing else."
"content": f"""You specialize in generating special JSON schemas for web scraping. This schema uses CSS or XPATH selectors to present a repetitive pattern in crawled HTML, such as a product in a product list or a search result item in a list of search results. You use this JSON schema to pass to a language model along with the HTML content to extract structured data from the HTML. The language model uses the JSON schema to extract data from the HTML and retrieve values for fields in the JSON schema, following the schema.
Generating this HTML manually is not feasible, so you need to generate the JSON schema using the HTML content. The HTML copied from the crawled website is provided below, which we believe contains the repetitive pattern.
# Schema main keys:
- name: This is the name of the schema.
- baseSelector: This is the CSS or XPATH selector that identifies the base element that contains all the repetitive patterns.
- baseFields: This is a list of fields that you extract from the base element itself.
- fields: This is a list of fields that you extract from the children of the base element. {{name, selector, type}} based on the type, you may have extra keys such as "attribute" when the type is "attribute".
# Extra Context:
In this context, the following items may or may not be present:
- Example of target JSON object: This is a sample of the final JSON object that we hope to extract from the HTML using the schema you are generating.
- Extra Instructions: This is optional instructions to consider when generating the schema provided by the user.
# What if there is no example of target JSON object?
In this scenario, use your best judgment to generate the schema. Try to maximize the number of fields that you can extract from the HTML.
# What are the instructions and details for this schema generation?
{prompt_template}"""
}
user_message = {
@@ -1102,15 +1122,18 @@ class JsonElementExtractionStrategy(ExtractionStrategy):
```html
{html}
```
Instructions to extract schema for the above given HTML:
{prompt_template}
"""
}
if query:
user_message["content"] += f"\n\nImportant Notes to Consider:\n{query}"
if target_json_example:
user_message["content"] += f"\n\nExample of target JSON object:\n{target_json_example}"
user_message["content"] += """IMPORTANT: Ensure your schema is reliable, meaning do not use selectors that seem to generate dynamically and are not reliable. A reliable schema is what you want, as it consistently returns the same data even after many reloads of the page.
Analyze the HTML and generate a JSON schema that follows the specified format. Only output valid JSON schema, nothing else.
"""
try:
# Call LLM with backoff handling

View File

@@ -103,6 +103,7 @@ class CrawlResult(BaseModel):
media: Dict[str, List[Dict]] = {}
links: Dict[str, List[Dict]] = {}
downloaded_files: Optional[List[str]] = None
js_execution_result: Optional[Dict[str, Any]] = None
screenshot: Optional[str] = None
pdf: Optional[bytes] = None
markdown: Optional[Union[str, MarkdownGenerationResult]] = None
@@ -126,6 +127,7 @@ class CrawlResult(BaseModel):
class AsyncCrawlResponse(BaseModel):
html: str
response_headers: Dict[str, str]
js_execution_result: Optional[Dict[str, Any]] = None
status_code: int
screenshot: Optional[str] = None
pdf_data: Optional[bytes] = None

View File

@@ -56,15 +56,17 @@ class PDFContentScrapingStrategy(ContentScrapingStrategy):
"""
def __init__(self,
save_images_locally=False,
extract_images=False,
image_save_dir=None,
save_images_locally : bool = False,
extract_images : bool = False,
image_save_dir : str = None,
batch_size: int = 4,
logger: AsyncLogger = None):
self.logger = logger
self.pdf_processor = NaivePDFProcessorStrategy(
save_images_locally=False,
extract_images=False,
image_save_dir=None
save_images_locally=save_images_locally,
extract_images=extract_images,
image_save_dir=image_save_dir,
batch_size=batch_size
)
def scrap(self, url: str, html: str, **params) -> ScrapingResult:
@@ -83,7 +85,8 @@ class PDFContentScrapingStrategy(ContentScrapingStrategy):
pdf_path = self._get_pdf_path(url)
try:
# Process PDF
result = self.pdf_processor.process(Path(pdf_path))
# result = self.pdf_processor.process(Path(pdf_path))
result = self.pdf_processor.process_batch(Path(pdf_path))
# Combine page HTML
cleaned_html = f"""

View File

@@ -55,13 +55,14 @@ class PDFProcessorStrategy(ABC):
class NaivePDFProcessorStrategy(PDFProcessorStrategy):
def __init__(self, image_dpi: int = 144, image_quality: int = 85, extract_images: bool = True,
save_images_locally: bool = False, image_save_dir: Optional[Path] = None):
save_images_locally: bool = False, image_save_dir: Optional[Path] = None, batch_size: int = 4):
self.image_dpi = image_dpi
self.image_quality = image_quality
self.current_page_number = 0
self.extract_images = extract_images
self.save_images_locally = save_images_locally
self.image_save_dir = image_save_dir
self.batch_size = batch_size
self._temp_dir = None
def process(self, pdf_path: Path) -> PDFProcessResult:
@@ -89,7 +90,7 @@ class NaivePDFProcessorStrategy(PDFProcessorStrategy):
for page_num, page in enumerate(reader.pages):
self.current_page_number = page_num + 1
pdf_page = self._process_page(page, image_dir, reader)
pdf_page = self._process_page(page, image_dir)
result.pages.append(pdf_page)
except Exception as e:
@@ -107,7 +108,80 @@ class NaivePDFProcessorStrategy(PDFProcessorStrategy):
result.processing_time = time() - start_time
return result
def _process_page(self, page, image_dir: Optional[Path], reader) -> PDFPage:
def process_batch(self, pdf_path: Path) -> PDFProcessResult:
"""Like process() but processes PDF pages in parallel batches"""
import concurrent.futures
import threading
# Initialize PyPDF2 thread support
if not hasattr(threading.current_thread(), "_children"):
threading.current_thread()._children = set()
start_time = time()
result = PDFProcessResult(
metadata=PDFMetadata(),
pages=[],
version="1.1"
)
try:
# Get metadata and page count from main thread
with pdf_path.open('rb') as file:
reader = PdfReader(file)
result.metadata = self._extract_metadata(pdf_path, reader)
total_pages = len(reader.pages)
# Handle image directory setup
image_dir = None
if self.extract_images and self.save_images_locally:
if self.image_save_dir:
image_dir = Path(self.image_save_dir)
image_dir.mkdir(exist_ok=True, parents=True)
else:
self._temp_dir = tempfile.mkdtemp(prefix='pdf_images_')
image_dir = Path(self._temp_dir)
def process_page_safely(page_num: int):
# Each thread opens its own file handle
with pdf_path.open('rb') as file:
thread_reader = PdfReader(file)
page = thread_reader.pages[page_num]
self.current_page_number = page_num + 1
return self._process_page(page, image_dir)
# Process pages in parallel batches
with concurrent.futures.ThreadPoolExecutor(max_workers=self.batch_size) as executor:
futures = []
for page_num in range(total_pages):
future = executor.submit(process_page_safely, page_num)
futures.append((page_num + 1, future))
# Collect results in order
result.pages = [None] * total_pages
for page_num, future in futures:
try:
pdf_page = future.result()
result.pages[page_num - 1] = pdf_page
except Exception as e:
logger.error(f"Failed to process page {page_num}: {str(e)}")
raise
except Exception as e:
logger.error(f"Failed to process PDF: {str(e)}")
raise
finally:
# Cleanup temp directory if it was created
if self._temp_dir and not self.image_save_dir:
import shutil
try:
shutil.rmtree(self._temp_dir)
except Exception as e:
logger.error(f"Failed to cleanup temp directory: {str(e)}")
result.processing_time = time() - start_time
return result
def _process_page(self, page, image_dir: Optional[Path]) -> PDFPage:
pdf_page = PDFPage(
page_number=self.current_page_number,
)

View File

@@ -22,7 +22,7 @@ import cProfile
import pstats
from functools import wraps
import asyncio
from lxml import html, etree
import sqlite3
import hashlib
from urllib.parse import urljoin, urlparse
@@ -2207,3 +2207,26 @@ def get_error_context(exc_info, context_lines: int = 5):
"function": func_name,
"code_context": code_context,
}
def truncate(value, threshold):
if len(value) > threshold:
return value[:threshold] + '...' # Add ellipsis to indicate truncation
return value
def optimize_html(html_str, threshold=200):
root = html.fromstring(html_str)
for element in root.iter():
# Process attributes
for attr in list(element.attrib):
element.attrib[attr] = truncate(element.attrib[attr], threshold)
# Process text content
if element.text and len(element.text) > threshold:
element.text = truncate(element.text, threshold)
# Process tail text
if element.tail and len(element.tail) > threshold:
element.tail = truncate(element.tail, threshold)
return html.tostring(root, encoding='unicode', pretty_print=False)