| from llama_index.core.workflow import ( |
| StartEvent, |
| StopEvent, |
| Workflow, |
| step, |
| Event, |
| Context, |
| ) |
| import asyncio |
| import nest_asyncio |
| from llama_index.llms.groq import Groq |
| from llama_index.utils.workflow import draw_all_possible_flows |
| from IPython.display import display, HTML |
| from dotenv import load_dotenv |
| from helper import extract_html_content |
| from pathlib import Path |
| import os |
|
|
| nest_asyncio.apply() |
|
|
| load_dotenv() |
| GROQ_API_KEY = os.getenv("GROQ_API_KEY") |
| global_llm = Groq(api_key=GROQ_API_KEY, model="llama3-70b-8192") |
|
|
|
|
| class FirstEvent(Event): |
| first_output: str |
|
|
|
|
| class SecondEvent(Event): |
| second_output: str |
| response: str |
|
|
|
|
| class ProgressEvent(Event): |
| msg: str |
|
|
|
|
| class MyWorkflow(Workflow): |
| @step |
| async def step_one(self, ctx: Context, ev: StartEvent) -> FirstEvent: |
| ctx.write_event_to_stream(ProgressEvent(msg="Step one is happening")) |
| return FirstEvent(first_output="First step complete.") |
|
|
| @step |
| async def step_two(self, ctx: Context, ev: FirstEvent) -> SecondEvent: |
| llm = global_llm |
| generator = await llm.astream_complete( |
| "Please give me the first 3 paragraphs of Moby Dick, a book in the public domain." |
| ) |
| async for response in generator: |
| |
| ctx.write_event_to_stream(ProgressEvent(msg=response.delta)) |
| return SecondEvent( |
| second_output="Second step complete, full response attached", |
| response=str(response), |
| ) |
|
|
| @step |
| async def step_three(self, ctx: Context, ev: SecondEvent) -> StopEvent: |
| ctx.write_event_to_stream(ProgressEvent(msg="Step three is happening")) |
| return StopEvent(result="Workflow complete.") |
|
|
|
|
| async def main(): |
| w = MyWorkflow(timeout=30, verbose=True) |
| handler = w.run(first_input="Start the workflow.") |
|
|
| async for ev in handler.stream_events(): |
| if isinstance(ev, ProgressEvent): |
| print(ev.msg) |
|
|
| final_result = await handler |
| print("Final result", final_result) |
|
|
| workflow_file = Path(__file__).parent / "workflows" / "streaming_workflow.html" |
| draw_all_possible_flows(w, filename=str(workflow_file)) |
| html_content = extract_html_content(str(workflow_file)) |
| display(HTML(html_content), metadata=dict(isolated=True)) |
|
|
|
|
| if __name__ == "__main__": |
| asyncio.run(main()) |