perf(crawler): major performance improvements & raw HTML support

- Switch to lxml parser (~4x speedup)
- Add raw HTML & local file crawling support
- Fix cache headers & async cleanup
- Add browser process monitoring
- Optimize BeautifulSoup operations
- Pre-compile regex patterns

Breaking: Raw HTML handling requires new URL prefixes
Fixes: #256, #253
This commit is contained in:
UncleCode
2024-11-13 19:40:40 +08:00
parent 61b93ebf36
commit c38ac29edb
11 changed files with 2953 additions and 130 deletions

View File

@@ -5,6 +5,7 @@ import asyncio
from typing import Optional, Tuple, Dict
from contextlib import asynccontextmanager
import logging
import json # Added for serialization/deserialization
# Set up logging
logging.basicConfig(level=logging.INFO)
@@ -89,7 +90,8 @@ class AsyncDatabaseManager:
media TEXT DEFAULT "{}",
links TEXT DEFAULT "{}",
metadata TEXT DEFAULT "{}",
screenshot TEXT DEFAULT ""
screenshot TEXT DEFAULT "",
response_headers TEXT DEFAULT "{}" -- New column added
)
''')
@@ -105,26 +107,51 @@ class AsyncDatabaseManager:
column_names = await self.execute_with_retry(_check_columns)
for column in ['media', 'links', 'metadata', 'screenshot']:
# List of new columns to add
new_columns = ['media', 'links', 'metadata', 'screenshot', 'response_headers']
for column in new_columns:
if column not in column_names:
await self.aalter_db_add_column(column)
async def aalter_db_add_column(self, new_column: str):
"""Add new column to the database"""
async def _alter(db):
await db.execute(f'ALTER TABLE crawled_data ADD COLUMN {new_column} TEXT DEFAULT ""')
if new_column == 'response_headers':
await db.execute(f'ALTER TABLE crawled_data ADD COLUMN {new_column} TEXT DEFAULT "{{}}"')
else:
await db.execute(f'ALTER TABLE crawled_data ADD COLUMN {new_column} TEXT DEFAULT ""')
logger.info(f"Added column '{new_column}' to the database.")
await self.execute_with_retry(_alter)
async def aget_cached_url(self, url: str) -> Optional[Tuple[str, str, str, str, str, str, str, bool, str]]:
async def aget_cached_url(self, url: str) -> Optional[Tuple[str, str, str, str, str, bool, str, str, str, str]]:
"""Retrieve cached URL data"""
async def _get(db):
async with db.execute(
'SELECT url, html, cleaned_html, markdown, extracted_content, success, media, links, metadata, screenshot FROM crawled_data WHERE url = ?',
'''
SELECT url, html, cleaned_html, markdown, extracted_content, success, media, links, metadata, screenshot, response_headers
FROM crawled_data WHERE url = ?
''',
(url,)
) as cursor:
return await cursor.fetchone()
row = await cursor.fetchone()
if row:
# Deserialize JSON fields
return (
row[0], # url
row[1], # html
row[2], # cleaned_html
row[3], # markdown
row[4], # extracted_content
row[5], # success
json.loads(row[6] or '{}'), # media
json.loads(row[7] or '{}'), # links
json.loads(row[8] or '{}'), # metadata
row[9], # screenshot
json.loads(row[10] or '{}') # response_headers
)
return None
try:
return await self.execute_with_retry(_get)
@@ -132,12 +159,27 @@ class AsyncDatabaseManager:
logger.error(f"Error retrieving cached URL: {e}")
return None
async def acache_url(self, url: str, html: str, cleaned_html: str, markdown: str, extracted_content: str, success: bool, media: str = "{}", links: str = "{}", metadata: str = "{}", screenshot: str = ""):
async def acache_url(
self,
url: str,
html: str,
cleaned_html: str,
markdown: str,
extracted_content: str,
success: bool,
media: str = "{}",
links: str = "{}",
metadata: str = "{}",
screenshot: str = "",
response_headers: str = "{}" # New parameter added
):
"""Cache URL data with retry logic"""
async def _cache(db):
await db.execute('''
INSERT INTO crawled_data (url, html, cleaned_html, markdown, extracted_content, success, media, links, metadata, screenshot)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
INSERT INTO crawled_data (
url, html, cleaned_html, markdown, extracted_content, success, media, links, metadata, screenshot, response_headers
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(url) DO UPDATE SET
html = excluded.html,
cleaned_html = excluded.cleaned_html,
@@ -147,8 +189,9 @@ class AsyncDatabaseManager:
media = excluded.media,
links = excluded.links,
metadata = excluded.metadata,
screenshot = excluded.screenshot
''', (url, html, cleaned_html, markdown, extracted_content, success, media, links, metadata, screenshot))
screenshot = excluded.screenshot,
response_headers = excluded.response_headers -- Update response_headers
''', (url, html, cleaned_html, markdown, extracted_content, success, media, links, metadata, screenshot, response_headers))
try:
await self.execute_with_retry(_cache)
@@ -189,4 +232,4 @@ class AsyncDatabaseManager:
logger.error(f"Error flushing database: {e}")
# Create a singleton instance
async_db_manager = AsyncDatabaseManager()
async_db_manager = AsyncDatabaseManager()