mcp.clientΒΆ

MCP Client Implementation Package.

This package provides native MCP protocol client implementation for connecting to and communicating with MCP servers according to the Model Context Protocol specification.

Key ComponentsΒΆ

Core Classes
Transport Support
  • STDIO: Communication via stdin/stdout

  • HTTP: RESTful communication

  • SSE: Server-sent events

  • WebSocket: Real-time bidirectional communication

Usage ExamplesΒΆ

Basic ConnectionΒΆ

from haive.mcp.client import MCPClient, StdioTransport

# Create transport and client
transport = StdioTransport(
    command="npx",
    args=["-y", "@modelcontextprotocol/server-filesystem"]
)
client = MCPClient(transport)

# Connect and use
await client.connect()
tools = await client.list_tools()
result = await client.call_tool("read_file", {"path": "/etc/hosts"})
await client.disconnect()

Context Manager UsageΒΆ

async with MCPClient(transport) as client:
    tools = await client.list_tools()
    result = await client.call_tool("tool_name", args)

Available ClassesΒΆ

Transport ClassesΒΆ

Exception ClassesΒΆ

Note

This is a native implementation of the MCP protocol, designed to work with any MCP-compliant server. It handles the full protocol lifecycle including initialization, capability discovery, and tool execution.

SubmodulesΒΆ

ExceptionsΒΆ

MCPAuthenticationError

Authentication failure with MCP server.

MCPCapabilityError

Error related to MCP capabilities.

MCPConnectionError

Error during MCP connection establishment or management.

MCPError

Base exception for all MCP-related errors.

MCPProtocolError

Error in MCP protocol communication.

MCPTimeoutError

Timeout during MCP operation.

MCPToolError

Error during tool execution.

MCPTransportError

Error in the transport layer.

ClassesΒΆ

HttpTransport

HTTP transport for MCP communication.

MCPClient

High-level MCP client for communicating with MCP servers.

MCPConnection

MCP connection manager with health monitoring and auto-reconnection.

MCPProtocol

MCP protocol implementation.

MCPTransport

Abstract base class for MCP transports.

SseTransport

Server-Sent Events transport for MCP communication.

StdioTransport

STDIO transport for MCP communication.

WebSocketTransport

WebSocket transport for MCP communication.

Package ContentsΒΆ

exception mcp.client.MCPAuthenticationError(message: str, error_code: int | None = None, details: Dict[str, Any] | None = None)ΒΆ

Bases: MCPError

Authentication failure with MCP server.

Raised when authentication is required but fails, or when authorization is denied for a specific operation. This includes token validation failures and permission issues.

Examples

Invalid token:

raise MCPAuthenticationError(
    "Invalid authentication token",
    error_code=401
)

Insufficient permissions:

raise MCPAuthenticationError(
    "Insufficient permissions for tool execution",
    details={"tool": "sensitive_operation", "required_scope": "admin"}
)
exception mcp.client.MCPCapabilityError(message: str, error_code: int | None = None, details: Dict[str, Any] | None = None)ΒΆ

Bases: MCPProtocolError

Error related to MCP capabilities.

Raised when there are issues with capability negotiation, unsupported capabilities, or capability validation failures. This indicates a mismatch between client requirements and server capabilities.

Examples

Unsupported capability:

raise MCPCapabilityError(
    "Server doesn't support required capability",
    details={"required": "tools", "supported": ["logging"]}
)
exception mcp.client.MCPConnectionError(message: str, error_code: int | None = None, details: Dict[str, Any] | None = None)ΒΆ

Bases: MCPError

Error during MCP connection establishment or management.

Raised when there are issues connecting to, maintaining, or disconnecting from an MCP server. This includes transport-level failures, timeout issues, and connection state problems.

Examples

Connection timeout:

raise MCPConnectionError("Connection timeout after 30s")

Transport failure:

raise MCPConnectionError(
    "Failed to start server process",
    details={"command": ["npx", "server"], "exit_code": 1}
)
exception mcp.client.MCPError(message: str, error_code: int | None = None, details: Dict[str, Any] | None = None)ΒΆ

Bases: Exception

