| import os, sys |
| import utils |
| import uuid |
| import json |
| import subprocess, threading |
|
|
# Directory containing this file, its parent (used as the repository root),
# and the directory where per-session state/log files are stored.
FILE_DIR = os.path.dirname(os.path.abspath(__file__))
REPO_DIR = os.path.dirname(FILE_DIR)
STATE_DIR = os.path.join(FILE_DIR, "state")
# Make modules located in REPO_DIR importable.
sys.path.append(REPO_DIR)
# exist_ok=True avoids the check-then-create race of `exists()` + `mkdir()`:
# this module is loaded both by the parent and by every worker subprocess,
# so two processes may try to create the directory at the same time.
os.makedirs(STATE_DIR, exist_ok=True)
| import time |
|
|
|
|
def get_openai_api_key():
    """Return the value of the OPENAI_API_KEY environment variable.

    Returns:
        The key as a string, or None when the variable is not set.
    """
    return os.environ.get("OPENAI_API_KEY")
|
|
|
|
# Module-level list, initially empty. Nothing in the visible code reads or
# appends to it -- presumably a registry of live AutoAPI instances used by
# other modules; verify against callers.
running_apis = list()
|
|
|
|
def get_state(state_file):
    """Read the JSON document stored at ``state_file`` and return it.

    Args:
        state_file: Path of the JSON file to read.

    Returns:
        The decoded JSON value (whatever ``json.load`` produces).
    """
    with open(state_file, "r") as handle:
        return json.load(handle)
