| |
| """ |
| Miao Exchange Protocol (MEP) Miner |
| A sleeping node that earns SECONDS by processing tasks. |
| """ |
| import asyncio |
| import json |
| import websockets |
| import requests |
| import uuid |
| import sys |
| import os |
| import time |
| import urllib.parse |
|
|
| |
| sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
| from identity import MEPIdentity |
|
|
| HUB_URL = "http://localhost:8000" |
| WS_URL = "ws://localhost:8000" |
|
|
| class MEPProvider: |
| def __init__(self, key_path: str): |
| self.identity = MEPIdentity(key_path) |
| self.node_id = self.identity.node_id |
| self.balance = 0.0 |
| self.is_mining = True |
| |
| async def connect(self): |
| """Connect to MEP Hub and start mining.""" |
| print(f"[MEP Provider {self.node_id}] Starting...") |
| |
| |
| try: |
| resp = requests.post(f"{HUB_URL}/register", json={"pubkey": self.identity.pub_pem}) |
| data = resp.json() |
| self.balance = data.get("balance", 0.0) |
| print(f"[MEP Provider {self.node_id}] Registered. Balance: {self.balance:.6f} SECONDS") |
| except Exception as e: |
| print(f"[MEP Provider {self.node_id}] Registration failed: {e}") |
| return |
| |
| |
| ts = str(int(time.time())) |
| sig = self.identity.sign(self.node_id, ts) |
| sig_safe = urllib.parse.quote(sig) |
| uri = f"{WS_URL}/ws/{self.node_id}?timestamp={ts}&signature={sig_safe}" |
| try: |
| async with websockets.connect(uri) as ws: |
| print(f"[MEP Provider {self.node_id}] Connected to MEP Hub") |
| |
| |
| while self.is_mining: |
| try: |
| msg = await asyncio.wait_for(ws.recv(), timeout=1.0) |
| data = json.loads(msg) |
| |
| if data["event"] == "new_task": |
| await self.process_task(data["data"]) |
| elif data["event"] == "rfc": |
| await self.handle_rfc(data["data"]) |
| |
| except asyncio.TimeoutError: |
| continue |
| except websockets.exceptions.ConnectionClosed: |
| print(f"[MEP Provider {self.node_id}] Connection closed") |
| break |
| |
| except Exception as e: |
| print(f"[MEP Provider {self.node_id}] WebSocket error: {e}") |
| |
| async def handle_rfc(self, rfc_data: dict): |
| """Phase 2: Evaluate Request For Compute and submit Bid.""" |
| task_id = rfc_data["id"] |
| bounty = rfc_data["bounty"] |
| |
| max_purchase_price = 0.0 |
| if bounty < max_purchase_price: |
| print(f"[MEP Provider {self.node_id}] Ignored RFC {task_id[:8]} (Bounty {bounty} exceeds max purchase price)") |
| return |
| |
| print(f"[MEP Provider {self.node_id}] Received RFC {task_id[:8]} for {bounty:.6f} SECONDS. Placing bid...") |
| |
| |
| try: |
| payload_str = json.dumps({ |
| "task_id": task_id, |
| "provider_id": self.node_id |
| }) |
| headers = self.identity.get_auth_headers(payload_str) |
| headers["Content-Type"] = "application/json" |
| resp = requests.post(f"{HUB_URL}/tasks/bid", data=payload_str, headers=headers) |
| |
| if resp.status_code == 200: |
| data = resp.json() |
| if data["status"] == "accepted": |
| print(f"[MEP Provider {self.node_id}] 🏁 BID WON for task {task_id[:8]}! Processing payload...") |
| |
| |
| task_data = { |
| "id": task_id, |
| "payload": data["payload"], |
| "bounty": bounty, |
| "consumer_id": data["consumer_id"] |
| } |
| await self.process_task(task_data) |
| else: |
| print(f"[MEP Provider {self.node_id}] Bid rejected (too slow): {data.get('detail', '')}") |
| except Exception as e: |
| print(f"[MEP Provider {self.node_id}] Error placing bid: {e}") |
|
|
| async def process_task(self, task_data: dict): |
| """Process a task and earn SECONDS.""" |
| task_id = task_data["id"] |
| payload = task_data["payload"] |
| bounty = task_data["bounty"] |
| print(f"[MEP Provider {self.node_id}] Received task {task_id[:8]} for {bounty:.6f} SECONDS") |
| print(f" Payload: {payload[:50]}...") |
| |
| |
| await asyncio.sleep(0.5) |
| |
| |
| result = f"""I've processed your request: "{payload[:30]}..." |
| |
| As a MEP miner, I analyzed this task and generated the following response: |
| |
| The core concept here aligns with the Miao Exchange Protocol philosophy - creating efficient yet human-centric compute exchange. The SECONDS-based economy allows for precise valuation of AI compute time while maintaining the essential "Miao" moments of unpredictability. |
| |
| Key insights: |
| 1. Time is the fundamental currency of computation |
| 2. 6 decimal precision allows micro-transactions |
| 3. The protocol enables global compute liquidity |
| |
| Would you like me to elaborate on any specific aspect?""" |
| |
| |
| try: |
| payload_str = json.dumps({ |
| "task_id": task_id, |
| "provider_id": self.node_id, |
| "result_payload": result |
| }) |
| headers = self.identity.get_auth_headers(payload_str) |
| headers["Content-Type"] = "application/json" |
| resp = requests.post(f"{HUB_URL}/tasks/complete", data=payload_str, headers=headers) |
| |
| if resp.status_code == 200: |
| data = resp.json() |
| self.balance = data["new_balance"] |
| print(f"[MEP Provider {self.node_id}] Earned {bounty:.6f} SECONDS!") |
| print(f" New balance: {self.balance:.6f} SECONDS") |
| else: |
| print(f"[MEP Provider {self.node_id}] Failed to submit: {resp.text}") |
| |
| except Exception as e: |
| print(f"[MEP Provider {self.node_id}] Submission error: {e}") |
| |
| def stop(self): |
| """Stop mining.""" |
| self.is_mining = False |
| print(f"[MEP Provider {self.node_id}] Stopping...") |
|
|
| async def main(): |
| key_path = f"mep_provider_{uuid.uuid4().hex[:8]}.pem" |
| miner = MEPProvider(key_path) |
| |
| try: |
| await miner.connect() |
| except KeyboardInterrupt: |
| miner.stop() |
| print("\n[MEP] Contribution stopped by user") |
|
|
| if __name__ == "__main__": |
| print("=" * 60) |
| print("Miao Exchange Protocol (MEP) Miner") |
| print("Earn SECONDS by contributing idle compute") |
| print("=" * 60) |
| asyncio.run(main()) |
|
|