mcp.clientΒΆ
MCP Client Implementation Package.
This package provides native MCP protocol client implementation for connecting to and communicating with MCP servers according to the Model Context Protocol specification.
Key ComponentsΒΆ
- Core Classes
MCPClient: Main client for protocol communicationMCPTransport: Transport layer abstractionMCPConnection: Connection managementMCPProtocol: Protocol implementation
- Transport Support
STDIO: Communication via stdin/stdout
HTTP: RESTful communication
SSE: Server-sent events
WebSocket: Real-time bidirectional communication
Usage ExamplesΒΆ
Basic ConnectionΒΆ
from haive.mcp.client import MCPClient, StdioTransport
# Create transport and client
transport = StdioTransport(
command="npx",
args=["-y", "@modelcontextprotocol/server-filesystem"]
)
client = MCPClient(transport)
# Connect and use
await client.connect()
tools = await client.list_tools()
result = await client.call_tool("read_file", {"path": "/etc/hosts"})
await client.disconnect()
Context Manager UsageΒΆ
async with MCPClient(transport) as client:
tools = await client.list_tools()
result = await client.call_tool("tool_name", args)
Available ClassesΒΆ
Transport ClassesΒΆ
StdioTransport- Standard I/O transportHttpTransport- HTTP-based transportSseTransport- Server-Sent Events transportWebSocketTransport- WebSocket transport
Exception ClassesΒΆ
MCPError- Base MCP exceptionMCPConnectionError- Connection-related errorsMCPProtocolError- Protocol violation errorsMCPTimeoutError- Timeout-related errorsMCPTransportError- Transport layer errorsMCPAuthenticationError- Authentication failuresMCPCapabilityError- Capability negotiation errorsMCPToolError- Tool execution errors
Note
This is a native implementation of the MCP protocol, designed to work with any MCP-compliant server. It handles the full protocol lifecycle including initialization, capability discovery, and tool execution.
SubmodulesΒΆ
ExceptionsΒΆ
Authentication failure with MCP server. |
|
Error related to MCP capabilities. |
|
Error during MCP connection establishment or management. |
|
Base exception for all MCP-related errors. |
|
Error in MCP protocol communication. |
|
Timeout during MCP operation. |
|
Error during tool execution. |
|
Error in the transport layer. |
ClassesΒΆ
HTTP transport for MCP communication. |
|
High-level MCP client for communicating with MCP servers. |
|
MCP connection manager with health monitoring and auto-reconnection. |
|
MCP protocol implementation. |
|
Abstract base class for MCP transports. |
|
Server-Sent Events transport for MCP communication. |
|
STDIO transport for MCP communication. |
|
WebSocket transport for MCP communication. |
Package ContentsΒΆ
- exception mcp.client.MCPAuthenticationError(message: str, error_code: int | None = None, details: Dict[str, Any] | None = None)ΒΆ
Bases:
MCPErrorAuthentication failure with MCP server.
Raised when authentication is required but fails, or when authorization is denied for a specific operation. This includes token validation failures and permission issues.
Examples
Invalid token:
raise MCPAuthenticationError( "Invalid authentication token", error_code=401 )
Insufficient permissions:
raise MCPAuthenticationError( "Insufficient permissions for tool execution", details={"tool": "sensitive_operation", "required_scope": "admin"} )
- exception mcp.client.MCPCapabilityError(message: str, error_code: int | None = None, details: Dict[str, Any] | None = None)ΒΆ
Bases:
MCPProtocolErrorError related to MCP capabilities.
Raised when there are issues with capability negotiation, unsupported capabilities, or capability validation failures. This indicates a mismatch between client requirements and server capabilities.
Examples
Unsupported capability:
raise MCPCapabilityError( "Server doesn't support required capability", details={"required": "tools", "supported": ["logging"]} )
- exception mcp.client.MCPConnectionError(message: str, error_code: int | None = None, details: Dict[str, Any] | None = None)ΒΆ
Bases:
MCPErrorError during MCP connection establishment or management.
Raised when there are issues connecting to, maintaining, or disconnecting from an MCP server. This includes transport-level failures, timeout issues, and connection state problems.
Examples
Connection timeout:
raise MCPConnectionError("Connection timeout after 30s")
Transport failure:
raise MCPConnectionError( "Failed to start server process", details={"command": ["npx", "server"], "exit_code": 1} )
- exception mcp.client.MCPError(message: str, error_code: int | None = None, details: Dict[str, Any] | None = None)ΒΆ
Bases:
ExceptionBase exception for all MCP-related errors.
This is the root exception class for all MCP client errors. It provides a consistent interface for error handling and includes optional error details and context information.
- Parameters:
message β Human-readable error description
error_code β MCP protocol error code (if applicable)
details β Additional error details or context
Examples
Basic error:
raise MCPError("Connection failed")
With error code:
raise MCPError("Invalid request", error_code=-32600)
With details:
raise MCPError( "Tool execution failed", details={"tool": "filesystem", "args": {...}} )
- detailsΒΆ
- error_code = NoneΒΆ
- messageΒΆ
- exception mcp.client.MCPProtocolError(message: str, error_code: int | None = None, details: Dict[str, Any] | None = None)ΒΆ
Bases:
MCPErrorError in MCP protocol communication.
Raised when there are protocol-level issues such as invalid messages, unsupported protocol versions, or malformed responses. This indicates a problem with the MCP protocol implementation or server compliance.
Examples
Protocol version mismatch:
raise MCPProtocolError( "Unsupported protocol version", details={"client": "1.0", "server": "0.9"} )
Invalid message format:
raise MCPProtocolError( "Malformed JSON-RPC message", error_code=-32700, details={"raw_message": "invalid json"} )
- exception mcp.client.MCPTimeoutError(message: str, error_code: int | None = None, details: Dict[str, Any] | None = None)ΒΆ
Bases:
MCPConnectionErrorTimeout during MCP operation.
Raised when an MCP operation (connection, tool call, etc.) exceeds the configured timeout period. This is a specialized connection error that specifically indicates timing issues.
- Parameters:
operation β Name of the operation that timed out
timeout β Timeout value that was exceeded
Examples
Tool execution timeout:
raise MCPTimeoutError( "Tool call timed out", details={"operation": "call_tool", "timeout": 30.0} )
- exception mcp.client.MCPToolError(message: str, error_code: int | None = None, details: Dict[str, Any] | None = None)ΒΆ
Bases:
MCPErrorError during tool execution.
Raised when there are issues executing tools on the MCP server. This includes tool not found errors, invalid arguments, and execution failures.
Examples
Tool not found:
raise MCPToolError( "Tool not found", details={"tool_name": "nonexistent_tool"} )
Invalid arguments:
raise MCPToolError( "Invalid tool arguments", details={"tool": "read_file", "error": "path is required"} )
- exception mcp.client.MCPTransportError(message: str, error_code: int | None = None, details: Dict[str, Any] | None = None)ΒΆ
Bases:
MCPConnectionErrorError in the transport layer.
Raised when there are issues with the underlying transport mechanism (STDIO, HTTP, WebSocket, etc.). This includes process management failures, network issues, and transport-specific problems.
Examples
Process died unexpectedly:
raise MCPTransportError( "Server process died", details={"pid": 12345, "exit_code": -9} )
Network error:
raise MCPTransportError( "HTTP connection failed", details={"url": "http://localhost:8080", "status": 500} )
- class mcp.client.HttpTransport(url: str, headers: Dict[str, str] | None = None, timeout: float = 30.0)ΒΆ
Bases:
MCPTransportHTTP transport for MCP communication.
This transport communicates with an MCP server over HTTP using JSON-RPC over HTTP POST requests. This is useful for servers that expose HTTP APIs or for remote MCP servers.
Examples
Basic HTTP transport:
transport = HttpTransport("http://localhost:8080/mcp")
With authentication:
transport = HttpTransport( "https://api.example.com/mcp", headers={"Authorization": "Bearer token123"} )
- async send_message(message: Dict[str, Any]) NoneΒΆ
Send JSON-RPC message via HTTP POST.
Note: For HTTP transport, sending and receiving are combined in a request-response cycle. This method stores the message for the next receive_message call.
- headersΒΆ
- urlΒΆ
- class mcp.client.MCPClient(transport: mcp.client.transport.MCPTransport, timeout: float = 30.0, client_info: Dict[str, Any] | None = None, auto_reconnect: bool = False, max_reconnect_attempts: int = 3)ΒΆ
High-level MCP client for communicating with MCP servers.
This is the main interface for interacting with MCP servers. It combines the transport and protocol layers to provide a clean, easy-to-use API for MCP operations.
- The client handles the complete MCP lifecycle:
Connection establishment
Capability negotiation
Tool/resource/prompt discovery
Operation execution
Connection cleanup
It supports multiple transport types and provides both sync-style and async context manager interfaces.
Examples
Basic usage with STDIO transport:
from haive.mcp.client import MCPClient, StdioTransport transport = StdioTransport("npx", ["-y", "@modelcontextprotocol/server-filesystem"]) client = MCPClient(transport) await client.connect() tools = await client.list_tools() result = await client.call_tool("read_file", {"path": "/etc/hosts"}) await client.disconnect()
Using context manager (recommended):
async with MCPClient(transport) as client: tools = await client.list_tools() for tool in tools: print(f"Available tool: {tool.name}") result = await client.call_tool("tool_name", {"arg": "value"})
With HTTP transport:
from haive.mcp.client import HttpTransport transport = HttpTransport("http://localhost:8080/mcp") async with MCPClient(transport) as client: resources = await client.list_resources() content = await client.read_resource("file://config.json")
With notification handling:
def on_tool_list_changed(params): print("Tool list updated!") client.add_notification_handler("tools/list_changed", on_tool_list_changed)
Error handling:
try: async with MCPClient(transport) as client: result = await client.call_tool("nonexistent", {}) except MCPToolError as e: print(f"Tool error: {e}") except MCPConnectionError as e: print(f"Connection error: {e}")
- async __aenter__()ΒΆ
Async context manager entry.
- async __aexit__(exc_type, exc_val, exc_tb)ΒΆ
Async context manager exit.
- add_notification_handler(method: str, handler: Callable[[Dict[str, Any]], Awaitable[None]]) NoneΒΆ
Add a handler for server notifications.
- Parameters:
method β Notification method name
handler β Async handler function
- async call_tool(name: str, arguments: Dict[str, Any] | None = None, timeout: float | None = None) AnyΒΆ
Call a tool on the server.
- Parameters:
name β Tool name to call
arguments β Tool arguments
timeout β Call timeout (uses default if None)
- Returns:
Tool execution result
- Raises:
MCPToolError β If tool execution fails
MCPProtocolError β If request fails
- async connect() Dict[str, Any]ΒΆ
Connect to the MCP server and perform initialization.
This method establishes the connection and performs the MCP initialization handshake, including capability negotiation.
- Returns:
Server information and capabilities
- Raises:
MCPConnectionError β If connection fails
MCPProtocolError β If protocol handshake fails
- async disconnect() NoneΒΆ
Disconnect from the MCP server gracefully.
This method cleanly shuts down the MCP connection and cleans up all resources. Itβs safe to call multiple times.
- async get_capabilities() List[mcp.client.protocol.MCPCapability]ΒΆ
Get server capabilities.
- Returns:
List of server capabilities
- Raises:
MCPConnectionError β If not connected
- async get_prompt(name: str, arguments: Dict[str, Any] | None = None) Dict[str, Any]ΒΆ
Get a prompt from the server.
- Parameters:
name β Prompt name
arguments β Prompt arguments
- Returns:
Prompt content and metadata
- async get_server_info() Dict[str, Any]ΒΆ
Get server information from initialization.
- Returns:
Server information dictionary
- Raises:
MCPConnectionError β If not connected
- async health_check() Dict[str, Any]ΒΆ
Perform a health check on the MCP connection.
- Returns:
Health check results including connectivity and capabilities
- async is_connected() boolΒΆ
Check if client is currently connected.
- Returns:
True if connected, False otherwise
- async list_prompts(use_cache: bool = True) List[mcp.client.protocol.MCPPrompt]ΒΆ
List available prompts from the server.
- Parameters:
use_cache β Whether to use cached results
- Returns:
List of available prompts
- Raises:
MCPCapabilityError β If prompts capability not supported
- async list_resources(use_cache: bool = True) List[mcp.client.protocol.MCPResource]ΒΆ
List available resources from the server.
- Parameters:
use_cache β Whether to use cached results
- Returns:
List of available resources
- async list_tools(use_cache: bool = True) List[mcp.client.protocol.MCPTool]ΒΆ
List available tools from the server.
- Parameters:
use_cache β Whether to use cached results if available
- Returns:
List of available tools
- Raises:
MCPCapabilityError β If tools capability not supported
MCPProtocolError β If request fails
- async read_resource(uri: str) Dict[str, Any]ΒΆ
Read a resource from the server.
- Parameters:
uri β Resource URI to read
- Returns:
Resource content and metadata
- async refresh_cache() NoneΒΆ
Refresh all cached server information.
This re-fetches tools, prompts, and resources from the server and updates the local cache.
- remove_notification_handler(method: str, handler: Callable[[Dict[str, Any]], Awaitable[None]]) NoneΒΆ
Remove a notification handler.
- Parameters:
method β Notification method name
handler β Handler function to remove
- auto_reconnect = FalseΒΆ
- max_reconnect_attempts = 3ΒΆ
- protocolΒΆ
- timeout = 30.0ΒΆ
- transportΒΆ
- class mcp.client.MCPConnection(name: str, transport: mcp.client.transport.MCPTransport, auto_reconnect: bool = True, max_reconnect_attempts: int = 5, reconnect_delay: float = 1.0, max_reconnect_delay: float = 60.0, health_check_interval: float = 30.0, connection_timeout: float = 30.0)ΒΆ
MCP connection manager with health monitoring and auto-reconnection.
- This class manages individual MCP connections with features like:
Health monitoring and scoring
Automatic reconnection with backoff
Connection lifecycle management
Error tracking and recovery
Performance metrics
Examples
Basic connection management:
from haive.mcp.client import MCPConnection, StdioTransport transport = StdioTransport("npx", ["-y", "@modelcontextprotocol/server-filesystem"]) connection = MCPConnection("filesystem", transport) await connection.connect() client = connection.get_client() tools = await client.list_tools() await connection.disconnect()
With health monitoring:
connection = MCPConnection( "filesystem", transport, health_check_interval=30.0, auto_reconnect=True ) await connection.start_monitoring() # Connection will be monitored and auto-reconnected
Connection status checking:
if connection.is_healthy(): client = connection.get_client() result = await client.call_tool("read_file", {"path": "/tmp/test"})
- async __aenter__()ΒΆ
Async context manager entry.
- async __aexit__(exc_type, exc_val, exc_tb)ΒΆ
Async context manager exit.
- async connect(timeout: float | None = None) mcp.client.mcp_client.MCPClientΒΆ
Connect to the MCP server.
- Parameters:
timeout β Connection timeout (uses default if None)
- Returns:
Connected MCP client
- Raises:
MCPConnectionError β If connection fails
- get_client() mcp.client.mcp_client.MCPClientΒΆ
Get the MCP client if connected.
- Returns:
MCP client instance
- Raises:
MCPConnectionError β If not connected
- get_info() ConnectionInfoΒΆ
Get connection information.
- Returns:
ConnectionInfo object with current state
- async health_check() Dict[str, Any]ΒΆ
Perform a health check on the connection.
- Returns:
Health check results
- is_healthy(threshold: float = 0.7) boolΒΆ
Check if connection is healthy.
- Parameters:
threshold β Health score threshold (0.0-1.0)
- Returns:
True if health score is above threshold
- async reconnect() mcp.client.mcp_client.MCPClient | NoneΒΆ
Attempt to reconnect to the server.
- Returns:
Connected client if successful, None if failed
- auto_reconnect = TrueΒΆ
- client: mcp.client.mcp_client.MCPClient | None = NoneΒΆ
- connection_count = 0ΒΆ
- connection_timeout = 30.0ΒΆ
- error_count = 0ΒΆ
- health_check_interval = 30.0ΒΆ
- health_score = 1.0ΒΆ
- last_operation_time = 0.0ΒΆ
- max_reconnect_attempts = 5ΒΆ
- max_reconnect_delay = 60.0ΒΆ
- nameΒΆ
- reconnect_attempts = 0ΒΆ
- reconnect_delay = 1.0ΒΆ
- statusΒΆ
- total_uptime = 0.0ΒΆ
- transportΒΆ
- class mcp.client.MCPProtocol(transport, timeout: float = 30.0, client_info: Dict[str, Any] | None = None)ΒΆ
MCP protocol implementation.
- This class handles the MCP protocol layer, including:
Message serialization/deserialization
Request/response matching
Capability negotiation
Protocol state management
Error handling
The protocol is transport-agnostic and works with any MCPTransport implementation. It provides a clean async API for MCP operations.
Examples
Basic protocol usage:
from haive.mcp.client.transport import StdioTransport from haive.mcp.client.protocol import MCPProtocol transport = StdioTransport("npx", ["-y", "@modelcontextprotocol/server-filesystem"]) protocol = MCPProtocol(transport) await protocol.initialize() tools = await protocol.list_tools() result = await protocol.call_tool("read_file", {"path": "/etc/hosts"}) await protocol.shutdown()
With context manager:
async with MCPProtocol(transport) as protocol: tools = await protocol.list_tools() result = await protocol.call_tool("tool_name", args)
- async __aenter__()ΒΆ
Async context manager entry.
- async __aexit__(exc_type, exc_val, exc_tb)ΒΆ
Async context manager exit.
- add_notification_handler(method: str, handler: Callable[[Dict[str, Any]], Awaitable[None]]) NoneΒΆ
Add a handler for notifications.
- Parameters:
method β Notification method name
handler β Async handler function
- async call_tool(name: str, arguments: Dict[str, Any] | None = None) AnyΒΆ
Call a tool on the server.
- Parameters:
name β Tool name to call
arguments β Tool arguments
- Returns:
Tool execution result
- Raises:
MCPCapabilityError β If tools capability not supported
MCPToolError β If tool execution fails
MCPProtocolError β If request fails
- async get_prompt(name: str, arguments: Dict[str, Any] | None = None) Dict[str, Any]ΒΆ
Get a prompt from the server.
- Parameters:
name β Prompt name
arguments β Prompt arguments
- Returns:
Prompt content and metadata
- async initialize() Dict[str, Any]ΒΆ
Initialize the MCP connection.
This performs the MCP initialization handshake, including capability negotiation and protocol version agreement.
- Returns:
Server information and capabilities
- Raises:
MCPProtocolError β If initialization fails
MCPCapabilityError β If capabilities are incompatible
- async list_prompts() List[MCPPrompt]ΒΆ
List available prompts from the server.
- Returns:
List of available prompts
- Raises:
MCPCapabilityError β If prompts capability not supported
- async list_resources() List[MCPResource]ΒΆ
List available resources from the server.
- Returns:
List of available resources
- async list_tools() List[MCPTool]ΒΆ
List available tools from the server.
- Returns:
List of available tools
- Raises:
MCPCapabilityError β If tools capability not supported
MCPProtocolError β If request fails
- async read_resource(uri: str) Dict[str, Any]ΒΆ
Read a resource from the server.
- Parameters:
uri β Resource URI to read
- Returns:
Resource content and metadata
- remove_notification_handler(method: str, handler: Callable[[Dict[str, Any]], Awaitable[None]]) NoneΒΆ
Remove a notification handler.
- Parameters:
method β Notification method name
handler β Handler function to remove
- client_infoΒΆ
- initialized = FalseΒΆ
- protocol_version: MCPProtocolVersion | None = NoneΒΆ
- server_capabilities: Set[MCPCapability]ΒΆ
- timeout = 30.0ΒΆ
- transportΒΆ
- class mcp.client.MCPTransport(timeout: float = 30.0)ΒΆ
Bases:
abc.ABCAbstract base class for MCP transports.
All MCP transports must implement this interface to provide a consistent API for the protocol layer. The transport handles the low-level communication while the protocol layer handles MCP message semantics.
- The transport is responsible for:
Connection establishment and teardown
Raw message sending and receiving
Transport-specific error handling
Connection state management
- async __aenter__()ΒΆ
Async context manager entry.
- async __aexit__(exc_type, exc_val, exc_tb)ΒΆ
Async context manager exit.
- abstractmethod connect() NoneΒΆ
- Async:
Establish connection to the MCP server.
- Raises:
MCPConnectionError β If connection fails
MCPTimeoutError β If connection times out
- abstractmethod disconnect() NoneΒΆ
- Async:
Close connection to the MCP server.
Should be idempotent and safe to call multiple times.
- abstractmethod receive_message() Dict[str, Any]ΒΆ
- Async:
Receive a message from the server.
- Returns:
JSON-RPC message from server
- Raises:
MCPTransportError β If receiving fails
MCPConnectionError β If not connected
MCPTimeoutError β If receive times out
- abstractmethod send_message(message: Dict[str, Any]) NoneΒΆ
- Async:
Send a message to the server.
- Parameters:
message β JSON-RPC message to send
- Raises:
MCPTransportError β If sending fails
MCPConnectionError β If not connected
- connected = FalseΒΆ
- timeout = 30.0ΒΆ
- class mcp.client.SseTransport(sse_url: str, post_url: str, headers: Dict[str, str] | None = None, timeout: float = 30.0)ΒΆ
Bases:
MCPTransportServer-Sent Events transport for MCP communication.
This transport uses SSE for receiving messages and HTTP POST for sending. Useful for streaming scenarios where the server needs to push messages to the client.
- headersΒΆ
- post_urlΒΆ
- sse_urlΒΆ
- class mcp.client.StdioTransport(command: str, args: List[str] | None = None, env: Dict[str, str] | None = None, cwd: str | None = None, timeout: float = 30.0)ΒΆ
Bases:
MCPTransportSTDIO transport for MCP communication.
This transport communicates with an MCP server via stdin/stdout of a subprocess. This is the most common transport for MCP servers that are designed to run as command-line tools.
- The transport handles:
Process lifecycle management
JSON-RPC message framing over stdio
Process cleanup on disconnection
Error handling for process failures
Examples
Basic usage:
transport = StdioTransport( command="npx", args=["-y", "@modelcontextprotocol/server-filesystem", "/tmp"] )
With environment variables:
transport = StdioTransport( command="python", args=["-m", "my_mcp_server"], env={"API_KEY": "secret"}, cwd="/path/to/server" )
- args = []ΒΆ
- commandΒΆ
- cwd = NoneΒΆ
- env = NoneΒΆ
- process: asyncio.subprocess.Process | None = NoneΒΆ
- class mcp.client.WebSocketTransport(url: str, headers: Dict[str, str] | None = None, timeout: float = 30.0)ΒΆ
Bases:
MCPTransportWebSocket transport for MCP communication.
This transport provides real-time bidirectional communication over WebSocket. Useful for interactive applications and real-time scenarios.
- headersΒΆ
- urlΒΆ