|
|
|
|
def set_state(state_file, state):
    """Atomically serialize ``state`` as JSON into ``state_file``.

    The state file is polled by one process while another rewrites it, so the
    file must never be observable in a truncated or half-written form. The
    JSON is therefore written to a temporary file in the same directory and
    then moved over the target with ``os.replace``. If serialization fails,
    the previous contents of ``state_file`` are left untouched.

    NOTE: this makes each individual write atomic; it does not protect
    read-modify-write sequences performed by callers against each other.

    Args:
        state_file: Path of the JSON file to (over)write.
        state: JSON-serializable value to store.

    Raises:
        TypeError: If ``state`` is not JSON-serializable.
        OSError: If the temporary file cannot be written or moved into place.
    """
    import tempfile

    # The temp file must live on the same filesystem as the target for the
    # final rename to be atomic, hence the explicit ``dir=``.
    target_dir = os.path.dirname(os.path.abspath(state_file))
    fd, tmp_path = tempfile.mkstemp(dir=target_dir, prefix=".state_", suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(state, f)
        os.replace(tmp_path, state_file)
    except BaseException:
        # Do not leave stray temp files behind on failure.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise
|
|
|
|
class AutoAPI:
    """Run an Auto-GPT worker in a subprocess and talk to it through a JSON state file.

    The state file holds four keys:

    * ``pending_input`` -- the next user input for the worker, or ``None``.
    * ``awaiting_input`` -- ``True`` while the worker is waiting for input.
    * ``messages`` -- list of ``(title, content)`` pairs emitted by the worker.
    * ``last_message_read_index`` -- index of the last message handed out by
      :meth:`get_chatbot_response` (``-1`` when none has been read yet).

    Attributes:
        openai_key: The OpenAI API key passed to the worker.
        state_file: Path of this session's JSON state file inside ``STATE_DIR``.
        log_file: Path of the file receiving the worker's stdout and stderr.
        process: The ``subprocess.Popen`` handle of the worker.
    """

    def __init__(self, openai_key, ai_name, ai_role, top_5_goals):
        """Write the AI settings, create the state file and start the worker.

        Args:
            openai_key: OpenAI API key forwarded to the worker process.
            ai_name: Value written under ``ai_name`` in ``ai_settings.yaml``.
            ai_role: Value written under ``ai_role`` in ``ai_settings.yaml``.
            top_5_goals: Iterable of indexable rows; the first element of each
                row is a goal, and rows with a falsy first element are skipped.
        """
        self.openai_key = openai_key
        # Named `session_id` rather than `hex` to avoid shadowing the builtin.
        session_id = uuid.uuid4().hex
        print(session_id)
        self.state_file = os.path.join(STATE_DIR, f"state_{session_id}.json")
        self.log_file = os.path.join(STATE_DIR, f"log_{session_id}.json")

        # NOTE(review): values are interpolated into the YAML unquoted, so a
        # goal/name/role containing YAML syntax (e.g. ": " or " #") will not
        # round-trip. Left as-is to keep the generated file unchanged.
        goal_lines = [f"- {goal[0]}" for goal in top_5_goals if goal[0]]
        settings_text = (
            "ai_goals:\n"
            + "\n".join(goal_lines)
            + f"\nai_name: {ai_name}\nai_role: {ai_role}\n"
        )
        with open(os.path.join(REPO_DIR, "ai_settings.yaml"), "w") as f:
            f.write(settings_text)

        state = {
            "pending_input": None,
            "awaiting_input": False,
            "messages": [],
            "last_message_read_index": -1,
        }
        set_state(self.state_file, state)

        # NOTE(review): the API key is passed on the command line, where it is
        # visible in process listings; the worker script reads it from argv,
        # so the calling convention is kept here.
        with open(self.log_file, "w") as log:
            # sys.executable ensures the worker runs under the same
            # interpreter/virtualenv as this process instead of whatever
            # "python" happens to resolve to on PATH. The handle is kept so
            # the worker can be polled and reaped.
            self.process = subprocess.Popen(
                [
                    sys.executable or "python",
                    os.path.join(REPO_DIR, "ui", "api.py"),
                    openai_key,
                    self.state_file,
                ],
                cwd=REPO_DIR,
                stdout=log,
                stderr=log,
            )

    def send_message(self, message="Y"):
        """Store ``message`` as the worker's pending input and clear ``awaiting_input``.

        Args:
            message: Text to hand to the worker; defaults to ``"Y"``.
        """
        state = get_state(self.state_file)
        state["pending_input"] = message
        state["awaiting_input"] = False
        set_state(self.state_file, state)

    def get_chatbot_response(self):
        """Yield the worker's unread messages, formatted for display.

        Each message is rendered as an optional ``**title** `` prefix followed
        by the content with ``utils.remove_color`` applied and newlines
        replaced by ``<br />``. While there is nothing new to read the state
        file is polled once per second. The generator ends when every message
        has been read and either the worker is awaiting input or the worker
        process has exited.

        Yields:
            One formatted string per message, in order.
        """
        worker = getattr(self, "process", None)
        while True:
            # Sample the exit status BEFORE reading the state, so a message
            # written just before the worker exited is still picked up.
            worker_exited = worker is not None and worker.poll() is not None
            state = get_state(self.state_file)
            caught_up = (
                state["last_message_read_index"] >= len(state["messages"]) - 1
            )
            if caught_up:
                # A dead worker will never produce more output; stop instead
                # of polling forever.
                if state["awaiting_input"] or worker_exited:
                    break
                time.sleep(1)
                continue

            state["last_message_read_index"] += 1
            title, content = state["messages"][state["last_message_read_index"]]
            prefix = f"**{title.strip()}** " if title else ""
            rendered = prefix + utils.remove_color(content).replace("\n", "<br />")
            # Persist the read index BEFORE yielding: the generator may stay
            # suspended for an arbitrary time, and writing this snapshot back
            # afterwards would overwrite messages the worker appended meanwhile.
            set_state(self.state_file, state)
            yield rendered
|
|
|
|
# Worker entry point: invoked as `<script> <openai_key> <state_file>`.
# Replaces Auto-GPT's console logging/input hooks with functions that exchange
# data through the JSON state file, then runs Auto-GPT's CLI `main`.
if __name__ == "__main__":
    _, openai_key, state_file = sys.argv
    # Never echo the raw argv: it contains the API key, and stdout is
    # redirected to a log file by the parent process.
    print([sys.argv[0], "<redacted>", state_file])
    # Set the key before importing autogpt, as in the original ordering.
    os.environ["OPENAI_API_KEY"] = openai_key
    import autogpt.config.config
    from autogpt.logs import logger
    from autogpt.cli import main
    import autogpt.utils
    from autogpt.spinner import Spinner

    def add_message(title, content):
        """Append a ``(title, content)`` pair to the state file's message list."""
        state = get_state(state_file)
        state["messages"].append((title, content))
        set_state(state_file, state)

    def typewriter_log(title="", title_color="", content="", *args, **kwargs):
        """Replacement for ``logger.typewriter_log``: record title and content; color is ignored."""
        add_message(title, content)

    def warn(message, title="", *args, **kwargs):
        """Replacement for ``logger.warn``: record the warning as a message."""
        add_message(title, message)

    def error(title, message="", *args, **kwargs):
        """Replacement for ``logger.error``: record the error as a message."""
        add_message(title, message)

    def clean_input(prompt=""):
        """Publish ``prompt``, flag that input is awaited, and block until input arrives.

        Polls the state file once per second until ``pending_input`` is set,
        then clears it and returns its value.
        """
        add_message(None, prompt)
        state = get_state(state_file)
        state["awaiting_input"] = True
        set_state(state_file, state)
        while True:
            # Re-read right before deciding, so that the snapshot written back
            # below is fresh (not one taken before a one-second sleep) and no
            # extra delay is added once the input has arrived.
            state = get_state(state_file)
            if state["pending_input"] is not None:
                break
            print("Waiting for input...")
            time.sleep(1)
        print("Got input")
        pending_input = state["pending_input"]
        state["pending_input"] = None
        set_state(state_file, state)
        return pending_input

    def spinner_start(*args, **kwargs):
        """Replacement for ``Spinner.spin``: record a "Thinking..." message.

        Accepts and ignores any arguments: once assigned as a class attribute
        this function is bound like a method, so a call through an instance
        passes ``self``.
        """
        add_message(None, "Thinking...")

    # Route Auto-GPT's output and input hooks through the state file.
    logger.typewriter_log = typewriter_log
    logger.warn = warn
    logger.error = error
    autogpt.utils.clean_input = clean_input
    Spinner.spin = spinner_start

    # Hide this script's own arguments from Auto-GPT's CLI parser.
    sys.argv = sys.argv[:1]
    main()
|
|