feat(cli): add MCP management subcommand with add/list/remove commands

Introduce `McpCommand` enum and handlers in `owlen-cli` to manage MCP server registrations, including adding, listing, and removing servers across configuration scopes. Add scoped configuration support (`ScopedMcpServer`, `McpConfigScope`) and OAuth token handling in core config, alongside runtime refresh of MCP servers. Implement toast notifications in the TUI (`render_toasts`, `Toast`, `ToastLevel`) and integrate async handling for session events. Update config loading, validation, and schema versioning to accommodate new MCP scopes and resources. Add `httpmock` as a dev dependency for testing.
This commit is contained in:
2025-10-13 17:54:14 +02:00
parent 0da8a3f193
commit 690f5c7056
23 changed files with 3388 additions and 74 deletions

View File

@@ -1,4 +1,4 @@
use crate::config::Config;
use crate::config::{Config, McpResourceConfig, McpServerConfig};
use crate::consent::ConsentManager;
use crate::conversation::ConversationManager;
use crate::credentials::CredentialManager;
@@ -9,8 +9,10 @@ use crate::mcp::McpToolCall;
use crate::mcp::client::McpClient;
use crate::mcp::factory::McpClientFactory;
use crate::mcp::permission::PermissionLayer;
use crate::mcp::remote_client::{McpRuntimeSecrets, RemoteMcpClient};
use crate::mode::Mode;
use crate::model::{DetailedModelInfo, ModelManager};
use crate::oauth::{DeviceAuthorization, DevicePollState, OAuthClient};
use crate::providers::OllamaProvider;
use crate::storage::{SessionMeta, StorageManager};
use crate::types::{
@@ -24,8 +26,10 @@ use crate::{
ToolRegistry, WebScrapeTool, WebSearchDetailedTool, WebSearchTool,
};
use crate::{Error, Result};
use chrono::Utc;
use log::warn;
use serde_json::Value;
use serde_json::{Value, json};
use std::collections::HashMap;
use std::env;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
@@ -96,6 +100,7 @@ pub struct SessionController {
tool_registry: Arc<ToolRegistry>,
schema_validator: Arc<SchemaValidator>,
mcp_client: Arc<dyn McpClient>,
named_mcp_clients: HashMap<String, Arc<dyn McpClient>>,
storage: Arc<StorageManager>,
vault: Option<Arc<Mutex<VaultHandle>>>,
master_key: Option<Arc<Vec<u8>>>,
@@ -103,6 +108,7 @@ pub struct SessionController {
ui: Arc<dyn UiController>,
enable_code_tools: bool,
current_mode: Mode,
missing_oauth_servers: Vec<String>,
}
async fn build_tools(
@@ -211,6 +217,112 @@ async fn build_tools(
}
impl SessionController {
async fn create_mcp_clients(
config: Arc<TokioMutex<Config>>,
tool_registry: Arc<ToolRegistry>,
schema_validator: Arc<SchemaValidator>,
credential_manager: Option<Arc<CredentialManager>>,
initial_mode: Mode,
) -> Result<(
Arc<dyn McpClient>,
HashMap<String, Arc<dyn McpClient>>,
Vec<String>,
)> {
let guard = config.lock().await;
let config_arc = Arc::new(guard.clone());
let factory = McpClientFactory::new(config_arc.clone(), tool_registry, schema_validator);
let mut missing_oauth_servers = Vec::new();
let primary_runtime = if let Some(primary_cfg) = guard.effective_mcp_servers().first() {
let (runtime, missing) =
Self::runtime_secrets_for_server(credential_manager.clone(), primary_cfg).await?;
if missing {
missing_oauth_servers.push(primary_cfg.name.clone());
}
runtime
} else {
None
};
let base_client = factory.create_with_secrets(primary_runtime)?;
let primary: Arc<dyn McpClient> =
Arc::new(PermissionLayer::new(base_client, config_arc.clone()));
primary.set_mode(initial_mode).await?;
let mut clients: HashMap<String, Arc<dyn McpClient>> = HashMap::new();
if let Some(primary_cfg) = guard.effective_mcp_servers().first() {
clients.insert(primary_cfg.name.clone(), Arc::clone(&primary));
}
for server_cfg in guard.effective_mcp_servers().iter().skip(1) {
let (runtime, missing) =
Self::runtime_secrets_for_server(credential_manager.clone(), server_cfg).await?;
if missing {
missing_oauth_servers.push(server_cfg.name.clone());
}
match RemoteMcpClient::new_with_runtime(server_cfg, runtime) {
Ok(remote) => {
let client: Arc<dyn McpClient> =
Arc::new(PermissionLayer::new(Box::new(remote), config_arc.clone()));
if let Err(err) = client.set_mode(initial_mode).await {
warn!(
"Failed to initialize MCP server '{}' in mode {:?}: {}",
server_cfg.name, initial_mode, err
);
}
clients.insert(server_cfg.name.clone(), Arc::clone(&client));
}
Err(err) => warn!(
"Failed to initialize MCP server '{}': {}",
server_cfg.name, err
),
}
}
drop(guard);
Ok((primary, clients, missing_oauth_servers))
}
async fn runtime_secrets_for_server(
credential_manager: Option<Arc<CredentialManager>>,
server: &McpServerConfig,
) -> Result<(Option<McpRuntimeSecrets>, bool)> {
if let Some(oauth) = &server.oauth {
if let Some(manager) = credential_manager {
match manager.load_oauth_token(&server.name).await? {
Some(token) => {
if token.access_token.trim().is_empty() || token.is_expired(Utc::now()) {
return Ok((None, true));
}
let mut secrets = McpRuntimeSecrets::default();
if let Some(env_name) = oauth.token_env.as_deref() {
secrets
.env_overrides
.insert(env_name.to_string(), token.access_token.clone());
}
if matches!(
server.transport.to_ascii_lowercase().as_str(),
"http" | "websocket"
) {
let header_value =
format!("{}{}", oauth.header_prefix(), token.access_token);
secrets.http_header =
Some((oauth.header_name().to_string(), header_value));
}
Ok((Some(secrets), false))
}
None => Ok((None, true)),
}
} else {
Ok((None, true))
}
} else {
Ok((None, false))
}
}
pub async fn new(
provider: Arc<dyn Provider>,
config: Config,
@@ -292,19 +404,14 @@ impl SessionController {
)
.await?;
// Create MCP client with permission layer
let mcp_client: Arc<dyn McpClient> = {
let guard = config_arc.lock().await;
let factory = McpClientFactory::new(
Arc::new(guard.clone()),
tool_registry.clone(),
schema_validator.clone(),
);
let base_client = factory.create()?;
let client = Arc::new(PermissionLayer::new(base_client, Arc::new(guard.clone())));
client.set_mode(initial_mode).await?;
client
};
let (mcp_client, named_mcp_clients, missing_oauth_servers) = Self::create_mcp_clients(
config_arc.clone(),
tool_registry.clone(),
schema_validator.clone(),
credential_manager.clone(),
initial_mode,
)
.await?;
Ok(Self {
provider,
@@ -317,6 +424,7 @@ impl SessionController {
tool_registry,
schema_validator,
mcp_client,
named_mcp_clients,
storage,
vault: vault_handle,
master_key,
@@ -324,6 +432,7 @@ impl SessionController {
ui,
enable_code_tools,
current_mode: initial_mode,
missing_oauth_servers,
})
}
@@ -355,6 +464,63 @@ impl SessionController {
self.formatter.set_role_label_mode(mode);
}
/// Return the configured resource references aggregated across scopes.
pub async fn configured_resources(&self) -> Vec<McpResourceConfig> {
let guard = self.config.lock().await;
guard.effective_mcp_resources().to_vec()
}
/// Resolve a resource reference of the form `server:uri` (optionally prefixed with `@`).
pub async fn resolve_resource_reference(&self, reference: &str) -> Result<Option<String>> {
let (server, uri) = match Self::split_resource_reference(reference) {
Some(parts) => parts,
None => return Ok(None),
};
let resource_defined = {
let guard = self.config.lock().await;
guard.find_resource(&server, &uri).is_some()
};
if !resource_defined {
return Ok(None);
}
let client = self
.named_mcp_clients
.get(&server)
.cloned()
.ok_or_else(|| {
Error::Config(format!(
"MCP server '{}' referenced by resource '{}' is not available",
server, uri
))
})?;
let call = McpToolCall {
name: "resources/get".to_string(),
arguments: json!({ "uri": uri, "path": uri }),
};
let response = client.call_tool(call).await?;
if let Some(text) = extract_resource_content(&response.output) {
return Ok(Some(text));
}
let formatted = serde_json::to_string_pretty(&response.output)
.unwrap_or_else(|_| response.output.to_string());
Ok(Some(formatted))
}
fn split_resource_reference(reference: &str) -> Option<(String, String)> {
let trimmed = reference.trim();
let without_prefix = trimmed.strip_prefix('@').unwrap_or(trimmed);
let (server, uri) = without_prefix.split_once(':')?;
if server.is_empty() || uri.is_empty() {
return None;
}
Some((server.to_string(), uri.to_string()))
}
// Asynchronous access to the configuration (used internally).
pub async fn config_async(&self) -> tokio::sync::MutexGuard<'_, Config> {
self.config.lock().await
@@ -378,6 +544,21 @@ impl SessionController {
self.config.clone()
}
pub async fn reload_mcp_clients(&mut self) -> Result<()> {
let (primary, named, missing) = Self::create_mcp_clients(
self.config.clone(),
self.tool_registry.clone(),
self.schema_validator.clone(),
self.credential_manager.clone(),
self.current_mode,
)
.await?;
self.mcp_client = primary;
self.named_mcp_clients = named;
self.missing_oauth_servers = missing;
Ok(())
}
pub fn grant_consent(&self, tool_name: &str, data_types: Vec<String>, endpoints: Vec<String>) {
let mut consent = self
.consent_manager
@@ -525,6 +706,115 @@ impl SessionController {
self.schema_validator.clone()
}
pub fn credential_manager(&self) -> Option<Arc<CredentialManager>> {
self.credential_manager.clone()
}
pub fn pending_oauth_servers(&self) -> Vec<String> {
self.missing_oauth_servers.clone()
}
pub async fn start_oauth_device_flow(&self, server: &str) -> Result<DeviceAuthorization> {
let oauth_config = {
let config = self.config.lock().await;
let server_cfg = config
.effective_mcp_servers()
.iter()
.find(|entry| entry.name == server)
.ok_or_else(|| {
Error::Config(format!("No MCP server named '{server}' is configured"))
})?;
server_cfg.oauth.clone().ok_or_else(|| {
Error::Config(format!(
"MCP server '{server}' does not define an OAuth configuration"
))
})?
};
let client = OAuthClient::new(oauth_config)?;
client.start_device_authorization().await
}
pub async fn poll_oauth_device_flow(
&mut self,
server: &str,
authorization: &DeviceAuthorization,
) -> Result<DevicePollState> {
let oauth_config = {
let config = self.config.lock().await;
let server_cfg = config
.effective_mcp_servers()
.iter()
.find(|entry| entry.name == server)
.ok_or_else(|| {
Error::Config(format!("No MCP server named '{server}' is configured"))
})?;
server_cfg.oauth.clone().ok_or_else(|| {
Error::Config(format!(
"MCP server '{server}' does not define an OAuth configuration"
))
})?
};
let client = OAuthClient::new(oauth_config)?;
match client.poll_device_token(authorization).await? {
DevicePollState::Pending { retry_in } => Ok(DevicePollState::Pending { retry_in }),
DevicePollState::Complete(token) => {
let manager = self.credential_manager.as_ref().cloned().ok_or_else(|| {
Error::Config(
"OAuth token storage requires encrypted local data; set \
privacy.encrypt_local_data = true in the configuration."
.to_string(),
)
})?;
manager.store_oauth_token(server, &token).await?;
self.missing_oauth_servers.retain(|entry| entry != server);
Ok(DevicePollState::Complete(token))
}
}
}
pub async fn list_mcp_tools(&self) -> Vec<(String, crate::mcp::McpToolDescriptor)> {
let mut entries = Vec::new();
for (server, client) in self.named_mcp_clients.iter() {
let server_name = server.clone();
let client = Arc::clone(client);
match client.list_tools().await {
Ok(tools) => {
for descriptor in tools {
entries.push((server_name.clone(), descriptor));
}
}
Err(err) => {
warn!(
"Failed to list tools for MCP server '{}': {}",
server_name, err
);
}
}
}
entries
}
pub async fn call_mcp_tool(
&self,
server: &str,
tool: &str,
arguments: Value,
) -> Result<crate::mcp::McpToolResponse> {
let client = self.named_mcp_clients.get(server).cloned().ok_or_else(|| {
Error::Config(format!("No MCP server named '{}' is registered", server))
})?;
client
.call_tool(McpToolCall {
name: tool.to_string(),
arguments,
})
.await
}
pub fn mcp_server(&self) -> crate::mcp::McpServer {
crate::mcp::McpServer::new(self.tool_registry(), self.schema_validator())
}
@@ -985,3 +1275,195 @@ impl SessionController {
Ok("Empty conversation".to_string())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::Provider;
use crate::config::{Config, McpMode, McpOAuthConfig, McpServerConfig};
use crate::llm::test_utils::MockProvider;
use crate::storage::StorageManager;
use crate::ui::NoOpUiController;
use chrono::Utc;
use httpmock::prelude::*;
use serde_json::json;
use std::collections::HashMap;
use std::sync::Arc;
use tempfile::tempdir;
const SERVER_NAME: &str = "oauth-test";
fn build_oauth_config(server: &MockServer) -> McpOAuthConfig {
McpOAuthConfig {
client_id: "owlen-client".to_string(),
client_secret: None,
authorize_url: server.url("/authorize"),
token_url: server.url("/token"),
device_authorization_url: Some(server.url("/device")),
redirect_url: None,
scopes: vec!["repo".to_string()],
token_env: Some("OAUTH_TOKEN".to_string()),
header: Some("Authorization".to_string()),
header_prefix: Some("Bearer ".to_string()),
}
}
fn build_config(server: &MockServer) -> Config {
let mut config = Config::default();
config.mcp.mode = McpMode::LocalOnly;
let oauth = build_oauth_config(server);
let mut env = HashMap::new();
env.insert("OWLEN_ENV".to_string(), "test".to_string());
config.mcp_servers = vec![McpServerConfig {
name: SERVER_NAME.to_string(),
command: server.url("/mcp"),
args: Vec::new(),
transport: "http".to_string(),
env,
oauth: Some(oauth),
}];
config.refresh_mcp_servers(None).unwrap();
config
}
async fn build_session(server: &MockServer) -> (SessionController, tempfile::TempDir) {
unsafe {
std::env::set_var("OWLEN_MASTER_PASSWORD", "test-password");
}
let temp_dir = tempdir().expect("tempdir");
let storage_path = temp_dir.path().join("owlen.db");
let storage = Arc::new(
StorageManager::with_database_path(storage_path)
.await
.expect("storage"),
);
let config = build_config(server);
let provider: Arc<dyn Provider> = Arc::new(MockProvider::default()) as Arc<dyn Provider>;
let ui = Arc::new(NoOpUiController);
let session = SessionController::new(provider, config, storage, ui, false)
.await
.expect("session");
(session, temp_dir)
}
#[tokio::test]
async fn start_oauth_device_flow_returns_details() {
let server = MockServer::start_async().await;
let device = server
.mock_async(|when, then| {
when.method(POST).path("/device");
then.status(200)
.header("content-type", "application/json")
.json_body(json!({
"device_code": "device-abc",
"user_code": "ABCD-1234",
"verification_uri": "https://example.test/activate",
"verification_uri_complete": "https://example.test/activate?user_code=ABCD-1234",
"expires_in": 600,
"interval": 5,
"message": "Enter the code to continue."
}));
})
.await;
let (session, _dir) = build_session(&server).await;
let authorization = session
.start_oauth_device_flow(SERVER_NAME)
.await
.expect("device flow");
assert_eq!(authorization.user_code, "ABCD-1234");
assert_eq!(
authorization.verification_uri_complete.as_deref(),
Some("https://example.test/activate?user_code=ABCD-1234")
);
assert!(authorization.expires_at > Utc::now());
device.assert_async().await;
}
#[tokio::test]
async fn poll_oauth_device_flow_stores_token_and_updates_state() {
let server = MockServer::start_async().await;
let device = server
.mock_async(|when, then| {
when.method(POST).path("/device");
then.status(200)
.header("content-type", "application/json")
.json_body(json!({
"device_code": "device-xyz",
"user_code": "WXYZ-9999",
"verification_uri": "https://example.test/activate",
"verification_uri_complete": "https://example.test/activate?user_code=WXYZ-9999",
"expires_in": 600,
"interval": 5
}));
})
.await;
let token = server
.mock_async(|when, then| {
when.method(POST)
.path("/token")
.body_contains("device_code=device-xyz");
then.status(200)
.header("content-type", "application/json")
.json_body(json!({
"access_token": "new-access-token",
"refresh_token": "refresh-token",
"expires_in": 3600,
"token_type": "Bearer"
}));
})
.await;
let (mut session, _dir) = build_session(&server).await;
assert_eq!(session.pending_oauth_servers(), vec![SERVER_NAME]);
let authorization = session
.start_oauth_device_flow(SERVER_NAME)
.await
.expect("device flow");
match session
.poll_oauth_device_flow(SERVER_NAME, &authorization)
.await
.expect("token poll")
{
DevicePollState::Complete(token_info) => {
assert_eq!(token_info.access_token, "new-access-token");
assert_eq!(token_info.refresh_token.as_deref(), Some("refresh-token"));
}
other => panic!("expected token completion, got {other:?}"),
}
assert!(
session
.pending_oauth_servers()
.iter()
.all(|entry| entry != SERVER_NAME),
"server should be removed from pending list"
);
let stored = session
.credential_manager()
.expect("credential manager")
.load_oauth_token(SERVER_NAME)
.await
.expect("load token")
.expect("token present");
assert_eq!(stored.access_token, "new-access-token");
assert_eq!(stored.refresh_token.as_deref(), Some("refresh-token"));
device.assert_async().await;
token.assert_async().await;
}
}