Change the project folder name from crawler to crawl4ai

This commit is contained in:
unclecode
2024-05-09 22:16:28 +08:00
parent 7ee8001b7d
commit 3ff1d15702
9 changed files with 6 additions and 6 deletions

1
crawl4ai/__init__.py Normal file
View File

@@ -0,0 +1 @@
from .web_crawler import WebCrawler

24
crawl4ai/config.py Normal file
View File

@@ -0,0 +1,24 @@
import os
from dotenv import load_dotenv
load_dotenv() # Load environment variables from .env file
# Default provider
DEFAULT_PROVIDER = "openai/gpt-4-turbo"
# Provider-model dictionary
PROVIDER_MODELS = {
"groq/llama3-70b-8192": os.getenv("GROQ_API_KEY", "YOUR_GROQ_TOKEN"),
"groq/llama3-8b-8192": os.getenv("GROQ_API_KEY", "YOUR_GROQ_TOKEN"),
"openai/gpt-3.5-turbo": os.getenv("OPENAI_API_KEY", "YOUR_OPENAI_TOKEN"),
"openai/gpt-4-turbo": os.getenv("OPENAI_API_KEY", "YOUR_OPENAI_TOKEN"),
"anthropic/claude-3-haiku-20240307": os.getenv("ANTHROPIC_API_KEY", "YOUR_ANTHROPIC_TOKEN"),
"anthropic/claude-3-opus-20240229": os.getenv("ANTHROPIC_API_KEY", "YOUR_ANTHROPIC_TOKEN"),
"anthropic/claude-3-sonnet-20240229": os.getenv("ANTHROPIC_API_KEY", "YOUR_ANTHROPIC_TOKEN"),
}
# Chunk token threshold
CHUNK_TOKEN_THRESHOLD = 1000
# Threshold for the minimum number of word in a HTML tag to be considered
MIN_WORD_THRESHOLD = 5

61
crawl4ai/database.py Normal file
View File

@@ -0,0 +1,61 @@
import sqlite3
from typing import Optional
def init_db(db_path: str):
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS crawled_data (
url TEXT PRIMARY KEY,
html TEXT,
cleaned_html TEXT,
markdown TEXT,
parsed_json TEXT,
success BOOLEAN
)
''')
conn.commit()
conn.close()
def get_cached_url(db_path: str, url: str) -> Optional[tuple]:
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute('SELECT url, html, cleaned_html, markdown, parsed_json, success FROM crawled_data WHERE url = ?', (url,))
result = cursor.fetchone()
conn.close()
return result
def cache_url(db_path: str, url: str, html: str, cleaned_html: str, markdown: str, parsed_json: str, success: bool):
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO crawled_data (url, html, cleaned_html, markdown, parsed_json, success)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(url) DO UPDATE SET
html = excluded.html,
cleaned_html = excluded.cleaned_html,
markdown = excluded.markdown,
parsed_json = excluded.parsed_json,
success = excluded.success
''', (str(url), html, cleaned_html, markdown, parsed_json, success))
conn.commit()
conn.close()
def get_total_count(db_path: str) -> int:
try:
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute('SELECT COUNT(*) FROM crawled_data')
result = cursor.fetchone()
conn.close()
return result[0]
except Exception as e:
return 0
# Crete function to cler the database
def clear_db(db_path: str):
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute('DELETE FROM crawled_data')
conn.commit()
conn.close()

15
crawl4ai/models.py Normal file
View File

@@ -0,0 +1,15 @@
from pydantic import BaseModel, HttpUrl
from typing import List
class UrlModel(BaseModel):
url: HttpUrl
forced: bool = False
class CrawlResult(BaseModel):
url: str
html: str
success: bool
cleaned_html: str = None
markdown: str = None
parsed_json: str = None
error_message: str = None

110
crawl4ai/prompts.py Normal file
View File

