feat: Add page load check for LocalSeleniumCrawlerStrategy

This commit adds a page-load check to `LocalSeleniumCrawlerStrategy.crawl`. It introduces the `_ensure_page_load` helper, which briefly polls the page source for late changes before the HTML is consumed. This reduces the chance of capturing an incomplete page source and improves the crawler's reliability.
This commit is contained in:
unclecode
2024-07-01 00:07:32 +08:00
parent d58286989c
commit 88d8cd8650
2 changed files with 79 additions and 59 deletions

View File

@@ -9,7 +9,7 @@ from selenium.common.exceptions import InvalidArgumentException, WebDriverExcept
from selenium.webdriver.chrome.service import Service as ChromeService from selenium.webdriver.chrome.service import Service as ChromeService
from webdriver_manager.chrome import ChromeDriverManager from webdriver_manager.chrome import ChromeDriverManager
import logging import logging, time
import base64 import base64
from PIL import Image, ImageDraw, ImageFont from PIL import Image, ImageDraw, ImageFont
from io import BytesIO from io import BytesIO
@@ -177,7 +177,19 @@ class LocalSeleniumCrawlerStrategy(CrawlerStrategy):
# Set extra HTTP headers # Set extra HTTP headers
self.driver.execute_cdp_cmd('Network.setExtraHTTPHeaders', {'headers': headers}) self.driver.execute_cdp_cmd('Network.setExtraHTTPHeaders', {'headers': headers})
def _ensure_page_load(self, max_checks=6, check_interval=0.01):
initial_length = len(self.driver.page_source)
for ix in range(max_checks):
print(f"Checking page load: {ix}")
time.sleep(check_interval)
current_length = len(self.driver.page_source)
if current_length != initial_length:
break
return self.driver.page_source
def crawl(self, url: str) -> str: def crawl(self, url: str) -> str:
# Create md5 hash of the URL # Create md5 hash of the URL
import hashlib import hashlib
@@ -194,10 +206,14 @@ class LocalSeleniumCrawlerStrategy(CrawlerStrategy):
if self.verbose: if self.verbose:
print(f"[LOG] 🕸️ Crawling {url} using LocalSeleniumCrawlerStrategy...") print(f"[LOG] 🕸️ Crawling {url} using LocalSeleniumCrawlerStrategy...")
self.driver.get(url) #<html><head></head><body></body></html> self.driver.get(url) #<html><head></head><body></body></html>
html = self.driver.page_source
WebDriverWait(self.driver, 20).until(
lambda d: d.execute_script('return document.readyState') == 'complete'
)
WebDriverWait(self.driver, 10).until( WebDriverWait(self.driver, 10).until(
EC.presence_of_all_elements_located((By.TAG_NAME, "body")) EC.presence_of_all_elements_located((By.TAG_NAME, "body"))
) )
html = self._ensure_page_load() # self.driver.page_source
can_not_be_done_headless = False # Look at my creativity for naming variables can_not_be_done_headless = False # Look at my creativity for naming variables
# TODO: Very ugly way for now but it works # TODO: Very ugly way for now but it works
if html == "<html><head></head><body></body></html>": if html == "<html><head></head><body></body></html>":

View File

@@ -439,71 +439,75 @@ def get_content_of_website_optimized(url: str, html: str, word_count_threshold:
media = {'images': [], 'videos': [], 'audios': []} media = {'images': [], 'videos': [], 'audios': []}
def process_element(element: element.PageElement) -> bool:
    """Recursively prune *element* in place, collecting links and media.

    Returns True when the element (or any descendant) carries enough
    content to keep; otherwise the element is decomposed and False is
    returned.  Side effects: appends to the closure's ``links`` and
    ``media`` collections and mutates the soup tree.
    """
    try:
        # Text nodes: strip comments; plain text never survives as an
        # element in its own right (parents account for it below).
        if isinstance(element, NavigableString):
            if isinstance(element, Comment):
                element.extract()
            return False

        # Non-content tags are dropped outright.
        if element.name in ('script', 'style', 'link', 'meta', 'noscript'):
            element.decompose()
            return False

        keep_element = False

        if element.name == 'a' and element.get('href'):
            href = element['href']
            url_base = url.split('/')[2]
            link_data = {'href': href, 'text': element.get_text()}
            # External = absolute URL that does not mention the crawled host.
            if href.startswith('http') and url_base not in href:
                links['external'].append(link_data)
            else:
                links['internal'].append(link_data)
            keep_element = True
        elif element.name == 'img':
            media['images'].append({
                'src': element.get('src'),
                'alt': element.get('alt'),
                'type': 'image'
            })
            return True  # Always keep image elements
        elif element.name in ('video', 'audio'):
            media[f"{element.name}s"].append({
                'src': element.get('src'),
                'alt': element.get('alt'),
                'type': element.name
            })
            return True  # Always keep video and audio elements

        # Outside <pre>, unwrap inline formatting tags (or flatten them to
        # text in only_text mode) and strip attributes from everything else.
        if element.name != 'pre':
            if element.name in ('b', 'i', 'u', 'span', 'del', 'ins', 'sub',
                                'sup', 'strong', 'em', 'code', 'kbd', 'var',
                                's', 'q', 'abbr', 'cite', 'dfn', 'time',
                                'small', 'mark'):
                if kwargs.get('only_text', False):
                    element.replace_with(element.get_text())
                else:
                    element.unwrap()
            elif element.name != 'img':
                element.attrs = {}

        # Recurse: any kept child, or any non-empty text child, keeps us.
        for child in list(element.children):
            if isinstance(child, NavigableString) and not isinstance(child, Comment):
                if len(child.strip()) > 0:
                    keep_element = True
            else:
                if process_element(child):
                    keep_element = True

        # Last resort: keep elements whose own text meets the word threshold.
        if not keep_element:
            word_count = len(element.get_text(strip=True).split())
            keep_element = word_count >= word_count_threshold

        if not keep_element:
            element.decompose()

        return keep_element
    except Exception as e:
        # Best-effort: one malformed node must not abort the whole page;
        # report it and treat the node as removable.
        print('Error processing element:', str(e))
        return False
process_element(body) process_element(body)