diff --git a/crawl4ai/web_crawler.back.py b/crawl4ai/web_crawler.back.py
new file mode 100644
index 00000000..af78f126
--- /dev/null
+++ b/crawl4ai/web_crawler.back.py
@@ -0,0 +1,357 @@
+import os, time
+os.environ["TOKENIZERS_PARALLELISM"] = "false"
+from pathlib import Path
+
+from .models import UrlModel, CrawlResult
+from .database import init_db, get_cached_url, cache_url, DB_PATH, flush_db
+from .utils import *
+from .chunking_strategy import *
+from .extraction_strategy import *
+from .crawler_strategy import *
+from typing import List
+from concurrent.futures import ThreadPoolExecutor
+from .config import *
+
+
+class WebCrawler:
+    def __init__(
+        self,
+        # db_path: str = None,
+        crawler_strategy: CrawlerStrategy = None,
+        always_by_pass_cache: bool = False,
+        verbose: bool = False,
+    ):
+        # self.db_path = db_path
+        self.crawler_strategy = crawler_strategy or LocalSeleniumCrawlerStrategy(verbose=verbose)
+        self.always_by_pass_cache = always_by_pass_cache
+
+        # Create the .crawl4ai folder in the user's home directory if it doesn't exist
+        self.crawl4ai_folder = os.path.join(Path.home(), ".crawl4ai")
+        os.makedirs(self.crawl4ai_folder, exist_ok=True)
+        os.makedirs(f"{self.crawl4ai_folder}/cache", exist_ok=True)
+
+        # If db_path is not provided, use the default path
+        # if not db_path:
+        #     self.db_path = f"{self.crawl4ai_folder}/crawl4ai.db"
+
+        # flush_db()
+        init_db()
+
+        self.ready = False
+
+    def warmup(self):
+        print("[LOG] 🌤️ Warming up the WebCrawler")
+        result = self.run(
+            url='https://crawl4ai.uccode.io/',
+            word_count_threshold=5,
+            extraction_strategy=NoExtractionStrategy(),
+            bypass_cache=False,
+            verbose=False,
+        )
+        self.ready = True
+        print("[LOG] 🌞 WebCrawler is ready to crawl")
+
+    def fetch_page(
+        self,
+        url_model: UrlModel,
+        provider: str = DEFAULT_PROVIDER,
+        api_token: str = None,
+        extract_blocks_flag: bool = True,
+        word_count_threshold=MIN_WORD_THRESHOLD,
+        css_selector: str = None,
+        screenshot: bool = False,
+        use_cached_html: bool = False,
+        extraction_strategy: ExtractionStrategy = None,
+        chunking_strategy: ChunkingStrategy = RegexChunking(),
+        **kwargs,
+    ) -> CrawlResult:
+        return self.run(
+            url_model.url,
+            word_count_threshold,
+            extraction_strategy or NoExtractionStrategy(),
+            chunking_strategy,
+            bypass_cache=url_model.forced,
+            css_selector=css_selector,
+            screenshot=screenshot,
+            **kwargs,
+        )
+        pass
+
+    def run_old(
+        self,
+        url: str,
+        word_count_threshold=MIN_WORD_THRESHOLD,
+        extraction_strategy: ExtractionStrategy = None,
+        chunking_strategy: ChunkingStrategy = RegexChunking(),
+        bypass_cache: bool = False,
+        css_selector: str = None,
+        screenshot: bool = False,
+        user_agent: str = None,
+        verbose=True,
+        **kwargs,
+    ) -> CrawlResult:
+        if user_agent:
+            self.crawler_strategy.update_user_agent(user_agent)
+        extraction_strategy = extraction_strategy or NoExtractionStrategy()
+        extraction_strategy.verbose = verbose
+        # Check that the extraction strategy is an instance of ExtractionStrategy; if not, raise an error
+        if not isinstance(extraction_strategy, ExtractionStrategy):
+            raise ValueError("Unsupported extraction strategy")
+        if not isinstance(chunking_strategy, ChunkingStrategy):
+            raise ValueError("Unsupported chunking strategy")
+
+        # Make sure word_count_threshold is not less than MIN_WORD_THRESHOLD
+        if word_count_threshold < MIN_WORD_THRESHOLD:
+            word_count_threshold = MIN_WORD_THRESHOLD
+
+        # Check cache first
+        if not bypass_cache and not self.always_by_pass_cache:
+            cached = get_cached_url(url)
+            if cached:
+                return CrawlResult(
+                    **{
+                        "url": cached[0],
+                        "html": cached[1],
+                        "cleaned_html": cached[2],
+                        "markdown": cached[3],
+                        "extracted_content": cached[4],
+                        "success": cached[5],
+                        "media": json.loads(cached[6] or "{}"),
+                        "links": json.loads(cached[7] or "{}"),
+                        "metadata": json.loads(cached[8] or "{}"),
+                        "screenshot": cached[9],
+                        "error_message": "",
+                    }
+                )
+
+        # Initialize WebDriver for crawling
+        t = time.time()
+        if kwargs.get("js", None):
+            self.crawler_strategy.js_code = kwargs.get("js")
+        html = self.crawler_strategy.crawl(url)
+        base64_image = None
+        if screenshot:
+            base64_image = self.crawler_strategy.take_screenshot()
+        success = True
+        error_message = ""
+        # Extract content from HTML
+        try:
+            result = get_content_of_website(url, html, word_count_threshold, css_selector=css_selector)
+            metadata = extract_metadata(html)
+            if result is None:
+                raise ValueError(f"Failed to extract content from the website: {url}")
+        except InvalidCSSSelectorError as e:
+            raise ValueError(str(e))
+
+        cleaned_html = result.get("cleaned_html", "")
+        markdown = result.get("markdown", "")
+        media = result.get("media", [])
+        links = result.get("links", [])
+
+        # Print a professional LOG-style message with the time taken, noting that crawling is done
+        if verbose:
+            print(
+                f"[LOG] 🚀 Crawling done for {url}, success: {success}, time taken: {time.time() - t} seconds"
+            )
+
+        extracted_content = []
+        if verbose:
+            print(f"[LOG] 🔥 Extracting semantic blocks for {url}, Strategy: {extraction_strategy.name}")
+        t = time.time()
+        # Split markdown into sections
+        sections = chunking_strategy.chunk(markdown)
+        # sections = merge_chunks_based_on_token_threshold(sections, CHUNK_TOKEN_THRESHOLD)
+
+        extracted_content = extraction_strategy.run(
+            url, sections,
+        )
+        extracted_content = json.dumps(extracted_content)
+
+        if verbose:
+            print(
+                f"[LOG] 🚀 Extraction done for {url}, time taken: {time.time() - t} seconds."
+            )
+
+        # Cache the result
+        cleaned_html = beautify_html(cleaned_html)
+        cache_url(
+            url,
+            html,
+            cleaned_html,
+            markdown,
+            extracted_content,
+            success,
+            json.dumps(media),
+            json.dumps(links),
+            json.dumps(metadata),
+            screenshot=base64_image,
+        )
+
+        return CrawlResult(
+            url=url,
+            html=html,
+            cleaned_html=cleaned_html,
+            markdown=markdown,
+            media=media,
+            links=links,
+            metadata=metadata,
+            screenshot=base64_image,
+            extracted_content=extracted_content,
+            success=success,
+            error_message=error_message,
+        )
+
+    def fetch_pages(
+        self,
+        url_models: List[UrlModel],
+        provider: str = DEFAULT_PROVIDER,
+        api_token: str = None,
+        extract_blocks_flag: bool = True,
+        word_count_threshold=MIN_WORD_THRESHOLD,
+        use_cached_html: bool = False,
+        css_selector: str = None,
+        screenshot: bool = False,
+        extraction_strategy: ExtractionStrategy = None,
+        chunking_strategy: ChunkingStrategy = RegexChunking(),
+        **kwargs,
+    ) -> List[CrawlResult]:
+        extraction_strategy = extraction_strategy or NoExtractionStrategy()
+        def fetch_page_wrapper(url_model, *args, **kwargs):
+            return self.fetch_page(url_model, *args, **kwargs)
+
+        with ThreadPoolExecutor() as executor:
+            results = list(
+                executor.map(
+                    fetch_page_wrapper,
+                    url_models,
+                    [provider] * len(url_models),
+                    [api_token] * len(url_models),
+                    [extract_blocks_flag] * len(url_models),
+                    [word_count_threshold] * len(url_models),
+                    [css_selector] * len(url_models),
+                    [screenshot] * len(url_models),
+                    [use_cached_html] * len(url_models),
+                    [extraction_strategy] * len(url_models),
+                    [chunking_strategy] * len(url_models),
+                    *[kwargs] * len(url_models),
+                )
+            )
+
+        return results
+
+    def run(
+        self,
+        url: str,
+        word_count_threshold=MIN_WORD_THRESHOLD,
+        extraction_strategy: ExtractionStrategy = None,
+        chunking_strategy: ChunkingStrategy = RegexChunking(),
+        bypass_cache: bool = False,
+        css_selector: str = None,
+        screenshot: bool = False,
+        user_agent: str = None,
+        verbose=True,
+        **kwargs,
+    ) -> CrawlResult:
+        extraction_strategy = extraction_strategy or NoExtractionStrategy()
+        extraction_strategy.verbose = verbose
+        if not isinstance(extraction_strategy, ExtractionStrategy):
+            raise ValueError("Unsupported extraction strategy")
+        if not isinstance(chunking_strategy, ChunkingStrategy):
+            raise ValueError("Unsupported chunking strategy")
+
+        if word_count_threshold < MIN_WORD_THRESHOLD:
+            word_count_threshold = MIN_WORD_THRESHOLD
+
+        # Check cache first
+        cached = None
+        extracted_content = None
+        if not bypass_cache and not self.always_by_pass_cache:
+            cached = get_cached_url(url)
+
+        if cached:
+            html = cached[1]
+            extracted_content = cached[2]
+            if screenshot:
+                screenshot = cached[9]
+
+        else:
+            if user_agent:
+                self.crawler_strategy.update_user_agent(user_agent)
+            html = self.crawler_strategy.crawl(url)
+            if screenshot:
+                screenshot = self.crawler_strategy.take_screenshot()
+
+        return self.process_html(url, html, extracted_content, word_count_threshold, extraction_strategy, chunking_strategy, css_selector, screenshot, verbose, bool(cached), **kwargs)
+
+    def process_html(
+        self,
+        url: str,
+        html: str,
+        extracted_content: str,
+        word_count_threshold: int,
+        extraction_strategy: ExtractionStrategy,
+        chunking_strategy: ChunkingStrategy,
+        css_selector: str,
+        screenshot: bool,
+        verbose: bool,
+        is_cached: bool,
+        **kwargs,
+    ) -> CrawlResult:
+        t = time.time()
+        # Extract content from HTML
+        try:
+            result = get_content_of_website(url, html, word_count_threshold, css_selector=css_selector)
+            metadata = extract_metadata(html)
+            if result is None:
+                raise ValueError(f"Failed to extract content from the website: {url}")
+        except InvalidCSSSelectorError as e:
+            raise ValueError(str(e))
+
+        cleaned_html = result.get("cleaned_html", "")
+        markdown = result.get("markdown", "")
+        media = result.get("media", [])
+        links = result.get("links", [])
+
+        if verbose:
+            print(f"[LOG] 🚀 Crawling done for {url}, success: True, time taken: {time.time() - t} seconds")
+
+        if extracted_content is None:
+            if verbose:
+                print(f"[LOG] 🔥 Extracting semantic blocks for {url}, Strategy: {extraction_strategy.name}")
+
+            sections = chunking_strategy.chunk(markdown)
+            extracted_content = extraction_strategy.run(url, sections)
+            extracted_content = json.dumps(extracted_content)
+
+            if verbose:
+                print(f"[LOG] 🚀 Extraction done for {url}, time taken: {time.time() - t} seconds.")
+
+        screenshot = None if not screenshot else screenshot
+
+        if not is_cached:
+            cache_url(
+                url,
+                html,
+                cleaned_html,
+                markdown,
+                extracted_content,
+                True,
+                json.dumps(media),
+                json.dumps(links),
+                json.dumps(metadata),
+                screenshot=screenshot,
+            )
+
+        return CrawlResult(
+            url=url,
+            html=html,
+            cleaned_html=cleaned_html,
+            markdown=markdown,
+            media=media,
+            links=links,
+            metadata=metadata,
+            screenshot=screenshot,
+            extracted_content=extracted_content,
+            success=True,
+            error_message="",
+        )
\ No newline at end of file
diff --git a/crawl4ai/web_crawler.py b/crawl4ai/web_crawler.py
index 0286c0cf..da44cc19 100644
--- a/crawl4ai/web_crawler.py
+++ b/crawl4ai/web_crawler.py
@@ -51,7 +51,6 @@ class WebCrawler:
         self.ready = True
         print("[LOG] 🌞 WebCrawler is ready to crawl")
 
-
     def fetch_page(
         self,
         url_model: UrlModel,
@@ -78,131 +77,6 @@ class WebCrawler:
         )
         pass
 
-
-
-    def run(
-        self,
-        url: str,
-        word_count_threshold=MIN_WORD_THRESHOLD,
-        extraction_strategy: ExtractionStrategy = None,
-        chunking_strategy: ChunkingStrategy = RegexChunking(),
-        bypass_cache: bool = False,
-        css_selector: str = None,
-        screenshot: bool = False,
-        verbose=True,
-        user_agent: str = None,
-        **kwargs,
-    ) -> CrawlResult:
-        if user_agent:
-            self.crawler_strategy.update_user_agent(user_agent)
-        extraction_strategy = extraction_strategy or NoExtractionStrategy()
-        extraction_strategy.verbose = verbose
-        # Check if extraction strategy is an instance of ExtractionStrategy if not raise an error
-        if not isinstance(extraction_strategy, ExtractionStrategy):
-            raise ValueError("Unsupported extraction strategy")
-        if not isinstance(chunking_strategy, ChunkingStrategy):
-            raise ValueError("Unsupported chunking strategy")
-
-        # make sure word_count_threshold is not lesser than MIN_WORD_THRESHOLD
-        if word_count_threshold < MIN_WORD_THRESHOLD:
-            word_count_threshold = MIN_WORD_THRESHOLD
-
-        # Check cache first
-        if not bypass_cache and not self.always_by_pass_cache:
-            cached = get_cached_url(url)
-            if cached:
-                return CrawlResult(
-                    **{
-                        "url": cached[0],
-                        "html": cached[1],
-                        "cleaned_html": cached[2],
-                        "markdown": cached[3],
-                        "extracted_content": cached[4],
-                        "success": cached[5],
-                        "media": json.loads(cached[6] or "{}"),
-                        "links": json.loads(cached[7] or "{}"),
-                        "metadata": json.loads(cached[8] or "{}"),  # "metadata": "{}
-                        "screenshot": cached[9],
-                        "error_message": "",
-                    }
-                )
-
-        # Initialize WebDriver for crawling
-        t = time.time()
-        if kwargs.get("js", None):
-            self.crawler_strategy.js_code = kwargs.get("js")
-        html = self.crawler_strategy.crawl(url)
-        base64_image = None
-        if screenshot:
-            base64_image = self.crawler_strategy.take_screenshot()
-        success = True
-        error_message = ""
-        # Extract content from HTML
-        try:
-            result = get_content_of_website(url, html, word_count_threshold, css_selector=css_selector)
-            metadata = extract_metadata(html)
-            if result is None:
-                raise ValueError(f"Failed to extract content from the website: {url}")
-        except InvalidCSSSelectorError as e:
-            raise ValueError(str(e))
-
-        cleaned_html = result.get("cleaned_html", "")
-        markdown = result.get("markdown", "")
-        media = result.get("media", [])
-        links = result.get("links", [])
-
-        # Print a profession LOG style message, show time taken and say crawling is done
-        if verbose:
-            print(
-                f"[LOG] 🚀 Crawling done for {url}, success: {success}, time taken: {time.time() - t} seconds"
-            )
-
-        extracted_content = []
-        if verbose:
-            print(f"[LOG] 🔥 Extracting semantic blocks for {url}, Strategy: {extraction_strategy.name}")
-        t = time.time()
-        # Split markdown into sections
-        sections = chunking_strategy.chunk(markdown)
-        # sections = merge_chunks_based_on_token_threshold(sections, CHUNK_TOKEN_THRESHOLD)
-
-        extracted_content = extraction_strategy.run(
-            url, sections,
-        )
-        extracted_content = json.dumps(extracted_content)
-
-        if verbose:
-            print(
-                f"[LOG] 🚀 Extraction done for {url}, time taken: {time.time() - t} seconds."
-            )
-
-        # Cache the result
-        cleaned_html = beautify_html(cleaned_html)
-        cache_url(
-            url,
-            html,
-            cleaned_html,
-            markdown,
-            extracted_content,
-            success,
-            json.dumps(media),
-            json.dumps(links),
-            json.dumps(metadata),
-            screenshot=base64_image,
-        )
-
-        return CrawlResult(
-            url=url,
-            html=html,
-            cleaned_html=cleaned_html,
-            markdown=markdown,
-            media=media,
-            links=links,
-            metadata=metadata,
-            screenshot=base64_image,
-            extracted_content=extracted_content,
-            success=success,
-            error_message=error_message,
-        )
-
     def fetch_pages(
         self,
         url_models: List[UrlModel],
@@ -241,8 +115,7 @@ class WebCrawler:
 
         return results
 
-
-    def run_less_db(
+    def run(
         self,
         url: str,
         word_count_threshold=MIN_WORD_THRESHOLD,
@@ -251,6 +124,7 @@ class WebCrawler:
         bypass_cache: bool = False,
         css_selector: str = None,
         screenshot: bool = False,
+        user_agent: str = None,
         verbose=True,
         **kwargs,
     ) -> CrawlResult:
@@ -273,11 +147,17 @@ class WebCrawler:
         if cached:
             html = cached[1]
             extracted_content = cached[2]
-        else:
-            html = self.crawler_strategy.crawl(url)
-            cache_url(url, html)
+            if screenshot:
+                screenshot = cached[9]
 
-        return self.process_html(url, html, extracted_content, word_count_threshold, extraction_strategy, chunking_strategy, css_selector, screenshot, verbose, **kwargs)
+        else:
+            if user_agent:
+                self.crawler_strategy.update_user_agent(user_agent)
+            html = self.crawler_strategy.crawl(url)
+            if screenshot:
+                screenshot = self.crawler_strategy.take_screenshot()
+
+        return self.process_html(url, html, extracted_content, word_count_threshold, extraction_strategy, chunking_strategy, css_selector, screenshot, verbose, bool(cached), **kwargs)
 
     def process_html(
         self,
@@ -290,16 +170,14 @@ class WebCrawler:
         css_selector: str,
         screenshot: bool,
         verbose: bool,
+        is_cached: bool,
         **kwargs,
     ) -> CrawlResult:
         t = time.time()
-        base64_image = None
-        if screenshot:
-            base64_image = self.crawler_strategy.take_screenshot()
-
         # Extract content from HTML
         try:
             result = get_content_of_website(url, html, word_count_threshold, css_selector=css_selector)
+            metadata = extract_metadata(html)
             if result is None:
                 raise ValueError(f"Failed to extract content from the website: {url}")
         except InvalidCSSSelectorError as e:
@@ -312,20 +190,33 @@ class WebCrawler:
 
         if verbose:
            print(f"[LOG] 🚀 Crawling done for {url}, success: True, time taken: {time.time() - t} seconds")
 
-
-        if verbose:
-            print(f"[LOG] 🔥 Extracting semantic blocks for {url}, Strategy: {extraction_strategy.name}")
-
-        sections = chunking_strategy.chunk(markdown)
-
+        if extracted_content is None:
+            if verbose:
+                print(f"[LOG] 🔥 Extracting semantic blocks for {url}, Strategy: {extraction_strategy.name}")
+
+            sections = chunking_strategy.chunk(markdown)
             extracted_content = extraction_strategy.run(url, sections)
             extracted_content = json.dumps(extracted_content)
 
-        # Cache the extracted content
-        cache_url(url, html, extracted_content)
-
-        if verbose:
-            print(f"[LOG] 🚀 Extraction done for {url}, time taken: {time.time() - t} seconds.")
+            if verbose:
+                print(f"[LOG] 🚀 Extraction done for {url}, time taken: {time.time() - t} seconds.")
+
+        screenshot = None if not screenshot else screenshot
+
+        if not is_cached:
+            cache_url(
+                url,
+                html,
+                cleaned_html,
+                markdown,
+                extracted_content,
+                True,
+                json.dumps(media),
+                json.dumps(links),
+                json.dumps(metadata),
+                screenshot=screenshot,
+            )
 
         return CrawlResult(
             url=url,
@@ -334,7 +225,8 @@ class WebCrawler:
             markdown=markdown,
             media=media,
             links=links,
-            screenshot=base64_image,
+            metadata=metadata,
+            screenshot=screenshot,
             extracted_content=extracted_content,
             success=True,
             error_message="",
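
A minimal sketch of how the refactored flow is exercised end to end: a first call misses the cache and crawls, a second call hits the cache and is routed through `process_html(..., is_cached=True)`, which skips both re-extraction and re-caching. The target URL and threshold below are illustrative, not part of the patch:

```python
from crawl4ai.web_crawler import WebCrawler
from crawl4ai.chunking_strategy import RegexChunking
from crawl4ai.extraction_strategy import NoExtractionStrategy

crawler = WebCrawler(verbose=True)

# Cache miss: crawler_strategy.crawl() runs, a screenshot is taken, and
# process_html() writes the full record (html, markdown, media, links,
# metadata, screenshot) via cache_url().
first = crawler.run(
    url="https://example.com",  # illustrative URL
    word_count_threshold=10,
    extraction_strategy=NoExtractionStrategy(),
    chunking_strategy=RegexChunking(),
    screenshot=True,
)

# Cache hit: html, extracted_content, and the screenshot are read back from
# cached[1], cached[2], and cached[9]; extracted_content is non-None, so the
# chunking/extraction pass and the cache write are both skipped.
second = crawler.run(url="https://example.com", screenshot=True)
print(second.success, len(second.markdown or ""))
```

Note the behavioral change under review: caching moves out of `run()` into `process_html()`, which now persists one complete record per URL and only when the page was not already served from cache.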