1. Introduced a bool flag on the ascrape method to switch between sequential and concurrent processing (usage sketch below)
2. Introduced a dictionary for tracking URL depth across concurrent tasks
3. Removed the redundant crawled_urls variable; the returned object now builds its crawled_urls list from the visited set
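A minimal usage sketch of the new flag. The constructor call and its arguments here are assumptions for illustration, not part of this commit:

    import asyncio

    async def main():
        # Hypothetical setup; constructor arguments are illustrative.
        scraper = AsyncWebScraper(strategy=BFSScraperStrategy(max_depth=2))

        # Default behavior: URLs drawn from the queue are processed concurrently.
        result = await scraper.ascrape("https://example.com")

        # parallel_processing=False processes one URL at a time instead.
        result = await scraper.ascrape("https://example.com", parallel_processing=False)

    asyncio.run(main())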
@@ -23,8 +23,8 @@ class AsyncWebScraper:
         self.strategy = strategy
         self.batch_processor = BatchProcessor(batch_size, concurrency_limit)

-    async def ascrape(self, url: str) -> ScraperResult:
-        return await self.strategy.ascrape(url, self.crawler)
+    async def ascrape(self, url: str, parallel_processing: bool = True) -> ScraperResult:
+        return await self.strategy.ascrape(url, self.crawler, parallel_processing)

     async def ascrape_many(self, urls: List[str]) -> List[ScraperResult]:
         all_results = []
@@ -13,6 +13,7 @@ from aiolimiter import AsyncLimiter
 from tenacity import retry, stop_after_attempt, wait_exponential
 from collections import defaultdict
 import logging
+from typing import Dict
 logging.basicConfig(level=logging.DEBUG)

 rate_limiter = AsyncLimiter(1, 1)  # 1 request per second
@@ -44,7 +45,7 @@ class BFSScraperStrategy(ScraperStrategy):
     async def retry_crawl(self, crawler: AsyncWebCrawler, url: str) -> CrawlResult:
         return await crawler.arun(url)

-    async def process_url(self, url: str, depth: int, crawler: AsyncWebCrawler, queue: asyncio.PriorityQueue, visited: set) -> CrawlResult:
+    async def process_url(self, url: str, depth: int, crawler: AsyncWebCrawler, queue: asyncio.PriorityQueue, visited: set, depths: Dict[str, int]) -> CrawlResult:
         def normalize_url(url: str) -> str:
             parsed = urlparse(url)
             return urlunparse(parsed._replace(fragment=""))
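Threading a shared depths mapping through process_url gives concurrent tasks one authoritative record of the depth at which each URL was discovered, instead of each task relying on the depth value it captured at creation time. A standalone sketch of the pattern with simplified types (illustrative, not code from this commit):

    from typing import Dict

    # Shared across tasks: URL -> BFS depth at which it was recorded.
    depths: Dict[str, int] = {"https://example.com": 0}

    def record_link(parent_url: str, link: str) -> int:
        # A child's depth derives from its parent's recorded depth.
        new_depth = depths[parent_url] + 1
        # Record the discovered link's depth, mirroring what
        # process_url does after enqueueing the link.
        depths[link] = new_depth
        return new_depth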
@@ -98,34 +99,39 @@ class BFSScraperStrategy(ScraperStrategy):
             absolute_link = urljoin(url, link['href'])
             normalized_link = normalize_url(absolute_link)
             if self.filter_chain.apply(normalized_link) and normalized_link not in visited:
-                new_depth = depth + 1
+                new_depth = depths[url] + 1
                 if new_depth <= self.max_depth:
                     # URL Scoring
                     score = self.url_scorer.score(normalized_link)
                     await queue.put((score, new_depth, normalized_link))
+                    depths[normalized_link] = new_depth

         return crawl_result

-    async def ascrape(self, start_url: str, crawler: AsyncWebCrawler) -> ScraperResult:
+    async def ascrape(self, start_url: str, crawler: AsyncWebCrawler, parallel_processing: bool = True) -> ScraperResult:
         queue = asyncio.PriorityQueue()
         queue.put_nowait((0, 0, start_url))
         visited = set()
-        crawled_urls = []
         extracted_data = {}
+        depths = {start_url: 0}

         while not queue.empty():
             tasks = []
             while not queue.empty() and len(tasks) < self.max_concurrent:
                 _, depth, url = await queue.get()
                 if url not in visited:
-                    task = asyncio.create_task(self.process_url(url, depth, crawler, queue, visited))
-                    tasks.append(task)
+                    if parallel_processing:
+                        task = asyncio.create_task(self.process_url(url, depth, crawler, queue, visited, depths))
+                        tasks.append(task)
+                    else:
+                        result = await self.process_url(url, depth, crawler, queue, visited, depths)
+                        if result:
+                            extracted_data[result.url] = result

-            if tasks:
+            if parallel_processing and tasks:
                 results = await asyncio.gather(*tasks)
                 for result in results:
                     if result:
-                        crawled_urls.append(result.url)
                         extracted_data[result.url] = result

-        return ScraperResult(url=start_url, crawled_urls=crawled_urls, extracted_data=extracted_data)
+        return ScraperResult(url=start_url, crawled_urls=list(visited), extracted_data=extracted_data)
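With crawled_urls removed, the returned ScraperResult now materializes its URL list from the visited set, so callers keep reading the same field. A brief consumer sketch (field names taken from the diff above; everything else illustrative):

    result = await strategy.ascrape("https://example.com", crawler)

    # crawled_urls is built via list(visited), so its ordering is not
    # guaranteed; extracted_data is still keyed by each result's URL.
    for url in result.crawled_urls:
        page = result.extracted_data.get(url)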