diff --git a/.gitignore b/.gitignore index 59f39306..0cc68fb7 100644 --- a/.gitignore +++ b/.gitignore @@ -164,4 +164,5 @@ cython_debug/ Crawl4AI.egg-info/ Crawl4AI.egg-info/* crawler_data.db -.vscode/ \ No newline at end of file +.vscode/ +test_pad.py \ No newline at end of file diff --git a/crawl4ai/utils.py b/crawl4ai/utils.py index eeb4c12a..ee669251 100644 --- a/crawl4ai/utils.py +++ b/crawl4ai/utils.py @@ -10,6 +10,7 @@ from .prompts import PROMPT_EXTRACT_BLOCKS from .config import * import re import html +from html2text import HTML2Text def beautify_html(escaped_html): @@ -77,7 +78,8 @@ def split_and_parse_json_objects(json_string): def sanitize_html(html): # Replace all weird and special characters with an empty string - sanitized_html = re.sub(r'[^\w\s.,;:!?=\[\]{}()<>\/\\\-"]', '', html) + sanitized_html = html + # sanitized_html = re.sub(r'[^\w\s.,;:!?=\[\]{}()<>\/\\\-"]', '', html) # Escape all double and single quotes sanitized_html = sanitized_html.replace('"', '\\"').replace("'", "\\'") @@ -113,6 +115,32 @@ def escape_json_string(s): return s +class CustomHTML2Text(HTML2Text): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.ignore_links = True + self.inside_pre = False + self.inside_code = False + + def handle_tag(self, tag, attrs, start): + if tag == 'pre': + if start: + self.o('```\n') + self.inside_pre = True + else: + self.o('\n```') + self.inside_pre = False + # elif tag == 'code' and not self.inside_pre: + # if start: + # if not self.inside_pre: + # self.o('`') + # self.inside_code = True + # else: + # if not self.inside_pre: + # self.o('`') + # self.inside_code = False + + super().handle_tag(tag, attrs, start) def get_content_of_website(html, word_count_threshold = MIN_WORD_THRESHOLD): try: @@ -139,17 +167,28 @@ def get_content_of_website(html, word_count_threshold = MIN_WORD_THRESHOLD): else: img.decompose() + + # Create a function that replace content of all"pre" tage with its inner text + def replace_pre_tags_with_text(node): + for child in node.find_all('pre'): + # set child inner html to its text + child.string = child.get_text() + return node + + # Replace all "pre" tags with their inner text + body = replace_pre_tags_with_text(body) + # Recursively remove empty elements, their parent elements, and elements with word count below threshold - def remove_empty_and_low_word_count_elements(node): + def remove_empty_and_low_word_count_elements(node, word_count_threshold): for child in node.contents: if isinstance(child, element.Tag): - remove_empty_and_low_word_count_elements(child) + remove_empty_and_low_word_count_elements(child, word_count_threshold) word_count = len(child.get_text(strip=True).split()) if (len(child.contents) == 0 and not child.get_text(strip=True)) or word_count < word_count_threshold: child.decompose() return node - body = remove_empty_and_low_word_count_elements(body) + body = remove_empty_and_low_word_count_elements(body, word_count_threshold) def remove_small_text_tags(body: Tag, word_count_threshold: int = MIN_WORD_THRESHOLD): # We'll use a list to collect all tags that don't meet the word count requirement @@ -214,6 +253,8 @@ def get_content_of_website(html, word_count_threshold = MIN_WORD_THRESHOLD): return node body = flatten_nested_elements(body) + + # Remove comments for comment in soup.find_all(text=lambda text: isinstance(text, Comment)): @@ -228,6 +269,7 @@ def get_content_of_website(html, word_count_threshold = MIN_WORD_THRESHOLD): # Convert cleaned HTML to Markdown h = html2text.HTML2Text() + h = CustomHTML2Text() h.ignore_links = True markdown = h.handle(cleaned_html) @@ -242,12 +284,6 @@ def get_content_of_website(html, word_count_threshold = MIN_WORD_THRESHOLD): print('Error processing HTML content:', str(e)) return None -# Example usage -# word_count_threshold = 5 # Adjust this value according to your desired threshold -# markdown_content = get_content_of_website(word_count_threshold) -# print(markdown_content) - - def extract_xml_tags(string): tags = re.findall(r'<(\w+)>', string) return list(set(tags)) @@ -318,23 +354,6 @@ def extract_blocks(url, html, provider = DEFAULT_PROVIDER, api_token = None): response = perform_completion_with_backoff(provider, prompt_with_variables, api_token) - # try: - # response = completion( - # model = provider, - # messages = [ - # {"role": "user", "content": prompt_with_variables} - # ], - # temperature = 0.01, - # api_key = api_token - # ) - # except litellm.exceptions.RateLimitError as e: - # print("Rate limit error:", str(e)) - # return [{ - # "index": 0, - # "tags": ["error"], - # "content": ["Rate limit error. Please try again later."] - # }] - try: blocks = extract_xml_data(["blocks"], response.choices[0].message.content)['blocks'] blocks = json.loads(blocks) diff --git a/crawl4ai/web_crawler.py b/crawl4ai/web_crawler.py index 7385a7d7..43e0d9dd 100644 --- a/crawl4ai/web_crawler.py +++ b/crawl4ai/web_crawler.py @@ -1,6 +1,7 @@ import asyncio import os, time import json +from pathlib import Path from selenium import webdriver from selenium.webdriver.chrome.service import Service from selenium.webdriver.common.by import By @@ -29,12 +30,27 @@ class WebCrawler: # Automatically install or update chromedriver chromedriver_autoinstaller.install() + + # Initialize WebDriver for crawling + self.service = Service(chromedriver_autoinstaller.install()) + self.driver = webdriver.Chrome(service=self.service, options=self.options) + + # Create the .crawl4ai folder in the user's home directory if it doesn't exist + self.crawl4ai_folder = os.path.join(Path.home(), ".crawl4ai") + os.makedirs(self.crawl4ai_folder, exist_ok=True) + os.makedirs(f"{self.crawl4ai_folder}/cache", exist_ok=True) - def fetch_page(self, url_model: UrlModel, provider: str = DEFAULT_PROVIDER, api_token: str = None, extract_blocks_flag: bool = True, word_count_threshold = MIN_WORD_THRESHOLD) -> CrawlResult: + def fetch_page(self, + url_model: UrlModel, + provider: str = DEFAULT_PROVIDER, + api_token: str = None, + extract_blocks_flag: bool = True, + word_count_threshold = MIN_WORD_THRESHOLD, + use_cached_html: bool = False) -> CrawlResult: # make sure word_count_threshold is not lesser than MIN_WORD_THRESHOLD - if word_count_threshold < MIN_WORD_THRESHOLD: - word_count_threshold = MIN_WORD_THRESHOLD + # if word_count_threshold < MIN_WORD_THRESHOLD: + # word_count_threshold = MIN_WORD_THRESHOLD # Check cache first cached = get_cached_url(self.db_path, str(url_model.url)) @@ -51,23 +67,41 @@ class WebCrawler: # Initialize WebDriver for crawling - service = Service(chromedriver_autoinstaller.install()) - driver = webdriver.Chrome(service=service, options=self.options) - - try: - driver.get(str(url_model.url)) - WebDriverWait(driver, 10).until( - EC.presence_of_all_elements_located((By.TAG_NAME, "html")) - ) - html = driver.page_source + if use_cached_html: + # load html from crawl4ai_folder/cache + valid_file_name = str(url_model.url).replace("/", "_").replace(":", "_") + if os.path.exists(os.path.join(self.crawl4ai_folder, "cache", valid_file_name)): + with open(os.path.join(self.crawl4ai_folder, "cache", valid_file_name), "r") as f: + html = f.read() + else: + raise Exception("Cached HTML file not found") + success = True error_message = "" - except Exception as e: - html = "" - success = False - error_message = str(e) - finally: - driver.quit() + else: + service = self.service + driver = self.driver + + try: + driver.get(str(url_model.url)) + WebDriverWait(driver, 10).until( + EC.presence_of_all_elements_located((By.TAG_NAME, "html")) + ) + html = driver.page_source + success = True + error_message = "" + + # Save html in crawl4ai_folder/cache + valid_file_name = str(url_model.url).replace("/", "_").replace(":", "_") + with open(os.path.join(self.crawl4ai_folder, "cache", valid_file_name), "w") as f: + f.write(html) + + except Exception as e: + html = "" + success = False + error_message = str(e) + finally: + driver.quit() # Extract content from HTML result = get_content_of_website(html, word_count_threshold)