feat(extraction): add RegexExtractionStrategy for pattern-based extraction

Add new RegexExtractionStrategy for fast, zero-LLM extraction of common data types:
- Built-in patterns for emails, URLs, phones, dates, and more
- Support for custom regex patterns
- LLM-assisted pattern generation utility
- Optimized HTML preprocessing with fit_html field
- Enhanced network response body capture

Breaking changes: None
This commit is contained in:
UncleCode
2025-05-02 21:15:24 +08:00
parent 94e9959fe0
commit 9b5ccac76e
13 changed files with 984 additions and 124 deletions

View File

@@ -23,7 +23,8 @@ from .extraction_strategy import (
CosineStrategy,
JsonCssExtractionStrategy,
JsonXPathExtractionStrategy,
JsonLxmlExtractionStrategy
JsonLxmlExtractionStrategy,
RegexExtractionStrategy
)
from .chunking_strategy import ChunkingStrategy, RegexChunking
from .markdown_generation_strategy import DefaultMarkdownGenerator
@@ -105,6 +106,7 @@ __all__ = [
"JsonCssExtractionStrategy",
"JsonXPathExtractionStrategy",
"JsonLxmlExtractionStrategy",
"RegexExtractionStrategy",
"ChunkingStrategy",
"RegexChunking",
"DefaultMarkdownGenerator",

View File

@@ -571,6 +571,14 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
async def handle_response_capture(response):
try:
try:
# body = await response.body()
# json_body = await response.json()
text_body = await response.text()
except Exception as e:
body = None
# json_body = None
# text_body = None
captured_requests.append({
"event_type": "response",
"url": response.url,
@@ -579,7 +587,12 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
"headers": dict(response.headers), # Convert Header dict
"from_service_worker": response.from_service_worker,
"request_timing": response.request.timing, # Detailed timing info
"timestamp": time.time()
"timestamp": time.time(),
"body" : {
# "raw": body,
# "json": json_body,
"text": text_body
}
})
except Exception as e:
if self.logger:

View File

@@ -503,6 +503,8 @@ class AsyncWebCrawler:
tables = media.pop("tables", [])
links = result.links.model_dump()
metadata = result.metadata
fit_html = preprocess_html_for_schema(html_content=html, text_threshold= 500, max_size= 300_000)
################################
# Generate Markdown #
@@ -519,7 +521,7 @@ class AsyncWebCrawler:
html_source_selector = {
"raw_html": lambda: html, # The original raw HTML
"cleaned_html": lambda: cleaned_html, # The HTML after scraping strategy
"fit_html": lambda: preprocess_html_for_schema(html_content=html), # Preprocessed raw HTML
"fit_html": lambda: fit_html, # The HTML after preprocessing for schema
}
markdown_input_html = cleaned_html # Default to cleaned_html
@@ -593,6 +595,7 @@ class AsyncWebCrawler:
content = {
"markdown": markdown_result.raw_markdown,
"html": html,
"fit_html": fit_html,
"cleaned_html": cleaned_html,
"fit_markdown": markdown_result.fit_markdown,
}.get(content_format, markdown_result.raw_markdown)
@@ -600,7 +603,7 @@ class AsyncWebCrawler:
# Use IdentityChunking for HTML input, otherwise use provided chunking strategy
chunking = (
IdentityChunking()
if content_format in ["html", "cleaned_html"]
if content_format in ["html", "cleaned_html", "fit_html"]
else config.chunking_strategy
)
sections = chunking.chunk(content)
@@ -624,6 +627,7 @@ class AsyncWebCrawler:
return CrawlResult(
url=url,
html=html,
fit_html=fit_html,
cleaned_html=cleaned_html,
markdown=markdown_result,
media=media,

View File

@@ -475,7 +475,7 @@ class BrowserProfiler:
self.logger.warning(" No profiles found. Create one first with option 1.", tag="PROFILES")
continue
# Print profile information with colorama formatting
# Print profile information
self.logger.info("\nAvailable profiles:", tag="PROFILES")
for i, profile in enumerate(profiles):
self.logger.info(f"[{i+1}] {profile['name']}", tag="PROFILES")

View File

@@ -1,9 +1,10 @@
from abc import ABC, abstractmethod
import inspect
from typing import Any, List, Dict, Optional
from typing import Any, List, Dict, Optional, Tuple, Pattern, Union
from concurrent.futures import ThreadPoolExecutor, as_completed
import json
import time
from enum import IntFlag, auto
from .prompts import PROMPT_EXTRACT_BLOCKS, PROMPT_EXTRACT_BLOCKS_WITH_INSTRUCTION, PROMPT_EXTRACT_SCHEMA_WITH_INSTRUCTION, JSON_SCHEMA_BUILDER_XPATH, PROMPT_EXTRACT_INFERRED_SCHEMA
from .config import (
@@ -1668,3 +1669,303 @@ class JsonXPathExtractionStrategy(JsonElementExtractionStrategy):
def _get_element_attribute(self, element, attribute: str):
return element.get(attribute)
"""
RegexExtractionStrategy
Fast, zero-LLM extraction of common entities via regular expressions.
"""
# Translation table for ``str.translate``: maps every ASCII control character
# (except tab, newline and carriage return) to its literal ``\xNN`` regex escape.
# ``str.translate`` looks entries up by integer code point, so the keys must be
# ordinals -- a table keyed by one-character strings never matches anything.
_CTRL = {cp: rf"\x{cp:02x}" for cp in range(32) if chr(cp) not in "\t\n\r"}
_WB_FIX = re.compile(r"\x08")  # stray back-space -> word-boundary
# Matches a lone backslash. Kept for module-level compatibility; it is not
# applied by ``_sanitize_schema`` (see the note in that function).
_NEEDS_ESCAPE = re.compile(r"(?<!\\)\\(?![\\u])")


def _sanitize_schema(schema: Dict[str, str]) -> Dict[str, str]:
    """Fix common JSON-escape goofs coming from LLMs or manual edits.

    Args:
        schema: Mapping of label -> regex pattern (already-decoded strings).

    Returns:
        A new dict with the same labels and repaired patterns.

    Raises:
        ValueError: If a repaired pattern still fails to compile.
    """
    safe: Dict[str, str] = {}
    for label, pat in schema.items():
        # 1. A JSON "\b" decodes to a back-space character; turn it back into
        #    the regex word-boundary escape, then spell any remaining control
        #    character as an explicit \xNN escape.
        pat = _WB_FIX.sub(r"\\b", pat).translate(_CTRL)
        # NOTE: backslashes are deliberately NOT doubled here. The values are
        # decoded strings, where a single backslash starts a regex escape
        # (\d, \s, the \b restored above); doubling it would make the pattern
        # match a literal backslash instead.
        # 2. Quick sanity compile.
        try:
            re.compile(pat)
        except re.error as e:
            raise ValueError(f"Regex for '{label}' wont compile after fix: {e}") from None
        safe[label] = pat
    return safe
class RegexExtractionStrategy(ExtractionStrategy):
    """
    A lean strategy that finds e-mails, phones, URLs, dates, money, etc.,
    using nothing but pre-compiled regular expressions.

    Extraction returns a list of records shaped like::

        {
            "url": "<page-url>",
            "label": "<pattern-label>",
            "value": "<matched-string>",
            "span": [start, end]
        }

    Only `generate_pattern()` touches an LLM, extraction itself is pure Python.
    """

    # -------------------------------------------------------------- #
    # Built-in patterns exposed as IntFlag so callers can bit-OR them
    # -------------------------------------------------------------- #
    class _B(IntFlag):
        EMAIL = auto()
        PHONE_INTL = auto()
        PHONE_US = auto()
        URL = auto()
        IPV4 = auto()
        IPV6 = auto()
        UUID = auto()
        CURRENCY = auto()
        PERCENTAGE = auto()
        NUMBER = auto()
        DATE_ISO = auto()
        DATE_US = auto()
        TIME_24H = auto()
        POSTAL_US = auto()
        POSTAL_UK = auto()
        HTML_COLOR_HEX = auto()
        TWITTER_HANDLE = auto()
        HASHTAG = auto()
        MAC_ADDR = auto()
        IBAN = auto()
        CREDIT_CARD = auto()
        # NOTHING takes its own bit; no entry in DEFAULT_PATTERNS maps to it
        # and it is not part of ALL, so selecting it enables no built-ins.
        NOTHING = auto()
        ALL = (
            EMAIL | PHONE_INTL | PHONE_US | URL | IPV4 | IPV6 | UUID
            | CURRENCY | PERCENTAGE | NUMBER | DATE_ISO | DATE_US | TIME_24H
            | POSTAL_US | POSTAL_UK | HTML_COLOR_HEX | TWITTER_HANDLE
            | HASHTAG | MAC_ADDR | IBAN | CREDIT_CARD
        )

    # user-friendly aliases (RegexExtractionStrategy.Email, .IPv4, ...)
    Email = _B.EMAIL
    PhoneIntl = _B.PHONE_INTL
    PhoneUS = _B.PHONE_US
    Url = _B.URL
    IPv4 = _B.IPV4
    IPv6 = _B.IPV6
    Uuid = _B.UUID
    Currency = _B.CURRENCY
    Percentage = _B.PERCENTAGE
    Number = _B.NUMBER
    DateIso = _B.DATE_ISO
    DateUS = _B.DATE_US
    Time24h = _B.TIME_24H
    PostalUS = _B.POSTAL_US
    PostalUK = _B.POSTAL_UK
    HexColor = _B.HTML_COLOR_HEX
    TwitterHandle = _B.TWITTER_HANDLE
    Hashtag = _B.HASHTAG
    MacAddr = _B.MAC_ADDR
    Iban = _B.IBAN
    CreditCard = _B.CREDIT_CARD
    All = _B.ALL
    Nothing = _B(0)  # no patterns

    # ------------------------------------------------------------------ #
    # Built-in pattern catalog
    # Each key, upper-cased, must be the name of a `_B` member (see __init__).
    # ------------------------------------------------------------------ #
    DEFAULT_PATTERNS: Dict[str, str] = {
        # Communication
        "email": r"[\w.+-]+@[\w-]+\.[\w.-]+",
        "phone_intl": r"\+?\d[\d .()-]{7,}\d",
        # The hyphen sits last in each class so it is a literal separator;
        # written as "[ -. ]" it formed a range from space to "." and also
        # accepted characters such as "#", "$" and "+".
        "phone_us": r"\(?\d{3}\)?[ .-]?\d{3}[ .-]?\d{4}",
        # Web
        "url": r"https?://[^\s\"'<>]+",
        "ipv4": r"(?:\d{1,3}\.){3}\d{1,3}",
        "ipv6": r"[A-F0-9]{1,4}(?::[A-F0-9]{1,4}){7}",
        # IDs
        "uuid": r"[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}",
        # Money / numbers
        "currency": r"(?:USD|EUR|RM|\$|€|£)\s?\d+(?:[.,]\d{2})?",
        "percentage": r"\d+(?:\.\d+)?%",
        "number": r"\b\d{1,3}(?:[,.\s]\d{3})*(?:\.\d+)?\b",
        # Dates / Times
        "date_iso": r"\d{4}-\d{2}-\d{2}",
        "date_us": r"\d{1,2}/\d{1,2}/\d{2,4}",
        "time_24h": r"\b(?:[01]?\d|2[0-3]):[0-5]\d(?:[:.][0-5]\d)?\b",
        # Misc
        "postal_us": r"\b\d{5}(?:-\d{4})?\b",
        "postal_uk": r"\b[A-Z]{1,2}\d[A-Z\d]? ?\d[A-Z]{2}\b",
        "html_color_hex": r"#[0-9A-Fa-f]{6}\b",
        "twitter_handle": r"@[\w]{1,15}",
        "hashtag": r"#[\w-]+",
        "mac_addr": r"(?:[0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2}",
        "iban": r"[A-Z]{2}\d{2}[A-Z0-9]{11,30}",
        "credit_card": r"\b(?:4\d{12}(?:\d{3})?|5[1-5]\d{14}|3[47]\d{13}|6(?:011|5\d{2})\d{12})\b",
    }

    # Flags applied to every compiled pattern (built-in and custom alike).
    _FLAGS = re.IGNORECASE | re.MULTILINE

    # Keyword arguments rejected by generate_pattern(), with the hint to show.
    _UNWANTED_PROPS = {
        "provider": "Use llm_config instead",
        "api_token": "Use llm_config instead",
    }

    # ------------------------------------------------------------------ #
    # Construction
    # ------------------------------------------------------------------ #
    def __init__(
        self,
        pattern: "_B" = _B.NOTHING,
        *,
        custom: Optional[Union[Dict[str, str], List[Tuple[str, str]]]] = None,
        input_format: str = "fit_html",
        **kwargs,
    ) -> None:
        """
        Args:
            pattern: Bit-mask of built-in patterns to enable, e.g.
                ``RegexExtractionStrategy.Email | RegexExtractionStrategy.Url``.
                The default enables none of them.
            custom: Custom patterns overriding or extending the selected
                built-ins. Dict[label, regex] or list[tuple(label, regex)].
            input_format: Forwarded to ExtractionStrategy; defaults to
                "fit_html".
            **kwargs: Forwarded to ExtractionStrategy.

        Raises:
            re.error: If any selected or custom pattern fails to compile.
        """
        super().__init__(input_format=input_format, **kwargs)

        # 1. take only the requested built-ins
        merged: Dict[str, str] = {
            key: rx
            for key, rx in self.DEFAULT_PATTERNS.items()
            if getattr(self._B, key.upper()).value & pattern
        }

        # 2. apply user overrides / additions
        #    (dict.update accepts a mapping or an iterable of (label, regex))
        if custom:
            merged.update(custom)

        self._compiled: Dict[str, Pattern] = {
            lbl: re.compile(rx, self._FLAGS) for lbl, rx in merged.items()
        }

    # ------------------------------------------------------------------ #
    # Extraction
    # ------------------------------------------------------------------ #
    def extract(self, url: str, content: str, *q, **kw) -> List[Dict[str, Any]]:
        """
        Run every compiled pattern over `content`.

        Args:
            url: Page URL, copied into each result record.
            content: Text to scan; it is searched as-is.
            *q, **kw: Accepted and ignored.

        Returns:
            One record per match, grouped by pattern in insertion order, with
            keys "url", "label", "value" and "span" ([start, end] offsets
            into `content`).
        """
        return [
            {
                "url": url,
                "label": label,
                "value": m.group(0),
                "span": [m.start(), m.end()],
            }
            for label, cre in self._compiled.items()
            for m in cre.finditer(content)
        ]

    # ------------------------------------------------------------------ #
    # Helpers
    # ------------------------------------------------------------------ #
    def _plain_text(self, content: str) -> str:
        """Return `content` unchanged for "text" input, else its visible text."""
        if self.input_format == "text":
            return content
        return BeautifulSoup(content, "lxml").get_text(" ", strip=True)

    # ------------------------------------------------------------------ #
    # LLM-assisted one-off pattern builder
    # ------------------------------------------------------------------ #
    @staticmethod
    def generate_pattern(
        label: str,
        html: str,
        *,
        query: Optional[str] = None,
        examples: Optional[List[str]] = None,
        llm_config: Optional[LLMConfig] = None,
        **kwargs,
    ) -> Dict[str, str]:
        """
        Ask an LLM for a single page-specific regex and return
        {label: pattern}, ready for RegexExtractionStrategy(custom=...).

        Args:
            label: Key the LLM is told to use for the returned pattern.
            html: Sample HTML; only the first 5000 characters are sent.
            query: Optional description of what the pattern should capture.
            examples: Optional example strings; at most 20 are sent.
            llm_config: LLM settings; `create_llm_config()` is used when None.
            **kwargs: Passed to the completion call as `extra_args`.

        Returns:
            The decoded JSON object returned by the LLM (label -> regex).

        Raises:
            AttributeError: If a deprecated keyword (`provider`, `api_token`)
                is supplied.
            ValueError: If the response is not valid JSON, is not a JSON
                object of strings, or contains a regex that does not compile.
        """
        # -- guard deprecated kwargs
        for k, hint in RegexExtractionStrategy._UNWANTED_PROPS.items():
            if k in kwargs:
                raise AttributeError(f"{k} is deprecated, {hint}")

        # -- default LLM config
        if llm_config is None:
            llm_config = create_llm_config()

        # -- system prompt - hardened
        system_msg = (
            "You are an expert Python-regex engineer.\n"
            f"Return **one** JSON object whose single key is exactly \"{label}\", "
            "and whose value is a raw-string regex pattern that works with "
            "the standard `re` module in Python.\n\n"
            "Strict rules (obey every bullet):\n"
            "• If a *user query* is supplied, treat it as the precise semantic target and optimise the "
            " pattern to capture ONLY text that answers that query. If the query conflicts with the "
            " sample HTML, the HTML wins.\n"
            "• Tailor the pattern to the *sample HTML* reproduce its exact punctuation, spacing, "
            " symbols, capitalisation, etc. Do **NOT** invent a generic form.\n"
            "• Keep it minimal and fast: avoid unnecessary capturing, prefer non-capturing `(?: … )`, "
            " and guard against catastrophic backtracking.\n"
            "• Anchor with `^`, `$`, or `\\b` only when it genuinely improves precision.\n"
            "• Use inline flags like `(?i)` when needed; no verbose flag comments.\n"
            "• Output must be valid JSON no markdown, code fences, comments, or extra keys.\n"
            "• The regex value must be a Python string literal: **double every backslash** "
            "(e.g. `\\\\b`, `\\\\d`, `\\\\\\\\`).\n\n"
            "Example valid output:\n"
            f"{{\"{label}\": \"(?:RM|rm)\\\\s?\\\\d{{1,3}}(?:,\\\\d{{3}})*(?:\\\\.\\\\d{{2}})?\"}}"
        )

        # -- user message: cropped HTML + optional hints
        user_parts = ["```html", html[:5000], "```"]  # protect token budget
        if query:
            user_parts.append(f"\n\n## Query\n{query.strip()}")
        if examples:
            user_parts.append("## Examples\n" + "\n".join(examples[:20]))
        user_msg = "\n\n".join(user_parts)

        # -- LLM call (with retry/backoff)
        resp = perform_completion_with_backoff(
            provider=llm_config.provider,
            prompt_with_variables="\n\n".join([system_msg, user_msg]),
            json_response=True,
            api_token=llm_config.api_token,
            base_url=llm_config.base_url,
            extra_args=kwargs,
        )

        # -- clean & load JSON (fix common escape mistakes *before* json.loads)
        raw = resp.choices[0].message.content
        raw = raw.replace("\x08", "\\b")  # stray back-space -> \b
        raw = re.sub(r'(?<!\\)\\(?![\\u"])', r"\\\\", raw)  # lone \ -> \\

        try:
            pattern_dict = json.loads(raw)
        except ValueError as exc:  # json.JSONDecodeError subclasses ValueError
            raise ValueError(f"LLM did not return valid JSON: {raw}") from exc

        # Valid JSON is not necessarily an object; reject lists, strings, etc.
        # here instead of failing later on `.items()`.
        if not isinstance(pattern_dict, dict):
            raise ValueError(f"LLM did not return a JSON object: {raw}")

        # quick sanity-compile
        for lbl, pat in pattern_dict.items():
            if not isinstance(pat, str):
                raise ValueError(f"Invalid regex for '{lbl}': expected a string, got {pat!r}")
            try:
                re.compile(pat)
            except re.error as e:
                raise ValueError(f"Invalid regex for '{lbl}': {e}") from None

        return pattern_dict

View File

@@ -129,6 +129,7 @@ class MarkdownGenerationResult(BaseModel):
class CrawlResult(BaseModel):
url: str
html: str
fit_html: Optional[str] = None
success: bool
cleaned_html: Optional[str] = None
media: Dict[str, List[Dict]] = {}

View File

@@ -2735,33 +2735,67 @@ def preprocess_html_for_schema(html_content, text_threshold=100, attr_value_thre
# Also truncate tail text if present
if element.tail and len(element.tail.strip()) > text_threshold:
element.tail = element.tail.strip()[:text_threshold] + '...'
# 4. Find repeated patterns and keep only a few examples
# This is a simplistic approach - more sophisticated pattern detection could be implemented
pattern_elements = {}
for element in tree.xpath('//*[contains(@class, "")]'):
parent = element.getparent()
# 4. Detect duplicates and drop them in a single pass
seen: dict[tuple, None] = {}
for el in list(tree.xpath('//*[@class]')): # snapshot once, XPath is fast
parent = el.getparent()
if parent is None:
continue
# Create a signature based on tag and classes
classes = element.get('class', '')
if not classes:
cls = el.get('class')
if not cls:
continue
signature = f"{element.tag}.{classes}"
if signature in pattern_elements:
pattern_elements[signature].append(element)
# ── build signature ───────────────────────────────────────────
h = xxhash.xxh64() # stream, no big join()
for txt in el.itertext():
h.update(txt)
sig = (el.tag, cls, h.intdigest()) # tuple cheaper & hashable
# ── first seen? keep else drop ─────────────
if sig in seen and parent is not None:
parent.remove(el) # duplicate
else:
pattern_elements[signature] = [element]
seen[sig] = None
# Keep only 3 examples of each repeating pattern
for signature, elements in pattern_elements.items():
if len(elements) > 3:
# Keep the first 2 and last elements
for element in elements[2:-1]:
if element.getparent() is not None:
element.getparent().remove(element)
# # 4. Find repeated patterns and keep only a few examples
# # This is a simplistic approach - more sophisticated pattern detection could be implemented
# pattern_elements = {}
# for element in tree.xpath('//*[contains(@class, "")]'):
# parent = element.getparent()
# if parent is None:
# continue
# # Create a signature based on tag and classes
# classes = element.get('class', '')
# if not classes:
# continue
# innert_text = ''.join(element.xpath('.//text()'))
# innert_text_hash = xxhash.xxh64(innert_text.encode()).hexdigest()
# signature = f"{element.tag}.{classes}.{innert_text_hash}"
# if signature in pattern_elements:
# pattern_elements[signature].append(element)
# else:
# pattern_elements[signature] = [element]
# # Keep only first examples of each repeating pattern
# for signature, elements in pattern_elements.items():
# if len(elements) > 1:
# # Keep the first element and remove the rest
# for element in elements[1:]:
# if element.getparent() is not None:
# element.getparent().remove(element)
# # Keep only 3 examples of each repeating pattern
# for signature, elements in pattern_elements.items():
# if len(elements) > 3:
# # Keep the first 2 and last elements
# for element in elements[2:-1]:
# if element.getparent() is not None:
# element.getparent().remove(element)
# 5. Convert back to string
result = etree.tostring(tree, encoding='unicode', method='html')