use mcp_client::McpClient; use std::fs; use tempfile::tempdir; #[tokio::test] async fn mcp_server_capability_negotiation() { // Create a mock MCP server script let dir = tempdir().unwrap(); let server_script = dir.path().join("mock_server.py"); let script_content = r#"#!/usr/bin/env python3 import sys import json def read_request(): line = sys.stdin.readline() return json.loads(line) def send_response(response): sys.stdout.write(json.dumps(response) + '\n') sys.stdout.flush() # Main loop while True: try: req = read_request() method = req.get('method') req_id = req.get('id') if method == 'initialize': send_response({ 'jsonrpc': '2.0', 'id': req_id, 'result': { 'protocolVersion': '2024-11-05', 'capabilities': { 'tools': {'list_changed': True}, 'resources': {'subscribe': False} }, 'serverInfo': { 'name': 'test-server', 'version': '1.0.0' } } }) elif method == 'tools/list': send_response({ 'jsonrpc': '2.0', 'id': req_id, 'result': { 'tools': [] } }) else: send_response({ 'jsonrpc': '2.0', 'id': req_id, 'error': { 'code': -32601, 'message': f'Method not found: {method}' } }) except EOFError: break except Exception as e: sys.stderr.write(f'Error: {e}\n') break "#; fs::write(&server_script, script_content).unwrap(); #[cfg(unix)] { use std::os::unix::fs::PermissionsExt; fs::set_permissions(&server_script, std::fs::Permissions::from_mode(0o755)).unwrap(); } // Connect to the server let client = McpClient::spawn( "python3", &[server_script.to_str().unwrap()], "test-server" ).await.unwrap(); // Initialize let capabilities = client.initialize().await.unwrap(); // Verify capabilities assert!(capabilities.tools.is_some()); assert_eq!(capabilities.tools.unwrap().list_changed, Some(true)); client.close().await.unwrap(); } #[tokio::test] async fn mcp_tool_invocation() { let dir = tempdir().unwrap(); let server_script = dir.path().join("mock_server.py"); let script_content = r#"#!/usr/bin/env python3 import sys import json def read_request(): line = sys.stdin.readline() return json.loads(line) def send_response(response): sys.stdout.write(json.dumps(response) + '\n') sys.stdout.flush() while True: try: req = read_request() method = req.get('method') req_id = req.get('id') params = req.get('params', {}) if method == 'initialize': send_response({ 'jsonrpc': '2.0', 'id': req_id, 'result': { 'protocolVersion': '2024-11-05', 'capabilities': { 'tools': {} }, 'serverInfo': { 'name': 'test-server', 'version': '1.0.0' } } }) elif method == 'tools/list': send_response({ 'jsonrpc': '2.0', 'id': req_id, 'result': { 'tools': [ { 'name': 'echo', 'description': 'Echo the input', 'input_schema': { 'type': 'object', 'properties': { 'message': {'type': 'string'} } } } ] } }) elif method == 'tools/call': tool_name = params.get('name') arguments = params.get('arguments', {}) if tool_name == 'echo': send_response({ 'jsonrpc': '2.0', 'id': req_id, 'result': { 'content': [ { 'type': 'text', 'text': arguments.get('message', '') } ] } }) else: send_response({ 'jsonrpc': '2.0', 'id': req_id, 'error': { 'code': -32602, 'message': f'Unknown tool: {tool_name}' } }) else: send_response({ 'jsonrpc': '2.0', 'id': req_id, 'error': { 'code': -32601, 'message': f'Method not found: {method}' } }) except EOFError: break except Exception as e: sys.stderr.write(f'Error: {e}\n') break "#; fs::write(&server_script, script_content).unwrap(); #[cfg(unix)] { use std::os::unix::fs::PermissionsExt; fs::set_permissions(&server_script, std::fs::Permissions::from_mode(0o755)).unwrap(); } let client = McpClient::spawn( "python3", &[server_script.to_str().unwrap()], "test-server" ).await.unwrap(); client.initialize().await.unwrap(); // List tools let tools = client.list_tools().await.unwrap(); assert_eq!(tools.len(), 1); assert_eq!(tools[0].name, "echo"); // Call tool let result = client.call_tool( "echo", serde_json::json!({"message": "Hello, MCP!"}) ).await.unwrap(); // Verify result let content = result.as_array().unwrap(); assert_eq!(content[0]["text"].as_str().unwrap(), "Hello, MCP!"); client.close().await.unwrap(); } #[tokio::test] async fn mcp_resource_reads() { let dir = tempdir().unwrap(); let server_script = dir.path().join("mock_server.py"); let script_content = r#"#!/usr/bin/env python3 import sys import json def read_request(): line = sys.stdin.readline() return json.loads(line) def send_response(response): sys.stdout.write(json.dumps(response) + '\n') sys.stdout.flush() while True: try: req = read_request() method = req.get('method') req_id = req.get('id') params = req.get('params', {}) if method == 'initialize': send_response({ 'jsonrpc': '2.0', 'id': req_id, 'result': { 'protocolVersion': '2024-11-05', 'capabilities': { 'resources': {} }, 'serverInfo': { 'name': 'test-server', 'version': '1.0.0' } } }) elif method == 'resources/list': send_response({ 'jsonrpc': '2.0', 'id': req_id, 'result': { 'resources': [ { 'uri': 'file:///test.txt', 'name': 'Test File', 'description': 'A test file', 'mime_type': 'text/plain' } ] } }) elif method == 'resources/read': uri = params.get('uri') if uri == 'file:///test.txt': send_response({ 'jsonrpc': '2.0', 'id': req_id, 'result': { 'contents': [ { 'uri': uri, 'mime_type': 'text/plain', 'text': 'Hello from resource!' } ] } }) else: send_response({ 'jsonrpc': '2.0', 'id': req_id, 'error': { 'code': -32602, 'message': f'Unknown resource: {uri}' } }) else: send_response({ 'jsonrpc': '2.0', 'id': req_id, 'error': { 'code': -32601, 'message': f'Method not found: {method}' } }) except EOFError: break except Exception as e: sys.stderr.write(f'Error: {e}\n') break "#; fs::write(&server_script, script_content).unwrap(); #[cfg(unix)] { use std::os::unix::fs::PermissionsExt; fs::set_permissions(&server_script, std::fs::Permissions::from_mode(0o755)).unwrap(); } let client = McpClient::spawn( "python3", &[server_script.to_str().unwrap()], "test-server" ).await.unwrap(); client.initialize().await.unwrap(); // List resources let resources = client.list_resources().await.unwrap(); assert_eq!(resources.len(), 1); assert_eq!(resources[0].uri, "file:///test.txt"); // Read resource let contents = client.read_resource("file:///test.txt").await.unwrap(); let contents_array = contents.as_array().unwrap(); assert_eq!(contents_array[0]["text"].as_str().unwrap(), "Hello from resource!"); client.close().await.unwrap(); }