| | """ |
| | Lineage Graph Accelerator - Hugging Face Space |
| | A Gradio-based AI agent for extracting and visualizing data lineage from various sources. |
| | |
| | Built for the Gradio Agents & MCP Hackathon - Winter 2025 |
| | """ |
| |
|
| | import gradio as gr |
| | import json |
| | import os |
| | import requests |
| | from typing import Optional, Tuple, Dict, Any, List |
| | from datetime import datetime |
| |
|
| | |
| | try: |
| | from exporters import ( |
| | LineageGraph, LineageNode, LineageEdge, |
| | OpenLineageExporter, CollibraExporter, PurviewExporter, AlationExporter, AtlasExporter |
| | ) |
| | EXPORTERS_AVAILABLE = True |
| | except ImportError: |
| | EXPORTERS_AVAILABLE = False |
| |
|
| | |
| | try: |
| | import google.generativeai as genai |
| | GEMINI_AVAILABLE = True |
| | except ImportError: |
| | GEMINI_AVAILABLE = False |
| |
|
| | |
| | |
| | |
| |
|
# Sample key -> file name. load_sample() resolves the file name inside the
# "samples" directory that sits next to this module.
SAMPLE_FILES = {
    "simple": "sample_metadata.json",
    "dbt": "dbt_manifest_sample.json",
    "airflow": "airflow_dag_sample.json",
    "sql": "sql_ddl_sample.sql",
    "warehouse": "warehouse_lineage_sample.json",
    "etl": "etl_pipeline_sample.json",
    "complex": "complex_lineage_demo.json",
    "api": "sample_api_metadata.json",
    "bigquery": "sample_bigquery.sql",
}

# Catalog format names recognised by export_lineage().
EXPORT_FORMATS = ["OpenLineage", "Collibra", "Purview", "Alation", "Atlas"]
| |
|
| | |
# Preset MCP servers selectable by key. Each entry carries a display name, the
# server URL, the schema URL, a description, and the tool names it advertises.
# The "local://" entry is the built-in demo server (see is_local_mcp()).
MCP_PRESETS = {
    "local_demo": {
        "name": "Local Demo MCP (Built-in)",
        "url": "local://demo",
        "schema_url": "local://demo/schema",
        "description": "Built-in demo MCP server that provides sample lineage metadata for testing",
        "tools": ["get_sample_lineage", "get_dbt_metadata", "get_airflow_dag", "get_warehouse_schema"],
    },
    "mcp_tools": {
        "name": "MCP Tools by abidlabs",
        "url": "https://abidlabs-mcp-tools.hf.space/gradio_api/mcp/sse",
        "schema_url": "https://abidlabs-mcp-tools.hf.space/gradio_api/mcp/schema",
        "description": "Demo MCP server with utility tools for testing integration",
        "tools": ["prime_factors", "generate_cheetah_image", "image_orientation"],
    },
    "huggingface_mcp": {
        "name": "HuggingFace MCP by dylanebert",
        "url": "https://dylanebert-huggingface-mcp.hf.space/gradio_api/mcp/sse",
        "schema_url": "https://dylanebert-huggingface-mcp.hf.space/gradio_api/mcp/schema",
        "description": "Search and explore HuggingFace models, datasets, and spaces",
        "tools": ["search_models", "search_datasets", "get_model_card"],
    },
    "ragmint": {
        "name": "Ragmint RAG Pipeline",
        "url": "https://mcp-1st-birthday-ragmint-mcp-server.hf.space/gradio_api/mcp/sse",
        "schema_url": "https://mcp-1st-birthday-ragmint-mcp-server.hf.space/gradio_api/mcp/schema",
        "description": "RAG pipeline optimization and document retrieval",
        "tools": ["optimize_rag", "retrieve_documents"],
    },
    "web_search": {
        "name": "Web Search MCP",
        "url": "https://agents-mcp-hackathon-search-web-mcp-server.hf.space/gradio_api/mcp/sse",
        "schema_url": "https://agents-mcp-hackathon-search-web-mcp-server.hf.space/gradio_api/mcp/schema",
        "description": "Search the web for data and documentation",
        "tools": ["search_web", "fetch_page"],
    },
}
| |
|
| | |
| | |
| | |
| |
|
| | |
# Sample lineage datasets served by the built-in demo MCP server. Every entry
# has a display "name", a list of "nodes" (id / type / name) and a list of
# directed "edges" (from -> to) that refer to node ids.
LOCAL_MCP_METADATA = {
    "ecommerce_pipeline": {
        "name": "E-commerce Data Pipeline",
        "nodes": [
            {"id": "raw_orders", "type": "source", "name": "Raw Orders (PostgreSQL)"},
            {"id": "raw_customers", "type": "source", "name": "Raw Customers (PostgreSQL)"},
            {"id": "raw_products", "type": "source", "name": "Raw Products (API)"},
            {"id": "stg_orders", "type": "model", "name": "Staging Orders"},
            {"id": "stg_customers", "type": "model", "name": "Staging Customers"},
            {"id": "stg_products", "type": "model", "name": "Staging Products"},
            {"id": "dim_customers", "type": "dimension", "name": "Dim Customers"},
            {"id": "dim_products", "type": "dimension", "name": "Dim Products"},
            {"id": "fact_orders", "type": "fact", "name": "Fact Orders"},
            {"id": "mart_sales", "type": "table", "name": "Sales Mart"},
            {"id": "report_daily", "type": "report", "name": "Daily Sales Report"},
        ],
        "edges": [
            {"from": "raw_orders", "to": "stg_orders"},
            {"from": "raw_customers", "to": "stg_customers"},
            {"from": "raw_products", "to": "stg_products"},
            {"from": "stg_customers", "to": "dim_customers"},
            {"from": "stg_products", "to": "dim_products"},
            {"from": "stg_orders", "to": "fact_orders"},
            {"from": "dim_customers", "to": "fact_orders"},
            {"from": "dim_products", "to": "fact_orders"},
            {"from": "fact_orders", "to": "mart_sales"},
            {"from": "mart_sales", "to": "report_daily"},
        ],
    },
    "ml_pipeline": {
        "name": "ML Feature Pipeline",
        "nodes": [
            {"id": "raw_events", "type": "source", "name": "Event Stream (Kafka)"},
            {"id": "raw_user_data", "type": "source", "name": "User Data (S3)"},
            {"id": "feature_eng", "type": "model", "name": "Feature Engineering"},
            {"id": "feature_store", "type": "table", "name": "Feature Store"},
            {"id": "training_data", "type": "table", "name": "Training Dataset"},
            {"id": "model_output", "type": "destination", "name": "Model Predictions"},
        ],
        "edges": [
            {"from": "raw_events", "to": "feature_eng"},
            {"from": "raw_user_data", "to": "feature_eng"},
            {"from": "feature_eng", "to": "feature_store"},
            {"from": "feature_store", "to": "training_data"},
            {"from": "training_data", "to": "model_output"},
        ],
    },
    "data_warehouse": {
        "name": "Data Warehouse Schema",
        "nodes": [
            {"id": "src_crm", "type": "source", "name": "CRM System"},
            {"id": "src_erp", "type": "source", "name": "ERP System"},
            {"id": "src_web", "type": "source", "name": "Web Analytics"},
            {"id": "landing_crm", "type": "table", "name": "Landing CRM"},
            {"id": "landing_erp", "type": "table", "name": "Landing ERP"},
            {"id": "landing_web", "type": "table", "name": "Landing Web"},
            {"id": "dwh_customers", "type": "dimension", "name": "DWH Customers"},
            {"id": "dwh_transactions", "type": "fact", "name": "DWH Transactions"},
            {"id": "bi_dashboard", "type": "report", "name": "BI Dashboard"},
        ],
        "edges": [
            {"from": "src_crm", "to": "landing_crm"},
            {"from": "src_erp", "to": "landing_erp"},
            {"from": "src_web", "to": "landing_web"},
            {"from": "landing_crm", "to": "dwh_customers"},
            {"from": "landing_erp", "to": "dwh_transactions"},
            {"from": "landing_web", "to": "dwh_transactions"},
            {"from": "dwh_customers", "to": "dwh_transactions"},
            {"from": "dwh_transactions", "to": "bi_dashboard"},
        ],
    },
}
| |
|
| |
|
def local_mcp_get_metadata(tool_name: str, query: str = "") -> Dict[str, Any]:
    """Simulate a local MCP server that returns sample metadata.

    Args:
        tool_name: Name of the simulated tool. ``get_sample_lineage`` and
            ``search`` pick a dataset from keywords in ``query``;
            ``get_dbt_metadata``, ``get_airflow_dag`` and
            ``get_warehouse_schema`` each map to one fixed dataset;
            ``list_datasets`` returns the available dataset keys.
        query: Free-text hint used by the keyword-routed tools. ``None`` is
            treated like an empty string.

    Returns:
        An entry of ``LOCAL_MCP_METADATA``, or ``{"datasets": [...]}`` for
        ``list_datasets``. Unknown tools and unmatched queries fall back to
        the e-commerce pipeline.
    """
    if tool_name == "list_datasets":
        return {"datasets": list(LOCAL_MCP_METADATA.keys())}

    if tool_name in ("get_sample_lineage", "search"):
        # Normalise once; callers may hand over None for an empty field.
        needle = (query or "").lower()
        # Checked in order, first hit wins. Matching is by substring, so
        # "ml" also matches inside words such as "html".
        keyword_routes = (
            (("ecommerce", "sales"), "ecommerce_pipeline"),
            (("ml", "feature"), "ml_pipeline"),
            (("warehouse", "dwh"), "data_warehouse"),
        )
        for keywords, dataset_key in keyword_routes:
            if any(word in needle for word in keywords):
                return LOCAL_MCP_METADATA[dataset_key]
        return LOCAL_MCP_METADATA["ecommerce_pipeline"]

    fixed_datasets = {
        "get_dbt_metadata": "ecommerce_pipeline",
        "get_airflow_dag": "ml_pipeline",
        "get_warehouse_schema": "data_warehouse",
    }
    return LOCAL_MCP_METADATA[fixed_datasets.get(tool_name, "ecommerce_pipeline")]
