Apply recent changes
This commit is contained in:
120
crates/owlen-core/src/mcp/remote_client.rs
Normal file
120
crates/owlen-core/src/mcp/remote_client.rs
Normal file
@@ -0,0 +1,120 @@
|
||||
use super::protocol::{RequestId, RpcErrorResponse, RpcRequest, RpcResponse};
|
||||
use super::{McpClient, McpToolCall, McpToolDescriptor, McpToolResponse};
|
||||
use crate::{Error, Result};
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::Arc;
|
||||
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
|
||||
use tokio::process::{Child, Command};
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
/// Client that talks to the external `owlen-mcp-server` over STDIO.
|
||||
pub struct RemoteMcpClient {
|
||||
// Child process handling the server (kept alive for the duration of the client).
|
||||
#[allow(dead_code)]
|
||||
child: Arc<Mutex<Child>>, // guarded for mutable access across calls
|
||||
// Writer to server stdin.
|
||||
stdin: Arc<Mutex<tokio::process::ChildStdin>>, // async write
|
||||
// Reader for server stdout.
|
||||
stdout: Arc<Mutex<BufReader<tokio::process::ChildStdout>>>,
|
||||
// Incrementing request identifier.
|
||||
next_id: AtomicU64,
|
||||
}
|
||||
|
||||
impl RemoteMcpClient {
|
||||
/// Spawn the MCP server binary and prepare communication channels.
|
||||
pub fn new() -> Result<Self> {
|
||||
// Locate the binary – it is built by Cargo into target/debug.
|
||||
// The test binary runs inside the crate directory, so we check a couple of relative locations.
|
||||
// Attempt to locate the server binary; if unavailable we will fall back to launching via `cargo run`.
|
||||
let _ = ();
|
||||
// Resolve absolute path based on workspace root to avoid cwd dependence.
|
||||
let workspace_root = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
|
||||
.join("../..")
|
||||
.canonicalize()
|
||||
.map_err(Error::Io)?;
|
||||
let binary_path = workspace_root.join("target/debug/owlen-mcp-server");
|
||||
if !binary_path.exists() {
|
||||
return Err(Error::NotImplemented(format!(
|
||||
"owlen-mcp-server binary not found at {}",
|
||||
binary_path.display()
|
||||
)));
|
||||
}
|
||||
// Launch the already‑built server binary directly.
|
||||
let mut child = Command::new(&binary_path)
|
||||
.stdin(std::process::Stdio::piped())
|
||||
.stdout(std::process::Stdio::piped())
|
||||
.stderr(std::process::Stdio::inherit())
|
||||
.spawn()
|
||||
.map_err(Error::Io)?;
|
||||
|
||||
let stdin = child.stdin.take().ok_or_else(|| {
|
||||
Error::Io(std::io::Error::other(
|
||||
"Failed to capture stdin of MCP server",
|
||||
))
|
||||
})?;
|
||||
let stdout = child.stdout.take().ok_or_else(|| {
|
||||
Error::Io(std::io::Error::other(
|
||||
"Failed to capture stdout of MCP server",
|
||||
))
|
||||
})?;
|
||||
|
||||
Ok(Self {
|
||||
child: Arc::new(Mutex::new(child)),
|
||||
stdin: Arc::new(Mutex::new(stdin)),
|
||||
stdout: Arc::new(Mutex::new(BufReader::new(stdout))),
|
||||
next_id: AtomicU64::new(1),
|
||||
})
|
||||
}
|
||||
|
||||
async fn send_rpc(&self, method: &str, params: serde_json::Value) -> Result<serde_json::Value> {
|
||||
let id = RequestId::Number(self.next_id.fetch_add(1, Ordering::Relaxed));
|
||||
let request = RpcRequest::new(id.clone(), method, Some(params));
|
||||
let req_str = serde_json::to_string(&request)? + "\n";
|
||||
{
|
||||
let mut stdin = self.stdin.lock().await;
|
||||
stdin.write_all(req_str.as_bytes()).await?;
|
||||
stdin.flush().await?;
|
||||
}
|
||||
// Read a single line response
|
||||
let mut line = String::new();
|
||||
{
|
||||
let mut stdout = self.stdout.lock().await;
|
||||
stdout.read_line(&mut line).await?;
|
||||
}
|
||||
// Try to parse successful response first
|
||||
if let Ok(resp) = serde_json::from_str::<RpcResponse>(&line) {
|
||||
if resp.id == id {
|
||||
return Ok(resp.result);
|
||||
}
|
||||
}
|
||||
// Fallback to error response
|
||||
let err_resp: RpcErrorResponse =
|
||||
serde_json::from_str(&line).map_err(Error::Serialization)?;
|
||||
Err(Error::Network(format!(
|
||||
"MCP server error {}: {}",
|
||||
err_resp.error.code, err_resp.error.message
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl McpClient for RemoteMcpClient {
|
||||
async fn list_tools(&self) -> Result<Vec<McpToolDescriptor>> {
|
||||
// The file server does not expose tool descriptors; fall back to NotImplemented.
|
||||
Err(Error::NotImplemented(
|
||||
"Remote MCP client does not support list_tools".to_string(),
|
||||
))
|
||||
}
|
||||
|
||||
async fn call_tool(&self, call: McpToolCall) -> Result<McpToolResponse> {
|
||||
let result = self.send_rpc(&call.name, call.arguments.clone()).await?;
|
||||
// The remote server returns only the tool result; we fabricate metadata.
|
||||
Ok(McpToolResponse {
|
||||
name: call.name,
|
||||
success: true,
|
||||
output: result,
|
||||
metadata: std::collections::HashMap::new(),
|
||||
duration_ms: 0,
|
||||
})
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user