import os
import importlib
import asyncio
import json
from functools import lru_cache
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, HttpUrl
from concurrent.futures import ThreadPoolExecutor
from typing import List, Optional
from crawl4ai.web_crawler import WebCrawler
from crawl4ai.database import get_total_count, clear_db

# Configuration
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
MAX_CONCURRENT_REQUESTS = 10  # Adjust this to change the maximum concurrent requests
current_requests = 0
lock = asyncio.Lock()

app = FastAPI()

# CORS configuration
origins = ["*"]  # Allow all origins
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,  # List of origins that are allowed to make requests
    allow_credentials=True,
    allow_methods=["*"],  # Allows all methods
    allow_headers=["*"],  # Allows all headers
)

# Mount the pages directory as a static directory
app.mount("/pages", StaticFiles(directory=__location__ + "/pages"), name="pages")

# chromedriver_autoinstaller.install()  # Ensure chromedriver is installed


@lru_cache()
def get_crawler():
    # Initialize and return a WebCrawler instance
    return WebCrawler()


class CrawlRequest(BaseModel):
    urls: List[HttpUrl]
    provider_model: str
    api_token: str
    include_raw_html: Optional[bool] = False
    bypass_cache: bool = False
    extract_blocks: bool = True
    word_count_threshold: Optional[int] = 5
    extraction_strategy: Optional[str] = "CosineStrategy"
    chunking_strategy: Optional[str] = "RegexChunking"
    css_selector: Optional[str] = None
    verbose: Optional[bool] = True


@app.get("/", response_class=HTMLResponse)
async def read_index():
    with open(f"{__location__}/pages/index.html", "r") as file:
        html_content = file.read()
    return HTMLResponse(content=html_content, status_code=200)


@app.get("/total-count")
async def get_total_url_count():
    count = get_total_count()
    return JSONResponse(content={"count": count})


# Add endpoint to clear db
@app.get("/clear-db")
async def clear_database():
    clear_db()
    return JSONResponse(content={"message": "Database cleared."})


def import_strategy(module_name: str, class_name: str):
    try:
        module = importlib.import_module(module_name)
        strategy_class = getattr(module, class_name)
        return strategy_class()
    except ImportError:
        raise HTTPException(status_code=400, detail=f"Module {module_name} not found.")
    except AttributeError:
        raise HTTPException(status_code=400, detail=f"Class {class_name} not found in {module_name}.")


@app.post("/crawl")
async def crawl_urls(crawl_request: CrawlRequest, request: Request):
    global current_requests

    # Raise error if api_token is not provided
    if not crawl_request.api_token:
        raise HTTPException(status_code=401, detail="API token is required.")

    async with lock:
        if current_requests >= MAX_CONCURRENT_REQUESTS:
            raise HTTPException(status_code=429, detail="Too many requests - please try again later.")
        current_requests += 1

    try:
        extraction_strategy = import_strategy("crawl4ai.extraction_strategy", crawl_request.extraction_strategy)
        chunking_strategy = import_strategy("crawl4ai.chunking_strategy", crawl_request.chunking_strategy)

        # Use ThreadPoolExecutor to run the synchronous WebCrawler in an async manner
        with ThreadPoolExecutor() as executor:
            loop = asyncio.get_event_loop()
            futures = [
                loop.run_in_executor(
                    executor,
                    get_crawler().run,
                    str(url),
                    crawl_request.word_count_threshold,
                    extraction_strategy,
                    chunking_strategy,
                    crawl_request.bypass_cache,
                    crawl_request.css_selector,
                    crawl_request.verbose,
                )
                for url in crawl_request.urls
            ]
            results = await asyncio.gather(*futures)

        # If include_raw_html is False, remove the raw HTML content from the results
        if not crawl_request.include_raw_html:
            for result in results:
                result.html = None

        return {"results": [result.dict() for result in results]}
    finally:
        async with lock:
            current_requests -= 1


@app.get("/strategies/extraction", response_class=JSONResponse)
async def get_extraction_strategies():
    # Load docs/extraction_strategies.json and return its parsed content as a JSON response
    with open(f"{__location__}/docs/extraction_strategies.json", "r") as file:
        return JSONResponse(content=json.load(file))


@app.get("/strategies/chunking", response_class=JSONResponse)
async def get_chunking_strategies():
    # Load docs/chunking_strategies.json and return its parsed content as a JSON response
    with open(f"{__location__}/docs/chunking_strategies.json", "r") as file:
        return JSONResponse(content=json.load(file))


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)
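
# Example request against the /crawl endpoint (a minimal sketch: the field names mirror
# CrawlRequest above; the URL, provider_model, and api_token values are placeholders):
#
#   curl -X POST http://localhost:8000/crawl \
#     -H "Content-Type: application/json" \
#     -d '{"urls": ["https://example.com"],
#          "provider_model": "your-provider/your-model",
#          "api_token": "YOUR_API_TOKEN",
#          "extraction_strategy": "CosineStrategy",
#          "chunking_strategy": "RegexChunking"}'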