import gradio as gr
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
import os
import json
from datetime import datetime
from threading import Lock
from datasets import Dataset
from huggingface_hub import HfApi
import pandas as pd

# Configuration
DATASET_REPO = "assafvayner/webhook-messages"
BATCH_SIZE = 100
ALLOWED_SCOPES = {"repo", "repo.content"}
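# The values above are matched against the "event.scope" field that arrives in
# each webhook payload (see https://huggingface.co/docs/hub/webhooks); any other
# scope is ignored by process_webhook() below.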

# In-memory storage
webhook_messages = []
message_lock = Lock()
batch_counter = 0
latest_batch_file = None

# HuggingFace API client
hf_api = HfApi(token=os.environ.get("HF_TOKEN"))
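# Note: HF_TOKEN is expected to be a token with write access to DATASET_REPO;
# on Spaces it would typically be configured as a repository secret.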

# Ensure dataset repo exists on startup
try:
    hf_api.create_repo(
        repo_id=DATASET_REPO,
        repo_type="dataset",
        exist_ok=True
    )
    print(f"✅ Dataset repository ready: {DATASET_REPO}")
except Exception as e:
    print(f"⚠️ Warning: Could not create/verify dataset repo: {str(e)}")

def save_batch_to_dataset(messages, batch_num):
    """Save a batch of webhook messages to the HuggingFace dataset as a parquet file."""
    global latest_batch_file
    try:
        # Create DataFrame from messages
        df = pd.DataFrame(messages)

        # Create filename with timestamp and batch number
        timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
        filename = f"batch_{batch_num:06d}_{timestamp}.parquet"

        # Convert to HuggingFace Dataset
        dataset = Dataset.from_pandas(df)

        # Write a temporary parquet file and upload it to the dataset repo
        dataset.to_parquet(f"/tmp/{filename}")
        hf_api.upload_file(
            path_or_fileobj=f"/tmp/{filename}",
            path_in_repo=f"data/{filename}",
            repo_id=DATASET_REPO,
            repo_type="dataset",
        )
        print(f"✅ Saved batch {batch_num} with {len(messages)} messages to {DATASET_REPO}")

        # Update latest batch file info
        latest_batch_file = f"data/{filename}"

        # Clean up temp file
        os.remove(f"/tmp/{filename}")
        return True
    except Exception as e:
        print(f"❌ Error saving batch {batch_num}: {str(e)}")
        return False

def process_webhook(payload: dict, event_type: str):
    """Process and store webhook payload if it matches allowed scopes."""
    global batch_counter

    # Extract scope from payload
    scope = payload.get("event", {}).get("scope")

    # Filter by scope
    if scope not in ALLOWED_SCOPES:
        return False

    # Create message entry
    message = {
        "timestamp": datetime.utcnow().isoformat(),
        "event_type": event_type,
        "scope": scope,
        "payload": json.dumps(payload)  # Store full payload as JSON string
    }

    with message_lock:
        webhook_messages.append(message)
        current_count = len(webhook_messages)

        # Check if we need to save a batch
        if current_count >= BATCH_SIZE:
            batch_counter += 1
            messages_to_save = webhook_messages.copy()
            webhook_messages.clear()
            # Upload the batch inline; this call blocks the request (and holds
            # the lock) until the parquet file has been written and uploaded
            save_batch_to_dataset(messages_to_save, batch_counter)

    return True
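# For reference, the smallest payload shape that process_webhook() accepts looks
# roughly like this (illustrative only; real Hub payloads carry additional
# fields such as "repo" and "event.action"):
#   {"event": {"action": "update", "scope": "repo.content"}}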

# Create FastAPI app first
app = FastAPI()


# Add webhook endpoints BEFORE mounting Gradio
@app.post("/webhooks/hub")
async def webhook_endpoint(request: Request):
    """
    Webhook endpoint for HuggingFace Hub events.
    Supports all webhook events documented at:
    https://huggingface.co/docs/hub/webhooks
    """
    try:
        # Get the event type from headers
        event_type = request.headers.get("X-Event-Type", "unknown")

        # Parse JSON payload
        payload = await request.json()

        # Process the webhook
        processed = process_webhook(payload, event_type)

        if processed:
            return JSONResponse(
                content={
                    "status": "success",
                    "message": "Webhook received and queued",
                    "scope": payload.get("event", {}).get("scope")
                },
                status_code=200
            )
        else:
            return JSONResponse(
                content={
                    "status": "ignored",
                    "message": "Webhook scope not in allowed list",
                    "scope": payload.get("event", {}).get("scope")
                },
                status_code=200
            )
    except Exception as e:
        print(f"Error processing webhook: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/health")  # health-check route (path assumed, not stated elsewhere in this file)
async def health_check():
    """Health check endpoint."""
    with message_lock:
        return {
            "status": "healthy",
            "messages_in_memory": len(webhook_messages),
            "batches_saved": batch_counter,
            "allowed_scopes": list(ALLOWED_SCOPES)
        }

# Create Gradio interface
with gr.Blocks(title="HuggingFace Webhook Processor") as demo:
    gr.Markdown("""
    # HuggingFace Webhook Processor

    This app receives HuggingFace Hub webhooks and stores them for analysis.

    ## Webhook Endpoint
    Send POST requests to: `/webhooks/hub`

    ## Configuration
    - **Filtered Scopes**: `repo`, `repo.content`
    - **Batch Size**: 100 messages
    - **Dataset**: `assafvayner/webhook-messages`

    ## Status
    """)

    with gr.Row():
        with gr.Column():
            status_text = gr.Textbox(
                label="Current Status",
                value="Waiting for webhooks...",
                interactive=False
            )
            message_count = gr.Number(
                label="Messages in Memory",
                value=0,
                interactive=False
            )
        with gr.Column():
            batch_count = gr.Number(
                label="Batches Saved",
                value=0,
                interactive=False
            )
            latest_batch = gr.Textbox(
                label="Latest Batch File",
                value="No batches saved yet",
                interactive=False
            )

    def get_status():
        with message_lock:
            batch_file = latest_batch_file if latest_batch_file else "No batches saved yet"
            return (
                "Active - Ready to receive webhooks",
                len(webhook_messages),
                batch_counter,
                batch_file
            )

    def get_recent_messages():
        with message_lock:
            if not webhook_messages:
                return "No messages in memory yet"

            # Show the first 10 messages (or fewer, if fewer are buffered)
            messages_to_show = webhook_messages[:10]

            # Format messages nicely
            output = []
            for i, msg in enumerate(messages_to_show, 1):
                output.append(f"### Message {i}")
                output.append(f"**Timestamp:** {msg['timestamp']}")
                output.append(f"**Event Type:** {msg['event_type']}")
                output.append(f"**Scope:** {msg['scope']}")
                output.append("**Payload:**")
                # Parse and pretty-print the stored JSON string
                try:
                    payload = json.loads(msg['payload'])
                    output.append(f"```json\n{json.dumps(payload, indent=2)}\n```")
                except json.JSONDecodeError:
                    output.append(f"```\n{msg['payload']}\n```")
                output.append("\n---\n")
            return "\n".join(output)

    refresh_btn = gr.Button("🔄 Refresh Status")
    refresh_btn.click(
        fn=get_status,
        outputs=[status_text, message_count, batch_count, latest_batch]
    )

    with gr.Accordion("Recent Messages (First 10)", open=False):
        recent_messages = gr.Markdown(
            value="Click 'Refresh Messages' to load recent messages"
        )
        refresh_messages_btn = gr.Button("🔄 Refresh Messages")
        refresh_messages_btn.click(
            fn=get_recent_messages,
            outputs=[recent_messages]
        )

    # Load initial status on page load
    demo.load(
        fn=get_status,
        outputs=[status_text, message_count, batch_count, latest_batch]
    )

# Mount Gradio on our FastAPI app
app = gr.mount_gradio_app(app, demo, path="/")

# Launch the app
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)
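
# Rough local smoke test (paths and sample values are illustrative; assumes the
# app is running on the default port above and the /health route registered here):
#   curl -X POST http://localhost:7860/webhooks/hub \
#        -H "Content-Type: application/json" \
#        -H "X-Event-Type: test" \
#        -d '{"event": {"action": "update", "scope": "repo.content"}}'
#   curl http://localhost:7860/health
# The first call should return "status": "success"; a payload whose event.scope
# is outside ALLOWED_SCOPES is answered with "status": "ignored".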