| import os |
| from ..core.module import BaseModule |
| from typing import Optional, Literal, Dict, Any, List |
| from pydantic import Field, BaseModel |
| import json |
| from dotenv import load_dotenv |
| import time |
|
|
| from ..models import OpenAILLM, OpenAILLMConfig, BaseLLM |
| from ..models.model_configs import LLMConfig |
| from ..prompts.workflow.workflow_editor import WORKFLOW_EDITOR_PROMPT |
| from ..core.logging import logger |
|
|
| load_dotenv() |
| OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") |
|
|
|
|
class MockLLMConfig(LLMConfig):
    """Configuration object paired with ``MockLLM``; intended for tests only."""

    # Both fields default to fixed mock identifiers.
    llm_type: str = "MockLLM"
    model: str = "mock-model"
|
|
|
|
class MockLLM(BaseLLM):
    """Test double for ``BaseLLM`` that answers every request with a fixed JSON string."""

    def __init__(self, config: MockLLMConfig = None, **kwargs):
        # Fall back to a default mock configuration when the caller gives none.
        effective_config = config if config is not None else MockLLMConfig(
            llm_type="MockLLM",
            model="mock-model",
            output_response=True,
        )
        super().__init__(effective_config, **kwargs)

    def init_model(self):
        """Nothing to initialise for the mock."""
        pass

    def formulate_messages(self, prompts: List[str], system_messages: Optional[List[str]] = None) -> List[List[dict]]:
        """Build one chat-message list per prompt: all system messages first, then the user prompt."""
        return [
            [{"role": "system", "content": system_text} for system_text in (system_messages or [])]
            + [{"role": "user", "content": user_prompt}]
            for user_prompt in prompts
        ]

    def single_generate(self, messages: List[dict], **kwargs) -> str:
        """Return a constant JSON string with empty ``nodes`` and ``edges``; the input is ignored."""
        return '{"nodes": [], "edges": []}'

    def batch_generate(self, batch_messages: List[List[dict]], **kwargs) -> List[str]:
        """Apply ``single_generate`` to each conversation in ``batch_messages``, preserving order."""
        replies = []
        for conversation in batch_messages:
            replies.append(self.single_generate(conversation, **kwargs))
        return replies

    async def single_generate_async(self, messages: List[dict], **kwargs) -> str:
        """Async wrapper that delegates directly to the synchronous ``single_generate``."""
        return self.single_generate(messages, **kwargs)
|
|
|
|
def default_llm_config():
    """
    Build the LLM instance used when the caller does not supply one.

    A ``MockLLM`` is returned when the ``PYTEST_CURRENT_TEST`` or ``CI``
    environment variable is set, or when ``OPENAI_API_KEY`` is missing or
    blank. Otherwise an ``OpenAILLM`` for ``gpt-4o`` with streaming enabled
    is returned.
    """
    running_under_test = any(
        os.getenv(flag) is not None for flag in ("PYTEST_CURRENT_TEST", "CI")
    )
    # Covers both an unset key (None) and a whitespace-only key.
    api_key_unusable = not (OPENAI_API_KEY or "").strip()

    if not (running_under_test or api_key_unusable):
        return OpenAILLM(
            OpenAILLMConfig(
                model="gpt-4o",
                openai_key=OPENAI_API_KEY,
                stream=True,
                output_response=True,
            )
        )

    return MockLLM(
        MockLLMConfig(
            llm_type="MockLLM",
            model="mock-model",
            output_response=True,
        )
    )
|
|
class WorkFlowEditorReturn(BaseModel):
    """
    Result record produced by a workflow editing call.
    """

    # Outcome of the edit; restricted to the three literal values listed.
    status: Literal["success", "failed", "exceeded_max_retries"] = Field(
        description="The status of the workflow editing operation",
    )

    # Edited workflow as a dictionary; may be None.
    workflow_json: Optional[Dict[str, Any]] = Field(
        description="The workflow JSON structure after editing",
    )

    # Location of the saved JSON file; may be None.
    workflow_json_path: Optional[str] = Field(
        description="The file path where the workflow JSON is saved",
    )

    # Failure description; defaults to None.
    error_message: Optional[str] = Field(
        default=None,
        description="Error message if the operation failed",
    )
