- Issue Resolved: Every <pre> tag's HTML content is now replaced with its plain inner text. This handles cases like syntax highlighters, where each character may be wrapped in its own <span>, which previously caused the minimum-word-count threshold to discard those code blocks.

Author: unclecode
Date: 2024-05-12 14:08:22 +08:00
Parent: 8e536b9717
Commit: 7039e3c1ee
3 changed files with 100 additions and 46 deletions
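
To illustrate the fix, here is a minimal sketch (not code from this commit, assuming BeautifulSoup4): a highlighted code block splits its text across many near-empty <span> nodes, so a per-element word count sees almost nothing; collapsing the <pre> tag to its inner text keeps the block in one piece.

from bs4 import BeautifulSoup

# Highlighter output: every character lives in its own <span>,
# so each element's own word count is effectively zero.
html = "<pre><span>d</span><span>e</span><span>f</span> foo():</pre>"
soup = BeautifulSoup(html, "html.parser")

for pre in soup.find_all("pre"):
    # Same move as the commit: replace the tag's children with plain text
    pre.string = pre.get_text()

print(soup)  # <pre>def foo():</pre> -- a single text node now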

.gitignore

@@ -164,4 +164,5 @@ cython_debug/
 Crawl4AI.egg-info/
 Crawl4AI.egg-info/*
 crawler_data.db
 .vscode/
+test_pad.py

@@ -10,6 +10,7 @@ from .prompts import PROMPT_EXTRACT_BLOCKS
 from .config import *
 import re
 import html
+from html2text import HTML2Text

 def beautify_html(escaped_html):
@@ -77,7 +78,8 @@ def split_and_parse_json_objects(json_string):
 def sanitize_html(html):
     # Replace all weird and special characters with an empty string
-    sanitized_html = re.sub(r'[^\w\s.,;:!?=\[\]{}()<>\/\\\-"]', '', html)
+    sanitized_html = html
+    # sanitized_html = re.sub(r'[^\w\s.,;:!?=\[\]{}()<>\/\\\-"]', '', html)

     # Escape all double and single quotes
     sanitized_html = sanitized_html.replace('"', '\\"').replace("'", "\\'")
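
Worth noting what the now-disabled stripping line did (a sketch, not from the commit): the regex removes every character outside its whitelist, including single quotes, #, and &, which mangles legitimate HTML — this may be why it is now commented out.

import re

# The disabled whitelist regex strips anything outside \w, whitespace,
# and a short punctuation set -- note that ', #, and & are all removed.
html_snippet = "<a href='#section'>Q&A</a>"
stripped = re.sub(r'[^\w\s.,;:!?=\[\]{}()<>\/\\\-"]', '', html_snippet)
print(stripped)  # <a href=section>QA</a>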
@@ -113,6 +115,32 @@ def escape_json_string(s):
     return s

+class CustomHTML2Text(HTML2Text):
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.ignore_links = True
+        self.inside_pre = False
+        self.inside_code = False
+
+    def handle_tag(self, tag, attrs, start):
+        if tag == 'pre':
+            if start:
+                self.o('```\n')
+                self.inside_pre = True
+            else:
+                self.o('\n```')
+                self.inside_pre = False
+        # elif tag == 'code' and not self.inside_pre:
+        #     if start:
+        #         if not self.inside_pre:
+        #             self.o('`')
+        #         self.inside_code = True
+        #     else:
+        #         if not self.inside_pre:
+        #             self.o('`')
+        #         self.inside_code = False
+
+        super().handle_tag(tag, attrs, start)
+
 def get_content_of_website(html, word_count_threshold = MIN_WORD_THRESHOLD):
     try:
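
A short usage sketch of the subclass (hypothetical input, assuming the html2text package and the class defined above): handle_tag intercepts <pre> and emits ``` fences around it before deferring to the stock handler, so code blocks survive the Markdown conversion as fenced blocks.

# Hypothetical usage of the CustomHTML2Text subclass added above;
# assumes `pip install html2text` and that the class is importable.
h = CustomHTML2Text()
h.ignore_links = True
markdown = h.handle("<p>Example:</p><pre>print('hi')</pre>")
print(markdown)
# The <pre> body comes out between ``` fences; exact spacing depends on
# html2text's own <pre> handling, which still runs via super().handle_tag().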
@@ -139,17 +167,28 @@ def get_content_of_website(html, word_count_threshold = MIN_WORD_THRESHOLD):
             else:
                 img.decompose()

+        # Function that replaces the content of all "pre" tags with their inner text
+        def replace_pre_tags_with_text(node):
+            for child in node.find_all('pre'):
+                # set child inner html to its text
+                child.string = child.get_text()
+            return node
+
+        # Replace all "pre" tags with their inner text
+        body = replace_pre_tags_with_text(body)
+
         # Recursively remove empty elements, their parent elements, and elements with word count below threshold
-        def remove_empty_and_low_word_count_elements(node):
+        def remove_empty_and_low_word_count_elements(node, word_count_threshold):
             for child in node.contents:
                 if isinstance(child, element.Tag):
-                    remove_empty_and_low_word_count_elements(child)
+                    remove_empty_and_low_word_count_elements(child, word_count_threshold)
                     word_count = len(child.get_text(strip=True).split())
                     if (len(child.contents) == 0 and not child.get_text(strip=True)) or word_count < word_count_threshold:
                         child.decompose()
             return node

-        body = remove_empty_and_low_word_count_elements(body)
+        body = remove_empty_and_low_word_count_elements(body, word_count_threshold)

         def remove_small_text_tags(body: Tag, word_count_threshold: int = MIN_WORD_THRESHOLD):
             # We'll use a list to collect all tags that don't meet the word count requirement
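
The pruning helper now threads the caller's word_count_threshold through the recursion instead of relying on the enclosing scope. A toy run (a sketch, not from the commit, assuming BeautifulSoup4) shows short elements being dropped:

from bs4 import BeautifulSoup, element

def remove_empty_and_low_word_count_elements(node, word_count_threshold):
    # Depth-first: prune a child's subtree, then judge the child itself
    for child in node.contents:
        if isinstance(child, element.Tag):
            remove_empty_and_low_word_count_elements(child, word_count_threshold)
            word_count = len(child.get_text(strip=True).split())
            if (len(child.contents) == 0 and not child.get_text(strip=True)) or word_count < word_count_threshold:
                child.decompose()
    return node

body = BeautifulSoup("<div><p>hi</p><p>three words here</p></div>", "html.parser")
print(remove_empty_and_low_word_count_elements(body, 3))
# -> <div><p>three words here</p></div>  (the one-word <p> is decomposed)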
@@ -214,6 +253,8 @@ def get_content_of_website(html, word_count_threshold = MIN_WORD_THRESHOLD):
             return node

         body = flatten_nested_elements(body)

         # Remove comments
         for comment in soup.find_all(text=lambda text: isinstance(text, Comment)):
@@ -228,6 +269,7 @@ def get_content_of_website(html, word_count_threshold = MIN_WORD_THRESHOLD):
         # Convert cleaned HTML to Markdown
         h = html2text.HTML2Text()
+        h = CustomHTML2Text()
         h.ignore_links = True
         markdown = h.handle(cleaned_html)
@@ -242,12 +284,6 @@ def get_content_of_website(html, word_count_threshold = MIN_WORD_THRESHOLD):
         print('Error processing HTML content:', str(e))
         return None

-# Example usage
-# word_count_threshold = 5 # Adjust this value according to your desired threshold
-# markdown_content = get_content_of_website(word_count_threshold)
-# print(markdown_content)
-
 def extract_xml_tags(string):
     tags = re.findall(r'<(\w+)>', string)
     return list(set(tags))
@@ -318,23 +354,6 @@ def extract_blocks(url, html, provider = DEFAULT_PROVIDER, api_token = None):
     response = perform_completion_with_backoff(provider, prompt_with_variables, api_token)

-    # try:
-    #     response = completion(
-    #         model = provider,
-    #         messages = [
-    #             {"role": "user", "content": prompt_with_variables}
-    #         ],
-    #         temperature = 0.01,
-    #         api_key = api_token
-    #     )
-    # except litellm.exceptions.RateLimitError as e:
-    #     print("Rate limit error:", str(e))
-    #     return [{
-    #         "index": 0,
-    #         "tags": ["error"],
-    #         "content": ["Rate limit error. Please try again later."]
-    #     }]
     try:
         blocks = extract_xml_data(["blocks"], response.choices[0].message.content)['blocks']
         blocks = json.loads(blocks)

@@ -1,6 +1,7 @@
 import asyncio
 import os, time
 import json
+from pathlib import Path
 from selenium import webdriver
 from selenium.webdriver.chrome.service import Service
 from selenium.webdriver.common.by import By
@@ -29,12 +30,27 @@ class WebCrawler:
         # Automatically install or update chromedriver
         chromedriver_autoinstaller.install()

+        # Initialize WebDriver for crawling
+        self.service = Service(chromedriver_autoinstaller.install())
+        self.driver = webdriver.Chrome(service=self.service, options=self.options)
+
+        # Create the .crawl4ai folder in the user's home directory if it doesn't exist
+        self.crawl4ai_folder = os.path.join(Path.home(), ".crawl4ai")
+        os.makedirs(self.crawl4ai_folder, exist_ok=True)
+        os.makedirs(f"{self.crawl4ai_folder}/cache", exist_ok=True)

-    def fetch_page(self, url_model: UrlModel, provider: str = DEFAULT_PROVIDER, api_token: str = None, extract_blocks_flag: bool = True, word_count_threshold = MIN_WORD_THRESHOLD) -> CrawlResult:
+    def fetch_page(self,
+                   url_model: UrlModel,
+                   provider: str = DEFAULT_PROVIDER,
+                   api_token: str = None,
+                   extract_blocks_flag: bool = True,
+                   word_count_threshold = MIN_WORD_THRESHOLD,
+                   use_cached_html: bool = False) -> CrawlResult:
         # make sure word_count_threshold is not lesser than MIN_WORD_THRESHOLD
-        if word_count_threshold < MIN_WORD_THRESHOLD:
-            word_count_threshold = MIN_WORD_THRESHOLD
+        # if word_count_threshold < MIN_WORD_THRESHOLD:
+        #     word_count_threshold = MIN_WORD_THRESHOLD

         # Check cache first
         cached = get_cached_url(self.db_path, str(url_model.url))
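
The constructor changes above set up a per-user cache directory; the cache key for a page is simply its URL with / and : replaced by underscores. A small sketch of the resulting path (the URL is illustrative, not from the commit):

import os
from pathlib import Path

crawl4ai_folder = os.path.join(Path.home(), ".crawl4ai")
url = "https://example.com/page"
valid_file_name = url.replace("/", "_").replace(":", "_")
print(os.path.join(crawl4ai_folder, "cache", valid_file_name))
# e.g. /home/user/.crawl4ai/cache/https___example.com_page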
@@ -51,23 +67,41 @@ class WebCrawler:
         # Initialize WebDriver for crawling
-        service = Service(chromedriver_autoinstaller.install())
-        driver = webdriver.Chrome(service=service, options=self.options)
-
-        try:
-            driver.get(str(url_model.url))
-            WebDriverWait(driver, 10).until(
-                EC.presence_of_all_elements_located((By.TAG_NAME, "html"))
-            )
-            html = driver.page_source
-            success = True
-            error_message = ""
-        except Exception as e:
-            html = ""
-            success = False
-            error_message = str(e)
-        finally:
-            driver.quit()
+        if use_cached_html:
+            # load html from crawl4ai_folder/cache
+            valid_file_name = str(url_model.url).replace("/", "_").replace(":", "_")
+            if os.path.exists(os.path.join(self.crawl4ai_folder, "cache", valid_file_name)):
+                with open(os.path.join(self.crawl4ai_folder, "cache", valid_file_name), "r") as f:
+                    html = f.read()
+            else:
+                raise Exception("Cached HTML file not found")
+            success = True
+            error_message = ""
+        else:
+            service = self.service
+            driver = self.driver
+
+            try:
+                driver.get(str(url_model.url))
+                WebDriverWait(driver, 10).until(
+                    EC.presence_of_all_elements_located((By.TAG_NAME, "html"))
+                )
+                html = driver.page_source
+                success = True
+                error_message = ""
+
+                # Save html in crawl4ai_folder/cache
+                valid_file_name = str(url_model.url).replace("/", "_").replace(":", "_")
+                with open(os.path.join(self.crawl4ai_folder, "cache", valid_file_name), "w") as f:
+                    f.write(html)
+            except Exception as e:
+                html = ""
+                success = False
+                error_message = str(e)
+            finally:
+                driver.quit()

         # Extract content from HTML
         result = get_content_of_website(html, word_count_threshold)
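
Putting it together, a hypothetical call site for the new flag (the WebCrawler and UrlModel constructor signatures are assumed here, since they are not shown in this diff):

# Hypothetical usage; WebCrawler() and UrlModel(url=...) signatures are
# assumed from the rest of the repo, not shown in this diff.
crawler = WebCrawler()
result = crawler.fetch_page(
    UrlModel(url="https://example.com"),  # field name assumed
    word_count_threshold=10,
    use_cached_html=True,  # serve the ~/.crawl4ai/cache copy instead of launching Chrome
)
# CrawlResult is expected to carry the success flag and extracted content.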