Created scaffolding for Scraper as per the plan. Implemented the `ascrape` method in `bfs_scraper_strategy`.

This commit is contained in:
Aravind Karnam
2024-09-09 13:13:34 +05:30
parent eb131bebdf
commit 44ce12c62c
12 changed files with 155 additions and 0 deletions

View File

View File

@@ -0,0 +1,36 @@
import asyncio
from typing import List, Dict
from .scraper_strategy import ScraperStrategy
from .bfs_scraper_strategy import BFSScraperStrategy
from .models import ScraperResult
from ..async_webcrawler import AsyncWebCrawler
class BatchProcessor:
    """Runs scrape jobs for one batch of URLs under a concurrency cap."""

    def __init__(self, batch_size: int, concurrency_limit: int):
        # Number of URLs handed to process_batch at a time.
        self.batch_size = batch_size
        # Upper bound on simultaneously running ascrape calls.
        self.concurrency_limit = concurrency_limit

    async def process_batch(self, scraper: 'AsyncWebScraper', urls: List[str]) -> List[ScraperResult]:
        """Scrape every URL in *urls*, at most ``concurrency_limit`` at once.

        Results come back in the same order as *urls*.
        """
        gate = asyncio.Semaphore(self.concurrency_limit)

        async def bounded(target):
            # The semaphore throttles how many ascrape calls run concurrently.
            async with gate:
                return await scraper.ascrape(target)

        tasks = [bounded(u) for u in urls]
        return await asyncio.gather(*tasks)
class AsyncWebScraper:
    """Drives a crawl-then-extract pipeline over one or many URLs."""

    def __init__(self, crawler: AsyncWebCrawler, strategy: ScraperStrategy, batch_size: int = 10, concurrency_limit: int = 5):
        self.crawler = crawler      # fetches individual pages
        self.strategy = strategy    # decides how to walk and extract
        self.batch_processor = BatchProcessor(batch_size, concurrency_limit)

    async def ascrape(self, url: str) -> ScraperResult:
        """Crawl *url* once, then hand the seed result to the strategy."""
        seed_result = await self.crawler.arun(url)
        return await self.strategy.ascrape(url, seed_result, self.crawler)

    async def ascrape_many(self, urls: List[str]) -> List[ScraperResult]:
        """Scrape *urls* in fixed-size batches, preserving input order."""
        results = []
        step = self.batch_processor.batch_size
        for start in range(0, len(urls), step):
            chunk = urls[start:start + step]
            results.extend(await self.batch_processor.process_batch(self, chunk))
        return results

View File

@@ -0,0 +1,50 @@
from .scraper_strategy import ScraperStrategy
from .filters import FilterChain
from .scorers import URLScorer
from .models import ScraperResult
from ..models import CrawlResult
from ..async_webcrawler import AsyncWebCrawler
import asyncio
from urllib.parse import urljoin
class BFSScraperStrategy(ScraperStrategy):
    """Breadth-first scraping strategy driven by a priority queue.

    Discovered links are admitted through ``filter_chain`` and prioritised
    by ``url_scorer`` (a higher score means the URL is crawled earlier),
    up to ``max_depth`` hops from the start URL.
    """

    # Link schemes that can never be fetched over HTTP(S).
    SPECIAL_SCHEMES = ('tel:', 'mailto:', 'sms:', 'geo:', 'fax:', 'file:',
                       'data:', 'sip:', 'ircs:', 'magnet:')

    def __init__(self, max_depth: int, filter_chain: FilterChain, url_scorer: URLScorer):
        self.max_depth = max_depth
        self.filter_chain = filter_chain
        self.url_scorer = url_scorer

    def _priority(self, score: float) -> float:
        """Map a relevance score to a queue priority (lower pops first).

        BUG FIX: ``1 / score`` previously raised ZeroDivisionError for a
        zero score; non-positive scores now get the worst priority instead.
        """
        return 1 / score if score > 0 else float("inf")

    async def ascrape(self, start_url: str, initial_crawl_result: CrawlResult, crawler: AsyncWebCrawler) -> ScraperResult:
        """BFS-crawl from *start_url* and aggregate per-page crawl results.

        Args:
            start_url: Seed URL for the traversal.
            initial_crawl_result: Pre-fetched result for *start_url*, reused
                so the seed page is not crawled twice.
            crawler: Crawler used to fetch every other page.

        Returns:
            ScraperResult with the visited URLs and their crawl results.
            Failed crawls are reported but not included (BUG FIX: they were
            previously recorded as if successfully crawled).
        """
        queue = asyncio.PriorityQueue()
        queue.put_nowait((0, 0, start_url))  # (priority, depth, url)
        visited = set()
        queued = {start_url}  # BUG FIX: prevents the same URL being enqueued many times
        crawled_urls = []
        extracted_data = {}
        while not queue.empty():
            _, depth, url = await queue.get()
            if depth > self.max_depth or url in visited:
                continue
            visited.add(url)
            # Reuse the caller-supplied result for the seed page.
            crawl_result = initial_crawl_result if url == start_url else await crawler.arun(url)
            if not crawl_result.success:
                print(f"failed to crawl -- {url}")
                continue
            crawled_urls.append(url)
            extracted_data[url] = crawl_result
            for internal in crawl_result.links["internal"]:
                link = internal['href']
                # str.startswith accepts a tuple of prefixes.
                is_special_uri = link.startswith(self.SPECIAL_SCHEMES)
                # NOTE(review): links containing a fragment are skipped
                # entirely rather than having the fragment stripped —
                # confirm this is intended (urldefrag would strip instead).
                is_fragment = '#' in link
                if not (is_fragment or is_special_uri):
                    # To fix partial links: eg:'/support' to 'https://example.com/support'
                    absolute_link = urljoin(url, link)
                    if absolute_link not in visited and absolute_link not in queued \
                            and self.filter_chain.apply(absolute_link):
                        queued.add(absolute_link)
                        score = self.url_scorer.score(absolute_link)
                        await queue.put((self._priority(score), depth + 1, absolute_link))
            for external in crawl_result.links["external"]:
                link = external['href']
                if link not in visited and link not in queued and self.filter_chain.apply(link):
                    queued.add(link)
                    score = self.url_scorer.score(link)
                    await queue.put((self._priority(score), depth + 1, link))
        return ScraperResult(url=start_url, crawled_urls=crawled_urls, extracted_data=extracted_data)

