Merge pull request #172 from aravindkarnam/scraper

Scraper
This commit is contained in:
UncleCode
2024-11-06 07:00:44 +01:00
committed by GitHub
25 changed files with 737 additions and 126 deletions

4
.gitignore vendored
View File

@@ -201,4 +201,6 @@ test_env/
todo.md
git_changes.py
git_changes.md
pypi_build.sh
pypi_build.sh
.tests/

View File

@@ -1,5 +1,83 @@
# Changelog
## [v0.3.6] - 2024-10-12
### 1. Improved Crawling Control
- **New Hook**: Added `before_retrieve_html` hook in `AsyncPlaywrightCrawlerStrategy`.
- **Delayed HTML Retrieval**: Introduced `delay_before_return_html` parameter to allow waiting before retrieving HTML content.
- Useful for pages with delayed content loading.
- **Flexible Timeout**: `smart_wait` function now uses `page_timeout` (default 60 seconds) instead of a fixed 30-second timeout.
- Provides better handling for slow-loading pages.
- **How to use**: Set `page_timeout=your_desired_timeout` (in milliseconds) when calling `crawler.arun()`.
### 2. Browser Type Selection
- Added support for different browser types (Chromium, Firefox, WebKit).
- Users can now specify the browser type when initializing AsyncWebCrawler.
- **How to use**: Set `browser_type="firefox"` or `browser_type="webkit"` when initializing AsyncWebCrawler.
### 3. Screenshot Capture
- Added ability to capture screenshots during crawling.
- Useful for debugging and content verification.
- **How to use**: Set `screenshot=True` when calling `crawler.arun()`.
### 4. Enhanced LLM Extraction Strategy
- Added support for multiple LLM providers (OpenAI, Hugging Face, Ollama).
- **Custom Arguments**: Added support for passing extra arguments to LLM providers via `extra_args` parameter.
- **Custom Headers**: Users can now pass custom headers to the extraction strategy.
- **How to use**: Specify the desired provider and custom arguments when using `LLMExtractionStrategy`.
### 5. iframe Content Extraction
- New feature to process and extract content from iframes.
- **How to use**: Set `process_iframes=True` in the crawl method.
### 6. Delayed Content Retrieval
- Introduced `get_delayed_content` method in `AsyncCrawlResponse`.
- Allows retrieval of content after a specified delay, useful for dynamically loaded content.
- **How to use**: Access `result.get_delayed_content(delay_in_seconds)` after crawling.
## Improvements and Optimizations
### 1. AsyncWebCrawler Enhancements
- **Flexible Initialization**: Now accepts arbitrary keyword arguments, passed directly to the crawler strategy.
- Allows for more customized setups.
### 2. Image Processing Optimization
- Enhanced image handling in WebScrappingStrategy.
- Added filtering for small, invisible, or irrelevant images.
- Improved image scoring system for better content relevance.
- Implemented JavaScript-based image dimension updating for more accurate representation.
### 3. Database Schema Auto-updates
- Automatic database schema updates ensure compatibility with the latest version.
### 4. Enhanced Error Handling and Logging
- Improved error messages and logging for easier debugging.
### 5. Content Extraction Refinements
- Refined HTML sanitization process.
- Improved handling of base64 encoded images.
- Enhanced Markdown conversion process.
- Optimized content extraction algorithms.
### 6. Utility Function Enhancements
- `perform_completion_with_backoff` function now supports additional arguments for more customized API calls to LLM providers.
## Bug Fixes
- Fixed an issue where image tags were being prematurely removed during content extraction.
## Examples and Documentation
- Updated `quickstart_async.py` with examples of:
- Using custom headers in LLM extraction.
- Different LLM provider usage (OpenAI, Hugging Face, Ollama).
- Custom browser type usage.
## Developer Notes
- Refactored code for better maintainability, flexibility, and performance.
- Enhanced type hinting throughout the codebase for improved development experience.
- Expanded error handling for more robust operation.
These updates significantly enhance the flexibility, accuracy, and robustness of crawl4ai, providing users with more control and options for their web crawling and content extraction tasks.
## [v0.3.5] - 2024-09-02
Enhance AsyncWebCrawler with smart waiting and screenshot capabilities

View File