Base exception for all MCP-related errors.

This is the root exception class for all MCP client errors. It provides a consistent interface for error handling and includes optional error details and context information.

Parameters:
  • message – Human-readable error description

  • error_code – MCP protocol error code (if applicable)

  • details – Additional error details or context

Examples

Basic error:

raise MCPError("Connection failed")

With error code:

raise MCPError("Invalid request", error_code=-32600)

With details:

raise MCPError(
    "Tool execution failed",
    details={"tool": "filesystem", "args": {...}}
)
__str__() strΒΆ

Return formatted error message.

detailsΒΆ
error_code = NoneΒΆ
messageΒΆ
exception mcp.client.MCPProtocolError(message: str, error_code: int | None = None, details: Dict[str, Any] | None = None)ΒΆ

Bases: MCPError

Error in MCP protocol communication.

Raised when there are protocol-level issues such as invalid messages, unsupported protocol versions, or malformed responses. This indicates a problem with the MCP protocol implementation or server compliance.

Examples

Protocol version mismatch:

raise MCPProtocolError(
    "Unsupported protocol version",
    details={"client": "1.0", "server": "0.9"}
)

Invalid message format:

raise MCPProtocolError(
    "Malformed JSON-RPC message",
    error_code=-32700,
    details={"raw_message": "invalid json"}
)
exception mcp.client.MCPTimeoutError(message: str, error_code: int | None = None, details: Dict[str, Any] | None = None)ΒΆ

Bases: MCPConnectionError

Timeout during MCP operation.

Raised when an MCP operation (connection, tool call, etc.) exceeds the configured timeout period. This is a specialized connection error that specifically indicates timing issues.

Parameters:
  • operation – Name of the operation that timed out

  • timeout – Timeout value that was exceeded

Examples

Tool execution timeout:

raise MCPTimeoutError(
    "Tool call timed out",
    details={"operation": "call_tool", "timeout": 30.0}
)
exception mcp.client.MCPToolError(message: str, error_code: int | None = None, details: Dict[str, Any] | None = None)ΒΆ

Bases: MCPError

Error during tool execution.

Raised when there are issues executing tools on the MCP server. This includes tool not found errors, invalid arguments, and execution failures.

Examples

Tool not found:

raise MCPToolError(
    "Tool not found",
    details={"tool_name": "nonexistent_tool"}
)

Invalid arguments:

raise MCPToolError(
    "Invalid tool arguments",
    details={"tool": "read_file", "error": "path is required"}
)
exception mcp.client.MCPTransportError(message: str, error_code: int | None = None, details: Dict[str, Any] | None = None)ΒΆ

Bases: MCPConnectionError

Error in the transport layer.

Raised when there are issues with the underlying transport mechanism (STDIO, HTTP, WebSocket, etc.). This includes process management failures, network issues, and transport-specific problems.

Examples

Process died unexpectedly:

raise MCPTransportError(
    "Server process died",
    details={"pid": 12345, "exit_code": -9}
)

Network error:

raise MCPTransportError(
    "HTTP connection failed",
    details={"url": "http://localhost:8080", "status": 500}
)
class mcp.client.HttpTransport(url: str, headers: Dict[str, str] | None = None, timeout: float = 30.0)ΒΆ

Bases: MCPTransport

HTTP transport for MCP communication.

This transport communicates with an MCP server over HTTP using JSON-RPC over HTTP POST requests. This is useful for servers that expose HTTP APIs or for remote MCP servers.

Examples

Basic HTTP transport:

transport = HttpTransport("http://localhost:8080/mcp")

With authentication:

transport = HttpTransport(
    "https://api.example.com/mcp",
    headers={"Authorization": "Bearer token123"}
)
async connect() NoneΒΆ

Create HTTP session.

async disconnect() NoneΒΆ

Close HTTP session.

async receive_message() Dict[str, Any]ΒΆ

Send pending message and receive response.

async send_message(message: Dict[str, Any]) NoneΒΆ

Send JSON-RPC message via HTTP POST.

Note: For HTTP transport, sending and receiving are combined in a request-response cycle. This method stores the message for the next receive_message call.