@@ -0,0 +1,110 @@
PROMPT_EXTRACT_BLOCKS = """YHere is the URL of the webpage:
<url>{URL}</url>
And here is the cleaned HTML content of that webpage:
<html>
{HTML}
</html>
Your task is to break down this HTML content into semantically relevant blocks, and for each block, generate a JSON object with the following keys:
- index: an integer representing the index of the block in the content
- tags: a list of semantic tags that are relevant to the content of the block
- content: a list of strings containing the text content of the block
- questions: a list of 3 questions that a user may ask about the content in this block
To generate the JSON objects:
1. Carefully read through the HTML content and identify logical breaks or shifts in the content that would warrant splitting it into separate blocks.
2. For each block:
a. Assign it an index based on its order in the content.
b. Analyze the content and generate a list of relevant semantic tags that describe what the block is about.
c. Extract the text content, clean it up if needed, and store it as a list of strings in the "content" field.
d. Come up with 3 questions that a user might ask about this specific block of content, based on the tags and content. The questions should be relevant and answerable by the content in the block.
3. Ensure that the order of the JSON objects matches the order of the blocks as they appear in the original HTML content.
4. Double-check that each JSON object includes all required keys (index, tags, content, questions) and that the values are in the expected format (integer, list of strings, etc.).
5. Make sure the generated JSON is complete and parsable, with no errors or omissions.
6. Make sur to escape any special characters in the HTML content, and also single or double quote to avoid JSON parsing issues.
Please provide your output within <blocks> tags, like this:
<blocks>
[{
"index": 0,
"tags": ["introduction", "overview"],
"content": ["This is the first paragraph of the article, which provides an introduction and overview of the main topic."],
"questions": [
"What is the main topic of this article?",
"What can I expect to learn from reading this article?",
"Is this article suitable for beginners or experts in the field?"
]
},
{
"index": 1,
"tags": ["history", "background"],
"content": ["This is the second paragraph, which delves into the history and background of the topic.",
"It provides context and sets the stage for the rest of the article."],
"questions": [
"What historical events led to the development of this topic?",
"How has the understanding of this topic evolved over time?",
"What are some key milestones in the history of this topic?"
]
}]
</blocks>
Remember, the output should be a complete, parsable JSON wrapped in <blocks> tags, with no omissions or errors. The JSON objects should semantically break down the content into relevant blocks, maintaining the original order."""
PROMPT_EXTRACT_BLOCKS = """YHere is the URL of the webpage:
<url>{URL}</url>
And here is the cleaned HTML content of that webpage:
<html>
{HTML}
</html>
Your task is to break down this HTML content into semantically relevant blocks, and for each block, generate a JSON object with the following keys:
- index: an integer representing the index of the block in the content
- content: a list of strings containing the text content of the block
To generate the JSON objects:
1. Carefully read through the HTML content and identify logical breaks or shifts in the content that would warrant splitting it into separate blocks.
2. For each block:
a. Assign it an index based on its order in the content.
b. Analyze the content and generate ONE semantic tag that describe what the block is about.
c. Extract the text content, EXACTLY SAME AS GIVE DATA, clean it up if needed, and store it as a list of strings in the "content" field.
3. Ensure that the order of the JSON objects matches the order of the blocks as they appear in the original HTML content.
4. Double-check that each JSON object includes all required keys (index, tag, content) and that the values are in the expected format (integer, list of strings, etc.).
5. Make sure the generated JSON is complete and parsable, with no errors or omissions.
6. Make sur to escape any special characters in the HTML content, and also single or double quote to avoid JSON parsing issues.
7. Never alter the extracted content, just copy and paste it as it is.
Please provide your output within <blocks> tags, like this:
<blocks>
[{
"index": 0,
"tags": ["introduction"],
"content": ["This is the first paragraph of the article, which provides an introduction and overview of the main topic."]
},
{
"index": 1,
"tags": ["background"],
"content": ["This is the second paragraph, which delves into the history and background of the topic.",
"It provides context and sets the stage for the rest of the article."]
}]
</blocks>
Remember, the output should be a complete, parsable JSON wrapped in <blocks> tags, with no omissions or errors. The JSON objects should semantically break down the content into relevant blocks, maintaining the original order."""

