1. Make active_crawls a dict instead of a set and drop the jobs array, for efficient lookup and storage of active crawls and simpler crawl control (a sketch of the new shape follows this list).
2. Guard active_crawls with a lock so that simultaneous pushes and pops by coroutines don't cause a race condition.
3. Move the depth check outside the child-link for loop, since source_url doesn't change within the loop.
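
A minimal sketch of the new bookkeeping shape from points 1 and 2 (standalone; the surrounding strategy class and queue plumbing are elided, and the example URL is illustrative):

    import asyncio

    # Before: a set only answered "is this URL in flight?"; depth and score
    # lived in a separate jobs list and had to be matched back by URL.
    active_crawls_old: set = set()

    # After: one dict keyed by URL holds the per-crawl metadata, so lookup,
    # insert, and removal are all O(1) and the jobs list can be dropped.
    active_crawls: dict = {}
    active_crawls["https://example.com/page"] = {"depth": 1, "score": 0.5}

    # A lock serializes concurrent pushes and pops by coroutines (point 2).
    active_crawls_lock = asyncio.Lock()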
Aravind Karnam
2025-01-28 12:39:45 +05:30
parent f34b4878cf
commit 85847ff13f
2 changed files with 33 additions and 42 deletions


@@ -46,6 +46,7 @@ class BFSScraperStrategy(ScraperStrategy):
         self.stats = CrawlStats(start_time=datetime.now())
         self._cancel_event = asyncio.Event()
         self.process_external_links = process_external_links
+        self._active_crawls_lock = asyncio.Lock()

     async def can_process_url(self, url: str, depth: int) -> bool:
         """Check if URL can be processed based on filters
@@ -77,21 +78,25 @@ class BFSScraperStrategy(ScraperStrategy):
         self,
         result: CrawlResult,
         source_url: str,
-        depth: int,
         queue: asyncio.PriorityQueue,
         visited: Set[str],
         depths: Dict[str, int],
     ):
         """Process extracted links from crawl result.
         This is our link processor that:
-        Handles both internal and external links
-        Checks if URL can be processed - validates URL, applies Filters with can_process_url
         Checks depth limits
+        Handles both internal and external links
+        Checks if URL is visited already
+        Checks if URL can be processed - validates URL, applies Filters with can_process_url
         Scores URLs for priority
-        Updates depth tracking
+        Updates depth tracking dictionary
         Adds valid URLs to the queue
         Updates maximum depth statistics
         """
+        next_depth = depths[source_url] + 1
+        # If depth limit reached, exit without processing links
+        if next_depth > self.max_depth:
+            return
         links_to_process = result.links["internal"]
         if self.process_external_links:
             links_to_process += result.links["external"]
@@ -99,17 +104,14 @@ class BFSScraperStrategy(ScraperStrategy):
             url = link["href"]
             if url in visited:
                 continue
-            new_depth = depths[source_url] + 1
-            if new_depth > self.max_depth:
-                continue
-            if not await self.can_process_url(url, new_depth):
+            if not await self.can_process_url(url, next_depth):
                 self.stats.urls_skipped += 1
                 continue
             score = self.url_scorer.score(url) if self.url_scorer else 0
-            await queue.put((score, new_depth, url))
-            depths[url] = new_depth
+            await queue.put((score, next_depth, url))
+            depths[url] = next_depth
             self.stats.total_depth_reached = max(
-                self.stats.total_depth_reached, new_depth
+                self.stats.total_depth_reached, next_depth
             )

     async def ascrape(
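
Two details worth noting in the hunks above: asyncio.PriorityQueue pops the smallest tuple first, so (score, depth, url) entries make the lowest score win with depth and URL as tie-breakers; and next_depth can be computed once because source_url is fixed for the whole child-link loop. A runnable sketch under those assumptions (URLs and max_depth are illustrative, not taken from the diff):

    import asyncio

    async def main() -> None:
        queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
        # Lower score pops first; depth and url break ties.
        await queue.put((0.7, 1, "https://example.com/b"))
        await queue.put((0.2, 2, "https://example.com/a"))
        print(await queue.get())  # -> (0.2, 2, 'https://example.com/a')

        # Hoisting: source_url is constant for the loop, so the child
        # depth is computed once instead of once per link.
        depths = {"https://example.com": 0}
        source_url, max_depth = "https://example.com", 2
        next_depth = depths[source_url] + 1
        if next_depth <= max_depth:
            for link in ("https://example.com/a", "https://example.com/b"):
                depths[link] = next_depth
        print(depths)

    asyncio.run(main())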
@@ -134,7 +136,7 @@ class BFSScraperStrategy(ScraperStrategy):
         await queue.put((0, 0, start_url))
         visited: Set[str] = set()
         depths = {start_url: 0}
-        active_crawls = set()  # Track URLs currently being processed
+        active_crawls = {}  # Track URLs currently being processed with depth and score
         async with AsyncWebCrawler(
             config=browser_config,
             verbose=True,
@@ -149,39 +151,31 @@ class BFSScraperStrategy(ScraperStrategy):
             - Or while there are active crawls still running (arun_many)
             - Can be interrupted via cancellation (not self._cancel_event.is_set())
             """
-            # Collect batch of jobs to process
-            jobs = []
-            # Fill batch with available jobs
-            while len(jobs) < SCRAPER_BATCH_SIZE and not queue.empty():
-                score, depth, url = await queue.get()
-                if (
-                    url not in active_crawls
-                ):  # Only add if not currently processing
-                    jobs.append((score, depth, url))
-                    active_crawls.add(url)
+            # Collect batch of URLs into active_crawls to process
+            async with self._active_crawls_lock:
+                while len(active_crawls) < SCRAPER_BATCH_SIZE and not queue.empty():
+                    score, depth, url = await queue.get()
+                    active_crawls[url] = {"depth": depth, "score": score}
                     self.stats.current_depth = depth

-            if not jobs:
-                # If no jobs but active crawls exist, wait a bit and continue
-                if active_crawls:
-                    await asyncio.sleep(0.1)
+            if not active_crawls:
+                # If no active crawls exist, wait a bit and continue
+                await asyncio.sleep(0.1)
                 continue

             # Process batch
             try:
                 async for result in await crawler.arun_many(
-                    urls=[url for _, _, url in jobs],
+                    urls=list(active_crawls.keys()),
                     config=crawler_config.clone(stream=True),
                 ):
-                    source_url, depth = next(
-                        (url, depth)
-                        for _, depth, url in jobs
-                        if url == result.url
-                    )
-                    active_crawls.remove(source_url)  # Remove from active set
+                    source_url = result.url
+                    depth = active_crawls[source_url]["depth"]
+                    async with self._active_crawls_lock:
+                        active_crawls.pop(source_url, None)

                     if result.success:
                         await self._process_links(
-                            result, source_url, depth, queue, visited, depths
+                            result, source_url, queue, visited, depths
                         )
                         yield result
                     else:
@@ -189,9 +183,6 @@ class BFSScraperStrategy(ScraperStrategy):
                         f"Failed to crawl {result.url}: {result.error_message}"
                     )
             except Exception as e:
-                # Remove failed URLs from active set
-                for _, _, url in jobs:
-                    active_crawls.discard(url)
                 self.logger.error(f"Batch processing error: {e}")
                 # Continue processing other batches
                 continue
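
A self-contained sketch of the lock-guarded batch pattern from the hunks above, with the crawler replaced by a plain loop (drain_batch, BATCH_SIZE, and the URLs are illustrative stand-ins, not names from the diff):

    import asyncio

    BATCH_SIZE = 3  # stand-in for SCRAPER_BATCH_SIZE

    async def drain_batch(queue, active_crawls, lock):
        # Mirror of the diff: move up to BATCH_SIZE queued URLs into
        # active_crawls under the lock, so concurrent coroutines cannot
        # interleave their pushes and pops.
        async with lock:
            while len(active_crawls) < BATCH_SIZE and not queue.empty():
                score, depth, url = await queue.get()
                active_crawls[url] = {"depth": depth, "score": score}

    async def main():
        queue = asyncio.PriorityQueue()
        await queue.put((0.0, 1, "https://example.com/a"))
        await queue.put((1.0, 1, "https://example.com/b"))

        active_crawls, lock = {}, asyncio.Lock()
        await drain_batch(queue, active_crawls, lock)

        # Completion side: look up depth by URL, then pop under the lock.
        for url in list(active_crawls):
            depth = active_crawls[url]["depth"]
            async with lock:
                active_crawls.pop(url, None)
            print(url, depth)

    asyncio.run(main())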


@@ -188,11 +188,11 @@ if __name__ == "__main__":
     import time

     # Run basic example
-    # start_time = time.perf_counter()
-    # print("Running basic scraper example...")
-    # asyncio.run(basic_scraper_example())
-    # end_time = time.perf_counter()
-    # print(f"Basic scraper example completed in {end_time - start_time:.2f} seconds")
+    start_time = time.perf_counter()
+    print("Running basic scraper example...")
+    asyncio.run(basic_scraper_example())
+    end_time = time.perf_counter()
+    print(f"Basic scraper example completed in {end_time - start_time:.2f} seconds")

     # # Run advanced example
     print("\nRunning advanced scraper example...")