headersΒΆ
session: aiohttp.ClientSession | None = NoneΒΆ
urlΒΆ
class mcp.client.MCPClient(transport: mcp.client.transport.MCPTransport, timeout: float = 30.0, client_info: Dict[str, Any] | None = None, auto_reconnect: bool = False, max_reconnect_attempts: int = 3)ΒΆ

High-level MCP client for communicating with MCP servers.

This is the main interface for interacting with MCP servers. It combines the transport and protocol layers to provide a clean, easy-to-use API for MCP operations.

The client handles the complete MCP lifecycle:
  1. Connection establishment

  2. Capability negotiation

  3. Tool/resource/prompt discovery

  4. Operation execution

  5. Connection cleanup

It supports multiple transport types and provides both sync-style and async context manager interfaces.

Examples

Basic usage with STDIO transport:

from haive.mcp.client import MCPClient, StdioTransport

transport = StdioTransport("npx", ["-y", "@modelcontextprotocol/server-filesystem"])
client = MCPClient(transport)

await client.connect()
tools = await client.list_tools()
result = await client.call_tool("read_file", {"path": "/etc/hosts"})
await client.disconnect()

Using context manager (recommended):

async with MCPClient(transport) as client:
    tools = await client.list_tools()
    for tool in tools:
        print(f"Available tool: {tool.name}")

    result = await client.call_tool("tool_name", {"arg": "value"})

With HTTP transport:

from haive.mcp.client import HttpTransport

transport = HttpTransport("http://localhost:8080/mcp")
async with MCPClient(transport) as client:
    resources = await client.list_resources()
    content = await client.read_resource("file://config.json")

With notification handling:

def on_tool_list_changed(params):
    print("Tool list updated!")

client.add_notification_handler("tools/list_changed", on_tool_list_changed)

Error handling:

try:
    async with MCPClient(transport) as client:
        result = await client.call_tool("nonexistent", {})
except MCPToolError as e:
    print(f"Tool error: {e}")
except MCPConnectionError as e:
    print(f"Connection error: {e}")
async __aenter__()ΒΆ

Async context manager entry.

async __aexit__(exc_type, exc_val, exc_tb)ΒΆ

Async context manager exit.

add_notification_handler(method: str, handler: Callable[[Dict[str, Any]], Awaitable[None]]) NoneΒΆ

Add a handler for server notifications.

Parameters:
  • method – Notification method name

  • handler – Async handler function

async call_tool(name: str, arguments: Dict[str, Any] | None = None, timeout: float | None = None) AnyΒΆ

Call a tool on the server.

Parameters:
  • name – Tool name to call

  • arguments – Tool arguments

  • timeout – Call timeout (uses default if None)

Returns:

Tool execution result

Raises:
async connect() Dict[str, Any]ΒΆ

Connect to the MCP server and perform initialization.

This method establishes the connection and performs the MCP initialization handshake, including capability negotiation.

Returns:

Server information and capabilities

Raises:
async disconnect() NoneΒΆ

Disconnect from the MCP server gracefully.

This method cleanly shuts down the MCP connection and cleans up all resources. It’s safe to call multiple times.

async get_capabilities() List[mcp.client.protocol.MCPCapability]ΒΆ

Get server capabilities.

Returns:

List of server capabilities

Raises:

MCPConnectionError – If not connected

async get_prompt(name: str, arguments: Dict[str, Any] | None = None) Dict[str, Any]ΒΆ

Get a prompt from the server.

Parameters:
  • name – Prompt name

  • arguments – Prompt arguments

Returns:

Prompt content and metadata

async get_server_info() Dict[str, Any]ΒΆ

Get server information from initialization.

Returns:

Server information dictionary

Raises:

MCPConnectionError – If not connected

async health_check() Dict[str, Any]ΒΆ

Perform a health check on the MCP connection.

Returns:

Health check results including connectivity and capabilities

async is_connected() boolΒΆ

Check if client is currently connected.

Returns:

True if connected, False otherwise

async list_prompts(use_cache: bool = True) List[mcp.client.protocol.MCPPrompt]ΒΆ

List available prompts from the server.

