from typing import Literal from langchain.chat_models import init_chat_model from email_assistant.tools import get_tools, get_tools_by_name from email_assistant.tools.default.prompt_templates import AGENT_TOOLS_PROMPT from email_assistant.prompts import triage_system_prompt, triage_user_prompt, agent_system_prompt, default_background, default_triage_instructions, default_response_preferences, default_cal_preferences from email_assistant.schemas import State, RouterSchema, StateInput from email_assistant.utils import parse_email, format_email_markdown from langgraph.graph import StateGraph, START, END from langgraph.types import Command from dotenv import load_dotenv load_dotenv(".env") # Get tools tools = get_tools() tools_by_name = get_tools_by_name(tools) # Initialize the LLM for use with router / structured output llm = init_chat_model("openai:gpt-4.1", temperature=0.0) llm_router = llm.with_structured_output(RouterSchema) # Initialize the LLM, enforcing tool use (of any available tools) for agent llm = init_chat_model("openai:gpt-4.1", temperature=0.0) llm_with_tools = llm.bind_tools(tools, tool_choice="any") # Nodes def llm_call(state: State): """LLM decides whether to call a tool or not""" return { "messages": [ llm_with_tools.invoke( [ {"role": "system", "content": agent_system_prompt.format( tools_prompt=AGENT_TOOLS_PROMPT, background=default_background, response_preferences=default_response_preferences, cal_preferences=default_cal_preferences) }, ] + state["messages"] ) ] } def tool_node(state: State): """Performs the tool call""" result = [] for tool_call in state["messages"][-1].tool_calls: tool = tools_by_name[tool_call["name"]] observation = tool.invoke(tool_call["args"]) result.append({"role": "tool", "content" : observation, "tool_call_id": tool_call["id"]}) return {"messages": result} # Conditional edge function def should_continue(state: State) -> Literal["Action", "__end__"]: """Route to Action, or end if Done tool called""" messages = state["messages"] last_message = messages[-1] if last_message.tool_calls: for tool_call in last_message.tool_calls: if tool_call["name"] == "Done": return END else: return "Action" # Build workflow agent_builder = StateGraph(State) # Add nodes agent_builder.add_node("llm_call", llm_call) agent_builder.add_node("environment", tool_node) # Add edges to connect nodes agent_builder.add_edge(START, "llm_call") agent_builder.add_conditional_edges( "llm_call", should_continue, { # Name returned by should_continue : Name of next node to visit "Action": "environment", END: END, }, ) agent_builder.add_edge("environment", "llm_call") # Compile the agent agent = agent_builder.compile() def triage_router(state: State) -> Command[Literal["response_agent", "__end__"]]: """Analyze email content to decide if we should respond, notify, or ignore. The triage step prevents the assistant from wasting time on: - Marketing emails and spam - Company-wide announcements - Messages meant for other teams """ author, to, subject, email_thread = parse_email(state["email_input"]) system_prompt = triage_system_prompt.format( background=default_background, triage_instructions=default_triage_instructions ) user_prompt = triage_user_prompt.format( author=author, to=to, subject=subject, email_thread=email_thread ) # Create email markdown for Agent Inbox in case of notification email_markdown = format_email_markdown(subject, author, to, email_thread) # Run the router LLM result = llm_router.invoke( [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}, ] ) # Decision classification = result.classification if classification == "respond": print("📧 Classification: RESPOND - This email requires a response") goto = "response_agent" # Add the email to the messages update = { "classification_decision": result.classification, "messages": [{"role": "user", "content": f"Respond to the email: {email_markdown}" }], } elif result.classification == "ignore": print("🚫 Classification: IGNORE - This email can be safely ignored") update = { "classification_decision": result.classification, } goto = END elif result.classification == "notify": # If real life, this would do something else print("🔔 Classification: NOTIFY - This email contains important information") update = { "classification_decision": result.classification, } goto = END else: raise ValueError(f"Invalid classification: {result.classification}") return Command(goto=goto, update=update) # Build workflow overall_workflow = ( StateGraph(State, input=StateInput) .add_node(triage_router) .add_node("response_agent", agent) .add_edge(START, "triage_router") ) email_assistant = overall_workflow.compile()