"""Claude SDK Client for interacting with Claude Code.""" import json import os from collections.abc import AsyncIterable, AsyncIterator from dataclasses import asdict, replace from typing import TYPE_CHECKING, Any from . import Transport from ._errors import CLIConnectionError if TYPE_CHECKING: from ._internal.session_resume import MaterializedResume from .types import ( ClaudeAgentOptions, ContextUsageResponse, HookEvent, HookMatcher, McpStatusResponse, Message, PermissionMode, ResultMessage, ) class ClaudeSDKClient: """ Client for bidirectional, interactive conversations with Claude Code. This client provides full control over the conversation flow with support for streaming, interrupts, and dynamic message sending. For simple one-shot queries, consider using the query() function instead. Key features: - **Bidirectional**: Send and receive messages at any time - **Stateful**: Maintains conversation context across messages - **Interactive**: Send follow-ups based on responses - **Control flow**: Support for interrupts and session management When to use ClaudeSDKClient: - Building chat interfaces or conversational UIs - Interactive debugging or exploration sessions - Multi-turn conversations with context - When you need to react to Claude's responses - Real-time applications with user input - When you need interrupt capabilities When to use query() instead: - Simple one-off questions - Batch processing of prompts - Fire-and-forget automation scripts - When all inputs are known upfront - Stateless operations See examples/streaming_mode.py for full examples of ClaudeSDKClient in different scenarios. Caveat: As of v0.0.20, you cannot use a ClaudeSDKClient instance across different async runtime contexts (e.g., different trio nurseries or asyncio task groups). The client internally maintains a persistent anyio task group for reading messages that remains active from connect() until disconnect(). This means you must complete all operations with the client within the same async context where it was connected. Ideally, this limitation should not exist. """ def __init__( self, options: ClaudeAgentOptions | None = None, transport: Transport | None = None, ): """Initialize Claude SDK client.""" if options is None: options = ClaudeAgentOptions() self.options = options self._custom_transport = transport self._transport: Transport | None = None self._query: Any | None = None self._materialized: MaterializedResume | None = None def _convert_hooks_to_internal_format( self, hooks: dict[HookEvent, list[HookMatcher]] ) -> dict[str, list[dict[str, Any]]]: """Convert HookMatcher format to internal Query format.""" internal_hooks: dict[str, list[dict[str, Any]]] = {} for event, matchers in hooks.items(): internal_hooks[event] = [] for matcher in matchers: # Convert HookMatcher to internal dict format internal_matcher: dict[str, Any] = { "matcher": matcher.matcher if hasattr(matcher, "matcher") else None, "hooks": matcher.hooks if hasattr(matcher, "hooks") else [], } if hasattr(matcher, "timeout") and matcher.timeout is not None: internal_matcher["timeout"] = matcher.timeout internal_hooks[event].append(internal_matcher) return internal_hooks async def connect( self, prompt: str | AsyncIterable[dict[str, Any]] | None = None ) -> None: """Connect to Claude with a prompt or message stream.""" from ._internal.session_resume import materialize_resume_session from ._internal.session_store_validation import validate_session_store_options # Auto-connect with empty async iterable if no prompt is provided async def _empty_stream() -> AsyncIterator[dict[str, Any]]: # Never yields, but indicates that this function is an iterator and # keeps the connection open. # This yield is never reached but makes this an async generator return yield {} # type: ignore[unreachable] # String prompts are sent via transport.write() below, so the transport # only needs an AsyncIterable (or an empty stream for None/str cases). actual_prompt = prompt if isinstance(prompt, AsyncIterable) else _empty_stream() # Fail fast on invalid session_store option combinations before # spawning the subprocess. validate_session_store_options(self.options) # resume/continue + session_store: load the session from the store # into a temp CLAUDE_CONFIG_DIR for the subprocess to resume from. # When materialized, override resume/continue/env on a copy of options # so the subprocess points at the temp dir; when None, fall through # to normal handling (fresh session or local-disk resume). Skipped # when a custom transport was supplied — the materialized options # never reach a pre-constructed transport, so loading the store and # writing .credentials.json to a temp dir would be wasted work. self._materialized = ( await materialize_resume_session(self.options) if self._custom_transport is None else None ) try: await self._connect_inner(prompt, actual_prompt) except BaseException: # If connect fails after the subprocess has spawned (e.g. at # query.initialize()), close the subprocess/read task *before* # removing the temp CLAUDE_CONFIG_DIR it points at. disconnect() # already orders close() → cleanup() and is None-safe for # pre-spawn failures, so reuse it here. await self.disconnect() raise async def _connect_inner( self, prompt: str | AsyncIterable[dict[str, Any]] | None, actual_prompt: AsyncIterable[dict[str, Any]], ) -> None: from ._internal.query import Query from ._internal.session_resume import ( apply_materialized_options, build_mirror_batcher, ) from ._internal.transport.subprocess_cli import SubprocessCLITransport # Validate and configure permission settings (matching TypeScript SDK logic) if self.options.can_use_tool: # canUseTool callback requires streaming mode (AsyncIterable prompt) if isinstance(prompt, str): raise ValueError( "can_use_tool callback requires streaming mode. " "Please provide prompt as an AsyncIterable instead of a string." ) # canUseTool and permission_prompt_tool_name are mutually exclusive if self.options.permission_prompt_tool_name: raise ValueError( "can_use_tool callback cannot be used with permission_prompt_tool_name. " "Please use one or the other." ) # Automatically set permission_prompt_tool_name to "stdio" for control protocol options = replace(self.options, permission_prompt_tool_name="stdio") else: options = self.options if self._materialized is not None: options = apply_materialized_options(options, self._materialized) # Use provided custom transport or create subprocess transport if self._custom_transport: self._transport = self._custom_transport else: self._transport = SubprocessCLITransport( prompt=actual_prompt, options=options, ) await self._transport.connect() # Extract SDK MCP servers from options sdk_mcp_servers = {} if self.options.mcp_servers and isinstance(self.options.mcp_servers, dict): for name, config in self.options.mcp_servers.items(): if isinstance(config, dict) and config.get("type") == "sdk": sdk_mcp_servers[name] = config["instance"] # type: ignore[typeddict-item] # Calculate initialize timeout from CLAUDE_CODE_STREAM_CLOSE_TIMEOUT env var if set # CLAUDE_CODE_STREAM_CLOSE_TIMEOUT is in milliseconds, convert to seconds initialize_timeout_ms = int( os.environ.get("CLAUDE_CODE_STREAM_CLOSE_TIMEOUT", "60000") ) initialize_timeout = max(initialize_timeout_ms / 1000.0, 60.0) # Extract exclude_dynamic_sections from preset system prompt for the # initialize request (older CLIs ignore unknown initialize fields). exclude_dynamic_sections: bool | None = None sp = self.options.system_prompt if isinstance(sp, dict) and sp.get("type") == "preset": eds = sp.get("exclude_dynamic_sections") if isinstance(eds, bool): exclude_dynamic_sections = eds # Convert agents to dict format for initialize request agents_dict: dict[str, dict[str, Any]] | None = None if self.options.agents: agents_dict = { name: {k: v for k, v in asdict(agent_def).items() if v is not None} for name, agent_def in self.options.agents.items() } # Create Query to handle control protocol self._query = Query( transport=self._transport, is_streaming_mode=True, # ClaudeSDKClient always uses streaming mode can_use_tool=self.options.can_use_tool, hooks=self._convert_hooks_to_internal_format(self.options.hooks) if self.options.hooks else None, sdk_mcp_servers=sdk_mcp_servers, initialize_timeout=initialize_timeout, agents=agents_dict, exclude_dynamic_sections=exclude_dynamic_sections, skills=self.options.skills, ) if self.options.session_store is not None: q = self._query async def _on_mirror_error(key: Any, error: str) -> None: q.report_mirror_error(key, error) self._query.set_transcript_mirror_batcher( build_mirror_batcher( store=self.options.session_store, materialized=self._materialized, env=self.options.env, on_error=_on_mirror_error, ) ) # Start reading messages and initialize await self._query.start() await self._query.initialize() # If we have an initial prompt, send it if isinstance(prompt, str): message = { "type": "user", "message": {"role": "user", "content": prompt}, "parent_tool_use_id": None, "session_id": "default", } await self._transport.write(json.dumps(message) + "\n") elif prompt is not None and isinstance(prompt, AsyncIterable): self._query.spawn_task(self._query.stream_input(prompt)) async def receive_messages(self) -> AsyncIterator[Message]: """Receive all messages from Claude.""" if not self._query: raise CLIConnectionError("Not connected. Call connect() first.") from ._internal.message_parser import parse_message async for data in self._query.receive_messages(): message = parse_message(data) if message is not None: yield message async def query( self, prompt: str | AsyncIterable[dict[str, Any]], session_id: str = "default" ) -> None: """ Send a new request in streaming mode. Args: prompt: Either a string message or an async iterable of message dictionaries session_id: Session identifier for the conversation """ if not self._query or not self._transport: raise CLIConnectionError("Not connected. Call connect() first.") # Handle string prompts if isinstance(prompt, str): message = { "type": "user", "message": {"role": "user", "content": prompt}, "parent_tool_use_id": None, "session_id": session_id, } await self._transport.write(json.dumps(message) + "\n") else: # Handle AsyncIterable prompts - stream them async for msg in prompt: # Ensure session_id is set on each message if "session_id" not in msg: msg["session_id"] = session_id await self._transport.write(json.dumps(msg) + "\n") async def interrupt(self) -> None: """Send interrupt signal (only works with streaming mode).""" if not self._query: raise CLIConnectionError("Not connected. Call connect() first.") await self._query.interrupt() async def set_permission_mode(self, mode: PermissionMode) -> None: """Change permission mode during conversation (only works with streaming mode). Args: mode: The permission mode to set. Valid options: - 'default': CLI prompts for dangerous tools - 'acceptEdits': Auto-accept file edits - 'plan': Plan-only mode (no tool execution) - 'bypassPermissions': Allow all tools (use with caution) - 'dontAsk': Deny anything not pre-approved by allow rules - 'auto': A model classifier approves or denies each tool call Example: ```python async with ClaudeSDKClient() as client: # Start with default permissions await client.query("Help me analyze this codebase") # Review mode done, switch to auto-accept edits await client.set_permission_mode('acceptEdits') await client.query("Now implement the fix we discussed") ``` """ if not self._query: raise CLIConnectionError("Not connected. Call connect() first.") await self._query.set_permission_mode(mode) async def set_model(self, model: str | None = None) -> None: """Change the AI model during conversation (only works with streaming mode). Args: model: The model to use, or None to use default. Examples: - 'claude-sonnet-4-5' - 'claude-opus-4-1-20250805' - 'claude-opus-4-20250514' Example: ```python async with ClaudeSDKClient() as client: # Start with default model await client.query("Help me understand this problem") # Switch to a different model for implementation await client.set_model('claude-sonnet-4-5') await client.query("Now implement the solution") ``` """ if not self._query: raise CLIConnectionError("Not connected. Call connect() first.") await self._query.set_model(model) async def rewind_files(self, user_message_id: str) -> None: """Rewind tracked files to their state at a specific user message. Requires: - `enable_file_checkpointing=True` to track file changes - `extra_args={"replay-user-messages": None}` to receive UserMessage objects with `uuid` in the response stream Args: user_message_id: UUID of the user message to rewind to. This should be the `uuid` field from a `UserMessage` received during the conversation. Example: ```python options = ClaudeAgentOptions( enable_file_checkpointing=True, extra_args={"replay-user-messages": None}, ) async with ClaudeSDKClient(options) as client: await client.query("Make some changes to my files") async for msg in client.receive_response(): if isinstance(msg, UserMessage) and msg.uuid: checkpoint_id = msg.uuid # Save this for later # Later, rewind to that point await client.rewind_files(checkpoint_id) ``` """ if not self._query: raise CLIConnectionError("Not connected. Call connect() first.") await self._query.rewind_files(user_message_id) async def reconnect_mcp_server(self, server_name: str) -> None: """Reconnect a disconnected or failed MCP server (only works with streaming mode). Use this to retry connecting to an MCP server that failed to connect or was disconnected. Raises an exception if the reconnection fails. Args: server_name: The name of the MCP server to reconnect Example: ```python async with ClaudeSDKClient(options) as client: status = await client.get_mcp_status() for server in status.get("mcpServers", []): if server["status"] == "failed": await client.reconnect_mcp_server(server["name"]) ``` """ if not self._query: raise CLIConnectionError("Not connected. Call connect() first.") await self._query.reconnect_mcp_server(server_name) async def toggle_mcp_server(self, server_name: str, enabled: bool) -> None: """Enable or disable an MCP server (only works with streaming mode). Disabling a server disconnects it and removes its tools from the available tool set. Enabling a server reconnects it and makes its tools available again. Raises an exception on failure. Args: server_name: The name of the MCP server to toggle enabled: True to enable the server, False to disable it Example: ```python async with ClaudeSDKClient(options) as client: # Temporarily disable a server await client.toggle_mcp_server("my-server", enabled=False) await client.query("Do something without my-server tools") # Re-enable it later await client.toggle_mcp_server("my-server", enabled=True) ``` """ if not self._query: raise CLIConnectionError("Not connected. Call connect() first.") await self._query.toggle_mcp_server(server_name, enabled) async def stop_task(self, task_id: str) -> None: """Stop a running task (only works with streaming mode). After this resolves, a `task_notification` system message with status `'stopped'` will be emitted by the CLI in the message stream. Args: task_id: The task ID from `task_notification` events. Example: ```python async with ClaudeSDKClient() as client: await client.query("Start a long-running task") # Listen for task_notification to get task_id, then: await client.stop_task("task-abc123") # A task_notification with status 'stopped' will follow ``` """ if not self._query: raise CLIConnectionError("Not connected. Call connect() first.") await self._query.stop_task(task_id) async def get_mcp_status(self) -> McpStatusResponse: """Get current MCP server connection status (only works with streaming mode). Queries the Claude Code CLI for the live connection status of all configured MCP servers. Returns: McpStatusResponse dictionary with an 'mcpServers' key containing a list of McpServerStatus entries. Each entry includes: - 'name': Server name (str) - 'status': Connection status ('connected', 'pending', 'failed', 'needs-auth', 'disabled') - 'serverInfo': MCP server name/version (when connected) - 'error': Error message (when status is 'failed') - 'config': Server configuration (stdio/sse/http/sdk/claudeai-proxy) - 'scope': Configuration scope (e.g., project, user, local) - 'tools': List of tools provided by the server (when connected) Example: ```python async with ClaudeSDKClient(options) as client: status = await client.get_mcp_status() for server in status["mcpServers"]: print(f"{server['name']}: {server['status']}") if server["status"] == "failed": print(f" Error: {server.get('error')}") ``` """ if not self._query: raise CLIConnectionError("Not connected. Call connect() first.") result: McpStatusResponse = await self._query.get_mcp_status() return result async def get_context_usage(self) -> ContextUsageResponse: """Get a breakdown of current context window usage by category. Returns the same data shown by the `/context` command in the CLI, including token counts per category, total usage, and detailed breakdowns of MCP tools, memory files, and agents. Returns: ContextUsageResponse dictionary with keys including: - 'categories': List of categories with name, tokens, color - 'totalTokens': Total tokens in context - 'maxTokens': Effective context limit - 'percentage': Percent of context used (0-100) - 'model': Model the usage is calculated for - 'mcpTools': Per-tool token breakdown for MCP servers - 'memoryFiles': Per-file token breakdown for CLAUDE.md files - 'agents': Per-agent token breakdown Example: ```python async with ClaudeSDKClient() as client: await client.query("Read this file") async for _ in client.receive_response(): pass usage = await client.get_context_usage() print(f"Using {usage['percentage']:.1f}% of context") for cat in usage['categories']: print(f" {cat['name']}: {cat['tokens']} tokens") ``` """ if not self._query: raise CLIConnectionError("Not connected. Call connect() first.") result: ContextUsageResponse = await self._query.get_context_usage() return result async def get_server_info(self) -> dict[str, Any] | None: """Get server initialization info including available commands and output styles. Returns initialization information from the Claude Code server including: - Available commands (slash commands, system commands, etc.) - Current and available output styles - Server capabilities Returns: Dictionary with server info, or None if not in streaming mode Example: ```python async with ClaudeSDKClient() as client: info = await client.get_server_info() if info: print(f"Commands available: {len(info.get('commands', []))}") print(f"Output style: {info.get('output_style', 'default')}") ``` """ if not self._query: raise CLIConnectionError("Not connected. Call connect() first.") # Return the initialization result that was already obtained during connect return getattr(self._query, "_initialization_result", None) async def receive_response(self) -> AsyncIterator[Message]: """ Receive messages from Claude until and including a ResultMessage. This async iterator yields all messages in sequence and automatically terminates after yielding a ResultMessage (which indicates the response is complete). It's a convenience method over receive_messages() for single-response workflows. **Stopping Behavior:** - Yields each message as it's received - Terminates immediately after yielding a ResultMessage - The ResultMessage IS included in the yielded messages - If no ResultMessage is received, the iterator continues indefinitely Yields: Message: Each message received (UserMessage, AssistantMessage, SystemMessage, ResultMessage) Example: ```python async with ClaudeSDKClient() as client: await client.query("What's the capital of France?") async for msg in client.receive_response(): if isinstance(msg, AssistantMessage): for block in msg.content: if isinstance(block, TextBlock): print(f"Claude: {block.text}") elif isinstance(msg, ResultMessage): print(f"Cost: ${msg.total_cost_usd:.4f}") # Iterator will terminate after this message ``` Note: To collect all messages: `messages = [msg async for msg in client.receive_response()]` The final message in the list will always be a ResultMessage. """ async for message in self.receive_messages(): yield message if isinstance(message, ResultMessage): return async def disconnect(self) -> None: """Disconnect from Claude.""" if self._query: await self._query.close() self._query = None self._transport = None if self._materialized is not None: await self._materialized.cleanup() self._materialized = None async def __aenter__(self) -> "ClaudeSDKClient": """Enter async context - automatically connects with empty stream for interactive use.""" await self.connect() return self async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> bool: """Exit async context - always disconnects.""" await self.disconnect() return False