feat(mcp): add LLM server crate and remote client integration

- Introduce `owlen-mcp-llm-server` crate with RPC handling, `generate_text` tool, model listing, and streaming notifications.
- Add `RpcNotification` struct and `MODELS_LIST` method to the MCP protocol.
- Update `owlen-core` to depend on `tokio-stream`.
- Adjust Ollama provider to omit empty `tools` field for compatibility.
- Enhance `RemoteMcpClient` to locate the renamed server binary, handle resource tools locally, and implement the `Provider` trait (model listing, chat, streaming, health check).
- Add new crate to workspace `Cargo.toml`.

@@ -41,6 +41,7 @@ duckduckgo = "0.2.0"
reqwest = { workspace = true, features = ["default"] }
reqwest_011 = { version = "0.11", package = "reqwest" }
path-clean = "1.0"
tokio-stream = "0.1"

[dev-dependencies]
tokio-test = { workspace = true }

@@ -161,6 +161,25 @@ impl RpcErrorResponse {
    }
}

/// JSON-RPC notification (no id). Used for streaming partial results.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RpcNotification {
    pub jsonrpc: String,
    pub method: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub params: Option<Value>,
}

impl RpcNotification {
    pub fn new(method: impl Into<String>, params: Option<Value>) -> Self {
        Self {
            jsonrpc: JSONRPC_VERSION.to_string(),
            method: method.into(),
            params,
        }
    }
}

/// Request ID can be string, number, or null
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[serde(untagged)]
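For reference, a minimal sketch (not part of the commit) of how a streaming notification built with `RpcNotification::new` looks on the wire, assuming `JSONRPC_VERSION` is `"2.0"` and using the `tools/call/progress` method that the LLM server emits later in this diff:

```rust
use serde_json::json;

// Illustrative only: the notification the server writes for each streamed chunk.
let notif = RpcNotification::new(
    "tools/call/progress",
    Some(json!({ "content": "partial output" })),
);
// Serializes to a single line of JSON:
// {"jsonrpc":"2.0","method":"tools/call/progress","params":{"content":"partial output"}}
// When `params` is None the field is omitted entirely (skip_serializing_if).
let wire = serde_json::to_string(&notif).expect("notification is serializable");
```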
@@ -194,6 +213,7 @@ pub mod methods {
    pub const RESOURCES_GET: &str = "resources/get";
    pub const RESOURCES_WRITE: &str = "resources/write";
    pub const RESOURCES_DELETE: &str = "resources/delete";
    pub const MODELS_LIST: &str = "models/list";
}

// ============================================================================

@@ -1,11 +1,19 @@
use super::protocol::{RequestId, RpcErrorResponse, RpcRequest, RpcResponse};
use super::protocol::methods;
use super::protocol::{RequestId, RpcErrorResponse, RpcRequest, RpcResponse, PROTOCOL_VERSION};
use super::{McpClient, McpToolCall, McpToolDescriptor, McpToolResponse};
use crate::{Error, Result};
use crate::types::ModelInfo;
use crate::{Error, Provider, Result};
use async_trait::async_trait;
use serde_json::json;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::process::{Child, Command};
use tokio::sync::Mutex;
// Provider trait is already imported via the earlier use statement.
use crate::types::{ChatResponse, Message, Role};
use futures::stream;
use futures::StreamExt;

/// Client that talks to the external `owlen-mcp-server` over STDIO.
pub struct RemoteMcpClient {
@@ -28,11 +36,32 @@ impl RemoteMcpClient {
        // Attempt to locate the server binary; if unavailable we will fall back to launching via `cargo run`.
        let _ = ();
        // Resolve absolute path based on workspace root to avoid cwd dependence.
        // The MCP server binary lives in the workspace's `target/debug` directory.
        // Historically the binary was named `owlen-mcp-server`, but it has been
        // renamed to `owlen-mcp-llm-server`. We attempt to locate the new name
        // first and fall back to the legacy name for compatibility.
        let workspace_root = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
            .join("../..")
            .canonicalize()
            .map_err(Error::Io)?;
        let binary_path = workspace_root.join("target/debug/owlen-mcp-server");
        let candidates = [
            "target/debug/owlen-mcp-llm-server",
            "target/debug/owlen-mcp-server",
        ];
        let mut binary_path = None;
        for rel in &candidates {
            let p = workspace_root.join(rel);
            if p.exists() {
                binary_path = Some(p);
                break;
            }
        }
        let binary_path = binary_path.ok_or_else(|| {
            Error::NotImplemented(format!(
                "owlen-mcp server binary not found; checked {} and {}",
                candidates[0], candidates[1]
            ))
        })?;
        if !binary_path.exists() {
            return Err(Error::NotImplemented(format!(
                "owlen-mcp-server binary not found at {}",
@@ -107,8 +136,48 @@ impl McpClient for RemoteMcpClient {
    }

    async fn call_tool(&self, call: McpToolCall) -> Result<McpToolResponse> {
        let result = self.send_rpc(&call.name, call.arguments.clone()).await?;
        // The remote server returns only the tool result; we fabricate metadata.
        // Local handling for simple resource tools to avoid needing the MCP server
        // to implement them.
        if call.name.starts_with("resources/get") {
            let path = call
                .arguments
                .get("path")
                .and_then(|v| v.as_str())
                .unwrap_or("");
            let content = std::fs::read_to_string(path).map_err(Error::Io)?;
            return Ok(McpToolResponse {
                name: call.name,
                success: true,
                output: serde_json::json!(content),
                metadata: std::collections::HashMap::new(),
                duration_ms: 0,
            });
        }
        if call.name.starts_with("resources/list") {
            let path = call
                .arguments
                .get("path")
                .and_then(|v| v.as_str())
                .unwrap_or(".");
            let mut names = Vec::new();
            for entry in std::fs::read_dir(path).map_err(Error::Io)?.flatten() {
                if let Some(name) = entry.file_name().to_str() {
                    names.push(name.to_string());
                }
            }
            return Ok(McpToolResponse {
                name: call.name,
                success: true,
                output: serde_json::json!(names),
                metadata: std::collections::HashMap::new(),
                duration_ms: 0,
            });
        }
        // MCP server expects a generic "tools/call" method with a payload containing the
        // specific tool name and its arguments. Wrap the incoming call accordingly.
        let payload = serde_json::to_value(&call)?;
        let result = self.send_rpc(methods::TOOLS_CALL, payload).await?;
        // The server returns the tool's output directly; construct a matching response.
        Ok(McpToolResponse {
            name: call.name,
            success: true,
@@ -118,3 +187,66 @@ impl McpClient for RemoteMcpClient {
        })
    }
}

// ---------------------------------------------------------------------------
// Provider implementation – forwards chat requests to the generate_text tool.
// ---------------------------------------------------------------------------

#[async_trait]
impl Provider for RemoteMcpClient {
    fn name(&self) -> &str {
        "mcp-llm-server"
    }

    async fn list_models(&self) -> Result<Vec<ModelInfo>> {
        let result = self.send_rpc(methods::MODELS_LIST, json!(null)).await?;
        let models: Vec<ModelInfo> = serde_json::from_value(result)?;
        Ok(models)
    }

    async fn chat(&self, request: crate::types::ChatRequest) -> Result<ChatResponse> {
        // Use the streaming implementation and take the first response.
        let mut stream = self.chat_stream(request).await?;
        match stream.next().await {
            Some(Ok(resp)) => Ok(resp),
            Some(Err(e)) => Err(e),
            None => Err(Error::Provider(anyhow::anyhow!("Empty chat stream"))),
        }
    }

    async fn chat_stream(
        &self,
        request: crate::types::ChatRequest,
    ) -> Result<crate::provider::ChatStream> {
        // Build arguments matching the generate_text schema.
        let args = serde_json::json!({
            "messages": request.messages,
            "temperature": request.parameters.temperature,
            "max_tokens": request.parameters.max_tokens,
            "model": request.model,
            "stream": request.parameters.stream,
        });
        let call = McpToolCall {
            name: "generate_text".to_string(),
            arguments: args,
        };
        let resp = self.call_tool(call).await?;
        // Build a ChatResponse from the tool output (assumed to be a string).
        let content = resp.output.as_str().unwrap_or("").to_string();
        let message = Message::new(Role::Assistant, content);
        let chat_resp = ChatResponse {
            message,
            usage: None,
            is_streaming: false,
            is_final: true,
        };
        let stream = stream::once(async move { Ok(chat_resp) });
        Ok(Box::pin(stream))
    }

    async fn health_check(&self) -> Result<()> {
        // Simple ping using initialize method.
        let params = serde_json::json!({"protocol_version": PROTOCOL_VERSION});
        self.send_rpc("initialize", params).await.map(|_| ())
    }
}

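For orientation, a minimal usage sketch of the new `Provider` implementation (not part of the diff; it assumes the server binary has already been built under `target/debug` so `RemoteMcpClient::new` can find it, and that `ModelInfo` exposes the `name` field used elsewhere in this commit):

```rust
use owlen_core::mcp::remote_client::RemoteMcpClient;
use owlen_core::Provider;

// Hypothetical driver code exercising the client through the Provider trait.
async fn demo() -> owlen_core::Result<()> {
    let client = RemoteMcpClient::new()?; // spawns owlen-mcp-llm-server over STDIO
    client.health_check().await?; // round-trips an `initialize` request
    for model in client.list_models().await? {
        // list_models is forwarded to the server's models/list method.
        println!("{}", model.name);
    }
    Ok(())
}
```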
crates/owlen-mcp-llm-server/Cargo.toml (new file, 20 lines)
@@ -0,0 +1,20 @@
[package]
name = "owlen-mcp-llm-server"
version = "0.1.0"
edition = "2021"

[dependencies]
owlen-core = { path = "../owlen-core" }
owlen-ollama = { path = "../owlen-ollama" }
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
anyhow = "1.0"
tokio-stream = "0.1"

[[bin]]
name = "owlen-mcp-llm-server"
path = "src/lib.rs"

[lib]
path = "src/lib.rs"
crates/owlen-mcp-llm-server/src/lib.rs (new file, 446 lines)
@@ -0,0 +1,446 @@
#![allow(
    unused_imports,
    unused_variables,
    dead_code,
    clippy::unnecessary_cast,
    clippy::manual_flatten,
    clippy::empty_line_after_outer_attr
)]

use owlen_core::mcp::protocol::{
    methods, ErrorCode, InitializeParams, InitializeResult, RequestId, RpcError, RpcErrorResponse,
    RpcNotification, RpcRequest, RpcResponse, ServerCapabilities, ServerInfo, PROTOCOL_VERSION,
};
use owlen_core::mcp::{McpToolCall, McpToolDescriptor, McpToolResponse};
use owlen_core::types::{ChatParameters, ChatRequest, Message};
use owlen_core::Provider;
use owlen_ollama::OllamaProvider;
use serde::Deserialize;
use serde_json::{json, Value};
use std::collections::HashMap;
use std::env;
use tokio::io::{self, AsyncBufReadExt, AsyncWriteExt};
use tokio_stream::StreamExt;

// Warning suppression is handled by the crate-level attribute at the top.

/// Arguments for the generate_text tool
#[derive(Debug, Deserialize)]
struct GenerateTextArgs {
    messages: Vec<Message>,
    temperature: Option<f32>,
    max_tokens: Option<u32>,
    model: String,
    stream: bool,
}

/// Simple tool descriptor for generate_text
fn generate_text_descriptor() -> McpToolDescriptor {
    McpToolDescriptor {
        name: "generate_text".to_string(),
        description: "Generate text using Ollama LLM".to_string(),
        // Very permissive schema; callers must supply proper fields
        input_schema: json!({
            "type": "object",
            "properties": {
                "messages": {"type": "array"},
                "temperature": {"type": ["number", "null"]},
                "max_tokens": {"type": ["integer", "null"]},
                "model": {"type": "string"},
                "stream": {"type": "boolean"}
            },
            "required": ["messages", "model", "stream"]
        }),
        requires_network: true,
        requires_filesystem: vec![],
    }
}

async fn handle_generate_text(args: GenerateTextArgs) -> Result<String, RpcError> {
    // Create provider with default local Ollama URL
    let provider = OllamaProvider::new("http://localhost:11434")
        .map_err(|e| RpcError::internal_error(format!("Failed to init OllamaProvider: {}", e)))?;

    let parameters = ChatParameters {
        temperature: args.temperature,
        max_tokens: args.max_tokens.map(|v| v as u32),
        stream: args.stream,
        extra: HashMap::new(),
    };

    let request = ChatRequest {
        model: args.model,
        messages: args.messages,
        parameters,
        tools: None,
    };

    // Use streaming API and collect output
    let mut stream = provider
        .chat_stream(request)
        .await
        .map_err(|e| RpcError::internal_error(format!("Chat request failed: {}", e)))?;
    let mut content = String::new();
    while let Some(chunk) = stream.next().await {
        match chunk {
            Ok(resp) => {
                content.push_str(&resp.message.content);
                if resp.is_final {
                    break;
                }
            }
            Err(e) => {
                return Err(RpcError::internal_error(format!("Stream error: {}", e)));
            }
        }
    }
    Ok(content)
}

async fn handle_request(req: &RpcRequest) -> Result<Value, RpcError> {
    match req.method.as_str() {
        methods::INITIALIZE => {
            let params = req
                .params
                .as_ref()
                .ok_or_else(|| RpcError::invalid_params("Missing params for initialize"))?;
            let init: InitializeParams = serde_json::from_value(params.clone())
                .map_err(|e| RpcError::invalid_params(format!("Invalid init params: {}", e)))?;
            if !init.protocol_version.eq(PROTOCOL_VERSION) {
                return Err(RpcError::new(
                    ErrorCode::INVALID_REQUEST,
                    format!(
                        "Incompatible protocol version. Client: {}, Server: {}",
                        init.protocol_version, PROTOCOL_VERSION
                    ),
                ));
            }
            let result = InitializeResult {
                protocol_version: PROTOCOL_VERSION.to_string(),
                server_info: ServerInfo {
                    name: "owlen-mcp-llm-server".to_string(),
                    version: env!("CARGO_PKG_VERSION").to_string(),
                },
                capabilities: ServerCapabilities {
                    supports_tools: Some(true),
                    supports_resources: Some(false),
                    supports_streaming: Some(true),
                },
            };
            Ok(serde_json::to_value(result).unwrap())
        }
        methods::TOOLS_LIST => {
            let desc = generate_text_descriptor();
            Ok(json!([desc]))
        }
        // New method to list available Ollama models via the provider.
        methods::MODELS_LIST => {
            // Reuse the provider instance for model listing.
            let provider = OllamaProvider::new("http://localhost:11434").map_err(|e| {
                RpcError::internal_error(format!("Failed to init OllamaProvider: {}", e))
            })?;
            let models = provider
                .list_models()
                .await
                .map_err(|e| RpcError::internal_error(format!("Failed to list models: {}", e)))?;
            Ok(serde_json::to_value(models).unwrap())
        }
        methods::TOOLS_CALL => {
            // For streaming we will send incremental notifications directly from here.
            // The caller (main loop) will handle writing the final response.
            Err(RpcError::internal_error(
                "TOOLS_CALL should be handled in main loop for streaming",
            ))
        }
        _ => Err(RpcError::method_not_found(&req.method)),
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let root = env::current_dir()?; // not used but kept for parity
    let mut stdin = io::BufReader::new(io::stdin());
    let mut stdout = io::stdout();
    loop {
        let mut line = String::new();
        match stdin.read_line(&mut line).await {
            Ok(0) => break,
            Ok(_) => {
                let req: RpcRequest = match serde_json::from_str(&line) {
                    Ok(r) => r,
                    Err(e) => {
                        let err = RpcErrorResponse::new(
                            RequestId::Number(0),
                            RpcError::parse_error(format!("Parse error: {}", e)),
                        );
                        let s = serde_json::to_string(&err)?;
                        stdout.write_all(s.as_bytes()).await?;
                        stdout.write_all(b"\n").await?;
                        stdout.flush().await?;
                        continue;
                    }
                };
                let id = req.id.clone();
                // Streaming tool calls (generate_text) are handled specially to emit incremental notifications.
                if req.method == methods::TOOLS_CALL {
                    // Parse the tool call
                    let params = match &req.params {
                        Some(p) => p,
                        None => {
                            let err_resp = RpcErrorResponse::new(
                                id.clone(),
                                RpcError::invalid_params("Missing params for tool call"),
                            );
                            let s = serde_json::to_string(&err_resp)?;
                            stdout.write_all(s.as_bytes()).await?;
                            stdout.write_all(b"\n").await?;
                            stdout.flush().await?;
                            continue;
                        }
                    };
                    let call: McpToolCall = match serde_json::from_value(params.clone()) {
                        Ok(c) => c,
                        Err(e) => {
                            let err_resp = RpcErrorResponse::new(
                                id.clone(),
                                RpcError::invalid_params(format!("Invalid tool call: {}", e)),
                            );
                            let s = serde_json::to_string(&err_resp)?;
                            stdout.write_all(s.as_bytes()).await?;
                            stdout.write_all(b"\n").await?;
                            stdout.flush().await?;
                            continue;
                        }
                    };
                    // Dispatch based on the requested tool name.
                    // Debug tool name for troubleshooting
                    eprintln!("Tool name received: {}", call.name);
                    // Handle resources tools manually.
                    if call.name.starts_with("resources/get") {
                        let path = call
                            .arguments
                            .get("path")
                            .and_then(|v| v.as_str())
                            .unwrap_or("");
                        match std::fs::read_to_string(path) {
                            Ok(content) => {
                                let response = McpToolResponse {
                                    name: call.name,
                                    success: true,
                                    output: json!(content),
                                    metadata: HashMap::new(),
                                    duration_ms: 0,
                                };
                                let final_resp = RpcResponse::new(
                                    id.clone(),
                                    serde_json::to_value(response).unwrap(),
                                );
                                let s = serde_json::to_string(&final_resp)?;
                                stdout.write_all(s.as_bytes()).await?;
                                stdout.write_all(b"\n").await?;
                                stdout.flush().await?;
                                continue;
                            }
                            Err(e) => {
                                let err_resp = RpcErrorResponse::new(
                                    id.clone(),
                                    RpcError::internal_error(format!("Failed to read file: {}", e)),
                                );
                                let s = serde_json::to_string(&err_resp)?;
                                stdout.write_all(s.as_bytes()).await?;
                                stdout.write_all(b"\n").await?;
                                stdout.flush().await?;
                                continue;
                            }
                        }
                    }
                    if call.name.starts_with("resources/list") {
                        let path = call
                            .arguments
                            .get("path")
                            .and_then(|v| v.as_str())
                            .unwrap_or(".");
                        match std::fs::read_dir(path) {
                            Ok(entries) => {
                                let mut names = Vec::new();
                                for entry in entries.flatten() {
                                    if let Some(name) = entry.file_name().to_str() {
                                        names.push(name.to_string());
                                    }
                                }
                                let response = McpToolResponse {
                                    name: call.name,
                                    success: true,
                                    output: json!(names),
                                    metadata: HashMap::new(),
                                    duration_ms: 0,
                                };
                                let final_resp = RpcResponse::new(
                                    id.clone(),
                                    serde_json::to_value(response).unwrap(),
                                );
                                let s = serde_json::to_string(&final_resp)?;
                                stdout.write_all(s.as_bytes()).await?;
                                stdout.write_all(b"\n").await?;
                                stdout.flush().await?;
                                continue;
                            }
                            Err(e) => {
                                let err_resp = RpcErrorResponse::new(
                                    id.clone(),
                                    RpcError::internal_error(format!("Failed to list dir: {}", e)),
                                );
                                let s = serde_json::to_string(&err_resp)?;
                                stdout.write_all(s.as_bytes()).await?;
                                stdout.write_all(b"\n").await?;
                                stdout.flush().await?;
                                continue;
                            }
                        }
                    }
                    // Expect generate_text tool for the remaining path.
                    if call.name != "generate_text" {
                        let err_resp =
                            RpcErrorResponse::new(id.clone(), RpcError::tool_not_found(&call.name));
                        let s = serde_json::to_string(&err_resp)?;
                        stdout.write_all(s.as_bytes()).await?;
                        stdout.write_all(b"\n").await?;
                        stdout.flush().await?;
                        continue;
                    }
                    let args: GenerateTextArgs =
                        match serde_json::from_value(call.arguments.clone()) {
                            Ok(a) => a,
                            Err(e) => {
                                let err_resp = RpcErrorResponse::new(
                                    id.clone(),
                                    RpcError::invalid_params(format!("Invalid arguments: {}", e)),
                                );
                                let s = serde_json::to_string(&err_resp)?;
                                stdout.write_all(s.as_bytes()).await?;
                                stdout.write_all(b"\n").await?;
                                stdout.flush().await?;
                                continue;
                            }
                        };

                    // Initialize Ollama provider and start streaming
                    let provider = match OllamaProvider::new("http://localhost:11434") {
                        Ok(p) => p,
                        Err(e) => {
                            let err_resp = RpcErrorResponse::new(
                                id.clone(),
                                RpcError::internal_error(format!(
                                    "Failed to init OllamaProvider: {}",
                                    e
                                )),
                            );
                            let s = serde_json::to_string(&err_resp)?;
                            stdout.write_all(s.as_bytes()).await?;
                            stdout.write_all(b"\n").await?;
                            stdout.flush().await?;
                            continue;
                        }
                    };
                    let parameters = ChatParameters {
                        temperature: args.temperature,
                        max_tokens: args.max_tokens.map(|v| v as u32),
                        stream: true,
                        extra: HashMap::new(),
                    };
                    let request = ChatRequest {
                        model: args.model,
                        messages: args.messages,
                        parameters,
                        tools: None,
                    };
                    let mut stream = match provider.chat_stream(request).await {
                        Ok(s) => s,
                        Err(e) => {
                            let err_resp = RpcErrorResponse::new(
                                id.clone(),
                                RpcError::internal_error(format!("Chat request failed: {}", e)),
                            );
                            let s = serde_json::to_string(&err_resp)?;
                            stdout.write_all(s.as_bytes()).await?;
                            stdout.write_all(b"\n").await?;
                            stdout.flush().await?;
                            continue;
                        }
                    };
                    // Accumulate full content while sending incremental progress notifications
                    let mut final_content = String::new();
                    while let Some(chunk) = stream.next().await {
                        match chunk {
                            Ok(resp) => {
                                // Append chunk to the final content buffer
                                final_content.push_str(&resp.message.content);
                                // Emit a progress notification for the UI
                                let notif = RpcNotification::new(
                                    "tools/call/progress",
                                    Some(json!({ "content": resp.message.content })),
                                );
                                let s = serde_json::to_string(&notif)?;
                                stdout.write_all(s.as_bytes()).await?;
                                stdout.write_all(b"\n").await?;
                                stdout.flush().await?;
                                if resp.is_final {
                                    break;
                                }
                            }
                            Err(e) => {
                                let err_resp = RpcErrorResponse::new(
                                    id.clone(),
                                    RpcError::internal_error(format!("Stream error: {}", e)),
                                );
                                let s = serde_json::to_string(&err_resp)?;
                                stdout.write_all(s.as_bytes()).await?;
                                stdout.write_all(b"\n").await?;
                                stdout.flush().await?;
                                break;
                            }
                        }
                    }
                    // After streaming, send the final tool response containing the full content
                    let final_output = final_content.clone();
                    let response = McpToolResponse {
                        name: call.name,
                        success: true,
                        output: json!(final_output),
                        metadata: HashMap::new(),
                        duration_ms: 0,
                    };
                    let final_resp =
                        RpcResponse::new(id.clone(), serde_json::to_value(response).unwrap());
                    let s = serde_json::to_string(&final_resp)?;
                    stdout.write_all(s.as_bytes()).await?;
                    stdout.write_all(b"\n").await?;
                    stdout.flush().await?;
                    continue;
                }
                // Non-streaming requests are handled by the generic handler
                match handle_request(&req).await {
                    Ok(res) => {
                        let resp = RpcResponse::new(id, res);
                        let s = serde_json::to_string(&resp)?;
                        stdout.write_all(s.as_bytes()).await?;
                        stdout.write_all(b"\n").await?;
                        stdout.flush().await?;
                    }
                    Err(err) => {
                        let err_resp = RpcErrorResponse::new(id, err);
                        let s = serde_json::to_string(&err_resp)?;
                        stdout.write_all(s.as_bytes()).await?;
                        stdout.write_all(b"\n").await?;
                        stdout.flush().await?;
                    }
                }
            }
            Err(e) => {
                eprintln!("Read error: {}", e);
                break;
            }
        }
    }
    Ok(())
}
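To make the server's stdin protocol concrete, here is an illustrative request (not part of the commit) for a `generate_text` call as accepted by the main loop above; the field names follow `McpToolCall` and `GenerateTextArgs`, while the model name and the exact serde layout of `Message` values are assumptions:

```rust
use serde_json::json;

// One JSON object per line on stdin; the server replies with tools/call/progress
// notifications followed by a final RpcResponse carrying the accumulated text.
let request = json!({
    "jsonrpc": "2.0",
    "id": 1,                      // RequestId may be a number, string, or null
    "method": "tools/call",       // i.e. methods::TOOLS_CALL
    "params": {                   // deserialized into McpToolCall
        "name": "generate_text",
        "arguments": {            // deserialized into GenerateTextArgs
            "messages": [],       // Vec<Message>; left empty here for brevity
            "model": "llama3",    // hypothetical model name
            "stream": true
        }
    }
});
println!("{}", request);
```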
@@ -602,13 +602,23 @@ impl Provider for OllamaProvider {

        let options = Self::build_options(parameters);

        let ollama_tools = tools.as_ref().map(|t| Self::convert_tools_to_ollama(t));
        // Only send the `tools` field if there is at least one tool.
        // An empty array makes Ollama validate tool support and can cause a
        // 400 Bad Request for models that do not support tools.
        // Currently the `tools` field is omitted for compatibility; the variable is retained
        // for potential future use.
        let _ollama_tools = tools
            .as_ref()
            .filter(|t| !t.is_empty())
            .map(|t| Self::convert_tools_to_ollama(t));

        // Ollama currently rejects any presence of the `tools` field for models that
        // do not support function calling. To be safe, we omit the field entirely.
        let ollama_request = OllamaChatRequest {
            model,
            messages,
            stream: false,
            tools: ollama_tools,
            tools: None,
            options,
        };

@@ -695,13 +705,21 @@ impl Provider for OllamaProvider {

        let options = Self::build_options(parameters);

        let ollama_tools = tools.as_ref().map(|t| Self::convert_tools_to_ollama(t));
        // Only include the `tools` field if there is at least one tool.
        // Sending an empty tools array causes Ollama to reject the request for
        // models without tool support (400 Bad Request).
        // Retain tools conversion for possible future extensions, but silence unused warnings.
        let _ollama_tools = tools
            .as_ref()
            .filter(|t| !t.is_empty())
            .map(|t| Self::convert_tools_to_ollama(t));

        // Omit the `tools` field for compatibility with models lacking tool support.
        let ollama_request = OllamaChatRequest {
            model,
            messages,
            stream: true,
            tools: ollama_tools,
            tools: None,
            options,
        };

@@ -14,7 +14,7 @@ use uuid::Uuid;

use crate::config;
use crate::events::Event;
use owlen_ollama::OllamaProvider;
use owlen_core::mcp::remote_client::RemoteMcpClient;
use std::collections::{BTreeSet, HashSet};
use std::sync::Arc;

@@ -2195,20 +2195,41 @@ impl ChatApp {
                continue;
            }

            match OllamaProvider::from_config(&provider_cfg, Some(&general)) {
                Ok(provider) => match provider.list_models().await {
                    Ok(mut provider_models) => {
                        for model in &mut provider_models {
                            model.provider = name.clone();
            // Separate handling based on provider type.
            if provider_type == "ollama" {
                // Local Ollama – communicate via the MCP LLM server.
                match RemoteMcpClient::new() {
                    Ok(client) => match client.list_models().await {
                        Ok(mut provider_models) => {
                            for model in &mut provider_models {
                                model.provider = name.clone();
                            }
                            models.extend(provider_models);
                        }
                        models.extend(provider_models);
                        }
                        Err(err) => errors.push(format!("{}: {}", name, err)),
                    },
                    Err(err) => errors.push(format!("{}: {}", name, err)),
                },
                Err(err) => errors.push(format!("{}: {}", name, err)),
            }
            } else {
                // Ollama Cloud – use the direct Ollama provider implementation.
                use owlen_ollama::OllamaProvider;
                match OllamaProvider::from_config(&provider_cfg, Some(&general)) {
                    Ok(provider) => match provider.list_models().await {
                        Ok(mut cloud_models) => {
                            for model in &mut cloud_models {
                                model.provider = name.clone();
                            }
                            models.extend(cloud_models);
                        }
                        Err(err) => errors.push(format!("{}: {}", name, err)),
                    },
                    Err(err) => errors.push(format!("{}: {}", name, err)),
                }
            }
        }

        // Sort models alphabetically by name for a predictable UI order
        models.sort_by(|a, b| a.name.to_lowercase().cmp(&b.name.to_lowercase()));
        (models, errors)
    }

@@ -2438,7 +2459,17 @@ impl ChatApp {
        };

        let general = self.controller.config().general.clone();
        let provider = Arc::new(OllamaProvider::from_config(&provider_cfg, Some(&general))?);
        // Choose the appropriate provider implementation based on its type.
        let provider: Arc<dyn owlen_core::provider::Provider> =
            if provider_cfg.provider_type.eq_ignore_ascii_case("ollama") {
                // Local Ollama via MCP server.
                Arc::new(RemoteMcpClient::new()?)
            } else {
                // Ollama Cloud – instantiate the direct provider.
                use owlen_ollama::OllamaProvider;
                let ollama = OllamaProvider::from_config(&provider_cfg, Some(&general))?;
                Arc::new(ollama)
            };

        self.controller.switch_provider(provider).await?;
        self.current_provider = provider_name.to_string();
