import io
import json
import os
from typing import Optional, Union

import requests

from voice_client import VoiceClient
|
|
class StackWithVoice:
    """Glue between the Stack 2.9 chat API and a voice synthesis client.

    Prompts are posted to the Stack API over HTTP and the returned text is
    handed to a ``VoiceClient`` for synthesis. Speech-to-text is currently a
    placeholder (see ``_audio_to_text``).
    """

    def __init__(
        self,
        stack_api_url: str,
        voice_api_url: str = "http://localhost:8000",
        request_timeout: float = 60.0,
    ):
        """Create the HTTP session and the voice client.

        Args:
            stack_api_url: Base URL of the Stack API; ``/api/chat`` is
                appended when sending prompts.
            voice_api_url: URL passed to ``VoiceClient``.
            request_timeout: Seconds to wait for the Stack API before the
                request is abandoned.
        """
        self.stack_api_url = stack_api_url
        self.voice_client = VoiceClient(voice_api_url)
        self.session = requests.Session()
        self.request_timeout = request_timeout

        # Voice name -> voice info dict, filled lazily by _get_voice_model().
        self._voice_cache = {}

    def _get_stack_response(self, prompt: str) -> str:
        """Get response from Stack 2.9 API.

        Args:
            prompt: Text to send to the chat endpoint.

        Returns:
            The ``"response"`` field of the JSON reply, or ``""`` if the
            reply has no such field.

        Raises:
            RuntimeError: If the request fails, times out, returns an HTTP
                error status, or the body is not valid JSON.
        """
        try:
            response = self.session.post(
                f"{self.stack_api_url}/api/chat",
                json={"prompt": prompt, "model": "stack-2.9"},
                headers={"Content-Type": "application/json"},
                # Without a timeout a stalled server would block forever.
                timeout=self.request_timeout,
            )
            response.raise_for_status()
            data = response.json()
        except (requests.RequestException, ValueError) as e:
            # ValueError covers invalid JSON on older requests versions.
            raise RuntimeError(f"Stack API request failed: {e}") from e

        return data.get("response", "")

    def _get_voice_model(self, voice_name: str) -> Optional[dict]:
        """Get voice model info from cache or API.

        Args:
            voice_name: Name to look for among the voice client's voices.

        Returns:
            ``{"name": voice_name}`` if the voice client lists that voice,
            otherwise ``None``. ``None`` is also returned (after printing a
            warning) when listing the voices fails.
        """
        cached = self._voice_cache.get(voice_name)
        if cached is not None:
            return cached

        try:
            voices = self.voice_client.list_voices()
            found = any(voice == voice_name for voice in voices)
        except Exception as e:
            # Deliberately best-effort: the client's failure modes are not
            # known here, so any error degrades to "voice not found".
            print(f"Warning: Failed to get voice models: {e}")
            return None

        if not found:
            return None

        info = {"name": voice_name}
        self._voice_cache[voice_name] = info
        return info

    def voice_chat(self, prompt_audio_path: str, voice_name: str = "default") -> Optional[bytes]:
        """Complete voice chat workflow: audio → text → response → audio.

        Args:
            prompt_audio_path: Path of the audio file holding the prompt.
            voice_name: Voice passed to the voice client's ``synthesize``.

        Returns:
            Whatever ``synthesize`` returns for the response text, or
            ``None`` when either the transcript or the response is empty.

        Raises:
            RuntimeError: If the Stack API request fails.
        """
        print(f"Converting audio to text: {prompt_audio_path}")
        prompt_text = self._audio_to_text(prompt_audio_path)
        if not prompt_text:
            return None

        print(f"User prompt: {prompt_text}")

        print("Getting response from Stack 2.9...")
        response_text = self._get_stack_response(prompt_text)
        if not response_text:
            return None

        print(f"Stack response: {response_text}")

        print(f"Generating voice response with voice: {voice_name}")
        return self.voice_client.synthesize(response_text, voice_name)

    def _audio_to_text(self, audio_path: str) -> str:
        """Convert audio to text (placeholder implementation).

        No speech recognition is performed. The transcript is read from a
        sidecar file with the same path as ``audio_path`` but a ``.txt``
        extension. If that file does not exist, a fixed test prompt is
        returned instead.

        Args:
            audio_path: Path of the audio file.

        Returns:
            The stripped transcript text, or the fixed test prompt.
        """
        # Swap only the final extension; a plain str.replace would also
        # rewrite ".wav"/".mp3" occurring elsewhere in the path and would
        # leave other extensions (".WAV", ".ogg") pointing at the audio file.
        stem, _extension = os.path.splitext(audio_path)
        text_path = stem + ".txt"

        try:
            with open(text_path, "r", encoding="utf-8") as f:
                return f.read().strip()
        except FileNotFoundError:
            return "This is a test voice prompt."

    def voice_command(self, command: str, voice_name: str = "default") -> Optional[bytes]:
        """Execute voice command and get spoken response.

        Args:
            command: Text sent to the Stack API as the prompt.
            voice_name: Voice passed to the voice client's ``synthesize``.

        Returns:
            Whatever ``synthesize`` returns for the response text, or
            ``None`` when the response is empty.

        Raises:
            RuntimeError: If the Stack API request fails.
        """
        print(f"Executing voice command: {command}")

        response_text = self._get_stack_response(command)
        if not response_text:
            return None

        print(f"Command response: {response_text}")

        return self.voice_client.synthesize(response_text, voice_name)

    def streaming_voice_chat(
        self,
        prompt_audio_path: str,
        voice_name: str = "default",
        output_path: str = "./streaming_response.wav",
    ) -> None:
        """Stream voice chat (placeholder implementation).

        Transcribes the prompt, fetches the response text, prints it, then
        asks the voice client to synthesize it with ``stream=True`` and
        passes the result to ``download_audio`` together with
        ``output_path``.

        Args:
            prompt_audio_path: Path of the audio file holding the prompt.
            voice_name: Voice passed to the voice client's ``synthesize``.
            output_path: Destination handed to ``download_audio``.

        Raises:
            RuntimeError: If the Stack API request fails.
        """
        print("Starting streaming voice chat...")

        prompt_text = self._audio_to_text(prompt_audio_path)
        if not prompt_text:
            # Same guard as voice_chat(): do not send an empty prompt.
            print("No prompt text available")
            return

        response_text = self._get_stack_response(prompt_text)
        if not response_text:
            print("No response received")
            return

        print("Streaming response:")
        print(response_text)

        audio_data = self.voice_client.synthesize(response_text, voice_name, stream=True)

        self.voice_client.download_audio(audio_data, output_path)
        print(f"Streaming response saved to: {output_path}")
|
|
| |
if __name__ == "__main__":
    # Script entry point: build the integration against the localhost
    # endpoints below, then announce the test run.
    endpoints = {
        "stack_api_url": "http://localhost:5000",
        "voice_api_url": "http://localhost:8000",
    }
    stack_voice = StackWithVoice(**endpoints)

    print("Testing Stack with Voice integration...")
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |