from __future__ import annotations

import json
import os
import re
import time
import unicodedata
from dataclasses import dataclass
from functools import lru_cache
from importlib.metadata import PackageNotFoundError, version
from typing import Any, Callable

# --- Application identity -------------------------------------------------
APP_TITLE = "ContextForge"
APP_SUBTITLE = "From fuzzy brief to build-ready agent blueprint."

# --- Default model identifiers (each can be overridden through env vars) ---
DEFAULT_MODEL_ID = "Qwen/Qwen2.5-0.5B-Instruct"
DEFAULT_MID_MODEL_ID = "RthItalia/nano_compact_3b_qkvfp16"
DEFAULT_HIGH_MODEL_ID = "Qwen/Qwen3-32B"
DEFAULT_OPENBMB_MODEL_ID = "openbmb/MiniCPM5-1B"
DEFAULT_OPENBMB_REASONING_MODEL_ID = "openbmb/MiniCPM4.1-8B"

# Section tags that every generated prompt must contain, written as "[TAG]".
REQUIRED_PROMPT_TAGS = [
    "ROLE",
    "COGNITIVE_LAYERS",
    "KAHNEMAN_SYSTEM2",
    "PARETO_80_20",
    "VITAL_SPOT",
    "REASONING_PROTOCOL",
    "AGENTIC_LOOP",
    "ACTION",
    "FORMAT_AND_TARGET",
    "QA_CHECKS",
]

# "Auto" lets the compiler pick one of the four concrete topologies.
TOPOLOGIES = ["Auto", "Single Prompt", "Cascade", "Context Pack", "Agent Workflow"]

REASONING_LAYERS = [
    "CRAFT",
    "Kahneman System 2",
    "Pareto 80/20",
    "Agentic Loop",
    "Tree of Thought controlled",
    "Private CoT",
    "Self-Correction",
    "Sentinel Recovery",
]

# Pipeline stages, in execution order.
STAGE_NAMES = [
    "intake_analysis",
    "topology_decision",
    "vital_structure",
    "reasoning_architecture",
    "prompt_pack_generation",
    "qa_repair",
    "final_assembly",
]

# Per-stage cap on newly generated tokens (further capped by MAX_NEW_TOKENS).
STAGE_TOKEN_BUDGETS = {
    "intake_analysis": 180,
    "topology_decision": 140,
    "vital_structure": 180,
    "reasoning_architecture": 240,
    "prompt_pack_generation": 520,
    "qa_repair": 260,
    "final_assembly": 260,
}


def parse_bool_env(name: str, default: bool = False) -> bool:
    """Read environment variable *name* as a boolean flag.

    Returns *default* when the variable is unset. Otherwise the value is
    true only if, after trimming and lower-casing, it is one of
    ``1``, ``true``, ``yes`` or ``on``; anything else is false.
    """
    raw = os.getenv(name)
    if raw is None:
        return default
    return raw.strip().lower() in {"1", "true", "yes", "on"}


def parse_int_env(name: str, default: int, minimum: int, maximum: int) -> int:
    """Read environment variable *name* as an integer clamped to a range.

    An unset or non-integer value falls back to *default*. The result
    (including the default) is always clamped to ``[minimum, maximum]``.
    """
    try:
        value = int(os.getenv(name, str(default)))
    except ValueError:
        value = default
    return max(minimum, min(maximum, value))


