import gradio as gr import hashlib import json import time import io import zipfile from dataclasses import dataclass, asdict from typing import List, Dict, Any, Tuple # ============================================================ # CodexByte ΩΞ Runtime — Single-File, HF-Safe, Proof-First # ============================================================ # ---------------------------- # ISA (stable core) # ---------------------------- OPCODES = { "HALT": 0x00, "LOAD_IMM": 0x01, # r, v "LOAD_MEM": 0x02, # r, a "STORE": 0x03, # r, a "ADD": 0x04, # r1, r2 "SUB": 0x05, # r1, r2 "MUL": 0x06, # r1, r2 "DIV": 0x07, # r1, r2 "CMP": 0x08, # r1, r2 -> sets Z "JMP": 0x09, # a "JZ": 0x0A, # a "JNZ": 0x0B, # a "HASH": 0x0C, # r -> r becomes 64-bit int derived from sha256 "TIME": 0x10, # r -> monotonic ms (deterministic-ish per run; used for trace only) "COMMIT": 0x0F, # -> append chained hash to Ω-ledger "EMIT": 0x11, # r -> LAST_EMIT in mem } OPNAMES = {v: k for k, v in OPCODES.items()} # ---------------------------- # Utilities # ---------------------------- def sha256_hex(b: bytes) -> str: return hashlib.sha256(b).hexdigest() def stable_json(obj: Any) -> bytes: return json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False).encode("utf-8") def clamp_int64(x: int) -> int: # Keep values bounded (prevents unbounded growth) return int(max(min(x, (1 << 63) - 1), -(1 << 63))) def parse_int(token: str) -> int: # Accept decimal or 0x.. forms return int(token, 0) # ---------------------------- # Compiler: Text -> Bytecode # ---------------------------- def compile_codexbyte(source: str) -> List[int]: """ Assembles a simple assembly-like form into a flat integer bytecode list. Each instruction is encoded as: [opcode, operand1, operand2, ...] with fixed arity per op. """ lines = source.splitlines() bytecode: List[int] = [] for raw in lines: line = raw.strip() if not line or line.startswith(";") or line.startswith("#"): continue # strip inline comments if ";" in line: line = line.split(";", 1)[0].strip() if "#" in line: line = line.split("#", 1)[0].strip() if not line: continue parts = line.split() op = parts[0].upper() if op not in OPCODES: raise ValueError(f"Unknown opcode: {op}") bytecode.append(OPCODES[op]) # encode operands for tok in parts[1:]: bytecode.append(parse_int(tok)) return bytecode # ---------------------------- # Trace record # ---------------------------- @dataclass class TraceStep: step: int pc_before: int op: str operands: List[int] reg_before: List[int] reg_after: List[int] flags_before: Dict[str, bool] flags_after: Dict[str, bool] mem_writes: List[Tuple[str, int]] # (addr, value) ledger_added: str | None # ---------------------------- # VM: Bytecode -> State/Proof # ---------------------------- class CodexByteVM: def __init__(self): self.reset() def reset(self): self.reg = [0] * 8 self.mem: Dict[Any, int] = {} self.flags = {"Z": False} self.pc = 0 self.omega_ledger: List[str] = [] self._ledger_head = "0" * 64 # chained hash head self.last_trace: List[TraceStep] = [] def _state_digest(self) -> str: # Deterministic digest of current state (regs + mem + flags + pc) obj = { "pc": self.pc, "reg": self.reg, "mem": self.mem, "flags": self.flags, "ledger_head": self._ledger_head, } return sha256_hex(stable_json(obj)) def _commit(self) -> str: # Chain: head <- sha256(head || state_digest) sd = self._state_digest() new_head = sha256_hex((self._ledger_head + sd).encode("utf-8")) self._ledger_head = new_head self.omega_ledger.append(new_head) return new_head def run(self, prog: List[int], step_limit: int = 20000, trace: bool = True) -> Dict[str, Any]: self.last_trace = [] steps = 0 while True: if steps >= step_limit: raise RuntimeError(f"Step limit exceeded ({step_limit}). Possible infinite loop.") if self.pc < 0 or self.pc >= len(prog): raise RuntimeError(f"PC out of bounds: pc={self.pc}, program_len={len(prog)}") pc_before = self.pc op = prog[self.pc] self.pc += 1 opname = OPNAMES.get(op, f"OP_{op:02X}") reg_before = self.reg.copy() flags_before = dict(self.flags) mem_writes: List[Tuple[str, int]] = [] ledger_added = None def need(n: int): if self.pc + n > len(prog): raise RuntimeError(f"Truncated operands for {opname} at pc={pc_before}") def read1() -> int: nonlocal prog v = prog[self.pc] self.pc += 1 return v def read2() -> Tuple[int, int]: need(2) a = read1() b = read1() return a, b # ---- Execute ---- if op == 0x00: # HALT pass elif op == 0x01: # LOAD_IMM r v r, v = read2() self._check_reg(r) self.reg[r] = clamp_int64(v) elif op == 0x02: # LOAD_MEM r a r, a = read2() self._check_reg(r) self.reg[r] = clamp_int64(self.mem.get(self._addr(a), 0)) elif op == 0x03: # STORE r a r, a = read2() self._check_reg(r) addr = self._addr(a) self.mem[addr] = clamp_int64(self.reg[r]) mem_writes.append((str(addr), self.mem[addr])) elif op == 0x04: # ADD r1 r2 r1, r2 = read2() self._check_reg(r1); self._check_reg(r2) self.reg[r1] = clamp_int64(self.reg[r1] + self.reg[r2]) elif op == 0x05: # SUB r1 r2 r1, r2 = read2() self._check_reg(r1); self._check_reg(r2) self.reg[r1] = clamp_int64(self.reg[r1] - self.reg[r2]) elif op == 0x06: # MUL r1 r2 r1, r2 = read2() self._check_reg(r1); self._check_reg(r2) self.reg[r1] = clamp_int64(self.reg[r1] * self.reg[r2]) elif op == 0x07: # DIV r1 r2 r1, r2 = read2() self._check_reg(r1); self._check_reg(r2) if self.reg[r2] == 0: raise ZeroDivisionError("DIV by zero") self.reg[r1] = clamp_int64(int(self.reg[r1] / self.reg[r2])) elif op == 0x08: # CMP r1 r2 r1, r2 = read2() self._check_reg(r1); self._check_reg(r2) self.flags["Z"] = (self.reg[r1] == self.reg[r2]) elif op == 0x09: # JMP a a = read1() self.pc = self._pc(a, len(prog)) elif op == 0x0A: # JZ a a = read1() if self.flags["Z"]: self.pc = self._pc(a, len(prog)) elif op == 0x0B: # JNZ a a = read1() if not self.flags["Z"]: self.pc = self._pc(a, len(prog)) elif op == 0x0C: # HASH r (store as 64-bit int) r = read1() self._check_reg(r) h = sha256_hex(str(self.reg[r]).encode("utf-8")) self.reg[r] = int(h[:16], 16) # 64-bit-ish elif op == 0x10: # TIME r r = read1() self._check_reg(r) # Note: time is not strictly deterministic across runs; use only for observability. self.reg[r] = int(time.time() * 1000) elif op == 0x0F: # COMMIT ledger_added = self._commit() elif op == 0x11: # EMIT r r = read1() self._check_reg(r) self.mem["LAST_EMIT"] = clamp_int64(self.reg[r]) mem_writes.append(("LAST_EMIT", self.mem["LAST_EMIT"])) else: raise RuntimeError(f"Unsupported opcode: 0x{op:02X} at pc={pc_before}") reg_after = self.reg.copy() flags_after = dict(self.flags) # Trace if trace: # operands captured approximately: from prog slice # best-effort decode: since pc moved, reconstruct from pc_before+1 to current pc op_slice = prog[pc_before+1:self.pc] self.last_trace.append(TraceStep( step=steps, pc_before=pc_before, op=opname, operands=op_slice, reg_before=reg_before, reg_after=reg_after, flags_before=flags_before, flags_after=flags_after, mem_writes=mem_writes, ledger_added=ledger_added )) steps += 1 if op == 0x00: # HALT break return self.snapshot() def snapshot(self) -> Dict[str, Any]: return { "registers": self.reg, "memory": self.mem, "flags": self.flags, "omega_ledger": self.omega_ledger, "ledger_head": self._ledger_head, "state_digest": self._state_digest(), } def _check_reg(self, r: int): if not (0 <= r < 8): raise ValueError(f"Invalid register index r={r}, expected 0..7") def _addr(self, a: int) -> str: # normalize address into stable string return hex(a) if isinstance(a, int) else str(a) def _pc(self, a: int, n: int) -> int: if not (0 <= a < n): raise ValueError(f"Invalid jump target {a}, program_len={n}") return a # ---------------------------- # Proof bundle (ZIP export) # ---------------------------- def build_proof_bundle( source: str, bytecode: List[int], result: Dict[str, Any], trace_steps: List[TraceStep], ) -> bytes: trace_json = [asdict(s) for s in trace_steps] bundle = { "meta": { "system": "CodexByte_Runtime", "proof_format": "OmegaTraceBundle.v1", }, "source": source, "bytecode": bytecode, "result": result, "trace": trace_json, } verifier_py = r''' import json, hashlib def sha256_hex(b: bytes) -> str: return hashlib.sha256(b).hexdigest() def stable_json(obj): return json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False).encode("utf-8") def replay(bundle_path: str): with open(bundle_path, "rb") as f: z = f.read() # This verifier expects you to open the ZIP and inspect bundle.json. print("Open the ZIP, extract bundle.json, and use your runtime to replay bytecode.") print("Bundle bytes sha256:", sha256_hex(z)) if __name__ == "__main__": replay("CodexByte_ProofBundle.zip") '''.lstrip() mem = io.BytesIO() with zipfile.ZipFile(mem, "w", compression=zipfile.ZIP_DEFLATED) as z: z.writestr("bundle.json", json.dumps(bundle, indent=2, ensure_ascii=False)) z.writestr("verifier.py", verifier_py) z.writestr("README.txt", "CodexByte Proof Bundle\n" "- bundle.json contains source, bytecode, trace, result\n" "- verifier.py provides a minimal integrity hook\n" "Replay verification: run the same bytecode in the runtime; compare ledger_head/state_digest.\n") return mem.getvalue() # ---------------------------- # Samples # ---------------------------- SAMPLES: Dict[str, str] = { "Contract: obligation satisfied (commit+emit)": """\ ; If mem[0x20] == 1000 -> satisfied (emit 0), else breach (emit 1) LOAD_IMM 0 1000 LOAD_MEM 1 0x20 CMP 1 0 JZ 18 ; breach LOAD_IMM 2 1 STORE 2 0x30 COMMIT EMIT 2 HALT ; satisfied LOAD_IMM 2 0 STORE 2 0x30 COMMIT EMIT 2 HALT """, "Ledger integrity demo (multi-commit)": """\ LOAD_IMM 0 7 COMMIT ADD 0 0 COMMIT HASH 0 COMMIT EMIT 0 HALT """, "Loop demo (safe with step limit)": """\ LOAD_IMM 0 0 LOAD_IMM 1 1 ADD 0 1 JMP 4 HALT """, } # ============================================================ # Gradio App # ============================================================ vm = CodexByteVM() def load_sample(name: str) -> str: return SAMPLES.get(name, "") def run_program(source: str, step_limit: int, enable_trace: bool, preload_mem_0x20: int): vm.reset() # preload for common contract patterns vm.mem[hex(0x20)] = int(preload_mem_0x20) bytecode = compile_codexbyte(source) result = vm.run(bytecode, step_limit=step_limit, trace=enable_trace) trace = [asdict(s) for s in vm.last_trace] if enable_trace else [] proof_zip = build_proof_bundle(source, bytecode, result, vm.last_trace if enable_trace else []) # gr.File expects a path OR a tuple (name, bytes) in newer versions. # Use (filename, bytes) which HF Gradio accepts. return ( { "status": "OK", "result": result, "bytecode_len": len(bytecode), "preloaded_memory": {"0x20": preload_mem_0x20}, }, trace, ("CodexByte_ProofBundle.zip", proof_zip), ) def replay_verify(source: str, step_limit: int, preload_mem_0x20: int): """ Replay check: run twice, compare final ledger_head & state_digest. """ # run 1 vm1 = CodexByteVM() vm1.mem[hex(0x20)] = int(preload_mem_0x20) bc = compile_codexbyte(source) r1 = vm1.run(bc, step_limit=step_limit, trace=False) # run 2 vm2 = CodexByteVM() vm2.mem[hex(0x20)] = int(preload_mem_0x20) r2 = vm2.run(bc, step_limit=step_limit, trace=False) ok = (r1["ledger_head"] == r2["ledger_head"]) and (r1["state_digest"] == r2["state_digest"]) return { "replay_ok": ok, "run1": {"ledger_head": r1["ledger_head"], "state_digest": r1["state_digest"]}, "run2": {"ledger_head": r2["ledger_head"], "state_digest": r2["state_digest"]}, "note": "TIME opcode makes replay non-deterministic. Avoid TIME in proofs." } with gr.Blocks(title="CodexByte ΩΞ Runtime") as demo: gr.Markdown( "# CodexByte ΩΞ Runtime\n" "**Programs = contracts • Execution = enforcement • Trace = Ω-proof**\n\n" "This runtime executes CodexByte deterministically and emits an intrinsic proof bundle." ) with gr.Row(): sample = gr.Dropdown(list(SAMPLES.keys()), value="Contract: obligation satisfied (commit+emit)", label="Sample Programs") load_btn = gr.Button("Load Sample") program = gr.Textbox(lines=18, label="CodexByte Program") load_btn.click(load_sample, inputs=sample, outputs=program) with gr.Row(): preload = gr.Number(value=1000, label="Preload mem[0x20] value (common contract input)") step_limit = gr.Slider(100, 50000, value=20000, step=100, label="Step limit (safety)") trace_on = gr.Checkbox(value=True, label="Enable trace (Ω-step log)") run_btn = gr.Button("Execute") with gr.Tabs(): with gr.Tab("Ω-State Result"): result_out = gr.JSON() proof_file = gr.File(label="Download Proof Bundle (ZIP)") with gr.Tab("Execution Trace"): trace_out = gr.JSON() with gr.Tab("Replay Verification"): verify_btn = gr.Button("Replay verify (run twice)") verify_out = gr.JSON() run_btn.click( fn=run_program, inputs=[program, step_limit, trace_on, preload], outputs=[result_out, trace_out, proof_file], ) verify_btn.click( fn=replay_verify, inputs=[program, step_limit, preload], outputs=verify_out ) if __name__ == "__main__": demo.launch()