@@ -10,6 +10,14 @@ Crawl4AI simplifies asynchronous web crawling and data extraction, making it acc
> Looking for the synchronous version? Check out [README.sync.md](./README.sync.md). You can also access the previous version in the branch [V0.2.76](https://github.com/unclecode/crawl4ai/blob/v0.2.76).
## New update 0.3.6
- 🌐 Multi-browser support (Chromium, Firefox, WebKit)
- 🖼️ Improved image processing with lazy-loading detection
- 🔧 Custom page timeout parameter for better control over crawling behavior
- 🕰️ Enhanced handling of delayed content loading
- 🔑 Custom headers support for LLM interactions
- 🖼️ iframe content extraction for comprehensive page analysis
- ⏱️ Flexible timeout and delayed content retrieval options
## Try it Now!
@@ -124,7 +132,7 @@ async def main():
result = await crawler.arun(
url="https://www.nbcnews.com/business",
js_code=js_code,
css_selector="article.tease-card",
css_selector=".wide-tease-item__description",
bypass_cache=True
)
print(result.extracted_content)

View File

@@ -3,7 +3,7 @@
from .async_webcrawler import AsyncWebCrawler
from .models import CrawlResult
__version__ = "0.3.5"
__version__ = "0.3.6"
__all__ = [
"AsyncWebCrawler",

View File

@@ -1,7 +1,7 @@
import asyncio
import base64, time
from abc import ABC, abstractmethod
from typing import Callable, Dict, Any, List, Optional
from typing import Callable, Dict, Any, List, Optional, Awaitable
import os
from playwright.async_api import async_playwright, Page, Browser, Error
from io import BytesIO
@@ -18,6 +18,10 @@ class AsyncCrawlResponse(BaseModel):
response_headers: Dict[str, str]
status_code: int
screenshot: Optional[str] = None
get_delayed_content: Optional[Callable[[Optional[float]], Awaitable[str]]] = None
class Config:
arbitrary_types_allowed = True
class AsyncCrawlerStrategy(ABC):
@abstractmethod
@@ -46,7 +50,8 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
self.user_agent = kwargs.get("user_agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36")
self.proxy = kwargs.get("proxy")
self.headless = kwargs.get("headless", True)
self.headers = {}
self.browser_type = kwargs.get("browser_type", "chromium") # New parameter
self.headers = kwargs.get("headers", {})
self.sessions = {}
self.session_ttl = 1800
self.js_code = js_code
@@ -59,7 +64,8 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
'on_execution_started': None,
'before_goto': None,
'after_goto': None,
'before_return_html': None
'before_return_html': None,
'before_retrieve_html': None
}
async def __aenter__(self):
@@ -75,7 +81,6 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
if self.browser is None:
browser_args = {
"headless": self.headless,
# "headless": False,
"args": [
"--disable-gpu",
"--disable-dev-shm-usage",
@@ -90,7 +95,14 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
browser_args["proxy"] = proxy_settings
self.browser = await self.playwright.chromium.launch(**browser_args)
# Select the appropriate browser based on the browser_type
if self.browser_type == "firefox":
self.browser = await self.playwright.firefox.launch(**browser_args)
elif self.browser_type == "webkit":
self.browser = await self.playwright.webkit.launch(**browser_args)
else:
self.browser = await self.playwright.chromium.launch(**browser_args)
await self.execute_hook('on_browser_created', self.browser)
async def close(self):
@@ -140,7 +152,6 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
for sid in expired_sessions:
asyncio.create_task(self.kill_session(sid))
async def smart_wait(self, page: Page, wait_for: str, timeout: float = 30000):
wait_for = wait_for.strip()
@@ -204,6 +215,48 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
except Exception as e:
raise RuntimeError(f"Error in wait condition: {str(e)}")
async def process_iframes(self, page):
# Find all iframes
iframes = await page.query_selector_all('iframe')
for i, iframe in enumerate(iframes):
try:
# Add a unique identifier to the iframe
await iframe.evaluate(f'(element) => element.id = "iframe-{i}"')
# Get the frame associated with this iframe
frame = await iframe.content_frame()
if frame:
# Wait for the frame to load
await frame.wait_for_load_state('load', timeout=30000) # 30 seconds timeout
# Extract the content of the iframe's body
iframe_content = await frame.evaluate('() => document.body.innerHTML')
# Generate a unique class name for this iframe
class_name = f'extracted-iframe-content-{i}'
# Replace the iframe with a div containing the extracted content
_iframe = iframe_content.replace('`', '\\`')
await page.evaluate(f"""
() => {{
const iframe = document.getElementById('iframe-{i}');
const div = document.createElement('div');
div.innerHTML = `{_iframe}`;
div.className = '{class_name}';
iframe.replaceWith(div);
}}
""")
else:
print(f"Warning: Could not access content frame for iframe {i}")
except Exception as e:
print(f"Error processing iframe {i}: {str(e)}")
# Return the page object
return page
async def crawl(self, url: str, **kwargs) -> AsyncCrawlResponse:
response_headers = {}
status_code = None
@@ -248,7 +301,7 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
if not kwargs.get("js_only", False):
await self.execute_hook('before_goto', page)
response = await page.goto(url, wait_until="domcontentloaded", timeout=60000)
response = await page.goto(url, wait_until="domcontentloaded", timeout=kwargs.get("page_timeout", 60000))
await self.execute_hook('after_goto', page)
# Get status code and headers
@@ -258,6 +311,7 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
status_code = 200
response_headers = {}
await page.wait_for_selector('body')
await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
@@ -291,12 +345,89 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
wait_for = kwargs.get("wait_for")
if wait_for:
try:
await self.smart_wait(page, wait_for, timeout=kwargs.get("timeout", 30000))
await self.smart_wait(page, wait_for, timeout=kwargs.get("page_timeout", 60000))
except Exception as e:
raise RuntimeError(f"Wait condition failed: {str(e)}")
# Check if kwargs has screenshot=True then take screenshot
screenshot_data = None
if kwargs.get("screenshot"):
screenshot_data = await self.take_screenshot(url)
# New code to update image dimensions
update_image_dimensions_js = """
() => {
return new Promise((resolve) => {
const filterImage = (img) => {
// Filter out images that are too small
if (img.width < 100 && img.height < 100) return false;
// Filter out images that are not visible
const rect = img.getBoundingClientRect();
if (rect.width === 0 || rect.height === 0) return false;
// Filter out images with certain class names (e.g., icons, thumbnails)
if (img.classList.contains('icon') || img.classList.contains('thumbnail')) return false;
// Filter out images with certain patterns in their src (e.g., placeholder images)
if (img.src.includes('placeholder') || img.src.includes('icon')) return false;
return true;
};
const images = Array.from(document.querySelectorAll('img')).filter(filterImage);
let imagesLeft = images.length;
if (imagesLeft === 0) {
resolve();
return;
}
const checkImage = (img) => {
if (img.complete && img.naturalWidth !== 0) {
img.setAttribute('width', img.naturalWidth);
img.setAttribute('height', img.naturalHeight);
imagesLeft--;
if (imagesLeft === 0) resolve();
}
};
images.forEach(img => {
checkImage(img);
if (!img.complete) {
img.onload = () => {
checkImage(img);
};
img.onerror = () => {
imagesLeft--;
if (imagesLeft === 0) resolve();
};
}
});
// Fallback timeout of 5 seconds
setTimeout(() => resolve(), 5000);
});
}
"""
await page.evaluate(update_image_dimensions_js)
# Wait a bit for any onload events to complete
await page.wait_for_timeout(100)
# Process iframes
if kwargs.get("process_iframes", False):
page = await self.process_iframes(page)
await self.execute_hook('before_retrieve_html', page)
# Check if delay_before_return_html is set then wait for that time
delay_before_return_html = kwargs.get("delay_before_return_html")
if delay_before_return_html:
await asyncio.sleep(delay_before_return_html)
html = await page.content()
page = await self.execute_hook('before_return_html', page, html)
await self.execute_hook('before_return_html', page, html)
if self.verbose:
print(f"[LOG] ✅ Crawled {url} successfully!")
@@ -312,7 +443,20 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
"status_code": status_code
}, f)
response = AsyncCrawlResponse(html=html, response_headers=response_headers, status_code=status_code)
async def get_delayed_content(delay: float = 5.0) -> str:
if self.verbose:
print(f"[LOG] Waiting for {delay} seconds before retrieving content for {url}")
await asyncio.sleep(delay)
return await page.content()
response = AsyncCrawlResponse(
html=html,
response_headers=response_headers,
status_code=status_code,
screenshot=screenshot_data,
get_delayed_content=get_delayed_content
)
return response
except Error as e:
raise Error(f"Failed to crawl {url}: {str(e)}")
@@ -370,7 +514,6 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
except Error as e:
raise Error(f"Failed to execute JavaScript or wait for condition in session {session_id}: {str(e)}")
async def crawl_many(self, urls: List[str], **kwargs) -> List[AsyncCrawlResponse]:
semaphore_count = kwargs.get('semaphore_count', calculate_semaphore_count())
semaphore = asyncio.Semaphore(semaphore_count)
@@ -383,11 +526,13 @@ class AsyncPlaywrightCrawlerStrategy(AsyncCrawlerStrategy):
results = await asyncio.gather(*tasks, return_exceptions=True)
return [result if not isinstance(result, Exception) else str(result) for result in results]
async def take_screenshot(self, url: str) -> str:
async def take_screenshot(self, url: str, wait_time = 1000) -> str:
async with await self.browser.new_context(user_agent=self.user_agent) as context:
page = await context.new_page()
try:
await page.goto(url, wait_until="domcontentloaded")
await page.goto(url, wait_until="domcontentloaded", timeout=30000)
# Wait for a specified time (default is 1 second)
await page.wait_for_timeout(wait_time)
screenshot = await page.screenshot(full_page=True)
return base64.b64encode(screenshot).decode('utf-8')
except Exception as e:

View File

@@ -29,14 +29,31 @@ class AsyncDatabaseManager:
)
''')
await db.commit()
await self.update_db_schema()
async def aalter_db_add_screenshot(self, new_column: str = "media"):
async def update_db_schema(self):
async with aiosqlite.connect(self.db_path) as db:
# Check if the 'media' column exists
cursor = await db.execute("PRAGMA table_info(crawled_data)")
columns = await cursor.fetchall()
column_names = [column[1] for column in columns]
if 'media' not in column_names:
await self.aalter_db_add_column('media')
# Check for other missing columns and add them if necessary
for column in ['links', 'metadata', 'screenshot']:
if column not in column_names:
await self.aalter_db_add_column(column)
async def aalter_db_add_column(self, new_column: str):
try:
async with aiosqlite.connect(self.db_path) as db:
await db.execute(f'ALTER TABLE crawled_data ADD COLUMN {new_column} TEXT DEFAULT ""')
await db.commit()
print(f"Added column '{new_column}' to the database.")
except Exception as e:
print(f"Error altering database to add screenshot column: {e}")
print(f"Error altering database to add {new_column} column: {e}")
async def aget_cached_url(self, url: str) -> Optional[Tuple[str, str, str, str, str, str, str, bool, str]]:
try:

View File

@@ -23,17 +23,17 @@ class AsyncWebCrawler:
self,
crawler_strategy: Optional[AsyncCrawlerStrategy] = None,
always_by_pass_cache: bool = False,
verbose: bool = False,
**kwargs,
):
self.crawler_strategy = crawler_strategy or AsyncPlaywrightCrawlerStrategy(
verbose=verbose
**kwargs
)
self.always_by_pass_cache = always_by_pass_cache
self.crawl4ai_folder = os.path.join(Path.home(), ".crawl4ai")
os.makedirs(self.crawl4ai_folder, exist_ok=True)
os.makedirs(f"{self.crawl4ai_folder}/cache", exist_ok=True)
self.ready = False
self.verbose = verbose
self.verbose = kwargs.get("verbose", False)
async def __aenter__(self):
await self.crawler_strategy.__aenter__()
@@ -202,11 +202,11 @@ class AsyncWebCrawler:
)
if result is None:
raise ValueError(f"Failed to extract content from the website: {url}")
raise ValueError(f"Process HTML, Failed to extract content from the website: {url}")
except InvalidCSSSelectorError as e:
raise ValueError(str(e))
except Exception as e:
raise ValueError(f"Failed to extract content from the website: {url}, error: {str(e)}")
raise ValueError(f"Process HTML, Failed to extract content from the website: {url}, error: {str(e)}")
cleaned_html = sanitize_input_encode(result.get("cleaned_html", ""))
markdown = sanitize_input_encode(result.get("markdown", ""))

View File

@@ -16,8 +16,6 @@ from .utils import (
CustomHTML2Text
)
class ContentScrappingStrategy(ABC):
@abstractmethod
def scrap(self, url: str, html: str, **kwargs) -> Dict[str, Any]:
@@ -129,7 +127,7 @@ class WebScrappingStrategy(ContentScrappingStrategy):
image_size = 0 #int(fetch_image_file_size(img,base_url) or 0)
image_format = os.path.splitext(img.get('src',''))[1].lower()
# Remove . from format
image_format = image_format.strip('.')
image_format = image_format.strip('.').split('?')[0]
score = 0
if height_value:
if height_unit == 'px' and height_value > 150:
@@ -158,6 +156,7 @@ class WebScrappingStrategy(ContentScrappingStrategy):
return None
return {
'src': img.get('src', ''),
'data-src': img.get('data-src', ''),
'alt': img.get('alt', ''),
'desc': find_closest_parent_with_useful_text(img),
'score': score,
@@ -170,10 +169,12 @@ class WebScrappingStrategy(ContentScrappingStrategy):
if isinstance(element, Comment):
element.extract()
return False
# if element.name == 'img':
# process_image(element, url, 0, 1)
# return True
if element.name in ['script', 'style', 'link', 'meta', 'noscript']:
if element.name == 'img':
process_image(element, url, 0, 1)
element.decompose()
return False
@@ -273,11 +274,14 @@ class WebScrappingStrategy(ContentScrappingStrategy):
# Replace base64 data with empty string
img['src'] = base64_pattern.sub('', src)
cleaned_html = str(body).replace('\n\n', '\n').replace(' ', ' ')
cleaned_html = sanitize_html(cleaned_html)
h = CustomHTML2Text()
h.ignore_links = True
markdown = h.handle(cleaned_html)
h.body_width = 0
try:
markdown = h.handle(cleaned_html)
except Exception as e:
markdown = h.handle(sanitize_html(cleaned_html))
markdown = markdown.replace(' ```', '```')
try:
@@ -286,6 +290,7 @@ class WebScrappingStrategy(ContentScrappingStrategy):
print('Error extracting metadata:', str(e))
meta = {}
cleaned_html = sanitize_html(cleaned_html)
return {
'markdown': markdown,
'cleaned_html': cleaned_html,

View File

@@ -80,6 +80,7 @@ class LLMExtractionStrategy(ExtractionStrategy):
self.word_token_rate = kwargs.get("word_token_rate", WORD_TOKEN_RATE)
self.apply_chunking = kwargs.get("apply_chunking", True)
self.base_url = kwargs.get("base_url", None)
self.extra_args = kwargs.get("extra_args", {})
if not self.apply_chunking:
self.chunk_token_threshold = 1e9
@@ -111,7 +112,13 @@ class LLMExtractionStrategy(ExtractionStrategy):
"{" + variable + "}", variable_values[variable]
)
response = perform_completion_with_backoff(self.provider, prompt_with_variables, self.api_token, base_url=self.base_url) # , json_response=self.extract_type == "schema")
response = perform_completion_with_backoff(
self.provider,
prompt_with_variables,
self.api_token,
base_url=self.base_url,
extra_args = self.extra_args
) # , json_response=self.extract_type == "schema")
try:
blocks = extract_xml_data(["blocks"], response.choices[0].message.content)['blocks']
blocks = json.loads(blocks)

View File

@@ -1,4 +1,4 @@
PROMPT_EXTRACT_BLOCKS = """YHere is the URL of the webpage:
PROMPT_EXTRACT_BLOCKS = """Here is the URL of the webpage:
<url>{URL}</url>
And here is the cleaned HTML content of that webpage:
@@ -79,7 +79,7 @@ To generate the JSON objects:
2. For each block:
a. Assign it an index based on its order in the content.
b. Analyze the content and generate ONE semantic tag that describe what the block is about.
c. Extract the text content, EXACTLY SAME AS GIVE DATA, clean it up if needed, and store it as a list of strings in the "content" field.
c. Extract the text content, EXACTLY SAME AS THE GIVE DATA, clean it up if needed, and store it as a list of strings in the "content" field.
3. Ensure that the order of the JSON objects matches the order of the blocks as they appear in the original HTML content.

View File

@@ -0,0 +1,2 @@
from .async_web_scraper import AsyncWebScraper
from .bfs_scraper_strategy import BFSScraperStrategy

View File

@@ -0,0 +1,33 @@
from .scraper_strategy import ScraperStrategy
from .models import ScraperResult, CrawlResult
from ..async_webcrawler import AsyncWebCrawler
from typing import Union, AsyncGenerator
class AsyncWebScraper:
def __init__(self, crawler: AsyncWebCrawler, strategy: ScraperStrategy):
self.crawler = crawler
self.strategy = strategy
async def ascrape(self, url: str, parallel_processing: bool = True, stream: bool = False) -> Union[AsyncGenerator[CrawlResult, None], ScraperResult]:
if stream:
return self._ascrape_yielding(url, parallel_processing)
else:
return await self._ascrape_collecting(url, parallel_processing)
async def _ascrape_yielding(self, url: str, parallel_processing: bool) -> AsyncGenerator[CrawlResult, None]:
result_generator = self.strategy.ascrape(url, self.crawler, parallel_processing)
async for res in result_generator: # Consume the async generator
yield res # Yielding individual results
async def _ascrape_collecting(self, url: str, parallel_processing: bool) -> ScraperResult:
extracted_data = {}
result_generator = self.strategy.ascrape(url, self.crawler, parallel_processing)
async for res in result_generator: # Consume the async generator
extracted_data[res.url] = res
# Return a final ScraperResult
return ScraperResult(
url=url,
crawled_urls=list(extracted_data.keys()),
extracted_data=extracted_data
)

View File

@@ -0,0 +1,139 @@
from .scraper_strategy import ScraperStrategy
from .filters import FilterChain
from .scorers import URLScorer
from ..models import CrawlResult
from ..async_webcrawler import AsyncWebCrawler
import asyncio
import validators
from urllib.parse import urljoin,urlparse,urlunparse
from urllib.robotparser import RobotFileParser
import time
from aiolimiter import AsyncLimiter
from tenacity import retry, stop_after_attempt, wait_exponential
from collections import defaultdict
import logging
from typing import Dict, AsyncGenerator
logging.basicConfig(level=logging.DEBUG)
rate_limiter = AsyncLimiter(1, 1) # 1 request per second
class BFSScraperStrategy(ScraperStrategy):
def __init__(self, max_depth: int, filter_chain: FilterChain, url_scorer: URLScorer, max_concurrent: int = 5, min_crawl_delay: int=1):
self.max_depth = max_depth
self.filter_chain = filter_chain
self.url_scorer = url_scorer
self.max_concurrent = max_concurrent
# For Crawl Politeness
self.last_crawl_time = defaultdict(float)
self.min_crawl_delay = min_crawl_delay # 1 second delay between requests to the same domain
# For Robots.txt Compliance
self.robot_parsers = {}
# Robots.txt Parser
def get_robot_parser(self, url: str) -> RobotFileParser:
domain = urlparse(url)
scheme = domain.scheme if domain.scheme else 'http' # Default to 'http' if no scheme provided
netloc = domain.netloc
if netloc not in self.robot_parsers:
rp = RobotFileParser()
rp.set_url(f"{scheme}://{netloc}/robots.txt")
try:
rp.read()
except Exception as e:
# Log the type of error, message, and the URL
logging.warning(f"Error {type(e).__name__} occurred while fetching robots.txt for {netloc}: {e}")
return None
self.robot_parsers[netloc] = rp
return self.robot_parsers[netloc]
# Retry with exponential backoff
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
async def retry_crawl(self, crawler: AsyncWebCrawler, url: str) -> CrawlResult:
return await crawler.arun(url)
async def process_url(self, url: str, depth: int, crawler: AsyncWebCrawler, queue: asyncio.PriorityQueue, visited: set, depths: Dict[str, int]) -> AsyncGenerator[CrawlResult, None]:
def normalize_url(url: str) -> str:
parsed = urlparse(url)
return urlunparse(parsed._replace(fragment=""))
# URL Validation
if not validators.url(url):
logging.warning(f"Invalid URL: {url}")
return None
# Robots.txt Compliance
robot_parser = self.get_robot_parser(url)
if robot_parser is None:
logging.info(f"Could not retrieve robots.txt for {url}, hence proceeding with crawl.")
else:
# If robots.txt was fetched, check if crawling is allowed
if not robot_parser.can_fetch(crawler.crawler_strategy.user_agent, url):
logging.info(f"Skipping {url} as per robots.txt")
return None
# Crawl Politeness
domain = urlparse(url).netloc
time_since_last_crawl = time.time() - self.last_crawl_time[domain]
if time_since_last_crawl < self.min_crawl_delay:
await asyncio.sleep(self.min_crawl_delay - time_since_last_crawl)
self.last_crawl_time[domain] = time.time()
# Rate Limiting
async with rate_limiter:
# Error Handling
try:
crawl_result = await self.retry_crawl(crawler, url)
except Exception as e:
logging.error(f"Error crawling {url}: {str(e)}")
crawl_result = CrawlResult(url=url, html="", success=False, status_code=0, error_message=str(e))
if not crawl_result.success:
# Logging and Monitoring
logging.error(f"Failed to crawl URL: {url}. Error: {crawl_result.error_message}")
return crawl_result
# Process links
for link_type in ["internal", "external"]:
for link in crawl_result.links[link_type]:
absolute_link = urljoin(url, link['href'])
normalized_link = normalize_url(absolute_link)
if self.filter_chain.apply(normalized_link) and normalized_link not in visited:
new_depth = depths[url] + 1
if new_depth <= self.max_depth:
# URL Scoring
score = self.url_scorer.score(normalized_link)
await queue.put((score, new_depth, normalized_link))
depths[normalized_link] = new_depth
return crawl_result
async def ascrape(self, start_url: str, crawler: AsyncWebCrawler, parallel_processing:bool = True) -> AsyncGenerator[CrawlResult,None]:
queue = asyncio.PriorityQueue()
queue.put_nowait((0, 0, start_url))
visited = set()
depths = {start_url: 0}
pending_tasks = set()
while not queue.empty() or pending_tasks:
while not queue.empty() and len(pending_tasks) < self.max_concurrent:
_, depth, url = await queue.get()
if url not in visited:
# Adding URL to the visited set here itself, (instead of after result generation)
# so that other tasks are not queued for same URL, found at different depth before
# crawling and extraction of this task is completed.
visited.add(url)
if parallel_processing:
task = asyncio.create_task(self.process_url(url, depth, crawler, queue, visited, depths))
pending_tasks.add(task)
else:
result = await self.process_url(url, depth, crawler, queue, visited, depths)
if result:
yield result
# Wait for the first task to complete and yield results incrementally as each task is completed
if pending_tasks:
done, pending_tasks = await asyncio.wait(pending_tasks, return_when=asyncio.FIRST_COMPLETED)
for task in done:
result = await task
if result:
yield result

View File

@@ -0,0 +1,3 @@
from .url_filter import URLFilter, FilterChain
from .content_type_filter import ContentTypeFilter
from .url_pattern_filter import URLPatternFilter

View File

@@ -0,0 +1,8 @@
from .url_filter import URLFilter
class ContentTypeFilter(URLFilter):
def __init__(self, contentType: str):
self.contentType = contentType
def apply(self, url: str) -> bool:
#TODO: This is a stub. Will implement this later
return True

View File

@@ -0,0 +1,16 @@
from abc import ABC, abstractmethod
class URLFilter(ABC):
@abstractmethod
def apply(self, url: str) -> bool:
pass
class FilterChain:
def __init__(self):
self.filters = []
def add_filter(self, filter: URLFilter):
self.filters.append(filter)
def apply(self, url: str) -> bool:
return all(filter.apply(url) for filter in self.filters)

View File

@@ -0,0 +1,9 @@
from .url_filter import URLFilter
from re import Pattern
class URLPatternFilter(URLFilter):
def __init__(self, pattern: Pattern):
self.pattern = pattern
def apply(self, url: str) -> bool:
#TODO: This is a stub. Will implement this later.
return True

View File

@@ -0,0 +1,8 @@
from pydantic import BaseModel
from typing import List, Dict
from ..models import CrawlResult
class ScraperResult(BaseModel):
url: str
crawled_urls: List[str]
extracted_data: Dict[str,CrawlResult]

View File

@@ -0,0 +1,2 @@
from .url_scorer import URLScorer
from .keyword_relevance_scorer import KeywordRelevanceScorer

View File

@@ -0,0 +1,9 @@
from .url_scorer import URLScorer
from typing import List
class KeywordRelevanceScorer(URLScorer):
def __init__(self,keywords: List[str]):
self.keyworkds = keywords
def score(self, url: str) -> float:
#TODO: This is a stub. Will implement this later.
return 1

View File

@@ -0,0 +1,6 @@
from abc import ABC, abstractmethod
class URLScorer(ABC):
@abstractmethod
def score(self, url: str) -> float:
pass

View File

@@ -0,0 +1,26 @@
from abc import ABC, abstractmethod
from .models import ScraperResult, CrawlResult
from ..models import CrawlResult
from ..async_webcrawler import AsyncWebCrawler
from typing import Union, AsyncGenerator
class ScraperStrategy(ABC):
@abstractmethod
async def ascrape(self, url: str, crawler: AsyncWebCrawler, parallel_processing: bool = True, stream: bool = False) -> Union[AsyncGenerator[CrawlResult, None], ScraperResult]:
"""Scrape the given URL using the specified crawler.
Args:
url (str): The starting URL for the scrape.
crawler (AsyncWebCrawler): The web crawler instance.
parallel_processing (bool): Whether to use parallel processing. Defaults to True.
stream (bool): If True, yields individual crawl results as they are ready;
if False, accumulates results and returns a final ScraperResult.
Yields:
CrawlResult: Individual crawl results if stream is True.
Returns:
ScraperResult: A summary of the scrape results containing the final extracted data
and the list of crawled URLs if stream is False.
"""
pass

View File

@@ -131,7 +131,7 @@ def split_and_parse_json_objects(json_string):
return parsed_objects, unparsed_segments
def sanitize_html(html):
# Replace all weird and special characters with an empty string
# Replace all unwanted and special characters with an empty string
sanitized_html = html
# sanitized_html = re.sub(r'[^\w\s.,;:!?=\[\]{}()<>\/\\\-"]', '', html)
@@ -301,7 +301,7 @@ def get_content_of_website(url, html, word_count_threshold = MIN_WORD_THRESHOLD,
if tag.name != 'img':
tag.attrs = {}
# Extract all img tgas inti [{src: '', alt: ''}]
# Extract all img tgas int0 [{src: '', alt: ''}]
media = {
'images': [],
'videos': [],
@@ -339,7 +339,7 @@ def get_content_of_website(url, html, word_count_threshold = MIN_WORD_THRESHOLD,
img.decompose()
# Create a function that replace content of all"pre" tage with its inner text
# Create a function that replace content of all"pre" tag with its inner text
def replace_pre_tags_with_text(node):
for child in node.find_all('pre'):
# set child inner html to its text
@@ -502,7 +502,7 @@ def get_content_of_website_optimized(url: str, html: str, word_count_threshold:
current_tag = tag
while current_tag:
current_tag = current_tag.parent
# Get the text content of the parent tag
# Get the text content from the parent tag
if current_tag:
text_content = current_tag.get_text(separator=' ',strip=True)
# Check if the text content has at least word_count_threshold
@@ -511,88 +511,88 @@ def get_content_of_website_optimized(url: str, html: str, word_count_threshold:
return None
def process_image(img, url, index, total_images):
#Check if an image has valid display and inside undesired html elements
def is_valid_image(img, parent, parent_classes):
style = img.get('style', '')
src = img.get('src', '')
classes_to_check = ['button', 'icon', 'logo']
tags_to_check = ['button', 'input']
return all([
'display:none' not in style,
src,
not any(s in var for var in [src, img.get('alt', ''), *parent_classes] for s in classes_to_check),
parent.name not in tags_to_check
])
#Check if an image has valid display and inside undesired html elements
def is_valid_image(img, parent, parent_classes):
style = img.get('style', '')
src = img.get('src', '')
classes_to_check = ['button', 'icon', 'logo']
tags_to_check = ['button', 'input']
return all([
'display:none' not in style,
src,
not any(s in var for var in [src, img.get('alt', ''), *parent_classes] for s in classes_to_check),
parent.name not in tags_to_check
])
#Score an image for it's usefulness
def score_image_for_usefulness(img, base_url, index, images_count):
# Function to parse image height/width value and units
def parse_dimension(dimension):
if dimension:
match = re.match(r"(\d+)(\D*)", dimension)
if match:
number = int(match.group(1))
unit = match.group(2) or 'px' # Default unit is 'px' if not specified
return number, unit
return None, None
#Score an image for it's usefulness
def score_image_for_usefulness(img, base_url, index, images_count):
# Function to parse image height/width value and units
def parse_dimension(dimension):
if dimension:
match = re.match(r"(\d+)(\D*)", dimension)
if match:
number = int(match.group(1))
unit = match.group(2) or 'px' # Default unit is 'px' if not specified
return number, unit
return None, None
# Fetch image file metadata to extract size and extension
def fetch_image_file_size(img, base_url):
#If src is relative path construct full URL, if not it may be CDN URL
img_url = urljoin(base_url,img.get('src'))
try:
response = requests.head(img_url)
if response.status_code == 200:
return response.headers.get('Content-Length',None)
else:
print(f"Failed to retrieve file size for {img_url}")
return None
except InvalidSchema as e:
# Fetch image file metadata to extract size and extension
def fetch_image_file_size(img, base_url):
#If src is relative path construct full URL, if not it may be CDN URL
img_url = urljoin(base_url,img.get('src'))
try:
response = requests.head(img_url)
if response.status_code == 200:
return response.headers.get('Content-Length',None)
else:
print(f"Failed to retrieve file size for {img_url}")
return None
finally:
return
except InvalidSchema as e:
return None
finally:
return
image_height = img.get('height')
height_value, height_unit = parse_dimension(image_height)
image_width = img.get('width')
width_value, width_unit = parse_dimension(image_width)
image_size = 0 #int(fetch_image_file_size(img,base_url) or 0)
image_format = os.path.splitext(img.get('src',''))[1].lower()
# Remove . from format
image_format = image_format.strip('.')
score = 0
if height_value:
if height_unit == 'px' and height_value > 150:
score += 1
if height_unit in ['%','vh','vmin','vmax'] and height_value >30:
score += 1
if width_value:
if width_unit == 'px' and width_value > 150:
score += 1
if width_unit in ['%','vh','vmin','vmax'] and width_value >30:
score += 1
if image_size > 10000:
image_height = img.get('height')
height_value, height_unit = parse_dimension(image_height)
image_width = img.get('width')
width_value, width_unit = parse_dimension(image_width)
image_size = 0 #int(fetch_image_file_size(img,base_url) or 0)
image_format = os.path.splitext(img.get('src',''))[1].lower()
# Remove . from format
image_format = image_format.strip('.')
score = 0
if height_value:
if height_unit == 'px' and height_value > 150:
score += 1
if img.get('alt') != '':
score+=1
if any(image_format==format for format in ['jpg','png','webp']):
score+=1
if index/images_count<0.5:
score+=1
return score
if height_unit in ['%','vh','vmin','vmax'] and height_value >30:
score += 1
if width_value:
if width_unit == 'px' and width_value > 150:
score += 1
if width_unit in ['%','vh','vmin','vmax'] and width_value >30:
score += 1
if image_size > 10000:
score += 1
if img.get('alt') != '':
score+=1
if any(image_format==format for format in ['jpg','png','webp']):
score+=1
if index/images_count<0.5:
score+=1
return score
if not is_valid_image(img, img.parent, img.parent.get('class', [])):
return None
score = score_image_for_usefulness(img, url, index, total_images)
if score <= IMAGE_SCORE_THRESHOLD:
return None
return {
'src': img.get('src', ''),
'alt': img.get('alt', ''),
'desc': find_closest_parent_with_useful_text(img),
'score': score,
'type': 'image'
}
if not is_valid_image(img, img.parent, img.parent.get('class', [])):
return None
score = score_image_for_usefulness(img, url, index, total_images)
if score <= IMAGE_SCORE_THRESHOLD:
return None
return {
'src': img.get('src', '').replace('\\"', '"').strip(),
'alt': img.get('alt', ''),
'desc': find_closest_parent_with_useful_text(img),
'score': score,
'type': 'image'
}
def process_element(element: element.PageElement) -> bool:
try:
@@ -775,7 +775,14 @@ def extract_xml_data(tags, string):
return data
# Function to perform the completion with exponential backoff
def perform_completion_with_backoff(provider, prompt_with_variables, api_token, json_response = False, base_url=None):
def perform_completion_with_backoff(
provider,
prompt_with_variables,
api_token,
json_response = False,
base_url=None,
**kwargs
):
from litellm import completion
from litellm.exceptions import RateLimitError
max_attempts = 3
@@ -784,6 +791,9 @@ def perform_completion_with_backoff(provider, prompt_with_variables, api_token,
extra_args = {}
if json_response:
extra_args["response_format"] = { "type": "json_object" }
if kwargs.get("extra_args"):
extra_args.update(kwargs["extra_args"])
for attempt in range(max_attempts):
try:

View File

@@ -12,6 +12,7 @@ from typing import List
from concurrent.futures import ThreadPoolExecutor
from .config import *
import warnings
import json
warnings.filterwarnings("ignore", message='Field "model_name" has conflict with protected namespace "model_".')

View File

@@ -10,6 +10,7 @@ import time
import json
import os
import re
from typing import Dict
from bs4 import BeautifulSoup
from pydantic import BaseModel, Field
from crawl4ai import AsyncWebCrawler
@@ -18,6 +19,8 @@ from crawl4ai.extraction_strategy import (
LLMExtractionStrategy,
)
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
print("Crawl4AI: Advanced Web Crawling and Data Extraction")
print("GitHub Repository: https://github.com/unclecode/crawl4ai")
print("Twitter: @unclecode")
@@ -30,7 +33,7 @@ async def simple_crawl():
result = await crawler.arun(url="https://www.nbcnews.com/business")
print(result.markdown[:500]) # Print first 500 characters
async def js_and_css():
async def simple_example_with_running_js_code():
print("\n--- Executing JavaScript and Using CSS Selectors ---")
# New code to handle the wait_for parameter
wait_for = """() => {
@@ -47,12 +50,21 @@ async def js_and_css():
result = await crawler.arun(
url="https://www.nbcnews.com/business",
js_code=js_code,
# css_selector="article.tease-card",
# wait_for=wait_for,
bypass_cache=True,
)
print(result.markdown[:500]) # Print first 500 characters
async def simple_example_with_css_selector():
print("\n--- Using CSS Selectors ---")
async with AsyncWebCrawler(verbose=True) as crawler:
result = await crawler.arun(
url="https://www.nbcnews.com/business",
css_selector=".wide-tease-item__description",
bypass_cache=True,
)
print(result.markdown[:500]) # Print first 500 characters
async def use_proxy():
print("\n--- Using a Proxy ---")
print(
@@ -66,6 +78,28 @@ async def use_proxy():
# )
# print(result.markdown[:500]) # Print first 500 characters
async def capture_and_save_screenshot(url: str, output_path: str):
async with AsyncWebCrawler(verbose=True) as crawler:
result = await crawler.arun(
url=url,
screenshot=True,
bypass_cache=True
)
if result.success and result.screenshot:
import base64
# Decode the base64 screenshot data
screenshot_data = base64.b64decode(result.screenshot)
# Save the screenshot as a JPEG file
with open(output_path, 'wb') as f:
f.write(screenshot_data)
print(f"Screenshot saved successfully to {output_path}")
else:
print("Failed to capture screenshot")
class OpenAIModelFee(BaseModel):
model_name: str = Field(..., description="Name of the OpenAI model.")
input_fee: str = Field(..., description="Fee for input token for the OpenAI model.")
@@ -73,27 +107,30 @@ class OpenAIModelFee(BaseModel):
..., description="Fee for output token for the OpenAI model."
)
async def extract_structured_data_using_llm():
print("\n--- Extracting Structured Data with OpenAI ---")
print(
"Note: Set your OpenAI API key as an environment variable to run this example."
)
if not os.getenv("OPENAI_API_KEY"):
print("OpenAI API key not found. Skipping this example.")
async def extract_structured_data_using_llm(provider: str, api_token: str = None, extra_headers: Dict[str, str] = None):
print(f"\n--- Extracting Structured Data with {provider} ---")
if api_token is None and provider != "ollama":
print(f"API token is required for {provider}. Skipping this example.")
return
extra_args = {}
if extra_headers:
extra_args["extra_headers"] = extra_headers
async with AsyncWebCrawler(verbose=True) as crawler:
result = await crawler.arun(
url="https://openai.com/api/pricing/",
word_count_threshold=1,
extraction_strategy=LLMExtractionStrategy(
provider="openai/gpt-4o",
api_token=os.getenv("OPENAI_API_KEY"),
provider=provider,
api_token=api_token,
schema=OpenAIModelFee.schema(),
extraction_type="schema",
instruction="""From the crawled content, extract all mentioned model names along with their fees for input and output tokens.
Do not miss any models in the entire content. One extracted model JSON format should look like this:
{"model_name": "GPT-4", "input_fee": "US$10.00 / 1M tokens", "output_fee": "US$30.00 / 1M tokens"}.""",
extra_args=extra_args
),
bypass_cache=True,
)
@@ -320,6 +357,28 @@ async def crawl_dynamic_content_pages_method_3():
await crawler.crawler_strategy.kill_session(session_id)
print(f"Successfully crawled {len(all_commits)} commits across 3 pages")
async def crawl_custom_browser_type():
# Use Firefox
start = time.time()
async with AsyncWebCrawler(browser_type="firefox", verbose=True, headless = True) as crawler:
result = await crawler.arun(url="https://www.example.com", bypass_cache=True)
print(result.markdown[:500])
print("Time taken: ", time.time() - start)
# Use WebKit
start = time.time()
async with AsyncWebCrawler(browser_type="webkit", verbose=True, headless = True) as crawler:
result = await crawler.arun(url="https://www.example.com", bypass_cache=True)
print(result.markdown[:500])
print("Time taken: ", time.time() - start)
# Use Chromium (default)
start = time.time()
async with AsyncWebCrawler(verbose=True, headless = True) as crawler:
result = await crawler.arun(url="https://www.example.com", bypass_cache=True)
print(result.markdown[:500])
print("Time taken: ", time.time() - start)
async def speed_comparison():
# print("\n--- Speed Comparison ---")
# print("Firecrawl (simulated):")
@@ -387,13 +446,31 @@ async def speed_comparison():
async def main():
await simple_crawl()
await js_and_css()
await simple_example_with_running_js_code()
await simple_example_with_css_selector()
await use_proxy()
await capture_and_save_screenshot("https://www.example.com", os.path.join(__location__, "tmp/example_screenshot.jpg"))
await extract_structured_data_using_css_extractor()
# LLM extraction examples
await extract_structured_data_using_llm()
await extract_structured_data_using_llm("huggingface/meta-llama/Meta-Llama-3.1-8B-Instruct", os.getenv("HUGGINGFACE_API_KEY"))
await extract_structured_data_using_llm("openai/gpt-4", os.getenv("OPENAI_API_KEY"))
await extract_structured_data_using_llm("ollama/llama3.2")
# You always can pass custom headers to the extraction strategy
custom_headers = {
"Authorization": "Bearer your-custom-token",
"X-Custom-Header": "Some-Value"
}
await extract_structured_data_using_llm(extra_headers=custom_headers)
# await crawl_dynamic_content_pages_method_1()
# await crawl_dynamic_content_pages_method_2()
await crawl_dynamic_content_pages_method_3()
await crawl_custom_browser_type()
await speed_comparison()