Parameters:

use_cache – Whether to use cached results

Returns:

List of available prompts

Raises:

MCPCapabilityError – If prompts capability not supported

async list_resources(use_cache: bool = True) List[mcp.client.protocol.MCPResource]ΒΆ

List available resources from the server.

Parameters:

use_cache – Whether to use cached results

Returns:

List of available resources

async list_tools(use_cache: bool = True) List[mcp.client.protocol.MCPTool]ΒΆ

List available tools from the server.

Parameters:

use_cache – Whether to use cached results if available

Returns:

List of available tools

Raises:
async read_resource(uri: str) Dict[str, Any]ΒΆ

Read a resource from the server.

Parameters:

uri – Resource URI to read

Returns:

Resource content and metadata

async refresh_cache() NoneΒΆ

Refresh all cached server information.

This re-fetches tools, prompts, and resources from the server and updates the local cache.

remove_notification_handler(method: str, handler: Callable[[Dict[str, Any]], Awaitable[None]]) NoneΒΆ

Remove a notification handler.

Parameters:
  • method – Notification method name

  • handler – Handler function to remove

auto_reconnect = FalseΒΆ
max_reconnect_attempts = 3ΒΆ
protocolΒΆ
timeout = 30.0ΒΆ
transportΒΆ
class mcp.client.MCPConnection(name: str, transport: mcp.client.transport.MCPTransport, auto_reconnect: bool = True, max_reconnect_attempts: int = 5, reconnect_delay: float = 1.0, max_reconnect_delay: float = 60.0, health_check_interval: float = 30.0, connection_timeout: float = 30.0)ΒΆ

MCP connection manager with health monitoring and auto-reconnection.

This class manages individual MCP connections with features like:
  • Health monitoring and scoring

  • Automatic reconnection with backoff

  • Connection lifecycle management

  • Error tracking and recovery

  • Performance metrics

Examples

Basic connection management:

from haive.mcp.client import MCPConnection, StdioTransport

transport = StdioTransport("npx", ["-y", "@modelcontextprotocol/server-filesystem"])
connection = MCPConnection("filesystem", transport)

await connection.connect()
client = connection.get_client()
tools = await client.list_tools()
await connection.disconnect()

With health monitoring:

connection = MCPConnection(
    "filesystem",
    transport,
    health_check_interval=30.0,
    auto_reconnect=True
)

await connection.start_monitoring()
# Connection will be monitored and auto-reconnected

Connection status checking:

if connection.is_healthy():
    client = connection.get_client()
    result = await client.call_tool("read_file", {"path": "/tmp/test"})
async __aenter__()ΒΆ

Async context manager entry.

async __aexit__(exc_type, exc_val, exc_tb)ΒΆ

Async context manager exit.

async connect(timeout: float | None = None) mcp.client.mcp_client.MCPClientΒΆ

Connect to the MCP server.

Parameters:

timeout – Connection timeout (uses default if None)

Returns:

Connected MCP client

Raises:

MCPConnectionError – If connection fails

async disconnect() NoneΒΆ

Disconnect from the MCP server.

get_client() mcp.client.mcp_client.MCPClientΒΆ

Get the MCP client if connected.

Returns:

MCP client instance

Raises:

MCPConnectionError – If not connected

get_info() ConnectionInfoΒΆ

Get connection information.

Returns:

ConnectionInfo object with current state

async health_check() Dict[str, Any]ΒΆ

Perform a health check on the connection.

Returns:

Health check results

is_connected() boolΒΆ

Check if connection is active.

Returns:

True if connected, False otherwise

is_healthy(threshold: float = 0.7) boolΒΆ

Check if connection is healthy.

Parameters:

threshold – Health score threshold (0.0-1.0)

Returns:

True if health score is above threshold

async reconnect() mcp.client.mcp_client.MCPClient | NoneΒΆ

Attempt to reconnect to the server.

Returns:

Connected client if successful, None if failed

async start_monitoring() NoneΒΆ

Start background health monitoring.

async stop_monitoring() NoneΒΆ

Stop background health monitoring.

