mcp.client
MCP Client Implementation Package.
This package provides a native MCP client implementation for connecting to and communicating with MCP servers according to the Model Context Protocol specification.
Key Components
- Core Classes
MCPClient: Main client for protocol communication
MCPTransport: Transport layer abstraction
MCPConnection: Connection management
MCPProtocol: Protocol implementation
- Transport Support
STDIO: Communication via stdin/stdout
HTTP: RESTful communication
SSE: Server-sent events
WebSocket: Real-time bidirectional communication
Usage Examples
Basic Connection
from haive.mcp.client import MCPClient, StdioTransport

# Create transport and client
transport = StdioTransport(
    command="npx",
    args=["-y", "@modelcontextprotocol/server-filesystem"]
)
client = MCPClient(transport)

# Connect and use
await client.connect()
tools = await client.list_tools()
result = await client.call_tool("read_file", {"path": "/etc/hosts"})
await client.disconnect()
Context Manager Usage
async with MCPClient(transport) as client:
    tools = await client.list_tools()
    result = await client.call_tool("tool_name", args)
Available Classes
Transport Classes
StdioTransport - Standard I/O transport
HttpTransport - HTTP-based transport
SseTransport - Server-Sent Events transport
WebSocketTransport - WebSocket transport
Exception Classes
MCPError - Base MCP exception
MCPConnectionError - Connection-related errors
MCPProtocolError - Protocol violation errors
MCPTimeoutError - Timeout-related errors
MCPTransportError - Transport layer errors
MCPAuthenticationError - Authentication failures
MCPCapabilityError - Capability negotiation errors
MCPToolError - Tool execution errors
Note
This is a native implementation of the MCP protocol, designed to work with any MCP-compliant server. It handles the full protocol lifecycle including initialization, capability discovery, and tool execution.
Submodules
Exceptions
MCPAuthenticationError | Authentication failure with MCP server.
MCPCapabilityError | Error related to MCP capabilities.
MCPConnectionError | Error during MCP connection establishment or management.
MCPError | Base exception for all MCP-related errors.
MCPProtocolError | Error in MCP protocol communication.
MCPTimeoutError | Timeout during MCP operation.
MCPToolError | Error during tool execution.
MCPTransportError | Error in the transport layer.
Classes
HttpTransport | HTTP transport for MCP communication.
MCPClient | High-level MCP client for communicating with MCP servers.
MCPConnection | MCP connection manager with health monitoring and auto-reconnection.
MCPProtocol | MCP protocol implementation.
MCPTransport | Abstract base class for MCP transports.
SseTransport | Server-Sent Events transport for MCP communication.
StdioTransport | STDIO transport for MCP communication.
WebSocketTransport | WebSocket transport for MCP communication.
Package Contents
- exception mcp.client.MCPAuthenticationError(message: str, error_code: int | None = None, details: Dict[str, Any] | None = None)
Bases: MCPError
Authentication failure with MCP server.
Raised when authentication is required but fails, or when authorization is denied for a specific operation. This includes token validation failures and permission issues.
Examples
Invalid token:
raise MCPAuthenticationError(
    "Invalid authentication token",
    error_code=401
)
Insufficient permissions:
raise MCPAuthenticationError(
    "Insufficient permissions for tool execution",
    details={"tool": "sensitive_operation", "required_scope": "admin"}
)
- exception mcp.client.MCPCapabilityError(message: str, error_code: int | None = None, details: Dict[str, Any] | None = None)
Bases: MCPProtocolError
Error related to MCP capabilities.
Raised when there are issues with capability negotiation, unsupported capabilities, or capability validation failures. This indicates a mismatch between client requirements and server capabilities.
Examples
Unsupported capability:
raise MCPCapabilityError(
    "Server doesn't support required capability",
    details={"required": "tools", "supported": ["logging"]}
)
- exception mcp.client.MCPConnectionError(message: str, error_code: int | None = None, details: Dict[str, Any] | None = None)
Bases: MCPError
Error during MCP connection establishment or management.
Raised when there are issues connecting to, maintaining, or disconnecting from an MCP server. This includes transport-level failures, timeout issues, and connection state problems.
Examples
Connection timeout:
raise MCPConnectionError("Connection timeout after 30s")
Transport failure:
raise MCPConnectionError(
    "Failed to start server process",
    details={"command": ["npx", "server"], "exit_code": 1}
)
- exception mcp.client.MCPError(message: str, error_code: int | None = None, details: Dict[str, Any] | None = None)
Bases: Exception
Base exception for all MCP-related errors.
This is the root exception class for all MCP client errors. It provides a consistent interface for error handling and includes optional error details and context information.
- Parameters:
message – Human-readable error description
error_code – MCP protocol error code (if applicable)
details – Additional error details or context
Examples
Basic error:
raise MCPError("Connection failed")
With error code:
raise MCPError("Invalid request", error_code=-32600)
With details:
raise MCPError(
    "Tool execution failed",
    details={"tool": "filesystem", "args": {...}}
)
- details
- error_code = None
- message
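The constructor signature and the three attributes listed above suggest a class of roughly the following shape. This is a minimal stand-in sketch, not the package's source: how a missing `details` argument is stored is an assumption, and `describe` is a hypothetical helper added only for illustration.

```python
from typing import Any, Dict, Optional


class MCPError(Exception):
    """Stand-in mirroring the documented signature; not the package source."""

    def __init__(
        self,
        message: str,
        error_code: Optional[int] = None,
        details: Optional[Dict[str, Any]] = None,
    ) -> None:
        super().__init__(message)
        self.message = message
        self.error_code = error_code
        # Assumption: a missing details argument is stored as an empty dict.
        self.details = details or {}


def describe(err: MCPError) -> str:
    """Hypothetical helper that formats an error for logging."""
    code = "n/a" if err.error_code is None else str(err.error_code)
    return f"{err.message} (code={code}, details={err.details})"


try:
    raise MCPError("Invalid request", error_code=-32600, details={"id": 7})
except MCPError as err:
    summary = describe(err)
```

Keeping the structured fields on the exception lets callers log or branch on `error_code` and `details` without parsing the message text.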
- exception mcp.client.MCPProtocolError(message: str, error_code: int | None = None, details: Dict[str, Any] | None = None)
Bases: MCPError
Error in MCP protocol communication.
Raised when there are protocol-level issues such as invalid messages, unsupported protocol versions, or malformed responses. This indicates a problem with the MCP protocol implementation or server compliance.
Examples
Protocol version mismatch:
raise MCPProtocolError(
    "Unsupported protocol version",
    details={"client": "1.0", "server": "0.9"}
)
Invalid message format:
raise MCPProtocolError(
    "Malformed JSON-RPC message",
    error_code=-32700,
    details={"raw_message": "invalid json"}
)
- exception mcp.client.MCPTimeoutError(message: str, error_code: int | None = None, details: Dict[str, Any] | None = None)
Bases: MCPConnectionError
Timeout during MCP operation.
Raised when an MCP operation (connection, tool call, etc.) exceeds the configured timeout period. This is a specialized connection error that specifically indicates timing issues.
- Parameters:
operation – Name of the operation that timed out
timeout – Timeout value that was exceeded
Examples
Tool execution timeout:
raise MCPTimeoutError(
    "Tool call timed out",
    details={"operation": "call_tool", "timeout": 30.0}
)
- exception mcp.client.MCPToolError(message: str, error_code: int | None = None, details: Dict[str, Any] | None = None)
Bases: MCPError
Error during tool execution.
Raised when there are issues executing tools on the MCP server. This includes tool not found errors, invalid arguments, and execution failures.
Examples
Tool not found:
raise MCPToolError(
    "Tool not found",
    details={"tool_name": "nonexistent_tool"}
)
Invalid arguments:
raise MCPToolError(
    "Invalid tool arguments",
    details={"tool": "read_file", "error": "path is required"}
)
- exception mcp.client.MCPTransportError(message: str, error_code: int | None = None, details: Dict[str, Any] | None = None)
Bases: MCPConnectionError
Error in the transport layer.
Raised when there are issues with the underlying transport mechanism (STDIO, HTTP, WebSocket, etc.). This includes process management failures, network issues, and transport-specific problems.
Examples
Process died unexpectedly:
raise MCPTransportError(
    "Server process died",
    details={"pid": 12345, "exit_code": -9}
)
Network error:
raise MCPTransportError(
    "HTTP connection failed",
    details={"url": "http://localhost:8080", "status": 500}
)
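Because the exceptions form a hierarchy, the order of `except` clauses (or `isinstance` checks) matters: a handler for `MCPConnectionError` also catches `MCPTimeoutError` and `MCPTransportError`. The sketch below uses local stand-in classes that copy only the documented base classes; `classify` and its return values are hypothetical and exist purely to show the ordering.

```python
class MCPError(Exception): ...
class MCPConnectionError(MCPError): ...
class MCPProtocolError(MCPError): ...
class MCPAuthenticationError(MCPError): ...
class MCPToolError(MCPError): ...
class MCPCapabilityError(MCPProtocolError): ...
class MCPTimeoutError(MCPConnectionError): ...
class MCPTransportError(MCPConnectionError): ...


def classify(exc: Exception) -> str:
    """Hypothetical helper: map an exception to a coarse recovery action."""
    # Most specific checks first; a subclass would otherwise be shadowed
    # by the check for its parent class.
    if isinstance(exc, MCPTimeoutError):
        return "retry"
    if isinstance(exc, MCPConnectionError):  # also matches MCPTransportError
        return "reconnect"
    if isinstance(exc, MCPProtocolError):  # also matches MCPCapabilityError
        return "abort"
    if isinstance(exc, MCPError):
        return "report"
    return "unknown"
```

The same rule applies to `try`/`except` chains: list `MCPTimeoutError` before `MCPConnectionError`, and `MCPError` last.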
- class mcp.client.HttpTransport(url: str, headers: Dict[str, str] | None = None, timeout: float = 30.0)
Bases: MCPTransport
HTTP transport for MCP communication.
This transport communicates with an MCP server by sending JSON-RPC messages in HTTP POST requests. This is useful for servers that expose HTTP APIs or for remote MCP servers.
Examples
Basic HTTP transport:
transport = HttpTransport("http://localhost:8080/mcp")
With authentication:
transport = HttpTransport(
    "https://api.example.com/mcp",
    headers={"Authorization": "Bearer token123"}
)
- async send_message(message: Dict[str, Any]) -> None
Send JSON-RPC message via HTTP POST.
Note: For HTTP transport, sending and receiving are combined in a request-response cycle. This method stores the message for the next receive_message call.
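The note above describes a deferred-send pattern: `send_message` only records the message, and the request-response cycle runs when `receive_message` is called. The following sketch illustrates that pattern with a hypothetical `RequestResponseTransport` class and an injected `fake_post` coroutine so it runs without a network; it is not the package's `HttpTransport`, whose internals are not shown in this reference.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, Optional

Message = Dict[str, Any]
PostFn = Callable[[str, Message], Awaitable[Message]]


class RequestResponseTransport:
    """Hypothetical stand-in; not the package's HttpTransport."""

    def __init__(self, url: str, post: PostFn) -> None:
        self.url = url
        self._post = post  # injected so the sketch runs without a network
        self._pending: Optional[Message] = None

    async def send_message(self, message: Message) -> None:
        # Nothing goes on the wire yet; the message is only stored.
        self._pending = message

    async def receive_message(self) -> Message:
        if self._pending is None:
            raise RuntimeError("receive_message called with nothing pending")
        message, self._pending = self._pending, None
        # The POST happens here, and its response body is the reply.
        return await self._post(self.url, message)


async def fake_post(url: str, message: Message) -> Message:
    """Server stand-in that answers any request with an empty result."""
    return {"jsonrpc": "2.0", "id": message["id"], "result": {}}


async def demo() -> Message:
    transport = RequestResponseTransport("http://localhost:8080/mcp", fake_post)
    await transport.send_message({"jsonrpc": "2.0", "id": 1, "method": "ping"})
    return await transport.receive_message()


reply = asyncio.run(demo())
```

One consequence of this design is that each `send_message` should be paired with exactly one `receive_message`; a second send before the receive would overwrite the stored message in this sketch.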
- headers
- url
- class mcp.client.MCPClient(transport: mcp.client.transport.MCPTransport, timeout: float = 30.0, client_info: Dict[str, Any] | None = None, auto_reconnect: bool = False, max_reconnect_attempts: int = 3)
High-level MCP client for communicating with MCP servers.
This is the main interface for interacting with MCP servers. It combines the transport and protocol layers to provide a clean, easy-to-use API for MCP operations.
The client handles the complete MCP lifecycle:
- Connection establishment
- Capability negotiation
- Tool/resource/prompt discovery
- Operation execution
- Connection cleanup
It supports multiple transport types and can be driven either through explicit connect() and disconnect() calls or as an async context manager.
Examples
Basic usage with STDIO transport:
from haive.mcp.client import MCPClient, StdioTransport

transport = StdioTransport("npx", ["-y", "@modelcontextprotocol/server-filesystem"])
client = MCPClient(transport)
await client.connect()
tools = await client.list_tools()
result = await client.call_tool("read_file", {"path": "/etc/hosts"})
await client.disconnect()
Using context manager (recommended):
async with MCPClient(transport) as client:
    tools = await client.list_tools()
    for tool in tools:
        print(f"Available tool: {tool.name}")
    result = await client.call_tool("tool_name", {"arg": "value"})
With HTTP transport:
from haive.mcp.client import HttpTransport

transport = HttpTransport("http://localhost:8080/mcp")
async with MCPClient(transport) as client:
    resources = await client.list_resources()
    content = await client.read_resource("file://config.json")
With notification handling (the handler must be a coroutine function, matching the Awaitable handler type in add_notification_handler):
async def on_tool_list_changed(params):
    print("Tool list updated!")

client.add_notification_handler("tools/list_changed", on_tool_list_changed)
Error handling:
try:
    async with MCPClient(transport) as client:
        result = await client.call_tool("nonexistent", {})
except MCPToolError as e:
    print(f"Tool error: {e}")
except MCPConnectionError as e:
    print(f"Connection error: {e}")
- async __aenter__()
Async context manager entry.
- async __aexit__(exc_type, exc_val, exc_tb)
Async context manager exit.
- add_notification_handler(method: str, handler: Callable[[Dict[str, Any]], Awaitable[None]]) -> None
Add a handler for server notifications.
- Parameters:
method – Notification method name
handler – Async handler function
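A handler registry of this kind can be pictured as a mapping from method name to a list of coroutine functions. The sketch below is a hypothetical `NotificationRegistry`, written only to show how add, remove, and dispatch might fit together; the client's actual bookkeeping and dispatch order are not documented here and may differ.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable, DefaultDict, Dict, List

Handler = Callable[[Dict[str, Any]], Awaitable[None]]


class NotificationRegistry:
    """Hypothetical registry; the client's internals may differ."""

    def __init__(self) -> None:
        self._handlers: DefaultDict[str, List[Handler]] = defaultdict(list)

    def add(self, method: str, handler: Handler) -> None:
        self._handlers[method].append(handler)

    def remove(self, method: str, handler: Handler) -> None:
        # Removing an unknown handler is treated as a no-op (assumption).
        if handler in self._handlers.get(method, []):
            self._handlers[method].remove(handler)

    async def dispatch(self, method: str, params: Dict[str, Any]) -> int:
        """Await every handler registered for `method`; return how many ran."""
        handlers = list(self._handlers.get(method, []))
        for handler in handlers:
            await handler(params)
        return len(handlers)


seen: List[Dict[str, Any]] = []


async def on_tool_list_changed(params: Dict[str, Any]) -> None:
    seen.append(params)


async def demo() -> List[int]:
    registry = NotificationRegistry()
    registry.add("tools/list_changed", on_tool_list_changed)
    first = await registry.dispatch("tools/list_changed", {})
    registry.remove("tools/list_changed", on_tool_list_changed)
    second = await registry.dispatch("tools/list_changed", {})
    return [first, second]


counts = asyncio.run(demo())
```

Because handlers are awaited, a plain `def` function would fail at dispatch time in this sketch, which is why the handler is declared with `async def`.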
- async call_tool(name: str, arguments: Dict[str, Any] | None = None, timeout: float | None = None) -> Any
Call a tool on the server.
- Parameters:
name – Tool name to call
arguments – Tool arguments
timeout – Call timeout (uses default if None)
- Returns:
Tool execution result
- Raises:
MCPToolError – If tool execution fails
MCPProtocolError – If request fails
- async connect() -> Dict[str, Any]
Connect to the MCP server and perform initialization.
This method establishes the connection and performs the MCP initialization handshake, including capability negotiation.
- Returns:
Server information and capabilities
- Raises:
MCPConnectionError – If connection fails
MCPProtocolError – If protocol handshake fails
- async disconnect() -> None
Disconnect from the MCP server gracefully.
This method cleanly shuts down the MCP connection and cleans up all resources. It's safe to call multiple times.
- async get_capabilities() -> List[mcp.client.protocol.MCPCapability]
Get server capabilities.
- Returns:
List of server capabilities
- Raises:
MCPConnectionError – If not connected
- async get_prompt(name: str, arguments: Dict[str, Any] | None = None) -> Dict[str, Any]
Get a prompt from the server.
- Parameters:
name – Prompt name
arguments – Prompt arguments
- Returns:
Prompt content and metadata
- async get_server_info() -> Dict[str, Any]
Get server information from initialization.
- Returns:
Server information dictionary
- Raises:
MCPConnectionError – If not connected
- async health_check() -> Dict[str, Any]
Perform a health check on the MCP connection.
- Returns:
Health check results including connectivity and capabilities
- async is_connected() -> bool
Check if client is currently connected.
- Returns:
True if connected, False otherwise
- async list_prompts(use_cache: bool = True) -> List[mcp.client.protocol.MCPPrompt]
List available prompts from the server.
- Parameters:
use_cache – Whether to use cached results
- Returns:
List of available prompts
- Raises:
MCPCapabilityError – If prompts capability not supported
- async list_resources(use_cache: bool = True) -> List[mcp.client.protocol.MCPResource]
List available resources from the server.
- Parameters:
use_cache – Whether to use cached results
- Returns:
List of available resources
- async list_tools(use_cache: bool = True) -> List[mcp.client.protocol.MCPTool]
List available tools from the server.
- Parameters:
use_cache – Whether to use cached results if available
- Returns:
List of available tools
- Raises:
MCPCapabilityError – If tools capability not supported
MCPProtocolError – If request fails
- async read_resource(uri: str) -> Dict[str, Any]
Read a resource from the server.
- Parameters:
uri – Resource URI to read
- Returns:
Resource content and metadata
- async refresh_cache() -> None
Refresh all cached server information.
This re-fetches tools, prompts, and resources from the server and updates the local cache.
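The interaction between `use_cache` and `refresh_cache` can be sketched with a small wrapper around a single listing call. `CachedListing` and `fetch_tools` are hypothetical names, and the behavior shown (a bypassed cache is also repopulated) is an assumption rather than something this reference states.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional


class CachedListing:
    """Hypothetical cache around one list call; not the client's internals."""

    def __init__(self, fetch: Callable[[], Awaitable[List[str]]]) -> None:
        self._fetch = fetch
        self._cache: Optional[List[str]] = None

    async def get(self, use_cache: bool = True) -> List[str]:
        if use_cache and self._cache is not None:
            return self._cache
        # Assumption: bypassing the cache also stores the fresh result.
        self._cache = await self._fetch()
        return self._cache

    async def refresh(self) -> None:
        self._cache = await self._fetch()


calls = 0


async def fetch_tools() -> List[str]:
    """Stand-in for a server round trip; counts how often it is invoked."""
    global calls
    calls += 1
    return ["read_file", "write_file"]


async def demo() -> int:
    tools = CachedListing(fetch_tools)
    await tools.get()                 # first call: fetches
    await tools.get()                 # served from the cache
    await tools.get(use_cache=False)  # forced fetch
    await tools.refresh()             # explicit refresh: fetches
    return calls


fetches = asyncio.run(demo())
```

Here four calls result in three round trips, which is the saving that a default of `use_cache=True` is meant to provide.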
- remove_notification_handler(method: str, handler: Callable[[Dict[str, Any]], Awaitable[None]]) -> None
Remove a notification handler.
- Parameters:
method – Notification method name
handler – Handler function to remove
- auto_reconnect = False
- max_reconnect_attempts = 3
- protocol
- timeout = 30.0
- transport
- class mcp.client.MCPConnection(name: str, transport: mcp.client.transport.MCPTransport, auto_reconnect: bool = True, max_reconnect_attempts: int = 5, reconnect_delay: float = 1.0, max_reconnect_delay: float = 60.0, health_check_interval: float = 30.0, connection_timeout: float = 30.0)
MCP connection manager with health monitoring and auto-reconnection.
This class manages individual MCP connections with features like:
- Health monitoring and scoring
- Automatic reconnection with backoff
- Connection lifecycle management
- Error tracking and recovery
- Performance metrics
Examples
Basic connection management:
from haive.mcp.client import MCPConnection, StdioTransport

transport = StdioTransport("npx", ["-y", "@modelcontextprotocol/server-filesystem"])
connection = MCPConnection("filesystem", transport)
await connection.connect()
client = connection.get_client()
tools = await client.list_tools()
await connection.disconnect()
With health monitoring:
connection = MCPConnection(
    "filesystem",
    transport,
    health_check_interval=30.0,
    auto_reconnect=True
)
await connection.start_monitoring()
# Connection will be monitored and auto-reconnected
Connection status checking:
if connection.is_healthy():
    client = connection.get_client()
    result = await client.call_tool("read_file", {"path": "/tmp/test"})
- async __aenter__()
Async context manager entry.
- async __aexit__(exc_type, exc_val, exc_tb)
Async context manager exit.
- async connect(timeout: float | None = None) -> mcp.client.mcp_client.MCPClient
Connect to the MCP server.
- Parameters:
timeout – Connection timeout (uses default if None)
- Returns:
Connected MCP client
- Raises:
MCPConnectionError – If connection fails
- get_client() -> mcp.client.mcp_client.MCPClient
Get the MCP client if connected.
- Returns:
MCP client instance
- Raises:
MCPConnectionError – If not connected
- get_info() -> ConnectionInfo
Get connection information.
- Returns:
ConnectionInfo object with current state
- async health_check() -> Dict[str, Any]
Perform a health check on the connection.
- Returns:
Health check results
- is_healthy(threshold: float = 0.7) -> bool
Check if connection is healthy.
- Parameters:
threshold – Health score threshold (0.0-1.0)
- Returns:
True if health score is above threshold
- async reconnect() -> mcp.client.mcp_client.MCPClient | None
Attempt to reconnect to the server.
- Returns:
Connected client if successful, None if failed
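The class description mentions reconnection with backoff, and the constructor exposes `reconnect_delay`, `max_reconnect_delay`, and `max_reconnect_attempts`. The exact policy is not documented here; the sketch below assumes plain exponential doubling capped at the maximum, with no jitter, and uses a hypothetical `reconnect_delays` helper to show what such a schedule would look like with the documented defaults.

```python
from typing import List


def reconnect_delays(
    reconnect_delay: float = 1.0,
    max_reconnect_delay: float = 60.0,
    max_reconnect_attempts: int = 5,
) -> List[float]:
    """Delay before each attempt, doubling up to the cap (assumed policy)."""
    return [
        min(reconnect_delay * (2 ** attempt), max_reconnect_delay)
        for attempt in range(max_reconnect_attempts)
    ]


# Documented defaults: 1.0 s initial delay, 60.0 s cap, 5 attempts.
schedule = reconnect_delays()

# With more attempts the cap becomes visible.
capped = reconnect_delays(max_reconnect_attempts=8)
```

Under this assumption the default schedule never reaches the cap; `max_reconnect_delay` only starts to matter from the seventh attempt onward.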
- auto_reconnect = True
- client: mcp.client.mcp_client.MCPClient | None = None
- connection_count = 0
- connection_timeout = 30.0
- error_count = 0
- health_check_interval = 30.0
- health_score = 1.0
- last_operation_time = 0.0
- max_reconnect_attempts = 5
- max_reconnect_delay = 60.0
- name
- reconnect_attempts = 0
- reconnect_delay = 1.0
- status
- total_uptime = 0.0
- transport
- class mcp.client.MCPProtocol(transport, timeout: float = 30.0, client_info: Dict[str, Any] | None = None)
MCP protocol implementation.
This class handles the MCP protocol layer, including:
- Message serialization/deserialization
- Request/response matching
- Capability negotiation
- Protocol state management
- Error handling
The protocol is transport-agnostic and works with any MCPTransport implementation. It provides a clean async API for MCP operations.
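Request/response matching in a JSON-RPC protocol layer is usually done by tagging each request with a unique `id` and resolving a future when a reply with the same `id` arrives. The sketch below shows that general technique with a hypothetical `PendingRequests` class; it is an illustration of the idea, not the code inside `MCPProtocol`.

```python
import asyncio
import itertools
import json
from typing import Any, Dict, Tuple


class PendingRequests:
    """Hypothetical tracker that pairs replies with requests by JSON-RPC id."""

    def __init__(self) -> None:
        self._ids = itertools.count(1)
        self._futures: Dict[int, "asyncio.Future[Any]"] = {}

    def create(
        self, method: str, params: Dict[str, Any]
    ) -> Tuple[Dict[str, Any], "asyncio.Future[Any]"]:
        """Build a request and the future that will receive its reply."""
        request_id = next(self._ids)
        future = asyncio.get_running_loop().create_future()
        self._futures[request_id] = future
        request = {"jsonrpc": "2.0", "id": request_id, "method": method, "params": params}
        return request, future

    def resolve(self, raw: str) -> None:
        """Deserialize a reply and complete the matching future."""
        message = json.loads(raw)
        future = self._futures.pop(message["id"])
        if "error" in message:
            future.set_exception(RuntimeError(message["error"]["message"]))
        else:
            future.set_result(message["result"])


async def demo() -> list:
    pending = PendingRequests()
    first, first_reply = pending.create("tools/list", {})
    second, second_reply = pending.create("tools/call", {"name": "read_file"})
    # Replies arrive out of order; the id routes each one to its caller.
    pending.resolve(json.dumps({"jsonrpc": "2.0", "id": second["id"], "result": "B"}))
    pending.resolve(json.dumps({"jsonrpc": "2.0", "id": first["id"], "result": "A"}))
    return [await first_reply, await second_reply]


results = asyncio.run(demo())
```

Matching by `id` rather than by arrival order is what lets several requests be in flight over one transport at the same time.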
Examples
Basic protocol usage:
from haive.mcp.client.transport import StdioTransport
from haive.mcp.client.protocol import MCPProtocol

transport = StdioTransport("npx", ["-y", "@modelcontextprotocol/server-filesystem"])
protocol = MCPProtocol(transport)
await protocol.initialize()
tools = await protocol.list_tools()
result = await protocol.call_tool("read_file", {"path": "/etc/hosts"})
await protocol.shutdown()
With context manager:
async with MCPProtocol(transport) as protocol:
    tools = await protocol.list_tools()
    result = await protocol.call_tool("tool_name", args)
- async __aenter__()
Async context manager entry.
- async __aexit__(exc_type, exc_val, exc_tb)
Async context manager exit.
- add_notification_handler(method: str, handler: Callable[[Dict[str, Any]], Awaitable[None]]) -> None
Add a handler for notifications.
- Parameters:
method – Notification method name
handler – Async handler function
- async call_tool(name: str, arguments: Dict[str, Any] | None = None) -> Any
Call a tool on the server.
- Parameters:
name – Tool name to call
arguments – Tool arguments
- Returns:
Tool execution result
- Raises:
MCPCapabilityError – If tools capability not supported
MCPToolError – If tool execution fails
MCPProtocolError – If request fails
- async get_prompt(name: str, arguments: Dict[str, Any] | None = None) -> Dict[str, Any]
Get a prompt from the server.
- Parameters:
name – Prompt name
arguments – Prompt arguments
- Returns:
Prompt content and metadata
- async initialize() -> Dict[str, Any]
Initialize the MCP connection.
This performs the MCP initialization handshake, including capability negotiation and protocol version agreement.
- Returns:
Server information and capabilities
- Raises:
MCPProtocolError – If initialization fails
MCPCapabilityError – If capabilities are incompatible
- async list_prompts() -> List[MCPPrompt]
List available prompts from the server.
- Returns:
List of available prompts
- Raises:
MCPCapabilityError – If prompts capability not supported
- async list_resources() -> List[MCPResource]
List available resources from the server.
- Returns:
List of available resources
- async list_tools() -> List[MCPTool]
List available tools from the server.
- Returns:
List of available tools
- Raises:
MCPCapabilityError – If tools capability not supported
MCPProtocolError – If request fails
- async read_resource(uri: str) -> Dict[str, Any]
Read a resource from the server.
- Parameters:
uri – Resource URI to read
- Returns:
Resource content and metadata
- remove_notification_handler(method: str, handler: Callable[[Dict[str, Any]], Awaitable[None]]) -> None
Remove a notification handler.
- Parameters:
method – Notification method name
handler – Handler function to remove
- client_info
- initialized = False
- protocol_version: MCPProtocolVersion | None = None
- server_capabilities: Set[MCPCapability]
- timeout = 30.0
- transport
- class mcp.client.MCPTransport(timeout: float = 30.0)
Bases: abc.ABC
Abstract base class for MCP transports.
All MCP transports must implement this interface to provide a consistent API for the protocol layer. The transport handles the low-level communication while the protocol layer handles MCP message semantics.
The transport is responsible for:
- Connection establishment and teardown
- Raw message sending and receiving
- Transport-specific error handling
- Connection state management
- async __aenter__()
Async context manager entry.
- async __aexit__(exc_type, exc_val, exc_tb)
Async context manager exit.
- abstractmethod async connect() -> None
Establish connection to the MCP server.
- Raises:
MCPConnectionError – If connection fails
MCPTimeoutError – If connection times out
- abstractmethod async disconnect() -> None
Close connection to the MCP server.
Should be idempotent and safe to call multiple times.
- abstractmethod async receive_message() -> Dict[str, Any]
Receive a message from the server.
- Returns:
JSON-RPC message from server
- Raises:
MCPTransportError – If receiving fails
MCPConnectionError – If not connected
MCPTimeoutError – If receive times out
- abstractmethod async send_message(message: Dict[str, Any]) -> None
Send a message to the server.
- Parameters:
message – JSON-RPC message to send
- Raises:
MCPTransportError – If sending fails
MCPConnectionError – If not connected
- connected = False
- timeout = 30.0
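To make the four-method contract concrete, here is a sketch of a custom transport. `Transport` is a local stand-in for the documented interface and `LoopbackTransport` is a hypothetical in-memory implementation that echoes messages back; the built-in exceptions and the assumption that `__aenter__` calls `connect()` are illustrative choices, not confirmed behavior of `MCPTransport`.

```python
import asyncio
from abc import ABC, abstractmethod
from collections import deque
from typing import Any, Deque, Dict, Tuple


class Transport(ABC):
    """Stand-in for the documented interface; not the package's MCPTransport."""

    def __init__(self, timeout: float = 30.0) -> None:
        self.timeout = timeout
        self.connected = False

    @abstractmethod
    async def connect(self) -> None: ...

    @abstractmethod
    async def disconnect(self) -> None: ...

    @abstractmethod
    async def send_message(self, message: Dict[str, Any]) -> None: ...

    @abstractmethod
    async def receive_message(self) -> Dict[str, Any]: ...

    async def __aenter__(self) -> "Transport":
        await self.connect()  # assumption: entering the context connects
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        await self.disconnect()


class LoopbackTransport(Transport):
    """Hypothetical in-memory transport that echoes each message back."""

    def __init__(self, timeout: float = 30.0) -> None:
        super().__init__(timeout)
        self._queue: Deque[Dict[str, Any]] = deque()

    async def connect(self) -> None:
        self.connected = True

    async def disconnect(self) -> None:
        # Idempotent: a second call leaves the same state.
        self.connected = False
        self._queue.clear()

    async def send_message(self, message: Dict[str, Any]) -> None:
        if not self.connected:
            raise ConnectionError("not connected")
        self._queue.append(message)

    async def receive_message(self) -> Dict[str, Any]:
        if not self.connected:
            raise ConnectionError("not connected")
        if not self._queue:
            raise TimeoutError("no message available")
        return self._queue.popleft()


async def demo() -> Tuple[Dict[str, Any], bool]:
    async with LoopbackTransport() as transport:
        await transport.send_message({"jsonrpc": "2.0", "id": 1, "method": "ping"})
        echoed = await transport.receive_message()
    await transport.disconnect()  # safe to call again after the context exits
    return echoed, transport.connected


echoed, still_connected = asyncio.run(demo())
```

A real implementation would raise the package's `MCPConnectionError`, `MCPTimeoutError`, and `MCPTransportError` in the places where this sketch uses built-in exceptions.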
- class mcp.client.SseTransport(sse_url: str, post_url: str, headers: Dict[str, str] | None = None, timeout: float = 30.0)
Bases: MCPTransport
Server-Sent Events transport for MCP communication.
This transport uses SSE for receiving messages and HTTP POST for sending. Useful for streaming scenarios where the server needs to push messages to the client.
- headers
- post_url
- sse_url
- class mcp.client.StdioTransport(command: str, args: List[str] | None = None, env: Dict[str, str] | None = None, cwd: str | None = None, timeout: float = 30.0)
Bases: MCPTransport
STDIO transport for MCP communication.
This transport communicates with an MCP server via stdin/stdout of a subprocess. This is the most common transport for MCP servers that are designed to run as command-line tools.
The transport handles:
- Process lifecycle management
- JSON-RPC message framing over stdio
- Process cleanup on disconnection
- Error handling for process failures
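Message framing over stdio is commonly done by writing each JSON-RPC message as one newline-terminated line. The sketch below illustrates that framing with two hypothetical helpers, `encode_frame` and `decode_frames`; it assumes newline-delimited JSON and is not taken from the `StdioTransport` source.

```python
import json
from typing import Any, Dict, List, Tuple


def encode_frame(message: Dict[str, Any]) -> bytes:
    """Serialize one message as a single newline-terminated JSON line."""
    # json.dumps escapes newlines inside strings, so the frame stays one line.
    return json.dumps(message, separators=(",", ":")).encode("utf-8") + b"\n"


def decode_frames(buffer: bytes) -> Tuple[List[Dict[str, Any]], bytes]:
    """Split complete lines out of `buffer`; return messages and leftover bytes."""
    *lines, rest = buffer.split(b"\n")
    messages = [json.loads(line) for line in lines if line.strip()]
    return messages, rest


# One complete message followed by the start of a second one.
stream = encode_frame({"jsonrpc": "2.0", "id": 1, "method": "ping"})
stream += b'{"jsonrpc":"2.0","id":2,'
messages, leftover = decode_frames(stream)
```

Returning the leftover bytes lets a reader keep a partial message in its buffer until the rest of the line arrives from the subprocess.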
Examples
Basic usage:
transport = StdioTransport(
    command="npx",
    args=["-y", "@modelcontextprotocol/server-filesystem", "/tmp"]
)
With environment variables:
transport = StdioTransport(
    command="python",
    args=["-m", "my_mcp_server"],
    env={"API_KEY": "secret"},
    cwd="/path/to/server"
)
- args = []
- command
- cwd = None
- env = None
- process: asyncio.subprocess.Process | None = None
- class mcp.client.WebSocketTransport(url: str, headers: Dict[str, str] | None = None, timeout: float = 30.0)
Bases: MCPTransport
WebSocket transport for MCP communication.
This transport provides real-time bidirectional communication over WebSocket. Useful for interactive applications and real-time scenarios.
- headers
- url