google-labs-jules[bot] and archc0der committed
Commit bf6dbfa · 0 parents

feat: implement AutoStream conversational AI sales agent with LangGraph


- Implements a stateful agent workflow graph using LangGraph
- Sets up an LLM-based intent classifier with structured outputs
- Implements a local FAISS-based RAG pipeline
- Includes a step-by-step lead qualification workflow and a mock backend tool execution
- Provides a CLI interface in main.py
- Creates a comprehensive testing suite mocking LLMs and Embeddings via pytest
- Adds thorough documentation on system architecture and integration capabilities

Co-authored-by: archc0der <119496494+archc0der@users.noreply.github.com>

.gitignore ADDED
@@ -0,0 +1,3 @@
+ __pycache__/
+ *.pyc
+ .env
README.md ADDED
@@ -0,0 +1,73 @@
+ # AutoStream Conversational AI Agent
+
+ ## Project Overview
+ This project is a production-quality Conversational AI Agent built for **AutoStream**, a fictional SaaS company. It handles customer inquiries, answers product questions using a Knowledge Base (RAG), and detects high-intent users to seamlessly collect lead information and execute backend lead capture functions.
+
+ ## System Architecture
+ The system is designed as an agentic workflow using **LangGraph**, replacing traditional linear chatbots with a stateful, branching graph architecture.
+ 1. **User Input & State Management**: User messages and conversational context are persisted in a shared `AgentState` that tracks details like intent, history, and collected lead fields.
+ 2. **Intent Classification**: Using `gpt-4o-mini` with structured output, the agent categorizes messages (e.g., GREETING, PRICING_QUERY, HIGH_INTENT_LEAD).
+ 3. **Routing**: A conditional edge acts as a router, directing the conversation to specialized nodes based on intent.
+ 4. **Knowledge Retrieval**: Product and pricing questions are routed to a RAG pipeline that retrieves context from a FAISS vector store.
+ 5. **Lead Qualification**: High-intent users are routed to a multi-turn lead collection workflow. The agent selectively asks for missing fields (Name, Email, Creator Platform).
+ 6. **Tool Execution**: Once all fields are collected, the agent safely executes a simulated backend lead-capture tool.
+
+ ## Running Locally
+
+ ### Prerequisites
+ - Python 3.9+
+ - An OpenAI API key
+
+ ### Setup
+ 1. Clone this repository.
+ 2. Install dependencies:
+ ```bash
+ pip install -r requirements.txt
+ ```
+ 3. Set your OpenAI API key in the environment or create a `.env` file at the root of the project:
+ ```env
+ OPENAI_API_KEY=your_openai_api_key_here
+ ```
+
+ ### Running the CLI Agent
+ To interact with the conversational agent:
+ ```bash
+ python main.py
+ ```
+
+ ### Running the Tests
+ The project includes a full automated test suite that runs entirely without API keys, as all LLM and embedding calls are mocked.
+ ```bash
+ pytest
+ ```
+
+ ## RAG Pipeline (Retrieval-Augmented Generation)
+ When the user asks a product or pricing question, the agent uses a RAG pipeline:
+ 1. `data/knowledge_base.md` is loaded and chunked using a `RecursiveCharacterTextSplitter`.
+ 2. Chunks are embedded with `OpenAIEmbeddings` and indexed into a local `FAISS` vector store.
+ 3. The retriever fetches the top `k` relevant chunks for the user's query and injects them into the RAG generation prompt.
+ 4. The LLM generates a response grounded strictly in the retrieved context.
+
+ ## Lead Capture Workflow
+ When a user expresses a desire to purchase or sign up, the intent classifier triggers `HIGH_INTENT_LEAD`.
+ The workflow then shifts to `process_lead`. The system uses structured extraction to pull fields (Name, Email, Creator Platform) from incoming text. It incrementally prompts the user over several turns until all required fields are collected, pausing graph execution between inputs.
+
+ ## State Management
+ A `TypedDict` named `AgentState` tracks the overall conversation context. This prevents duplicate questions and provides memory. State variables include `conversation_history` (up to 6 turns), the current `detected_intent`, `retrieved_documents`, and the incremental lead fields (`user_name`, `user_email`, `creator_platform`). The state flows deterministically through each node, creating predictable transitions.
+
+ ## Tool Execution Safety
+ The mock backend tool (`mock_lead_capture`) is heavily guarded. It executes only in the `execute_tool` node, which runs only if the router confirms `lead_ready` is `True`. The node also validates that `user_name`, `user_email`, and `creator_platform` are all non-null before calling the function, so no premature or incomplete lead data is dispatched.
+
+ ## WhatsApp Integration
+ This agent can be deployed on WhatsApp using webhooks and Twilio:
+ 1. **Twilio API**: Set up a Twilio WhatsApp Business API sandbox or account.
+ 2. **Webhook Endpoint**: Create an HTTP endpoint (e.g., via FastAPI or Flask) to receive incoming webhook payloads containing the user's WhatsApp message.
+ 3. **Agent Backend**: The webhook extracts the message text and user identifier (phone number) and invokes the LangGraph agent.
+ 4. **Session Management**: A database (such as Redis) can key the `AgentState` to the user's phone number, maintaining continuity and conversational memory across incoming webhooks.
+ 5. **Response Dispatch**: After the graph runs, the final `response` string is sent back to the user via a POST request to Twilio's Message API.
+
+ ## Testing Architecture
+ A rigorous test suite lives in the `tests/` directory:
+ 1. **Mocking**: All AI inference (LLMs and embeddings) is mocked using `pytest-mock` and standard injection.
+ 2. **Deterministic Reliability**: By returning controlled mock objects, the tests validate graph structure, logic, state changes, routing, and tool safety independently of live API behavior and latency.
+ 3. **End-to-End Simulation**: `test_agent_e2e.py` walks through a multi-turn conversation step by step, mimicking user turns and validating the transitions from Greeting -> RAG -> Lead Capture -> Tool Execution.
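The session-management step in the README's WhatsApp section can be sketched in plain Python. This is a hedged illustration, not part of the commit: an in-memory dict stands in for Redis, a stub echo replaces the compiled LangGraph app, and all names are illustrative.

```python
# Hedged sketch of session-keyed state for a WhatsApp webhook.
# SESSIONS stands in for Redis; stub_agent stands in for app.invoke.

SESSIONS = {}  # phone number -> per-user agent state

def fresh_state():
    return {"conversation_history": [], "current_message": "", "response": ""}

def stub_agent(state):
    # Stand-in for the compiled graph: just echoes the incoming message.
    state = dict(state)
    state["response"] = f"You said: {state['current_message']}"
    return state

def handle_webhook(phone: str, text: str) -> str:
    """Handle one incoming message for `phone`, persisting state per user."""
    state = SESSIONS.get(phone, fresh_state())
    state["current_message"] = text
    state = stub_agent(state)
    state["conversation_history"].append({"role": "user", "content": text})
    state["conversation_history"].append({"role": "assistant", "content": state["response"]})
    SESSIONS[phone] = state   # keyed by phone number, as the README suggests
    return state["response"]  # this string would be POSTed back via Twilio
```

Because each phone number maps to its own state, concurrent users never share conversational memory, which is the property the README's step 4 is after.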
agent/__init__.py ADDED
File without changes
agent/graph.py ADDED
@@ -0,0 +1,68 @@
+ from langgraph.graph import StateGraph, START, END
+ from agent.state import AgentState
+ from agent.nodes import (
+     detect_intent,
+     handle_greeting,
+     handle_unknown,
+     retrieve_knowledge,
+     generate_rag_response,
+     process_lead,
+     execute_tool
+ )
+ from agent.router import route_intent, route_after_lead
+
+ def build_graph():
+     # Initialize the graph with the typed state
+     workflow = StateGraph(AgentState)
+
+     # Add nodes
+     workflow.add_node("detect_intent", detect_intent)
+     workflow.add_node("handle_greeting", handle_greeting)
+     workflow.add_node("handle_unknown", handle_unknown)
+     workflow.add_node("retrieve_knowledge", retrieve_knowledge)
+     workflow.add_node("generate_rag_response", generate_rag_response)
+     workflow.add_node("process_lead", process_lead)
+     workflow.add_node("execute_tool", execute_tool)
+
+     # Define edges
+     # Start -> detect_intent
+     workflow.add_edge(START, "detect_intent")
+
+     # detect_intent -> conditional routing based on intent
+     workflow.add_conditional_edges(
+         "detect_intent",
+         route_intent,
+         {
+             "handle_greeting": "handle_greeting",
+             "retrieve_knowledge": "retrieve_knowledge",
+             "process_lead": "process_lead",
+             "handle_unknown": "handle_unknown"
+         }
+     )
+
+     # retrieve_knowledge -> generate_rag_response
+     workflow.add_edge("retrieve_knowledge", "generate_rag_response")
+
+     # process_lead -> conditional routing (execute_tool or end)
+     workflow.add_conditional_edges(
+         "process_lead",
+         route_after_lead,
+         {
+             "execute_tool": "execute_tool",
+             "__end__": END
+         }
+     )
+
+     # Terminal edges
+     workflow.add_edge("handle_greeting", END)
+     workflow.add_edge("handle_unknown", END)
+     workflow.add_edge("generate_rag_response", END)
+     workflow.add_edge("execute_tool", END)
+
+     # Compile the graph
+     return workflow.compile()
+
+ # Expose a compiled instance
+ app = build_graph()
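The conditional-edge wiring above can be mimicked in plain Python to follow the control flow without installing langgraph. A sketch only; the node functions here are stubs that tag the state with the branch taken.

```python
# Plain-Python sketch of the graph's conditional routing: the dict below
# mirrors the mapping passed to add_conditional_edges.

def handle_greeting(state):
    return {**state, "response": "greeting"}

def handle_unknown(state):
    return {**state, "response": "unknown"}

def retrieve_knowledge(state):
    return {**state, "response": "rag"}

def process_lead(state):
    return {**state, "response": "lead"}

ROUTES = {
    "GREETING": handle_greeting,
    "PRODUCT_QUERY": retrieve_knowledge,
    "PRICING_QUERY": retrieve_knowledge,
    "HIGH_INTENT_LEAD": process_lead,
}

def run_turn(state):
    # The router picks the next node; unmatched intents fall through
    # to handle_unknown, like the graph's default branch.
    node = ROUTES.get(state.get("detected_intent"), handle_unknown)
    return node(state)
```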
agent/nodes.py ADDED
@@ -0,0 +1,112 @@
+ from typing import Optional
+ from pydantic import BaseModel, Field
+ from langchain_openai import ChatOpenAI
+ from langchain_core.prompts import ChatPromptTemplate
+ from agent.state import AgentState
+ from rag.retriever import retrieve_documents
+ from tools.lead_capture import mock_lead_capture
+
+ def get_llm():
+     return ChatOpenAI(model="gpt-4o-mini", temperature=0)
+
+ class IntentResponse(BaseModel):
+     intent: str = Field(description="The intent of the user. Must be one of: GREETING, PRODUCT_QUERY, PRICING_QUERY, HIGH_INTENT_LEAD, UNKNOWN")
+     confidence: float = Field(description="Confidence score between 0 and 1")
+
+ class LeadExtractionResponse(BaseModel):
+     user_name: Optional[str] = Field(default=None, description="The name of the user if provided")
+     user_email: Optional[str] = Field(default=None, description="The email address of the user if provided")
+     creator_platform: Optional[str] = Field(default=None, description="The creator platform (e.g., YouTube, Instagram) if provided")
+
+ def detect_intent(state: AgentState) -> AgentState:
+     llm = get_llm()
+     prompt = ChatPromptTemplate.from_messages([
+         ("system", "You are an intent classification assistant for AutoStream. Analyze the user's message and determine the intent. Categories: GREETING, PRODUCT_QUERY, PRICING_QUERY, HIGH_INTENT_LEAD, UNKNOWN. A 'HIGH_INTENT_LEAD' is when a user explicitly expresses interest in signing up, buying, or trying out a plan."),
+         ("user", "{message}")
+     ])
+
+     chain = prompt | llm.with_structured_output(IntentResponse)
+
+     history_str = "\n".join([f"{msg['role']}: {msg['content']}" for msg in state.get("conversation_history", [])[-3:]])
+     context_message = f"Recent history:\n{history_str}\n\nCurrent message:\n{state['current_message']}"
+
+     response = chain.invoke({"message": context_message})
+
+     return {"detected_intent": response.intent}
+
+ def handle_greeting(state: AgentState) -> AgentState:
+     return {"response": "Hello! I'm the AutoStream assistant. I can answer questions about our features and pricing. How can I help you today?"}
+
+ def handle_unknown(state: AgentState) -> AgentState:
+     return {"response": "I'm not quite sure how to help with that. Could you clarify your question about AutoStream?"}
+
+ def retrieve_knowledge(state: AgentState) -> AgentState:
+     docs = retrieve_documents(state["current_message"])
+     return {"retrieved_documents": docs}
+
+ def generate_rag_response(state: AgentState) -> AgentState:
+     llm = get_llm()
+     prompt = ChatPromptTemplate.from_messages([
+         ("system", "You are a helpful sales assistant for AutoStream. Answer the user's question based strictly on the following retrieved knowledge:\n\n{context}\n\nIf the answer is not in the context, say you don't know."),
+         ("user", "{message}")
+     ])
+
+     context = "\n\n".join(state.get("retrieved_documents", []))
+     chain = prompt | llm
+
+     response = chain.invoke({
+         "context": context,
+         "message": state["current_message"]
+     })
+
+     return {"response": response.content}
+
+ def process_lead(state: AgentState) -> AgentState:
+     llm = get_llm()
+
+     extract_prompt = ChatPromptTemplate.from_messages([
+         ("system", "Extract the user's name, email, and creator platform (e.g. YouTube, TikTok, Instagram) from the message if present. Return null for fields not found."),
+         ("user", "{message}")
+     ])
+     extract_chain = extract_prompt | llm.with_structured_output(LeadExtractionResponse)
+
+     history_str = "\n".join([f"{msg['role']}: {msg['content']}" for msg in state.get("conversation_history", [])[-3:]])
+     context_message = f"Recent history:\n{history_str}\n\nCurrent message:\n{state['current_message']}"
+
+     extracted = extract_chain.invoke({"message": context_message})
+
+     # Only fill fields that are still missing, so earlier answers are kept
+     updates = {}
+     if extracted.user_name and not state.get("user_name"):
+         updates["user_name"] = extracted.user_name
+     if extracted.user_email and not state.get("user_email"):
+         updates["user_email"] = extracted.user_email
+     if extracted.creator_platform and not state.get("creator_platform"):
+         updates["creator_platform"] = extracted.creator_platform
+
+     current_name = updates.get("user_name", state.get("user_name"))
+     current_email = updates.get("user_email", state.get("user_email"))
+     current_platform = updates.get("creator_platform", state.get("creator_platform"))
+
+     # Ask for the first missing field; mark the lead ready once all are present
+     if not current_name:
+         updates["response"] = "Great! I can help with that. Could I have your name?"
+         return updates
+     elif not current_email:
+         updates["response"] = f"Thanks {current_name}! What is your email address?"
+         return updates
+     elif not current_platform:
+         updates["response"] = "Got it. And what creator platform do you primarily use (e.g., YouTube, TikTok)?"
+         return updates
+     else:
+         updates["lead_ready"] = True
+         return updates
+
+ def execute_tool(state: AgentState) -> AgentState:
+     if state.get("lead_ready") and state.get("user_name") and state.get("user_email") and state.get("creator_platform"):
+         mock_lead_capture(
+             state["user_name"],
+             state["user_email"],
+             state["creator_platform"]
+         )
+         return {"response": f"Thanks {state['user_name']}! I've successfully collected your information for your {state['creator_platform']} channel. Our team will reach out to {state['user_email']} shortly."}
+     else:
+         return {"response": "Error: Tried to execute lead capture tool without all required fields."}
agent/router.py ADDED
@@ -0,0 +1,41 @@
+ from agent.state import AgentState
+
+ def route_intent(state: AgentState) -> str:
+     """
+     Router that directs the workflow based on the detected intent.
+     Returns the name of the next node to execute.
+     """
+     # If the intent is HIGH_INTENT_LEAD, route to the lead workflow. If lead
+     # collection is already in progress (some fields captured but lead_ready
+     # is still False), stay in that flow even when the classifier labels the
+     # follow-up message (e.g. a bare email address) as UNKNOWN.
+
+     intent = state.get("detected_intent")
+
+     # Check whether we were already in lead collection
+     has_partial_lead = (
+         state.get("user_name") is not None or
+         state.get("user_email") is not None or
+         state.get("creator_platform") is not None
+     ) and not state.get("lead_ready")
+
+     if intent == "HIGH_INTENT_LEAD" or has_partial_lead:
+         return "process_lead"
+     elif intent in ["PRODUCT_QUERY", "PRICING_QUERY"]:
+         return "retrieve_knowledge"
+     elif intent == "GREETING":
+         return "handle_greeting"
+     else:
+         return "handle_unknown"
+
+ def route_after_lead(state: AgentState) -> str:
+     """
+     Router after process_lead: execute the tool or stop.
+     """
+     if state.get("lead_ready"):
+         return "execute_tool"
+     else:
+         # More info is needed; end this graph run and wait for user input
+         return "__end__"
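The partial-lead override in `route_intent` is the subtle part: a turn that only contains an email address may be classified UNKNOWN, yet must stay in the lead flow. A standalone copy of the decision rule, runnable without the rest of the package, shows it in isolation:

```python
# Standalone copy of route_intent's decision rule (no langgraph needed).

def route_intent_rule(state: dict) -> str:
    # Lead collection is "in progress" if any field is captured
    # but the lead is not yet marked ready.
    has_partial_lead = (
        state.get("user_name") is not None
        or state.get("user_email") is not None
        or state.get("creator_platform") is not None
    ) and not state.get("lead_ready")

    intent = state.get("detected_intent")
    if intent == "HIGH_INTENT_LEAD" or has_partial_lead:
        return "process_lead"
    if intent in ("PRODUCT_QUERY", "PRICING_QUERY"):
        return "retrieve_knowledge"
    if intent == "GREETING":
        return "handle_greeting"
    return "handle_unknown"
```

Note that a mid-collection turn routes to `process_lead` even when the classifier returns UNKNOWN, while a completed lead (`lead_ready` True) no longer triggers the override.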
agent/state.py ADDED
@@ -0,0 +1,20 @@
+ from typing import TypedDict, List, Optional, Dict
+
+ class AgentState(TypedDict):
+     """
+     Shared state object used by the agent graph.
+     """
+     conversation_history: List[Dict[str, str]]  # list of {"role": "user"/"assistant", "content": "..."}
+     current_message: str
+     detected_intent: Optional[str]
+     retrieved_documents: List[str]
+
+     # Lead collection fields
+     user_name: Optional[str]
+     user_email: Optional[str]
+     creator_platform: Optional[str]
+
+     lead_ready: bool
+
+     # Final response to the user
+     response: str
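Nodes in this project return only the keys they change, and those partial updates are merged into the shared state (LangGraph's default last-value channel behaves similarly). A minimal sketch with an illustrative two-field subset of `AgentState`:

```python
# Partial state updates merged into a shared TypedDict state.
# MiniState is an illustrative subset of AgentState, not part of the repo.
from typing import Optional, TypedDict

class MiniState(TypedDict, total=False):
    detected_intent: Optional[str]
    response: str

def detect(state: MiniState) -> dict:
    return {"detected_intent": "GREETING"}  # only the changed key

def respond(state: MiniState) -> dict:
    return {"response": "Hello!"}

state: MiniState = {}
for node in (detect, respond):
    state.update(node(state))  # merge each node's partial update
```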
data/knowledge_base.md ADDED
@@ -0,0 +1,16 @@
+ # AutoStream Pricing & Features
+
+ ## Basic Plan
+ * $29/month
+ * 10 videos per month
+ * 720p resolution
+
+ ## Pro Plan
+ * $79/month
+ * Unlimited videos
+ * 4K resolution
+ * AI captions included
+
+ # Company Policies
+ * No refunds after 7 days
+ * 24/7 support available only on Pro plan
main.py ADDED
@@ -0,0 +1,73 @@
+ import os
+ from dotenv import load_dotenv
+ from agent.graph import app
+ from agent.state import AgentState
+
+ def print_header(title):
+     print(f"\n{'='*50}\n{title}\n{'='*50}")
+
+ def main():
+     # Load environment variables
+     load_dotenv()
+
+     if not os.environ.get("OPENAI_API_KEY"):
+         print("Warning: OPENAI_API_KEY is not set. The agent will not be able to call the LLM.")
+         print("Please set it in your environment or create a .env file.")
+
+     print_header("AutoStream AI Sales Assistant")
+     print("Type 'quit' or 'exit' to end the conversation.\n")
+
+     # Initialize state
+     state = AgentState(
+         conversation_history=[],
+         current_message="",
+         detected_intent=None,
+         retrieved_documents=[],
+         user_name=None,
+         user_email=None,
+         creator_platform=None,
+         lead_ready=False,
+         response=""
+     )
+
+     while True:
+         try:
+             user_input = input("\nYou: ")
+             if user_input.lower() in ['quit', 'exit']:
+                 break
+
+             # Update state with new message
+             state["current_message"] = user_input
+
+             print("\n[Agent is thinking...]")
+
+             # Run the agent graph and persist the resulting state
+             state = app.invoke(state)
+
+             # Add to conversation history
+             state["conversation_history"].append({"role": "user", "content": user_input})
+             state["conversation_history"].append({"role": "assistant", "content": state["response"]})
+
+             # Keep history to at most 6 turns (12 user+assistant messages)
+             if len(state["conversation_history"]) > 12:
+                 state["conversation_history"] = state["conversation_history"][-12:]
+
+             # Display results
+             print(f"[Detected Intent]: {state.get('detected_intent', 'UNKNOWN')}")
+
+             if state.get("retrieved_documents") and state.get("detected_intent") in ["PRODUCT_QUERY", "PRICING_QUERY"]:
+                 print(f"[RAG Retrieval]: Found {len(state['retrieved_documents'])} relevant knowledge chunks.")
+
+             print(f"\nAgent: {state['response']}")
+
+         except KeyboardInterrupt:
+             break
+         except Exception as e:
+             print(f"\nAn error occurred: {e}")
+
+ if __name__ == "__main__":
+     main()
rag/__init__.py ADDED
File without changes
rag/embeddings.py ADDED
@@ -0,0 +1,7 @@
+ from langchain_openai import OpenAIEmbeddings
+
+ def get_embeddings():
+     """
+     Returns the embedding model used for the RAG pipeline.
+     """
+     return OpenAIEmbeddings()
rag/retriever.py ADDED
@@ -0,0 +1,11 @@
+ from rag.vectorstore import get_vectorstore
+
+ def retrieve_documents(query: str, k: int = 3):
+     """
+     Retrieves the top k relevant documents for the given query.
+     """
+     vectorstore = get_vectorstore()
+     retriever = vectorstore.as_retriever(search_kwargs={"k": k})
+     docs = retriever.invoke(query)
+
+     return [doc.page_content for doc in docs]
rag/vectorstore.py ADDED
@@ -0,0 +1,36 @@
+ import os
+ from langchain_community.document_loaders import TextLoader
+ from langchain_text_splitters import RecursiveCharacterTextSplitter
+ from langchain_community.vectorstores import FAISS
+ from rag.embeddings import get_embeddings
+
+ def build_vectorstore(filepath: str = "data/knowledge_base.md"):
+     """
+     Loads the knowledge base, splits it, and builds a FAISS vector store.
+     """
+     if not os.path.exists(filepath):
+         raise FileNotFoundError(f"Knowledge base not found at {filepath}")
+
+     loader = TextLoader(filepath)
+     docs = loader.load()
+
+     text_splitter = RecursiveCharacterTextSplitter(
+         chunk_size=100,
+         chunk_overlap=20,
+         separators=["\n\n", "\n", " ", ""]
+     )
+     splits = text_splitter.split_documents(docs)
+
+     embeddings = get_embeddings()
+     vectorstore = FAISS.from_documents(splits, embeddings)
+
+     return vectorstore
+
+ # Cache the vector store globally so we don't rebuild it on every request
+ _vectorstore = None
+
+ def get_vectorstore(filepath: str = "data/knowledge_base.md"):
+     global _vectorstore
+     if _vectorstore is None:
+         _vectorstore = build_vectorstore(filepath)
+     return _vectorstore
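The module-level `_vectorstore` global is one way to build the index only once; `functools.lru_cache` achieves the same per-filepath caching with less ceremony. A hedged sketch, with a counter standing in for the expensive `build_vectorstore` call:

```python
# lru_cache as an alternative to the module-level cache: the "build"
# runs once per distinct filepath, then the cached result is returned.
from functools import lru_cache

BUILD_CALLS = {"count": 0}

@lru_cache(maxsize=4)
def get_store(filepath: str) -> str:
    BUILD_CALLS["count"] += 1          # stands in for build_vectorstore()
    return f"vectorstore({filepath})"
```

One trade-off: `lru_cache` keys on the argument, so calling with two different paths builds two stores, whereas the global above ignores the `filepath` argument after the first call.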
requirements.txt ADDED
@@ -0,0 +1,10 @@
+ langchain
+ langgraph
+ langchain-openai
+ langchain-community
+ langchain-text-splitters
+ faiss-cpu
+ python-dotenv
+ pydantic
+ pytest
+ pytest-mock
tests/__init__.py ADDED
File without changes
tests/test_agent_e2e.py ADDED
@@ -0,0 +1,105 @@
+ import pytest
+ from agent.graph import app
+ from agent.state import AgentState
+ from agent.nodes import IntentResponse, LeadExtractionResponse
+ from langchain_core.runnables import RunnableLambda
+
+ def simulate_conversation(messages, mock_llm_setup_func):
+     """
+     Helper utility that simulates a multi-turn conversation.
+     Feeds messages sequentially through the agent graph and returns the final state.
+     """
+     state = AgentState(
+         conversation_history=[],
+         current_message="",
+         detected_intent=None,
+         retrieved_documents=[],
+         user_name=None,
+         user_email=None,
+         creator_platform=None,
+         lead_ready=False,
+         response=""
+     )
+
+     for idx, msg in enumerate(messages):
+         state["current_message"] = msg
+         mock_llm_setup_func(idx)  # set up mocks for this turn
+         state = app.invoke(state)
+
+         # update history manually
+         state["conversation_history"].append({"role": "user", "content": state["current_message"]})
+         state["conversation_history"].append({"role": "assistant", "content": state["response"]})
+
+     return state
+
+ def test_agent_e2e(mocker):
+     # E2E test using graph.invoke.
+     # Patch get_llm inside agent.nodes to return a mock LLM.
+     mock_llm = mocker.MagicMock()
+     mocker.patch('agent.nodes.get_llm', return_value=mock_llm)
+
+     # Mock RAG retrieval
+     mocker.patch('agent.nodes.retrieve_documents', return_value=["We have Basic and Pro plans for $29 and $79."])
+
+     mock_tool = mocker.patch('agent.nodes.mock_lead_capture')
+
+     messages = [
+         "Hi",
+         "Tell me about pricing",
+         "I want the Pro plan for my YouTube channel",
+         "My name is Alex",
+         "alex@email.com"
+     ]
+
+     def setup_mocks_for_turn(idx):
+         if idx == 0:
+             # Turn 1: Greeting
+             mock_chain = RunnableLambda(lambda x: IntentResponse(intent="GREETING", confidence=0.99))
+             mock_llm.with_structured_output.return_value = mock_chain
+         elif idx == 1:
+             # Turn 2: Pricing
+             mock_chain = RunnableLambda(lambda x: IntentResponse(intent="PRICING_QUERY", confidence=0.99))
+             mock_llm.with_structured_output.return_value = mock_chain
+
+             # The plain invoke in generate_rag_response returns an AIMessage-like object
+             class FakeResponse:
+                 content = "We have Basic and Pro plans."
+             mock_llm.invoke.return_value = FakeResponse()
+
+         elif idx == 2:
+             # Turn 3: High-intent lead.
+             # Both detect_intent and process_lead call with_structured_output
+             # in the same turn, so dispatch on the schema via a side_effect.
+             def mock_structured_output(schema):
+                 if schema.__name__ == "IntentResponse":
+                     return RunnableLambda(lambda x: IntentResponse(intent="HIGH_INTENT_LEAD", confidence=0.99))
+                 else:
+                     return RunnableLambda(lambda x: LeadExtractionResponse(user_name=None, user_email=None, creator_platform="YouTube"))
+             mock_llm.with_structured_output.side_effect = mock_structured_output
+
+         elif idx == 3:
+             # Turn 4: Provide name
+             def mock_structured_output(schema):
+                 if schema.__name__ == "IntentResponse":
+                     return RunnableLambda(lambda x: IntentResponse(intent="HIGH_INTENT_LEAD", confidence=0.99))
+                 else:
+                     return RunnableLambda(lambda x: LeadExtractionResponse(user_name="Alex", user_email=None, creator_platform=None))
+             mock_llm.with_structured_output.side_effect = mock_structured_output
+
+         elif idx == 4:
+             # Turn 5: Provide email
+             def mock_structured_output(schema):
+                 if schema.__name__ == "IntentResponse":
+                     return RunnableLambda(lambda x: IntentResponse(intent="HIGH_INTENT_LEAD", confidence=0.99))
+                 else:
+                     return RunnableLambda(lambda x: LeadExtractionResponse(user_name=None, user_email="alex@email.com", creator_platform=None))
+             mock_llm.with_structured_output.side_effect = mock_structured_output
+
+     final_state = simulate_conversation(messages, setup_mocks_for_turn)
+
+     assert final_state.get("user_name") == "Alex"
+     assert final_state.get("user_email") == "alex@email.com"
+     assert final_state.get("creator_platform") == "YouTube"
+     assert final_state.get("lead_ready") is True
+
+     mock_tool.assert_called_once_with("Alex", "alex@email.com", "YouTube")
tests/test_intent_classifier.py ADDED
@@ -0,0 +1,68 @@
+ import pytest
+ from agent.nodes import detect_intent, IntentResponse
+ from agent.state import AgentState
+ from langchain_core.runnables import RunnableLambda
+
+ def test_intent_classifier_greeting(mocker):
+     state = AgentState(
+         conversation_history=[],
+         current_message="Hi there",
+         detected_intent=None,
+         retrieved_documents=[],
+         user_name=None,
+         user_email=None,
+         creator_platform=None,
+         lead_ready=False,
+         response=""
+     )
+
+     mock_llm = mocker.MagicMock()
+     mock_chain = RunnableLambda(lambda x: IntentResponse(intent="GREETING", confidence=0.99))
+     mock_llm.with_structured_output.return_value = mock_chain
+     mocker.patch('agent.nodes.get_llm', return_value=mock_llm)
+
+     result = detect_intent(state)
+     assert result["detected_intent"] == "GREETING"
+
+ def test_intent_classifier_pricing(mocker):
+     state = AgentState(
+         conversation_history=[],
+         current_message="What are your pricing plans?",
+         detected_intent=None,
+         retrieved_documents=[],
+         user_name=None,
+         user_email=None,
+         creator_platform=None,
+         lead_ready=False,
+         response=""
+     )
+
+     mock_llm = mocker.MagicMock()
+     mock_chain = RunnableLambda(lambda x: IntentResponse(intent="PRICING_QUERY", confidence=0.95))
+     mock_llm.with_structured_output.return_value = mock_chain
+     mocker.patch('agent.nodes.get_llm', return_value=mock_llm)
+
+     result = detect_intent(state)
+     assert result["detected_intent"] == "PRICING_QUERY"
+
+ def test_intent_classifier_high_intent(mocker):
+     state = AgentState(
+         conversation_history=[],
+         current_message="I want to sign up for Pro plan",
+         detected_intent=None,
+         retrieved_documents=[],
+         user_name=None,
+         user_email=None,
+         creator_platform=None,
+         lead_ready=False,
+         response=""
+     )
+
+     mock_llm = mocker.MagicMock()
+     mock_chain = RunnableLambda(lambda x: IntentResponse(intent="HIGH_INTENT_LEAD", confidence=0.91))
+     mock_llm.with_structured_output.return_value = mock_chain
+     mocker.patch('agent.nodes.get_llm', return_value=mock_llm)
+
+     result = detect_intent(state)
+     assert result["detected_intent"] == "HIGH_INTENT_LEAD"
tests/test_lead_workflow.py ADDED
@@ -0,0 +1,56 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import pytest
2
+ from agent.nodes import process_lead, LeadExtractionResponse
3
+ from agent.state import AgentState
4
+ from langchain_core.runnables import RunnableLambda
5
+
6
+ def test_lead_workflow_step_by_step(mocker):
7
+ # Step 1: User says they want the Pro plan for YouTube
8
+    state = AgentState(
+        conversation_history=[],
+        current_message="I want the Pro plan for my YouTube channel",
+        detected_intent="HIGH_INTENT_LEAD",
+        retrieved_documents=[],
+        user_name=None,
+        user_email=None,
+        creator_platform=None,
+        lead_ready=False,
+        response=""
+    )
+
+    mock_llm = mocker.MagicMock()
+    mock_chain_1 = RunnableLambda(lambda x: LeadExtractionResponse(user_name=None, user_email=None, creator_platform="YouTube"))
+    mock_llm.with_structured_output.return_value = mock_chain_1
+    mocker.patch('agent.nodes.get_llm', return_value=mock_llm)
+
+    result = process_lead(state)
+    assert result.get("user_name") is None
+    assert result.get("creator_platform") == "YouTube"
+    assert "name" in result["response"].lower()
+
+    # Simulate state update
+    state.update(result)
+    state["conversation_history"].append({"role": "user", "content": state["current_message"]})
+    state["conversation_history"].append({"role": "assistant", "content": state["response"]})
+
+    # Step 2: User provides name
+    state["current_message"] = "My name is Alex"
+    mock_chain_2 = RunnableLambda(lambda x: LeadExtractionResponse(user_name="Alex", user_email=None, creator_platform=None))
+    mock_llm.with_structured_output.return_value = mock_chain_2
+
+    result = process_lead(state)
+    assert result.get("user_name") == "Alex"
+    assert "email" in result["response"].lower()
+
+    # Simulate state update
+    state.update(result)
+    state["conversation_history"].append({"role": "user", "content": state["current_message"]})
+    state["conversation_history"].append({"role": "assistant", "content": state["response"]})
+
+    # Step 3: User provides email
+    state["current_message"] = "alex@email.com"
+    mock_chain_3 = RunnableLambda(lambda x: LeadExtractionResponse(user_name=None, user_email="alex@email.com", creator_platform=None))
+    mock_llm.with_structured_output.return_value = mock_chain_3
+
+    result = process_lead(state)
+    assert result.get("user_email") == "alex@email.com"
+    assert result.get("lead_ready") is True
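The three steps above pin down the `process_lead` contract: merge newly extracted fields into the state, prompt for the next missing field (name, then email), and flip `lead_ready` once everything is collected. A dependency-free sketch of that logic, as a hypothetical simplification — the real node in `agent.nodes` fills a `LeadExtractionResponse` via an LLM with structured output rather than taking it as a parameter:

```python
# Hypothetical simplification of the process_lead contract implied by the test.
from dataclasses import dataclass
from typing import Optional

@dataclass
class LeadExtraction:
    user_name: Optional[str] = None
    user_email: Optional[str] = None
    creator_platform: Optional[str] = None

def process_lead(state: dict, extracted: LeadExtraction) -> dict:
    """Merge newly extracted fields into state and ask for the next missing one."""
    result = {
        "user_name": state.get("user_name") or extracted.user_name,
        "user_email": state.get("user_email") or extracted.user_email,
        "creator_platform": state.get("creator_platform") or extracted.creator_platform,
    }
    if not result["user_name"]:
        result["response"] = "Great! Could I get your name?"
    elif not result["user_email"]:
        result["response"] = f"Thanks {result['user_name']}! What's your email?"
    else:
        result["response"] = "All set!"
        result["lead_ready"] = True
    return result
```

Each turn only fills fields that are still empty, so earlier answers are never overwritten by a later extraction that returns `None` for them.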
tests/test_rag_pipeline.py ADDED
@@ -0,0 +1,51 @@
+ import pytest
+ import os
+ from rag.vectorstore import build_vectorstore
+ import rag.vectorstore
+ from langchain_core.embeddings import Embeddings
+ from typing import List
+
+ os.environ["OPENAI_API_KEY"] = "dummy_key"
+
+ class MockEmbedding(Embeddings):
+     def embed_documents(self, texts: List[str]) -> List[List[float]]:
+         # Return a zero vector of size 1536 for each input text
+         return [[0.0] * 1536 for _ in texts]
+
+     def embed_query(self, text: str) -> List[float]:
+         return [0.0] * 1536
+
+ def test_rag_pipeline_loads_and_retrieves(mocker, tmp_path):
+     # End-to-end vectorstore build and retrieval (covers doc loading and splitting)
+     kb_file = tmp_path / "knowledge_base.md"
+     kb_file.write_text("""
+ # AutoStream Pricing & Features
+
+ ## Pro Plan
+ * $79/month
+ * Unlimited videos
+ * 4K resolution
+ * AI captions included
+ """)
+
+     # Patch get_embeddings in vectorstore so it uses our mock and never calls OpenAI.
+     # FAISS type-checks its embeddings, so MockEmbedding must inherit from Embeddings.
+     mocker.patch('rag.vectorstore.get_embeddings', return_value=MockEmbedding())
+
+     # Let FAISS.from_documents run for real, just with the mock embeddings.
+     vs = build_vectorstore(str(kb_file))
+     assert vs is not None
+
+     # Patch the global get_vectorstore so the retriever uses this store
+     mocker.patch('rag.retriever.get_vectorstore', return_value=vs)
+     from rag.retriever import retrieve_documents
+
+     docs = retrieve_documents("What does the Pro plan cost?", k=1)
+
+     # With all-zero embeddings every distance is equal, so FAISS returns the
+     # first chunk(s) it indexed; with chunk size 100 that covers the opening lines.
+     assert len(docs) > 0
+     # Assert only that some chunk from our mock file was retrieved.
+     assert "AutoStream" in docs[0] or "Pro Plan" in docs[0] or "$79/month" in docs[0]
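The zero-vector trick works because similarity search only compares distances: when every embedding is identical, every chunk is an equally good nearest neighbor, so some chunk always comes back. A dependency-free sketch of the retrieval step under these assumptions — hypothetical, since the real `retrieve_documents` delegates to a FAISS store:

```python
# Hypothetical sketch: rank chunks by squared L2 distance to the query embedding,
# as a stand-in for what the FAISS-backed retriever does.
from typing import Callable, List

def retrieve_documents(query: str, chunks: List[str],
                       embed: Callable[[str], List[float]], k: int = 1) -> List[str]:
    """Return the k chunks closest to the query in embedding space."""
    q = embed(query)
    def dist(chunk: str) -> float:
        v = embed(chunk)
        return sum((a - b) ** 2 for a, b in zip(q, v))
    # sorted() is stable, so with all-equal distances the original order survives
    return sorted(chunks, key=dist)[:k]
```

This also explains the test's weak assertion: with a constant embedding function, the retriever can only be checked for "returned something from the indexed file", not for semantic relevance.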
tests/test_tool_execution.py ADDED
@@ -0,0 +1,45 @@
+ import pytest
+ from agent.nodes import execute_tool
+ from agent.state import AgentState
+
+ def test_tool_execution_missing_fields(mocker):
+     mock_tool = mocker.patch('agent.nodes.mock_lead_capture')
+
+     state = AgentState(
+         conversation_history=[],
+         current_message="",
+         detected_intent="HIGH_INTENT_LEAD",
+         retrieved_documents=[],
+         user_name="Alex",
+         user_email="alex@email.com",
+         creator_platform=None,  # Missing platform
+         lead_ready=True,
+         response=""
+     )
+
+     result = execute_tool(state)
+
+     # Tool should NOT be executed
+     mock_tool.assert_not_called()
+     assert "Error" in result["response"]
+
+ def test_tool_execution_all_fields(mocker):
+     mock_tool = mocker.patch('agent.nodes.mock_lead_capture')
+
+     state = AgentState(
+         conversation_history=[],
+         current_message="",
+         detected_intent="HIGH_INTENT_LEAD",
+         retrieved_documents=[],
+         user_name="Alex",
+         user_email="alex@email.com",
+         creator_platform="YouTube",
+         lead_ready=True,
+         response=""
+     )
+
+     result = execute_tool(state)
+
+     # Tool should be executed exactly once with the collected fields
+     mock_tool.assert_called_once_with("Alex", "alex@email.com", "YouTube")
+     assert "Thanks Alex" in result["response"]
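Together these two tests pin down `execute_tool`'s guard: verify all three lead fields are present before invoking the capture tool, and surface an error otherwise. A minimal sketch of that contract — hypothetical, since the real node takes only an `AgentState` and resolves the tool at module level rather than as a parameter:

```python
# Hypothetical sketch of the execute_tool guard the two tests imply.
# The tool is injected as a parameter here purely for testability.
from typing import Callable

def execute_tool(state: dict, lead_capture: Callable[[str, str, str], None]) -> dict:
    """Run the lead-capture tool only when every required field is present."""
    name = state.get("user_name")
    email = state.get("user_email")
    platform = state.get("creator_platform")
    if not (name and email and platform):
        # Defensive check: lead_ready may be set even if extraction missed a field
        return {"response": "Error: missing lead details, cannot capture lead yet."}
    lead_capture(name, email, platform)
    return {"response": f"Thanks {name}! Your details have been passed to our team."}
```

Guarding inside the node (rather than trusting `lead_ready` alone) means a routing bug upstream can never push an incomplete lead into the backend.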
tools/__init__.py ADDED
File without changes
tools/lead_capture.py ADDED
@@ -0,0 +1,5 @@
+ def mock_lead_capture(name: str, email: str, platform: str):
+     """
+     Mock backend function to capture lead information.
+     """
+     print(f"Lead captured successfully: {name}, {email}, {platform}")