|
|
class WorkFlowEditor(BaseModule):
    """
    API-oriented, interaction-free version of HITLOutsideConversationAgent that
    edits a workflow JSON file with the help of an LLM.

    Attributes:
        save_dir (str): Directory used for output files that are given without
            a directory part, and as a fallback location for the input file.
        llm (BaseLLM): The LLM used to edit the workflow JSON.
        max_retries (int): Maximum number of retries.
            NOTE(review): ``edit_workflow`` makes a single attempt and does not
            consult this value.
    """
    save_dir: str
    # NOTE(review): default_llm_config() is called once, when this class body
    # executes, so the default LLM is built at import time.
    llm: Optional[BaseLLM] = Field(default=default_llm_config())
    max_retries: Optional[int] = Field(default=3)

    def init_module(self):
        """No additional initialisation is performed."""
        pass

    def _resolve_output_path(self, file_path: str, new_file_path: Optional[str]) -> str:
        """Return the path the edited workflow is written to.

        Raises:
            FileNotFoundError: If ``new_file_path`` has a directory part that
                does not exist, or its file name does not end with ``.json``.
        """
        if new_file_path is None:
            # Derive a timestamped name from the input file's base name.
            generated_name = (
                "new_json_for__"
                + os.path.split(file_path)[-1]
                + "__"
                + time.strftime("%Y%m%d_%H%M%S")
                + ".json"
            )
            return os.path.join(self.save_dir, generated_name)

        directory, file_name = os.path.split(new_file_path)
        if not directory:
            # A bare file name is placed inside save_dir.
            return os.path.join(self.save_dir, new_file_path)
        # Suffix check: the previous slice `[:-5]` compared everything except
        # the last five characters and therefore rejected every valid name.
        if os.path.exists(directory) and file_name.endswith(".json"):
            return new_file_path
        raise FileNotFoundError(f"The directory {directory} does not exist or the file name is not a json file name.")

    async def edit_workflow(
        self, file_path: str, instruction: str, new_file_path: Optional[str] = None
    ) -> WorkFlowEditorReturn:
        """
        Optimize or modify a workflow JSON file according to an instruction, using the LLM.

        Args:
            file_path (str): Path to the workflow JSON file, or a file name located in ``save_dir``.
            instruction (str): The instruction describing how to edit the workflow.
            new_file_path (Optional[str]): Where to write the edited workflow. A bare file
                name is placed in ``save_dir``; when omitted, a timestamped name in
                ``save_dir`` is generated.

        Returns:
            WorkFlowEditorReturn: ``status="success"`` with the edited JSON and its path,
            or ``status="failed"`` with an ``error_message`` when the LLM call, the JSON
            parsing, or the workflow structure check fails.

        Raises:
            FileNotFoundError: If ``new_file_path`` names a missing directory or a
                non-``.json`` file. Errors from opening or parsing the input file
                are not caught here.
        """
        new_file_path = self._resolve_output_path(file_path, new_file_path)

        # Honour the documented "file name in save_dir" form of file_path.
        if not os.path.exists(file_path):
            candidate = os.path.join(self.save_dir, file_path)
            if os.path.exists(candidate):
                file_path = candidate

        # Explicit UTF-8: the output is dumped with ensure_ascii=False, so the
        # platform default encoding must not be relied upon.
        with open(file_path, "r", encoding="utf-8") as f:
            workflow_json = json.load(f)

        optimization_prompt = WORKFLOW_EDITOR_PROMPT.format(
            current_workflow_json=json.dumps(workflow_json, indent=2, ensure_ascii=False),
            user_advice=instruction,
        )
        messages = [
            {"role": "system", "content": "You are a helpful assistant that can optimize the workflow json structure."},
            {"role": "user", "content": optimization_prompt},
        ]
        try:
            response = await self.llm.single_generate_async(
                messages=messages, response_format={"type": "json_object"}
            )
            optimized_json = json.loads(response)
        except Exception as e:
            logger.error(f"LLM optimization failed: {e}")
            optimized_json = None

        # An empty or missing result is reported as a failure.
        if not optimized_json:
            return WorkFlowEditorReturn(
                status="failed",
                workflow_json=None,
                workflow_json_path=None,
                error_message="LLM optimization failed",
            )

        try:
            # Imported inside the method (presumably to avoid a circular import — TODO confirm).
            from ..workflow.workflow import WorkFlow
            from ..workflow.workflow_graph import WorkFlowGraph

            graph = WorkFlowGraph.from_dict(optimized_json)
            # Constructed only to validate the structure; the instance is discarded.
            WorkFlow(graph=graph, llm=self.llm)
        except Exception as e:
            logger.error(f"Workflow json structure check failed: {e}")
            return WorkFlowEditorReturn(
                status="failed",
                workflow_json=None,
                workflow_json_path=None,
                error_message="Workflow json structure check failed",
            )

        with open(new_file_path, "w", encoding="utf-8") as f:
            json.dump(optimized_json, f, indent=2, ensure_ascii=False)

        return WorkFlowEditorReturn(
            status="success",
            workflow_json=optimized_json,
            workflow_json_path=new_file_path,
            error_message=None,
        )