400
crawl4ai/utils.py Normal file
View File

@@ -0,0 +1,400 @@
import requests
from bs4 import BeautifulSoup, Comment, element, Tag, NavigableString
import html2text
import json
import re
import os
import litellm
from litellm import completion, batch_completion
from .prompts import PROMPT_EXTRACT_BLOCKS
from .config import *
import re
import html
def beautify_html(escaped_html):
"""
Beautifies an escaped HTML string.
Parameters:
escaped_html (str): A string containing escaped HTML.
Returns:
str: A beautifully formatted HTML string.
"""
# Unescape the HTML string
unescaped_html = html.unescape(escaped_html)
# Use BeautifulSoup to parse and prettify the HTML
soup = BeautifulSoup(unescaped_html, 'html.parser')
pretty_html = soup.prettify()
return pretty_html
def split_and_parse_json_objects(json_string):
"""
Splits a JSON string which is a list of objects and tries to parse each object.
Parameters:
json_string (str): A string representation of a list of JSON objects, e.g., '[{...}, {...}, ...]'.
Returns:
tuple: A tuple containing two lists:
- First list contains all successfully parsed JSON objects.
- Second list contains the string representations of all segments that couldn't be parsed.
"""
# Trim the leading '[' and trailing ']'
if json_string.startswith('[') and json_string.endswith(']'):
json_string = json_string[1:-1].strip()
# Split the string into segments that look like individual JSON objects
segments = []
depth = 0
start_index = 0
for i, char in enumerate(json_string):
if char == '{':
if depth == 0:
start_index = i
depth += 1
elif char == '}':
depth -= 1
if depth == 0:
segments.append(json_string[start_index:i+1])
# Try parsing each segment
parsed_objects = []
unparsed_segments = []
for segment in segments:
try:
obj = json.loads(segment)
parsed_objects.append(obj)
except json.JSONDecodeError:
unparsed_segments.append(segment)
return parsed_objects, unparsed_segments
def sanitize_html(html):
# Replace all weird and special characters with an empty string
sanitized_html = re.sub(r'[^\w\s.,;:!?=\[\]{}()<>\/\\\-"]', '', html)
# Escape all double and single quotes
sanitized_html = sanitized_html.replace('"', '\\"').replace("'", "\\'")
return sanitized_html
def escape_json_string(s):
"""
Escapes characters in a string to be JSON safe.
Parameters:
s (str): The input string to be escaped.
Returns:
str: The escaped string, safe for JSON encoding.
"""
# Replace problematic backslash first
s = s.replace('\\', '\\\\')
# Replace the double quote
s = s.replace('"', '\\"')
# Escape control characters
s = s.replace('\b', '\\b')
s = s.replace('\f', '\\f')
s = s.replace('\n', '\\n')
s = s.replace('\r', '\\r')
s = s.replace('\t', '\\t')
# Additional problematic characters
# Unicode control characters
s = re.sub(r'[\x00-\x1f\x7f-\x9f]', lambda x: '\\u{:04x}'.format(ord(x.group())), s)
return s
def get_content_of_website(html, word_count_threshold = MIN_WORD_THRESHOLD):
try:
# Parse HTML content with BeautifulSoup
soup = BeautifulSoup(html, 'html.parser')
# Get the content within the <body> tag
body = soup.body
# Remove script, style, and other tags that don't carry useful content from body
for tag in body.find_all(['script', 'style', 'link', 'meta', 'noscript']):
tag.decompose()
# Remove all attributes from remaining tags in body, except for img tags
for tag in body.find_all():
if tag.name != 'img':
tag.attrs = {}
# Replace images with their alt text or remove them if no alt text is available
for img in body.find_all('img'):
alt_text = img.get('alt')
if alt_text:
img.replace_with(soup.new_string(alt_text))
else:
img.decompose()
# Recursively remove empty elements, their parent elements, and elements with word count below threshold
def remove_empty_and_low_word_count_elements(node):
for child in node.contents:
if isinstance(child, element.Tag):
remove_empty_and_low_word_count_elements(child)
word_count = len(child.get_text(strip=True).split())
if (len(child.contents) == 0 and not child.get_text(strip=True)) or word_count < word_count_threshold:
child.decompose()
return node
body = remove_empty_and_low_word_count_elements(body)
def remove_small_text_tags(body: Tag, word_count_threshold: int = MIN_WORD_THRESHOLD):
# We'll use a list to collect all tags that don't meet the word count requirement
tags_to_remove = []
# Traverse all tags in the body
for tag in body.find_all(True): # True here means all tags
# Check if the tag contains text and if it's not just whitespace
if tag.string and tag.string.strip():
# Split the text by spaces and count the words
word_count = len(tag.string.strip().split())
# If the word count is less than the threshold, mark the tag for removal
if word_count < word_count_threshold:
tags_to_remove.append(tag)
# Remove all marked tags from the tree
for tag in tags_to_remove:
tag.decompose() # or tag.extract() to remove and get the element
return body
# Remove small text tags
body = remove_small_text_tags(body, word_count_threshold)
def is_empty_or_whitespace(tag: Tag):
if isinstance(tag, NavigableString):
return not tag.strip()
# Check if the tag itself is empty or all its children are empty/whitespace
if not tag.contents:
return True
return all(is_empty_or_whitespace(child) for child in tag.contents)
def remove_empty_tags(body: Tag):
# Continue processing until no more changes are made
changes = True
while changes:
changes = False
# Collect all tags that are empty or contain only whitespace
empty_tags = [tag for tag in body.find_all(True) if is_empty_or_whitespace(tag)]
for tag in empty_tags:
# If a tag is empty, decompose it
tag.decompose()
changes = True # Mark that a change was made
return body
# Remove empty tags
body = remove_empty_tags(body)
# Flatten nested elements with only one child of the same type
def flatten_nested_elements(node):
for child in node.contents:
if isinstance(child, element.Tag):
flatten_nested_elements(child)
if len(child.contents) == 1 and child.contents[0].name == child.name:
# print('Flattening:', child.name)
child_content = child.contents[0]
child.replace_with(child_content)
return node
body = flatten_nested_elements(body)
# Remove comments
for comment in soup.find_all(text=lambda text: isinstance(text, Comment)):
comment.extract()
# Remove consecutive empty newlines and replace multiple spaces with a single space
cleaned_html = str(body).replace('\n\n', '\n').replace(' ', ' ')
# Sanitize the cleaned HTML content
cleaned_html = sanitize_html(cleaned_html)
# sanitized_html = escape_json_string(cleaned_html)
# Convert cleaned HTML to Markdown
h = html2text.HTML2Text()
h.ignore_links = True
markdown = h.handle(cleaned_html)
# Return the Markdown content
return{
'markdown': markdown,
'cleaned_html': cleaned_html,
'success': True
}
except Exception as e:
print('Error processing HTML content:', str(e))
return None
# Example usage
# word_count_threshold = 5 # Adjust this value according to your desired threshold
# markdown_content = get_content_of_website(word_count_threshold)
# print(markdown_content)
def extract_xml_tags(string):
tags = re.findall(r'<(\w+)>', string)
return list(set(tags))
def extract_xml_data(tags, string):
data = {}
for tag in tags:
pattern = f"<{tag}>(.*?)</{tag}>"
match = re.search(pattern, string, re.DOTALL)
if match:
data[tag] = match.group(1).strip()
else:
data[tag] = ""
return data
import time
import litellm
# Function to perform the completion with exponential backoff
def perform_completion_with_backoff(provider, prompt_with_variables, api_token):
max_attempts = 3
base_delay = 2 # Base delay in seconds, you can adjust this based on your needs
for attempt in range(max_attempts):
try:
response = completion(
model=provider,
messages=[
{"role": "user", "content": prompt_with_variables}
],
temperature=0.01,
api_key=api_token
)
return response # Return the successful response
except litellm.exceptions.RateLimitError as e:
print("Rate limit error:", str(e))
# Check if we have exhausted our max attempts
if attempt < max_attempts - 1:
# Calculate the delay and wait
delay = base_delay * (2 ** attempt) # Exponential backoff formula
print(f"Waiting for {delay} seconds before retrying...")
time.sleep(delay)
else:
# Return an error response after exhausting all retries
return [{
"index": 0,
"tags": ["error"],
"content": ["Rate limit error. Please try again later."]
}]
def extract_blocks(url, html, provider = DEFAULT_PROVIDER, api_token = None):
# api_token = os.getenv('GROQ_API_KEY', None) if not api_token else api_token
api_token = PROVIDER_MODELS.get(provider, None) if not api_token else api_token
variable_values = {
"URL": url,
"HTML": escape_json_string(sanitize_html(html)),
}
prompt_with_variables = PROMPT_EXTRACT_BLOCKS
for variable in variable_values:
prompt_with_variables = prompt_with_variables.replace(
"{" + variable + "}", variable_values[variable]
)
response = perform_completion_with_backoff(provider, prompt_with_variables, api_token)
# try:
# response = completion(
# model = provider,
# messages = [
# {"role": "user", "content": prompt_with_variables}
# ],
# temperature = 0.01,
# api_key = api_token
# )
# except litellm.exceptions.RateLimitError as e:
# print("Rate limit error:", str(e))
# return [{
# "index": 0,
# "tags": ["error"],
# "content": ["Rate limit error. Please try again later."]
# }]
try:
blocks = extract_xml_data(["blocks"], response.choices[0].message.content)['blocks']
blocks = json.loads(blocks)
## Add error: False to the blocks
for block in blocks:
block['error'] = False
except Exception as e:
print("Error extracting blocks:", str(e))
parsed, unparsed = split_and_parse_json_objects(response.choices[0].message.content)
blocks = parsed
# Append all unparsed segments as onr error block and content is list of unparsed segments
if unparsed:
blocks.append({
"index": 0,
"error": True,
"tags": ["error"],
"content": unparsed
})
return blocks
def extract_blocks_batch(batch_data, provider = "groq/llama3-70b-8192", api_token = None):
api_token = os.getenv('GROQ_API_KEY', None) if not api_token else api_token
messages = []
for url, html in batch_data:
variable_values = {
"URL": url,
"HTML": html,
}
prompt_with_variables = PROMPT_EXTRACT_BLOCKS
for variable in variable_values:
prompt_with_variables = prompt_with_variables.replace(
"{" + variable + "}", variable_values[variable]
)
messages.append([{"role": "user", "content": prompt_with_variables}])
responses = batch_completion(
model = provider,
messages = messages,
temperature = 0.01
)
all_blocks = []
for response in responses:
try:
blocks = extract_xml_data(["blocks"], response.choices[0].message.content)['blocks']
blocks = json.loads(blocks)
except Exception as e:
print("Error extracting blocks:", str(e))
blocks = [{
"index": 0,
"tags": ["error"],
"content": ["Error extracting blocks from the HTML content. Choose another provider/model or try again."],
"questions": ["What went wrong during the block extraction process?"]
}]
all_blocks.append(blocks)
return sum(all_blocks, [])

