diff --git a/crawl4ai/async_crawler_strategy.py b/crawl4ai/async_crawler_strategy.py new file mode 100644 index 00000000..3840260e --- /dev/null +++ b/crawl4ai/async_crawler_strategy.py @@ -0,0 +1,254 @@ +import asyncio +import base64, time +from abc import ABC, abstractmethod +from typing import Callable, Dict, Any, List, Optional +import os +import psutil +from playwright.async_api import async_playwright, Page, Browser, Error +from io import BytesIO +from PIL import Image, ImageDraw, ImageFont +from .utils import sanitize_input_encode +import json, uuid +import hashlib +from pathlib import Path +from playwright.async_api import ProxySettings + +def calculate_semaphore_count(): + cpu_count = os.cpu_count() + memory_gb = psutil.virtual_memory().total / (1024 ** 3) # Convert to GB + base_count = max(1, cpu_count // 2) + memory_based_cap = int(memory_gb / 2) # Assume 2GB per instance + return min(base_count, memory_based_cap) + +class AsyncCrawlerStrategy(ABC): + @abstractmethod + async def crawl(self, url: str, **kwargs) -> str: + pass + + @abstractmethod + async def crawl_many(self, urls: List[str], **kwargs) -> List[str]: + pass + + @abstractmethod + async def take_screenshot(self, url: str) -> str: + pass + + @abstractmethod + def update_user_agent(self, user_agent: str): + pass + + @abstractmethod + def set_hook(self, hook_type: str, hook: Callable): + pass + +class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy): + def __init__(self, use_cached_html=False, js_code=None, **kwargs): + self.use_cached_html = use_cached_html + self.user_agent = kwargs.get("user_agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36") + self.proxy = kwargs.get("proxy") + self.headers = {} + self.sessions = {} + self.session_ttl = 1800 + self.js_code = js_code + self.verbose = kwargs.get("verbose", False) + self.playwright = None + self.browser = None + self.hooks = { + 'on_browser_created': None, + 
'on_user_agent_updated': None, + 'on_execution_started': None, + 'before_goto': None, + 'after_goto': None, + 'before_return_html': None + } + + async def __aenter__(self): + await self.start() + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + await self.close() + + async def start(self): + if self.playwright is None: + self.playwright = await async_playwright().start() + if self.browser is None: + browser_args = { + "headless": True, + # "headless": False, + "args": [ + "--disable-gpu", + "--disable-dev-shm-usage", + "--disable-setuid-sandbox", + "--no-sandbox", + ] + } + + # Add proxy settings if a proxy is specified + if self.proxy: + proxy_settings = ProxySettings(server=self.proxy) + browser_args["proxy"] = proxy_settings + + + self.browser = await self.playwright.chromium.launch(**browser_args) + await self.execute_hook('on_browser_created', self.browser) + + async def close(self): + if self.browser: + await self.browser.close() + self.browser = None + if self.playwright: + await self.playwright.stop() + self.playwright = None + + def __del__(self): + if self.browser or self.playwright: + asyncio.get_event_loop().run_until_complete(self.close()) + + def set_hook(self, hook_type: str, hook: Callable): + if hook_type in self.hooks: + self.hooks[hook_type] = hook + else: + raise ValueError(f"Invalid hook type: {hook_type}") + + async def execute_hook(self, hook_type: str, *args): + hook = self.hooks.get(hook_type) + if hook: + if asyncio.iscoroutinefunction(hook): + return await hook(*args) + else: + return hook(*args) + return args[0] if args else None + + def update_user_agent(self, user_agent: str): + self.user_agent = user_agent + + def set_custom_headers(self, headers: Dict[str, str]): + self.headers = headers + + async def kill_session(self, session_id: str): + if session_id in self.sessions: + context, page, _ = self.sessions[session_id] + await page.close() + await context.close() + del self.sessions[session_id] + + def 
_cleanup_expired_sessions(self): + current_time = time.time() + expired_sessions = [sid for sid, (_, _, last_used) in self.sessions.items() + if current_time - last_used > self.session_ttl] + for sid in expired_sessions: + asyncio.create_task(self.kill_session(sid)) + + async def crawl(self, url: str, **kwargs) -> str: + self._cleanup_expired_sessions() + session_id = kwargs.get("session_id") + if session_id: + context, page, _ = self.sessions.get(session_id, (None, None, None)) + if not context: + context = await self.browser.new_context( + user_agent=self.user_agent, + proxy={"server": self.proxy} if self.proxy else None + ) + await context.set_extra_http_headers(self.headers) + page = await context.new_page() + self.sessions[session_id] = (context, page, time.time()) + else: + context = await self.browser.new_context( + user_agent=self.user_agent, + proxy={"server": self.proxy} if self.proxy else None + ) + await context.set_extra_http_headers(self.headers) + page = await context.new_page() + + try: + if self.verbose: + print(f"[LOG] πΈοΈ Crawling {url} using AsyncPlaywrightCrawlerStrategy...") + + if self.use_cached_html: + cache_file_path = os.path.join(Path.home(), ".crawl4ai", "cache", hashlib.md5(url.encode()).hexdigest()) + if os.path.exists(cache_file_path): + with open(cache_file_path, "r") as f: + return f.read() + + if not kwargs.get("js_only", False): + await self.execute_hook('before_goto', page) + await page.goto(url, wait_until="domcontentloaded", timeout=60000) + await self.execute_hook('after_goto', page) + + await page.wait_for_selector('body') + await page.evaluate("window.scrollTo(0, document.body.scrollHeight)") + + js_code = kwargs.get("js_code", kwargs.get("js", self.js_code)) + if js_code: + if isinstance(js_code, str): + await page.evaluate(js_code) + elif isinstance(js_code, list): + for js in js_code: + await page.evaluate(js) + + # await page.wait_for_timeout(100) + await page.wait_for_load_state('networkidle') + # Check for on 
execution even + await self.execute_hook('on_execution_started', page) + + html = await page.content() + page = await self.execute_hook('before_return_html', page, html) + + if self.verbose: + print(f"[LOG] β Crawled {url} successfully!") + + if self.use_cached_html: + cache_file_path = os.path.join(Path.home(), ".crawl4ai", "cache", hashlib.md5(url.encode()).hexdigest()) + with open(cache_file_path, "w", encoding="utf-8") as f: + f.write(html) + + return html + except Error as e: + raise Error(f"Failed to crawl {url}: {str(e)}") + finally: + if not session_id: + await page.close() + + # try: + # html = await _crawl() + # return sanitize_input_encode(html) + # except Error as e: + # raise Error(f"Failed to crawl {url}: {str(e)}") + # except Exception as e: + # raise Exception(f"Failed to crawl {url}: {str(e)}") + + async def crawl_many(self, urls: List[str], **kwargs) -> List[str]: + semaphore_count = kwargs.get('semaphore_count', calculate_semaphore_count()) + semaphore = asyncio.Semaphore(semaphore_count) + + async def crawl_with_semaphore(url): + async with semaphore: + return await self.crawl(url, **kwargs) + + tasks = [crawl_with_semaphore(url) for url in urls] + results = await asyncio.gather(*tasks, return_exceptions=True) + return [result if not isinstance(result, Exception) else str(result) for result in results] + + async def take_screenshot(self, url: str) -> str: + async with await self.browser.new_context(user_agent=self.user_agent) as context: + page = await context.new_page() + try: + await page.goto(url, wait_until="domcontentloaded") + screenshot = await page.screenshot(full_page=True) + return base64.b64encode(screenshot).decode('utf-8') + except Exception as e: + error_message = f"Failed to take screenshot: {str(e)}" + print(error_message) + + # Generate an error image + img = Image.new('RGB', (800, 600), color='black') + draw = ImageDraw.Draw(img) + font = ImageFont.load_default() + draw.text((10, 10), error_message, fill=(255, 255, 255), 
font=font) + + buffered = BytesIO() + img.save(buffered, format="JPEG") + return base64.b64encode(buffered.getvalue()).decode('utf-8') + finally: + await page.close() \ No newline at end of file diff --git a/crawl4ai/async_database.py b/crawl4ai/async_database.py new file mode 100644 index 00000000..baa53255 --- /dev/null +++ b/crawl4ai/async_database.py @@ -0,0 +1,97 @@ +import os +from pathlib import Path +import aiosqlite +import asyncio +from typing import Optional, Tuple + +DB_PATH = os.path.join(Path.home(), ".crawl4ai") +os.makedirs(DB_PATH, exist_ok=True) +DB_PATH = os.path.join(DB_PATH, "crawl4ai.db") + +class AsyncDatabaseManager: + def __init__(self): + self.db_path = DB_PATH + + async def ainit_db(self): + async with aiosqlite.connect(self.db_path) as db: + await db.execute(''' + CREATE TABLE IF NOT EXISTS crawled_data ( + url TEXT PRIMARY KEY, + html TEXT, + cleaned_html TEXT, + markdown TEXT, + extracted_content TEXT, + success BOOLEAN, + media TEXT DEFAULT "{}", + links TEXT DEFAULT "{}", + metadata TEXT DEFAULT "{}", + screenshot TEXT DEFAULT "" + ) + ''') + await db.commit() + + async def aalter_db_add_screenshot(self, new_column: str = "media"): + try: + async with aiosqlite.connect(self.db_path) as db: + await db.execute(f'ALTER TABLE crawled_data ADD COLUMN {new_column} TEXT DEFAULT ""') + await db.commit() + except Exception as e: + print(f"Error altering database to add screenshot column: {e}") + + async def aget_cached_url(self, url: str) -> Optional[Tuple[str, str, str, str, str, str, str, bool, str]]: + try: + async with aiosqlite.connect(self.db_path) as db: + async with db.execute('SELECT url, html, cleaned_html, markdown, extracted_content, success, media, links, metadata, screenshot FROM crawled_data WHERE url = ?', (url,)) as cursor: + return await cursor.fetchone() + except Exception as e: + print(f"Error retrieving cached URL: {e}") + return None + + async def acache_url(self, url: str, html: str, cleaned_html: str, markdown: str, 
extracted_content: str, success: bool, media: str = "{}", links: str = "{}", metadata: str = "{}", screenshot: str = ""): + try: + async with aiosqlite.connect(self.db_path) as db: + await db.execute(''' + INSERT INTO crawled_data (url, html, cleaned_html, markdown, extracted_content, success, media, links, metadata, screenshot) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(url) DO UPDATE SET + html = excluded.html, + cleaned_html = excluded.cleaned_html, + markdown = excluded.markdown, + extracted_content = excluded.extracted_content, + success = excluded.success, + media = excluded.media, + links = excluded.links, + metadata = excluded.metadata, + screenshot = excluded.screenshot + ''', (url, html, cleaned_html, markdown, extracted_content, success, media, links, metadata, screenshot)) + await db.commit() + except Exception as e: + print(f"Error caching URL: {e}") + + async def aget_total_count(self) -> int: + try: + async with aiosqlite.connect(self.db_path) as db: + async with db.execute('SELECT COUNT(*) FROM crawled_data') as cursor: + result = await cursor.fetchone() + return result[0] if result else 0 + except Exception as e: + print(f"Error getting total count: {e}") + return 0 + + async def aclear_db(self): + try: + async with aiosqlite.connect(self.db_path) as db: + await db.execute('DELETE FROM crawled_data') + await db.commit() + except Exception as e: + print(f"Error clearing database: {e}") + + async def aflush_db(self): + try: + async with aiosqlite.connect(self.db_path) as db: + await db.execute('DROP TABLE IF EXISTS crawled_data') + await db.commit() + except Exception as e: + print(f"Error flushing database: {e}") + +async_db_manager = AsyncDatabaseManager() \ No newline at end of file diff --git a/crawl4ai/async_webcrawler.py b/crawl4ai/async_webcrawler.py new file mode 100644 index 00000000..212d59ca --- /dev/null +++ b/crawl4ai/async_webcrawler.py @@ -0,0 +1,269 @@ +import os +import time +from pathlib import Path +from typing import 
Optional +import json +import asyncio +from .models import CrawlResult +from .async_database import async_db_manager +from .chunking_strategy import * +from .extraction_strategy import * +from .async_crawler_strategy import AsyncCrawlerStrategy, AsyncPlaywrightCrawlerStrategy +from .content_scrapping_strategy import WebScrappingStrategy +from .config import MIN_WORD_THRESHOLD, IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD +from .utils import ( + sanitize_input_encode, + InvalidCSSSelectorError, + format_html +) + + +class AsyncWebCrawler: + def __init__( + self, + crawler_strategy: Optional[AsyncCrawlerStrategy] = None, + always_by_pass_cache: bool = False, + verbose: bool = False, + ): + self.crawler_strategy = crawler_strategy or AsyncPlaywrightCrawlerStrategy( + verbose=verbose + ) + self.always_by_pass_cache = always_by_pass_cache + self.crawl4ai_folder = os.path.join(Path.home(), ".crawl4ai") + os.makedirs(self.crawl4ai_folder, exist_ok=True) + os.makedirs(f"{self.crawl4ai_folder}/cache", exist_ok=True) + self.ready = False + self.verbose = verbose + + async def __aenter__(self): + await self.crawler_strategy.__aenter__() + await self.awarmup() + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + await self.crawler_strategy.__aexit__(exc_type, exc_val, exc_tb) + + async def awarmup(self): + print("[LOG] π€οΈ Warming up the AsyncWebCrawler") + await async_db_manager.ainit_db() + await self.arun( + url="https://google.com/", + word_count_threshold=5, + bypass_cache=False, + verbose=False, + ) + self.ready = True + print("[LOG] π AsyncWebCrawler is ready to crawl") + + async def arun( + self, + url: str, + word_count_threshold=MIN_WORD_THRESHOLD, + extraction_strategy: ExtractionStrategy = None, + chunking_strategy: ChunkingStrategy = RegexChunking(), + bypass_cache: bool = False, + css_selector: str = None, + screenshot: bool = False, + user_agent: str = None, + verbose=True, + **kwargs, + ) -> CrawlResult: + try: + extraction_strategy = 
extraction_strategy or NoExtractionStrategy() + extraction_strategy.verbose = verbose + if not isinstance(extraction_strategy, ExtractionStrategy): + raise ValueError("Unsupported extraction strategy") + if not isinstance(chunking_strategy, ChunkingStrategy): + raise ValueError("Unsupported chunking strategy") + + word_count_threshold = max(word_count_threshold, MIN_WORD_THRESHOLD) + + cached = None + screenshot_data = None + extracted_content = None + if not bypass_cache and not self.always_by_pass_cache: + cached = await async_db_manager.aget_cached_url(url) + + if kwargs.get("warmup", True) and not self.ready: + return None + + if cached: + html = sanitize_input_encode(cached[1]) + extracted_content = sanitize_input_encode(cached[4]) + if screenshot: + screenshot_data = cached[9] + if not screenshot_data: + cached = None + + if not cached or not html: + t1 = time.time() + if user_agent: + self.crawler_strategy.update_user_agent(user_agent) + html = await self.crawler_strategy.crawl(url, **kwargs) + t2 = time.time() + if verbose: + print( + f"[LOG] π Crawling done for {url}, success: {bool(html)}, time taken: {t2 - t1:.2f} seconds" + ) + if screenshot: + screenshot_data = await self.crawler_strategy.take_screenshot(url) + + crawl_result = await self.aprocess_html( + url, + html, + extracted_content, + word_count_threshold, + extraction_strategy, + chunking_strategy, + css_selector, + screenshot_data, + verbose, + bool(cached), + **kwargs, + ) + crawl_result.success = bool(html) + crawl_result.session_id = kwargs.get("session_id", None) + return crawl_result + except Exception as e: + if not hasattr(e, "msg"): + e.msg = str(e) + print(f"[ERROR] π« Failed to crawl {url}, error: {e.msg}") + return CrawlResult(url=url, html="", success=False, error_message=e.msg) + + async def arun_many( + self, + urls: List[str], + word_count_threshold=MIN_WORD_THRESHOLD, + extraction_strategy: ExtractionStrategy = None, + chunking_strategy: ChunkingStrategy = RegexChunking(), + 
bypass_cache: bool = False, + css_selector: str = None, + screenshot: bool = False, + user_agent: str = None, + verbose=True, + **kwargs, + ) -> List[CrawlResult]: + tasks = [ + self.arun( + url, + word_count_threshold, + extraction_strategy, + chunking_strategy, + bypass_cache, + css_selector, + screenshot, + user_agent, + verbose, + **kwargs + ) + for url in urls + ] + return await asyncio.gather(*tasks) + + + async def aprocess_html( + self, + url: str, + html: str, + extracted_content: str, + word_count_threshold: int, + extraction_strategy: ExtractionStrategy, + chunking_strategy: ChunkingStrategy, + css_selector: str, + screenshot: str, + verbose: bool, + is_cached: bool, + **kwargs, + ) -> CrawlResult: + t = time.time() + # Extract content from HTML + try: + t1 = time.time() + scrapping_strategy = WebScrappingStrategy() + result = await scrapping_strategy.ascrap( + url, + html, + word_count_threshold=word_count_threshold, + css_selector=css_selector, + only_text=kwargs.get("only_text", False), + image_description_min_word_threshold=kwargs.get( + "image_description_min_word_threshold", IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD + ), + ) + if verbose: + print( + f"[LOG] π Content extracted for {url}, success: True, time taken: {time.time() - t1:.2f} seconds" + ) + + if result is None: + raise ValueError(f"Failed to extract content from the website: {url}") + except InvalidCSSSelectorError as e: + raise ValueError(str(e)) + except Exception as e: + raise ValueError(f"Failed to extract content from the website: {url}, error: {str(e)}") + + cleaned_html = sanitize_input_encode(result.get("cleaned_html", "")) + markdown = sanitize_input_encode(result.get("markdown", "")) + media = result.get("media", []) + links = result.get("links", []) + metadata = result.get("metadata", {}) + + if extracted_content is None and extraction_strategy and chunking_strategy: + if verbose: + print( + f"[LOG] π₯ Extracting semantic blocks for {url}, Strategy: {self.__class__.__name__}" + ) + 
+ # Check if extraction strategy is type of JsonCssExtractionStrategy + if isinstance(extraction_strategy, JsonCssExtractionStrategy) or isinstance(extraction_strategy, EnhancedJsonCssExtractionStrategy): + extraction_strategy.verbose = verbose + extracted_content = extraction_strategy.run(url, [html]) + extracted_content = json.dumps(extracted_content, indent=4, default=str) + else: + sections = chunking_strategy.chunk(markdown) + extracted_content = extraction_strategy.run(url, sections) + extracted_content = json.dumps(extracted_content, indent=4, default=str) + + if verbose: + print( + f"[LOG] π Extraction done for {url}, time taken: {time.time() - t:.2f} seconds." + ) + + screenshot = None if not screenshot else screenshot + + if not is_cached: + await async_db_manager.acache_url( + url, + html, + cleaned_html, + markdown, + extracted_content, + True, + json.dumps(media), + json.dumps(links), + json.dumps(metadata), + screenshot=screenshot, + ) + + return CrawlResult( + url=url, + html=html, + cleaned_html=format_html(cleaned_html), + markdown=markdown, + media=media, + links=links, + metadata=metadata, + screenshot=screenshot, + extracted_content=extracted_content, + success=True, + error_message="", + ) + + async def aclear_cache(self): + await async_db_manager.aclear_db() + + async def aflush_cache(self): + await async_db_manager.aflush_db() + + async def aget_cache_size(self): + return await async_db_manager.aget_total_count() diff --git a/crawl4ai/content_scrapping_strategy.py b/crawl4ai/content_scrapping_strategy.py new file mode 100644 index 00000000..56868354 --- /dev/null +++ b/crawl4ai/content_scrapping_strategy.py @@ -0,0 +1,283 @@ +from abc import ABC, abstractmethod +from typing import Dict, Any +from bs4 import BeautifulSoup +from concurrent.futures import ThreadPoolExecutor +import asyncio, requests, re, os +from .config import * +from bs4 import element, NavigableString, Comment +from urllib.parse import urljoin +from requests.exceptions import 
InvalidSchema + +from .utils import ( + sanitize_input_encode, + sanitize_html, + extract_metadata, + InvalidCSSSelectorError, + CustomHTML2Text +) + + + +class ContentScrappingStrategy(ABC): + @abstractmethod + def scrap(self, url: str, html: str, **kwargs) -> Dict[str, Any]: + pass + + @abstractmethod + async def ascrap(self, url: str, html: str, **kwargs) -> Dict[str, Any]: + pass + +class WebScrappingStrategy(ContentScrappingStrategy): + def scrap(self, url: str, html: str, **kwargs) -> Dict[str, Any]: + return self._get_content_of_website_optimized(url, html, is_async=False, **kwargs) + + async def ascrap(self, url: str, html: str, **kwargs) -> Dict[str, Any]: + return await asyncio.to_thread(self._get_content_of_website_optimized, url, html, **kwargs) + + def _get_content_of_website_optimized(self, url: str, html: str, word_count_threshold: int = MIN_WORD_THRESHOLD, css_selector: str = None, **kwargs) -> Dict[str, Any]: + if not html: + return None + + soup = BeautifulSoup(html, 'html.parser') + body = soup.body + + image_description_min_word_threshold = kwargs.get('image_description_min_word_threshold', IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD) + + if css_selector: + selected_elements = body.select(css_selector) + if not selected_elements: + raise InvalidCSSSelectorError(f"Invalid CSS selector, No elements found for CSS selector: {css_selector}") + body = soup.new_tag('div') + for el in selected_elements: + body.append(el) + + links = {'internal': [], 'external': []} + media = {'images': [], 'videos': [], 'audios': []} + + # Extract meaningful text for media files from closest parent + def find_closest_parent_with_useful_text(tag): + current_tag = tag + while current_tag: + current_tag = current_tag.parent + # Get the text content of the parent tag + if current_tag: + text_content = current_tag.get_text(separator=' ',strip=True) + # Check if the text content has at least word_count_threshold + if len(text_content.split()) >= image_description_min_word_threshold: 
+ return text_content + return None + + def process_image(img, url, index, total_images): + #Check if an image has valid display and inside undesired html elements + def is_valid_image(img, parent, parent_classes): + style = img.get('style', '') + src = img.get('src', '') + classes_to_check = ['button', 'icon', 'logo'] + tags_to_check = ['button', 'input'] + return all([ + 'display:none' not in style, + src, + not any(s in var for var in [src, img.get('alt', ''), *parent_classes] for s in classes_to_check), + parent.name not in tags_to_check + ]) + + #Score an image for it's usefulness + def score_image_for_usefulness(img, base_url, index, images_count): + # Function to parse image height/width value and units + def parse_dimension(dimension): + if dimension: + match = re.match(r"(\d+)(\D*)", dimension) + if match: + number = int(match.group(1)) + unit = match.group(2) or 'px' # Default unit is 'px' if not specified + return number, unit + return None, None + + # Fetch image file metadata to extract size and extension + def fetch_image_file_size(img, base_url): + #If src is relative path construct full URL, if not it may be CDN URL + img_url = urljoin(base_url,img.get('src')) + try: + response = requests.head(img_url) + if response.status_code == 200: + return response.headers.get('Content-Length',None) + else: + print(f"Failed to retrieve file size for {img_url}") + return None + except InvalidSchema as e: + return None + finally: + return + + image_height = img.get('height') + height_value, height_unit = parse_dimension(image_height) + image_width = img.get('width') + width_value, width_unit = parse_dimension(image_width) + image_size = 0 #int(fetch_image_file_size(img,base_url) or 0) + image_format = os.path.splitext(img.get('src',''))[1].lower() + # Remove . 
from format + image_format = image_format.strip('.') + score = 0 + if height_value: + if height_unit == 'px' and height_value > 150: + score += 1 + if height_unit in ['%','vh','vmin','vmax'] and height_value >30: + score += 1 + if width_value: + if width_unit == 'px' and width_value > 150: + score += 1 + if width_unit in ['%','vh','vmin','vmax'] and width_value >30: + score += 1 + if image_size > 10000: + score += 1 + if img.get('alt') != '': + score+=1 + if any(image_format==format for format in ['jpg','png','webp']): + score+=1 + if index/images_count<0.5: + score+=1 + return score + + if not is_valid_image(img, img.parent, img.parent.get('class', [])): + return None + score = score_image_for_usefulness(img, url, index, total_images) + if score <= IMAGE_SCORE_THRESHOLD: + return None + return { + 'src': img.get('src', ''), + 'alt': img.get('alt', ''), + 'desc': find_closest_parent_with_useful_text(img), + 'score': score, + 'type': 'image' + } + + def process_element(element: element.PageElement) -> bool: + try: + if isinstance(element, NavigableString): + if isinstance(element, Comment): + element.extract() + return False + + if element.name in ['script', 'style', 'link', 'meta', 'noscript']: + if element.name == 'img': + process_image(element, url, 0, 1) + element.decompose() + return False + + keep_element = False + + if element.name == 'a' and element.get('href'): + href = element['href'] + url_base = url.split('/')[2] + link_data = {'href': href, 'text': element.get_text()} + if href.startswith('http') and url_base not in href: + links['external'].append(link_data) + else: + links['internal'].append(link_data) + keep_element = True + + elif element.name == 'img': + return True # Always keep image elements + + elif element.name in ['video', 'audio']: + media[f"{element.name}s"].append({ + 'src': element.get('src'), + 'alt': element.get('alt'), + 'type': element.name, + 'description': find_closest_parent_with_useful_text(element) + }) + source_tags = 
element.find_all('source') + for source_tag in source_tags: + media[f"{element.name}s"].append({ + 'src': source_tag.get('src'), + 'alt': element.get('alt'), + 'type': element.name, + 'description': find_closest_parent_with_useful_text(element) + }) + return True # Always keep video and audio elements + + if element.name != 'pre': + if element.name in ['b', 'i', 'u', 'span', 'del', 'ins', 'sub', 'sup', 'strong', 'em', 'code', 'kbd', 'var', 's', 'q', 'abbr', 'cite', 'dfn', 'time', 'small', 'mark']: + if kwargs.get('only_text', False): + element.replace_with(element.get_text()) + else: + element.unwrap() + elif element.name != 'img': + element.attrs = {} + + # Process children + for child in list(element.children): + if isinstance(child, NavigableString) and not isinstance(child, Comment): + if len(child.strip()) > 0: + keep_element = True + else: + if process_element(child): + keep_element = True + + + # Check word count + if not keep_element: + word_count = len(element.get_text(strip=True).split()) + keep_element = word_count >= word_count_threshold + + if not keep_element: + element.decompose() + + return keep_element + except Exception as e: + print('Error processing element:', str(e)) + return False + + #process images by filtering and extracting contextual text from the page + # imgs = body.find_all('img') + # media['images'] = [ + # result for result in + # (process_image(img, url, i, len(imgs)) for i, img in enumerate(imgs)) + # if result is not None + # ] + + process_element(body) + + # # Process images using ThreadPoolExecutor + imgs = body.find_all('img') + with ThreadPoolExecutor() as executor: + image_results = list(executor.map(process_image, imgs, [url]*len(imgs), range(len(imgs)), [len(imgs)]*len(imgs))) + media['images'] = [result for result in image_results if result is not None] + + def flatten_nested_elements(node): + if isinstance(node, NavigableString): + return node + if len(node.contents) == 1 and isinstance(node.contents[0], element.Tag) and 
node.contents[0].name == node.name: + return flatten_nested_elements(node.contents[0]) + node.contents = [flatten_nested_elements(child) for child in node.contents] + return node + + body = flatten_nested_elements(body) + base64_pattern = re.compile(r'data:image/[^;]+;base64,([^"]+)') + for img in imgs: + src = img.get('src', '') + if base64_pattern.match(src): + # Replace base64 data with empty string + img['src'] = base64_pattern.sub('', src) + cleaned_html = str(body).replace('\n\n', '\n').replace(' ', ' ') + cleaned_html = sanitize_html(cleaned_html) + + h = CustomHTML2Text() + h.ignore_links = True + markdown = h.handle(cleaned_html) + markdown = markdown.replace(' ```', '```') + + try: + meta = extract_metadata(html, soup) + except Exception as e: + print('Error extracting metadata:', str(e)) + meta = {} + + return { + 'markdown': markdown, + 'cleaned_html': cleaned_html, + 'success': True, + 'media': media, + 'links': links, + 'metadata': meta + } diff --git a/crawl4ai/extraction_strategy.py b/crawl4ai/extraction_strategy.py index 4f5bb261..53a63310 100644 --- a/crawl4ai/extraction_strategy.py +++ b/crawl4ai/extraction_strategy.py @@ -623,3 +623,158 @@ class ContentSummarizationStrategy(ExtractionStrategy): # Sort summaries by the original section index to maintain order summaries.sort(key=lambda x: x[0]) return [summary for _, summary in summaries] + + +class JsonCssExtractionStrategy(ExtractionStrategy): + def __init__(self, schema: Dict[str, Any], **kwargs): + super().__init__(**kwargs) + self.schema = schema + + def extract(self, url: str, html: str, *q, **kwargs) -> List[Dict[str, Any]]: + soup = BeautifulSoup(html, 'html.parser') + base_elements = soup.select(self.schema['baseSelector']) + + results = [] + for element in base_elements: + item = {} + for field in self.schema['fields']: + value = self._extract_field(element, field) + if value is not None: + item[field['name']] = value + if item: + results.append(item) + + return results + + def 
class EnhancedJsonCssExtractionStrategy(ExtractionStrategy):
    """Schema-driven CSS extraction with nested, list and computed fields.

    ``schema`` must provide:
      - ``baseSelector``: CSS selector for the repeating container elements.
      - ``fields``: list of field dicts, each with ``name`` and ``type``
        (``text`` | ``attribute`` | ``html`` | ``regex`` | ``nested`` |
        ``list`` | ``nested_list`` | ``computed``) plus, depending on type,
        ``selector``, ``attribute``, ``pattern``, ``fields``, ``transform``,
        ``expression``/``function`` and an optional ``default``.
    """

    def __init__(self, schema: Dict[str, Any], **kwargs):
        super().__init__(**kwargs)
        self.schema = schema

    def extract(self, url: str, html: str, *q, **kwargs) -> List[Dict[str, Any]]:
        """Parse ``html`` and return one dict per ``baseSelector`` match."""
        soup = BeautifulSoup(html, 'html.parser')
        base_elements = soup.select(self.schema['baseSelector'])

        results = []
        for element in base_elements:
            item = self._extract_item(element, self.schema['fields'])
            if item:  # drop elements that yielded no fields at all
                results.append(item)

        return results

    def _extract_field(self, element, field):
        """Extract one field (scalar, nested or list-valued) from ``element``.

        Errors are swallowed (optionally logged when ``self.verbose``) and the
        field's ``default`` is returned, so one bad field cannot abort the item.
        """
        try:
            if field['type'] == 'nested':
                nested_element = element.select_one(field['selector'])
                return self._extract_item(nested_element, field['fields']) if nested_element else {}

            if field['type'] == 'list':
                elements = element.select(field['selector'])
                return [self._extract_list_item(el, field['fields']) for el in elements]

            if field['type'] == 'nested_list':
                elements = element.select(field['selector'])
                return [self._extract_item(el, field['fields']) for el in elements]

            return self._extract_single_field(element, field)
        except Exception as e:
            if self.verbose:
                print(f"Error extracting field {field['name']}: {str(e)}")
            return field.get('default')

    def _extract_list_item(self, element, fields):
        """Build the flat dict for one element of a ``list`` field."""
        item = {}
        for field in fields:
            value = self._extract_single_field(element, field)
            if value is not None:
                item[field['name']] = value
        return item

    def _extract_single_field(self, element, field):
        """Resolve a scalar field (text/attribute/html/regex), applying any transform."""
        if 'selector' in field:
            selected = element.select_one(field['selector'])
            if not selected:
                return field.get('default')
        else:
            # No selector: operate on the element itself.
            selected = element

        value = None
        if field['type'] == 'text':
            value = selected.get_text(strip=True)
        elif field['type'] == 'attribute':
            value = selected.get(field['attribute'])
        elif field['type'] == 'html':
            value = str(selected)
        elif field['type'] == 'regex':
            text = selected.get_text(strip=True)
            match = re.search(field['pattern'], text)
            value = match.group(1) if match else None

        # BUGFIX: only transform non-None values. Previously a regex miss or a
        # missing attribute combined with 'transform' raised AttributeError
        # (None has no .lower()/.upper()/.strip()); for 'list' fields that
        # exception propagated into _extract_field's catch-all and replaced
        # the ENTIRE list with the field default instead of this one value.
        if value is not None and 'transform' in field:
            value = self._apply_transform(value, field['transform'])

        return value if value is not None else field.get('default')

    def _extract_item(self, element, fields):
        """Assemble the result dict for one base element.

        ``computed`` fields are evaluated against the partially built ``item``,
        so they can reference fields declared earlier in the schema.
        """
        item = {}
        for field in fields:
            if field['type'] == 'computed':
                value = self._compute_field(item, field)
            else:
                value = self._extract_field(element, field)
            if value is not None:
                item[field['name']] = value
        return item

    def _apply_transform(self, value, transform):
        """Apply a named post-processing transform; unknown names are a no-op."""
        if transform == 'lowercase':
            return value.lower()
        elif transform == 'uppercase':
            return value.upper()
        elif transform == 'strip':
            return value.strip()
        return value

    def _compute_field(self, item, field):
        """Derive a field from already-extracted values in ``item``.

        SECURITY NOTE: ``eval`` executes the schema's ``expression`` string;
        only pass schemas from trusted sources — never user-supplied input.
        """
        try:
            if 'expression' in field:
                return eval(field['expression'], {}, item)
            elif 'function' in field:
                return field['function'](item)
        except Exception as e:
            if self.verbose:
                print(f"Error computing field {field['name']}: {str(e)}")
            return field.get('default')

    def run(self, url: str, sections: List[str], *q, **kwargs) -> List[Dict[str, Any]]:
        """Join pre-chunked HTML ``sections`` and extract from the combined document."""
        combined_html = self.DEL.join(sections)
        return self.extract(url, combined_html, **kwargs)
@pytest.mark.asyncio
async def test_multiple_urls():
    """Batch-crawl several live sites and verify every result is usable."""
    target_urls = [
        "https://www.nbcnews.com/business",
        "https://www.example.com",
        "https://www.python.org",
    ]
    async with AsyncWebCrawler(verbose=True) as crawler:
        results = await crawler.arun_many(urls=target_urls, bypass_cache=True)
        # One result per requested URL, each successful with non-empty HTML.
        assert len(results) == len(target_urls)
        for res in results:
            assert res.success
            assert res.html