auto_reconnect = TrueΒΆ
client: mcp.client.mcp_client.MCPClient | None = NoneΒΆ
connection_count = 0ΒΆ
connection_timeout = 30.0ΒΆ
error_count = 0ΒΆ
health_check_interval = 30.0ΒΆ
health_score = 1.0ΒΆ
last_connected: float | None = NoneΒΆ
last_error: str | None = NoneΒΆ
last_operation_time = 0.0ΒΆ
max_reconnect_attempts = 5ΒΆ
max_reconnect_delay = 60.0ΒΆ
nameΒΆ
reconnect_attempts = 0ΒΆ
reconnect_delay = 1.0ΒΆ
statusΒΆ
total_uptime = 0.0ΒΆ
transportΒΆ
class mcp.client.MCPProtocol(transport, timeout: float = 30.0, client_info: Dict[str, Any] | None = None)ΒΆ

MCP protocol implementation.

This class handles the MCP protocol layer, including:
  • Message serialization/deserialization

  • Request/response matching

  • Capability negotiation

  • Protocol state management

  • Error handling

The protocol is transport-agnostic and works with any MCPTransport implementation. It provides a clean async API for MCP operations.

Examples

Basic protocol usage:

from haive.mcp.client.transport import StdioTransport
from haive.mcp.client.protocol import MCPProtocol

transport = StdioTransport("npx", ["-y", "@modelcontextprotocol/server-filesystem"])
protocol = MCPProtocol(transport)

await protocol.initialize()
tools = await protocol.list_tools()
result = await protocol.call_tool("read_file", {"path": "/etc/hosts"})
await protocol.shutdown()

With context manager:

async with MCPProtocol(transport) as protocol:
    tools = await protocol.list_tools()
    result = await protocol.call_tool("tool_name", args)
async __aenter__()ΒΆ

Async context manager entry.

async __aexit__(exc_type, exc_val, exc_tb)ΒΆ

Async context manager exit.

add_notification_handler(method: str, handler: Callable[[Dict[str, Any]], Awaitable[None]]) NoneΒΆ

Add a handler for notifications.

Parameters:
  • method – Notification method name

  • handler – Async handler function

async call_tool(name: str, arguments: Dict[str, Any] | None = None) AnyΒΆ

Call a tool on the server.

Parameters:
  • name – Tool name to call

  • arguments – Tool arguments

Returns:

Tool execution result

Raises:
async get_prompt(name: str, arguments: Dict[str, Any] | None = None) Dict[str, Any]ΒΆ

Get a prompt from the server.

Parameters:
  • name – Prompt name

  • arguments – Prompt arguments

Returns:

Prompt content and metadata

async initialize() Dict[str, Any]ΒΆ

Initialize the MCP connection.

This performs the MCP initialization handshake, including capability negotiation and protocol version agreement.

Returns:

Server information and capabilities

Raises:
async list_prompts() List[MCPPrompt]ΒΆ

List available prompts from the server.

Returns:

List of available prompts

Raises:

MCPCapabilityError – If prompts capability not supported

async list_resources() List[MCPResource]ΒΆ

List available resources from the server.

Returns:

List of available resources

async list_tools() List[MCPTool]ΒΆ

List available tools from the server.

Returns:

List of available tools

Raises:
async read_resource(uri: str) Dict[str, Any]ΒΆ

Read a resource from the server.

Parameters:

uri – Resource URI to read

Returns:

Resource content and metadata

remove_notification_handler(method: str, handler: Callable[[Dict[str, Any]], Awaitable[None]]) NoneΒΆ

Remove a notification handler.

Parameters:
  • method – Notification method name

  • handler – Handler function to remove

async shutdown() NoneΒΆ

Shutdown the MCP connection gracefully.

client_infoΒΆ
initialized = FalseΒΆ
protocol_version: MCPProtocolVersion | None = NoneΒΆ
server_capabilities: Set[MCPCapability]ΒΆ
server_info: Dict[str, Any] | None = NoneΒΆ
timeout = 30.0ΒΆ
transportΒΆ
class mcp.client.MCPTransport(timeout: float = 30.0)ΒΆ

Bases: abc.ABC