133
crawl4ai/web_crawler.py Normal file
View File

@@ -0,0 +1,133 @@
import asyncio
import os, time
import json
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
import chromedriver_autoinstaller
from pydantic import parse_obj_as
from .models import UrlModel, CrawlResult
from .database import init_db, get_cached_url, cache_url
from .utils import *
from typing import List
from concurrent.futures import ThreadPoolExecutor, as_completed
from .config import *
class WebCrawler:
def __init__(self, db_path: str):
self.db_path = db_path
init_db(self.db_path)
self.options = Options()
self.options.headless = True
self.options.add_argument("--no-sandbox")
self.options.add_argument("--disable-dev-shm-usage")
# make it headless
self.options.add_argument("--headless")
# Automatically install or update chromedriver
chromedriver_autoinstaller.install()
def fetch_page(self, url_model: UrlModel, provider: str = DEFAULT_PROVIDER, api_token: str = None, extract_blocks_flag: bool = True, word_count_threshold = MIN_WORD_THRESHOLD) -> CrawlResult:
# make sure word_count_threshold is not lesser than MIN_WORD_THRESHOLD
if word_count_threshold < MIN_WORD_THRESHOLD:
word_count_threshold = MIN_WORD_THRESHOLD
# Check cache first
cached = get_cached_url(self.db_path, str(url_model.url))
if cached and not url_model.forced:
return CrawlResult(**{
"url": cached[0],
"html": cached[1],
"cleaned_html": cached[2],
"markdown": cached[3],
"parsed_json": cached[4],
"success": cached[5],
"error_message": ""
})
# Initialize WebDriver for crawling
service = Service(chromedriver_autoinstaller.install())
driver = webdriver.Chrome(service=service, options=self.options)
try:
driver.get(str(url_model.url))
WebDriverWait(driver, 10).until(
EC.presence_of_all_elements_located((By.TAG_NAME, "html"))
)
html = driver.page_source
success = True
error_message = ""
except Exception as e:
html = ""
success = False
error_message = str(e)
finally:
driver.quit()
# Extract content from HTML
result = get_content_of_website(html, word_count_threshold)
cleaned_html = result.get('cleaned_html', html)
markdown = result.get('markdown', "")
print("Crawling is done 🚀")
parsed_json = []
if extract_blocks_flag:
# Split markdown into sections
paragraphs = markdown.split('\n\n')
sections = []
chunks = []
total_token_so_far = 0
for paragraph in paragraphs:
if total_token_so_far < CHUNK_TOKEN_THRESHOLD:
chunk = paragraph.split(' ')
total_token_so_far += len(chunk) * 1.3
chunks.append(paragraph)
else:
sections.append('\n\n'.join(chunks))
chunks = [paragraph]
total_token_so_far = len(paragraph.split(' ')) * 1.3
if chunks:
sections.append('\n\n'.join(chunks))
# Process sections to extract blocks
parsed_json = []
if provider.startswith("groq/"):
# Sequential processing with a delay
for section in sections:
parsed_json.extend(extract_blocks(str(url_model.url), section, provider, api_token))
time.sleep(0.5) # 500 ms delay between each processing
else:
# Parallel processing using ThreadPoolExecutor
with ThreadPoolExecutor() as executor:
futures = [executor.submit(extract_blocks, str(url_model.url), section, provider, api_token) for section in sections]
for future in as_completed(futures):
parsed_json.extend(future.result())
parsed_json = json.dumps(parsed_json)
# Cache the result
cleaned_html = beautify_html(cleaned_html)
cache_url(self.db_path, str(url_model.url), html, cleaned_html, markdown, parsed_json, success)
return CrawlResult(
url=str(url_model.url),
html=html,
cleaned_html=cleaned_html,
markdown=markdown,
parsed_json=parsed_json,
success=success,
error_message=error_message
)
def fetch_pages(self, url_models: List[UrlModel], provider: str = DEFAULT_PROVIDER, api_token: str = None) -> List[CrawlResult]:
with ThreadPoolExecutor() as executor:
results = list(executor.map(self.fetch_page, url_models, [provider] * len(url_models), [api_token] * len(url_models)))
return results