vital: Right now, only the raw HTML is retrieved from the database; therefore, the CSS selector and other filters will be executed every time.
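In practice, a cache hit now only skips the browser fetch and the extraction strategy; get_content_of_website(), which applies the CSS selector, cleaning, and markdown conversion, still runs on every request. A minimal sketch of that cache-hit cost (illustrative only, mirroring the new run() in the diff below):

# Sketch of the cache-hit cost noted above; not part of this diff.
cached = get_cached_url(url)        # row holding raw HTML and extracted content
if cached:
    html = cached[1]                # raw HTML straight from the database
    extracted_content = cached[2]   # reused, so the extraction step is skipped
# cleaned_html, markdown, media, and links are not read back, so process_html()
# must call get_content_of_website(url, html, ..., css_selector=css_selector)
# again, re-running the CSS selector and filters even for cached pages.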

unclecode
2024-06-08 18:37:40 +08:00
parent b3a0edaa6d
commit f7e0cee1b0
2 changed files with 396 additions and 147 deletions
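For reference while reading the diff, this is the column order of a cached row as unpacked by the old run() method below (CACHE_COLUMNS is a hypothetical name for this sketch; the actual schema lives in the database utilities, which this commit does not touch):

# Hypothetical index map of a cached row (see cached[0]..cached[9] below).
CACHE_COLUMNS = (
    "url",                # cached[0]
    "html",               # cached[1] raw HTML, the only page content reused on a hit
    "cleaned_html",       # cached[2]
    "markdown",           # cached[3]
    "extracted_content",  # cached[4]
    "success",            # cached[5]
    "media",              # cached[6] JSON-encoded
    "links",              # cached[7] JSON-encoded
    "metadata",           # cached[8] JSON-encoded
    "screenshot",         # cached[9] base64 string
)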


@@ -51,7 +51,6 @@ class WebCrawler:
        self.ready = True
        print("[LOG] 🌞 WebCrawler is ready to crawl")

    def fetch_page(
        self,
        url_model: UrlModel,
@@ -78,131 +77,6 @@ class WebCrawler:
        )
        pass

    def run(
        self,
        url: str,
        word_count_threshold=MIN_WORD_THRESHOLD,
        extraction_strategy: ExtractionStrategy = None,
        chunking_strategy: ChunkingStrategy = RegexChunking(),
        bypass_cache: bool = False,
        css_selector: str = None,
        screenshot: bool = False,
        verbose=True,
        user_agent: str = None,
        **kwargs,
    ) -> CrawlResult:
        if user_agent:
            self.crawler_strategy.update_user_agent(user_agent)
        extraction_strategy = extraction_strategy or NoExtractionStrategy()
        extraction_strategy.verbose = verbose
        # Check that extraction_strategy is an instance of ExtractionStrategy; if not, raise an error
        if not isinstance(extraction_strategy, ExtractionStrategy):
            raise ValueError("Unsupported extraction strategy")
        if not isinstance(chunking_strategy, ChunkingStrategy):
            raise ValueError("Unsupported chunking strategy")

        # Make sure word_count_threshold is not less than MIN_WORD_THRESHOLD
        if word_count_threshold < MIN_WORD_THRESHOLD:
            word_count_threshold = MIN_WORD_THRESHOLD

        # Check cache first
        if not bypass_cache and not self.always_by_pass_cache:
            cached = get_cached_url(url)
            if cached:
                return CrawlResult(
                    **{
                        "url": cached[0],
                        "html": cached[1],
                        "cleaned_html": cached[2],
                        "markdown": cached[3],
                        "extracted_content": cached[4],
                        "success": cached[5],
                        "media": json.loads(cached[6] or "{}"),
                        "links": json.loads(cached[7] or "{}"),
                        "metadata": json.loads(cached[8] or "{}"),
                        "screenshot": cached[9],
                        "error_message": "",
                    }
                )

        # Initialize the WebDriver for crawling
        t = time.time()
        if kwargs.get("js", None):
            self.crawler_strategy.js_code = kwargs.get("js")
        html = self.crawler_strategy.crawl(url)
        base64_image = None
        if screenshot:
            base64_image = self.crawler_strategy.take_screenshot()
        success = True
        error_message = ""

        # Extract content from the HTML
        try:
            result = get_content_of_website(url, html, word_count_threshold, css_selector=css_selector)
            metadata = extract_metadata(html)
            if result is None:
                raise ValueError(f"Failed to extract content from the website: {url}")
        except InvalidCSSSelectorError as e:
            raise ValueError(str(e))

        cleaned_html = result.get("cleaned_html", "")
        markdown = result.get("markdown", "")
        media = result.get("media", [])
        links = result.get("links", [])

        # Print a professional LOG-style message: show the time taken and report that crawling is done
        if verbose:
            print(
                f"[LOG] 🚀 Crawling done for {url}, success: {success}, time taken: {time.time() - t} seconds"
            )

        extracted_content = []
        if verbose:
            print(f"[LOG] 🔥 Extracting semantic blocks for {url}, Strategy: {extraction_strategy.name}")
        t = time.time()
        # Split the markdown into sections
        sections = chunking_strategy.chunk(markdown)
        # sections = merge_chunks_based_on_token_threshold(sections, CHUNK_TOKEN_THRESHOLD)
        extracted_content = extraction_strategy.run(
            url, sections,
        )
        extracted_content = json.dumps(extracted_content)
        if verbose:
            print(
                f"[LOG] 🚀 Extraction done for {url}, time taken: {time.time() - t} seconds."
            )

        # Cache the result
        cleaned_html = beautify_html(cleaned_html)
        cache_url(
            url,
            html,
            cleaned_html,
            markdown,
            extracted_content,
            success,
            json.dumps(media),
            json.dumps(links),
            json.dumps(metadata),
            screenshot=base64_image,
        )

        return CrawlResult(
            url=url,
            html=html,
            cleaned_html=cleaned_html,
            markdown=markdown,
            media=media,
            links=links,
            metadata=metadata,
            screenshot=base64_image,
            extracted_content=extracted_content,
            success=success,
            error_message=error_message,
        )
    def fetch_pages(
        self,
        url_models: List[UrlModel],
@@ -241,8 +115,7 @@ class WebCrawler:
        return results

    def run_less_db(
    def run(
        self,
        url: str,
        word_count_threshold=MIN_WORD_THRESHOLD,
@@ -251,6 +124,7 @@ class WebCrawler:
        bypass_cache: bool = False,
        css_selector: str = None,
        screenshot: bool = False,
        user_agent: str = None,
        verbose=True,
        **kwargs,
    ) -> CrawlResult:
@@ -273,11 +147,17 @@ class WebCrawler:
        if cached:
            html = cached[1]
            extracted_content = cached[2]
        else:
            html = self.crawler_strategy.crawl(url)
            cache_url(url, html)
            if screenshot:
                screenshot = cached[9]
        return self.process_html(url, html, extracted_content, word_count_threshold, extraction_strategy, chunking_strategy, css_selector, screenshot, verbose, **kwargs)
        else:
            if user_agent:
                self.crawler_strategy.update_user_agent(user_agent)
            html = self.crawler_strategy.crawl(url)
            if screenshot:
                screenshot = self.crawler_strategy.take_screenshot()
        return self.process_html(url, html, extracted_content, word_count_threshold, extraction_strategy, chunking_strategy, css_selector, screenshot, verbose, bool(cached), **kwargs)
    def process_html(
        self,
@@ -290,16 +170,14 @@ class WebCrawler:
        css_selector: str,
        screenshot: bool,
        verbose: bool,
        is_cached: bool,
        **kwargs,
    ) -> CrawlResult:
        t = time.time()
        base64_image = None
        if screenshot:
            base64_image = self.crawler_strategy.take_screenshot()
        # Extract content from the HTML
        try:
            result = get_content_of_website(url, html, word_count_threshold, css_selector=css_selector)
            metadata = extract_metadata(html)
            if result is None:
                raise ValueError(f"Failed to extract content from the website: {url}")
        except InvalidCSSSelectorError as e:
@@ -312,20 +190,33 @@ class WebCrawler:
        if verbose:
            print(f"[LOG] 🚀 Crawling done for {url}, success: True, time taken: {time.time() - t} seconds")
        if verbose:
            print(f"[LOG] 🔥 Extracting semantic blocks for {url}, Strategy: {extraction_strategy.name}")
        sections = chunking_strategy.chunk(markdown)
        if extracted_content is None:
            if verbose:
                print(f"[LOG] 🔥 Extracting semantic blocks for {url}, Strategy: {extraction_strategy.name}")
            sections = chunking_strategy.chunk(markdown)
            extracted_content = extraction_strategy.run(url, sections)
            extracted_content = json.dumps(extracted_content)
            # Cache the extracted content
            cache_url(url, html, extracted_content)
        if verbose:
            print(f"[LOG] 🚀 Extraction done for {url}, time taken: {time.time() - t} seconds.")
        if verbose:
            print(f"[LOG] 🚀 Extraction done for {url}, time taken: {time.time() - t} seconds.")
        screenshot = None if not screenshot else screenshot
        if not is_cached:
            cache_url(
                url,
                html,
                cleaned_html,
                markdown,
                extracted_content,
                True,
                json.dumps(media),
                json.dumps(links),
                json.dumps(metadata),
                screenshot=screenshot,
            )
        return CrawlResult(
            url=url,
@@ -334,7 +225,8 @@ class WebCrawler:
            markdown=markdown,
            media=media,
            links=links,
            screenshot=base64_image,
            metadata=metadata,
            screenshot=screenshot,
            extracted_content=extracted_content,
            success=True,
            error_message="",
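For reference, a usage sketch of the refactored run() (the crawl4ai import path and the warmup() call are assumptions based on the surrounding project, not shown in this diff):

# Hypothetical usage of the new cache-aware run() flow.
from crawl4ai import WebCrawler

crawler = WebCrawler()
crawler.warmup()  # prints "[LOG] 🌞 WebCrawler is ready to crawl"

# First call: crawls the page, runs extraction, and caches the row.
result = crawler.run("https://example.com", css_selector="article")

# Second call: cache hit, so the extraction strategy is skipped, but the CSS
# selector and cleaning still re-run on the raw HTML (the "vital" note above).
result = crawler.run("https://example.com", css_selector="article")
print(result.markdown[:200])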