Abstract base class for MCP transports.

All MCP transports must implement this interface to provide a consistent API for the protocol layer. The transport handles the low-level communication while the protocol layer handles MCP message semantics.

The transport is responsible for:
  • Connection establishment and teardown

  • Raw message sending and receiving

  • Transport-specific error handling

  • Connection state management

async __aenter__()ΒΆ

Async context manager entry.

async __aexit__(exc_type, exc_val, exc_tb)ΒΆ

Async context manager exit.

abstractmethod connect() NoneΒΆ
Async:

Establish connection to the MCP server.

Raises:
abstractmethod disconnect() NoneΒΆ
Async:

Close connection to the MCP server.

Should be idempotent and safe to call multiple times.

abstractmethod receive_message() Dict[str, Any]ΒΆ
Async:

Receive a message from the server.

Returns:

JSON-RPC message from server

Raises:
abstractmethod send_message(message: Dict[str, Any]) NoneΒΆ
Async:

Send a message to the server.

Parameters:

message – JSON-RPC message to send

Raises:
connected = FalseΒΆ
timeout = 30.0ΒΆ
class mcp.client.SseTransport(sse_url: str, post_url: str, headers: Dict[str, str] | None = None, timeout: float = 30.0)ΒΆ

Bases: MCPTransport

Server-Sent Events transport for MCP communication.

This transport uses SSE for receiving messages and HTTP POST for sending. Useful for streaming scenarios where the server needs to push messages to the client.

async connect() NoneΒΆ

Establish SSE connection.

async disconnect() NoneΒΆ

Close SSE connection.

async receive_message() Dict[str, Any]ΒΆ

Receive message from SSE stream.

async send_message(message: Dict[str, Any]) NoneΒΆ

Send message via HTTP POST.

headersΒΆ
post_urlΒΆ
session: aiohttp.ClientSession | None = NoneΒΆ
sse_urlΒΆ
class mcp.client.StdioTransport(command: str, args: List[str] | None = None, env: Dict[str, str] | None = None, cwd: str | None = None, timeout: float = 30.0)ΒΆ

Bases: MCPTransport

STDIO transport for MCP communication.

This transport communicates with an MCP server via stdin/stdout of a subprocess. This is the most common transport for MCP servers that are designed to run as command-line tools.

The transport handles:
  • Process lifecycle management

  • JSON-RPC message framing over stdio

  • Process cleanup on disconnection

  • Error handling for process failures

Examples

Basic usage:

transport = StdioTransport(
    command="npx",
    args=["-y", "@modelcontextprotocol/server-filesystem", "/tmp"]
)

With environment variables:

transport = StdioTransport(
    command="python",
    args=["-m", "my_mcp_server"],
    env={"API_KEY": "secret"},
    cwd="/path/to/server"
)
async connect() NoneΒΆ

Start the MCP server process and establish stdio communication.

async disconnect() NoneΒΆ

Stop the MCP server process and cleanup resources.

async receive_message() Dict[str, Any]ΒΆ

Receive JSON-RPC message from server via stdout.

async send_message(message: Dict[str, Any]) NoneΒΆ

Send JSON-RPC message to server via stdin.

args = []ΒΆ
commandΒΆ
cwd = NoneΒΆ
env = NoneΒΆ
process: asyncio.subprocess.Process | None = NoneΒΆ
class mcp.client.WebSocketTransport(url: str, headers: Dict[str, str] | None = None, timeout: float = 30.0)ΒΆ

Bases: MCPTransport

WebSocket transport for MCP communication.

This transport provides real-time bidirectional communication over WebSocket. Useful for interactive applications and real-time scenarios.

async connect() NoneΒΆ

Establish WebSocket connection.

async disconnect() NoneΒΆ

Close WebSocket connection.

async receive_message() Dict[str, Any]ΒΆ

Receive message from WebSocket.

async send_message(message: Dict[str, Any]) NoneΒΆ

Send message over WebSocket.

headersΒΆ
session: aiohttp.ClientSession | None = NoneΒΆ
urlΒΆ
websocket: aiohttp.ClientWebSocketResponse | None = NoneΒΆ