| |
|
| |
|
def is_local_mcp(url: str) -> bool:
    """Check if the URL is for the local demo MCP server.

    Args:
        url: Server URL to test; may be empty or ``None``.

    Returns:
        ``True`` only when ``url`` is a non-empty string starting with
        ``local://``. Always a real ``bool`` (never ``""`` or ``None``).
    """
    # bool() keeps the declared return type honest for falsy input, where a
    # bare `url and ...` would hand back the falsy operand itself.
    return bool(url) and url.startswith("local://")
| |
|
| |
|
def call_local_mcp(tool_name: str, query: str = "") -> Tuple[str, str]:
    """Query the built-in demo MCP server.

    Returns:
        A pair of (metadata serialised as indented JSON, status message naming
        the dataset that was fetched).
    """
    payload = local_mcp_get_metadata(tool_name, query)
    # Responses without a "name" key are reported generically as 'lineage'.
    label = payload.get('name', 'lineage')
    status = f"Fetched '{label}' from Local Demo MCP"
    return json.dumps(payload, indent=2), status
| |
|
| | |
| | |
| | |
| |
|
| | import base64 |
| | import urllib.parse |
| |
|
def render_mermaid(viz_code: str) -> str:
    """Build an HTML snippet that shows a Mermaid diagram via mermaid.ink.

    The diagram source is base64-encoded into image URLs; nothing is rendered
    locally.

    Args:
        viz_code: Mermaid diagram source.

    Returns:
        HTML containing the SVG image (click toggles zoom) plus links to the
        Mermaid Live editor and to PNG / SVG downloads.
    """
    source_bytes = viz_code.encode('utf-8')

    # mermaid.ink takes the diagram as URL-safe base64 in the path.
    ink_token = base64.urlsafe_b64encode(source_bytes).decode('utf-8')
    img_url = f"https://mermaid.ink/svg/{ink_token}"
    png_url = f"https://mermaid.ink/img/{ink_token}"

    # The editor link carries standard (not URL-safe) base64 in the fragment.
    live_token = base64.b64encode(source_bytes).decode('utf-8')
    editor_url = f"https://mermaid.live/edit#base64:{live_token}"

    return f'''
<div style="background: white; padding: 20px; border-radius: 8px; min-height: 200px;">
<div style="overflow: auto; max-height: 500px; border: 1px solid #e0e0e0; border-radius: 4px; padding: 10px;">
<img id="lineage-graph" src="{img_url}" alt="Lineage Graph" style="max-width: 100%; height: auto; cursor: zoom-in;" onclick="this.style.maxWidth = this.style.maxWidth === 'none' ? '100%' : 'none'; this.style.cursor = this.style.cursor === 'zoom-in' ? 'zoom-out' : 'zoom-in';" />
</div>
<div style="margin-top: 12px; display: flex; gap: 16px; flex-wrap: wrap; align-items: center;">
<a href="{editor_url}" target="_blank" style="color: #7c3aed; text-decoration: none; font-size: 13px;">
Edit in Mermaid Live
</a>
<a href="{png_url}" download="lineage_graph.png" style="color: #2563eb; text-decoration: none; font-size: 13px;">
Download PNG
</a>
<a href="{img_url}" download="lineage_graph.svg" style="color: #059669; text-decoration: none; font-size: 13px;">
Download SVG
</a>
<span style="color: #888; font-size: 12px; margin-left: auto;">Click graph to zoom</span>
</div>
</div>
'''
| |
|
| |
|
def render_mermaid_code(viz_code: str) -> str:
    """Pass the Mermaid source through unchanged so it can be shown as text."""
    raw_source = viz_code
    return raw_source
