|
|
""" |
|
|
Quick test script for SPARKNET FastAPI backend |
|
|
Tests all major endpoints with a sample patent. |
|
|
""" |
|
|
|
|
|
import requests |
|
|
import json |
|
|
import time |
|
|
from pathlib import Path |
|
|
from rich.console import Console |
|
|
from rich.table import Table |
|
|
from rich.progress import Progress, SpinnerColumn, TextColumn |
|
|
|
|
|
# Shared rich console used by every test step for formatted terminal output.
console = Console()

# Root URL of the API under test; endpoint paths are appended to this value.
API_BASE = "http://localhost:8000"
|
|
|
|
|
def test_health():
    """Query the health endpoint and print the reported status and statistics.

    Returns:
        bool: True when the endpoint answered with HTTP 200, False otherwise.

    Raises:
        requests.exceptions.RequestException: if the request cannot be
            completed (connection refused, timeout, ...).
    """
    console.print("\n[bold blue]1. Testing Health Endpoint[/bold blue]")

    # A bounded timeout keeps the script from hanging on a stalled server.
    response = requests.get(f"{API_BASE}/api/health", timeout=10)

    # Check the status code before touching the body: an error response is
    # not guaranteed to be JSON or to contain the keys printed below.
    if response.status_code != 200:
        console.print(
            f"[red]✗[/red] Health endpoint returned HTTP {response.status_code}"
        )
        return False

    data = response.json()
    stats = data.get('statistics') or {}

    console.print(f"Status: [green]{data.get('status', 'unknown')}[/green]")
    console.print(f"Active Workflows: {stats.get('active_workflows', 'N/A')}")
    console.print(f"Processed Patents: {stats.get('processed_patents', 'N/A')}")

    return True
|
|
|
|
|
def test_upload(patent_path):
    """Upload the PDF at ``patent_path`` to the patent upload endpoint.

    Returns the ``patent_id`` from the JSON response on HTTP 200, or ``None``
    when the file does not exist or the server answers with another status.
    """
    console.print("\n[bold blue]2. Testing Patent Upload[/bold blue]")

    pdf = Path(patent_path)
    if not pdf.exists():
        console.print(f"[red]Patent file not found: {patent_path}[/red]")
        console.print("[yellow]Using mock upload test (no actual file)[/yellow]")
        return None

    upload_url = f"{API_BASE}/api/patents/upload"
    # The POST must happen while the file handle is still open.
    with open(patent_path, 'rb') as handle:
        multipart = {'file': (pdf.name, handle, 'application/pdf')}
        response = requests.post(upload_url, files=multipart)

    # Guard clause: report the failure and bail out early.
    if response.status_code != 200:
        console.print(f"[red]✗[/red] Upload failed: {response.text}")
        return None

    info = response.json()
    console.print("[green]✓[/green] Patent uploaded successfully")
    console.print(f"Patent ID: {info['patent_id']}")
    console.print(f"Filename: {info['filename']}")
    console.print(f"Size: {info['size']} bytes")
    return info['patent_id']
|
|
|
|
|
def test_workflow(patent_id):
    """Start the ``patent_wakeup`` workflow for an uploaded patent.

    Returns the ``workflow_id`` from the JSON response on HTTP 200, or
    ``None`` when the server answers with any other status.
    """
    console.print("\n[bold blue]3. Testing Workflow Execution[/bold blue]")

    execute_url = f"{API_BASE}/api/workflows/execute"
    request_body = {"patent_id": patent_id, "scenario": "patent_wakeup"}
    response = requests.post(execute_url, json=request_body)

    # Guard clause: report the failure and bail out early.
    if response.status_code != 200:
        console.print(f"[red]✗[/red] Workflow start failed: {response.text}")
        return None

    started = response.json()
    console.print("[green]✓[/green] Workflow started")
    console.print(f"Workflow ID: {started['workflow_id']}")
    return started['workflow_id']
