#![allow( unused_imports, unused_variables, dead_code, clippy::unnecessary_cast, clippy::manual_flatten, clippy::empty_line_after_outer_attr )] use owlen_core::Provider; use owlen_core::ProviderConfig; use owlen_core::config::{Config as OwlenConfig, ensure_provider_config}; use owlen_core::mcp::protocol::{ ErrorCode, InitializeParams, InitializeResult, PROTOCOL_VERSION, RequestId, RpcError, RpcErrorResponse, RpcNotification, RpcRequest, RpcResponse, ServerCapabilities, ServerInfo, methods, }; use owlen_core::mcp::{McpToolCall, McpToolDescriptor, McpToolResponse}; use owlen_core::providers::OllamaProvider; use owlen_core::types::{ChatParameters, ChatRequest, Message}; use serde::Deserialize; use serde_json::{Value, json}; use std::collections::HashMap; use std::env; use std::sync::Arc; use tokio::io::{self, AsyncBufReadExt, AsyncWriteExt}; use tokio_stream::StreamExt; // Suppress warnings are handled by the crate-level attribute at the top. /// Arguments for the generate_text tool #[derive(Debug, Deserialize)] struct GenerateTextArgs { messages: Vec, temperature: Option, max_tokens: Option, model: String, stream: bool, } /// Simple tool descriptor for generate_text fn generate_text_descriptor() -> McpToolDescriptor { McpToolDescriptor { name: "generate_text".to_string(), description: "Generate text using Ollama LLM. Each message must have 'role' (user/assistant/system) and 'content' (string) fields.".to_string(), input_schema: json!({ "type": "object", "properties": { "messages": { "type": "array", "items": { "type": "object", "properties": { "role": { "type": "string", "enum": ["user", "assistant", "system"], "description": "The role of the message sender" }, "content": { "type": "string", "description": "The message content" } }, "required": ["role", "content"] }, "description": "Array of message objects with role and content" }, "temperature": {"type": ["number", "null"], "description": "Sampling temperature (0.0-2.0)"}, "max_tokens": {"type": ["integer", "null"], "description": "Maximum tokens to generate"}, "model": {"type": "string", "description": "Model name (e.g., llama3.2:latest)"}, "stream": {"type": "boolean", "description": "Whether to stream the response"} }, "required": ["messages", "model", "stream"] }), requires_network: true, requires_filesystem: vec![], } } /// Tool descriptor for resources/get (read file) fn resources_get_descriptor() -> McpToolDescriptor { McpToolDescriptor { name: "resources_get".to_string(), description: "Read and return the TEXT CONTENTS of a single FILE. Use this to read the contents of code files, config files, or text documents. Do NOT use for directories.".to_string(), input_schema: json!({ "type": "object", "properties": { "path": {"type": "string", "description": "Path to the FILE (not directory) to read"} }, "required": ["path"] }), requires_network: false, requires_filesystem: vec!["read".to_string()], } } /// Tool descriptor for resources/list (list directory) fn resources_list_descriptor() -> McpToolDescriptor { McpToolDescriptor { name: "resources_list".to_string(), description: "List the NAMES of all files and directories in a directory. Use this to see what files exist in a folder, or to list directory contents. Returns an array of file/directory names.".to_string(), input_schema: json!({ "type": "object", "properties": { "path": {"type": "string", "description": "Path to the DIRECTORY to list (use '.' for current directory)"} } }), requires_network: false, requires_filesystem: vec!["read".to_string()], } } fn provider_from_config() -> Result, RpcError> { let mut config = OwlenConfig::load(None).unwrap_or_default(); let requested_name = env::var("OWLEN_PROVIDER").unwrap_or_else(|_| config.general.default_provider.clone()); let provider_key = canonical_provider_name(&requested_name); if config.provider(&provider_key).is_none() { ensure_provider_config(&mut config, &provider_key); } let provider_cfg: ProviderConfig = config.provider(&provider_key).cloned().ok_or_else(|| { RpcError::internal_error(format!( "Provider '{provider_key}' not found in configuration" )) })?; match provider_cfg.provider_type.as_str() { "ollama" | "ollama_cloud" => { let provider = OllamaProvider::from_config(&provider_key, &provider_cfg, Some(&config.general)) .map_err(|e| { RpcError::internal_error(format!( "Failed to init Ollama provider from config: {e}" )) })?; Ok(Arc::new(provider) as Arc) } other => Err(RpcError::internal_error(format!( "Unsupported provider type '{other}' for MCP LLM server" ))), } } fn create_provider() -> Result, RpcError> { if let Ok(url) = env::var("OLLAMA_URL") { let provider = OllamaProvider::new(&url).map_err(|e| { RpcError::internal_error(format!("Failed to init Ollama provider: {e}")) })?; return Ok(Arc::new(provider) as Arc); } provider_from_config() } fn canonical_provider_name(name: &str) -> String { let normalized = name.trim().to_ascii_lowercase().replace('-', "_"); match normalized.as_str() { "" => "ollama_local".to_string(), "ollama" | "ollama_local" => "ollama_local".to_string(), "ollama_cloud" => "ollama_cloud".to_string(), other => other.to_string(), } } async fn handle_generate_text(args: GenerateTextArgs) -> Result { let provider = create_provider()?; let parameters = ChatParameters { temperature: args.temperature, max_tokens: args.max_tokens.map(|v| v as u32), stream: args.stream, extra: HashMap::new(), }; let request = ChatRequest { model: args.model, messages: args.messages, parameters, tools: None, }; // Use streaming API and collect output let mut stream = provider .stream_prompt(request) .await .map_err(|e| RpcError::internal_error(format!("Chat request failed: {}", e)))?; let mut content = String::new(); while let Some(chunk) = stream.next().await { match chunk { Ok(resp) => { content.push_str(&resp.message.content); if resp.is_final { break; } } Err(e) => { return Err(RpcError::internal_error(format!("Stream error: {}", e))); } } } Ok(content) } async fn handle_request(req: &RpcRequest) -> Result { match req.method.as_str() { methods::INITIALIZE => { let params = req .params .as_ref() .ok_or_else(|| RpcError::invalid_params("Missing params for initialize"))?; let init: InitializeParams = serde_json::from_value(params.clone()) .map_err(|e| RpcError::invalid_params(format!("Invalid init params: {}", e)))?; if !init.protocol_version.eq(PROTOCOL_VERSION) { return Err(RpcError::new( ErrorCode::INVALID_REQUEST, format!( "Incompatible protocol version. Client: {}, Server: {}", init.protocol_version, PROTOCOL_VERSION ), )); } let result = InitializeResult { protocol_version: PROTOCOL_VERSION.to_string(), server_info: ServerInfo { name: "owlen-mcp-llm-server".to_string(), version: env!("CARGO_PKG_VERSION").to_string(), }, capabilities: ServerCapabilities { supports_tools: Some(true), supports_resources: Some(false), supports_streaming: Some(true), }, }; serde_json::to_value(result).map_err(|e| { RpcError::internal_error(format!("Failed to serialize init result: {}", e)) }) } methods::TOOLS_LIST => { let tools = vec![ generate_text_descriptor(), resources_get_descriptor(), resources_list_descriptor(), ]; Ok(json!(tools)) } // New method to list available Ollama models via the provider. methods::MODELS_LIST => { let provider = create_provider()?; let models = provider .list_models() .await .map_err(|e| RpcError::internal_error(format!("Failed to list models: {}", e)))?; serde_json::to_value(models).map_err(|e| { RpcError::internal_error(format!("Failed to serialize model list: {}", e)) }) } methods::TOOLS_CALL => { // For streaming we will send incremental notifications directly from here. // The caller (main loop) will handle writing the final response. Err(RpcError::internal_error( "TOOLS_CALL should be handled in main loop for streaming", )) } _ => Err(RpcError::method_not_found(&req.method)), } } #[tokio::main] async fn main() -> anyhow::Result<()> { let root = env::current_dir()?; // not used but kept for parity let mut stdin = io::BufReader::new(io::stdin()); let mut stdout = io::stdout(); loop { let mut line = String::new(); match stdin.read_line(&mut line).await { Ok(0) => break, Ok(_) => { let req: RpcRequest = match serde_json::from_str(&line) { Ok(r) => r, Err(e) => { let err = RpcErrorResponse::new( RequestId::Number(0), RpcError::parse_error(format!("Parse error: {}", e)), ); let s = serde_json::to_string(&err)?; stdout.write_all(s.as_bytes()).await?; stdout.write_all(b"\n").await?; stdout.flush().await?; continue; } }; let id = req.id.clone(); // Streaming tool calls (generate_text) are handled specially to emit incremental notifications. if req.method == methods::TOOLS_CALL { // Parse the tool call let params = match &req.params { Some(p) => p, None => { let err_resp = RpcErrorResponse::new( id.clone(), RpcError::invalid_params("Missing params for tool call"), ); let s = serde_json::to_string(&err_resp)?; stdout.write_all(s.as_bytes()).await?; stdout.write_all(b"\n").await?; stdout.flush().await?; continue; } }; let call: McpToolCall = match serde_json::from_value(params.clone()) { Ok(c) => c, Err(e) => { let err_resp = RpcErrorResponse::new( id.clone(), RpcError::invalid_params(format!("Invalid tool call: {}", e)), ); let s = serde_json::to_string(&err_resp)?; stdout.write_all(s.as_bytes()).await?; stdout.write_all(b"\n").await?; stdout.flush().await?; continue; } }; // Dispatch based on the requested tool name. // Handle resources tools manually. if call.name.starts_with("resources_get") { let path = call .arguments .get("path") .and_then(|v| v.as_str()) .unwrap_or(""); match std::fs::read_to_string(path) { Ok(content) => { let response = McpToolResponse { name: call.name, success: true, output: json!(content), metadata: HashMap::new(), duration_ms: 0, }; let payload = match serde_json::to_value(&response) { Ok(value) => value, Err(e) => { let err_resp = RpcErrorResponse::new( id.clone(), RpcError::internal_error(format!( "Failed to serialize resource response: {}", e )), ); let s = serde_json::to_string(&err_resp)?; stdout.write_all(s.as_bytes()).await?; stdout.write_all(b"\n").await?; stdout.flush().await?; continue; } }; let final_resp = RpcResponse::new(id.clone(), payload); let s = serde_json::to_string(&final_resp)?; stdout.write_all(s.as_bytes()).await?; stdout.write_all(b"\n").await?; stdout.flush().await?; continue; } Err(e) => { let err_resp = RpcErrorResponse::new( id.clone(), RpcError::internal_error(format!("Failed to read file: {}", e)), ); let s = serde_json::to_string(&err_resp)?; stdout.write_all(s.as_bytes()).await?; stdout.write_all(b"\n").await?; stdout.flush().await?; continue; } } } if call.name.starts_with("resources_list") { let path = call .arguments .get("path") .and_then(|v| v.as_str()) .unwrap_or("."); match std::fs::read_dir(path) { Ok(entries) => { let mut names = Vec::new(); for entry in entries.flatten() { if let Some(name) = entry.file_name().to_str() { names.push(name.to_string()); } } let response = McpToolResponse { name: call.name, success: true, output: json!(names), metadata: HashMap::new(), duration_ms: 0, }; let payload = match serde_json::to_value(&response) { Ok(value) => value, Err(e) => { let err_resp = RpcErrorResponse::new( id.clone(), RpcError::internal_error(format!( "Failed to serialize directory listing: {}", e )), ); let s = serde_json::to_string(&err_resp)?; stdout.write_all(s.as_bytes()).await?; stdout.write_all(b"\n").await?; stdout.flush().await?; continue; } }; let final_resp = RpcResponse::new(id.clone(), payload); let s = serde_json::to_string(&final_resp)?; stdout.write_all(s.as_bytes()).await?; stdout.write_all(b"\n").await?; stdout.flush().await?; continue; } Err(e) => { let err_resp = RpcErrorResponse::new( id.clone(), RpcError::internal_error(format!("Failed to list dir: {}", e)), ); let s = serde_json::to_string(&err_resp)?; stdout.write_all(s.as_bytes()).await?; stdout.write_all(b"\n").await?; stdout.flush().await?; continue; } } } // Expect generate_text tool for the remaining path. if call.name != "generate_text" { let err_resp = RpcErrorResponse::new(id.clone(), RpcError::tool_not_found(&call.name)); let s = serde_json::to_string(&err_resp)?; stdout.write_all(s.as_bytes()).await?; stdout.write_all(b"\n").await?; stdout.flush().await?; continue; } let args: GenerateTextArgs = match serde_json::from_value(call.arguments.clone()) { Ok(a) => a, Err(e) => { let err_resp = RpcErrorResponse::new( id.clone(), RpcError::invalid_params(format!("Invalid arguments: {}", e)), ); let s = serde_json::to_string(&err_resp)?; stdout.write_all(s.as_bytes()).await?; stdout.write_all(b"\n").await?; stdout.flush().await?; continue; } }; // Initialize provider and start streaming let provider = match create_provider() { Ok(p) => p, Err(e) => { let err_resp = RpcErrorResponse::new( id.clone(), RpcError::internal_error(format!( "Failed to initialize provider: {:?}", e )), ); let s = serde_json::to_string(&err_resp)?; stdout.write_all(s.as_bytes()).await?; stdout.write_all(b"\n").await?; stdout.flush().await?; continue; } }; let parameters = ChatParameters { temperature: args.temperature, max_tokens: args.max_tokens.map(|v| v as u32), stream: true, extra: HashMap::new(), }; let request = ChatRequest { model: args.model, messages: args.messages, parameters, tools: None, }; let mut stream = match provider.stream_prompt(request).await { Ok(s) => s, Err(e) => { let err_resp = RpcErrorResponse::new( id.clone(), RpcError::internal_error(format!("Chat request failed: {}", e)), ); let s = serde_json::to_string(&err_resp)?; stdout.write_all(s.as_bytes()).await?; stdout.write_all(b"\n").await?; stdout.flush().await?; continue; } }; // Accumulate full content while sending incremental progress notifications let mut final_content = String::new(); while let Some(chunk) = stream.next().await { match chunk { Ok(resp) => { // Append chunk to the final content buffer final_content.push_str(&resp.message.content); // Emit a progress notification for the UI let notif = RpcNotification::new( "tools/call/progress", Some(json!({ "content": resp.message.content })), ); let s = serde_json::to_string(¬if)?; stdout.write_all(s.as_bytes()).await?; stdout.write_all(b"\n").await?; stdout.flush().await?; if resp.is_final { break; } } Err(e) => { let err_resp = RpcErrorResponse::new( id.clone(), RpcError::internal_error(format!("Stream error: {}", e)), ); let s = serde_json::to_string(&err_resp)?; stdout.write_all(s.as_bytes()).await?; stdout.write_all(b"\n").await?; stdout.flush().await?; break; } } } // After streaming, send the final tool response containing the full content let final_output = final_content.clone(); let response = McpToolResponse { name: call.name, success: true, output: json!(final_output), metadata: HashMap::new(), duration_ms: 0, }; let payload = match serde_json::to_value(&response) { Ok(value) => value, Err(e) => { let err_resp = RpcErrorResponse::new( id.clone(), RpcError::internal_error(format!( "Failed to serialize final streaming response: {}", e )), ); let s = serde_json::to_string(&err_resp)?; stdout.write_all(s.as_bytes()).await?; stdout.write_all(b"\n").await?; stdout.flush().await?; continue; } }; let final_resp = RpcResponse::new(id.clone(), payload); let s = serde_json::to_string(&final_resp)?; stdout.write_all(s.as_bytes()).await?; stdout.write_all(b"\n").await?; stdout.flush().await?; continue; } // Non‑streaming requests are handled by the generic handler match handle_request(&req).await { Ok(res) => { let resp = RpcResponse::new(id, res); let s = serde_json::to_string(&resp)?; stdout.write_all(s.as_bytes()).await?; stdout.write_all(b"\n").await?; stdout.flush().await?; } Err(err) => { let err_resp = RpcErrorResponse::new(id, err); let s = serde_json::to_string(&err_resp)?; stdout.write_all(s.as_bytes()).await?; stdout.write_all(b"\n").await?; stdout.flush().await?; } } } Err(e) => { eprintln!("Read error: {}", e); break; } } } Ok(()) }