| |
|
| |
|
| | |
| | |
| | |
| |
|
def parse_metadata_to_graph(metadata_text: str, source_type: str) -> Tuple["LineageGraph", str]:
    """Parse metadata text into a LineageGraph structure.

    Text that starts with ``{`` or ``[`` is decoded as JSON. The first
    matching layout below decides how nodes and edges are built:

    * ``lineage_graph``: nested object with ``nodes`` and ``edges`` lists.
    * ``nodes`` as a list: flat node list with an optional ``edges`` list
      (``from`` / ``to`` keys).
    * ``nodes`` as a mapping: id -> node; edges come from each node's
      ``depends_on.nodes`` (looks like a dbt manifest - confirm against the
      sample files).
    * ``tasks``: list of tasks with ``task_id`` and ``upstream_dependencies``.
    * ``lineage``: object with ``datasets`` and ``relationships``.
    * ``stages``: list of stages, each with ``steps`` that list ``inputs``.

    Anything else (including non-JSON text and non-object JSON) yields a
    two-node source -> target placeholder.

    Args:
        metadata_text: Raw metadata, JSON or free text.
        source_type: Label used in the graph name and in the summary.

    Returns:
        ``(graph, summary)``. On a JSON syntax error the graph is an
        input -> output placeholder; on any other failure it is an empty graph
        named ``"Error"`` and the summary carries the error text.
    """
    # The return annotation is a string on purpose: LineageGraph only exists
    # when the optional exporters import succeeded, and an unquoted name would
    # raise NameError while this module is being imported.
    try:
        if metadata_text.strip().startswith(("{", "[")):
            data = json.loads(metadata_text)
        else:
            # Not JSON: keep the text around; no layout below will match.
            data = {"raw_content": metadata_text, "source_type": source_type}

        # Only JSON objects can match a layout; arrays take the placeholder
        # branch instead of being probed with dict-style lookups.
        if not isinstance(data, dict):
            data = {}

        graph = LineageGraph(name=f"Lineage from {source_type}")
        nodes_section = data.get("nodes")

        if "lineage_graph" in data:
            lg = data["lineage_graph"]
            for node_data in lg.get("nodes", []):
                graph.add_node(LineageNode(
                    id=node_data.get("id"),
                    name=node_data.get("name"),
                    type=node_data.get("type", "table"),
                    category=node_data.get("category"),
                    description=node_data.get("description"),
                    metadata=node_data.get("metadata"),
                    tags=node_data.get("tags"),
                ))
            for edge_data in lg.get("edges", []):
                graph.add_edge(LineageEdge(
                    source=edge_data.get("from"),
                    target=edge_data.get("to"),
                    type=edge_data.get("type", "transform"),
                ))

        elif isinstance(nodes_section, list):
            # Flat layout. "edges" is optional so a bare node list still
            # renders instead of failing.
            for node_data in nodes_section:
                graph.add_node(LineageNode(
                    id=node_data.get("id"),
                    name=node_data.get("name", node_data.get("id")),
                    type=node_data.get("type", "table"),
                ))
            for edge_data in data.get("edges") or []:
                graph.add_edge(LineageEdge(
                    source=edge_data.get("from"),
                    target=edge_data.get("to"),
                    type=edge_data.get("type", "transform"),
                ))

        elif isinstance(nodes_section, dict):
            for node_id, node_data in nodes_section.items():
                graph.add_node(LineageNode(
                    id=node_id,
                    # Default display name is the last dotted segment of the id.
                    name=node_data.get("name", node_id.split(".")[-1]),
                    type=node_data.get("resource_type", "model"),
                    schema=node_data.get("schema"),
                    database=node_data.get("database"),
                    description=node_data.get("description"),
                ))
                # Tolerate an explicit null for depends_on / its node list.
                deps = (node_data.get("depends_on") or {}).get("nodes") or []
                for dep in deps:
                    graph.add_edge(LineageEdge(source=dep, target=node_id, type="transform"))

        elif "tasks" in data:
            for task in data.get("tasks", []):
                task_id = task.get("task_id")
                graph.add_node(LineageNode(
                    id=task_id,
                    name=task_id,
                    type="task",
                    description=task.get("description"),
                ))
                for dep in task.get("upstream_dependencies", []):
                    graph.add_edge(LineageEdge(source=dep, target=task_id, type="dependency"))

        elif "lineage" in data:
            lineage = data.get("lineage", {})
            for dataset in lineage.get("datasets", []):
                graph.add_node(LineageNode(
                    id=dataset.get("id"),
                    name=dataset.get("name", dataset.get("id")),
                    type=dataset.get("type", "table"),
                    schema=dataset.get("schema"),
                    database=dataset.get("database"),
                    description=dataset.get("description"),
                    owner=dataset.get("owner"),
                    tags=dataset.get("tags"),
                ))
            for rel in lineage.get("relationships", []):
                graph.add_edge(LineageEdge(
                    source=rel.get("source"),
                    target=rel.get("target"),
                    type=rel.get("type", "transform"),
                    job_name=rel.get("job"),
                ))

        elif "stages" in data:
            for stage in data.get("stages", []):
                for step in stage.get("steps", []):
                    step_id = step.get("id")
                    graph.add_node(LineageNode(
                        id=step_id,
                        name=step.get("name", step_id),
                        type="step",
                        # The owning stage becomes the node's category.
                        category=stage.get("id"),
                        description=step.get("description") or step.get("logic"),
                    ))
                    for inp in step.get("inputs", []):
                        graph.add_edge(LineageEdge(source=inp, target=step_id, type="transform"))

        else:
            # Nothing recognisable: show a generic source -> target pair.
            graph.add_node(LineageNode(id="source", name="Source", type="source"))
            graph.add_node(LineageNode(id="target", name="Target", type="table"))
            graph.add_edge(LineageEdge(source="source", target="target", type="transform"))

        summary = f"Parsed {len(graph.nodes)} nodes and {len(graph.edges)} relationships from {source_type}"
        return graph, summary

    except json.JSONDecodeError as e:
        # Looked like JSON but was not valid: return a placeholder graph and
        # surface the first 50 characters of the parser message.
        graph = LineageGraph(name=f"Lineage from {source_type}")
        graph.add_node(LineageNode(id="input", name="Input Data", type="source"))
        graph.add_node(LineageNode(id="output", name="Output Data", type="table"))
        graph.add_edge(LineageEdge(source="input", target="output", type="transform"))
        return graph, f"Created placeholder lineage (could not parse as JSON: {str(e)[:50]})"
    except Exception as e:
        graph = LineageGraph(name="Error")
        return graph, f"Error parsing metadata: {str(e)}"
| |
|
| |
|
def sanitize_mermaid_text(text: str) -> str:
    """Sanitize text for use as a label in Mermaid diagrams.

    Parentheses become a dash separator (``"A (B)"`` -> ``"A  - B"``),
    brackets and braces become spaces, double quotes become single quotes,
    and ``<``, ``>`` and ``#`` are dropped.

    Args:
        text: Label text. Non-string values are converted with ``str()``.

    Returns:
        The cleaned, stripped label, or ``"Unknown"`` when the input is empty
        or nothing printable is left after cleaning.
    """
    if not text:
        return "Unknown"

    # One pass over the string. The replacement texts contain none of the
    # mapped characters, so this equals applying the replacements in sequence.
    replacements = str.maketrans({
        "(": " - ",
        ")": "",
        "[": " ",
        "]": " ",
        "{": " ",
        "}": " ",
        '"': "'",
        "<": "",
        ">": "",
        "#": "",
    })
    cleaned = str(text).translate(replacements).strip()

    # An empty label would produce an invalid node such as `id[]`.
    return cleaned or "Unknown"
| |
|
| |
|
def generate_mermaid_from_graph(graph: "LineageGraph") -> str:
    """Generate Mermaid diagram code from a LineageGraph.

    Nodes are grouped into subgraphs by ``node.category`` when more than one
    category is present. Nodes of type ``table`` or ``model`` are drawn as
    boxes, all others as rounded nodes. Edge types other than ``transform``
    and ``dependency`` are written as edge labels, and ``reference`` edges use
    a dotted arrow. Each node gets a ``style`` line chosen by its type.

    Args:
        graph: Object exposing ``nodes`` (with ``id``, ``name``, ``type``,
            ``category``) and ``edges`` (with ``source``, ``target``,
            ``type``).

    Returns:
        Mermaid ``graph TD`` source, or a single placeholder node when the
        graph has no nodes.
    """
    # The parameter annotation is a string on purpose: LineageGraph is only
    # defined when the optional exporters import succeeded.
    if not graph.nodes:
        return "graph TD\n A[No data to display]"

    lines = [
        "%%{init: {'theme': 'base', 'themeVariables': {'fontSize': '12px', 'fontFamily': 'arial', 'primaryColor': '#e8f5e9', 'primaryBorderColor': '#4caf50', 'lineColor': '#666'}}}%%",
        "graph TD",
    ]

    # Bucket nodes by category, keeping first-seen order.
    categories = {}
    for node in graph.nodes:
        categories.setdefault(node.category or "default", []).append(node)

    node_styles = {
        "source": "fill:#e3f2fd,stroke:#1976d2,stroke-width:1px,rx:5,ry:5",
        "external_api": "fill:#e3f2fd,stroke:#1976d2,stroke-width:1px,rx:5,ry:5",
        "table": "fill:#e8f5e9,stroke:#388e3c,stroke-width:1px,rx:5,ry:5",
        "view": "fill:#f3e5f5,stroke:#7b1fa2,stroke-width:1px,rx:5,ry:5",
        "model": "fill:#fff3e0,stroke:#f57c00,stroke-width:1px,rx:5,ry:5",
        "report": "fill:#fce4ec,stroke:#c2185b,stroke-width:1px,rx:5,ry:5",
        "dimension": "fill:#e0f7fa,stroke:#0097a7,stroke-width:1px,rx:5,ry:5",
        "fact": "fill:#fff8e1,stroke:#ffa000,stroke-width:1px,rx:5,ry:5",
        "destination": "fill:#ffebee,stroke:#d32f2f,stroke-width:1px,rx:5,ry:5",
        "task": "fill:#fafafa,stroke:#616161,stroke-width:1px,rx:5,ry:5",
    }

    def node_line(node) -> str:
        """Return the declaration line for one node."""
        safe_name = sanitize_mermaid_text(node.name)
        shape = f"[{safe_name}]" if node.type in ("table", "model") else f"({safe_name})"
        return f" {node.id}{shape}"

    if len(categories) > 1:
        for cat, members in categories.items():
            # Uncategorised nodes stay at the top level, outside any subgraph.
            grouped = cat != "default"
            if grouped:
                lines.append(f" subgraph {sanitize_mermaid_text(cat.replace('_', ' ').title())}")
            lines.extend(node_line(member) for member in members)
            if grouped:
                lines.append(" end")
    else:
        # A single category adds no information, so no subgraph is emitted.
        lines.extend(node_line(node) for node in graph.nodes)

    edge_labels = {
        "transform": "-->",
        "reference": "-.->",
        "ingest": "-->",
        "export": "-->",
        "join": "-->",
        "aggregate": "-->",
        "dependency": "-->",
    }

    for edge in graph.edges:
        arrow = edge_labels.get(edge.type, "-->")
        if edge.type and edge.type not in ("transform", "dependency"):
            lines.append(f" {edge.source} {arrow}|{edge.type}| {edge.target}")
        else:
            lines.append(f" {edge.source} {arrow} {edge.target}")

    for node in graph.nodes:
        # Unknown node types get a neutral grey fill.
        style = node_styles.get(node.type, "fill:#f5f5f5")
        lines.append(f" style {node.id} {style}")

    return "\n".join(lines)