|
|
|
|
|
def monitor_workflow(workflow_id, poll_interval=2, max_wait=None):
    """Poll a workflow until it reaches a terminal state, then print the outcome.

    Args:
        workflow_id: Identifier of the workflow to poll.
        poll_interval: Seconds to sleep between status polls (default 2).
        max_wait: Optional upper bound, in seconds, on the total polling time.
            ``None`` (the default) polls until the workflow reports
            ``completed`` or ``failed``.

    Returns:
        None. Progress and results are written to the console.

    Raises:
        requests.exceptions.RequestException: if a status request cannot be
            completed (connection refused, timeout, ...).
    """
    console.print("\n[bold blue]4. Monitoring Workflow Progress[/bold blue]")

    status_url = f"{API_BASE}/api/workflows/{workflow_id}"
    deadline = None if max_wait is None else time.monotonic() + max_wait

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        console=console
    ) as progress:

        task = progress.add_task("Processing workflow...", total=100)

        while True:
            response = requests.get(status_url, timeout=30)

            if response.status_code != 200:
                # Without a status payload there is nothing to report below,
                # so stop here instead of re-requesting and indexing into it.
                console.print("[red]Failed to get workflow status[/red]")
                return

            data = response.json()
            status = data['status']
            prog = data.get('progress', 0)
            current_step = data.get('current_step', 'initializing')

            progress.update(task, completed=prog, description=f"Step: {current_step}")

            if status in ('completed', 'failed'):
                break

            if deadline is not None and time.monotonic() >= deadline:
                console.print(
                    f"[red]Timed out after {max_wait}s waiting for workflow "
                    f"(last status: {status})[/red]"
                )
                return

            time.sleep(poll_interval)

    # ``data`` already holds the terminal snapshot from the last poll, so no
    # additional request is needed to report the outcome.
    if status == 'completed':
        console.print("\n[green]✓ Workflow completed successfully![/green]")
        # Tolerate a missing or null result rather than raising KeyError.
        display_results(data.get('result') or {})
    else:
        console.print(f"\n[red]✗ Workflow failed: {data.get('error', 'Unknown error')}[/red]")
|
|
|
|
|
def display_results(result):
    """Print a summary of a workflow result to the console.

    Sections are printed only when their data is present: quality score
    (always), patent analysis, market opportunities (first five rows),
    stakeholder matches (first five rows) and the valorization brief.

    Args:
        result: Mapping read with the keys ``quality_score``,
            ``document_analysis``, ``market_analysis``, ``matches`` and
            ``brief``. Missing keys and explicit ``None`` values (JSON
            ``null``) are treated the same way; ``None`` for ``result``
            itself is treated as an empty mapping.

    Returns:
        None.
    """
    result = result or {}

    console.print("\n[bold]Analysis Results:[/bold]\n")

    # ``or`` fallbacks are used throughout because ``dict.get(key, default)``
    # still returns None when the key exists with a null value.
    quality = result.get('quality_score') or 0
    console.print(f"Quality Score: [blue]{quality:.2f}[/blue]")

    doc_analysis = result.get('document_analysis') or {}
    if doc_analysis:
        trl = doc_analysis.get('trl_level')
        innovations = doc_analysis.get('key_innovations') or []
        console.print("\n[bold]Patent Analysis:[/bold]")
        console.print(f" TRL Level: {'N/A' if trl is None else trl}/9")
        console.print(f" Key Innovations: {len(innovations)}")

    market_analysis = result.get('market_analysis') or {}
    if market_analysis:
        opportunities = market_analysis.get('opportunities') or []
        console.print("\n[bold]Market Opportunities:[/bold]")
        console.print(f" Found: {len(opportunities)} opportunities")

        if opportunities:
            table = Table(show_header=True)
            table.add_column("Sector", style="cyan")
            table.add_column("Market Size", justify="right")
            table.add_column("Growth", justify="right")
            table.add_column("Fit", style="green")

            for opp in opportunities[:5]:
                # Cells are coerced to str so non-string values are accepted
                # by the table.
                table.add_row(
                    str(opp.get('sector') or '')[:30],
                    f"${(opp.get('market_size_usd') or 0) / 1e9:.1f}B",
                    f"{opp.get('growth_rate_percent') or 0}%",
                    str(opp.get('technology_fit') or ''),
                )

            console.print(table)

    matches = result.get('matches') or []
    if matches:
        console.print("\n[bold]Stakeholder Matches:[/bold]")
        console.print(f" Found: {len(matches)} potential partners")

        table = Table(show_header=True)
        table.add_column("Partner", style="cyan")
        table.add_column("Type")
        table.add_column("Location")
        table.add_column("Fit Score", justify="right", style="green")

        for match in matches[:5]:
            table.add_row(
                str(match.get('stakeholder_name') or '')[:30],
                str(match.get('stakeholder_type') or ''),
                str(match.get('location') or ''),
                f"{(match.get('overall_fit_score') or 0) * 100:.0f}%",
            )

        console.print(table)

    brief = result.get('brief') or {}
    if brief:
        console.print("\n[bold]Valorization Brief:[/bold]")
        console.print(f" PDF: {brief.get('pdf_path') or 'Not generated'}")
