refactor(docs): update import statement in quickstart.py for improved clarity
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
import os, sys
|
||||
|
||||
from crawl4ai.types import LLMConfig
|
||||
from crawl4ai import LLMConfig
|
||||
|
||||
sys.path.append(
|
||||
os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
@@ -1,675 +0,0 @@
|
||||
import os, sys
|
||||
|
||||
from crawl4ai import LLMConfig
|
||||
|
||||
# append parent directory to system path
|
||||
sys.path.append(
|
||||
os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
)
|
||||
os.environ["FIRECRAWL_API_KEY"] = "fc-84b370ccfad44beabc686b38f1769692"
|
||||
|
||||
import asyncio
|
||||
# import nest_asyncio
|
||||
# nest_asyncio.apply()
|
||||
|
||||
import time
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
from typing import Dict, List
|
||||
from bs4 import BeautifulSoup
|
||||
from pydantic import BaseModel, Field
|
||||
from crawl4ai import AsyncWebCrawler, CacheMode
|
||||
from crawl4ai.markdown_generation_strategy import DefaultMarkdownGenerator
|
||||
from crawl4ai.content_filter_strategy import PruningContentFilter
|
||||
from crawl4ai.extraction_strategy import (
|
||||
JsonCssExtractionStrategy,
|
||||
LLMExtractionStrategy,
|
||||
)
|
||||
|
||||
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
|
||||
|
||||
print("Crawl4AI: Advanced Web Crawling and Data Extraction")
|
||||
print("GitHub Repository: https://github.com/unclecode/crawl4ai")
|
||||
print("Twitter: @unclecode")
|
||||
print("Website: https://crawl4ai.com")
|
||||
|
||||
|
||||
async def simple_crawl():
|
||||
print("\n--- Basic Usage ---")
|
||||
async with AsyncWebCrawler(verbose=True) as crawler:
|
||||
result = await crawler.arun(
|
||||
url="https://www.nbcnews.com/business", cache_mode=CacheMode.BYPASS
|
||||
)
|
||||
print(result.markdown[:500]) # Print first 500 characters
|
||||
|
||||
|
||||
async def simple_example_with_running_js_code():
|
||||
print("\n--- Executing JavaScript and Using CSS Selectors ---")
|
||||
# New code to handle the wait_for parameter
|
||||
wait_for = """() => {
|
||||
return Array.from(document.querySelectorAll('article.tease-card')).length > 10;
|
||||
}"""
|
||||
|
||||
# wait_for can be also just a css selector
|
||||
# wait_for = "article.tease-card:nth-child(10)"
|
||||
|
||||
async with AsyncWebCrawler(verbose=True) as crawler:
|
||||
js_code = [
|
||||
"const loadMoreButton = Array.from(document.querySelectorAll('button')).find(button => button.textContent.includes('Load More')); loadMoreButton && loadMoreButton.click();"
|
||||
]
|
||||
result = await crawler.arun(
|
||||
url="https://www.nbcnews.com/business",
|
||||
js_code=js_code,
|
||||
# wait_for=wait_for,
|
||||
cache_mode=CacheMode.BYPASS,
|
||||
)
|
||||
print(result.markdown[:500]) # Print first 500 characters
|
||||
|
||||
|
||||
async def simple_example_with_css_selector():
|
||||
print("\n--- Using CSS Selectors ---")
|
||||
async with AsyncWebCrawler(verbose=True) as crawler:
|
||||
result = await crawler.arun(
|
||||
url="https://www.nbcnews.com/business",
|
||||
css_selector=".wide-tease-item__description",
|
||||
cache_mode=CacheMode.BYPASS,
|
||||
)
|
||||
print(result.markdown[:500]) # Print first 500 characters
|
||||
|
||||
|
||||
async def use_proxy():
|
||||
print("\n--- Using a Proxy ---")
|
||||
print(
|
||||
"Note: Replace 'http://your-proxy-url:port' with a working proxy to run this example."
|
||||
)
|
||||
# Uncomment and modify the following lines to use a proxy
|
||||
async with AsyncWebCrawler(
|
||||
verbose=True, proxy="http://your-proxy-url:port"
|
||||
) as crawler:
|
||||
result = await crawler.arun(
|
||||
url="https://www.nbcnews.com/business", cache_mode=CacheMode.BYPASS
|
||||
)
|
||||
if result.success:
|
||||
print(result.markdown[:500]) # Print first 500 characters
|
||||
|
||||
|
||||
async def capture_and_save_screenshot(url: str, output_path: str):
|
||||
async with AsyncWebCrawler(verbose=True) as crawler:
|
||||
result = await crawler.arun(
|
||||
url=url, screenshot=True, cache_mode=CacheMode.BYPASS
|
||||
)
|
||||
|
||||
if result.success and result.screenshot:
|
||||
import base64
|
||||
|
||||
# Decode the base64 screenshot data
|
||||
screenshot_data = base64.b64decode(result.screenshot)
|
||||
|
||||
# Save the screenshot as a JPEG file
|
||||
with open(output_path, "wb") as f:
|
||||
f.write(screenshot_data)
|
||||
|
||||
print(f"Screenshot saved successfully to {output_path}")
|
||||
else:
|
||||
print("Failed to capture screenshot")
|
||||
|
||||
|
||||
class OpenAIModelFee(BaseModel):
|
||||
model_name: str = Field(..., description="Name of the OpenAI model.")
|
||||
input_fee: str = Field(..., description="Fee for input token for the OpenAI model.")
|
||||
output_fee: str = Field(
|
||||
..., description="Fee for output token for the OpenAI model."
|
||||
)
|
||||
|
||||
|
||||
async def extract_structured_data_using_llm(
|
||||
provider: str, api_token: str = None, extra_headers: Dict[str, str] = None
|
||||
):
|
||||
print(f"\n--- Extracting Structured Data with {provider} ---")
|
||||
|
||||
if api_token is None and provider != "ollama":
|
||||
print(f"API token is required for {provider}. Skipping this example.")
|
||||
return
|
||||
|
||||
# extra_args = {}
|
||||
extra_args = {
|
||||
"temperature": 0,
|
||||
"top_p": 0.9,
|
||||
"max_tokens": 2000,
|
||||
# any other supported parameters for litellm
|
||||
}
|
||||
if extra_headers:
|
||||
extra_args["extra_headers"] = extra_headers
|
||||
|
||||
async with AsyncWebCrawler(verbose=True) as crawler:
|
||||
result = await crawler.arun(
|
||||
url="https://openai.com/api/pricing/",
|
||||
word_count_threshold=1,
|
||||
extraction_strategy=LLMExtractionStrategy(
|
||||
llm_config=LLMConfig(provider=provider,api_token=api_token),
|
||||
schema=OpenAIModelFee.model_json_schema(),
|
||||
extraction_type="schema",
|
||||
instruction="""From the crawled content, extract all mentioned model names along with their fees for input and output tokens.
|
||||
Do not miss any models in the entire content. One extracted model JSON format should look like this:
|
||||
{"model_name": "GPT-4", "input_fee": "US$10.00 / 1M tokens", "output_fee": "US$30.00 / 1M tokens"}.""",
|
||||
extra_args=extra_args,
|
||||
),
|
||||
cache_mode=CacheMode.BYPASS,
|
||||
)
|
||||
print(result.extracted_content)
|
||||
|
||||
|
||||
async def extract_structured_data_using_css_extractor():
|
||||
print("\n--- Using JsonCssExtractionStrategy for Fast Structured Output ---")
|
||||
schema = {
|
||||
"name": "KidoCode Courses",
|
||||
"baseSelector": "section.charge-methodology .w-tab-content > div",
|
||||
"fields": [
|
||||
{
|
||||
"name": "section_title",
|
||||
"selector": "h3.heading-50",
|
||||
"type": "text",
|
||||
},
|
||||
{
|
||||
"name": "section_description",
|
||||
"selector": ".charge-content",
|
||||
"type": "text",
|
||||
},
|
||||
{
|
||||
"name": "course_name",
|
||||
"selector": ".text-block-93",
|
||||
"type": "text",
|
||||
},
|
||||
{
|
||||
"name": "course_description",
|
||||
"selector": ".course-content-text",
|
||||
"type": "text",
|
||||
},
|
||||
{
|
||||
"name": "course_icon",
|
||||
"selector": ".image-92",
|
||||
"type": "attribute",
|
||||
"attribute": "src",
|
||||
},
|
||||
],
|
||||
}
|
||||
|
||||
async with AsyncWebCrawler(headless=True, verbose=True) as crawler:
|
||||
# Create the JavaScript that handles clicking multiple times
|
||||
js_click_tabs = """
|
||||
(async () => {
|
||||
const tabs = document.querySelectorAll("section.charge-methodology .tabs-menu-3 > div");
|
||||
|
||||
for(let tab of tabs) {
|
||||
// scroll to the tab
|
||||
tab.scrollIntoView();
|
||||
tab.click();
|
||||
// Wait for content to load and animations to complete
|
||||
await new Promise(r => setTimeout(r, 500));
|
||||
}
|
||||
})();
|
||||
"""
|
||||
|
||||
result = await crawler.arun(
|
||||
url="https://www.kidocode.com/degrees/technology",
|
||||
extraction_strategy=JsonCssExtractionStrategy(schema, verbose=True),
|
||||
js_code=[js_click_tabs],
|
||||
cache_mode=CacheMode.BYPASS,
|
||||
)
|
||||
|
||||
companies = json.loads(result.extracted_content)
|
||||
print(f"Successfully extracted {len(companies)} companies")
|
||||
print(json.dumps(companies[0], indent=2))
|
||||
|
||||
|
||||
# Advanced Session-Based Crawling with Dynamic Content 🔄
|
||||
async def crawl_dynamic_content_pages_method_1():
|
||||
print("\n--- Advanced Multi-Page Crawling with JavaScript Execution ---")
|
||||
first_commit = ""
|
||||
|
||||
async def on_execution_started(page):
|
||||
nonlocal first_commit
|
||||
try:
|
||||
while True:
|
||||
await page.wait_for_selector("li.Box-sc-g0xbh4-0 h4")
|
||||
commit = await page.query_selector("li.Box-sc-g0xbh4-0 h4")
|
||||
commit = await commit.evaluate("(element) => element.textContent")
|
||||
commit = re.sub(r"\s+", "", commit)
|
||||
if commit and commit != first_commit:
|
||||
first_commit = commit
|
||||
break
|
||||
await asyncio.sleep(0.5)
|
||||
except Exception as e:
|
||||
print(f"Warning: New content didn't appear after JavaScript execution: {e}")
|
||||
|
||||
async with AsyncWebCrawler(verbose=True) as crawler:
|
||||
crawler.crawler_strategy.set_hook("on_execution_started", on_execution_started)
|
||||
|
||||
url = "https://github.com/microsoft/TypeScript/commits/main"
|
||||
session_id = "typescript_commits_session"
|
||||
all_commits = []
|
||||
|
||||
js_next_page = """
|
||||
(() => {
|
||||
const button = document.querySelector('a[data-testid="pagination-next-button"]');
|
||||
if (button) button.click();
|
||||
})();
|
||||
"""
|
||||
|
||||
for page in range(3): # Crawl 3 pages
|
||||
result = await crawler.arun(
|
||||
url=url,
|
||||
session_id=session_id,
|
||||
css_selector="li.Box-sc-g0xbh4-0",
|
||||
js=js_next_page if page > 0 else None,
|
||||
cache_mode=CacheMode.BYPASS,
|
||||
js_only=page > 0,
|
||||
headless=False,
|
||||
)
|
||||
|
||||
assert result.success, f"Failed to crawl page {page + 1}"
|
||||
|
||||
soup = BeautifulSoup(result.cleaned_html, "html.parser")
|
||||
commits = soup.select("li")
|
||||
all_commits.extend(commits)
|
||||
|
||||
print(f"Page {page + 1}: Found {len(commits)} commits")
|
||||
|
||||
await crawler.crawler_strategy.kill_session(session_id)
|
||||
print(f"Successfully crawled {len(all_commits)} commits across 3 pages")
|
||||
|
||||
|
||||
async def crawl_dynamic_content_pages_method_2():
|
||||
print("\n--- Advanced Multi-Page Crawling with JavaScript Execution ---")
|
||||
|
||||
async with AsyncWebCrawler(verbose=True) as crawler:
|
||||
url = "https://github.com/microsoft/TypeScript/commits/main"
|
||||
session_id = "typescript_commits_session"
|
||||
all_commits = []
|
||||
last_commit = ""
|
||||
|
||||
js_next_page_and_wait = """
|
||||
(async () => {
|
||||
const getCurrentCommit = () => {
|
||||
const commits = document.querySelectorAll('li.Box-sc-g0xbh4-0 h4');
|
||||
return commits.length > 0 ? commits[0].textContent.trim() : null;
|
||||
};
|
||||
|
||||
const initialCommit = getCurrentCommit();
|
||||
const button = document.querySelector('a[data-testid="pagination-next-button"]');
|
||||
if (button) button.click();
|
||||
|
||||
// Poll for changes
|
||||
while (true) {
|
||||
await new Promise(resolve => setTimeout(resolve, 100)); // Wait 100ms
|
||||
const newCommit = getCurrentCommit();
|
||||
if (newCommit && newCommit !== initialCommit) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
})();
|
||||
"""
|
||||
|
||||
schema = {
|
||||
"name": "Commit Extractor",
|
||||
"baseSelector": "li.Box-sc-g0xbh4-0",
|
||||
"fields": [
|
||||
{
|
||||
"name": "title",
|
||||
"selector": "h4.markdown-title",
|
||||
"type": "text",
|
||||
"transform": "strip",
|
||||
},
|
||||
],
|
||||
}
|
||||
extraction_strategy = JsonCssExtractionStrategy(schema, verbose=True)
|
||||
|
||||
for page in range(3): # Crawl 3 pages
|
||||
result = await crawler.arun(
|
||||
url=url,
|
||||
session_id=session_id,
|
||||
css_selector="li.Box-sc-g0xbh4-0",
|
||||
extraction_strategy=extraction_strategy,
|
||||
js_code=js_next_page_and_wait if page > 0 else None,
|
||||
js_only=page > 0,
|
||||
cache_mode=CacheMode.BYPASS,
|
||||
headless=False,
|
||||
)
|
||||
|
||||
assert result.success, f"Failed to crawl page {page + 1}"
|
||||
|
||||
commits = json.loads(result.extracted_content)
|
||||
all_commits.extend(commits)
|
||||
|
||||
print(f"Page {page + 1}: Found {len(commits)} commits")
|
||||
|
||||
await crawler.crawler_strategy.kill_session(session_id)
|
||||
print(f"Successfully crawled {len(all_commits)} commits across 3 pages")
|
||||
|
||||
|
||||
async def crawl_dynamic_content_pages_method_3():
|
||||
print(
|
||||
"\n--- Advanced Multi-Page Crawling with JavaScript Execution using `wait_for` ---"
|
||||
)
|
||||
|
||||
async with AsyncWebCrawler(verbose=True) as crawler:
|
||||
url = "https://github.com/microsoft/TypeScript/commits/main"
|
||||
session_id = "typescript_commits_session"
|
||||
all_commits = []
|
||||
|
||||
js_next_page = """
|
||||
const commits = document.querySelectorAll('li.Box-sc-g0xbh4-0 h4');
|
||||
if (commits.length > 0) {
|
||||
window.firstCommit = commits[0].textContent.trim();
|
||||
}
|
||||
const button = document.querySelector('a[data-testid="pagination-next-button"]');
|
||||
if (button) button.click();
|
||||
"""
|
||||
|
||||
wait_for = """() => {
|
||||
const commits = document.querySelectorAll('li.Box-sc-g0xbh4-0 h4');
|
||||
if (commits.length === 0) return false;
|
||||
const firstCommit = commits[0].textContent.trim();
|
||||
return firstCommit !== window.firstCommit;
|
||||
}"""
|
||||
|
||||
schema = {
|
||||
"name": "Commit Extractor",
|
||||
"baseSelector": "li.Box-sc-g0xbh4-0",
|
||||
"fields": [
|
||||
{
|
||||
"name": "title",
|
||||
"selector": "h4.markdown-title",
|
||||
"type": "text",
|
||||
"transform": "strip",
|
||||
},
|
||||
],
|
||||
}
|
||||
extraction_strategy = JsonCssExtractionStrategy(schema, verbose=True)
|
||||
|
||||
for page in range(3): # Crawl 3 pages
|
||||
result = await crawler.arun(
|
||||
url=url,
|
||||
session_id=session_id,
|
||||
css_selector="li.Box-sc-g0xbh4-0",
|
||||
extraction_strategy=extraction_strategy,
|
||||
js_code=js_next_page if page > 0 else None,
|
||||
wait_for=wait_for if page > 0 else None,
|
||||
js_only=page > 0,
|
||||
cache_mode=CacheMode.BYPASS,
|
||||
headless=False,
|
||||
)
|
||||
|
||||
assert result.success, f"Failed to crawl page {page + 1}"
|
||||
|
||||
commits = json.loads(result.extracted_content)
|
||||
all_commits.extend(commits)
|
||||
|
||||
print(f"Page {page + 1}: Found {len(commits)} commits")
|
||||
|
||||
await crawler.crawler_strategy.kill_session(session_id)
|
||||
print(f"Successfully crawled {len(all_commits)} commits across 3 pages")
|
||||
|
||||
|
||||
async def crawl_custom_browser_type():
|
||||
# Use Firefox
|
||||
start = time.time()
|
||||
async with AsyncWebCrawler(
|
||||
browser_type="firefox", verbose=True, headless=True
|
||||
) as crawler:
|
||||
result = await crawler.arun(
|
||||
url="https://www.example.com", cache_mode=CacheMode.BYPASS
|
||||
)
|
||||
print(result.markdown[:500])
|
||||
print("Time taken: ", time.time() - start)
|
||||
|
||||
# Use WebKit
|
||||
start = time.time()
|
||||
async with AsyncWebCrawler(
|
||||
browser_type="webkit", verbose=True, headless=True
|
||||
) as crawler:
|
||||
result = await crawler.arun(
|
||||
url="https://www.example.com", cache_mode=CacheMode.BYPASS
|
||||
)
|
||||
print(result.markdown[:500])
|
||||
print("Time taken: ", time.time() - start)
|
||||
|
||||
# Use Chromium (default)
|
||||
start = time.time()
|
||||
async with AsyncWebCrawler(verbose=True, headless=True) as crawler:
|
||||
result = await crawler.arun(
|
||||
url="https://www.example.com", cache_mode=CacheMode.BYPASS
|
||||
)
|
||||
print(result.markdown[:500])
|
||||
print("Time taken: ", time.time() - start)
|
||||
|
||||
|
||||
async def crawl_with_user_simultion():
|
||||
async with AsyncWebCrawler(verbose=True, headless=True) as crawler:
|
||||
url = "YOUR-URL-HERE"
|
||||
result = await crawler.arun(
|
||||
url=url,
|
||||
cache_mode=CacheMode.BYPASS,
|
||||
magic=True, # Automatically detects and removes overlays, popups, and other elements that block content
|
||||
# simulate_user = True,# Causes a series of random mouse movements and clicks to simulate user interaction
|
||||
# override_navigator = True # Overrides the navigator object to make it look like a real user
|
||||
)
|
||||
|
||||
print(result.markdown)
|
||||
|
||||
|
||||
async def speed_comparison():
|
||||
# print("\n--- Speed Comparison ---")
|
||||
# print("Firecrawl (simulated):")
|
||||
# print("Time taken: 7.02 seconds")
|
||||
# print("Content length: 42074 characters")
|
||||
# print("Images found: 49")
|
||||
# print()
|
||||
# Simulated Firecrawl performance
|
||||
from firecrawl import FirecrawlApp
|
||||
|
||||
app = FirecrawlApp(api_key=os.environ["FIRECRAWL_API_KEY"])
|
||||
start = time.time()
|
||||
scrape_status = app.scrape_url(
|
||||
"https://www.nbcnews.com/business", params={"formats": ["markdown", "html"]}
|
||||
)
|
||||
end = time.time()
|
||||
print("Firecrawl:")
|
||||
print(f"Time taken: {end - start:.2f} seconds")
|
||||
print(f"Content length: {len(scrape_status['markdown'])} characters")
|
||||
print(f"Images found: {scrape_status['markdown'].count('cldnry.s-nbcnews.com')}")
|
||||
print()
|
||||
|
||||
async with AsyncWebCrawler() as crawler:
|
||||
# Crawl4AI simple crawl
|
||||
start = time.time()
|
||||
result = await crawler.arun(
|
||||
url="https://www.nbcnews.com/business",
|
||||
word_count_threshold=0,
|
||||
cache_mode=CacheMode.BYPASS,
|
||||
verbose=False,
|
||||
)
|
||||
end = time.time()
|
||||
print("Crawl4AI (simple crawl):")
|
||||
print(f"Time taken: {end - start:.2f} seconds")
|
||||
print(f"Content length: {len(result.markdown)} characters")
|
||||
print(f"Images found: {result.markdown.count('cldnry.s-nbcnews.com')}")
|
||||
print()
|
||||
|
||||
# Crawl4AI with advanced content filtering
|
||||
start = time.time()
|
||||
result = await crawler.arun(
|
||||
url="https://www.nbcnews.com/business",
|
||||
word_count_threshold=0,
|
||||
markdown_generator=DefaultMarkdownGenerator(
|
||||
content_filter=PruningContentFilter(
|
||||
threshold=0.48, threshold_type="fixed", min_word_threshold=0
|
||||
)
|
||||
# content_filter=BM25ContentFilter(user_query=None, bm25_threshold=1.0)
|
||||
),
|
||||
cache_mode=CacheMode.BYPASS,
|
||||
verbose=False,
|
||||
)
|
||||
end = time.time()
|
||||
print("Crawl4AI (Markdown Plus):")
|
||||
print(f"Time taken: {end - start:.2f} seconds")
|
||||
print(f"Content length: {len(result.markdown.raw_markdown)} characters")
|
||||
print(f"Fit Markdown: {len(result.markdown.fit_markdown)} characters")
|
||||
print(f"Images found: {result.markdown.raw_markdown.count('cldnry.s-nbcnews.com')}")
|
||||
print()
|
||||
|
||||
# Crawl4AI with JavaScript execution
|
||||
start = time.time()
|
||||
result = await crawler.arun(
|
||||
url="https://www.nbcnews.com/business",
|
||||
js_code=[
|
||||
"const loadMoreButton = Array.from(document.querySelectorAll('button')).find(button => button.textContent.includes('Load More')); loadMoreButton && loadMoreButton.click();"
|
||||
],
|
||||
word_count_threshold=0,
|
||||
cache_mode=CacheMode.BYPASS,
|
||||
markdown_generator=DefaultMarkdownGenerator(
|
||||
content_filter=PruningContentFilter(
|
||||
threshold=0.48, threshold_type="fixed", min_word_threshold=0
|
||||
)
|
||||
# content_filter=BM25ContentFilter(user_query=None, bm25_threshold=1.0)
|
||||
),
|
||||
verbose=False,
|
||||
)
|
||||
end = time.time()
|
||||
print("Crawl4AI (with JavaScript execution):")
|
||||
print(f"Time taken: {end - start:.2f} seconds")
|
||||
print(f"Content length: {len(result.markdown.raw_markdown)} characters")
|
||||
print(f"Fit Markdown: {len(result.markdown.fit_markdown)} characters")
|
||||
print(f"Images found: {result.markdown.raw_markdown.count('cldnry.s-nbcnews.com')}")
|
||||
|
||||
print("\nNote on Speed Comparison:")
|
||||
print("The speed test conducted here may not reflect optimal conditions.")
|
||||
print("When we call Firecrawl's API, we're seeing its best performance,")
|
||||
print("while Crawl4AI's performance is limited by the local network speed.")
|
||||
print("For a more accurate comparison, it's recommended to run these tests")
|
||||
print("on servers with a stable and fast internet connection.")
|
||||
print("Despite these limitations, Crawl4AI still demonstrates faster performance.")
|
||||
print("If you run these tests in an environment with better network conditions,")
|
||||
print("you may observe an even more significant speed advantage for Crawl4AI.")
|
||||
|
||||
|
||||
async def generate_knowledge_graph():
|
||||
class Entity(BaseModel):
|
||||
name: str
|
||||
description: str
|
||||
|
||||
class Relationship(BaseModel):
|
||||
entity1: Entity
|
||||
entity2: Entity
|
||||
description: str
|
||||
relation_type: str
|
||||
|
||||
class KnowledgeGraph(BaseModel):
|
||||
entities: List[Entity]
|
||||
relationships: List[Relationship]
|
||||
|
||||
extraction_strategy = LLMExtractionStrategy(
|
||||
llm_config=LLMConfig(provider="openai/gpt-4o-mini", api_token=os.getenv("OPENAI_API_KEY")), # In case of Ollama just pass "no-token"
|
||||
schema=KnowledgeGraph.model_json_schema(),
|
||||
extraction_type="schema",
|
||||
instruction="""Extract entities and relationships from the given text.""",
|
||||
)
|
||||
async with AsyncWebCrawler() as crawler:
|
||||
url = "https://paulgraham.com/love.html"
|
||||
result = await crawler.arun(
|
||||
url=url,
|
||||
cache_mode=CacheMode.BYPASS,
|
||||
extraction_strategy=extraction_strategy,
|
||||
# magic=True
|
||||
)
|
||||
# print(result.extracted_content)
|
||||
with open(os.path.join(__location__, "kb.json"), "w") as f:
|
||||
f.write(result.extracted_content)
|
||||
|
||||
|
||||
async def fit_markdown_remove_overlay():
|
||||
async with AsyncWebCrawler(
|
||||
headless=True, # Set to False to see what is happening
|
||||
verbose=True,
|
||||
user_agent_mode="random",
|
||||
user_agent_generator_config={"device_type": "mobile", "os_type": "android"},
|
||||
) as crawler:
|
||||
result = await crawler.arun(
|
||||
url="https://www.kidocode.com/degrees/technology",
|
||||
cache_mode=CacheMode.BYPASS,
|
||||
markdown_generator=DefaultMarkdownGenerator(
|
||||
content_filter=PruningContentFilter(
|
||||
threshold=0.48, threshold_type="fixed", min_word_threshold=0
|
||||
),
|
||||
options={"ignore_links": True},
|
||||
),
|
||||
# markdown_generator=DefaultMarkdownGenerator(
|
||||
# content_filter=BM25ContentFilter(user_query="", bm25_threshold=1.0),
|
||||
# options={
|
||||
# "ignore_links": True
|
||||
# }
|
||||
# ),
|
||||
)
|
||||
|
||||
if result.success:
|
||||
print(len(result.markdown.raw_markdown))
|
||||
print(len(result.markdown.markdown_with_citations))
|
||||
print(len(result.markdown.fit_markdown))
|
||||
|
||||
# Save clean html
|
||||
with open(os.path.join(__location__, "output/cleaned_html.html"), "w") as f:
|
||||
f.write(result.cleaned_html)
|
||||
|
||||
with open(
|
||||
os.path.join(__location__, "output/output_raw_markdown.md"), "w"
|
||||
) as f:
|
||||
f.write(result.markdown.raw_markdown)
|
||||
|
||||
with open(
|
||||
os.path.join(__location__, "output/output_markdown_with_citations.md"),
|
||||
"w",
|
||||
) as f:
|
||||
f.write(result.markdown.markdown_with_citations)
|
||||
|
||||
with open(
|
||||
os.path.join(__location__, "output/output_fit_markdown.md"), "w"
|
||||
) as f:
|
||||
f.write(result.markdown.fit_markdown)
|
||||
|
||||
print("Done")
|
||||
|
||||
|
||||
async def main():
|
||||
# await extract_structured_data_using_llm("openai/gpt-4o", os.getenv("OPENAI_API_KEY"))
|
||||
|
||||
# await simple_crawl()
|
||||
# await simple_example_with_running_js_code()
|
||||
# await simple_example_with_css_selector()
|
||||
# # await use_proxy()
|
||||
# await capture_and_save_screenshot("https://www.example.com", os.path.join(__location__, "tmp/example_screenshot.jpg"))
|
||||
# await extract_structured_data_using_css_extractor()
|
||||
|
||||
# LLM extraction examples
|
||||
# await extract_structured_data_using_llm()
|
||||
# await extract_structured_data_using_llm("huggingface/meta-llama/Meta-Llama-3.1-8B-Instruct", os.getenv("HUGGINGFACE_API_KEY"))
|
||||
# await extract_structured_data_using_llm("ollama/llama3.2")
|
||||
|
||||
# You always can pass custom headers to the extraction strategy
|
||||
# custom_headers = {
|
||||
# "Authorization": "Bearer your-custom-token",
|
||||
# "X-Custom-Header": "Some-Value"
|
||||
# }
|
||||
# await extract_structured_data_using_llm(extra_headers=custom_headers)
|
||||
|
||||
# await crawl_dynamic_content_pages_method_1()
|
||||
# await crawl_dynamic_content_pages_method_2()
|
||||
await crawl_dynamic_content_pages_method_3()
|
||||
|
||||
# await crawl_custom_browser_type()
|
||||
|
||||
# await speed_comparison()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
@@ -1,6 +1,6 @@
|
||||
import os, sys
|
||||
|
||||
from crawl4ai import LLMConfig
|
||||
from crawl4ai.types import LLMConfig
|
||||
|
||||
sys.path.append(
|
||||
os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
@@ -1,405 +0,0 @@
|
||||
import os
|
||||
import time
|
||||
from crawl4ai import LLMConfig
|
||||
from crawl4ai.web_crawler import WebCrawler
|
||||
from crawl4ai.chunking_strategy import *
|
||||
from crawl4ai.extraction_strategy import *
|
||||
from crawl4ai.crawler_strategy import *
|
||||
from rich import print
|
||||
from rich.console import Console
|
||||
from functools import lru_cache
|
||||
|
||||
console = Console()
|
||||
|
||||
|
||||
@lru_cache()
|
||||
def create_crawler():
|
||||
crawler = WebCrawler(verbose=True)
|
||||
crawler.warmup()
|
||||
return crawler
|
||||
|
||||
|
||||
def print_result(result):
|
||||
# Print each key in one line and just the first 10 characters of each one's value and three dots
|
||||
console.print("\t[bold]Result:[/bold]")
|
||||
for key, value in result.model_dump().items():
|
||||
if isinstance(value, str) and value:
|
||||
console.print(f"\t{key}: [green]{value[:20]}...[/green]")
|
||||
if result.extracted_content:
|
||||
items = json.loads(result.extracted_content)
|
||||
print(f"\t[bold]{len(items)} blocks is extracted![/bold]")
|
||||
|
||||
|
||||
def cprint(message, press_any_key=False):
|
||||
console.print(message)
|
||||
if press_any_key:
|
||||
console.print("Press any key to continue...", style="")
|
||||
input()
|
||||
|
||||
|
||||
def basic_usage(crawler):
|
||||
cprint(
|
||||
"🛠️ [bold cyan]Basic Usage: Simply provide a URL and let Crawl4ai do the magic![/bold cyan]"
|
||||
)
|
||||
result = crawler.run(url="https://www.nbcnews.com/business", only_text=True)
|
||||
cprint("[LOG] 📦 [bold yellow]Basic crawl result:[/bold yellow]")
|
||||
print_result(result)
|
||||
|
||||
|
||||
def basic_usage_some_params(crawler):
|
||||
cprint(
|
||||
"🛠️ [bold cyan]Basic Usage: Simply provide a URL and let Crawl4ai do the magic![/bold cyan]"
|
||||
)
|
||||
result = crawler.run(
|
||||
url="https://www.nbcnews.com/business", word_count_threshold=1, only_text=True
|
||||
)
|
||||
cprint("[LOG] 📦 [bold yellow]Basic crawl result:[/bold yellow]")
|
||||
print_result(result)
|
||||
|
||||
|
||||
def screenshot_usage(crawler):
|
||||
cprint("\n📸 [bold cyan]Let's take a screenshot of the page![/bold cyan]")
|
||||
result = crawler.run(url="https://www.nbcnews.com/business", screenshot=True)
|
||||
cprint("[LOG] 📦 [bold yellow]Screenshot result:[/bold yellow]")
|
||||
# Save the screenshot to a file
|
||||
with open("screenshot.png", "wb") as f:
|
||||
f.write(base64.b64decode(result.screenshot))
|
||||
cprint("Screenshot saved to 'screenshot.png'!")
|
||||
print_result(result)
|
||||
|
||||
|
||||
def understanding_parameters(crawler):
|
||||
cprint(
|
||||
"\n🧠 [bold cyan]Understanding 'bypass_cache' and 'include_raw_html' parameters:[/bold cyan]"
|
||||
)
|
||||
cprint(
|
||||
"By default, Crawl4ai caches the results of your crawls. This means that subsequent crawls of the same URL will be much faster! Let's see this in action."
|
||||
)
|
||||
|
||||
# First crawl (reads from cache)
|
||||
cprint("1️⃣ First crawl (caches the result):", True)
|
||||
start_time = time.time()
|
||||
result = crawler.run(url="https://www.nbcnews.com/business")
|
||||
end_time = time.time()
|
||||
cprint(
|
||||
f"[LOG] 📦 [bold yellow]First crawl took {end_time - start_time} seconds and result (from cache):[/bold yellow]"
|
||||
)
|
||||
print_result(result)
|
||||
|
||||
# Force to crawl again
|
||||
cprint("2️⃣ Second crawl (Force to crawl again):", True)
|
||||
start_time = time.time()
|
||||
result = crawler.run(url="https://www.nbcnews.com/business", bypass_cache=True)
|
||||
end_time = time.time()
|
||||
cprint(
|
||||
f"[LOG] 📦 [bold yellow]Second crawl took {end_time - start_time} seconds and result (forced to crawl):[/bold yellow]"
|
||||
)
|
||||
print_result(result)
|
||||
|
||||
|
||||
def add_chunking_strategy(crawler):
|
||||
# Adding a chunking strategy: RegexChunking
|
||||
cprint(
|
||||
"\n🧩 [bold cyan]Let's add a chunking strategy: RegexChunking![/bold cyan]",
|
||||
True,
|
||||
)
|
||||
cprint(
|
||||
"RegexChunking is a simple chunking strategy that splits the text based on a given regex pattern. Let's see it in action!"
|
||||
)
|
||||
result = crawler.run(
|
||||
url="https://www.nbcnews.com/business",
|
||||
chunking_strategy=RegexChunking(patterns=["\n\n"]),
|
||||
)
|
||||
cprint("[LOG] 📦 [bold yellow]RegexChunking result:[/bold yellow]")
|
||||
print_result(result)
|
||||
|
||||
# Adding another chunking strategy: NlpSentenceChunking
|
||||
cprint(
|
||||
"\n🔍 [bold cyan]Time to explore another chunking strategy: NlpSentenceChunking![/bold cyan]",
|
||||
True,
|
||||
)
|
||||
cprint(
|
||||
"NlpSentenceChunking uses NLP techniques to split the text into sentences. Let's see how it performs!"
|
||||
)
|
||||
result = crawler.run(
|
||||
url="https://www.nbcnews.com/business", chunking_strategy=NlpSentenceChunking()
|
||||
)
|
||||
cprint("[LOG] 📦 [bold yellow]NlpSentenceChunking result:[/bold yellow]")
|
||||
print_result(result)
|
||||
|
||||
|
||||
def add_extraction_strategy(crawler):
|
||||
# Adding an extraction strategy: CosineStrategy
|
||||
cprint(
|
||||
"\n🧠 [bold cyan]Let's get smarter with an extraction strategy: CosineStrategy![/bold cyan]",
|
||||
True,
|
||||
)
|
||||
cprint(
|
||||
"CosineStrategy uses cosine similarity to extract semantically similar blocks of text. Let's see it in action!"
|
||||
)
|
||||
result = crawler.run(
|
||||
url="https://www.nbcnews.com/business",
|
||||
extraction_strategy=CosineStrategy(
|
||||
word_count_threshold=10,
|
||||
max_dist=0.2,
|
||||
linkage_method="ward",
|
||||
top_k=3,
|
||||
sim_threshold=0.3,
|
||||
verbose=True,
|
||||
),
|
||||
)
|
||||
cprint("[LOG] 📦 [bold yellow]CosineStrategy result:[/bold yellow]")
|
||||
print_result(result)
|
||||
|
||||
# Using semantic_filter with CosineStrategy
|
||||
cprint(
|
||||
"You can pass other parameters like 'semantic_filter' to the CosineStrategy to extract semantically similar blocks of text. Let's see it in action!"
|
||||
)
|
||||
result = crawler.run(
|
||||
url="https://www.nbcnews.com/business",
|
||||
extraction_strategy=CosineStrategy(
|
||||
semantic_filter="inflation rent prices",
|
||||
),
|
||||
)
|
||||
cprint(
|
||||
"[LOG] 📦 [bold yellow]CosineStrategy result with semantic filter:[/bold yellow]"
|
||||
)
|
||||
print_result(result)
|
||||
|
||||
|
||||
def add_llm_extraction_strategy(crawler):
|
||||
# Adding an LLM extraction strategy without instructions
|
||||
cprint(
|
||||
"\n🤖 [bold cyan]Time to bring in the big guns: LLMExtractionStrategy without instructions![/bold cyan]",
|
||||
True,
|
||||
)
|
||||
cprint(
|
||||
"LLMExtractionStrategy uses a large language model to extract relevant information from the web page. Let's see it in action!"
|
||||
)
|
||||
result = crawler.run(
|
||||
url="https://www.nbcnews.com/business",
|
||||
extraction_strategy=LLMExtractionStrategy(
|
||||
llm_config = LLMConfig(provider="openai/gpt-4o", api_token=os.getenv("OPENAI_API_KEY"))
|
||||
),
|
||||
)
|
||||
cprint(
|
||||
"[LOG] 📦 [bold yellow]LLMExtractionStrategy (no instructions) result:[/bold yellow]"
|
||||
)
|
||||
print_result(result)
|
||||
|
||||
# Adding an LLM extraction strategy with instructions
|
||||
cprint(
|
||||
"\n📜 [bold cyan]Let's make it even more interesting: LLMExtractionStrategy with instructions![/bold cyan]",
|
||||
True,
|
||||
)
|
||||
cprint(
|
||||
"Let's say we are only interested in financial news. Let's see how LLMExtractionStrategy performs with instructions!"
|
||||
)
|
||||
result = crawler.run(
|
||||
url="https://www.nbcnews.com/business",
|
||||
extraction_strategy=LLMExtractionStrategy(
|
||||
llm_config=LLMConfig(provider="openai/gpt-4o",api_token=os.getenv("OPENAI_API_KEY")),
|
||||
instruction="I am interested in only financial news",
|
||||
),
|
||||
)
|
||||
cprint(
|
||||
"[LOG] 📦 [bold yellow]LLMExtractionStrategy (with instructions) result:[/bold yellow]"
|
||||
)
|
||||
print_result(result)
|
||||
|
||||
result = crawler.run(
|
||||
url="https://www.nbcnews.com/business",
|
||||
extraction_strategy=LLMExtractionStrategy(
|
||||
llm_config=LLMConfig(provider="openai/gpt-4o",api_token=os.getenv("OPENAI_API_KEY")),
|
||||
instruction="Extract only content related to technology",
|
||||
),
|
||||
)
|
||||
cprint(
|
||||
"[LOG] 📦 [bold yellow]LLMExtractionStrategy (with technology instruction) result:[/bold yellow]"
|
||||
)
|
||||
print_result(result)
|
||||
|
||||
|
||||
def targeted_extraction(crawler):
|
||||
# Using a CSS selector to extract only H2 tags
|
||||
cprint(
|
||||
"\n🎯 [bold cyan]Targeted extraction: Let's use a CSS selector to extract only H2 tags![/bold cyan]",
|
||||
True,
|
||||
)
|
||||
result = crawler.run(url="https://www.nbcnews.com/business", css_selector="h2")
|
||||
cprint("[LOG] 📦 [bold yellow]CSS Selector (H2 tags) result:[/bold yellow]")
|
||||
print_result(result)
|
||||
|
||||
|
||||
def interactive_extraction(crawler):
|
||||
# Passing JavaScript code to interact with the page
|
||||
cprint(
|
||||
"\n🖱️ [bold cyan]Let's get interactive: Passing JavaScript code to click 'Load More' button![/bold cyan]",
|
||||
True,
|
||||
)
|
||||
cprint(
|
||||
"In this example we try to click the 'Load More' button on the page using JavaScript code."
|
||||
)
|
||||
js_code = """
|
||||
const loadMoreButton = Array.from(document.querySelectorAll('button')).find(button => button.textContent.includes('Load More'));
|
||||
loadMoreButton && loadMoreButton.click();
|
||||
"""
|
||||
# crawler_strategy = LocalSeleniumCrawlerStrategy(js_code=js_code)
|
||||
# crawler = WebCrawler(crawler_strategy=crawler_strategy, always_by_pass_cache=True)
|
||||
result = crawler.run(url="https://www.nbcnews.com/business", js=js_code)
|
||||
cprint(
|
||||
"[LOG] 📦 [bold yellow]JavaScript Code (Load More button) result:[/bold yellow]"
|
||||
)
|
||||
print_result(result)
|
||||
|
||||
|
||||
def multiple_scrip(crawler):
|
||||
# Passing JavaScript code to interact with the page
|
||||
cprint(
|
||||
"\n🖱️ [bold cyan]Let's get interactive: Passing JavaScript code to click 'Load More' button![/bold cyan]",
|
||||
True,
|
||||
)
|
||||
cprint(
|
||||
"In this example we try to click the 'Load More' button on the page using JavaScript code."
|
||||
)
|
||||
js_code = [
|
||||
"""
|
||||
const loadMoreButton = Array.from(document.querySelectorAll('button')).find(button => button.textContent.includes('Load More'));
|
||||
loadMoreButton && loadMoreButton.click();
|
||||
"""
|
||||
] * 2
|
||||
# crawler_strategy = LocalSeleniumCrawlerStrategy(js_code=js_code)
|
||||
# crawler = WebCrawler(crawler_strategy=crawler_strategy, always_by_pass_cache=True)
|
||||
result = crawler.run(url="https://www.nbcnews.com/business", js=js_code)
|
||||
cprint(
|
||||
"[LOG] 📦 [bold yellow]JavaScript Code (Load More button) result:[/bold yellow]"
|
||||
)
|
||||
print_result(result)
|
||||
|
||||
|
||||
def using_crawler_hooks(crawler):
|
||||
# Example usage of the hooks for authentication and setting a cookie
|
||||
def on_driver_created(driver):
|
||||
print("[HOOK] on_driver_created")
|
||||
# Example customization: maximize the window
|
||||
driver.maximize_window()
|
||||
|
||||
# Example customization: logging in to a hypothetical website
|
||||
driver.get("https://example.com/login")
|
||||
|
||||
from selenium.webdriver.support.ui import WebDriverWait
|
||||
from selenium.webdriver.common.by import By
|
||||
from selenium.webdriver.support import expected_conditions as EC
|
||||
|
||||
WebDriverWait(driver, 10).until(
|
||||
EC.presence_of_element_located((By.NAME, "username"))
|
||||
)
|
||||
driver.find_element(By.NAME, "username").send_keys("testuser")
|
||||
driver.find_element(By.NAME, "password").send_keys("password123")
|
||||
driver.find_element(By.NAME, "login").click()
|
||||
WebDriverWait(driver, 10).until(
|
||||
EC.presence_of_element_located((By.ID, "welcome"))
|
||||
)
|
||||
# Add a custom cookie
|
||||
driver.add_cookie({"name": "test_cookie", "value": "cookie_value"})
|
||||
return driver
|
||||
|
||||
def before_get_url(driver):
|
||||
print("[HOOK] before_get_url")
|
||||
# Example customization: add a custom header
|
||||
# Enable Network domain for sending headers
|
||||
driver.execute_cdp_cmd("Network.enable", {})
|
||||
# Add a custom header
|
||||
driver.execute_cdp_cmd(
|
||||
"Network.setExtraHTTPHeaders", {"headers": {"X-Test-Header": "test"}}
|
||||
)
|
||||
return driver
|
||||
|
||||
def after_get_url(driver):
|
||||
print("[HOOK] after_get_url")
|
||||
# Example customization: log the URL
|
||||
print(driver.current_url)
|
||||
return driver
|
||||
|
||||
def before_return_html(driver, html):
|
||||
print("[HOOK] before_return_html")
|
||||
# Example customization: log the HTML
|
||||
print(len(html))
|
||||
return driver
|
||||
|
||||
cprint(
|
||||
"\n🔗 [bold cyan]Using Crawler Hooks: Let's see how we can customize the crawler using hooks![/bold cyan]",
|
||||
True,
|
||||
)
|
||||
|
||||
crawler_strategy = LocalSeleniumCrawlerStrategy(verbose=True)
|
||||
crawler_strategy.set_hook("on_driver_created", on_driver_created)
|
||||
crawler_strategy.set_hook("before_get_url", before_get_url)
|
||||
crawler_strategy.set_hook("after_get_url", after_get_url)
|
||||
crawler_strategy.set_hook("before_return_html", before_return_html)
|
||||
|
||||
crawler = WebCrawler(verbose=True, crawler_strategy=crawler_strategy)
|
||||
crawler.warmup()
|
||||
result = crawler.run(url="https://example.com")
|
||||
|
||||
cprint("[LOG] 📦 [bold yellow]Crawler Hooks result:[/bold yellow]")
|
||||
print_result(result=result)
|
||||
|
||||
|
||||
def using_crawler_hooks_dleay_example(crawler):
|
||||
def delay(driver):
|
||||
print("Delaying for 5 seconds...")
|
||||
time.sleep(5)
|
||||
print("Resuming...")
|
||||
|
||||
def create_crawler():
|
||||
crawler_strategy = LocalSeleniumCrawlerStrategy(verbose=True)
|
||||
crawler_strategy.set_hook("after_get_url", delay)
|
||||
crawler = WebCrawler(verbose=True, crawler_strategy=crawler_strategy)
|
||||
crawler.warmup()
|
||||
return crawler
|
||||
|
||||
cprint(
|
||||
"\n🔗 [bold cyan]Using Crawler Hooks: Let's add a delay after fetching the url to make sure entire page is fetched.[/bold cyan]"
|
||||
)
|
||||
crawler = create_crawler()
|
||||
result = crawler.run(url="https://google.com", bypass_cache=True)
|
||||
|
||||
cprint("[LOG] 📦 [bold yellow]Crawler Hooks result:[/bold yellow]")
|
||||
print_result(result)
|
||||
|
||||
|
||||
def main():
|
||||
cprint(
|
||||
"🌟 [bold green]Welcome to the Crawl4ai Quickstart Guide! Let's dive into some web crawling fun! 🌐[/bold green]"
|
||||
)
|
||||
cprint(
|
||||
"⛳️ [bold cyan]First Step: Create an instance of WebCrawler and call the `warmup()` function.[/bold cyan]"
|
||||
)
|
||||
cprint(
|
||||
"If this is the first time you're running Crawl4ai, this might take a few seconds to load required model files."
|
||||
)
|
||||
|
||||
crawler = create_crawler()
|
||||
|
||||
crawler.always_by_pass_cache = True
|
||||
basic_usage(crawler)
|
||||
# basic_usage_some_params(crawler)
|
||||
understanding_parameters(crawler)
|
||||
|
||||
crawler.always_by_pass_cache = True
|
||||
screenshot_usage(crawler)
|
||||
add_chunking_strategy(crawler)
|
||||
add_extraction_strategy(crawler)
|
||||
add_llm_extraction_strategy(crawler)
|
||||
targeted_extraction(crawler)
|
||||
interactive_extraction(crawler)
|
||||
multiple_scrip(crawler)
|
||||
|
||||
cprint(
|
||||
"\n🎉 [bold green]Congratulations! You've made it through the Crawl4ai Quickstart Guide! Now go forth and crawl the web like a pro! 🕸️[/bold green]"
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user