357 lines
13 KiB
Python
357 lines
13 KiB
Python
import os, time
|
|
os.environ["TOKENIZERS_PARALLELISM"] = "false"
|
|
from pathlib import Path
|
|
|
|
from .models import UrlModel, CrawlResult
|
|
from .database import init_db, get_cached_url, cache_url, DB_PATH, flush_db
|
|
from .utils import *
|
|
from .chunking_strategy import *
|
|
from .extraction_strategy import *
|
|
from .crawler_strategy import *
|
|
from typing import List
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from .config import *
|
|
|
|
|
|
class WebCrawler:
    """Fetch web pages through a crawler strategy and cache the processed results.

    A crawl produces a ``CrawlResult`` holding the raw HTML, cleaned HTML,
    markdown, media, links, metadata, an optional screenshot and the output
    of an extraction strategy.  Results are stored with ``cache_url`` and
    looked up with ``get_cached_url`` unless the cache is bypassed.

    Cached rows are indexed positionally.  The layout used throughout this
    class is: 0 url, 1 html, 2 cleaned_html, 3 markdown, 4 extracted_content,
    5 success, 6 media (JSON), 7 links (JSON), 8 metadata (JSON), 9 screenshot.
    """

    def __init__(
        self,
        crawler_strategy: CrawlerStrategy = None,
        always_by_pass_cache: bool = False,
        verbose: bool = False,
    ):
        """Set up the crawler, its working folder and the cache database.

        Args:
            crawler_strategy: Strategy used to fetch pages.  When omitted, a
                ``LocalSeleniumCrawlerStrategy(verbose=verbose)`` is created.
            always_by_pass_cache: When true, ``run`` never reads the cache,
                regardless of its ``bypass_cache`` argument.
            verbose: Forwarded to the default crawler strategy only.
        """
        self.crawler_strategy = crawler_strategy or LocalSeleniumCrawlerStrategy(verbose=verbose)
        self.always_by_pass_cache = always_by_pass_cache

        # Create the .crawl4ai folder in the user's home directory if it doesn't exist
        self.crawl4ai_folder = os.path.join(Path.home(), ".crawl4ai")
        os.makedirs(self.crawl4ai_folder, exist_ok=True)
        os.makedirs(os.path.join(self.crawl4ai_folder, "cache"), exist_ok=True)

        init_db()

        # Flipped to True once warmup() has completed a crawl.
        self.ready = False

    def warmup(self):
        """Run one quiet crawl against a fixed URL and mark the crawler ready."""
        print("[LOG] 🌤️ Warming up the WebCrawler")
        self.run(
            url='https://crawl4ai.uccode.io/',
            word_count_threshold=5,
            extraction_strategy=NoExtractionStrategy(),
            bypass_cache=False,
            verbose=False,
        )
        self.ready = True
        print("[LOG] 🌞 WebCrawler is ready to crawl")

    def fetch_page(
        self,
        url_model: UrlModel,
        provider: str = DEFAULT_PROVIDER,
        api_token: str = None,
        extract_blocks_flag: bool = True,
        word_count_threshold=MIN_WORD_THRESHOLD,
        css_selector: str = None,
        screenshot: bool = False,
        use_cached_html: bool = False,
        extraction_strategy: ExtractionStrategy = None,
        chunking_strategy: ChunkingStrategy = RegexChunking(),
        **kwargs,
    ) -> CrawlResult:
        """Crawl the URL described by ``url_model`` by delegating to ``run``.

        ``url_model.url`` is the target and ``url_model.forced`` is used as
        ``bypass_cache``.  ``provider``, ``api_token``, ``extract_blocks_flag``
        and ``use_cached_html`` are accepted but not forwarded to ``run``.

        Returns:
            The ``CrawlResult`` produced by ``run``.
        """
        return self.run(
            url_model.url,
            word_count_threshold,
            extraction_strategy or NoExtractionStrategy(),
            chunking_strategy,
            bypass_cache=url_model.forced,
            css_selector=css_selector,
            screenshot=screenshot,
            **kwargs,
        )

    def run_old(
        self,
        url: str,
        word_count_threshold=MIN_WORD_THRESHOLD,
        extraction_strategy: ExtractionStrategy = None,
        chunking_strategy: ChunkingStrategy = RegexChunking(),
        bypass_cache: bool = False,
        css_selector: str = None,
        screenshot: bool = False,
        user_agent: str = None,
        verbose=True,
        **kwargs,
    ) -> CrawlResult:
        """Earlier single-method implementation of ``run``.

        On a cache hit the stored row is returned as-is; otherwise the page
        is crawled, processed, extracted, cached and returned.

        Args:
            url: Page to crawl.
            word_count_threshold: Raised to ``MIN_WORD_THRESHOLD`` if lower.
            extraction_strategy: Defaults to ``NoExtractionStrategy()``.
            chunking_strategy: Splits the markdown before extraction.
            bypass_cache: Skip the cache lookup for this call.
            css_selector: Forwarded to ``get_content_of_website``.
            screenshot: When true, a screenshot is taken after the crawl.
            user_agent: When given, applied to the crawler strategy first.
            verbose: Print progress messages and set on the extraction strategy.
            **kwargs: ``js`` (if truthy) is assigned to the crawler
                strategy's ``js_code`` before crawling.

        Raises:
            ValueError: On an unsupported strategy, an invalid CSS selector,
                or when no content could be extracted.
        """
        if user_agent:
            self.crawler_strategy.update_user_agent(user_agent)

        extraction_strategy = extraction_strategy or NoExtractionStrategy()
        # Validate before touching the objects so a wrong type yields the
        # intended ValueError rather than an attribute error.
        if not isinstance(extraction_strategy, ExtractionStrategy):
            raise ValueError("Unsupported extraction strategy")
        if not isinstance(chunking_strategy, ChunkingStrategy):
            raise ValueError("Unsupported chunking strategy")
        extraction_strategy.verbose = verbose

        # make sure word_count_threshold is not lesser than MIN_WORD_THRESHOLD
        if word_count_threshold < MIN_WORD_THRESHOLD:
            word_count_threshold = MIN_WORD_THRESHOLD

        # Check cache first
        if not bypass_cache and not self.always_by_pass_cache:
            cached = get_cached_url(url)
            if cached:
                return CrawlResult(
                    url=cached[0],
                    html=cached[1],
                    cleaned_html=cached[2],
                    markdown=cached[3],
                    extracted_content=cached[4],
                    success=cached[5],
                    media=json.loads(cached[6] or "{}"),
                    links=json.loads(cached[7] or "{}"),
                    metadata=json.loads(cached[8] or "{}"),
                    screenshot=cached[9],
                    error_message="",
                )

        # Crawl the page
        t = time.time()
        if kwargs.get("js", None):
            self.crawler_strategy.js_code = kwargs.get("js")
        html = self.crawler_strategy.crawl(url)
        base64_image = None
        if screenshot:
            base64_image = self.crawler_strategy.take_screenshot()
        success = True
        error_message = ""

        # Extract content from HTML
        try:
            result = get_content_of_website(url, html, word_count_threshold, css_selector=css_selector)
            metadata = extract_metadata(html)
            if result is None:
                raise ValueError(f"Failed to extract content from the website: {url}")
        except InvalidCSSSelectorError as e:
            raise ValueError(str(e)) from e

        cleaned_html = result.get("cleaned_html", "")
        markdown = result.get("markdown", "")
        media = result.get("media", [])
        links = result.get("links", [])

        if verbose:
            print(
                f"[LOG] 🚀 Crawling done for {url}, success: {success}, time taken: {time.time() - t} seconds"
            )
            print(f"[LOG] 🔥 Extracting semantic blocks for {url}, Strategy: {extraction_strategy.name}")

        t = time.time()
        # Split markdown into sections
        sections = chunking_strategy.chunk(markdown)
        extracted_content = extraction_strategy.run(url, sections)
        extracted_content = json.dumps(extracted_content)

        if verbose:
            print(
                f"[LOG] 🚀 Extraction done for {url}, time taken: {time.time() - t} seconds."
            )

        # Cache the result
        cleaned_html = beautify_html(cleaned_html)
        cache_url(
            url,
            html,
            cleaned_html,
            markdown,
            extracted_content,
            success,
            json.dumps(media),
            json.dumps(links),
            json.dumps(metadata),
            screenshot=base64_image,
        )

        return CrawlResult(
            url=url,
            html=html,
            cleaned_html=cleaned_html,
            markdown=markdown,
            media=media,
            links=links,
            metadata=metadata,
            screenshot=base64_image,
            extracted_content=extracted_content,
            success=success,
            error_message=error_message,
        )

    def fetch_pages(
        self,
        url_models: List[UrlModel],
        provider: str = DEFAULT_PROVIDER,
        api_token: str = None,
        extract_blocks_flag: bool = True,
        word_count_threshold=MIN_WORD_THRESHOLD,
        use_cached_html: bool = False,
        css_selector: str = None,
        screenshot: bool = False,
        extraction_strategy: ExtractionStrategy = None,
        chunking_strategy: ChunkingStrategy = RegexChunking(),
        **kwargs,
    ) -> List[CrawlResult]:
        """Crawl several URL models concurrently with a thread pool.

        Every URL model is passed to ``fetch_page`` with the same settings.
        Results are returned in the same order as ``url_models``.

        Returns:
            One ``CrawlResult`` per entry of ``url_models``.
        """
        extraction_strategy = extraction_strategy or NoExtractionStrategy()

        def fetch_page_wrapper(url_model):
            # Bind the shared settings by name.  Passing ``kwargs`` as extra
            # iterables to ``executor.map`` would iterate the dicts' keys and,
            # for empty kwargs, make map() yield no results at all.
            return self.fetch_page(
                url_model,
                provider=provider,
                api_token=api_token,
                extract_blocks_flag=extract_blocks_flag,
                word_count_threshold=word_count_threshold,
                css_selector=css_selector,
                screenshot=screenshot,
                use_cached_html=use_cached_html,
                extraction_strategy=extraction_strategy,
                chunking_strategy=chunking_strategy,
                **kwargs,
            )

        with ThreadPoolExecutor() as executor:
            results = list(executor.map(fetch_page_wrapper, url_models))

        return results

    def run(
        self,
        url: str,
        word_count_threshold=MIN_WORD_THRESHOLD,
        extraction_strategy: ExtractionStrategy = None,
        chunking_strategy: ChunkingStrategy = RegexChunking(),
        bypass_cache: bool = False,
        css_selector: str = None,
        screenshot: bool = False,
        user_agent: str = None,
        verbose=True,
        **kwargs,
    ) -> CrawlResult:
        """Crawl ``url`` (or reuse its cached HTML) and return the processed result.

        On a cache hit the cached HTML, extracted content and (if requested)
        screenshot are reused; the HTML is still re-processed by
        ``process_html``.  On a miss the page is fetched with the crawler
        strategy and the result is cached.

        Args:
            url: Page to crawl.
            word_count_threshold: Raised to ``MIN_WORD_THRESHOLD`` if lower.
            extraction_strategy: Defaults to ``NoExtractionStrategy()``.
            chunking_strategy: Splits the markdown before extraction.
            bypass_cache: Skip the cache lookup for this call.
            css_selector: Forwarded to ``get_content_of_website``.
            screenshot: When true, include a screenshot (cached or fresh).
            user_agent: Applied to the crawler strategy before a fresh crawl.
            verbose: Print progress messages and set on the extraction strategy.
            **kwargs: Forwarded to ``process_html``.

        Raises:
            ValueError: On an unsupported strategy, or as raised by
                ``process_html``.
        """
        extraction_strategy = extraction_strategy or NoExtractionStrategy()
        # Validate before touching the objects so a wrong type yields the
        # intended ValueError rather than an attribute error.
        if not isinstance(extraction_strategy, ExtractionStrategy):
            raise ValueError("Unsupported extraction strategy")
        if not isinstance(chunking_strategy, ChunkingStrategy):
            raise ValueError("Unsupported chunking strategy")
        extraction_strategy.verbose = verbose

        if word_count_threshold < MIN_WORD_THRESHOLD:
            word_count_threshold = MIN_WORD_THRESHOLD

        # Check cache first
        cached = None
        extracted_content = None
        if not bypass_cache and not self.always_by_pass_cache:
            cached = get_cached_url(url)

        if cached:
            html = cached[1]
            # Index 4 is extracted_content; index 2 holds cleaned_html.
            extracted_content = cached[4]
            if screenshot:
                screenshot = cached[9]
        else:
            if user_agent:
                self.crawler_strategy.update_user_agent(user_agent)
            html = self.crawler_strategy.crawl(url)
            if screenshot:
                screenshot = self.crawler_strategy.take_screenshot()

        return self.process_html(
            url,
            html,
            extracted_content,
            word_count_threshold,
            extraction_strategy,
            chunking_strategy,
            css_selector,
            screenshot,
            verbose,
            bool(cached),
            **kwargs,
        )

    def process_html(
        self,
        url: str,
        html: str,
        extracted_content: str,
        word_count_threshold: int,
        extraction_strategy: ExtractionStrategy,
        chunking_strategy: ChunkingStrategy,
        css_selector: str,
        screenshot: bool,
        verbose: bool,
        is_cached: bool,
        **kwargs,
    ) -> CrawlResult:
        """Turn raw HTML into a ``CrawlResult`` and cache it when it is fresh.

        Args:
            url: URL the HTML belongs to.
            html: Raw page HTML.
            extracted_content: Previously extracted content, or ``None`` to
                run the chunking and extraction strategies now.
            word_count_threshold: Forwarded to ``get_content_of_website``.
            extraction_strategy: Run on the chunked markdown when needed.
            chunking_strategy: Splits the markdown into sections.
            css_selector: Forwarded to ``get_content_of_website``.
            screenshot: Screenshot value supplied by ``run``; any falsy value
                is stored as ``None``.
            verbose: Print progress messages.
            is_cached: When true the result is not written to the cache again.
            **kwargs: Accepted for call compatibility; unused here.

        Raises:
            ValueError: On an invalid CSS selector, or when no content could
                be extracted.
        """
        t = time.time()
        # Extract content from HTML
        try:
            result = get_content_of_website(url, html, word_count_threshold, css_selector=css_selector)
            metadata = extract_metadata(html)
            if result is None:
                raise ValueError(f"Failed to extract content from the website: {url}")
        except InvalidCSSSelectorError as e:
            raise ValueError(str(e)) from e

        cleaned_html = result.get("cleaned_html", "")
        markdown = result.get("markdown", "")
        media = result.get("media", [])
        links = result.get("links", [])

        if verbose:
            print(f"[LOG] 🚀 Crawling done for {url}, success: True, time taken: {time.time() - t} seconds")

        if extracted_content is None:
            if verbose:
                print(f"[LOG] 🔥 Extracting semantic blocks for {url}, Strategy: {extraction_strategy.name}")

            # Restart the timer so the reported duration covers extraction only.
            t = time.time()
            sections = chunking_strategy.chunk(markdown)
            extracted_content = extraction_strategy.run(url, sections)
            extracted_content = json.dumps(extracted_content)

            if verbose:
                print(f"[LOG] 🚀 Extraction done for {url}, time taken: {time.time() - t} seconds.")

        # Normalise "no screenshot" (False / empty) to None.
        screenshot = screenshot or None

        if not is_cached:
            cache_url(
                url,
                html,
                cleaned_html,
                markdown,
                extracted_content,
                True,
                json.dumps(media),
                json.dumps(links),
                json.dumps(metadata),
                screenshot=screenshot,
            )

        return CrawlResult(
            url=url,
            html=html,
            cleaned_html=cleaned_html,
            markdown=markdown,
            media=media,
            links=links,
            metadata=metadata,
            screenshot=screenshot,
            extracted_content=extracted_content,
            success=True,
            error_message="",
        )