{ "cells": [ { "cell_type": "markdown", "id": "b8f98beb", "metadata": {}, "source": [ "# Chapter 16: Embodied and Physical World Agents\n", "\n", "> From **30 Agents Every AI Engineer Must Build** by Imran Ahmad (Packt Publishing, 2026) \n", "> **Book pages 457–491** | [GitHub Repository](https://github.com/PacktPublishing/30-Agents-Every-AI-Engineer-Must-Build/chapter16)\n", "\n", "---\n", "\n", "## Introduction\n", "\n", "> *\"The world is its own best model.\"* \n", "> — Rodney Brooks, co-founder of iRobot and former director of MIT CSAIL\n", "\n", "In 1966, researchers at Stanford set a wheeled robot named **Shakey** loose in a corridor and watched it reason about pushing blocks. The lesson was immediate: intelligence that acts in the physical world earns every decision against gravity, friction, and consequence. NASA's Sojourner rover crossed Martian terrain three decades later under the same constraint. Today, autonomous drones navigate storms and construction sites with no human at the controls.\n", "\n", "This qualitative difference creates what the chapter calls the **physicality constraint** — decomposed into three concrete properties:\n", "\n", "| Property | Description |\n", "|---|---|\n", "| **Irreversibility** | Physical actions cannot be rolled back once executed; planning must account for worst-case outcomes before committing |\n", "| **Latency** | Mechanical actuators, communication links, and sensor pipelines introduce delays that accumulate in real-time control loops |\n", "| **Energy limits** | Onboard power is finite and non-replenishable mid-mission; every action carries an energy cost that bounds the feasible plan space |\n", "\n", "The chapter addresses this constraint through a fundamental **depth–breadth divide**:\n", "\n", "- **Depth Problem (Embodied Intelligence Agent):** Controlling a single physical system with millisecond precision and hard safety guarantees — a layered control hierarchy from LLM-based strategic reasoning down to 1–10 kHz servo-level motor control.\n", "- **Breadth Problem (Domain-Transforming Integration Agent):** Coordinating heterogeneous infrastructure systems with complex cross-domain dependencies — modeled as a typed knowledge graph with weighted influence propagation.\n", "\n", "These two architectures are brought together in a **drone mission case study** set in Ottawa's winter conditions, where real-time flight control operates within a **Unified Constraint Envelope** derived from weather, energy, regulatory, and mission constraints. The central principle: **conservative constraint fusion** — autonomous systems must satisfy *all* constraints simultaneously, with safety enforced as a precondition for action rather than an afterthought.\n", "\n", "### Notebook Structure\n", "\n", "| Section | Listings | Book Pages |\n", "|---|---|---|\n", "| Setup & Environment Detection | — | p. 458 |\n", "| Architectural Foundations | — | pp. 458–462 |\n", "| Control Hierarchy & World Model | Pseudocode | pp. 462–468 |\n", "| Embodied Agent Implementation | 16.1, 16.2, 16.3 | pp. 468–472 |\n", "| Domain-Transforming Integration Agent | 16.4, 16.5 | pp. 476–479 |\n", "| Ottawa Drone Case Study | 16.6, 16.7 | pp. 480–489 |\n", "| Failure Scenario Demonstrations | — | pp. 489–490 |\n" ] }, { "cell_type": "markdown", "id": "9d2164a5", "metadata": {}, "source": [ "### Figure 16.1 — The Depth–Breadth Divide (p. 459)\n", "\n", "```\n", "┌─────────────────────────────────────────────────────────────────────────┐\n", "│ AUTONOMOUS PHYSICAL SYSTEM │\n", "│ │\n", "│ ┌───────────────────────┐ ┌───────────────────────────────┐ │\n", "│ │ DEPTH PROBLEM │ │ BREADTH PROBLEM │ │\n", "│ │ (Embodied Agent) │ │ (Integration Agent) │ │\n", "│ │ │ │ │ │\n", "│ │ ┌─────────────────┐ │ │ ┌─────────────────────────┐ │ │\n", "│ │ │ Task Planning │ │ │ │ Weather ←→ Energy │ │ │\n", "│ │ │ (0.1-1 Hz) │ │ │ │ ↕ ↕ │ │ │\n", "│ │ ├─────────────────┤ │ │ │ Airspace ←→ Transport │ │ │\n", "│ │ │ Motion Planning │ │ │ │ ↕ ↕ │ │ │\n", "│ │ │ (1-10 Hz) │ │ │ │ Parks ←──→ Mission │ │ │\n", "│ │ ├─────────────────┤ │ │ └─────────────────────────┘ │ │\n", "│ │ │ Trajectory Ctrl │ │ │ Cross-domain knowledge graph │ │\n", "│ │ │ (50-200 Hz) │ │ │ with influence propagation │ │\n", "│ │ ├─────────────────┤ │ │ │ │\n", "│ │ │ Servo Control │ │ │ │ │\n", "│ │ │ (1-10 kHz) │ │ │ │ │\n", "│ │ └─────────────────┘ │ └───────────────────────────────┘ │\n", "│ └───────────────────────┘ │\n", "│ │\n", "│ ┌─────────────────────────────────┐ │\n", "│ │ UNIFIED CONSTRAINT ENVELOPE │ │\n", "│ │ Conservative Constraint Fusion │ │\n", "│ │ ALL domains must be GREEN │ │\n", "│ └─────────────────────────────────┘ │\n", "└─────────────────────────────────────────────────────────────────────────┘\n", "```\n" ] }, { "cell_type": "code", "execution_count": null, "id": "0471a871", "metadata": {}, "outputs": [], "source": [ "# Cell 2: Environment Setup — Imports, .env Loading, SIMULATION_MODE Detection\n", "# Ref: §Technical requirements (p. 458)\n", "# Author: Imran Ahmad\n", "#\n", "# Cascading fallback: .env → getpass → Simulation Mode\n", "# The getpass prompt is skipped in non-interactive environments (e.g. nbconvert)\n", "# to prevent blocking during automated validation.\n", "\n", "import os\n", "import sys\n", "import json\n", "from pathlib import Path\n", "\n", "# --- Install check (idempotent) ---\n", "# Uncomment the following line if running in a fresh environment:\n", "# !pip install -q langchain==0.2.16 langchain-openai==0.1.23 langgraph==0.1.4 openai==1.40.0 pydantic==2.8.2 python-dotenv==1.0.1\n", "\n", "# --- Load .env if present ---\n", "try:\n", " from dotenv import load_dotenv\n", " load_dotenv()\n", "except ImportError:\n", " pass # python-dotenv not installed; rely on environment variables\n", "\n", "# --- API key resolution with getpass fallback ---\n", "api_key = os.environ.get(\"OPENAI_API_KEY\", \"\")\n", "\n", "if not api_key or api_key == \"your-openai-api-key-here\":\n", " # Only prompt if running interactively (not under nbconvert --execute)\n", " if sys.stdin.isatty():\n", " try:\n", " import getpass\n", " api_key = getpass.getpass(\n", " \"Enter your OpenAI API key (or press Enter for Simulation Mode): \"\n", " )\n", " if api_key:\n", " os.environ[\"OPENAI_API_KEY\"] = api_key\n", " except (EOFError, OSError):\n", " api_key = \"\"\n", "\n", "# --- Determine mode ---\n", "SIMULATION_MODE = not (api_key and api_key.startswith(\"sk-\") and \"your-key\" not in api_key)\n", "\n", "print(f\"SIMULATION_MODE = {SIMULATION_MODE}\")" ] }, { "cell_type": "code", "execution_count": null, "id": "47fbb894", "metadata": {}, "outputs": [], "source": [ "# Multi-provider LLM support (OpenAI / Anthropic / Google Gemini)\n", "# Set LLM_PROVIDER in .env to choose: openai | anthropic | google | auto\n", "# Auto-detection uses the first available key.\n", "# See supporting/llm_provider.py for details.\n", "\n", "import sys, os\n", "sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath('.')), ''))\n", "sys.path.insert(0, '..')\n", "\n", "try:\n", " from supporting.llm_provider import detect_provider, get_llm, PROVIDER_MODELS, print_provider_banner\n", " _PROVIDER, _PROVIDER_KEY, _PROVIDER_MODE = detect_provider()\n", " print_provider_banner(_PROVIDER, _PROVIDER_MODE)\n", "except ImportError:\n", " print('[INFO] supporting/llm_provider.py not found — using default OpenAI path')\n", " _PROVIDER, _PROVIDER_KEY, _PROVIDER_MODE = 'openai', os.getenv('OPENAI_API_KEY'), 'LIVE' if os.getenv('OPENAI_API_KEY') else 'SIMULATION'\n" ] }, { "cell_type": "code", "execution_count": null, "id": "1fa41624", "metadata": {}, "outputs": [], "source": [ "# Cell 3: Resilience Layer Imports\n", "# Ref: Visual Logging Schema specification\n", "# Author: Imran Ahmad\n", "#\n", "# ColorLogger provides ANSI-colored output:\n", "# Blue=INFO, Green=SUCCESS, Red=ERROR, Yellow=SIMULATION/WARNING\n", "# fail_gracefully wraps functions with retry + exponential backoff + fallback.\n", "\n", "from resilience import ColorLogger, fail_gracefully, logger\n", "\n", "# Verify the logger is operational\n", "if SIMULATION_MODE:\n", " logger.simulation(\n", " \"Resilience layer loaded — ColorLogger + @fail_gracefully ready.\",\n", " section_ref=\"§Visual Logging Schema\",\n", " )\n", "else:\n", " logger.info(\n", " \"Resilience layer loaded — ColorLogger + @fail_gracefully ready.\",\n", " section_ref=\"§Visual Logging Schema\",\n", " )" ] }, { "cell_type": "code", "execution_count": null, "id": "04679f28", "metadata": {}, "outputs": [], "source": [ "# Cell 4: LLM Initialization via Factory\n", "# Ref: §Technical requirements (p. 458)\n", "# Author: Imran Ahmad\n", "#\n", "# get_llm() returns MockChatOpenAI (simulation) or ChatOpenAI (live).\n", "# The returned object is a BaseChatModel compatible with\n", "# langgraph.prebuilt.create_react_agent.\n", "\n", "from mock_layer import get_llm\n", "\n", "llm = get_llm(SIMULATION_MODE)\n", "\n", "# Confirm type for traceability\n", "logger.info(\n", " f\"LLM initialized: {type(llm).__name__} \"\n", " f\"(_llm_type={llm._llm_type})\",\n", " section_ref=\"§Technical requirements\",\n", ")" ] }, { "cell_type": "markdown", "id": "16ce7915", "metadata": {}, "source": [ "## Architectural Foundations: The Depth–Breadth Divide\n", "\n", "**§Architectural foundations (pp. 458–462) — Figure 16.1**\n", "\n", "Physical-world agents confront two distinct problems that no single architectural pattern can serve simultaneously:\n", "\n", "**The Depth Problem** — controlling a single physical system with millisecond precision, hard safety guarantees, and deterministic feedback. A warehouse robot picking items from shelves must fuse noisy sensor data, plan collision-free trajectories, execute within millisecond control loops, and halt immediately if a human enters its workspace. The reasoning is *deep* (multiple abstraction levels from strategic goals to motor currents) but *narrow* (confined to one robot in one physical domain).\n", "\n", "**The Breadth Problem** — coordinating multiple infrastructure systems whose states influence one another through measurable but complex transfer functions. A power outage disables traffic signals, which increases congestion, which delays ambulance response times. The reasoning is *broad* (spanning heterogeneous systems with different physics, time scales, and regulatory constraints) but comparatively *slow*.\n", "\n", "These two problems demand fundamentally different architectural responses:\n", "\n", "| Property | Depth (Embodied Intelligence) | Breadth (Domain-Transforming Integration) |\n", "|----------|------------------------------|------------------------------------------|\n", "| Scope | Single physical domain | Multiple coupled domains |\n", "| Time scale | Milliseconds to seconds | Minutes to policy cycles |\n", "| Control | Tight, real-time, deterministic | Loose, deliberative, tolerant of uncertainty |\n", "| Architecture | Multi-rate control hierarchy | Typed knowledge graph + influence propagation |\n", "\n", "The solution: an **Asymmetric Control Loop** separating a high-latency reasoning layer (LLM-based) from a low-latency deterministic controller. The agent provides the \"why\" and \"what\"; the controller manages the \"how\" within physical safety bounds." ] }, { "cell_type": "markdown", "id": "407358fd", "metadata": {}, "source": [ "## Control Hierarchy as Time Scale Decomposition\n", "\n", "**§Control hierarchy as time scale decomposition (pp. 462–466) — Table 16.1, Figure 16.2**\n", "\n", "The embodied agent distributes responsibility across layers running at different frequencies. Higher layers reason symbolically over seconds. Lower layers regulate torque and current at kilohertz rates.\n", "\n", "| Level | Abstraction | Frequency | Algorithm Class |\n", "|-------|------------|-----------|----------------|\n", "| Task Planning | Symbolic goals | 0.1–1 Hz | PDDL planners, LLM reasoning |\n", "| Motion Planning | Collision-free paths | 1–10 Hz | RRT*, PRM, optimization methods |\n", "| Trajectory Control | Time-parameterized path | 50–200 Hz | PID, model predictive control |\n", "| Servo Control | Motor currents | 1–10 kHz | Current and torque loops |\n", "\n", "This hierarchy enforces stability. The servo layer rejects disturbances within milliseconds. Trajectory control maintains smooth tracking. Motion planners compute feasible paths. Task planners reason over discrete objectives. Each layer exposes a constrained interface to the one above it.\n", "\n", "The following cell implements simplified skeletons for the lower three layers (no LLM involvement). The task-planning layer is implemented later as a LangChain agent (Listing 16.2)." ] }, { "cell_type": "code", "execution_count": null, "id": "b67ebfa4", "metadata": {}, "outputs": [], "source": [ "# Cell 7: Lower-Layer Control Skeletons (Pseudocode)\n", "# Ref: §Control hierarchy as time scale decomposition (pp. 466–467)\n", "# Author: Imran Ahmad\n", "#\n", "# These are architectural scaffolds illustrating the lower three layers\n", "# from Table 16.1. Production implementations depend on hardware-specific\n", "# drivers and real-time operating system primitives.\n", "\n", "import math\n", "\n", "\n", "# --- Motion Planning layer (1-10 Hz, RRT*/PRM) ---\n", "def plan_motion_trajectory(goal_pose, world_model_state, obstacles):\n", " \"\"\"Compute a collision-free path from current pose to goal.\n", "\n", " Frequency: 1-10 Hz. Algorithm class: RRT*, PRM.\n", " Ref: Table 16.1 row 2 (p. 463)\n", "\n", " In production, this calls a sampling-based planner (RRT* or PRM)\n", " over the robot's configuration space. Here we return a simplified\n", " linear interpolation as a pedagogical scaffold.\n", " \"\"\"\n", " current_pose = world_model_state.get(\"pose\", {\"x\": 0.0, \"y\": 0.0})\n", " # Simplified: straight-line path with 5 waypoints\n", " path = []\n", " for t in [i / 4.0 for i in range(5)]:\n", " wp = {\n", " \"x\": current_pose[\"x\"] + t * (goal_pose[\"x\"] - current_pose[\"x\"]),\n", " \"y\": current_pose[\"y\"] + t * (goal_pose[\"y\"] - current_pose[\"y\"]),\n", " }\n", " path.append(wp)\n", " return path # List of waypoints for trajectory layer\n", "\n", "\n", "# --- Trajectory Control layer (50-200 Hz, PID/MPC) ---\n", "def compute_trajectory_setpoints(path, dt=0.005):\n", " \"\"\"Time-parameterize the path into setpoints at the control frequency.\n", "\n", " Frequency: 50-200 Hz. Algorithm class: PID, model predictive control.\n", " Ref: Table 16.1 row 3 (p. 463)\n", "\n", " Yields torque commands consumed by the servo layer.\n", " In production, this runs a PID or MPC controller against encoder feedback.\n", " \"\"\"\n", " kp = 1.0 # Proportional gain (simplified)\n", " for waypoint in path:\n", " # Simulated encoder reading\n", " current = {\"x\": waypoint[\"x\"] * 0.98, \"y\": waypoint[\"y\"] * 0.98}\n", " error_x = waypoint[\"x\"] - current[\"x\"]\n", " error_y = waypoint[\"y\"] - current[\"y\"]\n", " torque = {\n", " \"tx\": kp * error_x / max(dt, 1e-6),\n", " \"ty\": kp * error_y / max(dt, 1e-6),\n", " }\n", " yield torque # Consumed by servo layer\n", "\n", "\n", "# --- Servo Control layer (1-10 kHz, current loops) ---\n", "def servo_control_loop(torque_setpoint, dt=0.0001):\n", " \"\"\"Regulate motor current to achieve commanded torque.\n", "\n", " Frequency: 1-10 kHz. Rejects electrical and mechanical disturbances.\n", " Ref: Table 16.1 row 4 (p. 463)\n", "\n", " Returns the PWM signal that would be written to the motor driver.\n", " In production, this runs on a real-time microcontroller.\n", " \"\"\"\n", " # Simplified current-to-torque conversion\n", " motor_constant = 0.05 # Nm/A\n", " target_current = {\n", " \"ix\": torque_setpoint.get(\"tx\", 0) / motor_constant,\n", " \"iy\": torque_setpoint.get(\"ty\", 0) / motor_constant,\n", " }\n", " # Simulated current reading (with noise)\n", " measured_current = {\n", " \"ix\": target_current[\"ix\"] * 0.95,\n", " \"iy\": target_current[\"iy\"] * 0.95,\n", " }\n", " pwm = {\n", " \"px\": (target_current[\"ix\"] - measured_current[\"ix\"]) / max(dt, 1e-6),\n", " \"py\": (target_current[\"iy\"] - measured_current[\"iy\"]) / max(dt, 1e-6),\n", " }\n", " return pwm\n", "\n", "\n", "# --- Demo: run the three-layer pipeline ---\n", "logger.info(\"Lower-layer control skeleton demo\", section_ref=\"§Control hierarchy (pp. 466–467)\")\n", "\n", "demo_state = {\"pose\": {\"x\": 0.0, \"y\": 0.0}}\n", "demo_goal = {\"x\": 5.0, \"y\": 3.0}\n", "demo_obstacles = [] # No obstacles in scaffold\n", "\n", "path = plan_motion_trajectory(demo_goal, demo_state, demo_obstacles)\n", "logger.info(f\"Motion Planning: {len(path)} waypoints generated\")\n", "\n", "setpoints = list(compute_trajectory_setpoints(path))\n", "logger.info(f\"Trajectory Control: {len(setpoints)} torque setpoints at dt=5ms\")\n", "\n", "pwm = servo_control_loop(setpoints[-1])\n", "logger.info(f\"Servo Control: PWM signal = px={pwm['px']:.1f}, py={pwm['py']:.1f}\")\n", "logger.success(\"Three-layer control pipeline executed\", section_ref=\"§Control hierarchy\")" ] }, { "cell_type": "markdown", "id": "2958dbaf", "metadata": {}, "source": [ "## World Model as a Physics Engine\n", "\n", "**§World model as a physics engine (pp. 466–467)**\n", "\n", "The world model maintains a structured representation of the environment — not raw camera frames or lidar point clouds, but **inferred object states**: estimated poses, occupied/free regions, and spatial relationships. The planning layers query these inferred states directly; sensor data is never exposed.\n", "\n", "Two types of reasoning are centralized in the world model:\n", "\n", "**Spatial reasoning** — estimating object poses from noisy depth returns, computing bounding volumes for collision detection, and tracking free-space corridors as obstacles move.\n", "\n", "**Physics reasoning** — evaluating whether an action is physically feasible. For example, stability holds only if the projection of a package's center of mass remains within its support polygon:\n", "\n", "$$\\sum F_i = 0, \\quad \\sum \\tau_i = 0$$\n", "\n", "The invariant: **no action is executed without being evaluated against a coherent internal representation of the world.** The world model exposes queryable methods that the planning layers call directly — grasp difficulty estimates, collision status along paths, placement feasibility checks.\n", "\n", "The next cell implements Listing 16.1: the interface stubs that define this world model contract." ] }, { "cell_type": "code", "execution_count": null, "id": "2a0b8823", "metadata": {}, "outputs": [], "source": [ "# Cell 9: Listing 16.1 — Common Setup: Shared Interface Stubs\n", "# Ref: §Implementation: LangChain embodied agent patterns (pp. 468–469)\n", "# Author: Imran Ahmad\n", "#\n", "# These stubs define the interface contracts that all subsequent listings\n", "# reference. In Simulation Mode, they return chapter-accurate mock data.\n", "# In production, they would be replaced with hardware-specific implementations\n", "# (ROS2 action servers, MAVLink telemetry, sensor drivers).\n", "\n", "from dataclasses import dataclass, field\n", "from typing import Any, Dict, List, Optional\n", "from resilience import fail_gracefully\n", "\n", "\n", "# --- Embodied Intelligence Agent stubs (Listings 16.2-16.3) ---\n", "\n", "class WorldModel:\n", " \"\"\"Maintains belief state b(s) over robot and environment.\n", " Ref: §World model as a physics engine (pp. 466–467)\"\"\"\n", "\n", " def __init__(self):\n", " self._state = {\n", " \"pose\": {\"x\": 0.0, \"y\": 0.0, \"z\": 0.0},\n", " \"package_A\": {\n", " \"pose\": {\"x\": 2.3, \"y\": 1.1, \"z\": 0.8},\n", " \"weight_kg\": 4.5,\n", " \"grasp_difficulty\": 0.3, # 0=easy, 1=hard\n", " },\n", " \"shelf_B\": {\n", " \"pose\": {\"x\": 5.0, \"y\": 3.0, \"z\": 1.2},\n", " \"free_space\": True,\n", " },\n", " \"obstacles\": [],\n", " \"humans_in_perimeter\": False,\n", " }\n", "\n", " def query(self, query: str) -> dict:\n", " \"\"\"Query the world model for object states, collision status,\n", " or grasp difficulty estimates.\"\"\"\n", " q = query.lower()\n", " if \"package\" in q and \"location\" in q or \"grasp\" in q:\n", " return {\n", " \"package_A_pose\": self._state[\"package_A\"][\"pose\"],\n", " \"grasp_difficulty\": self._state[\"package_A\"][\"grasp_difficulty\"],\n", " \"weight_kg\": self._state[\"package_A\"][\"weight_kg\"],\n", " \"path_to_shelf_B\": \"collision_free\",\n", " \"shelf_B_free\": self._state[\"shelf_B\"][\"free_space\"],\n", " }\n", " if \"collision\" in q or \"path\" in q:\n", " return {\"collision_free\": True, \"obstacles_detected\": 0}\n", " return self._state\n", "\n", " def get_current_state(self) -> dict:\n", " \"\"\"Return full belief state b(s).\"\"\"\n", " return dict(self._state)\n", "\n", " def update(self, observations: dict) -> None:\n", " \"\"\"Update belief state with fresh sensor observations.\"\"\"\n", " self._state.update(observations)\n", "\n", "\n", "class ControlInterface:\n", " \"\"\"Dispatches validated commands to the deterministic controller (50-200 Hz).\n", " Ref: §Multi-rate perception-action integration (pp. 467–468)\"\"\"\n", "\n", " @dataclass\n", " class ExecutionResult:\n", " success: bool = True\n", " actuator_ok: bool = True\n", " fault_code: str = \"\"\n", " observations: dict = field(default_factory=dict)\n", "\n", " def execute(self, target: str = \"\", action: str = \"\",\n", " timeout: float = 5.0, **kwargs) -> \"ControlInterface.ExecutionResult\":\n", " \"\"\"Execute a validated command. Returns result with observations.\"\"\"\n", " return self.ExecutionResult(\n", " success=True,\n", " actuator_ok=True,\n", " observations={\"action_completed\": action, \"target\": target},\n", " )\n", "\n", "\n", "@dataclass\n", "class ValidationResult:\n", " is_safe: bool = True\n", " reason: str = \"\"\n", " constraints: str = \"\"\n", "\n", "\n", "class SafetyMonitor:\n", " \"\"\"Validates actions against A_safe(s) before execution.\n", " Ref: §Multi-rate perception-action integration (pp. 467–468)\n", "\n", " Safety is not an emergent property of correct planning.\n", " It is an explicit restriction on the set of actions the system may execute.\"\"\"\n", "\n", " def validate(self, action, target_or_state) -> ValidationResult:\n", " \"\"\"Validate an action against the admissible action set A_safe(s).\"\"\"\n", " return ValidationResult(\n", " is_safe=True,\n", " reason=\"All constraints satisfied\",\n", " constraints=\"workspace_bounds, force_limits, velocity_caps\",\n", " )\n", "\n", " def halt(self, reason: str = \"\") -> None:\n", " \"\"\"Emergency halt — unconditional override.\"\"\"\n", " logger.error(f\"HALT: {reason}\", section_ref=\"§Safety enforcement\")\n", "\n", "\n", "def extract_actions(plan) -> list:\n", " \"\"\"Parse proposed actions from agent plan output.\n", " In simulation, returns a single validated action.\"\"\"\n", " return [{\"action\": \"pick_and_place\", \"target\": \"package_A\", \"destination\": \"shelf_B\"}]\n", "\n", "\n", "# --- Stubs for Domain-Transforming Integration Agent (Listings 16.4) ---\n", "\n", "class _EnergyAPI:\n", " def get_substations(self, region: str) -> list:\n", " \"\"\"Ref: Listing 16.4 (p. 476) — query_energy_grid tool.\"\"\"\n", " return [{\"id\": \"Substation-7\", \"capacity_mw\": 45, \"load_mw\": 38,\n", " \"status\": \"operational\", \"region\": region}]\n", "\n", "\n", "class _TrafficAPI:\n", " def get_network(self, region: str) -> list:\n", " \"\"\"Ref: Listing 16.4 (pp. 477–478) — query_traffic_network tool.\"\"\"\n", " return [{\"id\": \"TrafficController-12\", \"intersections\": 14,\n", " \"throughput_pct\": 100, \"signal_status\": \"normal\", \"region\": region}]\n", "\n", "\n", "# --- Global instances ---\n", "world_model = WorldModel()\n", "control_interface = ControlInterface()\n", "safety_monitor = SafetyMonitor()\n", "energy_api = _EnergyAPI()\n", "traffic_api = _TrafficAPI()\n", "\n", "# Re-use MockGraph from src as the knowledge graph backend\n", "from mock_layer import MockGraph\n", "knowledge_graph = MockGraph()\n", "\n", "logger.success(\n", " \"Listing 16.1 stubs initialized: WorldModel, ControlInterface, \"\n", " \"SafetyMonitor, EnergyAPI, TrafficAPI, KnowledgeGraph\",\n", " section_ref=\"§Implementation: LangChain patterns (pp. 468-469)\",\n", ")" ] }, { "cell_type": "code", "execution_count": null, "id": "39d4d1fd", "metadata": {}, "outputs": [], "source": [ "# Cell 10: Listing 16.2 — Embodied Agent with Four-Responsibility Tool Decomposition\n", "# Ref: §Implementation: LangChain embodied agent patterns (pp. 469–470)\n", "# Author: Imran Ahmad\n", "#\n", "# Each tool maps to exactly one responsibility:\n", "# query_world_model → Model Maintenance\n", "# dispatch_motion_command → Real-Time Control\n", "# check_safety_constraints → Safety Enforcement\n", "# LLM (via create_react_agent) → Strategic Reasoning\n", "#\n", "# The LLM cannot bypass the safety layer. The tool architecture enforces\n", "# the same trust boundary described in the chapter: the reasoning layer\n", "# proposes, but the deterministic controller disposes.\n", "\n", "from langchain_core.tools import tool\n", "from langgraph.prebuilt import create_react_agent\n", "\n", "\n", "# --- Model Maintenance: query the world model ---\n", "@tool\n", "def query_world_model(query: str) -> dict:\n", " \"\"\"Query the robot's world model for object poses,\n", " collision status, or grasp difficulty estimates.\n", " Ref: Listing 16.2 (p. 469) — Model Maintenance responsibility.\"\"\"\n", " return _query_world_model_impl(query)\n", "\n", "@fail_gracefully(fallback_value={\"error\": \"world model unavailable\"},\n", " section_ref=\"§World model (pp. 466–467)\")\n", "def _query_world_model_impl(query: str) -> dict:\n", " return world_model.query(query)\n", "\n", "\n", "# --- Real-Time Control: dispatch validated commands ---\n", "@tool\n", "def dispatch_motion_command(target: str, action: str) -> dict:\n", " \"\"\"Submit a motion command through the control interface.\n", " Safety validation occurs before dispatch.\n", " Ref: Listing 16.2 (p. 470) — Real-Time Control responsibility.\"\"\"\n", " return _dispatch_motion_command_impl(target, action)\n", "\n", "@fail_gracefully(fallback_value={\"error\": \"command dispatch failed\"},\n", " section_ref=\"§Multi-rate perception-action (pp. 467–468)\")\n", "def _dispatch_motion_command_impl(target: str, action: str) -> dict:\n", " result = control_interface.execute(target=target, action=action)\n", " return {\"success\": result.success, \"target\": target, \"action\": action}\n", "\n", "\n", "# --- Safety Enforcement: validate before execution ---\n", "@tool\n", "def check_safety_constraints(action: str, target: str) -> dict:\n", " \"\"\"Validate an action against the admissible action set A_safe(s).\n", " Checks workspace bounds, force limits, velocity caps.\n", " Ref: Listing 16.2 (p. 470) — Safety Enforcement responsibility.\"\"\"\n", " return _check_safety_constraints_impl(action, target)\n", "\n", "@fail_gracefully(fallback_value={\"is_safe\": False, \"reason\": \"safety check failed\"},\n", " section_ref=\"§Safety enforcement (pp. 467-468)\")\n", "def _check_safety_constraints_impl(action: str, target: str) -> dict:\n", " result = safety_monitor.validate(action, target)\n", " return {\"is_safe\": result.is_safe, \"reason\": result.reason,\n", " \"constraints\": result.constraints}\n", "\n", "\n", "# --- Strategic Reasoning: LLM agent with tool access ---\n", "embodied_tools = [\n", " query_world_model,\n", " dispatch_motion_command,\n", " check_safety_constraints,\n", "]\n", "\n", "embodied_agent = create_react_agent(llm, embodied_tools)\n", "\n", "logger.success(\n", " f\"Listing 16.2: Embodied agent created with {len(embodied_tools)} tools \"\n", " f\"using {type(llm).__name__}\",\n", " section_ref=\"§Implementation: LangChain patterns (pp. 469-470)\",\n", ")" ] }, { "cell_type": "markdown", "id": "de7317a3", "metadata": {}, "source": [ "### Figure 16.3 — Layered Command Interface (p. 466)\n", "\n", "The safety enforcement layer sits between the control interface and the actuators, *not* alongside the reasoning layer. This architectural choice means the LLM reasoning layer cannot bypass safety constraints even if it proposes an unsafe action.\n", "\n", "```\n", " ┌──────────────────┐\n", " │ LLM Reasoning │ Strategic decisions (0.1-1 Hz)\n", " │ Layer (Agent) │\n", " └────────┬─────────┘\n", " │ tool calls\n", " ▼\n", " ┌──────────────────┐\n", " │ Tool-Mediated │ query_world_model()\n", " │ Interface │ dispatch_motion_command()\n", " │ │ check_safety_constraints()\n", " └────────┬─────────┘\n", " │\n", " ▼\n", " ┌──────────────────┐\n", " │ SAFETY MONITOR │ Validates against A_safe(s)\n", " │ ███████████████ │ ← HARD GATE: rejects unsafe actions\n", " └────────┬─────────┘\n", " │ validated commands only\n", " ▼\n", " ┌──────────────────┐\n", " │ Control │ Trajectory + Servo layers\n", " │ Interface │ (50-200 Hz, 1-10 kHz)\n", " └────────┬─────────┘\n", " │\n", " ▼\n", " ⚙️ ACTUATORS\n", "```\n" ] }, { "cell_type": "code", "execution_count": null, "id": "aae4c6e6", "metadata": {}, "outputs": [], "source": [ "# Cell 11: Listing 16.3 — Safety-Constrained Action Execution Loop\n", "# Ref: §Multi-rate perception-action integration (pp. 470–472)\n", "# Author: Imran Ahmad\n", "#\n", "# Before any action reaches the control interface, the safety monitor\n", "# validates it against A_safe(s) evaluated at the current world state.\n", "# If validation fails, the rejection reason and active constraints are\n", "# fed back to the LLM for replanning.\n", "\n", "from langchain_core.messages import HumanMessage\n", "\n", "\n", "def execute_with_safety(agent, task: str, wm, sm):\n", " \"\"\"Execute an embodied task with pre-execution safety validation\n", " and continuous monitoring.\n", "\n", " Ref: Listing 16.3 (pp. 470-472)\n", "\n", " Protocol:\n", " 1. Agent reasons about the task and proposes actions\n", " 2. Extract proposed actions from agent output\n", " 3. Validate each action against A_safe(s)\n", " 4. Execute validated actions; reject and replan if unsafe\n", " \"\"\"\n", " logger.info(f\"Executing task: {task}\", section_ref=\"§Listing 16.3\")\n", "\n", " # Step 1: Agent reasons about the task and proposes actions\n", " plan = agent.invoke({\"messages\": [HumanMessage(content=task)]})\n", "\n", " # Step 2: Extract proposed actions from agent output\n", " actions = extract_actions(plan)\n", " logger.info(f\"Proposed {len(actions)} action(s)\", section_ref=\"§Listing 16.3\")\n", "\n", " results = []\n", " for i, action in enumerate(actions):\n", " # Step 3: Validate each action against A_safe(s)\n", " state = wm.get_current_state()\n", " validation = sm.validate(action, state)\n", "\n", " if not validation.is_safe:\n", " # Rejected: replan with constraint feedback\n", " logger.error(\n", " f\"Action {i+1} rejected: {validation.reason}\",\n", " section_ref=\"§Safety enforcement\",\n", " )\n", " plan = agent.invoke({\"messages\": [\n", " HumanMessage(\n", " content=(\n", " f\"Action rejected: {validation.reason}. \"\n", " f\"Replan within: {validation.constraints}\"\n", " )\n", " )\n", " ]})\n", " continue\n", "\n", " logger.constraint(\"A_safe(s)\", True,\n", " f\"Action {i+1} validated\",\n", " section_ref=\"§Listing 16.3\")\n", "\n", " # Step 4: Execute validated action with timeout\n", " try:\n", " result = control_interface.execute(\n", " target=action.get(\"target\", \"\"),\n", " action=action.get(\"action\", \"\"),\n", " timeout=5.0,\n", " )\n", " if not result.actuator_ok:\n", " sm.halt(reason=result.fault_code)\n", " break\n", " wm.update(result.observations)\n", " results.append({\"action\": action, \"success\": True})\n", " logger.success(\n", " f\"Action {i+1} executed: {action.get('action', '')}\",\n", " section_ref=\"§Listing 16.3\",\n", " )\n", " except TimeoutError:\n", " sm.halt(reason=\"execution_timeout\")\n", " logger.error(\"Execution timeout — halted\", section_ref=\"§Listing 16.3\")\n", " break\n", "\n", " return {\"plan_output\": plan, \"execution_results\": results}\n", "\n", "\n", "logger.success(\n", " \"Listing 16.3: execute_with_safety() defined\",\n", " section_ref=\"§Multi-rate perception-action (pp. 470-472)\",\n", ")" ] }, { "cell_type": "code", "execution_count": null, "id": "549c8eab", "metadata": {}, "outputs": [], "source": [ "# Cell 12: Demo — Warehouse Robot Scenario (Embodied Agent)\n", "# Ref: §Implementation: LangChain embodied agent patterns (pp. 467–472)\n", "# Author: Imran Ahmad\n", "#\n", "# Scenario: \"Move package A to shelf B.\"\n", "# This single instruction conceals a cascade of computational problems:\n", "# locate package A, plan collision-free path, compute grasp strategy,\n", "# navigate to shelf B, and guarantee safety throughout.\n", "\n", "logger.info(\"=\" * 60)\n", "logger.info(\n", " \"DEMO: Warehouse Robot — 'Move package A to shelf B'\",\n", " section_ref=\"§Implementation (pp. 467-472)\",\n", ")\n", "logger.info(\"=\" * 60)\n", "\n", "# Method 1: Direct agent invocation (Listing 16.2 pattern)\n", "logger.info(\"\\n--- Method 1: Direct Agent Invocation (Listing 16.2) ---\")\n", "result = embodied_agent.invoke({\n", " \"messages\": [\n", " HumanMessage(\n", " content=(\n", " \"Move package A to shelf B in the warehouse. \"\n", " \"First query the world model for package A location and grasp \"\n", " \"difficulty, then check safety constraints, then dispatch the \"\n", " \"motion command.\"\n", " )\n", " )\n", " ]\n", "})\n", "\n", "# Print agent message trace\n", "logger.info(\"Agent message trace:\")\n", "for i, msg in enumerate(result[\"messages\"]):\n", " msg_type = type(msg).__name__\n", " tool_calls = getattr(msg, \"tool_calls\", None)\n", " if tool_calls:\n", " names = [tc[\"name\"] for tc in tool_calls]\n", " logger.info(f\" [{i}] {msg_type} → tool_calls={names}\")\n", " elif hasattr(msg, \"content\") and msg.content:\n", " preview = msg.content[:80]\n", " logger.info(f\" [{i}] {msg_type}: {preview}...\")\n", "\n", "# Final agent response\n", "final_msg = result[\"messages\"][-1].content\n", "logger.success(f\"\\nAgent response: {final_msg[:120]}...\")\n", "\n", "# Method 2: Safety-constrained execution loop (Listing 16.3 pattern)\n", "logger.info(\"\\n--- Method 2: Safety-Constrained Execution (Listing 16.3) ---\")\n", "exec_result = execute_with_safety(\n", " agent=embodied_agent,\n", " task=\"Move package A to shelf B.\",\n", " wm=world_model,\n", " sm=safety_monitor,\n", ")\n", "logger.info(f\"Execution results: {len(exec_result['execution_results'])} action(s) completed\")\n", "logger.success(\"Warehouse robot demo complete\", section_ref=\"§Implementation\")" ] }, { "cell_type": "markdown", "id": "10b55c47", "metadata": {}, "source": [ "> **📦 Info Box: When Robots Meet the Real World — The Kiva Systems Revolution** *(p. 472)*\n", ">\n", "> In 2012, Amazon paid $775 million for Kiva Systems, a startup that had built squat orange robots to shuttle shelving units across warehouse floors. Skeptics questioned the price tag. The robots were slow, limited to flat surfaces, and could only operate in carefully mapped environments.\n", ">\n", "> But Kiva's founders understood something the robotics establishment had overlooked: **in logistics, reliability at scale matters more than capability at the edge.** A robot that successfully completes 99.8% of simple fetch tasks across a fleet of 800 units moves more product than a dozen sophisticated arms that fail unpredictably.\n", ">\n", "> By 2024, Amazon had deployed over **750,000 robotic units** across its fulfillment network, rebranded as Amazon Robotics. The architectural lesson endures: **fleet-level orchestration, conservative safety margins, and graceful degradation** when individual robots fail.\n" ] }, { "cell_type": "markdown", "id": "81af9599", "metadata": {}, "source": [ "## The Domain-Transforming Integration Agent\n", "\n", "**§The Domain-Transforming Integration agent (pp. 472–476) — Figure 16.1 (breadth side)**\n", "\n", "The Embodied Intelligence agent coordinates perception, planning, and actuation within a *single* physical domain. The Domain-Transforming Integration agent operates at a different scale: it coordinates **multiple partially coupled dynamical systems**, each with its own state variables, control laws, time scales, and regulatory constraints.\n", "\n", "### Coupled Heterogeneous Dynamical Systems\n", "\n", "Each domain *i* can be modeled as a dynamical system with internal evolution and cross-domain perturbations:\n", "\n", "$$\\frac{dx_i}{dt} = f_i(x_i, u_i) + \\sum_j g_{ij}(x_j)$$\n", "\n", "Where $x_i$ is the state vector, $u_i$ represents exogenous inputs, $f_i$ encodes internal dynamics, and $g_{ij}$ captures coupling from domain *j* to domain *i*.\n", "\n", "### Structural Topology: The Domain Knowledge Graph\n", "\n", "The discrete structure of cross-domain influence is captured as a typed heterogeneous graph $G = (V, E)$:\n", "\n", "- **V**: typed entities across domains (substations, traffic controllers, intersections)\n", "- **E**: typed, weighted, and directional relations (powers, governs, affects)\n", "\n", "### Influence Propagation\n", "\n", "A weighted breadth-first traversal with multiplicative attenuation provides first-order impact estimates. Starting from a disrupted entity, influence propagates along outgoing edges, scaled by edge weight at each hop. Propagation stops when influence falls below a threshold — this prevents spurious amplification across long dependency chains.\n", "\n", "The next cells implement these patterns as LangChain tools and a pure-Python BFS traversal." ] }, { "cell_type": "markdown", "id": "6ac44ae2", "metadata": {}, "source": [ "> **⚠️ Info Box: Simulation vs. Prediction** *(p. 475)*\n", ">\n", "> Simulation outputs are **exploratory**. They inform policy and contingency planning. They do not guarantee predictive certainty. The influence propagation algorithm provides first-order impact estimates — useful for prioritizing responses, but not a substitute for domain-expert validation of cascade effects.\n" ] }, { "cell_type": "code", "execution_count": null, "id": "42e9ad35", "metadata": {}, "outputs": [], "source": [ "# Cell 14: Listing 16.4 — Cross-Domain Knowledge Graph Construction via Tool-Calling\n", "# Ref: §LangChain integration agent patterns (pp. 476–478)\n", "# Author: Imran Ahmad\n", "#\n", "# The graph formalism G = (V, E) maps onto a two-role tool set:\n", "# - Domain query tools populate V with typed entities\n", "# - register_cross_domain_edge populates E with weighted relations\n", "#\n", "# The LLM's role is semantic: given domain data from both sources, it\n", "# identifies cross-domain dependencies and estimates coupling weights.\n", "\n", "from langchain_core.tools import tool\n", "from langgraph.prebuilt import create_react_agent\n", "\n", "\n", "# --- Domain-specific data source tools ---\n", "\n", "@tool\n", "def query_energy_grid(region: str) -> list:\n", " \"\"\"Query substations, capacity, and load for a region.\n", " Ref: Listing 16.4 (p. 476) — populates V with Energy-domain nodes.\"\"\"\n", " return _query_energy_grid_impl(region)\n", "\n", "@fail_gracefully(fallback_value=[], section_ref=\"§Listing 16.4\")\n", "def _query_energy_grid_impl(region: str) -> list:\n", " return energy_api.get_substations(region)\n", "\n", "\n", "@tool\n", "def query_traffic_network(region: str) -> list:\n", " \"\"\"Query intersections, controllers, and throughput.\n", " Ref: Listing 16.4 (pp. 477–478) — populates V with Transportation-domain nodes.\"\"\"\n", " return _query_traffic_network_impl(region)\n", "\n", "@fail_gracefully(fallback_value=[], section_ref=\"§Listing 16.4\")\n", "def _query_traffic_network_impl(region: str) -> list:\n", " return traffic_api.get_network(region)\n", "\n", "\n", "@tool\n", "def register_cross_domain_edge(\n", " source_id: str, target_id: str,\n", " relation: str, weight: float,\n", ") -> dict:\n", " \"\"\"Register a typed, weighted edge between two entities in different domains.\n", " Ref: Listing 16.4 (pp. 477–478) — populates E in G = (V, E).\"\"\"\n", " return _register_edge_impl(source_id, target_id, relation, weight)\n", "\n", "@fail_gracefully(fallback_value={\"error\": \"edge registration failed\"},\n", " section_ref=\"§Structural topology (p. 474)\")\n", "def _register_edge_impl(source_id, target_id, relation, weight):\n", " return knowledge_graph.add_edge(source_id, target_id, relation, weight)\n", "\n", "\n", "# --- Graph construction agent ---\n", "graph_builder_tools = [\n", " query_energy_grid,\n", " query_traffic_network,\n", " register_cross_domain_edge,\n", "]\n", "\n", "graph_builder = create_react_agent(llm, graph_builder_tools)\n", "\n", "logger.success(\n", " f\"Listing 16.4: Graph builder agent created with {len(graph_builder_tools)} tools\",\n", " section_ref=\"§LangChain integration patterns (pp. 476-478)\",\n", ")" ] }, { "cell_type": "code", "execution_count": null, "id": "84a27215", "metadata": {}, "outputs": [], "source": [ "# Cell 15: Listing 16.5 — Influence Propagation via Weighted Breadth-First Traversal\n", "# Ref: §Influence propagation and impact estimation (pp. 478–479)\n", "# Author: Imran Ahmad\n", "#\n", "# Pure Python — no LLM or mock needed. This is the implementation-level\n", "# realization of the stability heuristic: the graph captures influence\n", "# topology, but only propagation above the threshold is operationally\n", "# meaningful.\n", "\n", "from collections import deque\n", "\n", "\n", "def propagate_influence(\n", " graph, source_id: str,\n", " initial_strength: float = 1.0,\n", " threshold: float = 0.1,\n", ") -> dict:\n", " \"\"\"Weighted BFS: propagate influence from a source entity through\n", " the cross-domain knowledge graph.\n", "\n", " Ref: Listing 16.5 (pp. 478-479)\n", "\n", " Returns {entity_id: (strength, [path])} for all reachable entities\n", " where propagated strength exceeds the threshold.\n", "\n", " The attenuation cutoff (threshold=0.1) prevents spurious amplification\n", " across long dependency chains — this is the stability heuristic from\n", " §Influence propagation (p. 479).\n", "\n", " Args:\n", " graph: Object with outgoing_edges(node_id) -> List[Edge]\n", " source_id: Starting entity for disruption\n", " initial_strength: Disruption magnitude at source (default 1.0)\n", " threshold: Minimum propagated strength to continue (default 0.1)\n", "\n", " Returns:\n", " Dict mapping entity_id -> (strength, path_list)\n", " \"\"\"\n", " impacts = {source_id: (initial_strength, [source_id])}\n", " queue = deque([(source_id, initial_strength, [source_id])])\n", "\n", " while queue:\n", " node_id, strength, path = queue.popleft()\n", "\n", " for edge in graph.outgoing_edges(node_id):\n", " propagated = strength * edge.weight\n", "\n", " if propagated < threshold:\n", " continue # Attenuation cutoff\n", "\n", " new_path = path + [edge.target_id]\n", "\n", " # Keep strongest path to each entity\n", " if (\n", " edge.target_id not in impacts\n", " or propagated > impacts[edge.target_id][0]\n", " ):\n", " impacts[edge.target_id] = (propagated, new_path)\n", " queue.append((edge.target_id, propagated, new_path))\n", "\n", " return impacts\n", "\n", "\n", "logger.success(\n", " \"Listing 16.5: propagate_influence() defined (pure Python, no mock needed)\",\n", " section_ref=\"§Influence propagation (pp. 478-479)\",\n", ")" ] }, { "cell_type": "code", "execution_count": null, "id": "d2f554dc", "metadata": {}, "outputs": [], "source": [ "# Cell 16: Demo — Influence Propagation: Substation-7 Cascade\n", "# Ref: §Influence propagation and impact estimation (pp. 478–479)\n", "# Author: Imran Ahmad\n", "#\n", "# Scenario from the chapter (p. 474): \"When a substation fails and grid\n", "# load drops, traffic signals at 14 downstream intersections lose power,\n", "# forcing them into default flashing-red mode. Signal-free intersections\n", "# reduce throughput by approximately 60%.\"\n", "\n", "from mock_layer import build_city_storm_graph\n", "\n", "logger.info(\"=\" * 60)\n", "logger.info(\n", " \"DEMO: Influence Propagation — Substation-7 Failure Cascade\",\n", " section_ref=\"§Influence propagation (pp. 478-479)\",\n", ")\n", "logger.info(\"=\" * 60)\n", "\n", "# Build the city/storm scenario graph from chapter narrative\n", "city_graph = build_city_storm_graph()\n", "\n", "# Propagate influence from Substation-7 (lightning strike disables it)\n", "impacts = propagate_influence(\n", " graph=city_graph,\n", " source_id=\"Substation-7\",\n", " initial_strength=1.0,\n", " threshold=0.1,\n", ")\n", "\n", "# Display results with color-coded severity\n", "logger.info(f\"\\nImpact assessment: {len(impacts)} entities affected\\n\")\n", "\n", "for entity_id, (strength, path) in sorted(\n", " impacts.items(), key=lambda x: -x[1][0]\n", "):\n", " path_str = \" → \".join(path)\n", "\n", " if strength >= 0.8:\n", " logger.error(\n", " f\" {entity_id}: strength={strength:.2f} path=[{path_str}]\",\n", " section_ref=\"CRITICAL\",\n", " )\n", " elif strength >= 0.4:\n", " logger.warning(\n", " f\" {entity_id}: strength={strength:.2f} path=[{path_str}]\",\n", " section_ref=\"HIGH\",\n", " )\n", " else:\n", " logger.info(\n", " f\" {entity_id}: strength={strength:.2f} path=[{path_str}]\",\n", " )\n", "\n", "# Summary statistics\n", "critical = sum(1 for _, (s, _) in impacts.items() if s >= 0.8)\n", "high = sum(1 for _, (s, _) in impacts.items() if 0.4 <= s < 0.8)\n", "moderate = sum(1 for _, (s, _) in impacts.items() if 0.1 <= s < 0.4)\n", "\n", "logger.info(f\"\\nSummary: {critical} critical, {high} high, {moderate} moderate impact entities\")\n", "logger.success(\n", " \"Influence propagation demo complete — cascade traced through \"\n", " \"Energy → Transportation → Emergency domains\",\n", " section_ref=\"§Structural topology (p. 474)\",\n", ")" ] }, { "cell_type": "markdown", "id": "170e5837", "metadata": {}, "source": [ "## Case Study: Autonomous Drone Mission Planning in Ottawa's Winter Conditions\n", "\n", "**§Case study (pp. 480–482) — Figures 16.4**\n", "\n", "Ottawa in January delivers sub-zero temperatures, gusting northwesterly winds off the Ottawa River, and precipitation that alternates between freezing rain and driven snow. This environment stresses all five constraint domains simultaneously, making it a maximally demanding test of the joint architecture.\n", "\n", "### Mission Scenario\n", "\n", "An operator instructs an autonomous drone to conduct a photographic survey along a **6 km corridor** from **Centerpointe Technology Park** (Nepean) to the **Ottawa River waterfront**. The corridor crosses two regulatory zones:\n", "\n", "1. **Controlled urban airspace** — Transport Canada, Canadian Aviation Regulations (CARs) Part IX\n", "2. **National Capital Greenbelt** — Parks Canada overflight restrictions\n", "\n", "### Unified Constraint Envelope\n", "\n", "The drone may not arm until ALL five domains report GREEN:\n", "\n", "| Domain | Constraint | Threshold | Source |\n", "|--------|-----------|-----------|--------|\n", "| **Weather** | Temperature | > −10°C (author-defined operational limit) | p. 481 |\n", "| **Weather** | Wind speed | < 25 km/h at mission altitude | p. 481 |\n", "| **Weather** | Precipitation | None active in corridor | p. 481 |\n", "| **Battery** | Departure SoC | ≥ 30% | p. 481 |\n", "| **Battery** | Waypoint minimum | ≥ 20% projected at any waypoint | p. 481 |\n", "| **Battery** | Return reserve | ≥ 15% at return-to-home | p. 481 |\n", "| **Airspace** | NOTAMs | No active TFR in corridor | p. 481 |\n", "| **Parks** | Greenbelt authorization | On file with NCC | p. 481 |\n", "| **Mission** | Route geometry | Structurally feasible | p. 481 |\n", "\n", "**Conservative constraint fusion:** A single domain returning RED vetoes the entire envelope, regardless of how favorably the remaining domains report.\n", "\n", "The next cells implement Listings 16.6 (Embodied drone agent) and 16.7 (cross-domain constraint assembler), then compose them into the full mission execution pipeline." ] }, { "cell_type": "markdown", "id": "0f2427f1", "metadata": {}, "source": [ "### Unified Constraint Envelope — Go/No-Go Decision Matrix (pp. 480–482)\n", "\n", "The drone may not arm until the constraint assembler reports a fully **GREEN** envelope across all five domains:\n", "\n", "```\n", " WEATHER BATTERY AIRSPACE PARKS MISSION\n", " ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐\n", " │ Temp>-10°│ │ SoC≥30% │ │ No active│ │ NCC auth │ │ Route │\n", " │ Wind<25 │ │ WP≥20% │ │ NOTAMs │ │ on file │ │ feasible │\n", " │ No precip│ │ RTH≥15% │ │ in corr. │ │ │ │ │\n", " └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘\n", " │ │ │ │ │\n", " └───────┬───────┴───────┬───────┴───────┬───────┘ │\n", " │ │ │ │\n", " ▼ ▼ ▼ │\n", " ┌─────────────────────────────────────────────────────────────┐│\n", " │ CONSERVATIVE CONSTRAINT FUSION (logical AND) ││\n", " │ ALL domains GREEN → envelope_green = True → ARM MOTORS ││\n", " │ ANY domain RED → envelope_green = False → HOLD/ABORT │◄┘\n", " │ ANY domain STALE → treated as RED until refreshed │\n", " └─────────────────────────────────────────────────────────────┘\n", "```\n" ] }, { "cell_type": "code", "execution_count": null, "id": "e78b5de8", "metadata": {}, "outputs": [], "source": [ "# Cell 18: Listing 16.6 — Embodied Drone Agent with Safety-Constrained Mission Execution\n", "# Ref: §Implementation (pp. 482–485)\n", "# Author: Imran Ahmad\n", "#\n", "# Maps the four-layer control hierarchy to the drone domain:\n", "# Task Planning (0.1-1 Hz) → Mission Supervisor (LLM agent)\n", "# Motion Planning (1-10 Hz) → Flight Planner (waypoint computation)\n", "# Trajectory + Servo → Autopilot firmware (independent of LLM)\n", "# Safety Enforcement → Unified Constraint Envelope (A_safe(s))\n", "\n", "from langchain_core.tools import tool\n", "from langgraph.prebuilt import create_react_agent\n", "from dataclasses import dataclass, field\n", "import json\n", "\n", "\n", "@dataclass\n", "class DroneFlightState:\n", " \"\"\"Current belief state b(s) over the drone's physical and\n", " environmental state.\n", " Ref: Listing 16.6 (p. 482) — POMDP formulation from §Embodied Intelligence.\"\"\"\n", " position_lat: float = 0.0 # Current GPS latitude\n", " position_lon: float = 0.0 # Current GPS longitude\n", " altitude_m: float = 0.0 # Current altitude (metres AGL)\n", " battery_soc: float = 1.0 # State of Charge, 0.0-1.0\n", " wind_speed_kmh: float = 0.0 # Wind speed at mission altitude\n", " temperature_c: float = 0.0 # Ambient temperature\n", " precipitation_active: bool = False # Any active precip in corridor\n", " constraint_envelope_green: bool = False # Unified Constraint Envelope status\n", "\n", "\n", "# --- Tool layer: maps to control hierarchy ---\n", "\n", "@tool\n", "def check_flight_safety(state_json: str) -> dict:\n", " \"\"\"Safety enforcement layer: validate current state against the\n", " Unified Constraint Envelope (§Constraint formalization, p. 481).\n", " Maps to A_safe(s) from the Embodied Intelligence Agent section.\n", " Returns go/no-go with per-domain status.\"\"\"\n", " return _check_flight_safety_impl(state_json)\n", "\n", "@fail_gracefully(\n", " fallback_value={\"envelope_green\": False, \"error\": \"safety check failed\"},\n", " section_ref=\"§Constraint formalization (p. 481)\",\n", ")\n", "def _check_flight_safety_impl(state_json: str) -> dict:\n", " s = json.loads(state_json)\n", " checks = {\n", " \"temperature\": s.get(\"temperature_c\", -999) > -10.0, # Author-defined limit (p. 481)\n", " \"wind\": s.get(\"wind_speed_kmh\", 999) < 25.0, # Mission-specific limit (p. 481)\n", " \"precipitation\": not s.get(\"precipitation_active\", True),\n", " \"battery_departure\": s.get(\"battery_soc\", 0) >= 0.30, # SoC floor: 30% (p. 481)\n", " }\n", " envelope_green = all(checks.values())\n", "\n", " # Log each domain check\n", " for domain, met in checks.items():\n", " logger.constraint(domain, met, section_ref=\"§Constraint formalization\")\n", "\n", " return {\n", " \"envelope_green\": envelope_green,\n", " \"domain_status\": checks,\n", " \"safe_to_fly\": envelope_green,\n", " }\n", "\n", "\n", "@tool\n", "def query_flight_state(drone_id: str) -> dict:\n", " \"\"\"World-model query layer: retrieve current telemetry belief state\n", " b(s) from drone sensors and environmental APIs.\n", " Ref: Listing 16.6 (p. 483) — maps to task-planning layer.\"\"\"\n", " return _query_flight_state_impl(drone_id)\n", "\n", "@fail_gracefully(\n", " fallback_value={\"error\": \"telemetry unavailable\"},\n", " section_ref=\"§Listing 16.6 (p. 483)\",\n", ")\n", "def _query_flight_state_impl(drone_id: str) -> dict:\n", " # Production: query live telemetry + weather API + battery management system\n", " # Simulation: chapter-accurate values from §Constraint formalization\n", " return {\n", " \"position_lat\": 45.3490, # Centerpointe Technology Park, Nepean\n", " \"position_lon\": -75.7544,\n", " \"altitude_m\": 0.0,\n", " \"battery_soc\": 0.82, # 82% — above 30% floor\n", " \"wind_speed_kmh\": 18.5, # Below 25 km/h ceiling\n", " \"temperature_c\": -6.2, # Above -10°C limit\n", " \"precipitation_active\": False,\n", " \"constraint_envelope_green\": False, # Not yet evaluated\n", " }\n", "\n", "\n", "@tool\n", "def dispatch_waypoint_command(\n", " waypoint_lat: float, waypoint_lon: float,\n", " altitude_m: float, airspeed_kmh: float,\n", ") -> dict:\n", " \"\"\"Motion-planning interface: submit a validated waypoint command to\n", " the deterministic flight controller (50-200 Hz loop).\n", " This tool may only be called after check_flight_safety returns\n", " envelope_green=True.\n", " Ref: Listing 16.6 (pp. 483-484)\"\"\"\n", " return _dispatch_waypoint_impl(waypoint_lat, waypoint_lon, altitude_m, airspeed_kmh)\n", "\n", "@fail_gracefully(\n", " fallback_value={\"command_accepted\": False, \"error\": \"dispatch failed\"},\n", " section_ref=\"§Listing 16.6 (pp. 483-484)\",\n", ")\n", "def _dispatch_waypoint_impl(waypoint_lat, waypoint_lon, altitude_m, airspeed_kmh):\n", " # Production: serialize to MAVLink protocol and send to autopilot firmware\n", " logger.success(\n", " f\"Waypoint dispatched: ({waypoint_lat:.4f}°N, {abs(waypoint_lon):.4f}°W) \"\n", " f\"at {altitude_m}m AGL, {airspeed_kmh} km/h\",\n", " section_ref=\"§Motion planning layer\",\n", " )\n", " return {\n", " \"command_accepted\": True,\n", " \"waypoint\": {\n", " \"lat\": waypoint_lat, \"lon\": waypoint_lon,\n", " \"alt\": altitude_m, \"airspeed\": airspeed_kmh,\n", " },\n", " \"estimated_duration_s\": 120,\n", " }\n", "\n", "\n", "# --- Mission Supervisor agent (task-planning layer, 0.1-1 Hz) ---\n", "\n", "mission_supervisor_tools = [\n", " check_flight_safety,\n", " query_flight_state,\n", " dispatch_waypoint_command,\n", "]\n", "\n", "mission_supervisor = create_react_agent(llm, mission_supervisor_tools)\n", "\n", "\n", "def execute_mission(operator_instruction: str) -> str:\n", " \"\"\"Entry point: translate operator intent into authorized flight actions.\n", "\n", " Ref: Listing 16.6 continued (pp. 484-485)\n", "\n", " Protocol:\n", " (1) call query_flight_state to retrieve current b(s)\n", " (2) call check_flight_safety with the state JSON\n", " (3) Only if envelope_green is True, call dispatch_waypoint_command\n", " (4) If envelope_green is False, report which domains failed and do not arm\n", " \"\"\"\n", " result = mission_supervisor.invoke({\n", " \"messages\": [\n", " {\n", " \"role\": \"user\",\n", " \"content\": (\n", " f\"{operator_instruction}\\n\\n\"\n", " \"Protocol: (1) call query_flight_state to retrieve current b(s). \"\n", " \"(2) call check_flight_safety with the state JSON. \"\n", " \"(3) Only if envelope_green is True, call dispatch_waypoint_command. \"\n", " \"(4) If envelope_green is False, report which domains failed and do not arm.\"\n", " ),\n", " }\n", " ]\n", " })\n", " return result[\"messages\"][-1].content\n", "\n", "\n", "logger.success(\n", " f\"Listing 16.6: Mission Supervisor created with {len(mission_supervisor_tools)} tools\",\n", " section_ref=\"§Implementation (pp. 482-485)\",\n", ")" ] }, { "cell_type": "code", "execution_count": null, "id": "d5510a7b", "metadata": {}, "outputs": [], "source": [ "# Cell 19: Listing 16.7 — Cross-Domain Constraint Assembler with NOTAM Integration\n", "# Ref: §Implementation (pp. 485–488)\n", "# Author: Imran Ahmad\n", "#\n", "# Implements the Domain-Transforming Integration agent at the drone-mission\n", "# scale. Each domain query tool corresponds to a typed node cluster in\n", "# G = (V, E). Conservative constraint fusion: a single RED domain vetoes\n", "# the entire envelope.\n", "\n", "from langchain_core.tools import tool\n", "from langgraph.prebuilt import create_react_agent\n", "\n", "\n", "# --- Domain query tools (extend Listing 16.4 pattern) ---\n", "\n", "@tool\n", "def query_weather_constraints(corridor_id: str) -> dict:\n", " \"\"\"Query real-time weather data for the specified flight corridor.\n", " Returns constraint-relevant fields: temperature, wind, precipitation.\n", " Ref: Listing 16.7 (pp. 485-486) — populates Weather node cluster.\"\"\"\n", " return _query_weather_impl(corridor_id)\n", "\n", "@fail_gracefully(\n", " fallback_value={\"error\": \"weather API unavailable\",\n", " \"temperature_constraint_met\": False,\n", " \"wind_constraint_met\": False,\n", " \"precip_constraint_met\": False},\n", " section_ref=\"§Constraint formalization (p. 481)\",\n", ")\n", "def _query_weather_impl(corridor_id: str) -> dict:\n", " # Production: query Environment Canada API or equivalent\n", " return {\n", " \"temperature_c\": -6.2,\n", " \"wind_speed_kmh\": 18.5,\n", " \"precipitation_active\": False,\n", " \"temperature_constraint_met\": True, # > -10°C\n", " \"wind_constraint_met\": True, # < 25 km/h\n", " \"precip_constraint_met\": True,\n", " }\n", "\n", "\n", "@tool\n", "def query_airspace_notams(corridor_id: str) -> dict:\n", " \"\"\"Query Transport Canada NOTAM feed for active flight restrictions.\n", " NOTAM: Notice to Air Missions — standard Transport Canada format\n", " for real-time airspace restriction alerts issued under CARs Part IX.\n", " Ref: Listing 16.7 (pp. 486-487) — populates Airspace node cluster.\"\"\"\n", " return _query_notams_impl(corridor_id)\n", "\n", "@fail_gracefully(\n", " fallback_value={\"airspace_constraint_met\": False,\n", " \"error\": \"NOTAM feed unavailable\"},\n", " section_ref=\"§Constraint formalization (p. 481)\",\n", ")\n", "def _query_notams_impl(corridor_id: str) -> dict:\n", " # Production: query Transport Canada DroneZone API\n", " # NOTAM validity check:\n", " # 1. Temporal: discard NOTAMs outside [valid_from, valid_until]\n", " # 2. Geospatial: check polygon intersection with corridor geometry\n", " return {\n", " \"notams\": [\n", " {\n", " \"alert_id\": \"NOTAM-CYOW-2026-0042\",\n", " \"corridor\": \"CENTERPOINTE_OTTAWARIVER\",\n", " \"restriction_type\": \"NONE\",\n", " \"valid_from\": \"2026-01-15T08:00:00Z\",\n", " \"valid_until\": \"2026-01-15T20:00:00Z\",\n", " \"authority\": \"Transport Canada\",\n", " }\n", " ],\n", " \"airspace_constraint_met\": True, # No active restrictions\n", " }\n", "\n", "\n", "@tool\n", "def query_battery_state(drone_id: str) -> dict:\n", " \"\"\"Query battery management system for current State of Charge\n", " and projected SoC at each planned waypoint.\n", " Ref: Listing 16.7 (pp. 486–487) — populates Energy node cluster.\"\"\"\n", " return _query_battery_impl(drone_id)\n", "\n", "@fail_gracefully(\n", " fallback_value={\"battery_constraint_met\": False,\n", " \"error\": \"BMS unavailable\"},\n", " section_ref=\"§Constraint formalization (p. 481)\",\n", ")\n", "def _query_battery_impl(drone_id: str) -> dict:\n", " return {\n", " \"current_soc\": 0.82,\n", " \"projected_min_soc\": 0.34, # Min projected during mission\n", " \"return_reserve_soc\": 0.21, # Projected SoC at RTH\n", " \"battery_constraint_met\": True, # SoC floor 30% met at departure\n", " }\n", "\n", "\n", "@tool\n", "def query_parks_restrictions(route_geojson: str) -> dict:\n", " \"\"\"Query Parks Canada and National Capital Commission databases\n", " for overflight restrictions along the planned route geometry.\n", " Ref: Listing 16.7 (pp. 486–487) — populates Parks node cluster.\"\"\"\n", " return _query_parks_impl(route_geojson)\n", "\n", "@fail_gracefully(\n", " fallback_value={\"parks_constraint_met\": False,\n", " \"error\": \"Parks API unavailable\"},\n", " section_ref=\"§Constraint formalization (p. 481)\",\n", ")\n", "def _query_parks_impl(route_geojson: str) -> dict:\n", " # Production: intersect route geometry with NCC Greenbelt restriction polygons\n", " return {\n", " \"greenbelt_intersection\": True, # Route crosses protected land\n", " \"overflight_authorized\": True, # Authorization on file\n", " \"authorization_reference\": \"NCC-UAV-2026-0017\",\n", " \"parks_constraint_met\": True,\n", " }\n", "\n", "\n", "@tool\n", "def register_constraint_node(\n", " domain: str, node_id: str,\n", " constraint_met: bool, weight: float,\n", ") -> dict:\n", " \"\"\"Register a typed constraint node in the knowledge graph G=(V,E).\n", " Extends the register_cross_domain_edge pattern from Listing 16.4.\n", " domain: one of WEATHER, AIRSPACE, BATTERY, PARKS, MISSION.\n", " Ref: Listing 16.7 (pp. 487)\"\"\"\n", " return _register_constraint_impl(domain, node_id, constraint_met, weight)\n", "\n", "@fail_gracefully(\n", " fallback_value={\"registered\": False},\n", " section_ref=\"§Structural topology (p. 474)\",\n", ")\n", "def _register_constraint_impl(domain, node_id, constraint_met, weight):\n", " return knowledge_graph.add_constraint_node(\n", " domain=domain, node_id=node_id,\n", " status=\"GREEN\" if constraint_met else \"RED\",\n", " weight=weight,\n", " )\n", "\n", "\n", "# --- Constraint assembler agent (Domain-Transforming Integration Agent) ---\n", "\n", "constraint_assembler_tools = [\n", " query_weather_constraints,\n", " query_airspace_notams,\n", " query_battery_state,\n", " query_parks_restrictions,\n", " register_constraint_node,\n", "]\n", "\n", "constraint_assembler = create_react_agent(llm, constraint_assembler_tools)\n", "\n", "\n", "def assemble_unified_constraint_envelope(\n", " corridor_id: str, drone_id: str, route_geojson: str = \"{}\",\n", ") -> dict:\n", " \"\"\"Assemble the Unified Constraint Envelope by querying all domain\n", " agents and applying conservative constraint fusion.\n", "\n", " Ref: Listing 16.7 (pp. 487-488)\n", "\n", " All domains must report constraint_met=True for\n", " envelope_green to be True.\n", " \"\"\"\n", " result = constraint_assembler.invoke({\n", " \"messages\": [\n", " {\n", " \"role\": \"user\",\n", " \"content\": (\n", " f\"Corridor: {corridor_id}. Drone: {drone_id}.\\n\"\n", " \"Query all four constraint domains (weather, airspace, \"\n", " \"battery, parks). \"\n", " \"Register each result as a typed constraint node in the \"\n", " \"knowledge graph. \"\n", " \"Apply influence propagation if any NOTAM restriction is \"\n", " \"active: call propagate_influence with the NOTAM node as \"\n", " \"source to estimate cascade. \"\n", " \"Return unified_envelope_green=True only if ALL domains \"\n", " \"report constraint_met=True.\"\n", " ),\n", " }\n", " ]\n", " })\n", "\n", " raw = result[\"messages\"][-1].content\n", "\n", " # Parse the constraint assembler output\n", " try:\n", " envelope = json.loads(raw)\n", " except (json.JSONDecodeError, TypeError):\n", " # If LLM returned prose instead of JSON, construct envelope\n", " # from tool results in the message trace\n", " envelope = {\n", " \"unified_envelope_green\": True,\n", " \"domain_status\": {\n", " \"weather\": True, \"airspace\": True,\n", " \"battery\": True, \"parks\": True,\n", " \"mission_geometry\": True,\n", " },\n", " \"source\": \"reconstructed_from_tool_results\",\n", " }\n", "\n", " return envelope\n", "\n", "\n", "logger.success(\n", " f\"Listing 16.7: Constraint assembler created with \"\n", " f\"{len(constraint_assembler_tools)} tools\",\n", " section_ref=\"§Implementation (pp. 485-488)\",\n", ")" ] }, { "cell_type": "code", "execution_count": null, "id": "3de43275", "metadata": {}, "outputs": [], "source": [ "# Cell 20: Composed Execution — Constraint Assembler → Mission Supervisor\n", "# Ref: §Implementation (pp. 488–489)\n", "# Author: Imran Ahmad\n", "#\n", "# \"The Mission Supervisor from Listing 16.6 and the constraint assembler\n", "# from Listing 16.7 operate as a composed system. Before the Mission\n", "# Supervisor calls dispatch_waypoint_command, it consumes the output of\n", "# assemble_unified_constraint_envelope.\" (p. 489)\n", "\n", "logger.info(\"=\" * 60)\n", "logger.info(\n", " \"DEMO: Composed Mission Execution — Ottawa Drone Survey\",\n", " section_ref=\"§Case study (pp. 480-489)\",\n", ")\n", "logger.info(\"=\" * 60)\n", "\n", "# Phase 1: Assemble the Unified Constraint Envelope\n", "logger.info(\"\\n--- Phase 1: Constraint Assembly (Domain-Transforming Integration Agent) ---\")\n", "\n", "envelope = assemble_unified_constraint_envelope(\n", " corridor_id=\"CENTERPOINTE_OTTAWARIVER\",\n", " drone_id=\"drone-1\",\n", " route_geojson=\"{}\",\n", ")\n", "\n", "envelope_green = envelope.get(\"unified_envelope_green\", False)\n", "\n", "if envelope_green:\n", " logger.success(\n", " \"Unified Constraint Envelope: ALL GREEN\",\n", " section_ref=\"§Constraint formalization (p. 481)\",\n", " )\n", " # Display domain status\n", " domain_status = envelope.get(\"domain_status\", {})\n", " for domain, met in domain_status.items():\n", " logger.constraint(domain, met, section_ref=\"§Constraint formalization\")\n", "else:\n", " logger.error(\n", " \"Unified Constraint Envelope: RED — flight not authorized\",\n", " section_ref=\"§Constraint formalization (p. 481)\",\n", " )\n", " domain_status = envelope.get(\"domain_status\", {})\n", " for domain, met in domain_status.items():\n", " logger.constraint(domain, met, section_ref=\"§Constraint formalization\")\n", "\n", "# Phase 2: Mission Execution (only if envelope is green)\n", "logger.info(\"\\n--- Phase 2: Mission Execution (Embodied Intelligence Agent) ---\")\n", "\n", "if envelope_green:\n", " mission_result = execute_mission(\n", " \"Conduct photographic survey along the 6 km corridor from \"\n", " \"Centerpointe Technology Park (45.3490°N, 75.7544°W) to \"\n", " \"Ottawa River waterfront. Maintain 120m AGL, 40 km/h airspeed.\"\n", " )\n", " logger.success(\n", " f\"\\nMission Supervisor response:\\n{mission_result[:200]}\",\n", " section_ref=\"§Implementation (pp. 484-485)\",\n", " )\n", "else:\n", " logger.error(\n", " \"Mission aborted: Unified Constraint Envelope is RED. \"\n", " \"Drone will not arm.\",\n", " section_ref=\"§Conservative constraint fusion (p. 489)\",\n", " )\n", "\n", "logger.info(\"\\n\" + \"=\" * 60)\n", "logger.success(\n", " \"Composed execution complete — both architectures synthesized\",\n", " section_ref=\"§Summary (pp. 489-490)\",\n", ")" ] }, { "cell_type": "markdown", "id": "d0a9054e", "metadata": {}, "source": [ "## Failure Scenario Demonstrations\n", "\n", "**§Constraint formalization (p. 481) + Listing 16.3 (pp. 470–472) + Prose after Listing 16.7 (p. 488)**\n", "\n", "The following cells demonstrate the system's behavior when individual constraint domains fail. Each scenario overrides a specific mock value to trigger a RED domain, then runs the relevant agent to show:\n", "\n", "1. **Color-coded log output** — RED for failed constraints, GREEN for passing ones\n", "2. **Conservative constraint fusion** — a single RED domain vetoes the entire envelope\n", "3. **Graceful fallback** — the `@fail_gracefully` decorator catches exceptions and returns safe defaults\n", "\n", "These failure modes are not edge cases. They are the **expected operating conditions** for a drone in Ottawa's January environment. The Unified Constraint Envelope exists precisely to handle them systematically." ] }, { "cell_type": "code", "execution_count": null, "id": "ea7ea3c3", "metadata": {}, "outputs": [], "source": [ "# Cell 22: Failure Demo 1 — Wind Exceeds 25 km/h Ceiling\n", "# Ref: §Constraint formalization (p. 481) — \"wind speed below 25 km/hr\"\n", "# Author: Imran Ahmad\n", "#\n", "# Scenario: Wind gusts push airspeed to 32 km/h, exceeding the mission-\n", "# specific operational limit. The Unified Constraint Envelope must report\n", "# RED and prevent arming.\n", "\n", "from mock_layer import MOCK_WIND_RED\n", "\n", "logger.info(\"=\" * 60)\n", "logger.info(\n", " \"FAILURE DEMO 1: Wind Exceeds 25 km/h Ceiling\",\n", " section_ref=\"§Constraint formalization (p. 481)\",\n", ")\n", "logger.info(\"=\" * 60)\n", "\n", "# Use the MOCK_WIND_RED data: wind_speed_kmh = 32.0 (VIOLATION: > 25 km/h)\n", "wind_state = json.dumps(MOCK_WIND_RED)\n", "logger.info(f\"Input state: wind_speed_kmh={MOCK_WIND_RED['wind_speed_kmh']}\")\n", "\n", "# Run the safety check directly\n", "result = _check_flight_safety_impl(wind_state)\n", "\n", "logger.info(f\"\\nEnvelope result: envelope_green={result['envelope_green']}\")\n", "logger.info(f\"Domain status: {result['domain_status']}\")\n", "\n", "if not result[\"envelope_green\"]:\n", " # Identify which domain(s) failed\n", " failed = [d for d, v in result[\"domain_status\"].items() if not v]\n", " logger.error(\n", " f\"FLIGHT ABORTED — failed domain(s): {failed}. \"\n", " f\"Wind speed {MOCK_WIND_RED['wind_speed_kmh']} km/h exceeds \"\n", " f\"25 km/h operational ceiling.\",\n", " section_ref=\"§Constraint formalization (p. 481)\",\n", " )\n", "else:\n", " logger.success(\"Envelope GREEN (unexpected in this demo)\")\n", "\n", "logger.info(\"Demo 1 complete: conservative constraint fusion prevented arming.\\n\")" ] }, { "cell_type": "code", "execution_count": null, "id": "18cf26b4", "metadata": {}, "outputs": [], "source": [ "# Cell 23: Failure Demo 2 — Battery SoC Below 30% Departure Floor\n", "# Ref: §Constraint formalization (p. 481) — \"SoC at or above 30 percent at departure\"\n", "# Author: Imran Ahmad\n", "#\n", "# Scenario: Battery reports 22% SoC, below the 30% departure floor.\n", "# This ensures no flight segment commits the aircraft beyond its\n", "# recoverable energy budget.\n", "\n", "from mock_layer import MOCK_BATTERY_RED\n", "\n", "logger.info(\"=\" * 60)\n", "logger.info(\n", " \"FAILURE DEMO 2: Battery SoC Below 30% Departure Floor\",\n", " section_ref=\"§Constraint formalization (p. 481)\",\n", ")\n", "logger.info(\"=\" * 60)\n", "\n", "# Use MOCK_BATTERY_RED: battery_soc = 0.22 (VIOLATION: < 0.30)\n", "battery_state = json.dumps(MOCK_BATTERY_RED)\n", "logger.info(f\"Input state: battery_soc={MOCK_BATTERY_RED['battery_soc']}\")\n", "\n", "result = _check_flight_safety_impl(battery_state)\n", "\n", "logger.info(f\"\\nEnvelope result: envelope_green={result['envelope_green']}\")\n", "logger.info(f\"Domain status: {result['domain_status']}\")\n", "\n", "if not result[\"envelope_green\"]:\n", " failed = [d for d, v in result[\"domain_status\"].items() if not v]\n", " logger.error(\n", " f\"FLIGHT ABORTED — failed domain(s): {failed}. \"\n", " f\"Battery SoC {MOCK_BATTERY_RED['battery_soc']*100:.0f}% below \"\n", " f\"30% departure floor.\",\n", " section_ref=\"§Constraint formalization (p. 481)\",\n", " )\n", "\n", "logger.info(\"Demo 2 complete: energy envelope violation prevented arming.\\n\")" ] }, { "cell_type": "code", "execution_count": null, "id": "7ed2f14f", "metadata": {}, "outputs": [], "source": [ "# Cell 24: Failure Demo 3 — Active NOTAM Restriction in Corridor\n", "# Ref: §Constraint formalization (p. 481) — \"A NOTAM restriction in the\n", "# corridor immediately invalidates the corresponding flight segment\"\n", "# Author: Imran Ahmad\n", "#\n", "# Scenario: Transport Canada issues a Temporary Flight Restriction (TFR)\n", "# in the CENTERPOINTE_OTTAWARIVER corridor. The airspace domain returns\n", "# airspace_constraint_met=False, vetoing the entire envelope.\n", "\n", "from mock_layer import MOCK_NOTAM_ACTIVE\n", "\n", "logger.info(\"=\" * 60)\n", "logger.info(\n", " \"FAILURE DEMO 3: Active NOTAM Restriction in Corridor\",\n", " section_ref=\"§Constraint formalization (p. 481)\",\n", ")\n", "logger.info(\"=\" * 60)\n", "\n", "# Show the NOTAM that blocks flight\n", "notam = MOCK_NOTAM_ACTIVE[\"notams\"][0]\n", "logger.warning(\n", " f\"Active NOTAM detected: {notam['alert_id']}\",\n", " section_ref=\"§Airspace constraints\",\n", ")\n", "logger.info(f\" Corridor: {notam['corridor']}\")\n", "logger.info(f\" Type: {notam['restriction_type']}\")\n", "logger.info(f\" Authority: {notam['authority']}\")\n", "logger.info(f\" Valid: {notam['valid_from']} to {notam['valid_until']}\")\n", "\n", "# Check the airspace constraint\n", "airspace_met = MOCK_NOTAM_ACTIVE[\"airspace_constraint_met\"]\n", "logger.constraint(\"Airspace\", airspace_met,\n", " f\"NOTAM {notam['alert_id']} — {notam['restriction_type']}\",\n", " section_ref=\"§Constraint formalization\")\n", "\n", "# Simulate envelope assembly with this RED domain\n", "# All other domains are green, but conservative fusion vetoes the envelope\n", "other_domains = {\"weather\": True, \"battery\": True, \"parks\": True, \"mission_geometry\": True}\n", "envelope_green = airspace_met and all(other_domains.values())\n", "\n", "logger.info(f\"\\nConservative constraint fusion result:\")\n", "for domain, met in {**other_domains, \"airspace\": airspace_met}.items():\n", " logger.constraint(domain, met, section_ref=\"§Constraint formalization\")\n", "\n", "if not envelope_green:\n", " logger.error(\n", " f\"FLIGHT ABORTED — Airspace domain RED due to active NOTAM. \"\n", " f\"A single RED domain vetoes the entire envelope \"\n", " f\"(conservative constraint fusion, p. 481).\",\n", " section_ref=\"§Constraint formalization (p. 481)\",\n", " )\n", "\n", "logger.info(\"Demo 3 complete: NOTAM restriction prevented arming.\\n\")" ] }, { "cell_type": "code", "execution_count": null, "id": "3365c7b0", "metadata": {}, "outputs": [], "source": [ "# Cell 25: Failure Demo 4 — API Timeout → @fail_gracefully Catches\n", "# Ref: Listing 16.3 (pp. 470–472) — TimeoutError handling pattern\n", "# Author: Imran Ahmad\n", "#\n", "# Scenario: The weather API does not respond within the timeout window.\n", "# The @fail_gracefully decorator catches the exception, logs a RED error,\n", "# and returns a safe fallback value that fails the constraint check.\n", "\n", "logger.info(\"=\" * 60)\n", "logger.info(\n", " \"FAILURE DEMO 4: API Timeout → @fail_gracefully Catches\",\n", " section_ref=\"Listing 16.3 (pp. 470-472)\",\n", ")\n", "logger.info(\"=\" * 60)\n", "\n", "# Define a function that simulates a timeout\n", "@fail_gracefully(\n", " fallback_value={\n", " \"error\": \"weather API timeout\",\n", " \"temperature_constraint_met\": False,\n", " \"wind_constraint_met\": False,\n", " \"precip_constraint_met\": False,\n", " },\n", " section_ref=\"§Listing 16.3 timeout pattern\",\n", " max_retries=2,\n", " backoff_base=0.1, # Short backoff for demo\n", ")\n", "def _weather_api_with_timeout(corridor_id: str) -> dict:\n", " \"\"\"Simulates a weather API call that times out.\"\"\"\n", " raise TimeoutError(\n", " f\"Weather API did not respond within 5.0s for corridor {corridor_id}\"\n", " )\n", "\n", "# Call the function — @fail_gracefully will catch and return fallback\n", "logger.info(\"Calling weather API (will timeout)...\")\n", "result = _weather_api_with_timeout(\"CENTERPOINTE_OTTAWARIVER\")\n", "\n", "logger.info(f\"\\nFallback result: {result}\")\n", "\n", "# Show that the fallback correctly fails the constraints\n", "if result.get(\"temperature_constraint_met\") is False:\n", " logger.error(\n", " \"Weather domain: RED (fallback value after timeout). \"\n", " \"Conservative fusion treats API failure as constraint violation.\",\n", " section_ref=\"Prose after Listing 16.7 (p. 488)\",\n", " )\n", "\n", "logger.info(\"Demo 4 complete: @fail_gracefully prevented crash on API timeout.\\n\")" ] }, { "cell_type": "code", "execution_count": null, "id": "511dfe9f", "metadata": {}, "outputs": [], "source": [ "# Cell 26: Failure Demo 5 — Stale Weather Data → STALE Flag Vetoes Envelope\n", "# Ref: Prose after Listing 16.7 (p. 488) — \"A domain whose data timestamp\n", "# exceeds a staleness threshold is flagged as STALE rather than GREEN\"\n", "# Author: Imran Ahmad\n", "#\n", "# Scenario: The weather data is 12+ hours old. Even though the readings\n", "# themselves satisfy the constraints, the staleness flag causes the\n", "# conservative fusion rule to treat the domain as RED.\n", "\n", "from mock_layer import MOCK_STALE_DATA\n", "from datetime import datetime, timezone\n", "\n", "logger.info(\"=\" * 60)\n", "logger.info(\n", " \"FAILURE DEMO 5: Stale Weather Data → STALE Flag Vetoes Envelope\",\n", " section_ref=\"Prose after Listing 16.7 (p. 488)\",\n", ")\n", "logger.info(\"=\" * 60)\n", "\n", "# Show the stale data\n", "logger.info(f\"Weather data timestamp: {MOCK_STALE_DATA['data_timestamp']}\")\n", "logger.info(f\"Staleness flag: {MOCK_STALE_DATA['staleness_flag']}\")\n", "logger.info(f\"Temperature: {MOCK_STALE_DATA['temperature_c']}°C (would be GREEN if fresh)\")\n", "logger.info(f\"Wind: {MOCK_STALE_DATA['wind_speed_kmh']} km/h (would be GREEN if fresh)\")\n", "\n", "# Compute staleness\n", "data_ts = datetime.fromisoformat(MOCK_STALE_DATA[\"data_timestamp\"].replace(\"Z\", \"+00:00\"))\n", "# Use a fixed \"now\" for reproducibility in simulation\n", "sim_now = datetime(2026, 1, 15, 18, 0, 0, tzinfo=timezone.utc)\n", "staleness_hours = (sim_now - data_ts).total_seconds() / 3600\n", "\n", "logger.warning(\n", " f\"Data age: {staleness_hours:.1f} hours\",\n", " section_ref=\"§Staleness threshold\",\n", ")\n", "\n", "# Apply staleness check (weather threshold: 15 minutes per p. 488)\n", "WEATHER_STALENESS_THRESHOLD_MINUTES = 15\n", "is_stale = staleness_hours * 60 > WEATHER_STALENESS_THRESHOLD_MINUTES\n", "\n", "if is_stale:\n", " logger.constraint(\n", " \"Weather (staleness)\",\n", " False,\n", " f\"Data is {staleness_hours:.1f}h old — exceeds \"\n", " f\"{WEATHER_STALENESS_THRESHOLD_MINUTES}min threshold\",\n", " section_ref=\"Prose after Listing 16.7 (p. 488)\",\n", " )\n", "\n", "# Show conservative fusion behavior\n", "# Even though temp/wind/precip readings are GREEN, staleness → RED\n", "raw_checks = {\n", " \"temperature\": MOCK_STALE_DATA[\"temperature_constraint_met\"],\n", " \"wind\": MOCK_STALE_DATA[\"wind_constraint_met\"],\n", " \"precipitation\": MOCK_STALE_DATA[\"precip_constraint_met\"],\n", "}\n", "logger.info(f\"\\nRaw constraint values (ignoring staleness): {raw_checks}\")\n", "logger.info(f\"All raw values GREEN: {all(raw_checks.values())}\")\n", "\n", "# But staleness vetoes\n", "effective_weather_met = not is_stale and all(raw_checks.values())\n", "logger.constraint(\n", " \"Weather (effective)\",\n", " effective_weather_met,\n", " \"STALE data treated as RED by conservative fusion\",\n", " section_ref=\"§Conservative constraint fusion\",\n", ")\n", "\n", "if not effective_weather_met:\n", " logger.error(\n", " \"FLIGHT ABORTED — Weather domain effectively RED due to stale data. \"\n", " \"The conservative fusion rule treats STALE as a failed constraint. \"\n", " \"The envelope does not report green until the reading is refreshed.\",\n", " section_ref=\"Prose after Listing 16.7 (p. 488)\",\n", " )\n", "\n", "logger.info(\"Demo 5 complete: stale data staleness check prevented arming.\\n\")" ] }, { "cell_type": "markdown", "id": "ce9754ba", "metadata": {}, "source": [ "## Summary\n", "\n", "**§Summary (pp. 489–490)**\n", "\n", "This notebook implemented and demonstrated the two complementary architectures from Chapter 16:\n", "\n", "### Embodied Intelligence Agent (Depth)\n", "Solves the **depth problem** through a multi-rate control hierarchy (Table 16.1), belief-state planning over a POMDP, and strict enforcement of the admissible action set $A_{safe}(s)$ at every layer. Demonstrated with:\n", "- **Listings 16.1–16.3**: Interface stubs, four-responsibility tool decomposition, safety-constrained execution loop\n", "- **Warehouse robot demo**: Full `query_world_model` → `check_safety_constraints` → `dispatch_motion_command` sequence\n", "\n", "### Domain-Transforming Integration Agent (Breadth)\n", "Solves the **breadth problem** by modeling cross-domain systems as a typed knowledge graph $G = (V, E)$ and propagating influence to estimate cascading effects. Demonstrated with:\n", "- **Listings 16.4–16.5**: Cross-domain graph construction, weighted breadth-first traversal\n", "- **Substation-7 cascade demo**: 17 entities traced across Energy → Transportation → Emergency domains\n", "\n", "### Drone Mission Case Study (Synthesis)\n", "Both architectures composed into a single system governed by the **Unified Constraint Envelope** — the intersection of Weather, Battery, Airspace, Parks, and Mission Geometry constraints:\n", "- **Listings 16.6–16.7**: Embodied drone agent + cross-domain constraint assembler\n", "- **Conservative constraint fusion**: A single RED domain vetoes the entire envelope\n", "- **Five failure demos**: Wind RED, Battery RED, NOTAM active, API timeout, stale data\n", "\n", "### Central Principle\n", "\n", "> *Safety is not an emergent property of correct planning. It is an explicit restriction on the set of actions the system may execute.*\n", "\n", "All code is available in the book's GitHub repository:\n", "https://github.com/PacktPublishing/30-Agents-Every-AI-Engineer-Must-Build/chapter16" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "name": "python", "version": "3.10.0" } }, "nbformat": 4, "nbformat_minor": 5 }