diff --git a/docs/examples/c4a_script/tutorial/server.py b/docs/examples/c4a_script/tutorial/server.py
index 6242789d..f9cb81e9 100644
--- a/docs/examples/c4a_script/tutorial/server.py
+++ b/docs/examples/c4a_script/tutorial/server.py
@@ -283,7 +283,7 @@ WAIT `.success-message` 5'''
     return jsonify(examples)
 
 if __name__ == '__main__':
-    port = int(os.environ.get('PORT', 8080))
+    port = int(os.environ.get('PORT', 8000))
     print(f"""
 ╔══════════════════════════════════════════════════════════╗
 ║          C4A-Script Interactive Tutorial Server           ║
diff --git a/tests/async_assistant/test_extract_pipeline.py b/tests/async_assistant/test_extract_pipeline.py
new file mode 100644
index 00000000..719d6ea1
--- /dev/null
+++ b/tests/async_assistant/test_extract_pipeline.py
@@ -0,0 +1,381 @@
+"""
+Test implementation of the AI Assistant extract pipeline using only Crawl4AI capabilities.
+This follows the exact flow discussed: query enhancement, classification, HTML skimming,
+parent extraction, schema generation, and extraction.
+"""
+
+import asyncio
+import json
+import os
+from typing import List, Dict, Optional, Union
+from lxml import html as lxml_html
+
+from crawl4ai import AsyncWebCrawler, CrawlerRunConfig
+from crawl4ai.async_configs import LLMConfig
+from crawl4ai import JsonCssExtractionStrategy, LLMExtractionStrategy
+from crawl4ai.utils import perform_completion_with_backoff
+
+
+async def extract_pipeline(
+    base_url: str,
+    urls: Union[str, List[str], None],
+    query: str,
+    target_json_example: Optional[str] = None,
+    force_llm: bool = False,
+    verbose: bool = True
+) -> Union[Dict, List[Dict]]:
+    """
+    Full implementation of the AI-powered extraction pipeline using only Crawl4AI.
+
+    Pipeline:
+    1. Quick crawl & HTML skimming
+    2. Classification (structural vs. semantic) using an LLM
+    3. Parent element extraction using an LLM (structural path only)
+    4. Schema generation using Crawl4AI's generate_schema
+    5. Extraction execution using Crawl4AI strategies
+    """
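+    # Usage sketch (illustrative only; the URL and query are placeholders,
+    # not part of this test):
+    #
+    #     result = await extract_pipeline(
+    #         base_url="https://example.com/products",
+    #         urls=None,
+    #         query="Extract all product names and prices",
+    #     )
+    #     print(result["method"], result["count"])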
+
+    # Normalize URLs
+    if urls is None:
+        urls = base_url
+    target_urls = [urls] if isinstance(urls, str) else urls
+    single_result = isinstance(urls, str) or urls is None
+
+    # LLM configs for different tasks
+    llm_small = LLMConfig(
+        provider="openai/gpt-4o-mini",
+        api_token=os.getenv("OPENAI_API_KEY")
+    )
+    llm_small.temperature = 0.3
+
+    llm_strong = LLMConfig(
+        provider="openai/gpt-4o",
+        api_token=os.getenv("OPENAI_API_KEY")
+    )
+    llm_strong.temperature = 0.5
+
+    def vprint(msg: str):
+        if verbose:
+            print(f"🔍 {msg}")
+
+    vprint(f"Query: '{query}'")
+
+    # Step 1: Quick crawl for analysis
+    async with AsyncWebCrawler(verbose=False) as crawler:
+        vprint(f"Quick crawl: {base_url}")
+        quick_result = await crawler.arun(
+            url=base_url,
+            config=CrawlerRunConfig(
+                cache_mode="bypass",
+                delay_before_return_html=2.0
+            )
+        )
+
+        if not quick_result.success:
+            raise Exception(f"Failed to crawl {base_url}")
+
+        # Step 1 (continued): HTML skimming using lxml
+        def skim_html(html: str) -> str:
+            """Strip non-structural elements and noise using lxml."""
+            parser = lxml_html.HTMLParser(remove_comments=True)
+            tree = lxml_html.fromstring(html, parser=parser)
+
+            # Remove the head section entirely
+            for head in tree.xpath('//head'):
+                head.getparent().remove(head)
+
+            # Remove non-structural elements, including SVGs
+            for element in tree.xpath('//script | //style | //noscript | //meta | //link | //svg'):
+                parent = element.getparent()
+                if parent is not None:
+                    parent.remove(element)
+
+            # Replace base64 image payloads with a placeholder
+            for img in tree.xpath('//img[@src]'):
+                src = img.get('src', '')
+                if 'base64' in src:
+                    img.set('src', 'BASE64_IMAGE')
+
+            # Collapse very long class/id attributes
+            for element in tree.xpath('//*[@class or @id]'):
+                if element.get('class') and len(element.get('class')) > 100:
+                    element.set('class', 'LONG_CLASS')
+                if element.get('id') and len(element.get('id')) > 50:
+                    element.set('id', 'LONG_ID')
+
+            # Truncate long text nodes
+            for text_node in tree.xpath('//text()'):
+                if text_node.strip() and len(text_node) > 100:
+                    parent = text_node.getparent()
+                    if parent is not None:
+                        new_text = text_node[:50] + "..." + text_node[-20:]
+                        if text_node.is_text:
+                            parent.text = new_text
+                        elif text_node.is_tail:
+                            parent.tail = new_text
+
+            return lxml_html.tostring(tree, encoding='unicode')
+
+        skimmed_html = skim_html(quick_result.html)
+        vprint(f"Skimmed HTML from {len(quick_result.html)} to {len(skimmed_html)} chars")
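+
+        # Illustration (not executed): skimming reduces noisy markup like
+        #     <head><style>...</style></head><body><div class="(120-char class)">text</div></body>
+        # to a compact structural skeleton along the lines of
+        #     <body><div class="LONG_CLASS">text</div></body>
+        # The exact output depends on lxml's serialization.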
+
+        # Step 2: Classification (structural vs. semantic) using an LLM
+        if force_llm:
+            classification = 'semantic'
+            vprint("Forced LLM extraction")
+        else:
+            classification_prompt = f"""
+            Analyze this HTML to determine the extraction strategy.
+
+            Query: "{query}"
+
+            HTML sample:
+            <<<>>>
+            {skimmed_html}
+            <<<>>>
+
+            Determine whether the data can be extracted using CSS/XPath patterns (structural)
+            or requires semantic understanding (semantic).
+
+            Look for:
+            - Repeating patterns (lists, cards, tables) → structural
+            - Consistent HTML structure → structural
+            - Need for inference or understanding → semantic
+
+            Return JSON:
+            {{
+                "strategy": "structural" or "semantic",
+                "confidence": 0.0-1.0,
+                "reasoning": "..."
+            }}
+            """
+
+            response = perform_completion_with_backoff(
+                provider=llm_small.provider,
+                prompt_with_variables=classification_prompt,
+                api_token=llm_small.api_token,
+                json_response=True,
+                temperature=llm_small.temperature
+            )
+
+            classification_result = json.loads(response.choices[0].message.content)
+            classification = classification_result['strategy']
+            vprint(f"Classification: {classification} (confidence: {classification_result['confidence']})")
+            vprint(f"Reasoning: {classification_result['reasoning']}")
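+
+        # A plausible (hypothetical) classification result for a page of
+        # repeating issue cards:
+        #     {"strategy": "structural", "confidence": 0.9,
+        #      "reasoning": "issues render as a repeating list of similar cards"}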
+ """ + + parent_prompt += f""" + + HTML (first 8000 chars): + <<<>> + {skimmed_html} + <<<>> + + Return JSON: + {{ + "parent_selector": "css_selector_here", + "explanation": "why this selector is appropriate",""" + + if not target_json_example: + parent_prompt += """ + "suggested_json_example": { + "field1": "example value", + "field2": "example value" + }""" + + parent_prompt += """ + }} + """ + + response = perform_completion_with_backoff( + provider=llm_small.provider, + prompt_with_variables=parent_prompt, + api_token=llm_small.api_token, + json_response=True, + temperature=llm_small.temperature + ) + + parent_data = json.loads(response.choices[0].message.content) + parent_selector = parent_data['parent_selector'] + vprint(f"Parent selector: {parent_selector}") + vprint(f"Explanation: {parent_data['explanation']}") + + # Use suggested JSON example if no target provided + if not target_json_example and 'suggested_json_example' in parent_data: + target_json_example = json.dumps(parent_data['suggested_json_example']) + vprint(f"Using LLM suggested example: {target_json_example}") + + # Get the actual parent HTML for schema generation + tree = lxml_html.fromstring(quick_result.html) + parent_elements = tree.cssselect(parent_selector) + + if not parent_elements: + vprint("Parent selector not found, falling back to semantic") + classification = 'semantic' + else: + # Use the first instance as sample + sample_html = lxml_html.tostring(parent_elements[0], encoding='unicode') + vprint(f"Generating schema from sample HTML ({len(sample_html)} chars)") + + # Generate schema using Crawl4AI + schema_params = { + "html": sample_html, + "query": query, + "llm_config": llm_strong + } + + if target_json_example: + schema_params["target_json_example"] = target_json_example + + schema = JsonCssExtractionStrategy.generate_schema(**schema_params) + + vprint(f"Generated schema with {len(schema.get('fields', []))} fields") + + # Extract from all URLs + extraction_strategy = JsonCssExtractionStrategy(schema) + results = [] + + for url in target_urls: + vprint(f"Extracting from: {url}") + result = await crawler.arun( + url=url, + config=CrawlerRunConfig( + extraction_strategy=extraction_strategy, + cache_mode="bypass" + ) + ) + + if result.success and result.extracted_content: + data = json.loads(result.extracted_content) + results.append({ + 'url': url, + 'data': data, + 'count': len(data) if isinstance(data, list) else 1, + 'method': 'JsonCssExtraction', + 'schema': schema + }) + + return results[0] if single_result else results + + # Semantic extraction (LLM) + if classification == 'semantic': + vprint("Using LLM extraction") + + # Build instruction from query + instruction = f""" + {query} + + Return structured JSON data. 
+ """ + + extraction_strategy = LLMExtractionStrategy( + llm_config=llm_strong, + instruction=instruction + ) + + results = [] + for url in target_urls: + vprint(f"LLM extracting from: {url}") + result = await crawler.arun( + url=url, + config=CrawlerRunConfig( + extraction_strategy=extraction_strategy, + cache_mode="bypass" + ) + ) + + if result.success and result.extracted_content: + data = json.loads(result.extracted_content) + results.append({ + 'url': url, + 'data': data, + 'count': len(data) if isinstance(data, list) else 1, + 'method': 'LLMExtraction' + }) + + return results[0] if single_result else results + + +async def main(): + """Test the extraction pipeline.""" + + print("\n🚀 CRAWL4AI EXTRACTION PIPELINE TEST") + print("="*50) + + # Test structural extraction + try: + result = await extract_pipeline( + base_url="https://github.com/unclecode/crawl4ai/issues", + urls=None, + query="I want to extract all issue titles, numbers, and who opened them", + verbose=True + ) + + print(f"\n✅ Success! Extracted {result.get('count', 0)} items") + print(f"Method used: {result.get('method')}") + + if result.get('data'): + print("\nFirst few items:") + data = result['data'] + items_to_show = data[:3] if isinstance(data, list) else data + print(json.dumps(items_to_show, indent=2)) + + if result.get('schema'): + print(f"\nGenerated schema fields: {[f['name'] for f in result['schema'].get('fields', [])]}") + + except Exception as e: + print(f"\n❌ Error: {e}") + import traceback + traceback.print_exc() + + +if __name__ == "__main__": + # Check for API key + if not os.getenv("OPENAI_API_KEY"): + print("⚠️ Error: OPENAI_API_KEY environment variable not set") + exit(1) + + asyncio.run(main()) + + diff --git a/tests/async_assistant/test_extract_pipeline_v2.py b/tests/async_assistant/test_extract_pipeline_v2.py new file mode 100644 index 00000000..bb65df8d --- /dev/null +++ b/tests/async_assistant/test_extract_pipeline_v2.py @@ -0,0 +1,386 @@ +""" +Test implementation v2: Combined classification and preparation in one LLM call. +More efficient approach that reduces token usage and LLM calls. +""" + +import asyncio +import json +import os +from typing import List, Dict, Any, Optional, Union +from lxml import html as lxml_html +import re + +from crawl4ai import AsyncWebCrawler, CrawlerRunConfig +from crawl4ai.async_configs import LLMConfig +from crawl4ai import JsonCssExtractionStrategy, LLMExtractionStrategy +from crawl4ai.utils import perform_completion_with_backoff + + +async def extract_pipeline_v2( + base_url: str, + urls: Union[str, List[str], None], + query: str, + target_json_example: Optional[str] = None, + force_llm: bool = False, + verbose: bool = True +) -> Union[Dict, List[Dict]]: + """ + Improved extraction pipeline with combined classification and preparation. + + Pipeline: + 1. Quick crawl & HTML skimming + 2. Combined LLM call for classification + preparation + 3. 
+
+    # Normalize URLs
+    if urls is None:
+        urls = base_url
+    target_urls = [urls] if isinstance(urls, str) else urls
+    single_result = isinstance(urls, str) or urls is None
+
+    # LLM configs
+    llm_small = LLMConfig(
+        provider="openai/gpt-4o-mini",
+        api_token=os.getenv("OPENAI_API_KEY")
+    )
+    llm_small.temperature = 0.3
+
+    llm_strong = LLMConfig(
+        provider="openai/gpt-4o",
+        api_token=os.getenv("OPENAI_API_KEY")
+    )
+    llm_strong.temperature = 0.5
+
+    def vprint(msg: str):
+        if verbose:
+            print(f"🔍 {msg}")
+
+    vprint(f"Query: '{query}'")
+    if target_json_example:
+        vprint(f"Target format provided: {target_json_example[:100]}...")
+
+    # Step 1: Quick crawl for analysis
+    async with AsyncWebCrawler(verbose=False) as crawler:
+        vprint(f"Quick crawl: {base_url}")
+        quick_result = await crawler.arun(
+            url=base_url,
+            config=CrawlerRunConfig(
+                cache_mode="bypass",
+                delay_before_return_html=2.0
+            )
+        )
+
+        if not quick_result.success:
+            raise Exception(f"Failed to crawl {base_url}")
+
+        # Step 1 (continued): HTML skimming (same helper as in v1)
+        def skim_html(html: str) -> str:
+            """Strip non-structural elements and noise using lxml."""
+            parser = lxml_html.HTMLParser(remove_comments=True)
+            tree = lxml_html.fromstring(html, parser=parser)
+
+            # Remove the head section entirely
+            for head in tree.xpath('//head'):
+                head.getparent().remove(head)
+
+            # Remove non-structural elements, including SVGs
+            for element in tree.xpath('//script | //style | //noscript | //meta | //link | //svg'):
+                parent = element.getparent()
+                if parent is not None:
+                    parent.remove(element)
+
+            # Replace base64 image payloads with a placeholder
+            for img in tree.xpath('//img[@src]'):
+                src = img.get('src', '')
+                if 'base64' in src:
+                    img.set('src', 'BASE64_IMAGE')
+
+            # Collapse very long class/id attributes
+            for element in tree.xpath('//*[@class or @id]'):
+                if element.get('class') and len(element.get('class')) > 100:
+                    element.set('class', 'LONG_CLASS')
+                if element.get('id') and len(element.get('id')) > 50:
+                    element.set('id', 'LONG_ID')
+
+            # Truncate long text nodes
+            for text_node in tree.xpath('//text()'):
+                if text_node.strip() and len(text_node) > 100:
+                    parent = text_node.getparent()
+                    if parent is not None:
+                        new_text = text_node[:50] + "..." + text_node[-20:]
+                        if text_node.is_text:
+                            parent.text = new_text
+                        elif text_node.is_tail:
+                            parent.tail = new_text
+
+            return lxml_html.tostring(tree, encoding='unicode')
+
+        skimmed_html = skim_html(quick_result.html)
+        vprint(f"Skimmed HTML from {len(quick_result.html)} to {len(skimmed_html)} chars")
+
+        # Step 2: Combined classification and preparation
+        if force_llm:
+            classification_data = {"classification": "semantic"}
+            vprint("Forced LLM extraction")
+        else:
+            combined_prompt = f"""
+            Analyze this HTML and prepare for data extraction.
+
+            User query: "{query}"
+            """
+
+            if target_json_example:
+                combined_prompt += f"""
+            Target format: {target_json_example}
+            """
+
+            combined_prompt += f"""
+
+            HTML:
+            <<<>>>
+            {skimmed_html}
+            <<<>>>
+
+            STEP 1: Determine the extraction strategy
+            - If data follows repeating HTML patterns (lists, tables, cards) → "structural"
+            - If data requires understanding/inference → "semantic"
+
+            STEP 2A: If STRUCTURAL extraction is appropriate:
+            - Find the CSS selector for the BASE ELEMENT (the repeating pattern)
+            - Base element = container holding ONE data item (e.g., product card, table row)
+            - The selector should select ALL instances: not too specific, not too general
+            - Count the approximate number of these elements
+            """
+
+            if not target_json_example:
+                combined_prompt += """
+            - Suggest what JSON structure can be extracted from one element
+            """
+
+            combined_prompt += """
+
+            STEP 2B: If SEMANTIC extraction is needed:
+            - Write a detailed instruction for what to extract
+            - Be specific about the data needed
+            """
+
+            if not target_json_example:
+                combined_prompt += """
+            - Suggest the expected JSON output structure
+            """
+
+            combined_prompt += """
+
+            Return JSON with ONLY the relevant fields based on the classification:
+            {
+                "classification": "structural" or "semantic",
+                "confidence": 0.0-1.0,
+                "reasoning": "brief explanation",
+
+                // Include ONLY if classification is "structural":
+                "base_selector": "css selector",
+                "element_count": approximate number,
+
+                // Include ONLY if classification is "semantic":
+                "extraction_instruction": "detailed instruction",
+
+                // Include if no target_json_example was provided:
+                "suggested_json_example": { ... }
+            }
+            """
+
+            response = perform_completion_with_backoff(
+                provider=llm_small.provider,
+                prompt_with_variables=combined_prompt,
+                api_token=llm_small.api_token,
+                json_response=True,
+                temperature=llm_small.temperature
+            )
+
+            classification_data = json.loads(response.choices[0].message.content)
+            vprint(f"Classification: {classification_data['classification']} (confidence: {classification_data['confidence']})")
+            vprint(f"Reasoning: {classification_data['reasoning']}")
+
+        # Use the suggested JSON example if needed
+        if not target_json_example and 'suggested_json_example' in classification_data:
+            target_json_example = json.dumps(classification_data['suggested_json_example'])
+            vprint(f"Using suggested example: {target_json_example}")
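+
+        # A plausible (hypothetical) combined response for a page of repeated cards:
+        #     {"classification": "structural", "confidence": 0.9,
+        #      "reasoning": "repeating card markup",
+        #      "base_selector": "div.card", "element_count": 25}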
+
+        # Step 3: Execute the extraction based on the classification
+        if classification_data['classification'] == 'structural':
+            vprint(f"Base selector: {classification_data['base_selector']}")
+            vprint(f"Found ~{classification_data['element_count']} elements")
+
+            # Get sample HTML for schema generation
+            tree = lxml_html.fromstring(quick_result.html)
+            parent_elements = tree.cssselect(classification_data['base_selector'])
+
+            if not parent_elements:
+                vprint("Base selector not found, falling back to semantic")
+                classification_data['classification'] = 'semantic'
+            else:
+                # Use the first element as the sample
+                sample_html = lxml_html.tostring(parent_elements[0], encoding='unicode')
+                vprint(f"Generating schema from sample ({len(sample_html)} chars)")
+
+                # Generate the schema
+                schema_params = {
+                    "html": sample_html,
+                    "query": query,
+                    "llm_config": llm_strong
+                }
+
+                if target_json_example:
+                    schema_params["target_json_example"] = target_json_example
+
+                schema = JsonCssExtractionStrategy.generate_schema(**schema_params)
+                vprint(f"Generated schema with {len(schema.get('fields', []))} fields")
+
+                # Extract from all URLs
+                extraction_strategy = JsonCssExtractionStrategy(schema)
+                results = []
+
+                for idx, url in enumerate(target_urls):
+                    vprint(f"Extracting from: {url}")
+
+                    # Reuse the already crawled HTML for base_url; crawl the others
+                    if idx == 0 and url == base_url:
+                        # We already have this HTML, so use raw:// to avoid re-crawling
+                        raw_url = f"raw://{quick_result.html}"
+                        vprint("Using cached HTML with raw:// scheme")
+                    else:
+                        raw_url = url
+
+                    result = await crawler.arun(
+                        url=raw_url,
+                        config=CrawlerRunConfig(
+                            extraction_strategy=extraction_strategy,
+                            cache_mode="bypass"
+                        )
+                    )
+
+                    if result.success and result.extracted_content:
+                        data = json.loads(result.extracted_content)
+                        results.append({
+                            'url': url,  # Keep the original URL for reference
+                            'data': data,
+                            'count': len(data) if isinstance(data, list) else 1,
+                            'method': 'JsonCssExtraction',
+                            'schema': schema
+                        })
+
+                return results[0] if single_result else results
+
+        # Semantic extraction
+        if classification_data['classification'] == 'semantic':
+            vprint("Using LLM extraction")
+
+            # Use the generated instruction, or fall back to a simple one
+            if 'extraction_instruction' in classification_data:
+                instruction = classification_data['extraction_instruction']
+                vprint(f"Generated instruction: {instruction[:100]}...")
+            else:
+                instruction = f"{query}\n\nReturn structured JSON data."
+
+            extraction_strategy = LLMExtractionStrategy(
+                llm_config=llm_strong,
+                instruction=instruction
+            )
+
+            results = []
+            for idx, url in enumerate(target_urls):
+                vprint(f"LLM extracting from: {url}")
+
+                # Reuse the already crawled HTML for base_url; crawl the others
+                if idx == 0 and url == base_url:
+                    # We already have this HTML, so use raw:// to avoid re-crawling
+                    raw_url = f"raw://{quick_result.html}"
+                    vprint("Using cached HTML with raw:// scheme")
+                else:
+                    raw_url = url
+
+                result = await crawler.arun(
+                    url=raw_url,
+                    config=CrawlerRunConfig(
+                        extraction_strategy=extraction_strategy,
+                        cache_mode="bypass"
+                    )
+                )
+
+                if result.success and result.extracted_content:
+                    data = json.loads(result.extracted_content)
+                    results.append({
+                        'url': url,  # Keep the original URL for reference
+                        'data': data,
+                        'count': len(data) if isinstance(data, list) else 1,
+                        'method': 'LLMExtraction'
+                    })
+
+            return results[0] if single_result else results
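+
+# Sketch (hypothetical URLs): `urls` also accepts a list, in which case a list
+# of per-URL result dicts is returned:
+#
+#     results = await extract_pipeline_v2(
+#         base_url="https://example.com/items?page=1",
+#         urls=[f"https://example.com/items?page={n}" for n in range(1, 4)],
+#         query="Extract item names",
+#     )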
+
+
+async def main():
+    """Test the improved extraction pipeline."""
+
+    print("\n🚀 CRAWL4AI EXTRACTION PIPELINE V2 TEST")
+    print("="*50)
+
+    try:
+        # Test 1: Structural extraction (GitHub issues)
+        print("\nTest 1: GitHub Issues (should use structural)")
+        result = await extract_pipeline_v2(
+            base_url="https://github.com/unclecode/crawl4ai/issues",
+            urls=None,
+            query="Extract all issue titles, numbers, and authors",
+            verbose=True
+        )
+
+        print(f"\n✅ Extracted {result.get('count', 0)} items using {result.get('method')}")
+        if result.get('data'):
+            print("Sample:", json.dumps(result['data'][:2] if isinstance(result['data'], list) else result['data'], indent=2))
+
+        # Test 2: With target JSON example
+        print("\n\nTest 2: With target JSON example")
+        target_example = json.dumps({
+            "title": "Issue title here",
+            "number": "#123",
+            "author": "username"
+        })
+
+        result2 = await extract_pipeline_v2(
+            base_url="https://github.com/unclecode/crawl4ai/issues",
+            urls=None,
+            query="Extract GitHub issues",
+            target_json_example=target_example,
+            verbose=True
+        )
+
+        print(f"\n✅ Extracted {result2.get('count', 0)} items")
+
+        # Test 3: Semantic extraction (force LLM)
+        print("\n\nTest 3: Force semantic extraction")
+        result3 = await extract_pipeline_v2(
+            base_url="https://en.wikipedia.org/wiki/Artificial_intelligence",
+            urls=None,
+            query="Extract key concepts and their relationships in AI field",
+            force_llm=True,
+            verbose=True
+        )
+
+        print(f"\n✅ Extracted using {result3.get('method')}")
+
+    except Exception as e:
+        print(f"\n❌ Error: {e}")
+        import traceback
+        traceback.print_exc()
+
+
+if __name__ == "__main__":
+    if not os.getenv("OPENAI_API_KEY"):
+        print("⚠️ Error: OPENAI_API_KEY environment variable not set")
+        exit(1)
+
+    asyncio.run(main())
\ No newline at end of file