chore: Refactor get_content_of_website_optimized function in utils.py

This commit is contained in:
unclecode
2024-06-26 14:43:09 +08:00
parent 96d1eb0d0d
commit 7ba2142363
2 changed files with 28 additions and 21 deletions

View File

@@ -438,18 +438,17 @@ def get_content_of_website_optimized(url: str, html: str, word_count_threshold:
     links = {'internal': [], 'external': []}
     media = {'images': [], 'videos': [], 'audios': []}

-    def process_element(element: element.PageElement) -> None:
+    def process_element(element: element.PageElement) -> bool:
         if isinstance(element, NavigableString):
             if isinstance(element, Comment):
                 element.extract()
-            return
+            return False

-        # if not isinstance(element, element.Tag):
-        #     return
         if element.name in ['script', 'style', 'link', 'meta', 'noscript']:
             element.decompose()
-            return
+            return False

+        keep_element = False
         if element.name == 'a' and element.get('href'):
             href = element['href']
@@ -459,6 +458,7 @@ def get_content_of_website_optimized(url: str, html: str, word_count_threshold:
                 links['external'].append(link_data)
             else:
                 links['internal'].append(link_data)
+            keep_element = True

         elif element.name == 'img':
             media['images'].append({
@@ -466,12 +466,7 @@ def get_content_of_website_optimized(url: str, html: str, word_count_threshold:
                 'alt': element.get('alt'),
                 'type': 'image'
             })
-            alt_text = element.get('alt')
-            if alt_text:
-                element.replace_with(soup.new_string(alt_text))
-            else:
-                element.decompose()
-            return
+            return True  # Always keep image elements

         elif element.name in ['video', 'audio']:
             media[f"{element.name}s"].append({
@@ -479,6 +474,7 @@ def get_content_of_website_optimized(url: str, html: str, word_count_threshold:
                 'alt': element.get('alt'),
                 'type': element.name
             })
+            return True  # Always keep video and audio elements

         if element.name != 'pre':
             if element.name in ['b', 'i', 'u', 'span', 'del', 'ins', 'sub', 'sup', 'strong', 'em', 'code', 'kbd', 'var', 's', 'q', 'abbr', 'cite', 'dfn', 'time', 'small', 'mark']:
@@ -489,17 +485,26 @@ def get_content_of_website_optimized(url: str, html: str, word_count_threshold:
             elif element.name != 'img':
                 element.attrs = {}

-        word_count = len(element.get_text(strip=True).split())
-        if word_count < word_count_threshold:
-            element.decompose()
-            return
+        # Process children
         for child in list(element.children):
-            process_element(child)
+            if isinstance(child, NavigableString) and not isinstance(child, Comment):
+                if len(child.strip()) > 0:
+                    keep_element = True
+            else:
+                if process_element(child):
+                    keep_element = True

-        if not element.contents and not element.get_text(strip=True):
+        # Check word count
+        if not keep_element:
+            word_count = len(element.get_text(strip=True).split())
+            keep_element = word_count >= word_count_threshold

+        if not keep_element:
             element.decompose()
+        return keep_element

     process_element(body)

     def flatten_nested_elements(node):

View File

@@ -136,8 +136,10 @@ class WebCrawler:
         if not isinstance(chunking_strategy, ChunkingStrategy):
             raise ValueError("Unsupported chunking strategy")

-        if word_count_threshold < MIN_WORD_THRESHOLD:
-            word_count_threshold = MIN_WORD_THRESHOLD
+        # if word_count_threshold < MIN_WORD_THRESHOLD:
+        #     word_count_threshold = MIN_WORD_THRESHOLD
+        word_count_threshold = max(word_count_threshold, 0)

         # Check cache first
         cached = None