refactor(workspace)!: move MCP crates under crates/mcp/ and update paths

This commit is contained in:
2025-10-17 00:31:35 +02:00
parent 3271697f6b
commit d4030dc598
14 changed files with 10 additions and 10 deletions

View File

@@ -0,0 +1,597 @@
#![allow(
unused_imports,
unused_variables,
dead_code,
clippy::unnecessary_cast,
clippy::manual_flatten,
clippy::empty_line_after_outer_attr
)]
use owlen_core::Provider;
use owlen_core::ProviderConfig;
use owlen_core::config::{Config as OwlenConfig, ensure_provider_config};
use owlen_core::mcp::protocol::{
ErrorCode, InitializeParams, InitializeResult, PROTOCOL_VERSION, RequestId, RpcError,
RpcErrorResponse, RpcNotification, RpcRequest, RpcResponse, ServerCapabilities, ServerInfo,
methods,
};
use owlen_core::mcp::{McpToolCall, McpToolDescriptor, McpToolResponse};
use owlen_core::providers::OllamaProvider;
use owlen_core::types::{ChatParameters, ChatRequest, Message};
use serde::Deserialize;
use serde_json::{Value, json};
use std::collections::HashMap;
use std::env;
use std::sync::Arc;
use tokio::io::{self, AsyncBufReadExt, AsyncWriteExt};
use tokio_stream::StreamExt;
// Suppress warnings are handled by the crate-level attribute at the top.
/// Arguments for the generate_text tool
#[derive(Debug, Deserialize)]
struct GenerateTextArgs {
messages: Vec<Message>,
temperature: Option<f32>,
max_tokens: Option<u32>,
model: String,
stream: bool,
}
/// Simple tool descriptor for generate_text
fn generate_text_descriptor() -> McpToolDescriptor {
McpToolDescriptor {
name: "generate_text".to_string(),
description: "Generate text using Ollama LLM. Each message must have 'role' (user/assistant/system) and 'content' (string) fields.".to_string(),
input_schema: json!({
"type": "object",
"properties": {
"messages": {
"type": "array",
"items": {
"type": "object",
"properties": {
"role": {
"type": "string",
"enum": ["user", "assistant", "system"],
"description": "The role of the message sender"
},
"content": {
"type": "string",
"description": "The message content"
}
},
"required": ["role", "content"]
},
"description": "Array of message objects with role and content"
},
"temperature": {"type": ["number", "null"], "description": "Sampling temperature (0.0-2.0)"},
"max_tokens": {"type": ["integer", "null"], "description": "Maximum tokens to generate"},
"model": {"type": "string", "description": "Model name (e.g., llama3.2:latest)"},
"stream": {"type": "boolean", "description": "Whether to stream the response"}
},
"required": ["messages", "model", "stream"]
}),
requires_network: true,
requires_filesystem: vec![],
}
}
/// Tool descriptor for resources/get (read file)
fn resources_get_descriptor() -> McpToolDescriptor {
McpToolDescriptor {
name: "resources/get".to_string(),
description: "Read and return the TEXT CONTENTS of a single FILE. Use this to read the contents of code files, config files, or text documents. Do NOT use for directories.".to_string(),
input_schema: json!({
"type": "object",
"properties": {
"path": {"type": "string", "description": "Path to the FILE (not directory) to read"}
},
"required": ["path"]
}),
requires_network: false,
requires_filesystem: vec!["read".to_string()],
}
}
/// Tool descriptor for resources/list (list directory)
fn resources_list_descriptor() -> McpToolDescriptor {
McpToolDescriptor {
name: "resources/list".to_string(),
description: "List the NAMES of all files and directories in a directory. Use this to see what files exist in a folder, or to list directory contents. Returns an array of file/directory names.".to_string(),
input_schema: json!({
"type": "object",
"properties": {
"path": {"type": "string", "description": "Path to the DIRECTORY to list (use '.' for current directory)"}
}
}),
requires_network: false,
requires_filesystem: vec!["read".to_string()],
}
}
fn provider_from_config() -> Result<Arc<dyn Provider>, RpcError> {
let mut config = OwlenConfig::load(None).unwrap_or_default();
let requested_name =
env::var("OWLEN_PROVIDER").unwrap_or_else(|_| config.general.default_provider.clone());
let provider_key = canonical_provider_name(&requested_name);
if config.provider(&provider_key).is_none() {
ensure_provider_config(&mut config, &provider_key);
}
let provider_cfg: ProviderConfig =
config.provider(&provider_key).cloned().ok_or_else(|| {
RpcError::internal_error(format!(
"Provider '{provider_key}' not found in configuration"
))
})?;
match provider_cfg.provider_type.as_str() {
"ollama" | "ollama_cloud" => {
let provider = OllamaProvider::from_config(&provider_cfg, Some(&config.general))
.map_err(|e| {
RpcError::internal_error(format!(
"Failed to init Ollama provider from config: {e}"
))
})?;
Ok(Arc::new(provider) as Arc<dyn Provider>)
}
other => Err(RpcError::internal_error(format!(
"Unsupported provider type '{other}' for MCP LLM server"
))),
}
}
fn create_provider() -> Result<Arc<dyn Provider>, RpcError> {
if let Ok(url) = env::var("OLLAMA_URL") {
let provider = OllamaProvider::new(&url).map_err(|e| {
RpcError::internal_error(format!("Failed to init Ollama provider: {e}"))
})?;
return Ok(Arc::new(provider) as Arc<dyn Provider>);
}
provider_from_config()
}
fn canonical_provider_name(name: &str) -> String {
let normalized = name.trim().to_ascii_lowercase().replace('-', "_");
match normalized.as_str() {
"" => "ollama_local".to_string(),
"ollama" | "ollama_local" => "ollama_local".to_string(),
"ollama_cloud" => "ollama_cloud".to_string(),
other => other.to_string(),
}
}
async fn handle_generate_text(args: GenerateTextArgs) -> Result<String, RpcError> {
let provider = create_provider()?;
let parameters = ChatParameters {
temperature: args.temperature,
max_tokens: args.max_tokens.map(|v| v as u32),
stream: args.stream,
extra: HashMap::new(),
};
let request = ChatRequest {
model: args.model,
messages: args.messages,
parameters,
tools: None,
};
// Use streaming API and collect output
let mut stream = provider
.stream_prompt(request)
.await
.map_err(|e| RpcError::internal_error(format!("Chat request failed: {}", e)))?;
let mut content = String::new();
while let Some(chunk) = stream.next().await {
match chunk {
Ok(resp) => {
content.push_str(&resp.message.content);
if resp.is_final {
break;
}
}
Err(e) => {
return Err(RpcError::internal_error(format!("Stream error: {}", e)));
}
}
}
Ok(content)
}
async fn handle_request(req: &RpcRequest) -> Result<Value, RpcError> {
match req.method.as_str() {
methods::INITIALIZE => {
let params = req
.params
.as_ref()
.ok_or_else(|| RpcError::invalid_params("Missing params for initialize"))?;
let init: InitializeParams = serde_json::from_value(params.clone())
.map_err(|e| RpcError::invalid_params(format!("Invalid init params: {}", e)))?;
if !init.protocol_version.eq(PROTOCOL_VERSION) {
return Err(RpcError::new(
ErrorCode::INVALID_REQUEST,
format!(
"Incompatible protocol version. Client: {}, Server: {}",
init.protocol_version, PROTOCOL_VERSION
),
));
}
let result = InitializeResult {
protocol_version: PROTOCOL_VERSION.to_string(),
server_info: ServerInfo {
name: "owlen-mcp-llm-server".to_string(),
version: env!("CARGO_PKG_VERSION").to_string(),
},
capabilities: ServerCapabilities {
supports_tools: Some(true),
supports_resources: Some(false),
supports_streaming: Some(true),
},
};
serde_json::to_value(result).map_err(|e| {
RpcError::internal_error(format!("Failed to serialize init result: {}", e))
})
}
methods::TOOLS_LIST => {
let tools = vec![
generate_text_descriptor(),
resources_get_descriptor(),
resources_list_descriptor(),
];
Ok(json!(tools))
}
// New method to list available Ollama models via the provider.
methods::MODELS_LIST => {
let provider = create_provider()?;
let models = provider
.list_models()
.await
.map_err(|e| RpcError::internal_error(format!("Failed to list models: {}", e)))?;
serde_json::to_value(models).map_err(|e| {
RpcError::internal_error(format!("Failed to serialize model list: {}", e))
})
}
methods::TOOLS_CALL => {
// For streaming we will send incremental notifications directly from here.
// The caller (main loop) will handle writing the final response.
Err(RpcError::internal_error(
"TOOLS_CALL should be handled in main loop for streaming",
))
}
_ => Err(RpcError::method_not_found(&req.method)),
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let root = env::current_dir()?; // not used but kept for parity
let mut stdin = io::BufReader::new(io::stdin());
let mut stdout = io::stdout();
loop {
let mut line = String::new();
match stdin.read_line(&mut line).await {
Ok(0) => break,
Ok(_) => {
let req: RpcRequest = match serde_json::from_str(&line) {
Ok(r) => r,
Err(e) => {
let err = RpcErrorResponse::new(
RequestId::Number(0),
RpcError::parse_error(format!("Parse error: {}", e)),
);
let s = serde_json::to_string(&err)?;
stdout.write_all(s.as_bytes()).await?;
stdout.write_all(b"\n").await?;
stdout.flush().await?;
continue;
}
};
let id = req.id.clone();
// Streaming tool calls (generate_text) are handled specially to emit incremental notifications.
if req.method == methods::TOOLS_CALL {
// Parse the tool call
let params = match &req.params {
Some(p) => p,
None => {
let err_resp = RpcErrorResponse::new(
id.clone(),
RpcError::invalid_params("Missing params for tool call"),
);
let s = serde_json::to_string(&err_resp)?;
stdout.write_all(s.as_bytes()).await?;
stdout.write_all(b"\n").await?;
stdout.flush().await?;
continue;
}
};
let call: McpToolCall = match serde_json::from_value(params.clone()) {
Ok(c) => c,
Err(e) => {
let err_resp = RpcErrorResponse::new(
id.clone(),
RpcError::invalid_params(format!("Invalid tool call: {}", e)),
);
let s = serde_json::to_string(&err_resp)?;
stdout.write_all(s.as_bytes()).await?;
stdout.write_all(b"\n").await?;
stdout.flush().await?;
continue;
}
};
// Dispatch based on the requested tool name.
// Handle resources tools manually.
if call.name.starts_with("resources/get") {
let path = call
.arguments
.get("path")
.and_then(|v| v.as_str())
.unwrap_or("");
match std::fs::read_to_string(path) {
Ok(content) => {
let response = McpToolResponse {
name: call.name,
success: true,
output: json!(content),
metadata: HashMap::new(),
duration_ms: 0,
};
let payload = match serde_json::to_value(&response) {
Ok(value) => value,
Err(e) => {
let err_resp = RpcErrorResponse::new(
id.clone(),
RpcError::internal_error(format!(
"Failed to serialize resource response: {}",
e
)),
);
let s = serde_json::to_string(&err_resp)?;
stdout.write_all(s.as_bytes()).await?;
stdout.write_all(b"\n").await?;
stdout.flush().await?;
continue;
}
};
let final_resp = RpcResponse::new(id.clone(), payload);
let s = serde_json::to_string(&final_resp)?;
stdout.write_all(s.as_bytes()).await?;
stdout.write_all(b"\n").await?;
stdout.flush().await?;
continue;
}
Err(e) => {
let err_resp = RpcErrorResponse::new(
id.clone(),
RpcError::internal_error(format!("Failed to read file: {}", e)),
);
let s = serde_json::to_string(&err_resp)?;
stdout.write_all(s.as_bytes()).await?;
stdout.write_all(b"\n").await?;
stdout.flush().await?;
continue;
}
}
}
if call.name.starts_with("resources/list") {
let path = call
.arguments
.get("path")
.and_then(|v| v.as_str())
.unwrap_or(".");
match std::fs::read_dir(path) {
Ok(entries) => {
let mut names = Vec::new();
for entry in entries.flatten() {
if let Some(name) = entry.file_name().to_str() {
names.push(name.to_string());
}
}
let response = McpToolResponse {
name: call.name,
success: true,
output: json!(names),
metadata: HashMap::new(),
duration_ms: 0,
};
let payload = match serde_json::to_value(&response) {
Ok(value) => value,
Err(e) => {
let err_resp = RpcErrorResponse::new(
id.clone(),
RpcError::internal_error(format!(
"Failed to serialize directory listing: {}",
e
)),
);
let s = serde_json::to_string(&err_resp)?;
stdout.write_all(s.as_bytes()).await?;
stdout.write_all(b"\n").await?;
stdout.flush().await?;
continue;
}
};
let final_resp = RpcResponse::new(id.clone(), payload);
let s = serde_json::to_string(&final_resp)?;
stdout.write_all(s.as_bytes()).await?;
stdout.write_all(b"\n").await?;
stdout.flush().await?;
continue;
}
Err(e) => {
let err_resp = RpcErrorResponse::new(
id.clone(),
RpcError::internal_error(format!("Failed to list dir: {}", e)),
);
let s = serde_json::to_string(&err_resp)?;
stdout.write_all(s.as_bytes()).await?;
stdout.write_all(b"\n").await?;
stdout.flush().await?;
continue;
}
}
}
// Expect generate_text tool for the remaining path.
if call.name != "generate_text" {
let err_resp =
RpcErrorResponse::new(id.clone(), RpcError::tool_not_found(&call.name));
let s = serde_json::to_string(&err_resp)?;
stdout.write_all(s.as_bytes()).await?;
stdout.write_all(b"\n").await?;
stdout.flush().await?;
continue;
}
let args: GenerateTextArgs =
match serde_json::from_value(call.arguments.clone()) {
Ok(a) => a,
Err(e) => {
let err_resp = RpcErrorResponse::new(
id.clone(),
RpcError::invalid_params(format!("Invalid arguments: {}", e)),
);
let s = serde_json::to_string(&err_resp)?;
stdout.write_all(s.as_bytes()).await?;
stdout.write_all(b"\n").await?;
stdout.flush().await?;
continue;
}
};
// Initialize provider and start streaming
let provider = match create_provider() {
Ok(p) => p,
Err(e) => {
let err_resp = RpcErrorResponse::new(
id.clone(),
RpcError::internal_error(format!(
"Failed to initialize provider: {:?}",
e
)),
);
let s = serde_json::to_string(&err_resp)?;
stdout.write_all(s.as_bytes()).await?;
stdout.write_all(b"\n").await?;
stdout.flush().await?;
continue;
}
};
let parameters = ChatParameters {
temperature: args.temperature,
max_tokens: args.max_tokens.map(|v| v as u32),
stream: true,
extra: HashMap::new(),
};
let request = ChatRequest {
model: args.model,
messages: args.messages,
parameters,
tools: None,
};
let mut stream = match provider.stream_prompt(request).await {
Ok(s) => s,
Err(e) => {
let err_resp = RpcErrorResponse::new(
id.clone(),
RpcError::internal_error(format!("Chat request failed: {}", e)),
);
let s = serde_json::to_string(&err_resp)?;
stdout.write_all(s.as_bytes()).await?;
stdout.write_all(b"\n").await?;
stdout.flush().await?;
continue;
}
};
// Accumulate full content while sending incremental progress notifications
let mut final_content = String::new();
while let Some(chunk) = stream.next().await {
match chunk {
Ok(resp) => {
// Append chunk to the final content buffer
final_content.push_str(&resp.message.content);
// Emit a progress notification for the UI
let notif = RpcNotification::new(
"tools/call/progress",
Some(json!({ "content": resp.message.content })),
);
let s = serde_json::to_string(&notif)?;
stdout.write_all(s.as_bytes()).await?;
stdout.write_all(b"\n").await?;
stdout.flush().await?;
if resp.is_final {
break;
}
}
Err(e) => {
let err_resp = RpcErrorResponse::new(
id.clone(),
RpcError::internal_error(format!("Stream error: {}", e)),
);
let s = serde_json::to_string(&err_resp)?;
stdout.write_all(s.as_bytes()).await?;
stdout.write_all(b"\n").await?;
stdout.flush().await?;
break;
}
}
}
// After streaming, send the final tool response containing the full content
let final_output = final_content.clone();
let response = McpToolResponse {
name: call.name,
success: true,
output: json!(final_output),
metadata: HashMap::new(),
duration_ms: 0,
};
let payload = match serde_json::to_value(&response) {
Ok(value) => value,
Err(e) => {
let err_resp = RpcErrorResponse::new(
id.clone(),
RpcError::internal_error(format!(
"Failed to serialize final streaming response: {}",
e
)),
);
let s = serde_json::to_string(&err_resp)?;
stdout.write_all(s.as_bytes()).await?;
stdout.write_all(b"\n").await?;
stdout.flush().await?;
continue;
}
};
let final_resp = RpcResponse::new(id.clone(), payload);
let s = serde_json::to_string(&final_resp)?;
stdout.write_all(s.as_bytes()).await?;
stdout.write_all(b"\n").await?;
stdout.flush().await?;
continue;
}
// Nonstreaming requests are handled by the generic handler
match handle_request(&req).await {
Ok(res) => {
let resp = RpcResponse::new(id, res);
let s = serde_json::to_string(&resp)?;
stdout.write_all(s.as_bytes()).await?;
stdout.write_all(b"\n").await?;
stdout.flush().await?;
}
Err(err) => {
let err_resp = RpcErrorResponse::new(id, err);
let s = serde_json::to_string(&err_resp)?;
stdout.write_all(s.as_bytes()).await?;
stdout.write_all(b"\n").await?;
stdout.flush().await?;
}
}
}
Err(e) => {
eprintln!("Read error: {}", e);
break;
}
}
}
Ok(())
}