| |
|
| |
|
| | |
| | |
| | |
| |
|
def send_to_mcp(server_url: str, api_key: str, metadata_text: str, source_type: str, viz_format: str) -> Tuple[str, str]:
    """POST metadata to an external MCP server and relay its answer.

    Args:
        server_url: Endpoint to POST to; when empty nothing is sent.
        api_key: Optional token, sent as a ``Bearer`` Authorization header.
        metadata_text: Metadata forwarded verbatim in the JSON body.
        source_type: Forwarded as ``source_type``.
        viz_format: Forwarded as ``viz_format``.

    Returns:
        ``(html, summary)``. ``html`` is the rendered diagram when the reply
        holds one under ``visualization``, ``viz`` or ``mermaid``; otherwise
        it is empty. Failures are reported through ``summary``, never raised.
    """
    if not server_url:
        return "", "No MCP server URL configured."

    try:
        request_headers = {"Content-Type": "application/json"}
        if api_key:
            request_headers["Authorization"] = f"Bearer {api_key}"

        body = {
            "metadata": metadata_text,
            "source_type": source_type,
            "viz_format": viz_format,
        }
        response = requests.post(server_url, json=body, headers=request_headers, timeout=30)

        # Anything outside 2xx is reported with a clipped copy of the body.
        if not (200 <= response.status_code < 300):
            return "", f"MCP server returned status {response.status_code}: {response.text[:200]}"

        reply = response.json()
        diagram = reply.get("visualization") or reply.get("viz") or reply.get("mermaid", "")
        summary = reply.get("summary", "Processed by MCP server.")
        rendered = render_mermaid(diagram) if diagram else ""
        return rendered, summary
    except Exception as e:
        return "", f"Error contacting MCP server: {e}"
| |
|
| |
|
def test_mcp_connection(server_url: str, api_key: str) -> str:
    """Health-check an MCP server by fetching its schema.

    The schema URL is derived from the server URL (``/sse`` -> ``/schema``).
    If it cannot be reached, the base URL (server URL without
    ``/gradio_api/mcp/sse``) is tried as a plain reachability check.

    Args:
        server_url: MCP server URL; ``local://`` URLs are answered from the
            built-in preset without any network access.
        api_key: Optional token, sent as a ``Bearer`` Authorization header.

    Returns:
        A human-readable status message. Failures are reported in the
        message, never raised.
    """
    if not server_url:
        return "No MCP server URL configured."

    if is_local_mcp(server_url):
        tools = MCP_PRESETS.get("local_demo", {}).get("tools", [])
        return f"Local Demo MCP ready! {len(tools)} tools available: {', '.join(tools)}"

    try:
        headers = {}
        if api_key:
            headers["Authorization"] = f"Bearer {api_key}"

        schema_url = server_url.replace("/sse", "/schema").replace("/mcp/mcp", "/mcp")
        try:
            resp = requests.get(schema_url, headers=headers, timeout=15)
        except requests.exceptions.RequestException:
            # Schema endpoint unreachable: fall through to the base-URL probe.
            resp = None

        if resp is not None and resp.status_code == 200:
            try:
                schema = resp.json()
            except ValueError:
                # 200 with a non-JSON body still proves the server is up.
                # ValueError is the common base of the JSON decode errors.
                return f"Connected to MCP server: {resp.status_code} OK"
            # The schema may be a mapping or a list of tools; count either.
            tool_count = len(schema) if isinstance(schema, (dict, list)) else 0
            return f"Connected! Found {tool_count} tools available."

        base_url = server_url.replace("/gradio_api/mcp/sse", "")
        try:
            resp = requests.get(base_url, headers=headers, timeout=10)
            if resp.status_code == 200:
                return f"Server reachable (status {resp.status_code})"
        except requests.exceptions.RequestException:
            # Best effort only; report the generic "sleeping" hint below.
            pass

        return "MCP server may be sleeping. Try again in a moment."
    except Exception as e:
        return f"Error contacting MCP server: {e}"
| |
|
| |
|
def get_preset_url(preset_key: str) -> str:
    """Look up the server URL of a preset; unknown keys give an empty string."""
    preset = MCP_PRESETS.get(preset_key)
    if preset is None:
        return ""
    return preset["url"]
| |
|
| |
|
def get_preset_description(preset_key: str) -> str:
    """Describe a preset MCP server.

    Returns:
        The preset's description followed by a comma-separated list of its
        tools, or an empty string for an unknown key.
    """
    preset = MCP_PRESETS.get(preset_key)
    if preset is None:
        return ""

    tool_names = ", ".join(preset.get("tools", []))
    return f"{preset['description']}\n\nAvailable tools: {tool_names}"