View File

@@ -0,0 +1,3 @@
from .url_filter import URLFilter, FilterChain
from .content_type_filter import ContentTypeFilter
from .url_pattern_filter import URLPatternFilter

View File

@@ -0,0 +1,8 @@
from .url_filter import URLFilter
class ContentTypeFilter(URLFilter):
    """URL filter intended to admit URLs by their content type."""

    def __init__(self, contentType: str):
        # Content type this filter is configured with (semantics TBD below).
        self.contentType = contentType

    def apply(self, url: str) -> bool:
        """Currently admits every URL unconditionally."""
        # TODO: This is a stub. Will implement this later
        return True

View File

@@ -0,0 +1,16 @@
from abc import ABC, abstractmethod
class URLFilter(ABC):
    """Interface for URL admission checks used by FilterChain."""

    @abstractmethod
    def apply(self, url: str) -> bool:
        """Return True if *url* should be admitted for crawling."""
        pass
class FilterChain:
    """Conjunction of URL filters: a URL passes only if every filter accepts it.

    An empty chain accepts every URL (vacuous truth).
    """

    def __init__(self):
        # Filters are consulted in insertion order.
        self.filters = []

    def add_filter(self, filter: URLFilter):
        """Append another filter to the end of the chain."""
        self.filters.append(filter)

    def apply(self, url: str) -> bool:
        """Return True when every registered filter accepts *url*."""
        for candidate in self.filters:
            if not candidate.apply(url):
                return False
        return True

View File

@@ -0,0 +1,9 @@
from .url_filter import URLFilter
from re import Pattern
class URLPatternFilter(URLFilter):
    """URL filter intended to match URLs against a compiled regex."""

    def __init__(self, pattern: Pattern):
        # Pre-compiled regular expression (semantics TBD below).
        self.pattern = pattern

    def apply(self, url: str) -> bool:
        """Currently admits every URL unconditionally."""
        # TODO: This is a stub. Will implement this later.
        return True

View File

@@ -0,0 +1,7 @@
from pydantic import BaseModel
from typing import List, Dict
class ScraperResult(BaseModel):
    """Aggregate outcome of one scraping run."""
    # Seed URL the scrape started from.
    url: str
    # URLs visited during the crawl, in crawl order.
    crawled_urls: List[str]
    # Map of URL -> per-page data; presumably CrawlResult objects
    # (see the BFS strategy) — confirm before tightening the type.
    extracted_data: Dict

View File

@@ -0,0 +1,2 @@
from .url_scorer import URLScorer
from .keyword_relevance_scorer import KeywordRelevanceScorer

View File

@@ -0,0 +1,9 @@
from .url_scorer import URLScorer
from typing import List
class KeywordRelevanceScorer(URLScorer):
def __init__(self,keywords: List[str]):
self.keyworkds = keywords
def score(self, url: str) -> float:
#TODO: This is a stub. Will implement this later.
return 1

View File

@@ -0,0 +1,6 @@
from abc import ABC, abstractmethod
class URLScorer(ABC):
    """Interface for assigning a crawl-priority score to a URL."""

    @abstractmethod
    def score(self, url: str) -> float:
        """Return a numeric score for *url*."""
        pass

View File

@@ -0,0 +1,9 @@
from abc import ABC, abstractmethod
from .models import ScraperResult
from ..models import CrawlResult
from ..async_webcrawler import AsyncWebCrawler
class ScraperStrategy(ABC):
    """Interface for scraping strategies driven by AsyncWebScraper."""

    @abstractmethod
    async def ascrape(self, url: str, crawl_result: CrawlResult, crawler: AsyncWebCrawler) -> ScraperResult:
        """Scrape starting from *url* and return an aggregate result.

        Args:
            url: Seed URL for the scrape.
            crawl_result: Pre-fetched crawl result for *url*, so an
                implementation need not re-crawl the seed page.
            crawler: Crawler used to fetch any further pages.

        Returns:
            A ScraperResult aggregating the whole run.
        """
        pass