File size: 4,002 Bytes
"""AskUserQuestionTool - Ask user questions interactively for Stack 2.9"""
import json
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
from .base import BaseTool, ToolResult
from .registry import tool_registry
# JSON file, under the current user's home directory, that holds every
# question entry read and written by the helpers and tools in this module.
QUESTIONS_FILE = Path.home().joinpath(".stack-2.9", "questions.json")
def _load_questions() -> Dict[str, Any]:
    """Load the question store from ``QUESTIONS_FILE``.

    The parent directory is created if it does not exist yet. A missing or
    empty file yields a fresh store, and a store that lacks the
    ``"questions"`` key gets an empty list, so callers can always index
    ``data["questions"]``.

    Returns:
        The decoded JSON object, guaranteed to contain a ``"questions"`` key.

    Raises:
        json.JSONDecodeError: If the file holds non-empty text that is not
            valid JSON. The file is left untouched in that case.
        ValueError: If the top-level JSON value is not an object.
    """
    QUESTIONS_FILE.parent.mkdir(parents=True, exist_ok=True)

    # Read directly and handle the missing-file case, instead of checking
    # exists() first: the file could disappear between the check and the read.
    try:
        raw = QUESTIONS_FILE.read_text(encoding="utf-8")
    except FileNotFoundError:
        return {"questions": []}

    # An empty file is treated as an empty store rather than a JSON error.
    if not raw.strip():
        return {"questions": []}

    data = json.loads(raw)
    if not isinstance(data, dict):
        # Fail here with a clear message instead of an AttributeError or
        # TypeError later in whichever caller touches the data first.
        raise ValueError(f"{QUESTIONS_FILE} must contain a JSON object")

    data.setdefault("questions", [])
    return data
def _save_questions(data: Dict[str, Any]) -> None:
    """Write the question store to ``QUESTIONS_FILE`` as indented JSON.

    Args:
        data: JSON-serialisable store to persist.

    Raises:
        TypeError: If ``data`` contains values ``json.dumps`` cannot encode.
        OSError: If the directory or file cannot be written.
    """
    # Saving must not depend on an earlier load having created the directory.
    QUESTIONS_FILE.parent.mkdir(parents=True, exist_ok=True)

    payload = json.dumps(data, indent=2)

    # Write to a sibling temp file and rename it over the target, so an
    # interrupted write cannot leave a truncated questions file behind.
    tmp_path = QUESTIONS_FILE.with_name(QUESTIONS_FILE.name + ".tmp")
    tmp_path.write_text(payload, encoding="utf-8")
    tmp_path.replace(QUESTIONS_FILE)
class AskQuestionTool(BaseTool):
    """Record a question for the user in the questions file.

    ``execute`` does not block waiting for a reply: it appends the question
    with status ``"pending"``, saves the store and returns immediately.
    ``timeout`` is stored with the entry; this class does not act on it.
    """

    name = "ask_question"
    description = "Ask user a question with optional choices"
    input_schema = {
        "type": "object",
        "properties": {
            "question": {"type": "string", "description": "Question to ask"},
            "options": {"type": "array", "items": {"type": "string"}, "description": "Optional choices"},
            "timeout": {"type": "number", "default": 300, "description": "Timeout in seconds"}
        },
        "required": ["question"]
    }

    async def execute(self, question: str, options: Optional[List[str]] = None, timeout: int = 300) -> ToolResult:
        """Store a new pending question and return its identifier.

        Args:
            question: Text of the question.
            options: Optional list of choices, stored as given.
            timeout: Timeout in seconds, stored with the entry.

        Returns:
            A successful ``ToolResult`` whose data holds ``question_id``,
            ``question``, ``options``, ``status`` and a ``note``.
        """
        data = _load_questions()
        # The sibling tools read with .get("questions", []); tolerate a store
        # without the key here as well instead of raising KeyError.
        questions = data.setdefault("questions", [])

        # IDs are only 8 characters long and answers are matched by ID, so
        # draw again on the rare clash with an entry that already exists.
        q_id = str(uuid.uuid4())[:8]
        while any(entry.get("id") == q_id for entry in questions):
            q_id = str(uuid.uuid4())[:8]

        q_entry = {
            "id": q_id,
            "question": question,
            "options": options,
            "status": "pending",
            "created_at": datetime.now().isoformat(),
            "timeout": timeout
        }
        questions.append(q_entry)
        _save_questions(data)

        return ToolResult(success=True, data={
            "question_id": q_id,
            "question": question,
            "options": options,
            "status": "pending",
            "note": f"Question {q_id} is pending. Await user response."
        })
class GetPendingQuestionsTool(BaseTool):
    """Report every stored question whose status is ``"pending"``."""

    name = "get_pending_questions"
    description = "List all pending questions"
    input_schema = {"type": "object", "properties": {}, "required": []}

    async def execute(self) -> ToolResult:
        """Return the pending question entries together with their count."""
        store = _load_questions()

        still_open = []
        for entry in store.get("questions", []):
            if entry.get("status") == "pending":
                still_open.append(entry)

        payload = {"pending_questions": still_open, "count": len(still_open)}
        return ToolResult(success=True, data=payload)
class AnswerQuestionTool(BaseTool):
    """Record the answer to a pending question.

    Only entries whose status is ``"pending"`` can be answered; an entry that
    has already been answered keeps its original answer and timestamp.
    """

    name = "answer_question"
    description = "Submit an answer to a pending question"
    input_schema = {
        "type": "object",
        "properties": {
            "question_id": {"type": "string", "description": "Question ID"},
            "answer": {"type": "string", "description": "Answer"}
        },
        "required": ["question_id", "answer"]
    }

    async def execute(self, question_id: str, answer: str) -> ToolResult:
        """Mark the pending question with ``question_id`` as answered.

        Args:
            question_id: ID of the question to answer.
            answer: Answer text to store on the entry.

        Returns:
            A successful ``ToolResult`` with ``question_id``, ``answer`` and
            ``status`` when a pending entry was updated and saved; otherwise
            a failed ``ToolResult`` saying the ID is unknown or the question
            is no longer pending.
        """
        data = _load_questions()
        matches = [q for q in data.get("questions", []) if q.get("id") == question_id]
        if not matches:
            return ToolResult(success=False, error=f"Question {question_id} not found")

        # Do not overwrite an existing answer: pick the first entry with this
        # ID that is still waiting for one.
        target = next((q for q in matches if q.get("status") == "pending"), None)
        if target is None:
            current = matches[0].get("status")
            return ToolResult(
                success=False,
                error=f"Question {question_id} is not pending (status: {current})",
            )

        target["status"] = "answered"
        target["answer"] = answer
        target["answered_at"] = datetime.now().isoformat()
        _save_questions(data)

        return ToolResult(success=True, data={
            "question_id": question_id,
            "answer": answer,
            "status": "answered"
        })
# Instantiate each tool and register it with tool_registry, in declaration order.
for _tool_cls in (AskQuestionTool, GetPendingQuestionsTool, AnswerQuestionTool):
    tool_registry.register(_tool_cls())
del _tool_cls  # keep the loop variable out of the module namespace
|