| |
|
| |
|
def fetch_metadata_from_mcp(server_url: str, api_key: str, query: str) -> Tuple[str, str]:
    """Fetch metadata from an MCP server and return it for lineage visualization.

    ``local://`` URLs are answered by the built-in demo server. Other URLs
    receive a JSON-RPC ``tools/call`` request for a tool named ``search``,
    POSTed to the server URL with ``/sse`` removed.

    Args:
        server_url: MCP server URL.
        api_key: Optional token, sent as a ``Bearer`` Authorization header.
        query: Search text passed to the tool; ``None`` is treated as empty.

    Returns:
        ``(lineage_json, status)``. For a usable remote reply the JSON is a
        two-node graph with the server result attached under ``metadata``.
        Otherwise it is a three-node template graph. On errors the JSON is
        empty and ``status`` explains why.
    """
    if not server_url:
        return "", "Please select or enter an MCP server URL first."

    query = query or ""

    # The demo server lives in-process; an HTTP POST to local:// cannot work.
    if is_local_mcp(server_url):
        return call_local_mcp("search", query)

    try:
        headers = {"Content-Type": "application/json"}
        if api_key:
            headers["Authorization"] = f"Bearer {api_key}"

        payload = {
            "jsonrpc": "2.0",
            "method": "tools/call",
            "params": {
                "name": "search",
                "arguments": {"query": query}
            },
            "id": 1
        }

        base_url = server_url.replace("/sse", "")
        resp = requests.post(base_url, json=payload, headers=headers, timeout=30)

        if resp.status_code == 200:
            try:
                data = resp.json()
            except ValueError:
                # ValueError is the common base of the JSON decode errors;
                # a non-JSON body falls through to the template below.
                data = None

            if isinstance(data, dict):
                # JSON-RPC replies wrap the payload in "result"; otherwise
                # the whole object is attached.
                result = data.get("result", data)
                lineage_data = {
                    "nodes": [
                        {"id": "mcp_source", "type": "source", "name": f"MCP: {query}"},
                        {"id": "mcp_result", "type": "table", "name": "Query Result"}
                    ],
                    "edges": [
                        {"from": "mcp_source", "to": "mcp_result"}
                    ],
                    "metadata": result
                }
                return json.dumps(lineage_data, indent=2), f"Fetched metadata from MCP server for query: {query}"

        # No usable reply: hand back a template graph. The host is the third
        # "/"-separated piece of the URL (scheme://host/...).
        sample_lineage = {
            "nodes": [
                {"id": "mcp_server", "type": "source", "name": server_url.split("/")[2]},
                {"id": "query", "type": "model", "name": f"Query: {query[:30]}..."},
                {"id": "result", "type": "table", "name": "MCP Result"}
            ],
            "edges": [
                {"from": "mcp_server", "to": "query"},
                {"from": "query", "to": "result"}
            ]
        }
        return json.dumps(sample_lineage, indent=2), "Created lineage template for MCP query. Connect to the MCP server to fetch real metadata."

    except Exception as e:
        return "", f"Error fetching from MCP server: {str(e)}"
| |
|
| |
|
| | |
| | |
| | |
| |
|
def export_lineage(metadata_text: str, source_type: str, export_format: str) -> Tuple[str, str]:
    """Export lineage to the specified data catalog format.

    Args:
        metadata_text: Raw metadata, parsed with ``parse_metadata_to_graph``.
        source_type: Label passed through to the parser.
        export_format: One of ``OpenLineage``, ``Collibra``, ``Purview``,
            ``Alation`` or ``Atlas``.

    Returns:
        ``(exported_content, status)``. ``exported_content`` is empty when
        the exporters module is missing, the format is unknown, or the export
        fails; ``status`` explains which.
    """
    if not EXPORTERS_AVAILABLE:
        return "", "Export functionality not available. Please install the exporters module."

    try:
        graph, _ = parse_metadata_to_graph(metadata_text, source_type)

        # Built per call: the exporter classes only exist when the optional
        # exporters import succeeded (checked above).
        exporters_by_format = {
            "OpenLineage": OpenLineageExporter,
            "Collibra": CollibraExporter,
            "Purview": PurviewExporter,
            "Alation": AlationExporter,
            "Atlas": AtlasExporter,
        }
        exporter_cls = exporters_by_format.get(export_format)
        if exporter_cls is None:
            return "", f"Unknown export format: {export_format}"

        exported_content = exporter_cls(graph).export()
        return exported_content, f"Exported to {export_format} format. Download the file below."

    except Exception as e:
        return "", f"Export error: {str(e)}"
| |
|
| |
|
| | |
| | |
| | |
| |
|
def load_sample(sample_type: str) -> str:
    """Load a sample file.

    Args:
        sample_type: Key into ``SAMPLE_FILES``.

    Returns:
        The file's text, read from the ``samples`` directory next to this
        module. For an unknown key or an unreadable file, a JSON object with
        an ``error`` message is returned instead.
    """
    filename = SAMPLE_FILES.get(sample_type)
    if not filename:
        return json.dumps({"error": f"Unknown sample type: {sample_type}"})

    filepath = os.path.join(os.path.dirname(__file__), "samples", filename)
    try:
        # Explicit encoding: the platform default is not UTF-8 everywhere.
        with open(filepath, "r", encoding="utf-8") as f:
            return f.read()
    except (OSError, UnicodeError) as e:
        return json.dumps({"error": f"Could not load sample: {str(e)}"})
| |
|
| |
|
| | |
| | |
| | |
| |
|
def extract_lineage_from_text(
    metadata_text: str,
    source_type: str,
    visualization_format: str,
    use_mcp: bool = False,
    mcp_url: str = "",
    mcp_query: str = ""
) -> Tuple[str, str]:
    """Extract lineage from provided metadata text, optionally using MCP server.

    Args:
        metadata_text: Raw metadata; ``None`` is treated as empty.
        source_type: Label for the metadata. Also used as the MCP query when
            ``mcp_query`` is empty.
        visualization_format: Accepted for interface compatibility; not read
            by this function.
        use_mcp: When true and ``mcp_url`` is set, fetch metadata from the MCP
            server instead of using ``metadata_text``.
        mcp_url: MCP server URL. Only ``local://`` URLs are supported here.
        mcp_query: Query passed to the local demo MCP server.

    Returns:
        ``(html, summary)``. ``html`` is empty when there is nothing to
        render; ``summary`` then explains why.
    """
    if use_mcp and mcp_url:
        if not is_local_mcp(mcp_url):
            return "", "External MCP servers require proper MCP client. Use Local Demo MCP for testing."

        mcp_metadata, mcp_summary = call_local_mcp("get_sample_lineage", mcp_query or source_type)
        # Without the exporters module the MCP data cannot be turned into a
        # graph, so processing continues with metadata_text below.
        if mcp_metadata and EXPORTERS_AVAILABLE:
            graph, _ = parse_metadata_to_graph(mcp_metadata, "MCP Response")
            mermaid_code = generate_mermaid_from_graph(graph)
            return render_mermaid(mermaid_code), f"[MCP] {mcp_summary}"

    # A cleared input field may arrive as None rather than "".
    if not (metadata_text or "").strip():
        return "", "Please provide metadata content or enable MCP to fetch sample data."

    if EXPORTERS_AVAILABLE:
        graph, summary = parse_metadata_to_graph(metadata_text, source_type)
        mermaid_code = generate_mermaid_from_graph(graph)
        return render_mermaid(mermaid_code), summary

    # No graph types available: show a fixed two-node placeholder.
    viz = "graph TD\n A[Sample Node] --> B[Output Node]"
    return render_mermaid(viz), f"Processed {source_type} metadata."
| |
|
| |
|
def extract_lineage_from_bigquery(
    project_id: str,
    query: str,
    api_key: str,
    visualization_format: str
) -> Tuple[str, str]:
    """Render a fixed three-step diagram labelled with the BigQuery project.

    No request is made to BigQuery; ``query``, ``api_key`` and
    ``visualization_format`` are not read. Only ``project_id`` affects the
    output.

    Returns:
        ``(html, summary)`` for the placeholder diagram.
    """
    project_label = project_id or 'your-project'
    diagram_lines = [
        "graph TD",
        f"subgraph BigQuery Project: {project_label}",
        "A[Source Tables] --> B[Query Execution]",
        "B --> C[Destination Table]",
        "end",
        "style A fill:#e1f5fe",
        "style B fill:#fff3e0",
        "style C fill:#e8f5e9",
    ]
    summary = f"BigQuery lineage from project: {project_id or 'not specified'}"
    return render_mermaid("\n".join(diagram_lines)), summary