|
|
|
|
|
def test_list_endpoints():
    """Call the patent and workflow list endpoints and report item counts.

    For each endpoint, prints the number of items returned on HTTP 200, or a
    failure line with the status code and response body otherwise, matching
    how the upload and workflow steps report errors.

    Returns:
        None.

    Raises:
        requests.exceptions.RequestException: if a request cannot be
            completed (connection refused, timeout, ...).
    """
    console.print("\n[bold blue]5. Testing List Endpoints[/bold blue]")

    listings = (
        ("/api/patents/", "patents"),
        ("/api/workflows/", "workflows"),
    )

    for path, label in listings:
        # A bounded timeout keeps the script from hanging on a stalled server.
        response = requests.get(f"{API_BASE}{path}", timeout=10)

        if response.status_code == 200:
            items = response.json()
            console.print(f"[green]✓[/green] Found {len(items)} {label}")
        else:
            # Previously a non-200 answer was silently ignored.
            console.print(
                f"[red]✗[/red] Listing {label} failed "
                f"(HTTP {response.status_code}): {response.text}"
            )
|
|
|
|
|
def main():
    """Run the test steps in order, stopping at the first step that fails.

    Sequence: health check, pick the first PDF in ``Dataset/``, upload it,
    start a workflow, monitor it, then exercise the list endpoints.
    Connection errors and any other exception are reported on the console
    instead of propagating.
    """
    console.print("\n[bold cyan]SPARKNET API Test Suite[/bold cyan]\n")

    try:
        healthy = test_health()
        if not healthy:
            console.print("[red]Health check failed - is the API running?[/red]")
            console.print("Start with: [yellow]python -m api.main[/yellow]")
            return

        # Collect candidate PDFs; a missing directory means no candidates.
        dataset_dir = Path("Dataset")
        if dataset_dir.exists():
            candidates = list(dataset_dir.glob("*.pdf"))
        else:
            candidates = []

        if not candidates:
            console.print("\n[yellow]No patents found in Dataset/ directory[/yellow]")
            console.print("Skipping upload and workflow tests")
            return

        chosen = candidates[0]
        console.print(f"\nUsing test patent: [cyan]{chosen.name}[/cyan]")

        patent_id = test_upload(str(chosen))
        if not patent_id:
            return

        workflow_id = test_workflow(patent_id)
        if not workflow_id:
            return

        monitor_workflow(workflow_id)
        test_list_endpoints()

        console.print("\n[bold green]✓ All tests completed successfully![/bold green]\n")

    except requests.exceptions.ConnectionError:
        console.print("\n[red]✗ Cannot connect to API[/red]")
        console.print("Make sure the API is running: [yellow]python -m api.main[/yellow]")
    except Exception as exc:
        console.print(f"\n[red]✗ Test failed: {exc}[/red]")
        # Imported here because it is only needed on this error path.
        import traceback
        traceback.print_exc()
|
|
|
|
|
# Run the suite only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|
|
|