- Added a post-installation setup script for initialization. - Updated README with installation notes for Playwright setup. - Enhanced migration logging for better error visibility. - Added 'pydantic' to requirements. - Bumped version to 0.3.746.
168 lines
6.0 KiB
Python
168 lines
6.0 KiB
Python
import os
|
|
import asyncio
|
|
import logging
|
|
from pathlib import Path
|
|
import aiosqlite
|
|
from typing import Optional
|
|
import xxhash
|
|
import aiofiles
|
|
import shutil
|
|
import time
|
|
from datetime import datetime
|
|
from .async_logger import AsyncLogger, LogLevel
|
|
|
|
# Initialize logger
|
|
logger = AsyncLogger(log_level=LogLevel.DEBUG, verbose=True)
|
|
|
|
# logging.basicConfig(level=logging.INFO)
|
|
# logger = logging.getLogger(__name__)
|
|
|
|
class DatabaseMigration:
|
|
def __init__(self, db_path: str):
|
|
self.db_path = db_path
|
|
self.content_paths = self._ensure_content_dirs(os.path.dirname(db_path))
|
|
|
|
def _ensure_content_dirs(self, base_path: str) -> dict:
|
|
dirs = {
|
|
'html': 'html_content',
|
|
'cleaned': 'cleaned_html',
|
|
'markdown': 'markdown_content',
|
|
'extracted': 'extracted_content',
|
|
'screenshots': 'screenshots'
|
|
}
|
|
content_paths = {}
|
|
for key, dirname in dirs.items():
|
|
path = os.path.join(base_path, dirname)
|
|
os.makedirs(path, exist_ok=True)
|
|
content_paths[key] = path
|
|
return content_paths
|
|
|
|
def _generate_content_hash(self, content: str) -> str:
|
|
x = xxhash.xxh64()
|
|
x.update(content.encode())
|
|
content_hash = x.hexdigest()
|
|
return content_hash
|
|
# return hashlib.sha256(content.encode()).hexdigest()
|
|
|
|
async def _store_content(self, content: str, content_type: str) -> str:
|
|
if not content:
|
|
return ""
|
|
|
|
content_hash = self._generate_content_hash(content)
|
|
file_path = os.path.join(self.content_paths[content_type], content_hash)
|
|
|
|
if not os.path.exists(file_path):
|
|
async with aiofiles.open(file_path, 'w', encoding='utf-8') as f:
|
|
await f.write(content)
|
|
|
|
return content_hash
|
|
|
|
async def migrate_database(self):
|
|
"""Migrate existing database to file-based storage"""
|
|
# logger.info("Starting database migration...")
|
|
logger.info("Starting database migration...", tag="INIT")
|
|
|
|
try:
|
|
async with aiosqlite.connect(self.db_path) as db:
|
|
# Get all rows
|
|
async with db.execute(
|
|
'''SELECT url, html, cleaned_html, markdown,
|
|
extracted_content, screenshot FROM crawled_data'''
|
|
) as cursor:
|
|
rows = await cursor.fetchall()
|
|
|
|
migrated_count = 0
|
|
for row in rows:
|
|
url, html, cleaned_html, markdown, extracted_content, screenshot = row
|
|
|
|
# Store content in files and get hashes
|
|
html_hash = await self._store_content(html, 'html')
|
|
cleaned_hash = await self._store_content(cleaned_html, 'cleaned')
|
|
markdown_hash = await self._store_content(markdown, 'markdown')
|
|
extracted_hash = await self._store_content(extracted_content, 'extracted')
|
|
screenshot_hash = await self._store_content(screenshot, 'screenshots')
|
|
|
|
# Update database with hashes
|
|
await db.execute('''
|
|
UPDATE crawled_data
|
|
SET html = ?,
|
|
cleaned_html = ?,
|
|
markdown = ?,
|
|
extracted_content = ?,
|
|
screenshot = ?
|
|
WHERE url = ?
|
|
''', (html_hash, cleaned_hash, markdown_hash,
|
|
extracted_hash, screenshot_hash, url))
|
|
|
|
migrated_count += 1
|
|
if migrated_count % 100 == 0:
|
|
logger.info(f"Migrated {migrated_count} records...", tag="INIT")
|
|
|
|
|
|
await db.commit()
|
|
logger.success(f"Migration completed. {migrated_count} records processed.", tag="COMPLETE")
|
|
|
|
except Exception as e:
|
|
# logger.error(f"Migration failed: {e}")
|
|
logger.error(
|
|
message="Migration failed: {error}",
|
|
tag="ERROR",
|
|
params={"error": str(e)}
|
|
)
|
|
raise e
|
|
|
|
async def backup_database(db_path: str) -> str:
|
|
"""Create backup of existing database"""
|
|
if not os.path.exists(db_path):
|
|
logger.info("No existing database found. Skipping backup.", tag="INIT")
|
|
return None
|
|
|
|
# Create backup with timestamp
|
|
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
|
|
backup_path = f"{db_path}.backup_{timestamp}"
|
|
|
|
try:
|
|
# Wait for any potential write operations to finish
|
|
await asyncio.sleep(1)
|
|
|
|
# Create backup
|
|
shutil.copy2(db_path, backup_path)
|
|
logger.info(f"Database backup created at: {backup_path}", tag="COMPLETE")
|
|
return backup_path
|
|
except Exception as e:
|
|
# logger.error(f"Backup failed: {e}")
|
|
logger.error(
|
|
message="Migration failed: {error}",
|
|
tag="ERROR",
|
|
params={"error": str(e)}
|
|
)
|
|
raise e
|
|
|
|
async def run_migration(db_path: Optional[str] = None):
|
|
"""Run database migration"""
|
|
if db_path is None:
|
|
db_path = os.path.join(Path.home(), ".crawl4ai", "crawl4ai.db")
|
|
|
|
if not os.path.exists(db_path):
|
|
logger.info("No existing database found. Skipping migration.", tag="INIT")
|
|
return
|
|
|
|
# Create backup first
|
|
backup_path = await backup_database(db_path)
|
|
if not backup_path:
|
|
return
|
|
|
|
migration = DatabaseMigration(db_path)
|
|
await migration.migrate_database()
|
|
|
|
def main():
|
|
"""CLI entry point for migration"""
|
|
import argparse
|
|
parser = argparse.ArgumentParser(description='Migrate Crawl4AI database to file-based storage')
|
|
parser.add_argument('--db-path', help='Custom database path')
|
|
args = parser.parse_args()
|
|
|
|
asyncio.run(run_migration(args.db_path))
|
|
|
|
if __name__ == "__main__":
|
|
main() |