Compare commits

..

21 Commits

Author SHA1 Message Date
unclecode
77da48050d chore: Add custom headers to LocalSeleniumCrawlerStrategy 2024-06-17 15:50:03 +08:00
unclecode
9a97aacd85 chore: Add hooks for customizing the LocalSeleniumCrawlerStrategy 2024-06-17 15:37:18 +08:00
unclecode
52daf3936a Fix typo in README 2024-06-17 15:15:37 +08:00
unclecode
42a5da854d Update version and change log. 2024-06-17 14:47:58 +08:00
unclecode
d1d83a6ef7 Fix issue #22: Use MD5 hash for caching HTML files to handle long URLs 2024-06-17 14:44:01 +08:00
unclecode
194050705d chore: Add pillow library to requirements.txt 2024-06-10 23:03:32 +08:00
unclecode
989f8c91c8 Update README 2024-06-08 18:50:35 +08:00
unclecode
edba5fb5e9 Update README 2024-06-08 18:48:21 +08:00
unclecode
faa1defa5c Update README 2024-06-08 18:47:23 +08:00
unclecode
f7e0cee1b0 vital: Right now, only raw HTML is retrieved from the database; therefore, the CSS selector and other filters will be executed every time. 2024-06-08 18:37:40 +08:00
unclecode
b3a0edaa6d - User agent
- Extract Links
- Extract Metadata
- Update Readme
- Update REST API document
2024-06-08 17:59:42 +08:00
unclecode
9c34b30723 Extract internal and external links. 2024-06-08 16:53:06 +08:00
unclecode
36a5847df5 Add css selector example 2024-06-07 20:47:20 +08:00
unclecode
a19379aa58 Add recipe images, update README, and REST api example 2024-06-07 20:43:50 +08:00
unclecode
768d048e1c Update REST call how-to 2024-06-07 18:10:45 +08:00
unclecode
94c11a0262 Add image 2024-06-07 18:09:21 +08:00
unclecode
649b0bfd02 feat: Remove default checked state for bypass-cache-checkbox
The code changes in this commit remove the default checked state for the bypass-cache-checkbox in the try_it.html file. This allows users to manually select whether they want to bypass the cache or not.

This commit message follows the established convention of starting with a type (feat for feature) and providing a concise and descriptive summary of the changes made.
2024-06-07 16:26:36 +08:00
unclecode
57a00ec677 Update Readme 2024-06-07 16:25:30 +08:00
unclecode
aeb2114170 Add example of REST API call 2024-06-07 16:24:40 +08:00
unclecode
b8d405fddd Update version number in landing page header 2024-06-07 16:19:30 +08:00
unclecode
b32013cb97 Fix README file hyperlink 2024-06-07 15:37:05 +08:00
25 changed files with 815 additions and 168 deletions

View File

@@ -1 +1,5 @@
# Changelog
## [0.2.4] - 2024-06-17
### Fixed
- Fix issue #22: Use MD5 hash for caching HTML files to handle long URLs

View File

@@ -8,13 +8,20 @@
Crawl4AI has one clear task: to simplify crawling and extract useful information from web pages, making it accessible for large language models (LLMs) and AI applications. 🆓🌐
[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/drive/1wz8u30rvbq6Scodye9AGCw8Qg_Z8QGsk)
- Use as REST API: Check [![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/drive/1zODYjhemJ5bUmYceWpVoBMVpd0ofzNBZ?usp=sharing)
- Use as Python library: [![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/drive/1wz8u30rvbq6Scodye9AGCw8Qg_Z8QGsk)
## Recent Changes
### v0.2.4
- 🐞 Resolve the issue with long URLs. (Issue #22)
### v0.2.3
- 🎨 Extract and return all media tags (Images, Audio, and Video). Check `result.media`
- 🖼️ Take [screenshots](#taking-screenshots-) of the page.
- 🔗 Extract all external and internal links. Check `result.links`
- 📚 Extract metadata from the page. Check `result.metadata`
- 🕵️ Support `user_agent` parameter to set the user agent for the HTTP requests.
- 🖼️ Take [screenshots](#taking-screenshots) of the page.
### v0.2.2
- Support multiple JS scripts
@@ -32,7 +39,27 @@ Crawl4AI has one clear task: to simplify crawling and extract useful information
## Power and Simplicity of Crawl4AI 🚀
To show its simplicity, take a look at the first example:
The easiest way! If you don't want to install any library, you can use the REST API on my server. But remember, this is just a simple server; I may improve its capacity if I see there is demand. You can find all the REST API examples in this Colab notebook. [![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/drive/1zODYjhemJ5bUmYceWpVoBMVpd0ofzNBZ?usp=sharing)
```python
import requests
data = {
"urls": [
"https://www.nbcnews.com/business"
],
"screenshot": True
}
response = requests.post("https://crawl4ai.com/crawl", json=data) # or localhost if you run locally
response_data = response.json()
print(response_data['results'][0].keys())
# dict_keys(['url', 'html', 'success', 'cleaned_html', 'media',
# 'links', 'screenshot', 'markdown', 'extracted_content',
# 'metadata', 'error_message'])
```
But if you want more control, take a look at the first example of using the Python library.
```python
from crawl4ai import WebCrawler
@@ -42,24 +69,7 @@ crawler = WebCrawler()
# Run the crawler with keyword filtering and CSS selector
result = crawler.run(url="https://www.nbcnews.com/business")
print(result) # {url, html, markdown, extracted_content, metadata}
```
If you don't want to install Selenium, you can use the REST API or a local server.
```python
import requests
data = {
"urls": [
"https://www.nbcnews.com/business"
],
"word_count_threshold": 10,
"extraction_strategy": "NoExtractionStrategy",
}
response = requests.post("https://crawl4ai.com/crawl", json=data) # or localhost if you run locally
print(response.json())
print(result) # {url, html, cleaned_html, markdown, media, links, extracted_content, metadata, screenshots}
```
Now let's try a complex task. Below is an example of how you can execute JavaScript, filter data using keywords, and use a CSS selector to extract specific content—all in one go!
@@ -77,20 +87,17 @@ from crawl4ai.extraction_strategy import *
from crawl4ai.crawler_strategy import *
# Define the JavaScript code to click the "Load More" button
js_code = """
js_code = ["""
const loadMoreButton = Array.from(document.querySelectorAll('button')).find(button => button.textContent.includes('Load More'));
loadMoreButton && loadMoreButton.click();
"""
# Define the crawling strategy
crawler_strategy = LocalSeleniumCrawlerStrategy(js_code=js_code)
# Create the WebCrawler instance with the defined strategy
crawler = WebCrawler(crawler_strategy=crawler_strategy)
"""]
crawler = WebCrawler(verbose=True)
crawler.warmup()
# Run the crawler with keyword filtering and CSS selector
result = crawler.run(
url="https://www.nbcnews.com/business",
js = js_code,
extraction_strategy=CosineStrategy(
semantic_filter="technology",
),
@@ -99,6 +106,7 @@ result = crawler.run(
# Run the crawler with LLM extraction strategy
result = crawler.run(
url="https://www.nbcnews.com/business",
js = js_code,
extraction_strategy=LLMExtractionStrategy(
provider="openai/gpt-4o",
api_token=os.getenv('OPENAI_API_KEY'),
@@ -226,8 +234,12 @@ To use the REST API, send a POST request to `http://localhost:8000/crawl` with t
"url": "https://www.nbcnews.com/business",
"extracted_content": "...",
"html": "...",
"cleaned_html": "...",
"markdown": "...",
"metadata": {...}
"media": {...},
"links": {...},
"metadata": {...},
"screenshots": "...",
}
]
}
@@ -266,6 +278,24 @@ Crawl result without raw HTML content:
result = crawler.run(url="https://www.nbcnews.com/business", include_raw_html=False)
```
### Result Structure
The result object contains the following fields:
```python
class CrawlResult(BaseModel):
url: str
html: str
success: bool
cleaned_html: Optional[str] = None
media: Dict[str, List[Dict]] = {} # Media tags in the page {"images": [], "audio": [], "video": []}
links: Dict[str, List[Dict]] = {} # Links in the page {"external": [], "internal": []}
screenshot: Optional[str] = None # Base64 encoded screenshot
markdown: Optional[str] = None
extracted_content: Optional[str] = None
metadata: Optional[dict] = None
error_message: Optional[str] = None
```
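As a minimal sketch, assuming a successful crawl, these fields can be read straight off the returned model:
```python
from crawl4ai import WebCrawler

crawler = WebCrawler()
crawler.warmup()

result = crawler.run(url="https://www.nbcnews.com/business", screenshot=True)
if result.success:
    print(result.metadata.get("title"))           # page <title>
    print(len(result.links.get("internal", [])))  # count of internal links
    print(len(result.media.get("images", [])))    # count of image tags
else:
    print(result.error_message)
```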
### Taking Screenshots
```python
@@ -385,6 +415,7 @@ result = crawler.run(url="https://www.nbcnews.com/business")
| `extraction_strategy` | The strategy to use for extracting content from the HTML (e.g., "CosineStrategy"). | No | `NoExtractionStrategy` |
| `chunking_strategy` | The strategy to use for chunking the text before processing (e.g., "RegexChunking"). | No | `RegexChunking` |
| `css_selector` | The CSS selector to target specific parts of the HTML for extraction. | No | `None` |
| `user_agent` | The user agent to use for the HTTP requests. | No | `Mozilla/5.0` |
| `verbose` | Whether to enable verbose logging. | No | `true` |
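For instance, a hedged sketch of the new `user_agent` parameter (the UA string is just an example value):
```python
result = crawler.run(
    url="https://www.nbcnews.com/business",
    user_agent="Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
    bypass_cache=True,  # skip the cache so the custom user agent is actually used
)
```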
## Chunking Strategies 📚

View File

@@ -10,7 +10,7 @@ import logging
import base64
from PIL import Image, ImageDraw, ImageFont
from io import BytesIO
from typing import List
from typing import List, Callable
import requests
import os
from pathlib import Path
@@ -44,6 +44,14 @@ class CrawlerStrategy(ABC):
@abstractmethod
def take_screenshot(self, save_path: str):
pass
@abstractmethod
def update_user_agent(self, user_agent: str):
pass
@abstractmethod
def set_hook(self, hook_type: str, hook: Callable):
pass
class CloudCrawlerStrategy(CrawlerStrategy):
def __init__(self, use_cached_html = False):
@@ -69,6 +77,8 @@ class LocalSeleniumCrawlerStrategy(CrawlerStrategy):
print("[LOG] 🚀 Initializing LocalSeleniumCrawlerStrategy")
self.options = Options()
self.options.headless = True
if kwargs.get("user_agent"):
self.options.add_argument("--user-agent=" + kwargs.get("user_agent"))
self.options.add_argument("--no-sandbox")
self.options.add_argument("--headless")
# self.options.add_argument("--disable-dev-shm-usage")
@@ -90,6 +100,14 @@ class LocalSeleniumCrawlerStrategy(CrawlerStrategy):
self.use_cached_html = use_cached_html
self.js_code = js_code
self.verbose = kwargs.get("verbose", False)
# Hooks
self.hooks = {
'on_driver_created': None,
'before_get_url': None,
'after_get_url': None,
'before_return_html': None
}
# chromedriver_autoinstaller.install()
import chromedriver_autoinstaller
@@ -97,20 +115,57 @@ class LocalSeleniumCrawlerStrategy(CrawlerStrategy):
self.service.log_path = "NUL"
self.driver = webdriver.Chrome(service=self.service, options=self.options)
def set_hook(self, hook_type: str, hook: Callable):
if hook_type in self.hooks:
self.hooks[hook_type] = hook
else:
raise ValueError(f"Invalid hook type: {hook_type}")
def execute_hook(self, hook_type: str, *args):
hook = self.hooks.get(hook_type)
if hook:
result = hook(*args)
if result is not None:
if isinstance(result, webdriver.Chrome):
return result
else:
raise TypeError(f"Hook {hook_type} must return an instance of webdriver.Chrome or None.")
# If the hook returns None or there is no hook, return self.driver
return self.driver
def update_user_agent(self, user_agent: str):
self.options.add_argument(f"user-agent={user_agent}")
self.driver.quit()
self.driver = webdriver.Chrome(service=self.service, options=self.options)
self.driver = self.execute_hook('on_driver_created', self.driver)
def set_custom_headers(self, headers: dict):
# Enable Network domain for sending headers
self.driver.execute_cdp_cmd('Network.enable', {})
# Set extra HTTP headers
self.driver.execute_cdp_cmd('Network.setExtraHTTPHeaders', {'headers': headers})
def crawl(self, url: str) -> str:
# Create md5 hash of the URL
import hashlib
url_hash = hashlib.md5(url.encode()).hexdigest()
if self.use_cached_html:
cache_file_path = os.path.join(Path.home(), ".crawl4ai", "cache", url.replace("/", "_"))
cache_file_path = os.path.join(Path.home(), ".crawl4ai", "cache", url_hash)
if os.path.exists(cache_file_path):
with open(cache_file_path, "r") as f:
return f.read()
try:
self.driver = self.execute_hook('before_get_url', self.driver)
if self.verbose:
print(f"[LOG] 🕸️ Crawling {url} using LocalSeleniumCrawlerStrategy...")
self.driver.get(url)
WebDriverWait(self.driver, 10).until(
EC.presence_of_all_elements_located((By.TAG_NAME, "html"))
)
self.driver = self.execute_hook('after_get_url', self.driver)
# Execute JS code if provided
if self.js_code and type(self.js_code) == str:
@@ -127,9 +182,10 @@ class LocalSeleniumCrawlerStrategy(CrawlerStrategy):
)
html = self.driver.page_source
self.driver = self.execute_hook('before_return_html', self.driver, html)
# Store in cache
cache_file_path = os.path.join(Path.home(), ".crawl4ai", "cache", url.replace("/", "_"))
cache_file_path = os.path.join(Path.home(), ".crawl4ai", "cache", url_hash)
with open(cache_file_path, "w") as f:
f.write(html)
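Taken together, a minimal sketch of how the new hooks and `set_custom_headers` helper might be wired up (constructor arguments and header values here are illustrative only):

```python
from crawl4ai.crawler_strategy import LocalSeleniumCrawlerStrategy

strategy = LocalSeleniumCrawlerStrategy(verbose=True)

# Hooks receive the driver and must return a webdriver.Chrome instance (or None).
def before_get_url(driver):
    print("[HOOK] about to navigate")
    return driver

strategy.set_hook("before_get_url", before_get_url)

# Extra headers are sent through the Chrome DevTools Protocol.
strategy.set_custom_headers({"X-Example-Header": "demo"})

html = strategy.crawl("https://example.com")
```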

View File

@@ -20,6 +20,8 @@ def init_db():
extracted_content TEXT,
success BOOLEAN,
media TEXT DEFAULT "{}",
links TEXT DEFAULT "{}",
metadata TEXT DEFAULT "{}",
screenshot TEXT DEFAULT ""
)
''')
@@ -41,12 +43,12 @@ def check_db_path():
if not DB_PATH:
raise ValueError("Database path is not set or is empty.")
def get_cached_url(url: str) -> Optional[Tuple[str, str, str, str, str, bool, str]]:
def get_cached_url(url: str) -> Optional[Tuple[str, str, str, str, str, str, str, bool, str]]:
check_db_path()
try:
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute('SELECT url, html, cleaned_html, markdown, extracted_content, success, media, screenshot FROM crawled_data WHERE url = ?', (url,))
cursor.execute('SELECT url, html, cleaned_html, markdown, extracted_content, success, media, links, metadata, screenshot FROM crawled_data WHERE url = ?', (url,))
result = cursor.fetchone()
conn.close()
return result
@@ -54,23 +56,25 @@ def get_cached_url(url: str) -> Optional[Tuple[str, str, str, str, str, bool, st
print(f"Error retrieving cached URL: {e}")
return None
def cache_url(url: str, html: str, cleaned_html: str, markdown: str, extracted_content: str, success: bool, media : str = "{}", screenshot: str = ""):
def cache_url(url: str, html: str, cleaned_html: str, markdown: str, extracted_content: str, success: bool, media : str = "{}", links : str = "{}", metadata : str = "{}", screenshot: str = ""):
check_db_path()
try:
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO crawled_data (url, html, cleaned_html, markdown, extracted_content, success, screenshot)
VALUES (?, ?, ?, ?, ?, ?, ?)
INSERT INTO crawled_data (url, html, cleaned_html, markdown, extracted_content, success, media, links, metadata, screenshot)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(url) DO UPDATE SET
html = excluded.html,
cleaned_html = excluded.cleaned_html,
markdown = excluded.markdown,
extracted_content = excluded.extracted_content,
success = excluded.success,
media = excluded.media,
links = excluded.links,
metadata = excluded.metadata,
screenshot = excluded.screenshot
''', (url, html, cleaned_html, markdown, extracted_content, success, media, screenshot))
''', (url, html, cleaned_html, markdown, extracted_content, success, media, links, metadata, screenshot))
conn.commit()
conn.close()
except Exception as e:
@@ -124,5 +128,5 @@ def update_existing_records(new_column: str = "media", default_value: str = "{}"
if __name__ == "__main__":
init_db() # Initialize the database if not already initialized
alter_db_add_screenshot() # Add the new column to the table
update_existing_records() # Update existing records to set the new column to an empty string
alter_db_add_screenshot("metadata") # Add the new column to the table
update_existing_records("metadata") # Update existing records to set the new column to an empty string
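As a rough sketch of the updated cache round trip (module path and values assumed from the imports used elsewhere in the package):

```python
import json
from crawl4ai.database import init_db, cache_url, get_cached_url

init_db()
cache_url(
    "https://example.com",
    "<html>...</html>",  # raw html
    "<div>...</div>",    # cleaned_html
    "markdown...",       # markdown
    "[]",                # extracted_content (JSON string)
    True,                # success
    media=json.dumps({"images": [], "audio": [], "video": []}),
    links=json.dumps({"internal": [], "external": []}),
    metadata=json.dumps({"title": "Example"}),
)

row = get_cached_url("https://example.com")
# (url, html, cleaned_html, markdown, extracted_content, success,
#  media, links, metadata, screenshot)
```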

View File

@@ -11,6 +11,7 @@ class CrawlResult(BaseModel):
success: bool
cleaned_html: Optional[str] = None
media: Dict[str, List[Dict]] = {}
links: Dict[str, List[Dict]] = {}
screenshot: Optional[str] = None
markdown: Optional[str] = None
extracted_content: Optional[str] = None

View File

@@ -151,7 +151,7 @@ class CustomHTML2Text(HTML2Text):
super().handle_tag(tag, attrs, start)
def get_content_of_website(html, word_count_threshold = MIN_WORD_THRESHOLD, css_selector = None):
def get_content_of_website(url, html, word_count_threshold = MIN_WORD_THRESHOLD, css_selector = None):
try:
if not html:
return None
@@ -170,6 +170,28 @@ def get_content_of_website(html, word_count_threshold = MIN_WORD_THRESHOLD, css_
for el in selected_elements:
div_tag.append(el)
body = div_tag
links = {
'internal': [],
'external': []
}
# Extract all internal and external links
for a in body.find_all('a', href=True):
href = a['href']
url_base = url.split('/')[2]
if href.startswith('http') and url_base not in href:
links['external'].append({
'href': href,
'text': a.get_text()
})
else:
links['internal'].append(
{
'href': href,
'text': a.get_text()
}
)
# Remove script, style, and other tags that don't carry useful content from body
for tag in body.find_all(['script', 'style', 'link', 'meta', 'noscript']):
@@ -329,13 +351,55 @@ def get_content_of_website(html, word_count_threshold = MIN_WORD_THRESHOLD, css_
'markdown': markdown,
'cleaned_html': cleaned_html,
'success': True,
'media': media
'media': media,
'links': links
}
except Exception as e:
print('Error processing HTML content:', str(e))
raise InvalidCSSSelectorError(f"Invalid CSS selector: {css_selector}") from e
def extract_metadata(html):
metadata = {}
if not html:
return metadata
# Parse HTML content with BeautifulSoup
soup = BeautifulSoup(html, 'html.parser')
# Title
title_tag = soup.find('title')
metadata['title'] = title_tag.string if title_tag else None
# Meta description
description_tag = soup.find('meta', attrs={'name': 'description'})
metadata['description'] = description_tag['content'] if description_tag else None
# Meta keywords
keywords_tag = soup.find('meta', attrs={'name': 'keywords'})
metadata['keywords'] = keywords_tag['content'] if keywords_tag else None
# Meta author
author_tag = soup.find('meta', attrs={'name': 'author'})
metadata['author'] = author_tag['content'] if author_tag else None
# Open Graph metadata
og_tags = soup.find_all('meta', attrs={'property': lambda value: value and value.startswith('og:')})
for tag in og_tags:
property_name = tag['property']
metadata[property_name] = tag['content']
# Twitter Card metadata
twitter_tags = soup.find_all('meta', attrs={'name': lambda value: value and value.startswith('twitter:')})
for tag in twitter_tags:
property_name = tag['name']
metadata[property_name] = tag['content']
return metadata
def extract_xml_tags(string):
tags = re.findall(r'<(\w+)>', string)
return list(set(tags))
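A quick sketch of what `extract_metadata` returns for a tiny page (import path assumed from the package layout):

```python
from crawl4ai.utils import extract_metadata

html = """
<html><head>
  <title>Example</title>
  <meta name="description" content="A demo page">
  <meta property="og:title" content="Example OG">
</head><body></body></html>
"""
print(extract_metadata(html))
# {'title': 'Example', 'description': 'A demo page', 'keywords': None,
#  'author': None, 'og:title': 'Example OG'}
```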

View File

@@ -0,0 +1,357 @@
import os, time
os.environ["TOKENIZERS_PARALLELISM"] = "false"
from pathlib import Path
from .models import UrlModel, CrawlResult
from .database import init_db, get_cached_url, cache_url, DB_PATH, flush_db
from .utils import *
from .chunking_strategy import *
from .extraction_strategy import *
from .crawler_strategy import *
from typing import List
from concurrent.futures import ThreadPoolExecutor
from .config import *
class WebCrawler:
def __init__(
self,
# db_path: str = None,
crawler_strategy: CrawlerStrategy = None,
always_by_pass_cache: bool = False,
verbose: bool = False,
):
# self.db_path = db_path
self.crawler_strategy = crawler_strategy or LocalSeleniumCrawlerStrategy(verbose=verbose)
self.always_by_pass_cache = always_by_pass_cache
# Create the .crawl4ai folder in the user's home directory if it doesn't exist
self.crawl4ai_folder = os.path.join(Path.home(), ".crawl4ai")
os.makedirs(self.crawl4ai_folder, exist_ok=True)
os.makedirs(f"{self.crawl4ai_folder}/cache", exist_ok=True)
# If db_path is not provided, use the default path
# if not db_path:
# self.db_path = f"{self.crawl4ai_folder}/crawl4ai.db"
# flush_db()
init_db()
self.ready = False
def warmup(self):
print("[LOG] 🌤️ Warming up the WebCrawler")
result = self.run(
url='https://crawl4ai.uccode.io/',
word_count_threshold=5,
extraction_strategy= NoExtractionStrategy(),
bypass_cache=False,
verbose = False
)
self.ready = True
print("[LOG] 🌞 WebCrawler is ready to crawl")
def fetch_page(
self,
url_model: UrlModel,
provider: str = DEFAULT_PROVIDER,
api_token: str = None,
extract_blocks_flag: bool = True,
word_count_threshold=MIN_WORD_THRESHOLD,
css_selector: str = None,
screenshot: bool = False,
use_cached_html: bool = False,
extraction_strategy: ExtractionStrategy = None,
chunking_strategy: ChunkingStrategy = RegexChunking(),
**kwargs,
) -> CrawlResult:
return self.run(
url_model.url,
word_count_threshold,
extraction_strategy or NoExtractionStrategy(),
chunking_strategy,
bypass_cache=url_model.forced,
css_selector=css_selector,
screenshot=screenshot,
**kwargs,
)
pass
def run_old(
self,
url: str,
word_count_threshold=MIN_WORD_THRESHOLD,
extraction_strategy: ExtractionStrategy = None,
chunking_strategy: ChunkingStrategy = RegexChunking(),
bypass_cache: bool = False,
css_selector: str = None,
screenshot: bool = False,
user_agent: str = None,
verbose=True,
**kwargs,
) -> CrawlResult:
if user_agent:
self.crawler_strategy.update_user_agent(user_agent)
extraction_strategy = extraction_strategy or NoExtractionStrategy()
extraction_strategy.verbose = verbose
# Check if the extraction strategy is an instance of ExtractionStrategy; if not, raise an error
if not isinstance(extraction_strategy, ExtractionStrategy):
raise ValueError("Unsupported extraction strategy")
if not isinstance(chunking_strategy, ChunkingStrategy):
raise ValueError("Unsupported chunking strategy")
# make sure word_count_threshold is not less than MIN_WORD_THRESHOLD
if word_count_threshold < MIN_WORD_THRESHOLD:
word_count_threshold = MIN_WORD_THRESHOLD
# Check cache first
if not bypass_cache and not self.always_by_pass_cache:
cached = get_cached_url(url)
if cached:
return CrawlResult(
**{
"url": cached[0],
"html": cached[1],
"cleaned_html": cached[2],
"markdown": cached[3],
"extracted_content": cached[4],
"success": cached[5],
"media": json.loads(cached[6] or "{}"),
"links": json.loads(cached[7] or "{}"),
"metadata": json.loads(cached[8] or "{}"), # "metadata": "{}
"screenshot": cached[9],
"error_message": "",
}
)
# Initialize WebDriver for crawling
t = time.time()
if kwargs.get("js", None):
self.crawler_strategy.js_code = kwargs.get("js")
html = self.crawler_strategy.crawl(url)
base64_image = None
if screenshot:
base64_image = self.crawler_strategy.take_screenshot()
success = True
error_message = ""
# Extract content from HTML
try:
result = get_content_of_website(url, html, word_count_threshold, css_selector=css_selector)
metadata = extract_metadata(html)
if result is None:
raise ValueError(f"Failed to extract content from the website: {url}")
except InvalidCSSSelectorError as e:
raise ValueError(str(e))
cleaned_html = result.get("cleaned_html", "")
markdown = result.get("markdown", "")
media = result.get("media", [])
links = result.get("links", [])
# Print a professional LOG-style message, show time taken and say crawling is done
if verbose:
print(
f"[LOG] 🚀 Crawling done for {url}, success: {success}, time taken: {time.time() - t} seconds"
)
extracted_content = []
if verbose:
print(f"[LOG] 🔥 Extracting semantic blocks for {url}, Strategy: {extraction_strategy.name}")
t = time.time()
# Split markdown into sections
sections = chunking_strategy.chunk(markdown)
# sections = merge_chunks_based_on_token_threshold(sections, CHUNK_TOKEN_THRESHOLD)
extracted_content = extraction_strategy.run(
url, sections,
)
extracted_content = json.dumps(extracted_content)
if verbose:
print(
f"[LOG] 🚀 Extraction done for {url}, time taken: {time.time() - t} seconds."
)
# Cache the result
cleaned_html = beautify_html(cleaned_html)
cache_url(
url,
html,
cleaned_html,
markdown,
extracted_content,
success,
json.dumps(media),
json.dumps(links),
json.dumps(metadata),
screenshot=base64_image,
)
return CrawlResult(
url=url,
html=html,
cleaned_html=cleaned_html,
markdown=markdown,
media=media,
links=links,
metadata=metadata,
screenshot=base64_image,
extracted_content=extracted_content,
success=success,
error_message=error_message,
)
def fetch_pages(
self,
url_models: List[UrlModel],
provider: str = DEFAULT_PROVIDER,
api_token: str = None,
extract_blocks_flag: bool = True,
word_count_threshold=MIN_WORD_THRESHOLD,
use_cached_html: bool = False,
css_selector: str = None,
screenshot: bool = False,
extraction_strategy: ExtractionStrategy = None,
chunking_strategy: ChunkingStrategy = RegexChunking(),
**kwargs,
) -> List[CrawlResult]:
extraction_strategy = extraction_strategy or NoExtractionStrategy()
def fetch_page_wrapper(url_model, *args, **kwargs):
return self.fetch_page(url_model, *args, **kwargs)
with ThreadPoolExecutor() as executor:
results = list(
executor.map(
fetch_page_wrapper,
url_models,
[provider] * len(url_models),
[api_token] * len(url_models),
[extract_blocks_flag] * len(url_models),
[word_count_threshold] * len(url_models),
[css_selector] * len(url_models),
[screenshot] * len(url_models),
[use_cached_html] * len(url_models),
[extraction_strategy] * len(url_models),
[chunking_strategy] * len(url_models),
*[kwargs] * len(url_models),
)
)
return results
def run(
self,
url: str,
word_count_threshold=MIN_WORD_THRESHOLD,
extraction_strategy: ExtractionStrategy = None,
chunking_strategy: ChunkingStrategy = RegexChunking(),
bypass_cache: bool = False,
css_selector: str = None,
screenshot: bool = False,
user_agent: str = None,
verbose=True,
**kwargs,
) -> CrawlResult:
extraction_strategy = extraction_strategy or NoExtractionStrategy()
extraction_strategy.verbose = verbose
if not isinstance(extraction_strategy, ExtractionStrategy):
raise ValueError("Unsupported extraction strategy")
if not isinstance(chunking_strategy, ChunkingStrategy):
raise ValueError("Unsupported chunking strategy")
if word_count_threshold < MIN_WORD_THRESHOLD:
word_count_threshold = MIN_WORD_THRESHOLD
# Check cache first
cached = None
extracted_content = None
if not bypass_cache and not self.always_by_pass_cache:
cached = get_cached_url(url)
if cached:
html = cached[1]
extracted_content = cached[2]
if screenshot:
screenshot = cached[9]
else:
if user_agent:
self.crawler_strategy.update_user_agent(user_agent)
html = self.crawler_strategy.crawl(url)
if screenshot:
screenshot = self.crawler_strategy.take_screenshot()
return self.process_html(url, html, extracted_content, word_count_threshold, extraction_strategy, chunking_strategy, css_selector, screenshot, verbose, bool(cached), **kwargs)
def process_html(
self,
url: str,
html: str,
extracted_content: str,
word_count_threshold: int,
extraction_strategy: ExtractionStrategy,
chunking_strategy: ChunkingStrategy,
css_selector: str,
screenshot: bool,
verbose: bool,
is_cached: bool,
**kwargs,
) -> CrawlResult:
t = time.time()
# Extract content from HTML
try:
result = get_content_of_website(url, html, word_count_threshold, css_selector=css_selector)
metadata = extract_metadata(html)
if result is None:
raise ValueError(f"Failed to extract content from the website: {url}")
except InvalidCSSSelectorError as e:
raise ValueError(str(e))
cleaned_html = result.get("cleaned_html", "")
markdown = result.get("markdown", "")
media = result.get("media", [])
links = result.get("links", [])
if verbose:
print(f"[LOG] 🚀 Crawling done for {url}, success: True, time taken: {time.time() - t} seconds")
if extracted_content is None:
if verbose:
print(f"[LOG] 🔥 Extracting semantic blocks for {url}, Strategy: {extraction_strategy.name}")
sections = chunking_strategy.chunk(markdown)
extracted_content = extraction_strategy.run(url, sections)
extracted_content = json.dumps(extracted_content)
if verbose:
print(f"[LOG] 🚀 Extraction done for {url}, time taken: {time.time() - t} seconds.")
screenshot = None if not screenshot else screenshot
if not is_cached:
cache_url(
url,
html,
cleaned_html,
markdown,
extracted_content,
True,
json.dumps(media),
json.dumps(links),
json.dumps(metadata),
screenshot=screenshot,
)
return CrawlResult(
url=url,
html=html,
cleaned_html=cleaned_html,
markdown=markdown,
media=media,
links=links,
metadata=metadata,
screenshot=screenshot,
extracted_content=extracted_content,
success=True,
error_message="",
)
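A short sketch of the caching behavior the refactored `run`/`process_html` split enables — the second call should be served from the SQLite cache unless `bypass_cache=True` is passed:

```python
from crawl4ai import WebCrawler

crawler = WebCrawler(verbose=True)
crawler.warmup()

first = crawler.run(url="https://www.nbcnews.com/business")   # crawls and caches
second = crawler.run(url="https://www.nbcnews.com/business")  # served from the cache
fresh = crawler.run(url="https://www.nbcnews.com/business",
                    bypass_cache=True)                        # forces a fresh crawl
```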

View File

@@ -51,7 +51,6 @@ class WebCrawler:
self.ready = True
print("[LOG] 🌞 WebCrawler is ready to crawl")
def fetch_page(
self,
url_model: UrlModel,
@@ -78,118 +77,6 @@ class WebCrawler:
)
pass
def run(
self,
url: str,
word_count_threshold=MIN_WORD_THRESHOLD,
extraction_strategy: ExtractionStrategy = None,
chunking_strategy: ChunkingStrategy = RegexChunking(),
bypass_cache: bool = False,
css_selector: str = None,
screenshot: bool = False,
verbose=True,
**kwargs,
) -> CrawlResult:
extraction_strategy = extraction_strategy or NoExtractionStrategy()
extraction_strategy.verbose = verbose
# Check if the extraction strategy is an instance of ExtractionStrategy; if not, raise an error
if not isinstance(extraction_strategy, ExtractionStrategy):
raise ValueError("Unsupported extraction strategy")
if not isinstance(chunking_strategy, ChunkingStrategy):
raise ValueError("Unsupported chunking strategy")
# make sure word_count_threshold is not less than MIN_WORD_THRESHOLD
if word_count_threshold < MIN_WORD_THRESHOLD:
word_count_threshold = MIN_WORD_THRESHOLD
# Check cache first
if not bypass_cache and not self.always_by_pass_cache:
cached = get_cached_url(url)
if cached:
return CrawlResult(
**{
"url": cached[0],
"html": cached[1],
"cleaned_html": cached[2],
"markdown": cached[3],
"extracted_content": cached[4],
"success": cached[5],
"media": json.loads(cached[6] or "{}"),
"screenshot": cached[7],
"error_message": "",
}
)
# Initialize WebDriver for crawling
t = time.time()
html = self.crawler_strategy.crawl(url)
base64_image = None
if screenshot:
base64_image = self.crawler_strategy.take_screenshot()
success = True
error_message = ""
# Extract content from HTML
try:
result = get_content_of_website(html, word_count_threshold, css_selector=css_selector)
if result is None:
raise ValueError(f"Failed to extract content from the website: {url}")
except InvalidCSSSelectorError as e:
raise ValueError(str(e))
cleaned_html = result.get("cleaned_html", html)
markdown = result.get("markdown", "")
media = result.get("media", [])
# Print a professional LOG-style message, show time taken and say crawling is done
if verbose:
print(
f"[LOG] 🚀 Crawling done for {url}, success: {success}, time taken: {time.time() - t} seconds"
)
extracted_content = []
if verbose:
print(f"[LOG] 🔥 Extracting semantic blocks for {url}, Strategy: {extraction_strategy.name}")
t = time.time()
# Split markdown into sections
sections = chunking_strategy.chunk(markdown)
# sections = merge_chunks_based_on_token_threshold(sections, CHUNK_TOKEN_THRESHOLD)
extracted_content = extraction_strategy.run(
url, sections,
)
extracted_content = json.dumps(extracted_content)
if verbose:
print(
f"[LOG] 🚀 Extraction done for {url}, time taken: {time.time() - t} seconds."
)
# Cache the result
cleaned_html = beautify_html(cleaned_html)
cache_url(
url,
html,
cleaned_html,
markdown,
extracted_content,
success,
json.dumps(media),
screenshot=base64_image,
)
return CrawlResult(
url=url,
html=html,
cleaned_html=cleaned_html,
markdown=markdown,
media=media,
screenshot=base64_image,
extracted_content=extracted_content,
success=success,
error_message=error_message,
)
def fetch_pages(
self,
url_models: List[UrlModel],
@@ -227,3 +114,120 @@ class WebCrawler:
)
return results
def run(
self,
url: str,
word_count_threshold=MIN_WORD_THRESHOLD,
extraction_strategy: ExtractionStrategy = None,
chunking_strategy: ChunkingStrategy = RegexChunking(),
bypass_cache: bool = False,
css_selector: str = None,
screenshot: bool = False,
user_agent: str = None,
verbose=True,
**kwargs,
) -> CrawlResult:
extraction_strategy = extraction_strategy or NoExtractionStrategy()
extraction_strategy.verbose = verbose
if not isinstance(extraction_strategy, ExtractionStrategy):
raise ValueError("Unsupported extraction strategy")
if not isinstance(chunking_strategy, ChunkingStrategy):
raise ValueError("Unsupported chunking strategy")
if word_count_threshold < MIN_WORD_THRESHOLD:
word_count_threshold = MIN_WORD_THRESHOLD
# Check cache first
cached = None
extracted_content = None
if not bypass_cache and not self.always_by_pass_cache:
cached = get_cached_url(url)
if cached:
html = cached[1]
extracted_content = cached[2]
if screenshot:
screenshot = cached[9]
else:
if user_agent:
self.crawler_strategy.update_user_agent(user_agent)
html = self.crawler_strategy.crawl(url)
if screenshot:
screenshot = self.crawler_strategy.take_screenshot()
return self.process_html(url, html, extracted_content, word_count_threshold, extraction_strategy, chunking_strategy, css_selector, screenshot, verbose, bool(cached), **kwargs)
def process_html(
self,
url: str,
html: str,
extracted_content: str,
word_count_threshold: int,
extraction_strategy: ExtractionStrategy,
chunking_strategy: ChunkingStrategy,
css_selector: str,
screenshot: bool,
verbose: bool,
is_cached: bool,
**kwargs,
) -> CrawlResult:
t = time.time()
# Extract content from HTML
try:
result = get_content_of_website(url, html, word_count_threshold, css_selector=css_selector)
metadata = extract_metadata(html)
if result is None:
raise ValueError(f"Failed to extract content from the website: {url}")
except InvalidCSSSelectorError as e:
raise ValueError(str(e))
cleaned_html = result.get("cleaned_html", "")
markdown = result.get("markdown", "")
media = result.get("media", [])
links = result.get("links", [])
if verbose:
print(f"[LOG] 🚀 Crawling done for {url}, success: True, time taken: {time.time() - t} seconds")
if extracted_content is None:
if verbose:
print(f"[LOG] 🔥 Extracting semantic blocks for {url}, Strategy: {extraction_strategy.name}")
sections = chunking_strategy.chunk(markdown)
extracted_content = extraction_strategy.run(url, sections)
extracted_content = json.dumps(extracted_content)
if verbose:
print(f"[LOG] 🚀 Extraction done for {url}, time taken: {time.time() - t} seconds.")
screenshot = None if not screenshot else screenshot
if not is_cached:
cache_url(
url,
html,
cleaned_html,
markdown,
extracted_content,
True,
json.dumps(media),
json.dumps(links),
json.dumps(metadata),
screenshot=screenshot,
)
return CrawlResult(
url=url,
html=html,
cleaned_html=cleaned_html,
markdown=markdown,
media=media,
links=links,
metadata=metadata,
screenshot=screenshot,
extracted_content=extracted_content,
success=True,
error_message="",
)

BIN docs/.DS_Store vendored — binary file not shown.

8 new binary image files added (binary files not shown); sizes: 372 KiB, 403 KiB, 537 KiB, 375 KiB, 469 KiB, 477 KiB, 419 KiB, 485 KiB.

View File

@@ -166,10 +166,11 @@ def interactive_extraction(crawler):
const loadMoreButton = Array.from(document.querySelectorAll('button')).find(button => button.textContent.includes('Load More'));
loadMoreButton && loadMoreButton.click();
"""
crawler_strategy = LocalSeleniumCrawlerStrategy(js_code=js_code)
crawler = WebCrawler(crawler_strategy=crawler_strategy, always_by_pass_cache=True)
# crawler_strategy = LocalSeleniumCrawlerStrategy(js_code=js_code)
# crawler = WebCrawler(crawler_strategy=crawler_strategy, always_by_pass_cache=True)
result = crawler.run(
url="https://www.nbcnews.com/business",
js = js_code
)
cprint("[LOG] 📦 [bold yellow]JavaScript Code (Load More button) result:[/bold yellow]")
print_result(result)
@@ -182,14 +183,73 @@ def multiple_scrip(crawler):
const loadMoreButton = Array.from(document.querySelectorAll('button')).find(button => button.textContent.includes('Load More'));
loadMoreButton && loadMoreButton.click();
"""] * 2
crawler_strategy = LocalSeleniumCrawlerStrategy(js_code=js_code)
crawler = WebCrawler(crawler_strategy=crawler_strategy, always_by_pass_cache=True)
# crawler_strategy = LocalSeleniumCrawlerStrategy(js_code=js_code)
# crawler = WebCrawler(crawler_strategy=crawler_strategy, always_by_pass_cache=True)
result = crawler.run(
url="https://www.nbcnews.com/business",
js = js_code
)
cprint("[LOG] 📦 [bold yellow]JavaScript Code (Load More button) result:[/bold yellow]")
print_result(result)
def using_crawler_hooks(crawler):
# Example usage of the hooks for authentication and setting a cookie
def on_driver_created(driver):
print("[HOOK] on_driver_created")
# Example customization: maximize the window
driver.maximize_window()
# Example customization: logging in to a hypothetical website
driver.get('https://example.com/login')
from selenium.webdriver.support.ui import WebDriverWait
WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.NAME, 'username'))
)
driver.find_element(By.NAME, 'username').send_keys('testuser')
driver.find_element(By.NAME, 'password').send_keys('password123')
driver.find_element(By.NAME, 'login').click()
WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.ID, 'welcome'))
)
# Add a custom cookie
driver.add_cookie({'name': 'test_cookie', 'value': 'cookie_value'})
return driver
def before_get_url(driver):
print("[HOOK] before_get_url")
# Example customization: add a custom header
# Enable Network domain for sending headers
driver.execute_cdp_cmd('Network.enable', {})
# Add a custom header
driver.execute_cdp_cmd('Network.setExtraHTTPHeaders', {'headers': {'X-Test-Header': 'test'}})
return driver
def after_get_url(driver):
print("[HOOK] after_get_url")
# Example customization: log the URL
print(driver.current_url)
return driver
def before_return_html(driver, html):
print("[HOOK] before_return_html")
# Example customization: log the HTML
print(len(html))
return driver
cprint("\n🔗 [bold cyan]Using Crawler Hooks: Let's see how we can customize the crawler using hooks![/bold cyan]", True)
crawler.set_hook('on_driver_created', on_driver_created)
crawler.set_hook('before_get_url', before_get_url)
crawler.set_hook('after_get_url', after_get_url)
crawler.set_hook('before_return_html', before_return_html)
result = crawler.run(url="https://example.com")
cprint("[LOG] 📦 [bold yellow]Crawler Hooks result:[/bold yellow]")
print_result(result= result)
def main():
cprint("🌟 [bold green]Welcome to the Crawl4ai Quickstart Guide! Let's dive into some web crawling fun! 🌐[/bold green]")
cprint("⛳️ [bold cyan]First Step: Create an instance of WebCrawler and call the `warmup()` function.[/bold cyan]")

View File

@@ -0,0 +1,64 @@
import requests, base64, os
data = {
"urls": ["https://www.nbcnews.com/business"],
"screenshot": True,
}
response = requests.post("https://crawl4ai.com/crawl", json=data)
result = response.json()['results'][0]
print(result.keys())
# dict_keys(['url', 'html', 'success', 'cleaned_html', 'media',
# 'links', 'screenshot', 'markdown', 'extracted_content',
# 'metadata', 'error_message'])
with open("screenshot.png", "wb") as f:
f.write(base64.b64decode(result['screenshot']))
# Example of filtering the content using CSS selectors
data = {
"urls": [
"https://www.nbcnews.com/business"
],
"css_selector": "article",
"screenshot": True,
}
# Example of executing a JS script on the page before extracting the content
data = {
"urls": [
"https://www.nbcnews.com/business"
],
"screenshot": True,
'js' : ["""
const loadMoreButton = Array.from(document.querySelectorAll('button')).
find(button => button.textContent.includes('Load More'));
loadMoreButton && loadMoreButton.click();
"""]
}
# Example of using a custom extraction strategy
data = {
"urls": [
"https://www.nbcnews.com/business"
],
"extraction_strategy": "CosineStrategy",
"extraction_strategy_args": {
"semantic_filter": "inflation rent prices"
},
}
# Example of using LLM to extract content
data = {
"urls": [
"https://www.nbcnews.com/business"
],
"extraction_strategy": "LLMExtractionStrategy",
"extraction_strategy_args": {
"provider": "groq/llama3-8b-8192",
"api_token": os.environ.get("GROQ_API_KEY"),
"instruction": """I am interested in only financial news,
and translate them in French."""
},
}
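Any of these payloads is posted the same way as the first example; e.g., for the LLM variant (assuming `GROQ_API_KEY` is set in the environment):

```python
response = requests.post("https://crawl4ai.com/crawl", json=data)
result = response.json()["results"][0]
print(result["extracted_content"])  # JSON string produced by the extraction strategy
```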

View File

@@ -57,6 +57,7 @@ class CrawlRequest(BaseModel):
chunking_strategy_args: Optional[dict] = {}
css_selector: Optional[str] = None
screenshot: Optional[bool] = False
user_agent: Optional[str] = None
verbose: Optional[bool] = True
@@ -127,6 +128,7 @@ async def crawl_urls(crawl_request: CrawlRequest, request: Request):
crawl_request.bypass_cache,
crawl_request.css_selector,
crawl_request.screenshot,
crawl_request.user_agent,
crawl_request.verbose
)
for url in crawl_request.urls

View File

@@ -25,7 +25,7 @@
<header class="bg-zinc-950 text-lime-500 py-4 flex">
<div class="mx-auto px-4">
<h1 class="text-2xl font-bold">🔥🕷️ Crawl4AI: Web Data for your Thoughts v0.2.2</h1>
<h1 class="text-2xl font-bold">🔥🕷️ Crawl4AI: Web Data for your Thoughts v0.2.4</h1>
</div>
<div class="mx-auto px-4 flex font-bold text-xl gap-2">
<span>📊 Total Website Processed</span>

View File

@@ -157,9 +157,8 @@ with open("screenshot.png", "wb") as f:
const loadMoreButton = Array.from(document.querySelectorAll('button')).find(button => button.textContent.includes('Load More'));
loadMoreButton && loadMoreButton.click();
"""]
crawler_strategy = LocalSeleniumCrawlerStrategy(js_code=js_code)
crawler = WebCrawler(crawler_strategy=crawler_strategy, always_by_pass_cache=True)
result = crawler.run(url="https://www.nbcnews.com/business")</code></pre>
crawler = WebCrawler(verbose=True, always_by_pass_cache=True)
result = crawler.run(url="https://www.nbcnews.com/business", js = js_code)</code></pre>
<div class="">Remember that you can pass multiple JavaScript code snippets in the list. They all will be executed in the order they are passed.</div>
</div>

View File

@@ -121,7 +121,7 @@
</div>
<div class="flex gap-3">
<div class="flex items-center gap-2">
<input type="checkbox" id="bypass-cache-checkbox" checked />
<input type="checkbox" id="bypass-cache-checkbox" />
<label for="bypass-cache-checkbox" class="text-lime-500 font-bold">Bypass Cache</label>
</div>
<div class="flex items-center gap-2">

View File

@@ -18,3 +18,4 @@ chromedriver-autoinstaller
torch
onnxruntime
tokenizers
pillow

View File

@@ -26,7 +26,7 @@ class CustomInstallCommand(install):
setup(
name="Crawl4AI",
version="0.2.3",
version="0.2.4",
description="🔥🕷️ Crawl4AI: Open-source LLM Friendly Web Crawler & Scraper",
long_description=open("README.md").read(),
long_description_content_type="text/markdown",