| |
|
| |
|
def extract_lineage_from_url(
    url: str,
    visualization_format: str
) -> Tuple[str, str]:
    """Download metadata from a URL and extract lineage from the response.

    A ``200`` response body is handed to ``extract_lineage_from_text`` with
    the source type ``"API Response"``. An empty URL, a non-200 status, or
    any error along the way produces a fixed placeholder diagram instead.

    Returns:
        ``(html, summary)``.
    """
    fetched = None
    if url:
        try:
            response = requests.get(url, timeout=10)
            if response.status_code == 200:
                fetched = extract_lineage_from_text(response.text, "API Response", visualization_format)
        except Exception:
            # Deliberate best effort: any failure falls back to the placeholder.
            fetched = None

    if fetched is not None:
        return fetched

    placeholder = "graph TD\n A[API Source] --> B[Data Pipeline] --> C[Output]"
    return render_mermaid(placeholder), f"Lineage from URL: {url or 'not specified'}"
| |
|
| |
|
| | |
| | |
| | |
| |
|
# System prompt placed at the start of every Gemini request built by
# chat_with_gemini(). It asks the model to answer with a ```json block so the
# reply can be lifted out and reused as lineage metadata.
LINEAGE_AGENT_PROMPT = """You are a Data Lineage Assistant powered by the Lineage Graph Accelerator tool.
You help users understand, extract, and visualize data lineage from various sources.

Your capabilities:
1. **Extract Lineage**: Parse metadata from dbt manifests, Airflow DAGs, SQL DDL, and custom JSON
2. **Explain Lineage**: Help users understand data flow and dependencies
3. **Generate Metadata**: Create lineage JSON from natural language descriptions
4. **Export Guidance**: Advise on exporting to data catalogs (OpenLineage, Collibra, Purview, Alation, Atlas)

When users describe their data pipeline, generate valid JSON lineage in this format:
```json
{
"nodes": [
{"id": "unique_id", "type": "source|table|model|view|report", "name": "Display Name"}
],
"edges": [
{"from": "source_id", "to": "target_id"}
]
}
```

Node types: source, table, model, view, report, dimension, fact, destination, task

Be helpful, concise, and always offer to generate lineage JSON when users describe data flows.
If the user provides metadata or describes a pipeline, generate the JSON they can paste into the tool."""
| |
|
| |
|
def init_gemini(api_key: str) -> bool:
    """Configure the Gemini client with the given API key.

    Returns:
        ``True`` when the client library is installed, a key was supplied and
        configuration did not raise; ``False`` otherwise.
    """
    if not (GEMINI_AVAILABLE and api_key):
        return False

    try:
        genai.configure(api_key=api_key)
    except Exception:
        return False
    return True
| |
|
| |
|
def chat_with_gemini(
    message: str,
    history: List[Dict[str, str]],
    api_key: str
) -> Tuple[List[Dict[str, str]], str]:
    """Chat with Gemini about data lineage.

    Args:
        message: The user's new message.
        history: Earlier turns as ``{"role": ..., "content": ...}`` dicts.
            ``None`` is treated as an empty conversation.
        api_key: Google Gemini API key.

    Returns:
        ``(new_history, extracted_json)``. ``new_history`` is a new list made
        of ``history`` plus the user message and the assistant reply (or an
        explanatory / error message). ``extracted_json`` is the content of
        the first ```json fenced block in the reply, or ``""``.
    """
    history = list(history or [])

    def with_reply(reply: str) -> List[Dict[str, str]]:
        """Return the history extended by this exchange."""
        return history + [
            {"role": "user", "content": message},
            {"role": "assistant", "content": reply}
        ]

    if not GEMINI_AVAILABLE:
        return with_reply("Google Gemini is not available. Please install google-generativeai package."), ""

    if not api_key:
        return with_reply("Please enter your Google Gemini API key to use the chatbot. You can get one at https://makersuite.google.com/app/apikey"), ""

    try:
        genai.configure(api_key=api_key)
        model = genai.GenerativeModel('models/gemini-2.0-flash-001')

        # The whole conversation is sent as one flat prompt; only the last
        # six turns are included.
        context_parts = [LINEAGE_AGENT_PROMPT, "\n\nConversation history:"]
        for msg in history[-6:]:
            role = "User" if msg.get("role") == "user" else "Assistant"
            context_parts.append(f"{role}: {msg.get('content', '')}")
        context_parts.append(f"\nUser: {message}\nAssistant:")

        response = model.generate_content("\n".join(context_parts))
        assistant_message = response.text

        # Lift out the first ```json ... ``` block, if the fence is closed
        # and there is something after the opening marker.
        extracted_json = ""
        opening_fence = "```json"
        fence_at = assistant_message.find(opening_fence)
        if fence_at != -1:
            json_start = fence_at + len(opening_fence)
            json_end = assistant_message.find("```", json_start)
            if json_end > json_start:
                extracted_json = assistant_message[json_start:json_end].strip()

        return with_reply(assistant_message), extracted_json

    except Exception as e:
        return with_reply(f"Error communicating with Gemini: {str(e)}"), ""
