Commit Message:

- Added examples for Amazon product data extraction methods
- Updated configuration options and enhanced documentation
- Minor refactoring for improved performance and readability
- Cleaned up version control settings
UncleCode
2024-12-29 20:05:18 +08:00
parent f2d9912697
commit fb33a24891
27 changed files with 4371 additions and 1408 deletions


@@ -25,64 +25,91 @@ from functools import wraps
class InvalidCSSSelectorError(Exception):
pass
def create_box_message(message: str, type: str = "info", width: int = 120, add_newlines: bool = True, double_line: bool = False) -> str:
"""
Create a styled message box with colored borders and formatted text.
How it works:
1. Determines box style and colors based on the message type (e.g., info, warning).
2. Wraps text to fit within the specified width.
3. Constructs a box using characters (single or double lines) with appropriate formatting.
4. Adds optional newlines before and after the box.
Args:
message (str): The message to display inside the box.
type (str): Type of the message (e.g., "info", "warning", "error", "success"). Defaults to "info".
width (int): Width of the box. Defaults to 120.
add_newlines (bool): Whether to add newlines before and after the box. Defaults to True.
double_line (bool): Whether to use double lines for the box border. Defaults to False.
Returns:
str: A formatted string containing the styled message box.
"""
init()
# Define border and text colors for different types
styles = {
"warning": (Fore.YELLOW, Fore.LIGHTYELLOW_EX, ""),
"info": (Fore.BLUE, Fore.LIGHTBLUE_EX, ""),
"success": (Fore.GREEN, Fore.LIGHTGREEN_EX, ""),
"error": (Fore.RED, Fore.LIGHTRED_EX, "×"),
}
border_color, text_color, prefix = styles.get(type.lower(), styles["info"])
# Define box characters based on line style
box_chars = {
"single": ("", "", "", "", "", ""),
"double": ("", "", "", "", "", "")
}
line_style = "double" if double_line else "single"
h_line, v_line, tl, tr, bl, br = box_chars[line_style]
# Process lines with lighter text color
formatted_lines = []
raw_lines = message.split('\n')
if raw_lines:
first_line = f"{prefix} {raw_lines[0].strip()}"
wrapped_first = textwrap.fill(first_line, width=width-4)
formatted_lines.extend(wrapped_first.split('\n'))
for line in raw_lines[1:]:
if line.strip():
wrapped = textwrap.fill(f" {line.strip()}", width=width-4)
formatted_lines.extend(wrapped.split('\n'))
else:
formatted_lines.append("")
# Create the box with colored borders and lighter text
horizontal_line = h_line * (width - 1)
box = [
f"{border_color}{tl}{horizontal_line}{tr}",
*[f"{border_color}{v_line}{text_color} {line:<{width-2}}{border_color}{v_line}" for line in formatted_lines],
f"{border_color}{bl}{horizontal_line}{br}{Style.RESET_ALL}"
]
result = "\n".join(box)
if add_newlines:
result = f"\n{result}\n"
return result
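A brief usage sketch for create_box_message, using the message types defined above (the example strings are illustrative):

print(create_box_message("Crawl finished: 42 pages fetched", type="success"))
print(create_box_message("Rate limit reached\nRetrying shortly", type="warning", double_line=True))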
def calculate_semaphore_count():
"""
Calculate the optimal semaphore count based on system resources.
How it works:
1. Determines the number of CPU cores and total system memory.
2. Sets a base count as half of the available CPU cores.
3. Limits the count based on memory, assuming 2GB per semaphore instance.
4. Returns the minimum value between CPU and memory-based limits.
Returns:
int: The calculated semaphore count.
"""
cpu_count = os.cpu_count()
memory_gb = get_system_memory() / (1024 ** 3) # Convert to GB
base_count = max(1, cpu_count // 2)
@@ -90,6 +117,21 @@ def calculate_semaphore_count():
return min(base_count, memory_based_cap)
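A worked example of the heuristic; the memory-cap line is elided by the hunk above, so the 2 GB-per-instance divisor is taken from the docstring:

# Illustrative values: 8 CPU cores, 32 GB RAM (assumed for the example)
# base_count       = max(1, 8 // 2) = 4
# memory_based_cap = 32 // 2        = 16   (assuming 2 GB per instance)
# result           = min(4, 16)     = 4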
def get_system_memory():
"""
Get the total system memory in bytes.
How it works:
1. Detects the operating system.
2. Reads memory information from system-specific commands or files.
3. Converts the memory to bytes for uniformity.
Returns:
int: The total system memory in bytes.
Raises:
OSError: If the operating system is unsupported.
"""
system = platform.system()
if system == "Linux":
with open('/proc/meminfo', 'r') as mem:
@@ -124,6 +166,18 @@ def get_system_memory():
raise OSError("Unsupported operating system")
def get_home_folder():
"""
Get or create the home folder for Crawl4AI configuration and cache.
How it works:
1. Uses environment variables or defaults to the user's home directory.
2. Creates `.crawl4ai` and its subdirectories (`cache`, `models`) if they don't exist.
3. Returns the path to the home folder.
Returns:
str: The path to the Crawl4AI home folder.
"""
home_folder = os.path.join(os.getenv("CRAWL4_AI_BASE_DIRECTORY", Path.home()), ".crawl4ai")
os.makedirs(home_folder, exist_ok=True)
os.makedirs(f"{home_folder}/cache", exist_ok=True)
@@ -194,6 +248,20 @@ def split_and_parse_json_objects(json_string):
return parsed_objects, unparsed_segments
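Only the return line of split_and_parse_json_objects is visible in this hunk; a hedged usage sketch based on its name and return value:

# Hedged sketch: the helper appears to split a string of concatenated JSON
# objects, returning parsed objects plus any segments that failed to parse.
parsed, unparsed = split_and_parse_json_objects('{"a": 1}{"b": 2}trailing')
# parsed   -> [{'a': 1}, {'b': 2}]  (expected)
# unparsed -> ['trailing']          (expected)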
def sanitize_html(html):
"""
Sanitize an HTML string by escaping quotes.
How it works:
1. Replaces all unwanted and special characters with an empty string.
2. Escapes double and single quotes for safe usage.
Args:
html (str): The HTML string to sanitize.
Returns:
str: The sanitized HTML string.
"""
# Replace all unwanted and special characters with an empty string
sanitized_html = html
# sanitized_html = re.sub(r'[^\w\s.,;:!?=\[\]{}()<>\/\\\-"]', '', html)
@@ -248,6 +316,23 @@ def escape_json_string(s):
return s
def replace_inline_tags(soup, tags, only_text=False):
"""
Replace inline HTML tags with Markdown-style equivalents.
How it works:
1. Maps specific tags (e.g., <b>, <i>) to Markdown syntax.
2. Finds and replaces all occurrences of these tags in the provided BeautifulSoup object.
3. Optionally replaces tags with their text content only.
Args:
soup (BeautifulSoup): Parsed HTML content.
tags (List[str]): List of tags to replace.
only_text (bool): Whether to replace tags with plain text. Defaults to False.
Returns:
BeautifulSoup: Updated BeautifulSoup object with replaced tags.
"""
tag_replacements = {
'b': lambda tag: f"**{tag.text}**",
'i': lambda tag: f"*{tag.text}*",
@@ -292,6 +377,26 @@ def replace_inline_tags(soup, tags, only_text=False):
# return soup
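A usage sketch for replace_inline_tags; the commented-out return above suggests the soup object is mutated in place:

soup = BeautifulSoup("<p>a <b>bold</b> and <i>italic</i> word</p>", "html.parser")
replace_inline_tags(soup, ["b", "i"])
print(soup.get_text())  # expected: "a **bold** and *italic* word"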
def get_content_of_website(url, html, word_count_threshold = MIN_WORD_THRESHOLD, css_selector = None, **kwargs):
"""
Extract structured content, media, and links from website HTML.
How it works:
1. Parses the HTML content using BeautifulSoup.
2. Extracts internal/external links and media (images, videos, audios).
3. Cleans the content by removing unwanted tags and attributes.
4. Converts cleaned HTML to Markdown.
5. Collects metadata and returns the extracted information.
Args:
url (str): The website URL.
html (str): The HTML content of the website.
word_count_threshold (int): Minimum word count for content inclusion. Defaults to MIN_WORD_THRESHOLD.
css_selector (Optional[str]): CSS selector to extract specific content. Defaults to None.
Returns:
Dict[str, Any]: Extracted content including Markdown, cleaned HTML, media, links, and metadata.
"""
try:
if not html:
return None
@@ -762,6 +867,27 @@ def get_content_of_website_optimized(url: str, html: str, word_count_threshold:
}
def extract_metadata(html, soup=None):
"""
Extract optimized content, media, and links from website HTML.
How it works:
1. Similar to `get_content_of_website`, but optimized for performance.
2. Filters and scores images for usefulness.
3. Extracts contextual descriptions for media files.
4. Handles excluded tags and CSS selectors.
5. Cleans HTML and converts it to Markdown.
Args:
url (str): The website URL.
html (str): The HTML content of the website.
word_count_threshold (int): Minimum word count for content inclusion. Defaults to MIN_WORD_THRESHOLD.
css_selector (Optional[str]): CSS selector to extract specific content. Defaults to None.
**kwargs: Additional options for customization.
Returns:
Dict[str, Any]: Extracted content including Markdown, cleaned HTML, media, links, and metadata.
"""
metadata = {}
if not html and not soup:
@@ -809,10 +935,35 @@ def extract_metadata(html, soup=None):
return metadata
def extract_xml_tags(string):
"""
Extracts XML tags from a string.
Args:
string (str): The input string containing XML tags.
Returns:
List[str]: A list of XML tags extracted from the input string.
"""
tags = re.findall(r'<(\w+)>', string)
return list(set(tags))
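A usage example for extract_xml_tags; closing tags do not match the \w+ pattern, and duplicates collapse through the set:

extract_xml_tags("<name>Widget</name><price>9.99</price>")
# -> ['name', 'price']  (order may vary because of the set)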
def extract_xml_data(tags, string):
"""
Extract data for specified XML tags from a string.
How it works:
1. Searches the string for each tag using regex.
2. Extracts the content within the tags.
3. Returns a dictionary of tag-content pairs.
Args:
tags (List[str]): The list of XML tags to extract.
string (str): The input string containing XML data.
Returns:
Dict[str, str]: A dictionary with tag names as keys and extracted content as values.
"""
data = {}
for tag in tags:
@@ -833,6 +984,26 @@ def perform_completion_with_backoff(
base_url=None,
**kwargs
):
"""
Perform an API completion request with exponential backoff.
How it works:
1. Sends a completion request to the API.
2. Retries on rate-limit errors with exponential delays.
3. Returns the API response or an error after all retries.
Args:
provider (str): The name of the API provider.
prompt_with_variables (str): The input prompt for the completion request.
api_token (str): The API token for authentication.
json_response (bool): Whether to request a JSON response. Defaults to False.
base_url (Optional[str]): The base URL for the API. Defaults to None.
**kwargs: Additional arguments for the API request.
Returns:
dict: The API response or an error message after all retries.
"""
from litellm import completion
from litellm.exceptions import RateLimitError
max_attempts = 3
@@ -878,6 +1049,25 @@ def perform_completion_with_backoff(
}]
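The retry loop itself is elided by the hunk above; a generic sketch of the exponential-backoff pattern the docstring describes (the delay schedule and helper name are assumptions, not this module's exact implementation):

import time
from litellm.exceptions import RateLimitError

def call_with_backoff(make_request, max_attempts=3, base_delay=2):
    # Retry on rate limits with exponentially growing sleeps: 2s, 4s, 8s, ...
    for attempt in range(max_attempts):
        try:
            return make_request()
        except RateLimitError:
            if attempt == max_attempts - 1:
                raise
            time.sleep(base_delay ** (attempt + 1))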
def extract_blocks(url, html, provider = DEFAULT_PROVIDER, api_token = None, base_url = None):
"""
Extract content blocks from website HTML using an AI provider.
How it works:
1. Prepares a prompt by sanitizing and escaping HTML.
2. Sends the prompt to an AI provider with optional retries.
3. Parses the response to extract structured blocks or errors.
Args:
url (str): The website URL.
html (str): The HTML content of the website.
provider (str): The AI provider for content extraction. Defaults to DEFAULT_PROVIDER.
api_token (Optional[str]): The API token for authentication. Defaults to None.
base_url (Optional[str]): The base URL for the API. Defaults to None.
Returns:
List[dict]: A list of extracted content blocks.
"""
# api_token = os.getenv('GROQ_API_KEY', None) if not api_token else api_token
api_token = PROVIDER_MODELS.get(provider, None) if not api_token else api_token
@@ -914,6 +1104,23 @@ def extract_blocks(url, html, provider = DEFAULT_PROVIDER, api_token = None, bas
return blocks
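A usage sketch for extract_blocks, using the defaults visible in the signature (assumes an API token is configured for the default provider):

html = "<html><body><h1>Title</h1><p>Some article text.</p></body></html>"
blocks = extract_blocks("https://example.com", html)
for block in blocks:
    print(block)  # each block is a dict describing one extracted section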
def extract_blocks_batch(batch_data, provider = "groq/llama3-70b-8192", api_token = None):
"""
Extract content blocks from a batch of website HTMLs.
How it works:
1. Prepares prompts for each URL and HTML pair.
2. Sends the prompts to the AI provider in a batch request.
3. Parses the responses to extract structured blocks or errors.
Args:
batch_data (List[Tuple[str, str]]): A list of (URL, HTML) pairs.
provider (str): The AI provider for content extraction. Defaults to "groq/llama3-70b-8192".
api_token (Optional[str]): The API token for authentication. Defaults to None.
Returns:
List[dict]: A list of extracted content blocks from all batch items.
"""
api_token = os.getenv('GROQ_API_KEY', None) if not api_token else api_token
from litellm import batch_completion
messages = []
@@ -986,6 +1193,25 @@ def merge_chunks_based_on_token_threshold(chunks, token_threshold):
return merged_sections
def process_sections(url: str, sections: list, provider: str, api_token: str, base_url=None) -> list:
"""
Process sections of HTML content sequentially or in parallel.
How it works:
1. Sequentially processes sections with delays for "groq/" providers.
2. Uses ThreadPoolExecutor for parallel processing with other providers.
3. Extracts content blocks for each section.
Args:
url (str): The website URL.
sections (List[str]): The list of HTML sections to process.
provider (str): The AI provider for content extraction.
api_token (str): The API token for authentication.
base_url (Optional[str]): The base URL for the API. Defaults to None.
Returns:
List[dict]: The list of extracted content blocks from all sections.
"""
extracted_content = []
if provider.startswith("groq/"):
# Sequential processing with a delay
@@ -1002,6 +1228,24 @@ def process_sections(url: str, sections: list, provider: str, api_token: str, ba
return extracted_content
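A minimal sketch of the two dispatch strategies the docstring describes; the delay length and worker count are assumptions, since the parallel branch is elided by the hunk:

import time
from concurrent.futures import ThreadPoolExecutor

def dispatch_sections(sections, extract_one, provider, delay=0.5, workers=4):
    if provider.startswith("groq/"):
        # Sequential with a delay, to stay under the provider's rate limits.
        results = []
        for section in sections:
            results.append(extract_one(section))
            time.sleep(delay)
        return results
    # Other providers: fan out across a thread pool.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(extract_one, sections))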
def wrap_text(draw, text, font, max_width):
"""
Wrap text to fit within a specified width for rendering.
How it works:
1. Splits the text into words.
2. Constructs lines that fit within the maximum width using the provided font.
3. Returns the wrapped text as a single string.
Args:
draw (ImageDraw.Draw): The drawing context for measuring text size.
text (str): The text to wrap.
font (ImageFont.FreeTypeFont): The font to use for measuring text size.
max_width (int): The maximum width for each line.
Returns:
str: The wrapped text.
"""
# Wrap the text to fit within the specified width
lines = []
words = text.split()
@@ -1013,6 +1257,21 @@ def wrap_text(draw, text, font, max_width):
return '\n'.join(lines)
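A usage sketch for wrap_text, using Pillow's default bitmap font for measurement:

from PIL import Image, ImageDraw, ImageFont

img = Image.new("RGB", (400, 200))
draw = ImageDraw.Draw(img)
font = ImageFont.load_default()
print(wrap_text(draw, "A long caption that should wrap across lines", font, max_width=120))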
def format_html(html_string):
"""
Prettify an HTML string using BeautifulSoup.
How it works:
1. Parses the HTML string with BeautifulSoup.
2. Formats the HTML with proper indentation.
3. Returns the prettified HTML string.
Args:
html_string (str): The HTML string to format.
Returns:
str: The prettified HTML string.
"""
soup = BeautifulSoup(html_string, 'html.parser')
return soup.prettify()
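A usage example for format_html:

print(format_html("<div><p>hello</p></div>"))
# <div>
#  <p>
#   hello
#  </p>
# </div>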
@@ -1110,7 +1369,20 @@ def normalize_url_tmp(href, base_url):
return href.strip()
def get_base_domain(url: str) -> str:
"""Extract base domain from URL, handling various edge cases."""
"""
Extract the base domain from a given URL, handling common edge cases.
How it works:
1. Parses the URL to extract the domain.
2. Removes the port number and 'www' prefix.
3. Handles special domains (e.g., 'co.uk') to extract the correct base.
Args:
url (str): The URL to extract the base domain from.
Returns:
str: The extracted base domain or an empty string if parsing fails.
"""
try:
# Get domain from URL
domain = urlparse(url).netloc.lower()
@@ -1136,7 +1408,20 @@ def get_base_domain(url: str) -> str:
return ""
def is_external_url(url: str, base_domain: str) -> bool:
"""Check if URL is external to base domain."""
"""
Extract the base domain from a given URL, handling common edge cases.
How it works:
1. Parses the URL to extract the domain.
2. Removes the port number and 'www' prefix.
3. Handles special domains (e.g., 'co.uk') to extract the correct base.
Args:
url (str): The URL to extract the base domain from.
Returns:
str: The extracted base domain or an empty string if parsing fails.
"""
special = {'mailto:', 'tel:', 'ftp:', 'file:', 'data:', 'javascript:'}
if any(url.lower().startswith(p) for p in special):
return True
@@ -1155,8 +1440,22 @@ def is_external_url(url: str, base_domain: str) -> bool:
except Exception:
return False
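Expected behavior, based on the docstring and the special-scheme check visible above (outputs are expectations, not verified against the elided body):

is_external_url("mailto:team@example.com", "example.com")        # -> True  (special scheme)
is_external_url("https://other.org/page", "example.com")         # -> True
is_external_url("https://blog.example.com/post", "example.com")  # -> False (same base domain)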
def clean_tokens(tokens: list[str]) -> list[str]:
"""
Clean a list of tokens by removing noise, stop words, and short tokens.
How it works:
1. Defines a set of noise words and stop words.
2. Filters tokens based on length and exclusion criteria.
3. Excludes tokens starting with certain symbols (e.g., "↑", "▲").
Args:
tokens (list[str]): The list of tokens to clean.
Returns:
list[str]: The cleaned list of tokens.
"""
# Set of tokens to remove
noise = {'ccp', 'up', '↑', '▲', '⬆️', 'a', 'an', 'at', 'by', 'in', 'of', 'on', 'to', 'the'}
@@ -1212,6 +1511,21 @@ def clean_tokens(tokens: list[str]) -> list[str]:
and not token.startswith('↑')]
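A usage example for clean_tokens; the expected output assumes the noise and stop-word filters above, since the full filter expression is partly elided by the hunk:

clean_tokens(['the', 'crawler', 'up', '↑', 'extracts', 'at', 'links'])
# -> ['crawler', 'extracts', 'links']  (expected)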
def profile_and_time(func):
"""
Decorator to profile a function's execution time and performance.
How it works:
1. Records the start time before executing the function.
2. Profiles the function's execution using `cProfile`.
3. Prints the elapsed time and profiling statistics.
Args:
func (Callable): The function to decorate.
Returns:
Callable: The decorated function with profiling and timing enabled.
"""
@wraps(func)
def wrapper(self, *args, **kwargs):
# Start timer