# --- Environment-derived runtime settings ---------------------------------
MODEL_ENABLED = parse_bool_env("CONTEXTFORGE_ENABLE_MODEL", False)
MODEL_ID = os.getenv("CONTEXTFORGE_MODEL_ID", DEFAULT_MODEL_ID)
MID_MODEL_ID = os.getenv("CONTEXTFORGE_MID_MODEL_ID", DEFAULT_MID_MODEL_ID)
HIGH_MODEL_ID = os.getenv("CONTEXTFORGE_HIGH_MODEL_ID", DEFAULT_HIGH_MODEL_ID) OPENBMB_ENABLED = parse_bool_env("CONTEXTFORGE_OPENBMB_ENABLE", False) OPENBMB_MODEL_ID = os.getenv("CONTEXTFORGE_OPENBMB_MODEL_ID", DEFAULT_OPENBMB_MODEL_ID) OPENBMB_REASONING_MODEL_ID = os.getenv( "CONTEXTFORGE_OPENBMB_REASONING_MODEL_ID", DEFAULT_OPENBMB_REASONING_MODEL_ID, ) MAX_NEW_TOKENS = parse_int_env("CONTEXTFORGE_MAX_NEW_TOKENS", 1800, 256, 4096) MAX_INPUT_CHARS = parse_int_env("CONTEXTFORGE_MAX_INPUT_CHARS", 12000, 2000, 40000) @dataclass(frozen=True) class RuntimeCandidate: role: str model_id: str source: str requires_cuda: bool = False prefer_cpu: bool = False min_transformers: str = "" min_cuda_gb: int = 0 disable_thinking: bool = False @dataclass class StageResult: data: dict[str, Any] source: str model_id: str elapsed_ms: int note: str = "" def runtime_row(self, stage: str) -> dict[str, Any]: return { "stage": stage, "source": self.source, "model_attempted": self.model_id, "fallback_reason": self.note, "duration_ms": self.elapsed_ms, } _RUNTIME_TRACE: list[dict[str, Any]] = [] def clean_text(value: Any, limit: int = 4000) -> str: text = "" if value is None else str(value) text = text.replace("\x00", " ") text = re.sub(r"[ \t]+", " ", text) text = re.sub(r"\n{3,}", "\n\n", text).strip() return text[:limit] def clean_list(value: Any, limit: int = 8) -> list[str]: if isinstance(value, str): candidates = re.split(r"[,;\n]+", value) elif isinstance(value, list): candidates = value else: candidates = [] result = [] for item in candidates: cleaned = clean_text(item, 240) if cleaned and cleaned not in result: result.append(cleaned) return result[:limit] def json_text(value: Any) -> str: return json.dumps(value, ensure_ascii=False, indent=2, sort_keys=True) def parse_json_object(raw: str) -> dict[str, Any] | None: decoder = json.JSONDecoder() for match in re.finditer(r"\{", raw or ""): try: parsed, _ = decoder.raw_decode(raw[match.start() :]) except json.JSONDecodeError: continue 
if isinstance(parsed, dict): return parsed return None def merge_known(fallback: dict[str, Any], candidate: dict[str, Any] | None) -> dict[str, Any]: if not candidate: return fallback merged = dict(fallback) for key, fallback_value in fallback.items(): candidate_value = candidate.get(key) if candidate_value is None: continue if isinstance(fallback_value, list): items = clean_list(candidate_value, max(3, len(fallback_value) + 3)) if items: merged[key] = items elif isinstance(fallback_value, dict) and isinstance(candidate_value, dict): merged[key] = {**fallback_value, **candidate_value} elif isinstance(fallback_value, int): try: merged[key] = int(candidate_value) except (TypeError, ValueError): pass else: cleaned = clean_text(candidate_value, 16000) if cleaned: merged[key] = cleaned return merged def model_candidates() -> list[RuntimeCandidate]: candidates = [ RuntimeCandidate("high", HIGH_MODEL_ID, "small_model", requires_cuda=True), RuntimeCandidate("mid", MID_MODEL_ID, "small_model", requires_cuda=True), RuntimeCandidate("public_cpu", MODEL_ID, "small_model"), ] seen: set[str] = set() return [ item for item in candidates if item.model_id.strip() and not (item.model_id in seen or seen.add(item.model_id)) ] def openbmb_candidates() -> list[RuntimeCandidate]: if not OPENBMB_ENABLED: return [] candidates = [ RuntimeCandidate( "openbmb_lightweight", OPENBMB_MODEL_ID, "openbmb_minicpm5", prefer_cpu=True, min_transformers="5.6", disable_thinking=True, ), RuntimeCandidate( "openbmb_reasoning", OPENBMB_REASONING_MODEL_ID, "openbmb_minicpm4_reasoning", requires_cuda=True, min_transformers="4.56", min_cuda_gb=20, ), ] seen: set[str] = set() return [ item for item in candidates if item.model_id.strip() and not (item.model_id in seen or seen.add(item.model_id)) ] def runtime_candidates() -> list[RuntimeCandidate]: candidates = openbmb_candidates() if MODEL_ENABLED: candidates.extend(model_candidates()) seen: set[str] = set() return [ item for item in candidates if 
item.model_id not in seen and not seen.add(item.model_id) ] def package_version_at_least(package: str, minimum: str) -> tuple[bool, str]: try: installed = version(package) except PackageNotFoundError: return False, f"{package} is not installed" try: from packaging.version import Version compatible = Version(installed) >= Version(minimum) except Exception: compatible = installed >= minimum return compatible, installed @lru_cache(maxsize=8) def load_candidate_model(candidate: RuntimeCandidate) -> tuple[Any | None, Any | None, str]: try: import torch from transformers import AutoModelForCausalLM, AutoTokenizer except Exception as exc: return None, None, f"dependencies unavailable: {type(exc).__name__}: {exc}" if candidate.min_transformers: compatible, installed = package_version_at_least("transformers", candidate.min_transformers) if not compatible: return ( None, None, f"requires transformers>={candidate.min_transformers}; installed={installed}", ) if candidate.requires_cuda and not torch.cuda.is_available(): return None, None, "CUDA unavailable" if candidate.min_cuda_gb and torch.cuda.is_available(): try: total_gb = torch.cuda.get_device_properties(0).total_memory / (1024**3) except Exception as exc: return None, None, f"could not inspect CUDA memory: {type(exc).__name__}: {exc}" if total_gb < candidate.min_cuda_gb: return None, None, f"requires at least {candidate.min_cuda_gb} GB CUDA memory; available={total_gb:.1f} GB" try: try: tokenizer = AutoTokenizer.from_pretrained( candidate.model_id, trust_remote_code=True, use_fast=True, ) except Exception: tokenizer = AutoTokenizer.from_pretrained( candidate.model_id, trust_remote_code=True, use_fast=False, ) if tokenizer.pad_token_id is None and tokenizer.eos_token_id is not None: tokenizer.pad_token = tokenizer.eos_token kwargs: dict[str, Any] = {"trust_remote_code": True, "low_cpu_mem_usage": True} use_cuda = torch.cuda.is_available() and not candidate.prefer_cpu if use_cuda: kwargs["device_map"] = "cuda" 
kwargs["torch_dtype"] = torch.float16 model = AutoModelForCausalLM.from_pretrained(candidate.model_id, **kwargs) model.eval() device = "cuda" if use_cuda else "cpu" return tokenizer, model, f"loaded {candidate.role} on {device}" except Exception as exc: return None, None, f"load failed: {type(exc).__name__}: {exc}" @lru_cache(maxsize=1) def load_model() -> tuple[Any | None, Any | None, str, str]: candidates = runtime_candidates() if not candidates: return None, None, "disabled", "model runtimes disabled" failures: list[str] = [] for candidate in candidates: tokenizer, model, note = load_candidate_model(candidate) if tokenizer is not None and model is not None: return tokenizer, model, candidate.model_id, note failures.append(f"{candidate.role} {candidate.model_id}: {note}") return None, None, "unavailable", " | ".join(failures) def extract_output_language(payload: Any) -> str: if isinstance(payload, dict): for key, value in payload.items(): if key == "output_language" and clean_text(value, 40): return clean_text(value, 40) for value in payload.values(): language = extract_output_language(value) if language: return language elif isinstance(payload, list): for value in payload: language = extract_output_language(value) if language: return language return "" def detect_generation_issue( raw_text: str, stage: str, output_language: str = "English", generated_token_ids: list[int] | None = None, special_token_ids: set[int] | None = None, raw_with_special_tokens: str = "", ) -> str | None: text = raw_text or "" stripped = text.strip() if not text: return "empty decoded continuation" if not stripped: return "whitespace-only continuation or immediate EOS after whitespace" token_ids = generated_token_ids or [] specials = special_token_ids or set() if token_ids and specials and len(token_ids) <= 2 and all(token_id in specials for token_id in token_ids): return "immediate EOS or special-token-only continuation" special_markers = re.findall( 
r"<\|[^|]{1,80}\|>|?(?:s|pad|eos|bos|unk)>", raw_with_special_tokens or text, flags=re.IGNORECASE, ) if len(special_markers) >= 4: most_common = max(special_markers.count(marker) for marker in set(special_markers)) if most_common >= 3: return "repeated special tokens" minimum_chars = { "intake_analysis": 40, "topology_decision": 35, "vital_structure": 45, "reasoning_architecture": 50, "prompt_pack_generation": 100, "qa_repair": 60, "final_assembly": 80, }.get(stage, 30) if len(stripped) < minimum_chars: return f"output too short for {stage} contract" if "\ufffd" in stripped or re.search(r"(.)\1{9,}", stripped, flags=re.DOTALL): return "suspicious replacement characters or repeated-character garbage" printable_ratio = sum(character.isprintable() for character in stripped) / max(1, len(stripped)) if printable_ratio < 0.92: return "suspicious non-printable output" words = re.findall(r"[^\W\d_]{2,}", stripped, flags=re.UNICODE) repeated_words = re.findall( r"\b([^\W\d_]{2,})(?:\s+\1){5,}\b", stripped, flags=re.IGNORECASE | re.UNICODE, ) if repeated_words: return "repeated-token gibberish" language = (output_language or "English").lower() if "english" in language or "italian" in language or "italiano" in language: alphabetic = [character for character in stripped if character.isalpha()] latin = [character for character in alphabetic if "LATIN" in unicodedata.name(character, "")] if alphabetic and len(latin) / len(alphabetic) < 0.65: return f"suspicious non-target-language garbage for {output_language or 'English'}" if len(words) < 3: return f"insufficient {output_language or 'English'} language content" return None def format_chat_prompt( tokenizer: Any, stage: str, instruction: str, payload: dict[str, Any], disable_thinking: bool = False, ) -> str: system = ( "You are one isolated module inside ContextForge, an agent prompt compiler. " "Return only a valid JSON object. Private reasoning internal only. 
" "Never reveal chain of thought, hidden branches, or internal deliberation. " "Public fields may contain only decision summaries, assumptions, risks, verification steps, and outputs." ) user = f"MODULE: {stage}\nTASK:\n{instruction}\nINPUT:\n{json_text(payload)}" try: if getattr(tokenizer, "chat_template", None): kwargs: dict[str, Any] = { "tokenize": False, "add_generation_prompt": True, } if disable_thinking: kwargs["enable_thinking"] = False return tokenizer.apply_chat_template( [{"role": "system", "content": system}, {"role": "user", "content": user}], **kwargs, ) except Exception: pass return f"{system}\n\n{user}\n\nJSON:" def generate_with_candidate( candidate: RuntimeCandidate, stage: str, instruction: str, payload: dict[str, Any], ) -> tuple[dict[str, Any] | None, str]: tokenizer, model, load_note = load_candidate_model(candidate) if tokenizer is None or model is None: return None, load_note try: import torch prompt = format_chat_prompt( tokenizer, stage, instruction, payload, disable_thinking=candidate.disable_thinking, ) inputs = tokenizer(prompt, return_tensors="pt", truncation=True, max_length=6144) device = getattr(model, "device", None) if device is not None and str(device) != "meta": inputs = {key: value.to(device) for key, value in inputs.items()} with torch.no_grad(): output_ids = model.generate( **inputs, max_new_tokens=min(MAX_NEW_TOKENS, STAGE_TOKEN_BUDGETS.get(stage, MAX_NEW_TOKENS)), do_sample=False, repetition_penalty=1.05, pad_token_id=tokenizer.eos_token_id, ) continuation_ids = output_ids[0][inputs["input_ids"].shape[-1] :] generated_token_ids = continuation_ids.detach().cpu().tolist() raw = tokenizer.decode(continuation_ids, skip_special_tokens=True) raw_with_special = tokenizer.decode(continuation_ids, skip_special_tokens=False) special_token_ids = set(getattr(tokenizer, "all_special_ids", []) or []) issue = detect_generation_issue( raw, stage, extract_output_language(payload) or "English", generated_token_ids, special_token_ids, 
raw_with_special, ) if issue: return None, f"{load_note}; rejected output: {issue}" parsed = parse_json_object(raw) if parsed is None: return None, f"{load_note}; invalid JSON output" return parsed, load_note except Exception as exc: return None, f"{load_note}; generation failed: {type(exc).__name__}: {exc}" def generate_json( stage: str, instruction: str, payload: dict[str, Any], ) -> tuple[dict[str, Any] | None, str, str, str]: candidates = runtime_candidates() if not candidates: reasons = [] if not OPENBMB_ENABLED: reasons.append("OpenBMB mode disabled by CONTEXTFORGE_OPENBMB_ENABLE") if not MODEL_ENABLED: reasons.append("existing model path disabled by CONTEXTFORGE_ENABLE_MODEL") return None, "none", "; ".join(reasons) or "no runtime candidates configured", "deterministic_fallback" attempted: list[str] = [] failures: list[str] = [] for candidate in candidates: attempted.append(candidate.model_id) try: parsed, note = generate_with_candidate(candidate, stage, instruction, payload) except Exception as exc: parsed = None note = f"candidate execution failed: {type(exc).__name__}: {exc}" if parsed is not None: return parsed, " -> ".join(attempted), "; ".join(failures), candidate.source failures.append(f"{candidate.role} {candidate.model_id}: {note}") return None, " -> ".join(attempted), "; ".join(failures), "deterministic_fallback" def run_stage( stage: str, instruction: str, payload: dict[str, Any], fallback_factory: Callable[[], dict[str, Any]], validator: Callable[[dict[str, Any]], dict[str, Any]] | None = None, ) -> dict[str, Any]: started = time.perf_counter() fallback = fallback_factory() candidate, selected_id, note, source = generate_json(stage, instruction, payload) if candidate is None: data = fallback source = "deterministic_fallback" else: data = merge_known(fallback, candidate) if validator: try: data = validator(data) except Exception as exc: data = fallback source = "deterministic_fallback" note = f"{note}; validation failed: {type(exc).__name__}: 
{exc}" elapsed_ms = round((time.perf_counter() - started) * 1000) result = StageResult(data=data, source=source, model_id=selected_id, elapsed_ms=elapsed_ms, note=note) _RUNTIME_TRACE.append(result.runtime_row(stage)) return result.data def infer_domain(payload: dict[str, Any]) -> str: haystack = " ".join(clean_text(v, 1000).lower() for v in payload.values() if isinstance(v, str)) domains = [ ("software engineering", ["api", "code", "software", "app", "backend", "frontend"]), ("agent systems", ["agent", "workflow", "tool", "autonomous", "mcp"]), ("data and analytics", ["data", "dataset", "analytics", "dashboard", "sql"]), ("creative production", ["story", "creative", "brand", "content", "design"]), ] for domain, signals in domains: if any(signal in haystack for signal in signals): return domain return "general knowledge work" def analyze_intake(input_payload: dict[str, Any]) -> dict[str, Any]: payload = {key: clean_text(value, MAX_INPUT_CHARS) if isinstance(value, str) else value for key, value in input_payload.items()} def fallback() -> dict[str, Any]: missing = [ label for key, label in [ ("project_idea", "project idea"), ("target_user", "target user"), ("build_target", "build target"), ("output_contract", "output contract"), ("verification_criteria", "verification criteria"), ] if not clean_text(payload.get(key), 200) ] complexity_signals = sum( bool(clean_text(payload.get(key), 300)) for key in ["user_context", "project_context", "technical_context", "constraints", "inputs_files", "failure_modes"] ) return { "domain": infer_domain(payload), "task_type": "design and implementation planning", "risk_level": clean_text(payload.get("risk_level"), 40) or "Medium", "input_type": "structured brief with free-text context", "output_type": clean_text(payload.get("build_target"), 200) or "executable prompt architecture", "missing_information": missing, "complexity": "high" if complexity_signals >= 5 else "medium" if complexity_signals >= 2 else "low", "decision_summary": 
"Normalize the brief into an explicit compiler input before selecting topology.", "assumptions": ["Unspecified details may be resolved conservatively during execution."], "risks": clean_list(payload.get("failure_modes"), 5) or ["Ambiguous output contract", "Insufficient verification criteria"], } instruction = ( "Classify domain, task type, risk level, input type, output type, missing information, complexity, " "decision summary, assumptions, and risks. Do not solve the task." ) return run_stage("intake_analysis", instruction, payload, fallback) def decide_topology(analysis: dict[str, Any], user_topology_choice: str) -> dict[str, Any]: choice = user_topology_choice if user_topology_choice in TOPOLOGIES else "Auto" def fallback() -> dict[str, Any]: risk = clean_text(analysis.get("risk_level"), 40).lower() complexity = clean_text(analysis.get("complexity"), 40).lower() domain = clean_text(analysis.get("domain"), 100).lower() if choice != "Auto": topology = choice reason = "Explicit user topology choice." elif "agent" in domain or risk == "critical": topology = "Agent Workflow" reason = "Agentic or critical-risk work benefits from explicit execution and recovery states." elif complexity == "high": topology = "Cascade" reason = "Multiple context areas and dependent outputs require sequential specialist prompts." elif analysis.get("missing_information"): topology = "Context Pack" reason = "A reusable context contract should stabilize unresolved inputs." else: topology = "Single Prompt" reason = "The task is bounded enough for one complete execution contract." 
roles_by_topology = { "Single Prompt": ["Lead Executor"], "Cascade": ["Brief Analyst", "Solution Architect", "Builder", "Verifier"], "Context Pack": ["Context Curator", "Execution Prompt Author"], "Agent Workflow": ["Planner", "Executor", "Verifier", "Recovery Sentinel"], } roles = roles_by_topology[topology] return { "topology": topology, "reason": reason, "number_of_prompts": len(roles), "roles": roles, "handoff_contract": "Each stage receives structured upstream output and returns a verifiable downstream artifact.", } instruction = ( "Choose Single Prompt, Cascade, Context Pack, or Agent Workflow. Use Cascade when multiple expertise areas " "are required, task A feeds task B, or more than six unrelated ACTION sections are required. Respect an " "explicit non-Auto user choice. Return topology, reason, number_of_prompts, roles, and handoff_contract." ) return run_stage("topology_decision", instruction, {"analysis": analysis, "user_choice": choice}, fallback) def extract_vital_structure(analysis: dict[str, Any], topology: dict[str, Any]) -> dict[str, Any]: def fallback() -> dict[str, Any]: vital_few = [ "A precise output contract", "A topology matched to dependency structure", "Verifiable acceptance criteria", "Explicit failure and recovery behavior", ] if analysis.get("missing_information"): vital_few.insert(0, "Resolution of critical missing context") return { "vital_few": vital_few[:5], "vital_spot": "The output contract: if it is ambiguous, every downstream prompt can appear complete while producing the wrong artifact.", "vital_spot_guard": "Restate the output contract before execution and fail QA when required fields or verification evidence are absent.", "decision_summary": f"Optimize the {topology.get('topology', 'selected')} architecture around a small set of quality drivers.", } instruction = ( "Extract three to five Vital Few elements that determine most output quality and one Vital Spot whose failure " "breaks the workflow. 
Include a concrete guard for the Vital Spot." ) return run_stage("vital_structure", instruction, {"analysis": analysis, "topology": topology}, fallback) def select_reasoning_architecture( analysis: dict[str, Any], topology: dict[str, Any], selected_layers: list[str], ) -> dict[str, Any]: selected = [layer for layer in selected_layers if layer in REASONING_LAYERS] def fallback() -> dict[str, Any]: layers = selected or ["CRAFT", "Pareto 80/20", "Private CoT", "Self-Correction", "Sentinel Recovery"] if topology.get("topology") in {"Cascade", "Agent Workflow"} and "Agentic Loop" not in layers: layers.append("Agentic Loop") if clean_text(analysis.get("risk_level"), 30).lower() in {"high", "critical"} and "Kahneman System 2" not in layers: layers.append("Kahneman System 2") configurations = { layer: { "purpose": { "CRAFT": "Bind context, role, action, format, and target.", "Kahneman System 2": "Slow down at consequential decisions and verify assumptions.", "Pareto 80/20": "Prioritize the few actions that drive most value.", "Agentic Loop": "Plan, act, observe, verify, and recover.", "Tree of Thought controlled": "Compare strategies without exposing hidden branches.", "Private CoT": "Keep reasoning internal and publish only summaries and evidence.", "Self-Correction": "Repair failed checks before final output.", "Sentinel Recovery": "Detect blocked or degraded states and continue safely.", }[layer], "public_output": "decision summary, assumptions, risks, verification steps, final answer", } for layer in layers } return { "selected_layers": layers, "configurations": configurations, "private_reasoning_policy": "Private reasoning internal only.", "tree_of_thought_policy": "Expose only: strategy | upside | risk | cost | selected.", } instruction = ( "Select and configure only useful reasoning layers. Private CoT must remain internal. Controlled Tree of " "Thought may expose only strategy, upside, risk, cost, selected. 
Return selected_layers, configurations, " "private_reasoning_policy, and tree_of_thought_policy." ) return run_stage( "reasoning_architecture", instruction, {"analysis": analysis, "topology": topology, "selected_layers": selected}, fallback, ) def prompt_block( title: str, role: str, action: str, analysis: dict[str, Any], topology: dict[str, Any], vital: dict[str, Any], reasoning_architecture: dict[str, Any], output_contract: str, verification_criteria: str, ) -> str: layers = ", ".join(reasoning_architecture.get("selected_layers", [])) vital_few = "\n".join(f"- {item}" for item in vital.get("vital_few", [])) return f"""# {title} [ROLE] You are {role}. Own the assigned artifact and its verification. Do not impersonate other stages. [COGNITIVE_LAYERS] Use: {layers}. Private reasoning internal only. Public output may include only decision summary, assumptions, risks, verification steps, and final answer. [KAHNEMAN_SYSTEM2] Pause before consequential decisions. Check assumptions, dependency order, risk, and evidence before committing. [PARETO_80_20] Prioritize these Vital Few: {vital_few} [VITAL_SPOT] {vital.get("vital_spot", "The output contract is the single failure point.")} Guard: {vital.get("vital_spot_guard", "Fail QA when the contract is incomplete.")} [REASONING_PROTOCOL] 1. Normalize the available context. 2. Identify assumptions and risks. 3. Compare options only when useful. If using controlled Tree of Thought, expose only: strategy | upside | risk | cost | selected. 4. Execute the selected strategy. 5. Verify against the output contract. Never reveal chain of thought or hidden branches. [AGENTIC_LOOP] PLAN -> ACT -> OBSERVE -> VERIFY -> REPAIR or COMPLETE. On blocked execution, invoke Sentinel Recovery: state the blocker, preserve valid work, choose the safest viable fallback, and continue. 
[ACTION] {action} [FORMAT_AND_TARGET] Target topology: {topology.get("topology", "Single Prompt")} Required output contract: {output_contract or "Return a complete, directly usable artifact with explicit assumptions and verification evidence."} [QA_CHECKS] - Required sections and fields are present. - Claims and assumptions are distinguishable. - Verification criteria are satisfied: {verification_criteria or "The output is complete, internally consistent, and directly executable."} - No full chain of thought or hidden Tree of Thought branches are exposed. - If a check fails, repair the artifact and rerun QA before returning it.""" def deterministic_prompt_pack( analysis: dict[str, Any], topology: dict[str, Any], vital: dict[str, Any], reasoning_architecture: dict[str, Any], context: dict[str, Any], ) -> dict[str, Any]: topology_name = topology.get("topology", "Single Prompt") roles = topology.get("roles", ["Lead Executor"]) project_idea = clean_text(context.get("project_idea"), 1800) or "Execute the supplied project brief." output_contract = clean_text(context.get("output_contract"), 1600) verification = clean_text(context.get("verification_criteria"), 1200) prompts = [] for index, role in enumerate(roles, start=1): if topology_name == "Single Prompt": action = f"Turn this brief into the required artifact:\n{project_idea}" elif topology_name == "Context Pack": action = ( "Create a reusable, source-aware context pack that separates facts, assumptions, constraints, open " "questions, and execution instructions." if index == 1 else "Use the approved context pack to produce the final execution prompt and verification contract." 
) elif topology_name == "Agent Workflow": agent_actions = { "Planner": "Convert the brief into ordered tasks, dependencies, stop conditions, and acceptance tests.", "Executor": "Execute the approved plan and return artifacts plus evidence.", "Verifier": "Test artifacts against acceptance criteria and identify repair actions.", "Recovery Sentinel": "Handle blockers, failed checks, and degraded model/tool states without losing valid work.", } action = agent_actions.get(role, f"Execute the {role} stage and return a structured handoff.") else: action = f"Execute stage {index} as {role}; consume the previous structured handoff and produce the next verifiable artifact." prompts.append( prompt_block( f"Prompt {index}: {role}", role, action, analysis, topology, vital, reasoning_architecture, output_contract, verification, ) ) execution_plan = [ f"Run {role}; validate its output contract; pass only verified artifacts downstream." for role in roles ] return { "topology": topology_name, "prompts": prompts, "execution_plan": execution_plan, "output_contract": output_contract or "Complete artifact, assumptions, risks, verification steps, final answer.", } def validate_prompt_pack(data: dict[str, Any]) -> dict[str, Any]: prompts = data.get("prompts") if not isinstance(prompts, list) or not prompts: raise ValueError("prompt pack is empty") cleaned_prompts = [clean_text(prompt, 30000) for prompt in prompts if clean_text(prompt, 30000)] if not cleaned_prompts: raise ValueError("prompt pack contains no usable prompts") for prompt in cleaned_prompts: missing = [tag for tag in REQUIRED_PROMPT_TAGS if f"[{tag}]" not in prompt] if missing: raise ValueError(f"prompt missing required tags: {', '.join(missing)}") data["prompts"] = cleaned_prompts return data def generate_prompt_pack( analysis: dict[str, Any], topology: dict[str, Any], vital: dict[str, Any], reasoning_architecture: dict[str, Any], context: dict[str, Any] | None = None, ) -> dict[str, Any]: context = context or {} def 
fallback() -> dict[str, Any]: return deterministic_prompt_pack(analysis, topology, vital, reasoning_architecture, context) instruction = ( "Generate the complete prompt pack for the selected topology. Every prompt must contain all required tags: " + ", ".join(REQUIRED_PROMPT_TAGS) + ". Never request or reveal full chain of thought. Use exactly 'Private reasoning internal only.' " "Controlled Tree of Thought exposes only strategy | upside | risk | cost | selected. Return topology, prompts, " "execution_plan, and output_contract." ) return run_stage( "prompt_pack_generation", instruction, { "analysis": analysis, "topology": topology, "vital": vital, "reasoning_architecture": reasoning_architecture, "context": context, }, fallback, validate_prompt_pack, ) def repair_prompt_text(prompt: str) -> tuple[str, list[str]]: repaired = clean_text(prompt, 30000) repairs: list[str] = [] forbidden = [ r"reveal (?:your|the) (?:full )?chain of thought", r"show (?:your|the) (?:full )?chain of thought", r"expose hidden branches", ] for pattern in forbidden: if re.search(pattern, repaired, flags=re.IGNORECASE): repaired = re.sub(pattern, "provide a concise decision summary", repaired, flags=re.IGNORECASE) repairs.append("Removed chain-of-thought leakage request.") for tag in REQUIRED_PROMPT_TAGS: if f"[{tag}]" not in repaired: repaired += f"\n\n[{tag}]\nComplete this section before execution." repairs.append(f"Added missing [{tag}] tag.") if "Private reasoning internal only." not in repaired: repaired = repaired.replace("[REASONING_PROTOCOL]", "[REASONING_PROTOCOL]\nPrivate reasoning internal only.", 1) repairs.append("Added private reasoning policy.") if "strategy | upside | risk | cost | selected" not in repaired: repaired += "\n\nControlled Tree of Thought public schema: strategy | upside | risk | cost | selected." 
repairs.append("Added controlled Tree of Thought public schema.") return repaired, repairs def deterministic_qa(prompt_pack: dict[str, Any]) -> dict[str, Any]: repaired_prompts = [] issues: list[str] = [] for index, prompt in enumerate(prompt_pack.get("prompts", []), start=1): repaired, repairs = repair_prompt_text(str(prompt)) repaired_prompts.append(repaired) issues.extend(f"Prompt {index}: {repair}" for repair in repairs) repaired_pack = dict(prompt_pack) repaired_pack["prompts"] = repaired_prompts missing_tags = [ tag for tag in REQUIRED_PROMPT_TAGS if any(f"[{tag}]" not in prompt for prompt in repaired_prompts) ] leakage = any( re.search(r"(reveal|show|expose).{0,24}chain of thought", line, flags=re.IGNORECASE) and not re.search(r"\b(never|do not|don't|must not|without)\b", line, flags=re.IGNORECASE) for prompt in repaired_prompts for line in prompt.splitlines() ) checks = { "all_required_tags": not missing_tags, "strong_roles": all("[ROLE]" in prompt and len(prompt.split("[ROLE]", 1)[-1].strip()) > 20 for prompt in repaired_prompts), "output_contracts": all("[FORMAT_AND_TARGET]" in prompt for prompt in repaired_prompts), "no_chain_of_thought_leakage": not leakage, "qa_present": all("[QA_CHECKS]" in prompt for prompt in repaired_prompts), "repair_logic_present": all("REPAIR" in prompt for prompt in repaired_prompts), "tree_of_thought_controlled": all("strategy | upside | risk | cost | selected" in prompt for prompt in repaired_prompts), } return { "pass": all(checks.values()), "issues": issues, "checks": checks, "repaired_prompt_pack": repaired_pack, } def validate_qa(data: dict[str, Any]) -> dict[str, Any]: deterministic = deterministic_qa(data.get("repaired_prompt_pack", {})) if not deterministic["pass"]: return deterministic data["pass"] = True data["checks"] = deterministic["checks"] data["repaired_prompt_pack"] = deterministic["repaired_prompt_pack"] return data def qa_repair_pass(prompt_pack: dict[str, Any]) -> dict[str, Any]: def fallback() -> dict[str, 
def score_metrics(
    analysis: dict[str, Any],
    topology: dict[str, Any],
    qa: dict[str, Any],
) -> dict[str, int]:
    """Derive the four headline scores shown in the metrics panel.

    Args:
        analysis: Intake analysis; the length of ``"missing_information"``
            lowers the coverage score by 10 per entry, floored at 45.
        topology: Topology decision; ``"Cascade"`` and ``"Agent Workflow"``
            score 94 for readiness, anything else 86.
        qa: QA result; ``"checks"`` maps check names to truthy/falsy values.

    Returns:
        Scores keyed by ``"Prompt Integrity"``, ``"Context Coverage"``,
        ``"Agent Readiness"`` and ``"Risk Control"``.
    """
    checks = qa.get("checks", {})
    passed = sum(1 for value in checks.values() if value)
    # max(1, ...) keeps an empty check table from dividing by zero.
    integrity = round(100 * passed / max(1, len(checks)))

    missing_count = len(analysis.get("missing_information", []))
    coverage = max(45, 100 - missing_count * 10)

    readiness = 94 if topology.get("topology") in {"Cascade", "Agent Workflow"} else 86

    risk_guarded = checks.get("no_chain_of_thought_leakage") and checks.get("repair_logic_present")
    risk = 96 if risk_guarded else 68

    return {
        "Prompt Integrity": integrity,
        "Context Coverage": coverage,
        "Agent Readiness": readiness,
        "Risk Control": risk,
    }


def deterministic_final(
    analysis: dict[str, Any],
    topology: dict[str, Any],
    vital: dict[str, Any],
    reasoning_architecture: dict[str, Any],
    qa: dict[str, Any],
) -> dict[str, Any]:
    """Assemble the final result from upstream artifacts without a model.

    Args:
        analysis: Intake analysis artifact.
        topology: Topology decision artifact.
        vital: Vital-structure artifact.
        reasoning_architecture: Selected reasoning architecture artifact.
        qa: QA result holding ``"checks"`` and ``"repaired_prompt_pack"``.

    Returns:
        A dict with ``"architecture_analysis"``, ``"prompt_pack"`` (the
        repaired prompts joined with a ``---`` separator),
        ``"execution_plan"``, ``"qa_checklist"``, ``"repair_protocol"`` and
        ``"metrics"``.
    """
    repaired_pack = qa.get("repaired_prompt_pack", {})
    compiled_prompt_pack = "\n\n---\n\n".join(repaired_pack.get("prompts", []))
    repair_protocol = [
        "Detect the failed check and preserve valid upstream artifacts.",
        "Identify the smallest repair that restores the output contract.",
        "Apply the repair, rerun QA, and continue only after verification passes.",
        "If a model stage fails, use that stage's deterministic fallback and record it in Runtime Details.",
    ]
    return {
        "architecture_analysis": {
            "intake": analysis,
            "topology": topology,
            "vital_structure": vital,
            "reasoning_architecture": reasoning_architecture,
        },
        "prompt_pack": compiled_prompt_pack,
        "execution_plan": repaired_pack.get("execution_plan", []),
        "qa_checklist": qa.get("checks", {}),
        "repair_protocol": repair_protocol,
        "metrics": score_metrics(analysis, topology, qa),
    }


def coerce_prompt_pack_text(value: Any) -> Any:
    """Flatten a structured prompt pack into the compiled text form.

    A mapping is reduced to its ``"prompts"`` entry and a list or tuple of
    prompts is joined with the same ``---`` separator the deterministic
    assembly uses. Any other value is returned unchanged.
    """
    if isinstance(value, dict):
        value = value.get("prompts")
    if isinstance(value, (list, tuple)):
        parts = [str(item) for item in value if item is not None]
        return "\n\n---\n\n".join(part for part in parts if part.strip())
    return value


def assemble_final_output(
    analysis: dict[str, Any],
    topology: dict[str, Any],
    vital: dict[str, Any],
    reasoning_architecture: dict[str, Any],
    qa: dict[str, Any],
) -> dict[str, Any]:
    """Run the ``final_assembly`` stage with a deterministic fallback.

    Args:
        analysis: Intake analysis artifact.
        topology: Topology decision artifact.
        vital: Vital-structure artifact.
        reasoning_architecture: Selected reasoning architecture artifact.
        qa: QA result holding the checks and the repaired prompt pack.

    Returns:
        Whatever ``run_stage`` returns for the stage, given the instruction,
        the upstream artifacts, the deterministic fallback and the validator
        defined here.
    """

    def fallback() -> dict[str, Any]:
        return deterministic_final(analysis, topology, vital, reasoning_architecture, qa)

    instruction = (
        "Assemble the final user-facing compiler result without adding hidden reasoning. Return architecture_analysis, "
        "prompt_pack, execution_plan, qa_checklist, repair_protocol, and metrics. The prompt_pack must preserve all "
        "required prompt tags exactly."
    )

    def validate_final(data: dict[str, Any]) -> dict[str, Any]:
        # A model may echo the pack as a list or mapping; stringifying that
        # directly would put a Python repr into the user-facing prompt pack.
        prompt_pack = clean_text(coerce_prompt_pack_text(data.get("prompt_pack")), 120000)
        if not prompt_pack:
            raise ValueError("final prompt pack is empty")
        missing = [tag for tag in REQUIRED_PROMPT_TAGS if f"[{tag}]" not in prompt_pack]
        if missing:
            raise ValueError(f"final assembly lost required tags: {', '.join(missing)}")
        data["prompt_pack"] = prompt_pack
        # Sections the model left out are taken from the deterministic result
        # so a valid prompt pack is not paired with empty panels.
        for key, value in fallback().items():
            if data.get(key) is None:
                data[key] = value
        return data

    return run_stage(
        "final_assembly",
        instruction,
        {
            "analysis": analysis,
            "topology": topology,
            "vital": vital,
            "reasoning_architecture": reasoning_architecture,
            "qa": qa,
        },
        fallback,
        validate_final,
    )
"risk_level": clean_text(risk_level, 100), "output_language": clean_text(output_language, 100), "user_context": clean_text(user_context, MAX_INPUT_CHARS), "project_context": clean_text(project_context, MAX_INPUT_CHARS), "technical_context": clean_text(technical_context, MAX_INPUT_CHARS), "constraints": clean_text(constraints, MAX_INPUT_CHARS), "inputs_files": clean_text(inputs_files, MAX_INPUT_CHARS), "output_contract": clean_text(output_contract, MAX_INPUT_CHARS), "failure_modes": clean_text(failure_modes, MAX_INPUT_CHARS), "verification_criteria": clean_text(verification_criteria, MAX_INPUT_CHARS), } analysis = analyze_intake(payload) topology = decide_topology(analysis, topology_choice) vital = extract_vital_structure(analysis, topology) reasoning = select_reasoning_architecture(analysis, topology, selected_layers or []) pack = generate_prompt_pack(analysis, topology, vital, reasoning, payload) qa = qa_repair_pass(pack) final = assemble_final_output(analysis, topology, vital, reasoning, qa) metrics_html = render_metrics(final.get("metrics", {})) architecture_md = "```json\n" + json_text(final.get("architecture_analysis", {})) + "\n```" prompt_pack_text = clean_text(final.get("prompt_pack"), 120000) execution_md = render_list(final.get("execution_plan", [])) qa_md = render_qa(final.get("qa_checklist", {}), final.get("repair_protocol", [])) runtime_md = render_runtime(_RUNTIME_TRACE) return metrics_html, architecture_md, prompt_pack_text, execution_md, qa_md, runtime_md def render_metrics(metrics: dict[str, Any]) -> str: cards = [] for label in ["Prompt Integrity", "Context Coverage", "Agent Readiness", "Risk Control"]: if label not in metrics: cards.append( f'
{APP_SUBTITLE}
ContextForge turns messy software, app, and agent ideas into executable prompt architectures.
Paste a rough app, agent or workflow idea. ContextForge compiles it into a staged prompt pack for Codex or another coding agent.
' ) project_idea = gr.Textbox( label="Project idea", lines=4, placeholder="Example: I want to build a Gradio app that helps students prepare oral exams from a syllabus.", ) with gr.Row(): target_user = gr.Textbox(label="Target user") build_target = gr.Textbox(label="Build target") with gr.Row(): topology_choice = gr.Dropdown(TOPOLOGIES, value="Auto", label="Topology") risk_level = gr.Dropdown(["Low", "Medium", "High", "Critical"], value="Medium", label="Risk level") output_language = gr.Textbox(value="English", label="Output language") selected_layers = gr.CheckboxGroup(REASONING_LAYERS, value=["CRAFT", "Pareto 80/20", "Private CoT", "Self-Correction", "Sentinel Recovery"], label="Cognitive modules") with gr.Accordion("Context inputs", open=False, visible=False) as context_inputs_accordion: user_context = gr.Textbox(label="User context", lines=3) project_context = gr.Textbox(label="Project context", lines=3) technical_context = gr.Textbox(label="Technical context", lines=3) constraints = gr.Textbox(label="Constraints", lines=3) inputs_files = gr.Textbox(label="Inputs / files", lines=3) with gr.Accordion("Contracts and controls", open=False, visible=False) as contracts_accordion: output_contract = gr.Textbox(label="Output contract", lines=3) failure_modes = gr.Textbox(label="Failure modes", lines=3) verification_criteria = gr.Textbox(label="Verification criteria", lines=3) with gr.Row(): compile_button = gr.Button("Compile Prompt Architecture", variant="primary") example_button = gr.Button("Load Example", variant="secondary") with gr.Column(scale=1, elem_classes=["output-panel"]): metrics = gr.HTML(value=render_metrics({})) gr.HTML('