| |
|
| |
|
def use_generated_json(json_text: str) -> Tuple[str, str, str]:
    """Validate generated JSON and return values for the lineage extractor inputs.

    Args:
        json_text: JSON text to validate. ``None``, empty and whitespace-only
            values are all treated as "nothing to use".

    Returns:
        A ``(metadata_text, source_type, status_message)`` tuple. On success
        the first item is ``json_text`` unchanged and the second is
        ``"Custom JSON"``. When there is no text or it does not parse, the
        first two items are empty strings and the status explains why.
    """
    # NOTE(review): where the first two values are bound to UI fields, the
    # empty strings returned on failure overwrite those fields - confirm
    # that blanking them is intended.
    if not json_text or not json_text.strip():
        return "", "", "No JSON to use. Ask the chatbot to generate lineage JSON first."

    try:
        # Parse only to validate; the original text is what gets passed on.
        json.loads(json_text)
    except json.JSONDecodeError as e:
        return "", "", f"Invalid JSON: {str(e)}"

    return json_text, "Custom JSON", "JSON copied to metadata input. Switch to 'Text/File Metadata' tab and click 'Extract Lineage'."
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | |
# Build the Gradio interface. Every component created inside this context
# belongs to `demo`.
# NOTE(review): the container nesting below was reconstructed from a copy of
# this file whose indentation had been lost - confirm which Row/Column each
# component belongs to against the rendered layout.
with gr.Blocks(
    title="Lineage Graph Accelerator",
    fill_height=True
) as demo:

    # Header banner: title, tagline, author links and feature badges.
    gr.HTML("""
    <div style="background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); padding: 24px 32px; border-radius: 12px; margin-bottom: 20px; box-shadow: 0 4px 15px rgba(102, 126, 234, 0.3);">
        <div style="display: flex; justify-content: space-between; align-items: center; flex-wrap: wrap; gap: 16px;">
            <div>
                <h1 style="color: white; margin: 0 0 8px 0; font-size: 2em; font-weight: 700;">Lineage Graph Accelerator</h1>
                <p style="color: rgba(255,255,255,0.9); margin: 0; font-size: 1.1em;">AI-powered data lineage extraction and visualization for modern data platforms</p>
            </div>
            <div style="display: flex; gap: 12px; flex-wrap: wrap;">
                <a href="https://aamanlamba.com" target="_blank" style="background: rgba(255,255,255,0.2); color: white; padding: 8px 16px; border-radius: 6px; text-decoration: none; font-weight: 500; transition: background 0.2s;">By Aaman Lamba</a>
                <a href="https://github.com/aamanlamba" target="_blank" style="background: rgba(255,255,255,0.2); color: white; padding: 8px 16px; border-radius: 6px; text-decoration: none; font-weight: 500;">GitHub</a>
            </div>
        </div>
        <div style="margin-top: 16px; padding-top: 16px; border-top: 1px solid rgba(255,255,255,0.2);">
            <span style="background: rgba(255,255,255,0.25); color: white; padding: 4px 10px; border-radius: 12px; font-size: 0.85em; margin-right: 8px;">Gradio 6</span>
            <span style="background: rgba(255,255,255,0.25); color: white; padding: 4px 10px; border-radius: 12px; font-size: 0.85em; margin-right: 8px;">MCP Integration</span>
            <span style="background: rgba(255,255,255,0.25); color: white; padding: 4px 10px; border-radius: 12px; font-size: 0.85em; margin-right: 8px;">Gemini AI</span>
            <span style="background: rgba(255,255,255,0.25); color: white; padding: 4px 10px; border-radius: 12px; font-size: 0.85em;">5 Export Formats</span>
        </div>
    </div>
    """)

    # Feature overview and quick-start instructions.
    gr.Markdown("""
    ### What You Can Do

    | Feature | Description |
    |---------|-------------|
    | **Extract Lineage** | Parse metadata from dbt manifests, Airflow DAGs, SQL DDL, BigQuery, and custom JSON |
    | **Visualize** | Generate interactive Mermaid diagrams with color-coded nodes and relationship labels |
    | **Export** | Export to enterprise data catalogs: OpenLineage, Collibra, Purview, Alation, Atlas |
    | **MCP Integration** | Connect to MCP servers for AI-powered metadata extraction |
    | **AI Assistant** | Chat with Gemini to generate lineage from natural language descriptions |

    ### Quick Start

    1. **Try the Demo**: Enable "Use MCP Server" and select "Local Demo MCP" to fetch sample lineage metadata
    2. **Use Your Data**: Paste your dbt manifest, Airflow DAG, or custom JSON in the Text/File tab
    3. **Load Samples**: Click "Load Sample" in the Demo Gallery to explore pre-built examples
    4. **Export**: Use the Export section to generate catalog-ready JSON

    ---
    """)

    # MCP server configuration. These controls sit outside the tabs; the
    # Text/File tab's extract handler reads use_mcp_checkbox, mcp_server and
    # mcp_query from here.
    with gr.Accordion("MCP Server Configuration", open=True):
        gr.Markdown("""
        **Connect to MCP Servers** to fetch metadata for lineage extraction.
        Use the built-in **Local Demo MCP** for testing, or connect to external servers on HuggingFace.
        """)
        with gr.Row():
            use_mcp_checkbox = gr.Checkbox(
                label="Use MCP Server for Metadata",
                value=False,
                info="Enable to fetch lineage metadata from MCP server instead of local input"
            )
            # Dropdown values are preset keys; "" means no preset selected.
            mcp_preset = gr.Dropdown(
                choices=[
                    ("-- Select Preset --", ""),
                    ("Local Demo MCP (Built-in)", "local_demo"),
                    ("MCP Tools by abidlabs", "mcp_tools"),
                    ("HuggingFace MCP by dylanebert", "huggingface_mcp"),
                    ("Ragmint RAG Pipeline", "ragmint"),
                    ("Web Search MCP", "web_search"),
                ],
                label="Preset MCP Servers",
                value="",
                scale=2
            )
        with gr.Row():
            mcp_server = gr.Textbox(
                label="MCP Server URL",
                placeholder="Select a preset or enter custom URL",
                info="local://demo for built-in demo, or external MCP URL",
                scale=3
            )
            mcp_query = gr.Textbox(
                label="MCP Query (Optional)",
                placeholder="e.g., 'ecommerce', 'ml pipeline', 'warehouse'",
                info="Query to filter metadata from MCP server",
                scale=2
            )
        with gr.Row():
            # The API key is passed only to the connection test below.
            mcp_api_key = gr.Textbox(
                label="API Key (Optional)",
                placeholder="API key if required",
                type="password",
                scale=2
            )
            test_btn = gr.Button("Test Connection", size="sm", scale=1)
        mcp_description = gr.Textbox(label="Server Description", interactive=False, lines=2)
        mcp_status = gr.Textbox(label="Connection Status", interactive=False)

        # Changing the preset updates both the URL box and the description
        # box; the test button reports its result in mcp_status.
        mcp_preset.change(fn=get_preset_url, inputs=[mcp_preset], outputs=[mcp_server])
        mcp_preset.change(fn=get_preset_description, inputs=[mcp_preset], outputs=[mcp_description])
        test_btn.click(fn=test_mcp_connection, inputs=[mcp_server, mcp_api_key], outputs=[mcp_status])

    # Main tabs, one per input source plus the gallery and the chatbot.
    with gr.Tabs():
        # Tab: pasted text / file metadata.
        with gr.Tab("Text/File Metadata", id="text"):
            with gr.Row():
                with gr.Column(scale=1):
                    gr.Markdown("### Input")

                    # Sample picker. The "api" and "bigquery" samples are not
                    # listed here; they are loaded by buttons in their own tabs.
                    with gr.Row():
                        sample_selector = gr.Dropdown(
                            choices=[
                                ("Simple JSON", "simple"),
                                ("dbt Manifest", "dbt"),
                                ("Airflow DAG", "airflow"),
                                ("SQL DDL", "sql"),
                                ("Data Warehouse", "warehouse"),
                                ("ETL Pipeline", "etl"),
                                ("Complex Demo", "complex")
                            ],
                            label="Load Sample Data",
                            value="simple"
                        )
                        load_sample_btn = gr.Button("Load Sample", size="sm")

                    metadata_input = gr.Textbox(
                        label="Metadata Content",
                        placeholder="Paste your metadata here (JSON, YAML, SQL, dbt manifest, Airflow DAG, etc.)",
                        lines=18
                    )

                    with gr.Row():
                        source_type = gr.Dropdown(
                            choices=["dbt Manifest", "Airflow DAG", "SQL DDL", "Data Warehouse", "ETL Pipeline", "Custom JSON", "Other"],
                            label="Source Type",
                            value="Custom JSON"
                        )
                        viz_format = gr.Dropdown(
                            choices=["Mermaid", "DOT/Graphviz", "Text"],
                            label="Visualization Format",
                            value="Mermaid"
                        )

                    extract_btn = gr.Button("Extract Lineage", variant="primary", size="lg")

                with gr.Column(scale=1):
                    gr.Markdown("### Visualization")
                    output_viz = gr.HTML(label="Lineage Graph")
                    output_summary = gr.Textbox(label="Summary", lines=3)

            # Export section: builds catalog output from the same metadata
            # text and source type used for extraction.
            with gr.Accordion("Export to Data Catalog", open=False):
                export_format = gr.Dropdown(
                    choices=EXPORT_FORMATS,
                    label="Export Format",
                    value="OpenLineage"
                )
                export_btn = gr.Button("Generate Export", variant="secondary")
                export_output = gr.Code(label="Export Content", language="json", lines=10)
                export_status = gr.Textbox(label="Export Status", interactive=False)

            # Event handlers for this tab.
            load_sample_btn.click(
                fn=load_sample,
                inputs=[sample_selector],
                outputs=[metadata_input]
            )

            # The MCP checkbox, URL and query come from the accordion above.
            extract_btn.click(
                fn=extract_lineage_from_text,
                inputs=[metadata_input, source_type, viz_format, use_mcp_checkbox, mcp_server, mcp_query],
                outputs=[output_viz, output_summary]
            )

            export_btn.click(
                fn=export_lineage,
                inputs=[metadata_input, source_type, export_format],
                outputs=[export_output, export_status]
            )

        # Tab: BigQuery.
        with gr.Tab("BigQuery", id="bigquery"):
            with gr.Row():
                with gr.Column():
                    bq_project = gr.Textbox(
                        label="Project ID",
                        placeholder="your-gcp-project-id"
                    )
                    bq_query = gr.Textbox(
                        label="Metadata Query",
                        placeholder="SELECT * FROM `project.dataset.INFORMATION_SCHEMA.TABLES`",
                        lines=10
                    )
                    load_bq_sample = gr.Button("Load Sample Query", size="sm")
                    bq_creds = gr.Textbox(
                        label="Service Account JSON (optional)",
                        type="password"
                    )
                    bq_viz_format = gr.Dropdown(
                        choices=["Mermaid", "DOT/Graphviz", "Text"],
                        label="Visualization Format",
                        value="Mermaid"
                    )
                    bq_extract_btn = gr.Button("Extract Lineage", variant="primary")

                with gr.Column():
                    bq_output_viz = gr.HTML(label="Lineage Graph")
                    bq_output_summary = gr.Textbox(label="Summary", lines=3)

            # Fills the query box with the "bigquery" sample.
            load_bq_sample.click(
                fn=lambda: load_sample("bigquery"),
                outputs=[bq_query]
            )

            bq_extract_btn.click(
                fn=extract_lineage_from_bigquery,
                inputs=[bq_project, bq_query, bq_creds, bq_viz_format],
                outputs=[bq_output_viz, bq_output_summary]
            )

        # Tab: URL / API.
        with gr.Tab("URL/API", id="url"):
            with gr.Row():
                with gr.Column():
                    url_input = gr.Textbox(
                        label="Metadata URL",
                        placeholder="https://api.example.com/metadata"
                    )
                    load_url_sample = gr.Button("Load Sample API Metadata", size="sm")
                    url_viz_format = gr.Dropdown(
                        choices=["Mermaid", "DOT/Graphviz", "Text"],
                        label="Visualization Format",
                        value="Mermaid"
                    )
                    url_extract_btn = gr.Button("Extract Lineage", variant="primary")

                with gr.Column():
                    url_output_viz = gr.HTML(label="Lineage Graph")
                    url_output_summary = gr.Textbox(label="Summary", lines=3)

            # NOTE(review): this writes the result of load_sample("api") into
            # the URL textbox - confirm that sample content (rather than a
            # URL) is what extract_lineage_from_url expects here.
            load_url_sample.click(
                fn=lambda: load_sample("api"),
                outputs=[url_input]
            )

            url_extract_btn.click(
                fn=extract_lineage_from_url,
                inputs=[url_input, url_viz_format],
                outputs=[url_output_viz, url_output_summary]
            )

        # Tab: demo gallery of one-click examples.
        with gr.Tab("Demo Gallery", id="gallery"):
            gr.Markdown("""
            ## Sample Lineage Visualizations

            Click any example below to see the lineage visualization.
            """)

            with gr.Row():
                demo_simple = gr.Button("E-Commerce (Simple)")
                demo_dbt = gr.Button("dbt Project")
                demo_airflow = gr.Button("Airflow DAG")
            with gr.Row():
                demo_warehouse = gr.Button("Data Warehouse")
                demo_etl = gr.Button("ETL Pipeline")
                demo_complex = gr.Button("Complex Platform")

            demo_viz = gr.HTML(label="Demo Visualization")
            demo_summary = gr.Textbox(label="Description", lines=2)

            # Wire each button to its sample. `st=sample_type` binds the key
            # as a default argument so every lambda keeps its own value
            # instead of the loop's final one. The source-type argument is
            # the title-cased sample key, and the format is always "Mermaid".
            # NOTE(review): only three arguments are passed here, so the
            # remaining extract_lineage_from_text parameters presumably have
            # defaults - verify against its definition.
            for btn, sample_type in [(demo_simple, "simple"), (demo_dbt, "dbt"),
                                     (demo_airflow, "airflow"), (demo_warehouse, "warehouse"),
                                     (demo_etl, "etl"), (demo_complex, "complex")]:
                btn.click(
                    fn=lambda st=sample_type: extract_lineage_from_text(
                        load_sample(st),
                        st.replace("_", " ").title(),
                        "Mermaid"
                    ),
                    outputs=[demo_viz, demo_summary]
                )

        # Tab: AI assistant chatbot.
        with gr.Tab("AI Assistant", id="chatbot"):
            gr.Markdown("""
            ## Lineage AI Assistant (Powered by Google Gemini)

            Ask questions about data lineage, describe your data pipeline in natural language,
            and get JSON metadata you can use to visualize lineage.

            **Examples:**
            - "I have a PostgreSQL database that feeds into a Spark ETL job, which outputs to a Snowflake warehouse"
            - "Generate lineage for a dbt project with staging, intermediate, and mart layers"
            - "What's the best way to document column-level lineage?"
            """)

            with gr.Row():
                with gr.Column(scale=2):
                    gemini_api_key = gr.Textbox(
                        label="Google Gemini API Key",
                        placeholder="Enter your Gemini API key (get one at makersuite.google.com)",
                        type="password",
                        info="Your API key is not stored and only used for this session"
                    )

            chatbot_display = gr.Chatbot(
                label="Chat with Lineage AI",
                height=400
            )

            with gr.Row():
                chat_input = gr.Textbox(
                    label="Your message",
                    placeholder="Describe your data pipeline or ask about lineage...",
                    lines=2,
                    scale=4
                )
                send_btn = gr.Button("Send", variant="primary", scale=1)

            with gr.Accordion("Generated JSON (if any)", open=False):
                generated_json = gr.Code(
                    label="Extracted JSON",
                    language="json",
                    lines=10
                )
                use_json_btn = gr.Button("Use This JSON in Lineage Tool", size="sm")
                json_status = gr.Textbox(label="Status", interactive=False)

            # Conversation history (a list of role/content dicts), starting
            # empty; it is passed to and returned from handle_chat.
            chat_state = gr.State([])

            def handle_chat(message, history, api_key):
                """Run one chat turn.

                Returns (chatbot messages, extracted JSON, new history state).
                A blank message skips the model call, leaves the history
                unchanged and returns "" for the extracted JSON.
                """
                if not message.strip():
                    return history, "", history
                new_history, extracted = chat_with_gemini(message, history, api_key)
                return new_history, extracted, new_history

            # The Send button and pressing Enter in the textbox run the same
            # handler; each then clears the input box.
            send_btn.click(
                fn=handle_chat,
                inputs=[chat_input, chat_state, gemini_api_key],
                outputs=[chatbot_display, generated_json, chat_state]
            ).then(
                fn=lambda: "",
                outputs=[chat_input]
            )

            chat_input.submit(
                fn=handle_chat,
                inputs=[chat_input, chat_state, gemini_api_key],
                outputs=[chatbot_display, generated_json, chat_state]
            ).then(
                fn=lambda: "",
                outputs=[chat_input]
            )

            # Writes into metadata_input and source_type, which belong to
            # the Text/File Metadata tab, plus the local status box.
            use_json_btn.click(
                fn=use_generated_json,
                inputs=[generated_json],
                outputs=[metadata_input, source_type, json_status]
            )

    # Footer: export-format reference table, demo video and project links.
    gr.Markdown("""
    ---

    ### Export Formats Supported

    | Format | Description | Use Case |
    |--------|-------------|----------|
    | **OpenLineage** | Open standard for lineage | Universal compatibility |
    | **Collibra** | Collibra Data Intelligence | Enterprise data governance |
    | **Purview** | Microsoft Purview | Azure ecosystem |
    | **Alation** | Alation Data Catalog | Self-service analytics |
    | **Atlas** | Apache Atlas | Open-source governance |

    ---

    ### 🎥 Watch the Demo

    See all features in action: [YouTube Demo Video (2:30)](https://youtu.be/U4Dfc7txa_0)

    ---

    Built with Gradio for the **Gradio Agents & MCP Hackathon - Winter 2025** by [Aaman Lamba](https://aamanlamba.com)

    [GitHub](https://github.com/aamanlamba/lineage-graph-accelerator) | [Documentation](https://huggingface.co/spaces/aamanlamba/Lineage-graph-accelerator/blob/main/USER_GUIDE.md) | [HuggingFace Space](https://huggingface.co/spaces/aamanlamba/Lineage-graph-accelerator)
    """)
| |
|
| | |
# Start the app only when this file is executed as a script, not when it is
# imported. ssr_mode=False is passed to launch() to turn off Gradio's
# server-side rendering.
if __name__ == "__main__":
    demo.launch(ssr_mode=False)
| |
|