{ "cells": [ { "cell_type": "markdown", "id": "66976afe", "metadata": {}, "source": [ "# Chapter 7: Tool Manipulation and Orchestration Agents\n", "\n", "**Book:** *30 Agents Every AI Engineer Must Build* \n", "**Author:** Imran Ahmad \n", "**Publisher:** Packt Publishing, 2026 \n", "**Chapter Pages:** pp. 173–201\n", "\n", "---\n", "\n", "> *\"The best way to predict the future is to invent it.\"*\n", "> — Alan Kay, computer scientist\n", "\n", "## Introduction\n", "\n", "While language models provide the reasoning substrate for understanding user goals, it is the **use of tools** that enables agents to execute tasks and affect the real world. This chapter establishes the architectural foundation for understanding how agents translate reasoning into action through three progressive patterns:\n", "\n", "1. **Tool-Using Agents** (§7.1–7.3, pp. 174–186) — A single agent extends its capabilities by invoking external functions through a Think/Plan/Act cycle. Covers tool registries with schema contracts, the three-stage selection funnel (intent classification → semantic search → constraint filtering), and layered error handling with safe wrappers and fallback chains.\n", "\n", "2. **Chain-of-Agents Orchestrators** (§7.4–7.6, pp. 186–194) — Multiple specialized agents collaborate under a cooperation protocol with four architectural pillars: defined roles, common communication, shared memory (episodic + semantic), and execution orchestration. Includes conflict resolution via automated arbitration with confidence-based consensus.\n", "\n", "3. **Agentic Workflow Systems** (§7.7, pp. 195–201) — Stateful business processes modeled as state machines with human-in-the-loop checkpoints, guard conditions, and audit trails. 
Two case studies: e-commerce order processing and multi-agent insurance claims.\n", "\n", "### Key Architectural Insight\n", "\n", "Tool invocation represents the fundamental mechanism that bridges cognitive operations and concrete actions, transforming an agent from a simple conversationalist into a functional system component. The architectural challenge lies in creating systems in which tool invocation becomes a **seamless extension of agent cognition** rather than a brittle integration point.\n", "\n", "A robust Tool-Using agent architecture separates four concerns: a **reasoning core** (Think/Plan), a **tool registry** with explicit schema contracts, an **execution engine** managing state and retries, and a **guarded tool chest** of modular functions with single responsibilities.\n", "\n", "### Fail-Gracefully Architecture\n", "\n", "This notebook is built around a Fail-Gracefully architecture: if no API key is present, the system automatically enters **Simulation Mode**, returning chapter-derived mock data through a context-aware `MockLLM`. Every agent action produces color-coded log output. Every tool invocation is wrapped in defensive logic. The notebook always runs to completion.\n", "\n", "---" ] }, { "cell_type": "markdown", "id": "f4772161", "metadata": {}, "source": [ "---\n", "## Section 0: Setup and Configuration\n", "\n", "This section loads dependencies, detects the operating mode (Simulation vs. 
Live),\n", "and initializes the shared infrastructure used by all subsequent sections.\n", "\n", "**Key components initialized here:**\n", "- `helpers` package (color logger, resilience decorators, MockLLM)\n", "- `LIVE_MODE` and `INTERACTIVE_MODE` flags\n", "- `AgentError` exception class used by workflow sections" ] }, { "cell_type": "code", "execution_count": null, "id": "9997d1d0", "metadata": {}, "outputs": [], "source": [ "# =============================================================================\n", "# Section 0: Setup and Configuration\n", "# Chapter 7: Tool Manipulation and Orchestration Agents\n", "# Book: \"30 Agents Every AI Engineer Must Build\" by Imran Ahmad (Packt, 2026 — B34135)\n", "# =============================================================================\n", "\n", "import os\n", "import sys\n", "import json\n", "import time\n", "import getpass\n", "from typing import Dict, Any, List, Tuple, Optional\n", "\n", "import pandas as pd\n", "import matplotlib\n", "import matplotlib.pyplot as plt\n", "\n", "# Ensure helpers/ is importable from the notebook's working directory\n", "sys.path.insert(0, os.path.abspath(\".\"))\n", "\n", "from helpers import (\n", "    log_info, log_success, log_error, log_warning, log_mock, log_step,\n", "    graceful_fallback, safe_invoke,\n", "    MockLLM,\n", ")\n", "\n", "# Matplotlib backend for inline rendering\n", "%matplotlib inline\n", "\n", "# ---------------------------------------------------------------------------\n", "# Custom Exception — used by workflow sections (Sections 7.7, 7.7b)\n", "# ---------------------------------------------------------------------------\n", "class AgentError(Exception):\n", "    \"\"\"Raised when a workflow step fails critically and the pipeline must stop.\"\"\"\n", "    pass\n", "\n", "log_info(\"Core imports complete. 
helpers package loaded.\")" ] }, { "cell_type": "code", "execution_count": null, "id": "24fd137f", "metadata": {}, "outputs": [], "source": [
"# Multi-provider LLM support (OpenAI / Anthropic / Google Gemini)\n",
"# Set LLM_PROVIDER in .env to choose: openai | anthropic | google | auto\n",
"# Auto-detection uses the first available key.\n",
"# See supporting/llm_provider.py for details.\n",
"\n",
"import sys, os\n",
"# Make the parent directory (where supporting/ lives) importable\n",
"sys.path.insert(0, os.path.dirname(os.path.abspath('.')))\n",
"\n",
"try:\n",
"    from supporting.llm_provider import detect_provider, get_llm, PROVIDER_MODELS, print_provider_banner\n",
"    _PROVIDER, _PROVIDER_KEY, _PROVIDER_MODE = detect_provider()\n",
"    print_provider_banner(_PROVIDER, _PROVIDER_MODE)\n",
"except ImportError:\n",
"    print('[INFO] supporting/llm_provider.py not found — using default OpenAI path')\n",
"    _PROVIDER, _PROVIDER_KEY, _PROVIDER_MODE = 'openai', os.getenv('OPENAI_API_KEY'), 'LIVE' if os.getenv('OPENAI_API_KEY') else 'SIMULATION'\n"
] }, { "cell_type": "code", "execution_count": null, "id": "738de9af", "metadata": {}, "outputs": [], "source": [
"# =============================================================================\n",
"# Section 0: API Key Detection and Mode Selection\n",
"# Ref: Strategy §3.4 — Simulation Mode Activation Flow\n",
"# =============================================================================\n",
"\n",
"# --- Step 1: Try .env file ---\n",
"try:\n",
"    from dotenv import load_dotenv\n",
"    load_dotenv()\n",
"    log_info(\"python-dotenv loaded. Checking .env for OPENAI_API_KEY...\")\n",
"except ImportError:\n",
"    log_warning(\"python-dotenv not installed. Skipping .env loading.\")\n",
"\n",
"OPENAI_API_KEY = os.getenv(\"OPENAI_API_KEY\", \"\").strip()\n",
"\n",
"# --- Step 2: Interactive fallback via getpass ---\n",
"# Auto-detected: False when stdin is not a TTY (e.g., Colab, CI/CD, nbconvert --execute)\n",
"INTERACTIVE_MODE = sys.stdin.isatty() if hasattr(sys.stdin, \"isatty\") else False\n",
"\n",
"if not OPENAI_API_KEY and INTERACTIVE_MODE:\n",
"    try:\n",
"        OPENAI_API_KEY = getpass.getpass(\n",
"            \"Enter OpenAI API key (or press Enter to skip): \"\n",
"        ).strip()\n",
"    except Exception:\n",
"        # Covers EOFError, OSError, StdinNotImplementedError, and any other input failure\n",
"        OPENAI_API_KEY = \"\"\n",
"        INTERACTIVE_MODE = False\n",
"        log_warning(\"Non-interactive environment detected. INTERACTIVE_MODE set to False.\")\n",
"\n",
"# --- Step 3: Determine operating mode ---\n",
"LIVE_MODE = bool(OPENAI_API_KEY) and \"your-key\" not in OPENAI_API_KEY and \"your_key\" not in OPENAI_API_KEY\n",
"\n",
"if LIVE_MODE:\n",
"    log_success(\"LIVE MODE active. LLM calls will use the OpenAI API.\")\n",
"    os.environ[\"OPENAI_API_KEY\"] = OPENAI_API_KEY\n",
"    try:\n",
"        from openai import OpenAI\n",
"        _openai_client = OpenAI(api_key=OPENAI_API_KEY)\n",
"        class LiveLLM:\n",
"            \"\"\"Thin wrapper around OpenAI client matching MockLLM.generate() interface.\"\"\"\n",
"            def generate(self, prompt, **kwargs):\n",
"                response = _openai_client.chat.completions.create(\n",
"                    model=\"gpt-4o-mini\",\n",
"                    messages=[{\"role\": \"user\", \"content\": prompt}],\n",
"                    temperature=0.7,\n",
"                )\n",
"                return response.choices[0].message.content\n",
"        llm = LiveLLM()\n",
"    except Exception as e:\n",
"        log_error(f\"OpenAI client init failed: {e}. Falling back to Simulation.\")\n",
"        LIVE_MODE = False\n",
"        llm = MockLLM(verbose=True)\n",
"else:\n",
"    llm = MockLLM(verbose=True)\n",
"    log_mock(\"╔══════════════════════════════════════════════════════════╗\")\n",
"    log_mock(\"║ SIMULATION MODE ACTIVE ║\")\n",
"    log_mock(\"║ All LLM calls return chapter-derived mock responses. ║\")\n",
"    log_mock(\"║ To use a live API, create .env from .env.template ║\")\n",
"    log_mock(\"╚══════════════════════════════════════════════════════════╝\")\n",
"\n",
"# --- Summary ---\n",
"log_info(f\"Operating Mode: {'LIVE' if LIVE_MODE else 'SIMULATION'}\")\n",
"log_info(f\"Interactive Mode: {INTERACTIVE_MODE}\")\n",
"log_info(f\"Python: {sys.version.split()[0]}\")\n",
"log_info(f\"pandas: {pd.__version__}, matplotlib: {matplotlib.__version__}\")"
] }, { "cell_type": "markdown", "id": "4d64957a", "metadata": {}, "source": [
"---\n",
"## Section 1: The Tool-Using Agent\n",
"\n",
"**Chapter Reference:** Section 7.1 — Agents in Action: Tool Invocation Patterns\n",
"\n",
"A Tool-Using agent extends its core reasoning by invoking a predefined set of external\n",
"functions. The architecture consists of four primary components (Figure 7.1):\n",
"\n",
"1. **Reasoning Core** — Parses intent and formulates a plan (Think + Plan)\n",
"2. **Tool Registry** — Catalog of available tools with metadata and schemas\n",
"3. **Execution Engine** — Manages state, retries, and error handling (Act)\n",
"4. **Tool Chest** — The collection of guarded functions the agent can execute\n",
"\n",
"This section implements the data visualization assistant's tool chest: four composable\n",
"functions that divide the visualization pipeline into clear, testable steps."
] }, { "cell_type": "markdown", "id": "ba832598", "metadata": {}, "source": [ "#### Figure 7.1 — Internal Architecture of Tool-Using Agents *(Book p. 
176)*\n", "\n", "```\n", " ┌───────────────────────┐\n", " User Input ──────────────────────▶│ Reasoning Core │\n", " \"Show chart\" │ ┌─────────────────┐ │ 3. Pass Plan\n", " │ │ Intent Parsing │ │───────────────┐\n", " 1. Discover │ │ Planning │ │ │\n", " & Select Tools │ └─────────────────┘ │ ▼\n", " │ └───────────▲───────────┘ ┌─────────────────┐\n", " ▼ │ │ Execution Engine │\n", " ┌─────────────────┐ 2. Return │ │ State Management │\n", " │ Tool Registry │──── Candidates ──┘ │ Error Handling │\n", " │ Metadata │ └────────┬────────┘\n", " │ Schemas │ 4. Invoke│Tool\n", " │ Status │ ▼\n", " └─────────────────┘ ┌──────────────────────────────┐\n", " │ Tool Chest │\n", " │ ┌──────────┐ ┌──────────┐ │\n", " │ │ load_csv │→│plot_chart │ │\n", " │ └──────────┘ └──────────┘ │\n", " │ ┌──────────┐ ┌──────────┐ │\n", " │ │aggregate │ │ validate │ │\n", " │ └──────────┘ └──────────┘ │\n", " └──────────────┬───────────────┘\n", " ▼\n", " Generated Chart\n", "```\n", "\n", "This architecture cleanly separates decision logic from execution. Explicit schemas make interfaces predictable, while safety wrappers provide resilience. The same structure supports both reactive queries (single tool) and deliberative plans (chained tools).\n" ] }, { "cell_type": "code", "execution_count": null, "id": "54f83baf", "metadata": {}, "outputs": [], "source": [ "# =============================================================================\n", "# Section 1: Tool Chest — Four Composable Visualization Functions\n", "# Ref: Section 7.1, pp. 
178–179 — Implementation Example\n",
"#\n",
"# Each function follows the single responsibility principle:\n",
"# load_csv — data ingestion\n",
"# group_by_and_aggregate — data summarization\n",
"# plot_bar_chart — bar chart rendering\n",
"# plot_line_chart — line chart rendering\n",
"#\n",
"# All functions are wrapped with @graceful_fallback for resilience (Section 7.3).\n",
"# =============================================================================\n",
"\n",
"# --- Tool 1: Data Ingestion ---\n",
"# Ref: Section 7.1, p. 6 — \"Handles data ingestion\"\n",
"\n",
"@graceful_fallback(fallback_return=pd.DataFrame(), section=\"7.1\")\n",
"def load_csv(file_path: str) -> pd.DataFrame:\n",
"    \"\"\"Loads data from a specified CSV file into a pandas DataFrame.\"\"\"\n",
"    df = pd.read_csv(file_path)\n",
"    log_step(\"7.1\", 1, f\"Loaded {len(df)} rows from '{file_path}'.\")\n",
"    return df\n",
"\n",
"\n",
"# --- Tool 2: Data Transformation and Summarization ---\n",
"# Ref: Section 7.1, p. 7 — \"Performs data transformation and summarization\"\n",
"\n",
"@graceful_fallback(fallback_return=None, section=\"7.1\")\n",
"def group_by_and_aggregate(\n",
"    df: pd.DataFrame,\n",
"    group_by_col: str,\n",
"    agg_col: str,\n",
"    agg_func: str = \"sum\",\n",
") -> pd.DataFrame:\n",
"    \"\"\"Groups the DataFrame and applies an aggregation function.\"\"\"\n",
"    if group_by_col not in df.columns or agg_col not in df.columns:\n",
"        raise ValueError(\n",
"            f\"Column not found in DataFrame. \"\n",
"            f\"Available: {list(df.columns)}\"\n",
"        )\n",
"    # Unsupported agg_func values silently fall back to sum\n",
"    func = {\"sum\": \"sum\", \"mean\": \"mean\", \"count\": \"count\"}.get(agg_func, \"sum\")\n",
"    result = df.groupby(group_by_col)[agg_col].agg(func).reset_index()\n",
"    log_step(\"7.1\", 2, f\"Grouped '{agg_col}' by '{group_by_col}' using '{func}'.\")\n",
"    return result\n",
"\n",
"\n",
"# --- Tool 3: Bar Chart Rendering ---\n",
"# Ref: Section 7.1, p. 7 — \"Handles final rendering for a bar chart\"\n",
"\n",
"@graceful_fallback(fallback_return=None, section=\"7.1\")\n",
"def plot_bar_chart(\n",
"    df: pd.DataFrame,\n",
"    x_col: str,\n",
"    y_col: str,\n",
"    title: str,\n",
"    output_path: str,\n",
") -> str:\n",
"    \"\"\"Generates and saves a bar chart from the data.\"\"\"\n",
"    fig, ax = plt.subplots(figsize=(8, 5))\n",
"    ax.bar(df[x_col].astype(str), df[y_col], color=\"#4A90D9\")\n",
"    ax.set_xlabel(x_col)\n",
"    ax.set_ylabel(y_col)\n",
"    ax.set_title(title)\n",
"    fig.tight_layout()\n",
"    fig.savefig(output_path, dpi=100)\n",
"    plt.show()\n",
"    plt.close(fig)\n",
"    log_step(\"7.1\", 3, f\"Saved bar chart to '{output_path}'.\")\n",
"    return output_path\n",
"\n",
"\n",
"# --- Tool 4: Line Chart Rendering ---\n",
"# Ref: Section 7.1, p. 7-8 — \"Handles final rendering for a line chart\"\n",
"\n",
"@graceful_fallback(fallback_return=None, section=\"7.1\")\n",
"def plot_line_chart(\n",
"    df: pd.DataFrame,\n",
"    x_col: str,\n",
"    y_col: str,\n",
"    title: str,\n",
"    output_path: str,\n",
") -> str:\n",
"    \"\"\"Generates and saves a line chart from the data.\"\"\"\n",
"    fig, ax = plt.subplots(figsize=(8, 5))\n",
"    ax.plot(df[x_col].astype(str), df[y_col], marker=\"o\", color=\"#4A90D9\")\n",
"    ax.set_xlabel(x_col)\n",
"    ax.set_ylabel(y_col)\n",
"    ax.set_title(title)\n",
"    fig.tight_layout()\n",
"    fig.savefig(output_path, dpi=100)\n",
"    plt.show()\n",
"    plt.close(fig)\n",
"    log_step(\"7.1\", 4, f\"Saved line chart to '{output_path}'.\")\n",
"    return output_path\n",
"\n",
"\n",
"log_success(\"Tool chest defined: load_csv, group_by_and_aggregate, plot_bar_chart, plot_line_chart.\")"
] }, { "cell_type": "code", "execution_count": null, "id": "356abd2e", "metadata": {}, "outputs": [], "source": [
"# =============================================================================\n",
"# Section 1: Tool Registry\n",
"# Ref: Section 7.1, pp. 
175–177 — \"The Tool Registry\"\n",
"#\n",
"# The registry maintains critical metadata for each tool: name, description,\n",
"# input/output schemas, and operational status. These schemas serve as explicit,\n",
"# reliable contracts between the reasoning core and the execution layer.\n",
"# =============================================================================\n",
"\n",
"TOOL_REGISTRY = {\n",
"    \"load_csv\": {\n",
"        \"function\": load_csv,\n",
"        \"description\": \"Loads data from a specified CSV file into a pandas DataFrame.\",\n",
"        \"input_schema\": {\"file_path\": \"str\"},\n",
"        \"output_schema\": \"pd.DataFrame\",\n",
"        \"status\": \"active\",\n",
"    },\n",
"    \"group_by_and_aggregate\": {\n",
"        \"function\": group_by_and_aggregate,\n",
"        \"description\": \"Groups a DataFrame by a column and applies an aggregation function.\",\n",
"        \"input_schema\": {\n",
"            \"df\": \"pd.DataFrame\",\n",
"            \"group_by_col\": \"str\",\n",
"            \"agg_col\": \"str\",\n",
"            \"agg_func\": \"str (sum|mean|count)\",\n",
"        },\n",
"        \"output_schema\": \"pd.DataFrame\",\n",
"        \"status\": \"active\",\n",
"    },\n",
"    \"plot_bar_chart\": {\n",
"        \"function\": plot_bar_chart,\n",
"        \"description\": \"Generates and saves a bar chart from aggregated data.\",\n",
"        \"input_schema\": {\n",
"            \"df\": \"pd.DataFrame\",\n",
"            \"x_col\": \"str\",\n",
"            \"y_col\": \"str\",\n",
"            \"title\": \"str\",\n",
"            \"output_path\": \"str\",\n",
"        },\n",
"        \"output_schema\": \"str (file path)\",\n",
"        \"status\": \"active\",\n",
"    },\n",
"    \"plot_line_chart\": {\n",
"        \"function\": plot_line_chart,\n",
"        \"description\": \"Generates and saves a line chart with markers from aggregated data.\",\n",
"        \"input_schema\": {\n",
"            \"df\": \"pd.DataFrame\",\n",
"            \"x_col\": \"str\",\n",
"            \"y_col\": \"str\",\n",
"            \"title\": \"str\",\n",
"            \"output_path\": \"str\",\n",
"        },\n",
"        \"output_schema\": \"str (file path)\",\n",
"        \"status\": \"active\",\n",
"    },\n",
"}\n",
"\n",
"# Display the registry for inspection\n",
"log_info(\"Tool Registry initialized:\")\n",
"for name, meta in TOOL_REGISTRY.items():\n",
"    log_info(f\" {name}: {meta['description']} [status={meta['status']}]\")"
] }, { "cell_type": "code", "execution_count": null, "id": "36232aed", "metadata": {}, "outputs": [], "source": [
"# =============================================================================\n",
"# Section 1: Demo Pipeline — End-to-End Data Visualization\n",
"# Ref: Section 7.1, pp. 177–179 — Tool Chest composability\n",
"#\n",
"# Demonstrates the four tools working together in sequence:\n",
"# 1. load_csv → ingest the ad campaign dataset\n",
"# 2. group_by_and_aggregate → summarize spend by campaign\n",
"# 3. plot_bar_chart → render the bar chart\n",
"# 4. group_by_and_aggregate → summarize clicks over time\n",
"# 5. plot_line_chart → render the line chart\n",
"# =============================================================================\n",
"\n",
"log_info(\"=\" * 60)\n",
"log_info(\"Section 1 Demo: Data Visualization Pipeline\")\n",
"log_info(\"=\" * 60)\n",
"\n",
"# Ensure outputs directory exists\n",
"os.makedirs(\"outputs\", exist_ok=True)\n",
"\n",
"# Step 1: Load the dataset\n",
"df = load_csv(\"data/sample_ad_campaign.csv\")\n",
"if not df.empty:\n",
"    log_info(f\"Dataset preview ({len(df)} rows):\")\n",
"    print(df.head())\n",
"    print()\n",
"\n",
"    # Step 2: Aggregate spend by campaign (for bar chart)\n",
"    spend_by_campaign = group_by_and_aggregate(df, \"campaign_name\", \"spend\", \"sum\")\n",
"    if spend_by_campaign is not None:\n",
"        print(spend_by_campaign)\n",
"        print()\n",
"        # Step 3: Render bar chart\n",
"        plot_bar_chart(\n",
"            spend_by_campaign,\n",
"            \"campaign_name\", \"spend\",\n",
"            \"Total Spend by Campaign\",\n",
"            \"outputs/spend_by_campaign.png\",\n",
"        )\n",
"\n",
"    # Step 4: Aggregate clicks over time (for line chart)\n",
"    clicks_over_time = group_by_and_aggregate(df, \"date\", \"clicks\", \"sum\")\n",
"    if clicks_over_time is not None:\n",
"        print(clicks_over_time)\n",
"        print()\n",
"        # Step 5: Render line chart\n",
"        plot_line_chart(\n",
"            clicks_over_time,\n",
"            \"date\", \"clicks\",\n",
"            \"Total Clicks Over Time\",\n",
"            \"outputs/clicks_over_time.png\",\n",
"        )\n",
"\n",
"    log_success(\"Section 1 Demo complete. Charts saved to outputs/.\")\n",
"else:\n",
"    log_error(\"Dataset is empty. Cannot proceed with demo.\")"
] }, { "cell_type": "markdown", "id": "6b33a0ab", "metadata": {}, "source": [
"---\n",
"## Section 2: Tool Discovery and Selection\n",
"\n",
"**Chapter Reference:** Section 7.2 — Tool Discovery and Selection Algorithms\n",
"\n",
"As an agent's tool chest grows, it must reliably select the correct tool for each task.\n",
"The **Selection Funnel** (Figure 7.2) has three stages, and this section implements the first:\n",
"\n",
"1. **Intent Classification (broad filtering)** — Map the user's query to a relevant category\n",
"2. **Semantic Search (candidate ranking)** — Rank remaining tools by similarity (discussed at the architecture level only)\n",
"3. **Constraint Filtering (final verification)** — Enforce input types, permissions, and status\n",
"\n",
"The `parse_query` function below implements **template-based intent classification**,\n",
"the most direct approach where keywords map to structured parameters."
] }, { "cell_type": "markdown", "id": "bb62b27b", "metadata": {}, "source": [
"#### Figure 7.2 — Tool Selection Funnel *(Book p. 180)*\n",
"\n",
"The multi-stage funnel transforms a large tool registry into a single verified, executable choice:\n",
"\n",
"```\n",
"   All Available Tools (~100 tools)\n",
"                   │\n",
"                   ▼\n",
"    ┌──────────────────────────────┐\n",
"    │ 1. Intent Classification     │ ──▶ ~20 tools\n",
"    │    Map query to categories   │\n",
"    └──────────────┬───────────────┘\n",
"                   ▼\n",
"    ┌──────────────────────────────┐\n",
"    │ 2. Semantic Similarity       │ ──▶ ~5 tools\n",
"    │    Embedding-based ranking   │\n",
"    └──────────────┬───────────────┘\n",
"                   ▼\n",
"           Similarity > 0.7?\n",
"               Yes ▼   No → Discard\n",
"    ┌──────────────────────────────┐\n",
"    │ 3. Constraint Filtering      │ ──▶ ~2-3 tools\n",
"    │    Input types, permissions  │\n",
"    └──────────────┬───────────────┘\n",
"                   ▼\n",
"        ✓ Optimal Tool Selected\n",
"      (1 tool, ready to execute)\n",
"```\n"
] }, { "cell_type": "code", "execution_count": null, "id": "5410fa81", "metadata": {}, "outputs": [], "source": [
"# =============================================================================\n",
"# Section 2: Intent Classification through Template Matching\n",
"# Ref: Section 7.2, pp. 182–183 — \"Implementation Example: Intent Classification\"\n",
"#\n",
"# parse_query() acts as a lightweight intent classifier. It scans for specific\n",
"# keywords and patterns to extract: a metric, a dimension, and a chart type.\n",
"# This implements the Template Matching selection strategy.\n",
"# =============================================================================\n",
"\n",
"@graceful_fallback(fallback_return=None, section=\"7.2\")\n",
"def parse_query(query: str) -> Optional[dict]:\n",
"    \"\"\"A simple natural language parser to extract intent from the user's query.\n",
"    Uses keyword matching to identify the metric, dimension, and chart type.\"\"\"\n",
"    query = query.lower()  # Normalize for case-insensitive matching\n",
"\n",
"    # --- Step 1: Identify the metric ---\n",
"    # A simple lookup to map known metric keywords to their column names\n",
"    metric_map = {\"spend\": \"spend\", \"conversions\": \"conversions\", \"clicks\": \"clicks\"}\n",
"    metric = next((metric_map[key] for key in metric_map if key in query), None)\n",
"\n",
"    # --- Step 2: Identify the dimension and chart type ---\n",
"    # Use keywords to infer how the user wants to group and visualize data\n",
"    dimension, chart_type = None, None\n",
"    if \"by campaign\" in query or \"which campaign\" in query:\n",
"        dimension, chart_type = \"campaign_name\", \"bar\"\n",
"    elif \"over time\" in query or \"trend\" in query:\n",
"        dimension, chart_type = \"date\", \"line\"\n",
"\n",
"    # --- Step 3: Validate and return the structured intent ---\n",
"    # If any essential component is missing, the intent is invalid\n",
"    if not all([metric, dimension, chart_type]):\n",
"        return None\n",
"\n",
"    return {\"metric\": metric, \"dimension\": dimension, \"chart_type\": chart_type}\n",
"\n",
"\n",
"log_success(\"parse_query defined — template-matching intent classifier.\")"
] }, { "cell_type": "code", "execution_count": null, "id": "9e178e77", "metadata": {}, "outputs": [], "source": [
"# =============================================================================\n",
"# Section 2: Demo — Intent Classification on Sample Queries\n",
"# Ref: Section 7.2, p. 183 — \"For a query like 'What was the trend of clicks\n",
"# over time?', it identifies 'clicks' as the metric...\"\n",
"# =============================================================================\n",
"\n",
"log_info(\"=\" * 60)\n",
"log_info(\"Section 2 Demo: Intent Classification\")\n",
"log_info(\"=\" * 60)\n",
"\n",
"demo_queries = [\n",
"    \"What was the trend of clicks over time?\",\n",
"    \"Show me spend by campaign\",\n",
"    \"Which campaign had the most conversions?\",\n",
"    \"Show me conversions over time\",\n",
"    \"Tell me something interesting\",  # Should fail — no metric/dimension\n",
"    \"What about clicks?\",  # Should fail — no dimension\n",
"]\n",
"\n",
"for q in demo_queries:\n",
"    intent = parse_query(q)\n",
"    if intent:\n",
"        log_info(f\"Query: '{q}'\")\n",
"        log_success(\n",
"            f\" → metric={intent['metric']}, \"\n",
"            f\"dimension={intent['dimension']}, \"\n",
"            f\"chart_type={intent['chart_type']}\"\n",
"        )\n",
"    else:\n",
"        log_info(f\"Query: '{q}'\")\n",
"        log_warning(\" → Could not parse intent (missing metric or dimension).\")\n",
"    print()"
] }, { "cell_type": "markdown", "id": "3438d603", "metadata": {}, 
"source": [ "> **📘 Implementation Insight: Failure Memory as Circuit Breaker** *(Book p. 184)*\n", ">\n", "> To prevent an agent from repeatedly attempting to use a tool that is clearly failing, the system can implement a short-term **failure memory**. This acts like a circuit breaker: if a tool fails multiple times in a short window, the agent temporarily marks it as unavailable and does not attempt to select it again for a predefined period, preventing wasted cycles.\n", ">\n", "> Production-ready agents implement a layered defense: **safe invocation wrappers** (try/except with retry logic), **fallback tool chains** (alternative tools for the same goal), **confidence-based switching** (discard low-confidence results and retry with a different tool), and **escalation paths** to human operators as the final fallback.\n" ] }, { "cell_type": "markdown", "id": "2a54e616", "metadata": {}, "source": [ "---\n", "## Section 3: Error Handling and Resilience\n", "\n", "**Chapter Reference:** Section 7.3 — Error Handling in Tool Integration\n", "\n", "Production-ready agents implement a layered defense against four categories of failure:\n", "\n", "1. **Input validation errors** — Data doesn't match the expected schema\n", "2. **Runtime failures** — Network timeouts, API errors, file I/O exceptions\n", "3. **Semantic mismatches** — Tool succeeds but produces the wrong result\n", "4. **Tool unavailability** — External service is offline or deprecated\n", "\n", "This section builds the `data_viz_agent` orchestrator, which ties together intent parsing,\n", "tool selection, and error handling into a single **Think → Plan → Act** cycle.\n", "It then demonstrates deliberate failure scenarios to show the resilience layer in action." 
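] }, { "cell_type": "markdown", "id": "7f3a2c10", "metadata": {}, "source": [
"The failure-memory circuit breaker and fallback tool chain from the Implementation Insight above can be illustrated in a few lines of plain Python. The next cell is a minimal sketch under stated assumptions, not the chapter's implementation: `FailureMemory` and `invoke_with_fallbacks` are hypothetical names that are not part of the `helpers` package, and the thresholds (`max_failures`, `window_s`, `cooldown_s`) are illustrative values. A tool that fails `max_failures` times within `window_s` seconds is skipped for `cooldown_s` seconds, and the next tool in the chain is tried instead. With `max_failures=2`, the always-failing primary tool in the demo trips its circuit on the second attempt and is skipped on the third, while the backup tool answers every request."
] }, { "cell_type": "code", "execution_count": null, "id": "7f3a2c11", "metadata": {}, "outputs": [], "source": [
"# Hypothetical sketch (not from the chapter): failure memory as a circuit breaker,\n",
"# combined with a fallback tool chain. Names and thresholds are illustrative.\n",
"import time\n",
"from typing import Any, Callable, Dict, List, Optional, Tuple\n",
"\n",
"\n",
"class FailureMemory:\n",
"    \"\"\"Short-term failure memory: opens a tool's circuit after repeated failures.\"\"\"\n",
"\n",
"    def __init__(self, max_failures: int = 3, window_s: float = 60.0, cooldown_s: float = 30.0):\n",
"        self.max_failures = max_failures  # failures needed to open the circuit\n",
"        self.window_s = window_s  # sliding window (seconds) for counting failures\n",
"        self.cooldown_s = cooldown_s  # seconds an open circuit keeps the tool unavailable\n",
"        self._failures: Dict[str, List[float]] = {}\n",
"        self._open_until: Dict[str, float] = {}\n",
"\n",
"    def is_available(self, tool_name: str) -> bool:\n",
"        # Selectable unless the circuit is open and its cooldown has not yet expired\n",
"        return time.monotonic() >= self._open_until.get(tool_name, float(\"-inf\"))\n",
"\n",
"    def record_failure(self, tool_name: str) -> None:\n",
"        now = time.monotonic()\n",
"        # Keep only failures inside the sliding window, then add the new one\n",
"        recent = [t for t in self._failures.get(tool_name, []) if now - t <= self.window_s]\n",
"        recent.append(now)\n",
"        if len(recent) >= self.max_failures:\n",
"            # Open the circuit and reset the count for the next window\n",
"            self._open_until[tool_name] = now + self.cooldown_s\n",
"            recent = []\n",
"        self._failures[tool_name] = recent\n",
"\n",
"    def record_success(self, tool_name: str) -> None:\n",
"        # A success clears the tool's recent failure history\n",
"        self._failures.pop(tool_name, None)\n",
"\n",
"\n",
"def invoke_with_fallbacks(\n",
"    chain: List[Tuple[str, Callable[..., Any]]],\n",
"    memory: FailureMemory,\n",
"    *args: Any,\n",
"    **kwargs: Any,\n",
") -> Tuple[Optional[str], Any]:\n",
"    \"\"\"Try each (name, func) in order, skipping tools whose circuit is open.\n",
"    Returns (tool_name, result) for the first success, or (None, None) if all fail.\"\"\"\n",
"    for name, func in chain:\n",
"        if not memory.is_available(name):\n",
"            print(f\"[SKIP] {name}: circuit open\")\n",
"            continue\n",
"        try:\n",
"            result = func(*args, **kwargs)\n",
"        except Exception as exc:\n",
"            memory.record_failure(name)\n",
"            print(f\"[FAIL] {name}: {exc}\")\n",
"            continue\n",
"        memory.record_success(name)\n",
"        return name, result\n",
"    return None, None  # every tool failed or was skipped: the caller should escalate\n",
"\n",
"\n",
"# --- Usage: a primary tool that always times out, plus a backup for the same goal ---\n",
"def flaky_primary(x: int) -> int:\n",
"    raise TimeoutError(\"upstream API timed out\")\n",
"\n",
"\n",
"def backup_tool(x: int) -> int:\n",
"    return x * 2\n",
"\n",
"\n",
"demo_memory = FailureMemory(max_failures=2, window_s=60.0, cooldown_s=30.0)\n",
"demo_chain = [(\"primary\", flaky_primary), (\"backup\", backup_tool)]\n",
"for attempt in range(3):\n",
"    print(f\"attempt {attempt}:\", invoke_with_fallbacks(demo_chain, demo_memory, 21))\n",
"print(\"primary available:\", demo_memory.is_available(\"primary\"))"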
] }, { "cell_type": "code", "execution_count": null, "id": "787dd1f4", "metadata": {}, "outputs": [], "source": [
"# =============================================================================\n",
"# Section 3: data_viz_agent — The Central Orchestrator\n",
"# Ref: Section 7.3, pp. 185–186 — \"Implementation Example\"\n",
"#\n",
"# This function implements the full Think/Plan/Act cycle:\n",
"# THINK: parse_query() interprets the user's goal\n",
"# PLAN: Dynamically construct a list of tool calls based on intent\n",
"# ACT: Execute tools sequentially, passing data_state between steps\n",
"# =============================================================================\n",
"\n",
"def data_viz_agent(query: str, file_path: str) -> Optional[str]:\n",
"    \"\"\"Central orchestrator for the data visualization assistant.\n",
"    Implements the Think/Plan/Act cycle from Section 7.1.\"\"\"\n",
"\n",
"    log_info(\"=\" * 60)\n",
"    log_step(\"7.3\", 0, f\"data_viz_agent received query: '{query}'\")\n",
"    log_info(\"=\" * 60)\n",
"\n",
"    # ----- THINK: Interpret the user's goal -----\n",
"    log_step(\"7.3\", 1, \"THINK — Parsing user intent...\")\n",
"    intent = parse_query(query)\n",
"    if intent is None:\n",
"        log_error(\"Could not understand the request. \"\n",
"                  \"Query must contain a metric (spend/clicks/conversions) \"\n",
"                  \"AND a dimension (by campaign / over time / trend).\")\n",
"        return None\n",
"\n",
"    log_info(f\" Intent Analysis: metric={intent['metric']}, \"\n",
"             f\"dimension={intent['dimension']}, chart_type={intent['chart_type']}\")\n",
"\n",
"    # ----- PLAN: Create a sequence of tool calls -----\n",
"    log_step(\"7.3\", 2, \"PLAN — Constructing execution plan...\")\n",
"    plan = [\n",
"        (\"load_csv\", {\"file_path\": file_path}),\n",
"        (\"group_by_and_aggregate\", {\n",
"            \"group_by_col\": intent[\"dimension\"],\n",
"            \"agg_col\": intent[\"metric\"],\n",
"            \"agg_func\": \"sum\",\n",
"        }),\n",
"    ]\n",
"\n",
"    if intent[\"chart_type\"] == \"bar\":\n",
"        plot_tool = \"plot_bar_chart\"\n",
"        base_title = f\"{intent['metric']} by {intent['dimension']}\"\n",
"    else:\n",
"        plot_tool = \"plot_line_chart\"\n",
"        base_title = f\"{intent['metric']} over {intent['dimension']}\"\n",
"\n",
"    output_path = f\"outputs/output_{intent['metric']}_{intent['dimension']}.png\"\n",
"    plan.append((plot_tool, {\n",
"        \"x_col\": intent[\"dimension\"],\n",
"        \"y_col\": intent[\"metric\"],\n",
"        \"title\": base_title,\n",
"        \"output_path\": output_path,\n",
"    }))\n",
"\n",
"    log_info(\" Action Plan:\")\n",
"    for step_name, _ in plan:\n",
"        log_info(f\" - {step_name}\")\n",
"\n",
"    # ----- ACT: Execute the plan step-by-step -----\n",
"    log_step(\"7.3\", 3, \"ACT — Executing plan...\")\n",
"    data_state = None\n",
"\n",
"    tool_functions = {\n",
"        \"load_csv\": load_csv,\n",
"        \"group_by_and_aggregate\": group_by_and_aggregate,\n",
"        \"plot_bar_chart\": plot_bar_chart,\n",
"        \"plot_line_chart\": plot_line_chart,\n",
"    }\n",
"\n",
"    for tool_name, tool_args in plan:\n",
"        tool_func = tool_functions[tool_name]\n",
"\n",
"        # Inject data_state as the `df` keyword argument for downstream tools\n",
"        if tool_name != \"load_csv\":\n",
"            tool_args[\"df\"] = data_state\n",
"\n",
"        result = tool_func(**tool_args)\n",
"\n",
"        if isinstance(result, pd.DataFrame):\n",
"            if result.empty:\n",
"                log_error(f\"Tool '{tool_name}' returned empty DataFrame. Aborting plan.\")\n",
"                return None\n",
"            data_state = result\n",
"        elif result is None and tool_name != \"load_csv\":\n",
"            log_error(f\"Execution failed at '{tool_name}'. Aborting plan.\")\n",
"            return None\n",
"\n",
"    log_success(f\"data_viz_agent completed. Output: {output_path}\")\n",
"    return output_path"
] }, { "cell_type": "code", "execution_count": null, "id": "3ea23050", "metadata": {}, "outputs": [], "source": [
"# =============================================================================\n",
"# Section 3: Demo — Successful Orchestration\n",
"# Ref: Section 7.3 — data_viz_agent with valid queries\n",
"# =============================================================================\n",
"\n",
"log_info(\"=\" * 60)\n",
"log_info(\"Section 3 Demo: Successful Orchestration Runs\")\n",
"log_info(\"=\" * 60)\n",
"\n",
"# Query 1: Bar chart of spend by campaign\n",
"data_viz_agent(\"Show me spend by campaign\", \"data/sample_ad_campaign.csv\")\n",
"print()\n",
"\n",
"# Query 2: Line chart of clicks over time\n",
"data_viz_agent(\"What was the trend of clicks over time?\", \"data/sample_ad_campaign.csv\")"
] }, { "cell_type": "code", "execution_count": null, "id": "cc3b404b", "metadata": {}, "outputs": [], "source": [
"# =============================================================================\n",
"# Section 3: Deliberate Failure Demo 1 — FileNotFoundError\n",
"# Ref: Section 7.3, Strategy §6.6 — \"Force FileNotFoundError\"\n",
"#\n",
"# Expected behavior:\n",
"# - The @graceful_fallback wrapper on load_csv catches FileNotFoundError\n",
"# - Red [ERROR] log appears\n",
"# - @graceful_fallback returns empty DataFrame\n",
"# - Orchestrator detects the empty DataFrame and aborts cleanly\n",
"# =============================================================================\n",
"\n",
"log_info(\"=\" * 60)\n",
"log_warning(\"Section 3: DELIBERATE FAILURE DEMO — FileNotFoundError\")\n",
"log_info(\"=\" * 60)\n",
"\n",
"result = data_viz_agent(\n",
"    \"Show me spend by campaign\",\n",
"    \"nonexistent_file.csv\",  # This file does not exist\n",
")\n",
"\n",
"if result is None:\n",
"    log_info(\"Orchestrator handled the failure gracefully. No crash.\")"
] }, { "cell_type": "code", "execution_count": null, "id": "a1b781ff", "metadata": {}, "outputs": [], "source": [
"# =============================================================================\n",
"# Section 3: Deliberate Failure Demo 2 — Column Mismatch\n",
"# Ref: Section 7.3, Strategy §6.6 — \"Force Column Mismatch\"\n",
"#\n",
"# Expected behavior:\n",
"# - group_by_and_aggregate detects invalid column \"bad_column\"\n",
"# - Raises ValueError caught by @graceful_fallback\n",
"# - Red [ERROR] log, returns None\n",
"# - Inside data_viz_agent, this None would trigger a clean abort (the tool is called directly here)\n",
"# =============================================================================\n",
"\n",
"log_info(\"=\" * 60)\n",
"log_warning(\"Section 3: DELIBERATE FAILURE DEMO — Column Mismatch\")\n",
"log_info(\"=\" * 60)\n",
"\n",
"# First load a valid dataframe, then force a bad column name\n",
"df_valid = load_csv(\"data/sample_ad_campaign.csv\")\n",
"if not df_valid.empty:\n",
"    bad_result = group_by_and_aggregate(df_valid, \"bad_column\", \"spend\", \"sum\")\n",
"    if bad_result is None:\n",
"        log_info(\"group_by_and_aggregate returned None for invalid column. \"\n",
"                 \"Orchestrator would abort cleanly at this point.\")\n",
"\n",
"log_success(\"Section 3 complete. Both failure demos handled gracefully.\")"
] }, { "cell_type": "markdown", "id": "b2eaee40", "metadata": {}, "source": [
"---\n",
"## Section 4: The Chain-of-Agents Orchestrator\n",
"\n",
"**Chapter Reference:** Sections 7.4–7.5\n",
"\n",
"Complex tasks often exceed the capacity of any single agent. 
A chain-of-agents orchestrator\n", "coordinates a team of specialists, each with its own domain expertise and tool chest.\n", "\n", "A robust **Cooperation Protocol** is specified through four protocol elements (Table 7.1):\n", "\n", "| Protocol Element | Definition |\n", "|:---|:---|\n", "| **Message Format** | A shared envelope structure (e.g., JSON with sender, recipient, task_id, payload) |\n", "| **Role Declaration** | Each agent registers its name, capabilities, and accepted task types |\n", "| **Task Delegation Scheme** | A typed request specifying target agent, action, inputs, and expected output schema |\n", "| **Status Signaling** | Standardized status field (pending, running, done, error) for sequencing |\n", "\n", "This section implements a **Market Intelligence System** with three specialist agents that\n", "contribute findings to shared **episodic memory**, coordinated by a manager agent." ] }, { "cell_type": "markdown", "id": "edeaccb6", "metadata": {}, "source": [ "> **📘 Standards Connection: MCP and A2A Protocols** *(Book p. 188)*\n", ">\n", "> The cooperation protocol elements (message format, role declaration, task delegation scheme, status signaling) align with emerging industry standards. The **Model Context Protocol (MCP)** and **Agent-to-Agent (A2A) protocol**, introduced in Chapter 6, formalize this kind of message schema and role-declaration layer into open, interoperable specifications.\n", ">\n", "> Where Chapter 6 covered their structure and negotiation mechanics, this chapter applies those foundations: the cooperation protocol described here is the practical expression of what MCP and A2A standardize at the wire level.\n" ] }, { "cell_type": "markdown", "id": "1bbc02df", "metadata": {}, "source": [ "#### Figure 7.3 — Memory-Augmented Agent Architecture *(Book p. 
189)*\n", "\n", "```\n", " External Inputs ──▶ Perception ──▶ Agent Core (Cognition & Reasoning)\n", " │ │\n", " Active Context Memory Query\n", " Manager │ │\n", " ▼ ▼ ▼\n", " ┌─────────────┐ ┌──────────────────────┐\n", " │ Working │ │ Vector Databases │◀── External\n", " │ Memory │ │ & Retrieval │ DBs/APIs\n", " │ (Current │ └──────────┬───────────┘\n", " │ Prompt) │ │\n", " └──────┬───────┘ ┌────────┼────────┐\n", " │ ▼ ▼\n", " Store Session ┌──────────────┐ ┌──────────────┐\n", " Context/Events │ Episodic │ │ Semantic │\n", " │ │ Memory │ │ Memory │\n", " └───▶│ (Historical │ │ (Facts & │\n", " │ Interactions)│ │ Knowledge) │◀── Ingestion\n", " └──────────────┘ └──────────────┘\n", "```\n", "\n", "**Working memory** holds the immediate context (scratchpad). **Episodic memory** stores historical interaction logs. **Semantic memory** contains durable factual knowledge. The Agent Core queries long-term memory and loads relevant context into working memory for each task.\n" ] }, { "cell_type": "code", "execution_count": null, "id": "7fb1f81a", "metadata": {}, "outputs": [], "source": [ "# =============================================================================\n", "# Section 4: Specialist Agents — NewsAgent, FinancialAgent, SentimentAgent\n", "# Ref: Section 7.5, pp. 
190–191 — \"Implementation Example: Market Intelligence\"\n", "#\n", "# Each specialist accepts a company name and returns a structured payload.\n", "# All three wrap results in the same dict layout:\n", "# {\"source\": ..., \"status\": ..., \"data\": ...}\n", "# making outputs predictable for the orchestrator.\n", "#\n", "# Mock data is drawn directly from the chapter's own code listings.\n", "# In production, uncomment the API calls shown in comments.\n", "# =============================================================================\n", "\n", "@graceful_fallback(\n", " fallback_return={\"source\": \"NewsAgent\", \"status\": \"error\", \"data\": []},\n", " section=\"7.5\",\n", ")\n", "def NewsAgent(company_name: str) -> Dict[str, Any]:\n", " \"\"\"Fetches top news headlines from a data source.\"\"\"\n", " log_step(\"7.5\", 1, f\"[NewsAgent] Fetching headlines for '{company_name}'...\")\n", " headlines = [\n", " f\"{company_name}: Q3 earnings beat analyst estimates by 8%\",\n", " f\"{company_name} announces expansion into cloud infrastructure\",\n", " f\"Regulatory review concluded; {company_name} cleared to proceed\",\n", " ]\n", " # In production, uncomment: response = news_api.get(company_name)\n", " response = {\"data\": headlines}\n", " return {\"source\": \"NewsAgent\", \"status\": \"success\", \"data\": response[\"data\"]}\n", "\n", "\n", "@graceful_fallback(\n", " fallback_return={\"source\": \"FinancialAgent\", \"status\": \"error\", \"data\": {}},\n", " section=\"7.5\",\n", ")\n", "def FinancialAgent(company_name: str) -> Dict[str, Any]:\n", " \"\"\"Fetches financial data from a data source.\"\"\"\n", " log_step(\"7.5\", 2, f\"[FinancialAgent] Fetching financial data for '{company_name}'...\")\n", " response = {\n", " \"data\": {\n", " \"pe_ratio\": 24.5,\n", " \"revenue_growth\": 0.12,\n", " \"debt_to_equity\": 0.38,\n", " \"last_close\": 142.73,\n", " }\n", " }\n", " # In production, uncomment: response = financial_api.get(company_name)\n", " return 
{\"source\": \"FinancialAgent\", \"status\": \"success\", \"data\": response[\"data\"]}\n", "\n", "\n", "@graceful_fallback(\n", " fallback_return={\"source\": \"SentimentAgent\", \"status\": \"error\", \"data\": {}},\n", " section=\"7.5\",\n", ")\n", "def SentimentAgent(company_name: str) -> Dict[str, Any]:\n", " \"\"\"Fetches a sentiment score from a data source.\"\"\"\n", " log_step(\"7.5\", 3, f\"[SentimentAgent] Fetching sentiment for '{company_name}'...\")\n", " response = {\n", " \"data\": {\n", " \"score\": 0.72,\n", " \"label\": \"positive\",\n", " \"evidence\": [\n", " f\"Investor confidence in {company_name} remains high.\",\n", " \"Social media tone broadly favorable this week.\",\n", " ],\n", " }\n", " }\n", " # In production, uncomment: response = sentiment_api.get(company_name)\n", " return {\"source\": \"SentimentAgent\", \"status\": \"success\", \"data\": response[\"data\"]}\n", "\n", "\n", "log_success(\"Specialist agents defined: NewsAgent, FinancialAgent, SentimentAgent.\")" ] }, { "cell_type": "code", "execution_count": null, "id": "400975ad", "metadata": {}, "outputs": [], "source": [ "# =============================================================================\n", "# Section 4: Manager Agent with Episodic Memory\n", "# Ref: Section 7.4, pp. 187–189 — Cooperation Protocol\n", "# Ref: Section 7.5, pp. 
189–193 — Memory-Augmented Multi-Agent Systems\n", "#\n", "# The manager agent coordinates the specialists, stores results in episodic\n", "# memory (timestamped interaction log), and prepares context for synthesis.\n", "# =============================================================================\n", "\n", "from datetime import datetime\n", "\n", "\n", "class ManagerAgent:\n", " \"\"\"Orchestrates specialist agents and maintains shared episodic memory.\n", "\n", " Implements the Cooperation Protocol from Section 7.4:\n", " - Role Declaration: each agent has a defined role and output schema\n", " - Task Delegation: manager dispatches to each specialist in turn\n", " - Status Signaling: checks 'status' field in agent responses\n", " - Shared Memory: episodic_memory stores timestamped interaction records\n", " \"\"\"\n", "\n", " def __init__(self, company_name: str):\n", " self.company_name = company_name\n", " self.episodic_memory: List[Dict[str, Any]] = [] # Section 7.5 — episodic memory\n", " self.results: Dict[str, Any] = {}\n", "\n", " def delegate_and_collect(self) -> Dict[str, Any]:\n", " \"\"\"Dispatch tasks to each specialist and store results in episodic memory.\"\"\"\n", " log_info(\"=\" * 60)\n", " log_step(\"7.4\", 1, f\"ManagerAgent dispatching tasks for '{self.company_name}'...\")\n", " log_info(\"=\" * 60)\n", "\n", " specialists = {\n", " \"NewsAgent\": NewsAgent,\n", " \"FinancialAgent\": FinancialAgent,\n", " \"SentimentAgent\": SentimentAgent,\n", " }\n", "\n", " for agent_name, agent_func in specialists.items():\n", " result = agent_func(self.company_name)\n", " self.results[agent_name] = result\n", "\n", " # Write to episodic memory — Section 7.5\n", " memory_entry = {\n", " \"timestamp\": datetime.now().isoformat(timespec=\"seconds\"),\n", " \"agent\": agent_name,\n", " \"status\": result.get(\"status\", \"unknown\"),\n", " \"data_summary\": str(result.get(\"data\", \"\"))[:120],\n", " }\n", " self.episodic_memory.append(memory_entry)\n", " 
log_info(f\" Episodic memory updated: {agent_name} -> {result['status']}\")\n", "\n", " log_success(f\"All specialists reported. {len(self.episodic_memory)} memory entries.\")\n", " return self.results\n", "\n", " def show_episodic_memory(self):\n", " \"\"\"Display the episodic memory log for inspection.\"\"\"\n", " log_info(\"Episodic Memory Log:\")\n", " for entry in self.episodic_memory:\n", " log_info(\n", " f\" [{entry['timestamp']}] {entry['agent']}: \"\n", " f\"status={entry['status']}, \"\n", " f\"data={entry['data_summary'][:80]}...\"\n", " )\n", "\n", "\n", "log_success(\"ManagerAgent class defined with episodic memory.\")" ] }, { "cell_type": "code", "execution_count": null, "id": "97cdc2a2", "metadata": {}, "outputs": [], "source": [ "# =============================================================================\n", "# Section 4: Demo — Market Intelligence System\n", "# Ref: Sections 7.4-7.5 — Manager delegates to 3 specialists\n", "# =============================================================================\n", "\n", "log_info(\"=\" * 60)\n", "log_info(\"Section 4 Demo: Market Intelligence System for 'TechCorp'\")\n", "log_info(\"=\" * 60)\n", "\n", "manager = ManagerAgent(\"TechCorp\")\n", "results = manager.delegate_and_collect()\n", "\n", "print()\n", "manager.show_episodic_memory()\n", "\n", "print()\n", "log_info(\"Raw specialist results:\")\n", "for agent_name, result in results.items():\n", " print(f\" {agent_name}: {json.dumps(result, indent=2, default=str)}\")" ] }, { "cell_type": "markdown", "id": "9cb9543e", "metadata": {}, "source": [ "---\n", "## Section 5: Conflict Resolution Mechanisms\n", "\n", "**Chapter Reference:** Section 7.6\n", "\n", "When multiple specialists analyze complex information, disagreements are inevitable.\n", "The architectural strength of a multi-agent system is measured by its capacity to\n", "**resolve conflict productively** (Figure 7.4).\n", "\n", "The arbitration workflow consists of four stages:\n", "1. 
**Conflict Detection** — Semantic similarity or numerical divergence check\n", "2. **Automated Arbitration** — Arbiter agent consults a knowledge base\n", "3. **Confidence-Based Consensus** — Accept if confidence exceeds threshold (e.g., 95%)\n", "4. **Human Escalation** — Route to human reviewer if confidence is low\n", "\n", "This section adds a standalone `synthesize_report` function that consumes the `ManagerAgent` results and computes\n", "a `conflict_score` to detect divergence between public sentiment and market performance (stock price change)." ] }, { "cell_type": "code", "execution_count": null, "id": "66bc58b1", "metadata": {}, "outputs": [], "source": [ "# =============================================================================\n", "# Section 5: Conflict Resolution — synthesize_report with conflict_score\n", "# Ref: Section 7.6, pp. 193–194 — \"Implementation Example\"\n", "#\n", "# conflict_score = abs(sentiment_score - (stock_change / 10))\n", "#\n", "# sentiment_score is bounded [-1, 1] by the SentimentAgent.\n", "# stock_change is expressed as a percentage (e.g., 5.0 for 5%).\n", "# Dividing by 10 maps a price move of +/-10% onto the same [-1, 1] scale.\n", "# The 0.5 threshold represents a half-unit divergence on this normalized scale.\n", "# =============================================================================\n", "\n", "def synthesize_report(\n", " results: Dict[str, Any],\n", " stock_change_pct: float = 5.0,\n", ") -> str:\n", " \"\"\"Aggregate specialist findings into a single report with conflict detection.\n", "\n", " Parameters\n", " ----------\n", " results : dict\n", " Output from ManagerAgent.delegate_and_collect().\n", " stock_change_pct : float\n", " Simulated stock price change percentage for conflict detection.\n", " Default 5.0 represents a 5% positive move — aligned with chapter example.\n", " \"\"\"\n", " log_step(\"7.6\", 1, \"Synthesizing market intelligence report...\")\n", "\n", " financial_data = results.get(\"FinancialAgent\", 
{}).get(\"data\", {})\n", " 
sentiment_data = results.get(\"SentimentAgent\", {}).get(\"data\", {})\n", " news_items = results.get(\"NewsAgent\", {}).get(\"data\", [])\n", "\n", " # Extract key values\n", " sentiment_score = sentiment_data.get(\"score\", 0.0)\n", "\n", " # Conflict score formula from Section 7.6, pp. 193–194\n", " # Normalizes both inputs to a compatible scale before comparing\n", " conflict_score = abs(sentiment_score - (stock_change_pct / 10))\n", "\n", " # Build the report\n", " headlines = \"; \".join(news_items[:2]) if news_items else \"no headlines\"\n", "\n", " report = \"Market Intelligence Report\\n\"\n", " report += \"=\" * 40 + \"\\n\"\n", " report += f\"Recent news: {headlines}\\n\"\n", " report += (\n", " f\"P/E ratio: {financial_data.get('pe_ratio', 'N/A')}, \"\n", " f\"Revenue growth: {financial_data.get('revenue_growth', 0):.0%}\\n\"\n", " )\n", "\n", " # Conflict detection — threshold 0.5 (Section 7.6, pp. 193–194)\n", " if conflict_score > 0.5:\n", " report += (\n", " f\"\\n**Conflict Detected (Score: {conflict_score:.2f}):** \"\n", " f\"A significant discrepancy exists between public sentiment \"\n", " f\"(score={sentiment_score}) and market performance \"\n", " f\"(change={stock_change_pct}%). Further investigation recommended.\"\n", " )\n", " log_warning(\n", " f\"Conflict detected! Score: {conflict_score:.2f} > 0.5 threshold. \"\n", " f\"Sentiment={sentiment_score}, StockChange={stock_change_pct}%\"\n", " )\n", " else:\n", " report += (\n", " f\"\\n**Alignment Confirmed (Score: {conflict_score:.2f}):** \"\n", " f\"Public perception and market performance are well-aligned. \"\n", " f\"No material discrepancy detected.\"\n", " )\n", " log_success(\n", " f\"Alignment confirmed. 
Conflict score: {conflict_score:.2f} \"\n", " f\"(within 0.5 threshold).\"\n", " )\n", "\n", " return report\n", "\n", "\n", "log_success(\"synthesize_report function defined with conflict_score detection.\")" ] }, { "cell_type": "code", "execution_count": null, "id": "b1e86d97", "metadata": {}, "outputs": [], "source": [ "# =============================================================================\n", "# Section 5: Demo — Conflict Resolution in Action\n", "# Ref: Section 7.6 — Two scenarios: aligned signals vs. conflicting signals\n", "# =============================================================================\n", "\n", "log_info(\"=\" * 60)\n", "log_info(\"Section 5 Demo: Conflict Resolution\")\n", "log_info(\"=\" * 60)\n", "\n", "# --- Scenario A: Aligned signals ---\n", "# sentiment_score=0.72, stock_change=5.0% -> conflict_score = |0.72 - 0.5| = 0.22\n", "log_info(\"Scenario A: Aligned signals (stock +5%, sentiment 0.72)\")\n", "report_aligned = synthesize_report(results, stock_change_pct=5.0)\n", "print()\n", "print(report_aligned)\n", "print()\n", "\n", "# --- Scenario B: Conflicting signals ---\n", "# sentiment_score=0.72, stock_change=-8.0% -> conflict_score = |0.72 - (-0.8)| = 1.52\n", "log_info(\"Scenario B: Conflicting signals (stock -8%, sentiment 0.72)\")\n", "report_conflict = synthesize_report(results, stock_change_pct=-8.0)\n", "print()\n", "print(report_conflict)\n", "\n", "log_success(\"Section 5 complete. Both conflict scenarios demonstrated.\")" ] }, { "cell_type": "markdown", "id": "1033887f", "metadata": {}, "source": [ "---\n", "## Section 6: Agentic Workflow — E-Commerce Order Processing\n", "\n", "**Chapter Reference:** Section 7.7\n", "\n", "An agentic workflow system extends orchestration into **long-running, stateful business\n", "processes**. 
Unlike short-lived tool invocations, workflows require persistence, branching,\n", "error recovery, and checkpoints for human oversight.\n", "\n", "This section builds an e-commerce order processing workflow with:\n", "- **Deterministic tools:** `validate_order`, `check_inventory`, `process_payment`\n", "- **Intelligent agent:** `llm_analyst_agent` for risk assessment (with rule-based fallback)\n", "- **Human-in-the-Loop (HITL) gate:** Pauses for human judgment on medium/high risk orders\n", "\n", "Three test orders demonstrate the full range of behaviors:\n", "\n", "| Order ID | Total | Risk Trigger | Expected Outcome |\n", "|:---|:---|:---|:---|\n", "| ORD-1001 | $69.98 | None (standard) | Happy path — auto-completes |\n", "| ORD-1002 | $1,499.80 | High value + new customer | HITL escalation (medium risk) |\n", "| ORD-1003 | $2,279.72 | Address mismatch + high value | HITL escalation (high risk) |" ] }, { "cell_type": "code", "execution_count": null, "id": "887c67e8", "metadata": {}, "outputs": [], "source": [ "# =============================================================================\n", "# Section 6: E-Commerce Workflow Tools\n", "# Ref: Section 7.7, pp. 
195–197 — Case Study: E-Commerce Order Processing\n", "#\n", "# Inventory database: a small DataFrame simulating product stock.\n", "# Each tool is wrapped with @graceful_fallback for resilience.\n", "# =============================================================================\n", "\n", "# --- Inventory Database (simulated) ---\n", "inventory_db = pd.DataFrame({\n", " \"product_id\": [\"PROD-A\", \"PROD-B\", \"PROD-C\", \"PROD-D\"],\n", " \"product_name\": [\"Wireless Mouse\", \"USB-C Hub\", \"Mechanical Keyboard\", \"Monitor Stand\"],\n", " \"price\": [34.99, 49.99, 89.99, 59.99],\n", " \"stock\": [150, 75, 40, 20],\n", "})\n", "\n", "log_info(\"Inventory database initialized:\")\n", "print(inventory_db.to_string(index=False))\n", "print()\n", "\n", "# --- Test Orders (Strategy §6.3) ---\n", "test_orders = [\n", " {\n", " \"order_id\": \"ORD-1001\",\n", " \"customer_id\": \"CUST-200\",\n", " \"items\": [{\"product_id\": \"PROD-A\", \"quantity\": 2}],\n", " \"shipping_address\": \"123 Main St, Springfield\",\n", " \"billing_address\": \"123 Main St, Springfield\",\n", " \"customer_type\": \"returning\",\n", " },\n", " {\n", " \"order_id\": \"ORD-1002\",\n", " \"customer_id\": \"CUST-501\",\n", " \"items\": [\n", " {\"product_id\": \"PROD-C\", \"quantity\": 10},\n", " {\"product_id\": \"PROD-D\", \"quantity\": 10},\n", " ],\n", " \"shipping_address\": \"456 Oak Ave, Shelbyville\",\n", " \"billing_address\": \"456 Oak Ave, Shelbyville\",\n", " \"customer_type\": \"new\",\n", " },\n", " {\n", " \"order_id\": \"ORD-1003\",\n", " \"customer_id\": \"CUST-777\",\n", " \"items\": [\n", " {\"product_id\": \"PROD-C\", \"quantity\": 20},\n", " {\"product_id\": \"PROD-D\", \"quantity\": 8},\n", " ],\n", " \"shipping_address\": \"789 Elm Rd, Capital City\",\n", " \"billing_address\": \"999 Different Blvd, Ogdenville\",\n", " \"customer_type\": \"new\",\n", " },\n", "]\n", "\n", "log_info(f\"{len(test_orders)} test orders prepared.\")" ] }, { "cell_type": "code", 
"execution_count": null, "id": "9ac1f946", "metadata": {}, "outputs": [], "source": [ "# =============================================================================\n", "# Section 6: Workflow Step Functions\n", "# Ref: Section 7.7, pp. 195–197\n", "# =============================================================================\n", "\n", "@graceful_fallback(fallback_return=(False, 0.0), section=\"7.7\")\n", "def validate_order(order: Dict[str, Any], inv_db: pd.DataFrame) -> Tuple[bool, float]:\n", " \"\"\"Step 1: Validate order data and compute total.\"\"\"\n", " order_id = order.get(\"order_id\", \"N/A\")\n", " log_step(\"7.7\", 1, f\"Validating Order #{order_id}...\")\n", "\n", " total = 0.0\n", " for item in order.get(\"items\", []):\n", " product = inv_db[inv_db[\"product_id\"] == item[\"product_id\"]]\n", " if product.empty:\n", " raise ValueError(f\"Product {item['product_id']} not found in inventory.\")\n", " total += product.iloc[0][\"price\"] * item[\"quantity\"]\n", "\n", " log_info(f\" Order #{order_id} validated. 
Total: ${total:,.2f}\")\n", " return (True, total)\n", "\n", "\n", "@graceful_fallback(fallback_return=False, section=\"7.7\")\n", "def check_inventory(order: Dict[str, Any], inv_db: pd.DataFrame) -> bool:\n", " \"\"\"Step 3: Confirm stock covers every item (simulation: stock is not decremented).\"\"\"\n", " order_id = order.get(\"order_id\", \"N/A\")\n", " log_step(\"7.7\", 3, f\"Checking inventory for Order #{order_id}...\")\n", "\n", " for item in order.get(\"items\", []):\n", " product = inv_db[inv_db[\"product_id\"] == item[\"product_id\"]]\n", " if product.empty or product.iloc[0][\"stock\"] < item[\"quantity\"]:\n", " log_error(f\" Insufficient stock for {item['product_id']}.\")\n", " return False\n", " log_info(f\" Inventory confirmed for Order #{order_id}.\")\n", " return True\n", "\n", "\n", "@graceful_fallback(fallback_return=False, section=\"7.7\")\n", "def process_payment(order: Dict[str, Any], order_total: float) -> bool:\n", " \"\"\"Step 4: Process payment (simulated).\"\"\"\n", " order_id = order.get(\"order_id\", \"N/A\")\n", " log_step(\"7.7\", 4, f\"Processing payment of ${order_total:,.2f} for Order #{order_id}...\")\n", " # Simulate payment processing delay\n", " time.sleep(0.5)\n", " log_success(f\" Payment of ${order_total:,.2f} processed for Order #{order_id}.\")\n", " return True\n", "\n", "\n", "log_success(\"Workflow step functions defined: validate_order, check_inventory, process_payment.\")" ] }, { "cell_type": "code", "execution_count": null, "id": "1cd54400", "metadata": {}, "outputs": [], "source": [ "# =============================================================================\n", "# Section 6: LLM Analyst Agent — Risk Assessment\n", "# Ref: Section 7.7, p. 
196 — \"Embedding Intelligence in Workflow Nodes\"\n", "#\n", "# The llm_analyst_agent is designed to be adaptable:\n", "# - Live Mode: uses the OpenAI API for nuanced risk analysis\n", "# - Simulation Mode: uses rule-based logic as a mock analysis\n", "#\n", "# Rule-based risk triggers (from chapter text):\n", "# - High value (> $500) + new customer → medium risk\n", "# - Address mismatch + high value → high risk\n", "# =============================================================================\n", "\n", "@graceful_fallback(\n", " fallback_return={\"risk_level\": \"high\", \"reason\": \"Unavailable — default high.\"},\n", " section=\"7.7\",\n", ")\n", "def llm_analyst_agent(order: Dict[str, Any], order_total: float) -> Dict[str, str]:\n", " \"\"\"Step 2: Assess risk for an order using LLM or rule-based fallback.\"\"\"\n", " order_id = order.get(\"order_id\", \"N/A\")\n", " log_step(\"7.7\", 2, f\"Assessing risk for Order #{order_id}...\")\n", "\n", " if not LIVE_MODE:\n", " # --- Rule-based mock analysis (Simulation Mode) ---\n", " log_mock(\" Using mock analysis (Simulation Mode).\")\n", "\n", " address_mismatch = (\n", " order.get(\"shipping_address\", \"\") != order.get(\"billing_address\", \"\")\n", " )\n", " is_new = order.get(\"customer_type\", \"\") == \"new\"\n", " high_value = order_total > 500.0\n", "\n", " if address_mismatch and high_value:\n", " analysis = {\n", " \"risk_level\": \"high\",\n", " \"reason\": \"Shipping/billing address mismatch combined with high order value.\",\n", " }\n", " elif high_value and is_new:\n", " analysis = {\n", " \"risk_level\": \"medium\",\n", " \"reason\": \"Order total exceeds typical range for a new customer.\",\n", " }\n", " else:\n", " analysis = {\n", " \"risk_level\": \"low\",\n", " \"reason\": \"Standard order parameters. 
No risk indicators detected.\",\n", " }\n", "\n", " log_info(f\" Risk Level: {analysis['risk_level']}\")\n", " log_info(f\" Reason: {analysis['reason']}\")\n", " return analysis\n", "\n", " # --- Live Mode: use OpenAI API ---\n", " log_info(\" Mode: Using OpenAI GPT for analysis.\")\n", " try:\n", " prompt = (\n", " f\"Assess the fraud risk for Order #{order_id}. \"\n", " f\"Total: ${order_total:,.2f}. Customer type: {order.get('customer_type')}. \"\n", " f\"Shipping: {order.get('shipping_address')}. \"\n", " f\"Billing: {order.get('billing_address')}. \"\n", " f\"Respond with JSON: {{\\\"risk_level\\\": \\\"low|medium|high\\\", \\\"reason\\\": \\\"...\\\"}}.\"\n", " )\n", " response_text = llm.generate(prompt)\n", " analysis = json.loads(response_text)\n", " log_info(f\" LLM Analysis complete. Risk Level: {analysis.get('risk_level')}\")\n", " return analysis\n", " except Exception as e:\n", " log_error(f\" LLM analysis failed: {e}. Defaulting to high risk.\")\n", " return {\"risk_level\": \"high\", \"reason\": \"LLM analysis failed.\"}\n", "\n", "\n", "log_success(\"llm_analyst_agent defined with Live/Simulation dual mode.\")" ] }, { "cell_type": "code", "execution_count": null, "id": "77c62d03", "metadata": {}, "outputs": [], "source": [ "# =============================================================================\n", "# Section 6: Workflow Manager Agent with HITL Gate\n", "# Ref: Section 7.7, pp. 
195–197 — workflow_manager_agent\n", "# Ref: Strategy §6.5 — HITL behavior in Simulation Mode\n", "#\n", "# Step 2.5: HITL Escalation Gate\n", "# Any risk level other than an explicit 'low' pauses for human judgment.\n", "# Live Mode + INTERACTIVE_MODE: use a real input() prompt.\n", "# Otherwise (Simulation Mode, or Live Mode without INTERACTIVE_MODE):\n", "# auto-approve after a 2s delay.\n", "# =============================================================================\n", "\n", "def workflow_manager_agent(order: Dict[str, Any], inv_db: pd.DataFrame) -> bool:\n", " \"\"\"Orchestrate the full e-commerce order processing workflow.\"\"\"\n", " order_id = order.get(\"order_id\", \"N/A\")\n", " log_info(\"=\" * 60)\n", " log_info(f\"Starting Workflow for Order #{order_id}\")\n", " log_info(\"=\" * 60)\n", "\n", " try:\n", " # Step 1: Validate the order data\n", " is_valid, order_total = validate_order(order, inv_db)\n", " if not is_valid:\n", " raise AgentError(\"Order validation failed.\")\n", "\n", " # Step 2: Assess fraud risk with an intelligent agent\n", " risk_analysis = llm_analyst_agent(order, order_total)\n", " # str() guards against a non-string risk_level returned by the LLM\n", " risk_level = str(risk_analysis.get(\"risk_level\", \"high\")).lower().strip()\n", " risk_reason = risk_analysis.get(\"reason\", \"No reason provided.\")\n", "\n", " # Step 2.5: Human-in-the-Loop Escalation Gate\n", " # Fail closed: unexpected labels (e.g., 'critical') are escalated, not treated as low\n", " if risk_level != \"low\":\n", " log_warning(f\"HUMAN INTERVENTION REQUIRED for Order #{order_id}!\")\n", " log_info(f\" Reason: Analyst flagged order as '{risk_level}' — {risk_reason}\")\n", "\n", " if LIVE_MODE and INTERACTIVE_MODE:\n", " # Real interactive prompt\n", " decision = \"\"\n", " while decision not in [\"approve\", \"reject\"]:\n", " decision = input(\" Type 'approve' or 'reject': \").lower().strip()\n", " else:\n", " # Non-interactive run: auto-approve 
after delay (Strategy §6.5)\n", " log_mock(\" Auto-approving after 2s delay (non-interactive mode)...\")\n", " time.sleep(2)\n", " decision = \"approve\"\n", "\n", " if decision == \"reject\":\n", " log_error(f\" DECISION: Human operator REJECTED 
Order #{order_id}.\")\n", " raise AgentError(\"Workflow terminated by human operator.\")\n", " else:\n", " log_success(f\" DECISION: Human operator APPROVED Order #{order_id}. Resuming...\")\n", " else:\n", " log_success(f\" Risk level 'low' — no HITL required for Order #{order_id}.\")\n", "\n", " # Step 3: Check and reserve inventory\n", " if not check_inventory(order, inv_db):\n", " raise AgentError(\"Inventory check failed.\")\n", "\n", " # Step 4: Process the payment\n", " if not process_payment(order, order_total):\n", " raise AgentError(\"Payment processing failed.\")\n", "\n", " log_success(f\"Workflow for Order #{order_id} COMPLETED SUCCESSFULLY.\")\n", " return True\n", "\n", " except AgentError as e:\n", " log_error(f\"Workflow for Order #{order_id} TERMINATED. Reason: {e}\")\n", " return False\n", "\n", "\n", "log_success(\"workflow_manager_agent defined with HITL escalation gate.\")" ] }, { "cell_type": "code", "execution_count": null, "id": "bc50683f", "metadata": {}, "outputs": [], "source": [ "# =============================================================================\n", "# Section 6: Demo — Run All 3 Test Orders\n", "# Ref: Strategy §6.3 — Expected outcomes:\n", "# ORD-1001: Happy path (low risk, auto-completes)\n", "# ORD-1002: Medium risk HITL escalation (auto-approved in Simulation Mode)\n", "# ORD-1003: High risk HITL escalation (auto-approved in Simulation Mode)\n", "# =============================================================================\n", "\n", "log_info(\"=\" * 60)\n", "log_info(\"Section 6 Demo: E-Commerce Workflow — 3 Test Orders\")\n", "log_info(\"=\" * 60)\n", "\n", "workflow_results = {}\n", "for order in test_orders:\n", " success = workflow_manager_agent(order, inventory_db)\n", " workflow_results[order[\"order_id\"]] = success\n", " print()\n", "\n", "# Summary table\n", "log_info(\"=\" * 60)\n", "log_info(\"Workflow Results Summary:\")\n", "for oid, result in workflow_results.items():\n", " status = \"COMPLETED\" if 
result else \"TERMINATED\"\n", " badge_func = log_success if result else log_error\n", " badge_func(f\" {oid}: {status}\")\n", "\n", "log_success(\"Section 6 complete. All 3 test orders processed.\")" ] }, { "cell_type": "markdown", "id": "06c36b03", "metadata": {}, "source": [ "---\n", "## Section 7: Agentic Workflow — Insurance Claims Processing\n", "\n", "**Chapter Reference:** Section 7.7b\n", "\n", "This section demonstrates a more complex, multi-agent workflow modeled as a **state machine**\n", "(Figure 7.5). Five specialized agents handle different stages of insurance claims processing:\n", "\n", "| Agent | Role |\n", "|:---|:---|\n", "| **Intake Agent** | OCR/NLP to digitize claim forms |\n", "| **Validator Agent** | Checks policy status, fraud signals, required documentation |\n", "| **Classifier Agent** | Assesses claim type, urgency, and risk level |\n", "| **Payout Agent** | Calculates settlement and processes payment |\n", "| **Escalation Agent** | Routes high-risk or low-confidence claims to human review |\n", "\n", "**Guard Conditions** (Table 7.2):\n", "\n", "| Guard | Action |\n", "|:---|:---|\n", "| `claim_amount > threshold` | Route to human for approval |\n", "| Validation fails | Transition to Closed: Rejected |\n", "| `confidence_score < 0.85` | Escalate to Pending Human Review |\n", "\n", "Three test claims:\n", "\n", "| Claim ID | Amount | Type | Expected Path |\n", "|:---|:---|:---|:---|\n", "| CLM-4821 | $8,400 | water_damage | Auto-approved (confidence 0.91) |\n", "| CLM-5099 | $47,000 | fire_damage | Escalated (confidence 0.79) |\n", "| CLM-5100 | $3,200 | theft | Rejected at validation (expired policy) |" ] }, { "cell_type": "markdown", "id": "25a449e0", "metadata": {}, "source": [ "#### Figure 7.5 — Insurance Claim State Machine *(Book p. 
199)*\n", "\n", "```\n", " ● ──▶ [Intake] ──Form Submitted──▶ [Validating] ──Valid──▶ [Assessing Risk]\n", " │ │ │\n", " Invalid Low Risk High Risk\n", " │ │ │\n", " ▼ ▼ ▼\n", " [Closed: [Processing [Pending Human\n", " Rejected] Payout] Review]\n", " ▲ │ │ │ │\n", " │ Success│ Payment Approved Rejected\n", " │ │ Failed │ │\n", " │ ▼ │ ▼ │\n", " Rejected │ [Closed: └──▶[Processing │\n", " (Human)───┘ Approved] Payout] │\n", " ▲ │\n", " └───────────────────────────────────────────┘\n", "```\n", "\n", "**Guard conditions** (Table 7.2): `claim_amount > threshold` → human approval; `validation fails` → rejected; `confidence_score < 0.85` → escalate to human review. At each transition, the system records the agent's reasoning, tools used, and human decisions for a complete audit trail.\n" ] }, { "cell_type": "code", "execution_count": null, "id": "8e72217d", "metadata": {}, "outputs": [], "source": [ "# =============================================================================\n", "# Section 7: Insurance Claims — Test Data and Policy Database\n", "# Ref: Section 7.7b, pp. 198–200 — Multi-Agent Insurance Claims Workflow\n", "# Ref: Strategy §6.4 — Test claims\n", "# =============================================================================\n", "\n", "# --- Policy Database (simulated) ---\n", "policy_db = {\n", " \"POL-992317\": {\"status\": \"active\", \"holder\": \"Alice Johnson\", \"fraud_flag\": False},\n", " \"POL-110482\": {\"status\": \"active\", \"holder\": \"Bob Martinez\", \"fraud_flag\": False},\n", " \"POL-EXPIRED\": {\"status\": \"expired\", \"holder\": \"Charlie Lee\", \"fraud_flag\": False},\n", "}\n", "\n", "# --- Test Claims (Strategy §6.4) ---\n", "test_claims = [\n", " {\n", " \"claim_id\": \"CLM-4821\",\n", " \"policy_id\": \"POL-992317\",\n", " \"claim_type\": \"water_damage\",\n", " \"amount\": 8400.00,\n", " \"description\": \"Burst pipe caused flooding in basement. 
Damage to walls and flooring.\",\n", "    },\n", "    {\n", "        \"claim_id\": \"CLM-5099\",\n", "        \"policy_id\": \"POL-110482\",\n", "        \"claim_type\": \"fire_damage\",\n", "        \"amount\": 47000.00,\n", "        \"description\": \"Kitchen fire spread to living area. Significant structural damage.\",\n", "    },\n", "    {\n", "        \"claim_id\": \"CLM-5100\",\n", "        \"policy_id\": \"POL-EXPIRED\",\n", "        \"claim_type\": \"theft\",\n", "        \"amount\": 3200.00,\n", "        \"description\": \"Laptop and electronics stolen during home break-in.\",\n", "    },\n", "]\n", "\n", "log_info(f\"Policy database: {len(policy_db)} policies loaded.\")\n", "log_info(f\"Test claims: {len(test_claims)} claims prepared.\")" ] }, { "cell_type": "code", "execution_count": null, "id": "a7bf4d67", "metadata": {}, "outputs": [], "source": [ "# =============================================================================\n", "# Section 7: Insurance Claims — Specialized Agents\n", "# Ref: Section 7.7b, pp. 198–199 — Agent roles\n", "# Intake, Validator, Classifier, and Payout are defined below; the fifth role,\n", "# the Escalation Agent (HITL review), runs inline in claims_workflow_manager.\n", "# =============================================================================\n", "\n", "@graceful_fallback(fallback_return=None, section=\"7.7b\")\n", "def intake_agent(claim: Dict[str, Any]) -> Dict[str, Any]:\n", "    \"\"\"Intake Agent: Digitize and extract fields from a claim submission.\n", "    In production, this would use OCR and NLP on submitted forms.\"\"\"\n", "    claim_id = claim.get(\"claim_id\", \"N/A\")\n", "    log_step(\"7.7b\", 1, f\"[Intake] Processing claim {claim_id}...\")\n", "\n", "    # Simulate OCR/NLP field extraction\n", "    extracted = {\n", "        \"claim_id\": claim[\"claim_id\"],\n", "        \"policy_id\": claim[\"policy_id\"],\n", "        \"claim_type\": claim[\"claim_type\"],\n", "        \"amount\": claim[\"amount\"],\n", "        \"description\": claim[\"description\"],\n", "        \"status\": \"intake_complete\",\n", "    }\n", "    log_info(f\"  Extracted: type={extracted['claim_type']}, amount=${extracted['amount']:,.2f}\")\n", "    return extracted\n", "\n", "\n", "@graceful_fallback(fallback_return=False, 
section=\"7.7b\")\n", "def validator_agent(claim_record: Dict[str, Any], policies: Dict) -> bool:\n", " \"\"\"Validator Agent: Check policy status and fraud signals.\n", " Guard: validation fails -> Closed: Rejected.\"\"\"\n", " claim_id = claim_record.get(\"claim_id\", \"N/A\")\n", " policy_id = claim_record.get(\"policy_id\", \"N/A\")\n", " log_step(\"7.7b\", 2, f\"[Validator] Validating claim {claim_id}, policy {policy_id}...\")\n", "\n", " policy = policies.get(policy_id)\n", " if policy is None:\n", " log_error(f\" Policy {policy_id} not found in database.\")\n", " return False\n", "\n", " if policy[\"status\"] != \"active\":\n", " log_error(f\" Policy {policy_id} status: {policy['status']}. Claim rejected.\")\n", " return False\n", "\n", " if policy.get(\"fraud_flag\", False):\n", " log_warning(f\" Fraud flag detected on policy {policy_id}.\")\n", " return False\n", "\n", " log_info(f\" Policy {policy_id} validated: active, no fraud flag.\")\n", " return True\n", "\n", "\n", "@graceful_fallback(\n", " fallback_return={\"confidence_score\": 0.5, \"risk\": \"high\"},\n", " section=\"7.7b\",\n", ")\n", "def classifier_agent(claim_record: Dict[str, Any]) -> Dict[str, Any]:\n", " \"\"\"Classifier Agent: Assess claim type, urgency, and risk level.\n", " Guard: confidence_score < 0.85 -> escalate to Pending Human Review.\"\"\"\n", " claim_id = claim_record.get(\"claim_id\", \"N/A\")\n", " claim_type = claim_record.get(\"claim_type\", \"unknown\")\n", " amount = claim_record.get(\"amount\", 0.0)\n", " log_step(\"7.7b\", 3, f\"[Classifier] Assessing risk for claim {claim_id}...\")\n", "\n", " # Rule-based classification (Simulation Mode)\n", " # High-value claims or fire/explosion types get lower confidence\n", " if amount > 25000 or claim_type in [\"fire_damage\", \"explosion\"]:\n", " classification = {\n", " \"confidence_score\": 0.79,\n", " \"risk\": \"high\",\n", " \"claim_type\": claim_type,\n", " }\n", " else:\n", " classification = {\n", " 
\"confidence_score\": 0.91,\n", " \"risk\": \"low\",\n", " \"claim_type\": claim_type,\n", " }\n", "\n", " log_info(\n", " f\" Classification: confidence={classification['confidence_score']}, \"\n", " f\"risk={classification['risk']}\"\n", " )\n", " return classification\n", "\n", "\n", "@graceful_fallback(fallback_return=False, section=\"7.7b\")\n", "def payout_agent(claim_record: Dict[str, Any]) -> bool:\n", " \"\"\"Payout Agent: Calculate settlement and process payment.\"\"\"\n", " claim_id = claim_record.get(\"claim_id\", \"N/A\")\n", " amount = claim_record.get(\"amount\", 0.0)\n", " log_step(\"7.7b\", 5, f\"[Payout] Processing settlement of ${amount:,.2f} for {claim_id}...\")\n", " time.sleep(0.5) # Simulate payment processing\n", " log_success(f\" Settlement of ${amount:,.2f} paid for claim {claim_id}.\")\n", " return True\n", "\n", "\n", "log_success(\"Insurance claims agents defined: intake, validator, classifier, payout.\")" ] }, { "cell_type": "code", "execution_count": null, "id": "b9df3bca", "metadata": {}, "outputs": [], "source": [ "# =============================================================================\n", "# Section 7: Claims Workflow Manager — State Machine\n", "# Ref: Section 7.7b, pp. 
199–200 — Figure 7.5 and Table 7.2\n", "#\n", "# State transitions:\n", "# Intake -> Validating -> Assessing Risk -> Processing Payout -> Closed: Approved\n", "#               |                 |                   |\n", "#               v                 v                   v\n", "#       Closed: Rejected  Pending Human Review  Closed: Rejected\n", "#                         (approve -> Payout)   (payment failed)\n", "#                         (reject -> Rejected)\n", "#\n", "# Guard: confidence_score < 0.85 -> escalate to Pending Human Review\n", "# =============================================================================\n", "\n", "def claims_workflow_manager(\n", "    claim: Dict[str, Any],\n", "    policies: Dict,\n", ") -> Dict[str, Any]:\n", "    \"\"\"Process a single insurance claim through the state machine.\n", "\n", "    Returns an audit record with all state transitions and decisions.\n", "    Each transition is logged when it occurs, so the audit trail follows Figure 7.5.\n", "    \"\"\"\n", "    claim_id = claim.get(\"claim_id\", \"N/A\")\n", "    audit_trail: List[Dict[str, str]] = []\n", "\n", "    log_info(\"=\" * 60)\n", "    log_info(f\"Claims Workflow: Processing {claim_id}\")\n", "    log_info(\"=\" * 60)\n", "\n", "    def log_transition(from_state: str, to_state: str, reason: str):\n", "        entry = {\"from\": from_state, \"to\": to_state, \"reason\": reason}\n", "        audit_trail.append(entry)\n", "        log_info(f\"  State: {from_state} -> {to_state} ({reason})\")\n", "\n", "    # --- State: Intake ---\n", "    log_transition(\"Start\", \"Intake\", \"Form submitted\")\n", "    claim_record = intake_agent(claim)\n", "    if claim_record is None:\n", "        log_transition(\"Intake\", \"Closed: Rejected\", \"Intake agent failed\")\n", "        return {\"claim_id\": claim_id, \"final_state\": \"Closed: Rejected\", \"audit\": audit_trail}\n", "    log_transition(\"Intake\", \"Validating\", \"All fields populated\")\n", "\n", "    # --- State: Validating ---\n", "    is_valid = validator_agent(claim_record, policies)\n", "    if not is_valid:\n", "        log_transition(\"Validating\", \"Closed: Rejected\", \"Validation failed\")\n", "        log_error(f\"Claim {claim_id} REJECTED at validation.\")\n", "        return {\"claim_id\": claim_id, \"final_state\": \"Closed: Rejected\", \"audit\": audit_trail}\n", "    log_transition(\"Validating\", \"Assessing Risk\", \"Validation passed\")\n", "\n", "    # --- State: Assessing Risk ---\n", "    classification = classifier_agent(claim_record)\n", "    confidence = classification.get(\"confidence_score\", 0.0)\n", "    risk = classification.get(\"risk\", \"high\")\n", "\n", "    # Guard: confidence_score < 0.85 -> escalate (Table 7.2)\n", "    if confidence < 0.85:\n", "        log_transition(\"Assessing Risk\", \"Pending Human Review\",\n", "                       f\"confidence_score {confidence} < 0.85 threshold\")\n", "        log_warning(f\"Claim {claim_id} escalated for human review (confidence={confidence}, risk={risk}).\")\n", "\n", "        # HITL: Escalation Agent\n", "        log_step(\"7.7b\", 4, f\"[Escalation] Claim {claim_id} awaiting human decision...\")\n", "        if LIVE_MODE and INTERACTIVE_MODE:\n", "            decision = \"\"\n", "            while decision not in [\"approve\", \"reject\"]:\n", "                decision = input(\"  Type 'approve' or 'reject': \").lower().strip()\n", "        else:\n", "            log_mock(\"  Auto-approving after 2s delay (Simulation Mode)...\")\n", "            time.sleep(2)\n", "            decision = \"approve\"\n", "\n", "        if decision == \"reject\":\n", "            log_transition(\"Pending Human Review\", \"Closed: Rejected\", \"Human rejected\")\n", "            log_error(f\"Claim {claim_id} REJECTED by human reviewer.\")\n", "            return {\"claim_id\": claim_id, \"final_state\": \"Closed: Rejected\", \"audit\": audit_trail}\n", "        else:\n", "            log_transition(\"Pending Human Review\", \"Processing Payout\", \"Human approved\")\n", "            log_success(f\"Claim {claim_id} APPROVED by human reviewer.\")\n", "    else:\n", "        log_transition(\"Assessing Risk\", \"Processing Payout\",\n", "                       f\"Low risk, confidence {confidence} >= 0.85\")\n", "\n", "    # --- State: Processing Payout ---\n", "    payment_ok = payout_agent(claim_record)\n", "    if not payment_ok:\n", "        log_transition(\"Processing Payout\", \"Closed: Rejected\", \"Payment failed\")\n", "        log_error(f\"Claim {claim_id} REJECTED — payment failure.\")\n", "        return {\"claim_id\": claim_id, 
\"final_state\": \"Closed: Rejected\", \"audit\": audit_trail}\n", "\n", " log_transition(\"Processing Payout\", \"Closed: Approved\", \"Settlement paid\")\n", " log_success(f\"Claim {claim_id} APPROVED and settled.\")\n", "\n", " return {\"claim_id\": claim_id, \"final_state\": \"Closed: Approved\", \"audit\": audit_trail}\n", "\n", "\n", "log_success(\"claims_workflow_manager defined — full state machine with guard conditions.\")" ] }, { "cell_type": "code", "execution_count": null, "id": "504af1b5", "metadata": {}, "outputs": [], "source": [ "# =============================================================================\n", "# Section 7: Demo — Run All 3 Test Claims\n", "# Ref: Strategy §6.4 — Expected outcomes:\n", "# CLM-4821: Auto-approved (confidence 0.91, water_damage, $8,400)\n", "# CLM-5099: Escalated then auto-approved (confidence 0.79, fire_damage, $47,000)\n", "# CLM-5100: Rejected at validation (expired policy)\n", "# =============================================================================\n", "\n", "log_info(\"=\" * 60)\n", "log_info(\"Section 7 Demo: Insurance Claims Workflow — 3 Test Claims\")\n", "log_info(\"=\" * 60)\n", "\n", "claim_results = {}\n", "for claim in test_claims:\n", " result = claims_workflow_manager(claim, policy_db)\n", " claim_results[result[\"claim_id\"]] = result\n", " print()\n", "\n", "# --- Summary and Audit Trail ---\n", "log_info(\"=\" * 60)\n", "log_info(\"Claims Processing Summary:\")\n", "for cid, result in claim_results.items():\n", " final = result[\"final_state\"]\n", " badge_func = log_success if \"Approved\" in final else log_error\n", " badge_func(f\" {cid}: {final}\")\n", "\n", "print()\n", "log_info(\"Audit Trails:\")\n", "for cid, result in claim_results.items():\n", " log_info(f\" {cid}:\")\n", " for entry in result[\"audit\"]:\n", " log_info(f\" {entry['from']} -> {entry['to']} ({entry['reason']})\")\n", "\n", "log_success(\"Section 7 complete. 
All 3 test claims processed with full audit trails.\")" ] }, { "cell_type": "markdown", "id": "deabc617", "metadata": {}, "source": [ "---\n", "## Section 8: Summary and Key Takeaways\n", "\n", "**Chapter 7** demonstrated the progression from foundational tool invocation to sophisticated\n", "multi-agent orchestration and persistent workflow systems. We explored three architectural\n", "patterns, each building on the last:\n", "\n", "### Pattern 1: Tool-Using Agents (Sections 7.1–7.3)\n", "A single agent extends its reasoning by invoking external functions through a **Think/Plan/Act**\n", "cycle. The tool chest, tool registry, and execution engine work together to transform natural\n", "language requests into concrete outputs. The three-stage **selection funnel** (intent classification,\n", "semantic search, constraint filtering) makes tool discovery more reliable, and **safe invocation\n", "wrappers** provide resilience against the four categories of failure.\n", "\n", "### Pattern 2: Chain-of-Agents Orchestrators (Sections 7.4–7.6)\n", "Multiple specialized agents collaborate under a **cooperation protocol** with four pillars:\n", "defined roles, a common communication infrastructure, shared memory, and execution\n", "orchestration. **Episodic memory** preserves state across handoffs, and **conflict resolution**\n", "mechanisms detect divergence between agent outputs and resolve it through confidence-based arbitration.\n", "\n", "### Pattern 3: Agentic Workflow Systems (Section 7.7)\n", "Long-running business processes are modeled as **state machines** with guard conditions,\n", "**human-in-the-loop checkpoints**, and complete audit trails. 
The e-commerce and insurance\n", "workflows demonstrated how to embed intelligent agents within deterministic process flows,\n", "balancing automation speed with human governance.\n", "\n", "### Cross-Cutting Principles\n", "- **Fail-Gracefully Architecture:** `@graceful_fallback` on the agent and tool functions returns a\n", "  safe fallback value on error, so no single failure crashes the notebook.\n", "- **Visual Observability:** Color-coded logs provide a clear execution trail.\n", "- **Simulation Mode:** `MockLLM` enables the entire notebook to run without any API key.\n", "\n", "---\n", "\n", "*These patterns provide the foundation for production-ready agent systems. In Chapter 8,\n", "we apply these orchestration patterns to data analysis and reasoning agents that explore\n", "datasets, recommend visualizations, and perform statistical reasoning.*\n", "\n", "---\n", "**Book:** *30 Agents Every AI Engineer Must Build* by Imran Ahmad (Packt, 2026 — B34135)" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "name": "python", "version": "3.11.0" } }, "nbformat": 4, "nbformat_minor": 5 }