owlen/crates/owlen-core/src/session.rs
vikingowl 33d11ae223 fix(agent): improve ReAct parser and tool schemas for better LLM compatibility
- Fix ACTION_INPUT regex to properly capture multiline JSON responses
  - Changed from stopping at the first newline to capturing all remaining text
  - Resolves parsing errors when the LLM generates formatted JSON with line breaks
    (see the sketch after the summary below)

- Enhance tool schemas with detailed descriptions and parameter specifications
  - Add comprehensive Message schema for generate_text tool
  - Clarify distinction between resources/get (file read) and resources/list (directory listing)
  - Include clear usage guidance in tool descriptions

- Set default model to llama3.2:latest instead of invalid "ollama"

- Add parse error debugging to help troubleshoot LLM response issues

The agent infrastructure now correctly handles multiline tool arguments and
provides better guidance to LLMs through improved tool schemas. Remaining
errors are due to LLM quality (model making poor tool choices or generating
malformed responses), not infrastructure bugs.
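
A minimal sketch of the regex change described above, assuming the ReAct parser
uses the `regex` crate; the pattern and function name are illustrative, not the
exact code from the parser:

use regex::Regex;

fn parse_action_input(text: &str) -> Option<String> {
    // Old pattern: `(?m)^ACTION_INPUT:\s*(.+)$` stops at the first newline,
    // truncating pretty-printed JSON arguments.
    // New pattern: `(?s)` lets `.` match newlines, so everything after the
    // marker is captured, including multiline JSON.
    let re = Regex::new(r"(?s)ACTION_INPUT:\s*(.+)").ok()?;
    re.captures(text)
        .and_then(|caps| caps.get(1))
        .map(|m| m.as_str().trim().to_string())
}

With this change, parse_action_input("ACTION_INPUT: {\n  \"path\": \"src/main.rs\"\n}")
returns the full JSON object rather than stopping at the opening brace.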

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-09 19:43:07 +02:00

801 lines
28 KiB
Rust
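
//! Session orchestration for owlen-core: `SessionController` ties together the
//! LLM provider, conversation state, tool registry, MCP client, and optional
//! encrypted credential storage behind a single interface.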

use crate::config::Config;
use crate::consent::ConsentManager;
use crate::conversation::ConversationManager;
use crate::credentials::CredentialManager;
use crate::encryption::{self, VaultHandle};
use crate::formatting::MessageFormatter;
use crate::input::InputBuffer;
use crate::mcp::client::McpClient;
use crate::mcp::factory::McpClientFactory;
use crate::mcp::permission::PermissionLayer;
use crate::mcp::McpToolCall;
use crate::model::ModelManager;
use crate::provider::{ChatStream, Provider};
use crate::storage::{SessionMeta, StorageManager};
use crate::types::{
ChatParameters, ChatRequest, ChatResponse, Conversation, Message, ModelInfo, ToolCall,
};
use crate::ui::UiController;
use crate::validation::{get_builtin_schemas, SchemaValidator};
use crate::{
CodeExecTool, ResourcesDeleteTool, ResourcesGetTool, ResourcesListTool, ResourcesWriteTool,
ToolRegistry, WebSearchDetailedTool, WebSearchTool,
};
use crate::{Error, Result};
use log::warn;
use std::env;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use tokio::sync::Mutex as TokioMutex;
use uuid::Uuid;
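/// Outcome of dispatching a chat request: either the provider returned a
/// complete response, or a stream was opened and its chunks still need to be
/// applied via `apply_stream_chunk`.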
pub enum SessionOutcome {
Complete(ChatResponse),
Streaming {
response_id: Uuid,
stream: ChatStream,
},
}
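/// Central coordinator for a chat session. Owns the active conversation,
/// input buffer, tool registry, MCP client, and (when encryption is enabled)
/// the vault and credential manager.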
pub struct SessionController {
provider: Arc<dyn Provider>,
conversation: ConversationManager,
model_manager: ModelManager,
input_buffer: InputBuffer,
formatter: MessageFormatter,
config: Arc<TokioMutex<Config>>,
consent_manager: Arc<Mutex<ConsentManager>>,
tool_registry: Arc<ToolRegistry>,
schema_validator: Arc<SchemaValidator>,
mcp_client: Arc<dyn McpClient>,
storage: Arc<StorageManager>,
vault: Option<Arc<Mutex<VaultHandle>>>,
master_key: Option<Arc<Vec<u8>>>,
credential_manager: Option<Arc<CredentialManager>>,
ui: Arc<dyn UiController>,
enable_code_tools: bool,
}
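/// Builds the tool registry and schema validator from the current configuration.
/// Optional tools (web search, code execution, file write/delete) are registered
/// only when allowed by `security.allowed_tools` and enabled by their config
/// flags; the read-only resource tools are always available.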
async fn build_tools(
config: Arc<TokioMutex<Config>>,
ui: Arc<dyn UiController>,
enable_code_tools: bool,
consent_manager: Arc<Mutex<ConsentManager>>,
credential_manager: Option<Arc<CredentialManager>>,
vault: Option<Arc<Mutex<VaultHandle>>>,
) -> Result<(Arc<ToolRegistry>, Arc<SchemaValidator>)> {
let mut registry = ToolRegistry::new(config.clone(), ui);
let mut validator = SchemaValidator::new();
// Acquire config asynchronously to avoid blocking the async runtime.
let config_guard = config.lock().await;
for (name, schema) in get_builtin_schemas() {
if let Err(err) = validator.register_schema(&name, schema) {
warn!("Failed to register built-in schema {name}: {err}");
}
}
if config_guard
.security
.allowed_tools
.iter()
.any(|tool| tool == "web_search")
&& config_guard.tools.web_search.enabled
&& config_guard.privacy.enable_remote_search
{
let tool = WebSearchTool::new(
consent_manager.clone(),
credential_manager.clone(),
vault.clone(),
);
registry.register(tool);
}
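// `web_search_detailed` is gated by the same `web_search` allow-list entry and
// privacy/config flags as the basic web search tool.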
if config_guard
.security
.allowed_tools
.iter()
.any(|tool| tool == "web_search")
&& config_guard.tools.web_search.enabled
&& config_guard.privacy.enable_remote_search
{
let tool = WebSearchDetailedTool::new(
consent_manager.clone(),
credential_manager.clone(),
vault.clone(),
);
registry.register(tool);
}
if enable_code_tools
&& config_guard
.security
.allowed_tools
.iter()
.any(|tool| tool == "code_exec")
&& config_guard.tools.code_exec.enabled
{
let tool = CodeExecTool::new(config_guard.tools.code_exec.allowed_languages.clone());
registry.register(tool);
}
registry.register(ResourcesListTool);
registry.register(ResourcesGetTool);
if config_guard
.security
.allowed_tools
.iter()
.any(|t| t == "file_write")
{
registry.register(ResourcesWriteTool);
}
if config_guard
.security
.allowed_tools
.iter()
.any(|t| t == "file_delete")
{
registry.register(ResourcesDeleteTool);
}
for tool in registry.all() {
if let Err(err) = validator.register_schema(tool.name(), tool.schema()) {
warn!("Failed to register schema for {}: {err}", tool.name());
}
}
Ok((Arc::new(registry), Arc::new(validator)))
}
impl SessionController {
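/// Creates a new controller. When `privacy.encrypt_local_data` is set, the
/// encrypted vault is unlocked (via `OWLEN_MASTER_PASSWORD` or an interactive
/// prompt) before the tools and the MCP client are built.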
pub async fn new(
provider: Arc<dyn Provider>,
config: Config,
storage: Arc<StorageManager>,
ui: Arc<dyn UiController>,
enable_code_tools: bool,
) -> Result<Self> {
let config_arc = Arc::new(TokioMutex::new(config));
// Acquire the config asynchronously to avoid blocking the runtime.
let config_guard = config_arc.lock().await;
let model = config_guard
.general
.default_model
.clone()
.unwrap_or_else(|| "ollama/default".to_string());
let mut vault_handle: Option<Arc<Mutex<VaultHandle>>> = None;
let mut master_key: Option<Arc<Vec<u8>>> = None;
let mut credential_manager: Option<Arc<CredentialManager>> = None;
if config_guard.privacy.encrypt_local_data {
let base_dir = storage
.database_path()
.parent()
.map(|p| p.to_path_buf())
.or_else(dirs::data_local_dir)
.unwrap_or_else(|| PathBuf::from("."));
let secure_path = base_dir.join("encrypted_data.json");
let handle = match env::var("OWLEN_MASTER_PASSWORD") {
Ok(password) if !password.is_empty() => {
encryption::unlock_with_password(secure_path, &password)?
}
_ => encryption::unlock_interactive(secure_path)?,
};
let master = Arc::new(handle.data.master_key.clone());
master_key = Some(master.clone());
vault_handle = Some(Arc::new(Mutex::new(handle)));
credential_manager = Some(Arc::new(CredentialManager::new(storage.clone(), master)));
}
let consent_manager = if let Some(ref vault) = vault_handle {
Arc::new(Mutex::new(ConsentManager::from_vault(vault)))
} else {
Arc::new(Mutex::new(ConsentManager::new()))
};
let conversation = ConversationManager::with_history_capacity(
model,
config_guard.storage.max_saved_sessions,
);
let formatter = MessageFormatter::new(
config_guard.ui.wrap_column as usize,
config_guard.ui.show_role_labels,
)
.with_preserve_empty(config_guard.ui.word_wrap);
let input_buffer = InputBuffer::new(
config_guard.input.history_size,
config_guard.input.multiline,
config_guard.input.tab_width,
);
let model_manager = ModelManager::new(config_guard.general.model_cache_ttl());
drop(config_guard); // Release the lock before calling build_tools
let (tool_registry, schema_validator) = build_tools(
config_arc.clone(),
ui.clone(),
enable_code_tools,
consent_manager.clone(),
credential_manager.clone(),
vault_handle.clone(),
)
.await?;
// Create MCP client with permission layer
let mcp_client: Arc<dyn McpClient> = {
let guard = config_arc.lock().await;
let factory = McpClientFactory::new(
Arc::new(guard.clone()),
tool_registry.clone(),
schema_validator.clone(),
);
let base_client = factory.create()?;
let permission_client = PermissionLayer::new(base_client, Arc::new(guard.clone()));
Arc::new(permission_client)
};
Ok(Self {
provider,
conversation,
model_manager,
input_buffer,
formatter,
config: config_arc,
consent_manager,
tool_registry,
schema_validator,
mcp_client,
storage,
vault: vault_handle,
master_key,
credential_manager,
ui,
enable_code_tools,
})
}
pub fn conversation(&self) -> &Conversation {
self.conversation.active()
}
pub fn conversation_mut(&mut self) -> &mut ConversationManager {
&mut self.conversation
}
pub fn input_buffer(&self) -> &InputBuffer {
&self.input_buffer
}
pub fn input_buffer_mut(&mut self) -> &mut InputBuffer {
&mut self.input_buffer
}
pub fn formatter(&self) -> &MessageFormatter {
&self.formatter
}
pub async fn set_formatter_wrap_width(&mut self, width: usize) {
self.formatter.set_wrap_width(width);
}
// Asynchronous access to the configuration (used internally).
pub async fn config_async(&self) -> tokio::sync::MutexGuard<'_, Config> {
self.config.lock().await
}
// Synchronous, blocking access to the configuration, kept for the TUI which
// expects `controller.config()` to return a guard without awaiting. The lock is
// taken inside `tokio::task::block_in_place` so the current thread can block
// without violating Tokio's runtime constraints.
pub fn config(&self) -> tokio::sync::MutexGuard<'_, Config> {
tokio::task::block_in_place(|| self.config.blocking_lock())
}
// Synchronous mutable access, mirroring `config()` but allowing mutation.
pub fn config_mut(&self) -> tokio::sync::MutexGuard<'_, Config> {
tokio::task::block_in_place(|| self.config.blocking_lock())
}
pub fn config_cloned(&self) -> Arc<TokioMutex<Config>> {
self.config.clone()
}
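/// Grants consent for a tool and, when a vault is available, persists the
/// decision so it survives restarts.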
pub fn grant_consent(&self, tool_name: &str, data_types: Vec<String>, endpoints: Vec<String>) {
let mut consent = self
.consent_manager
.lock()
.expect("Consent manager mutex poisoned");
consent.grant_consent(tool_name, data_types, endpoints);
if let Some(vault) = &self.vault {
if let Err(e) = consent.persist_to_vault(vault) {
eprintln!("Warning: Failed to persist consent to vault: {}", e);
}
}
}
pub fn grant_consent_with_scope(
&self,
tool_name: &str,
data_types: Vec<String>,
endpoints: Vec<String>,
scope: crate::consent::ConsentScope,
) {
let mut consent = self
.consent_manager
.lock()
.expect("Consent manager mutex poisoned");
let is_permanent = matches!(scope, crate::consent::ConsentScope::Permanent);
consent.grant_consent_with_scope(tool_name, data_types, endpoints, scope);
// Only persist to vault for permanent consent
if is_permanent {
if let Some(vault) = &self.vault {
if let Err(e) = consent.persist_to_vault(vault) {
eprintln!("Warning: Failed to persist consent to vault: {}", e);
}
}
}
}
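/// Returns the tool calls that still require user consent, deduplicated by
/// tool name, together with the data types and endpoints each tool touches.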
pub fn check_tools_consent_needed(
&self,
tool_calls: &[ToolCall],
) -> Vec<(String, Vec<String>, Vec<String>)> {
let consent = self
.consent_manager
.lock()
.expect("Consent manager mutex poisoned");
let mut needs_consent = Vec::new();
let mut seen_tools = std::collections::HashSet::new();
for tool_call in tool_calls {
if seen_tools.contains(&tool_call.name) {
continue;
}
seen_tools.insert(tool_call.name.clone());
let (data_types, endpoints) = match tool_call.name.as_str() {
"web_search" | "web_search_detailed" => (
vec!["search query".to_string()],
vec!["duckduckgo.com".to_string()],
),
"code_exec" => (
vec!["code to execute".to_string()],
vec!["local sandbox".to_string()],
),
"resources/write" | "file_write" => (
vec!["file paths".to_string(), "file content".to_string()],
vec!["local filesystem".to_string()],
),
"resources/delete" | "file_delete" => (
vec!["file paths".to_string()],
vec!["local filesystem".to_string()],
),
_ => (vec![], vec![]),
};
if let Some((tool_name, dt, ep)) =
consent.check_if_consent_needed(&tool_call.name, data_types, endpoints)
{
needs_consent.push((tool_name, dt, ep));
}
}
needs_consent
}
pub async fn save_active_session(
&self,
name: Option<String>,
description: Option<String>,
) -> Result<Uuid> {
self.conversation
.save_active_with_description(&self.storage, name, description)
.await
}
pub async fn save_active_session_simple(&self, name: Option<String>) -> Result<Uuid> {
self.conversation.save_active(&self.storage, name).await
}
pub async fn load_saved_session(&mut self, id: Uuid) -> Result<()> {
self.conversation.load_saved(&self.storage, id).await
}
pub async fn list_saved_sessions(&self) -> Result<Vec<SessionMeta>> {
ConversationManager::list_saved_sessions(&self.storage).await
}
pub async fn delete_session(&self, id: Uuid) -> Result<()> {
self.storage.delete_session(id).await
}
pub async fn clear_secure_data(&self) -> Result<()> {
// ... (implementation remains the same)
Ok(())
}
pub fn persist_consent(&self) -> Result<()> {
// ... (implementation remains the same)
Ok(())
}
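/// Enables or disables a tool in the configuration, then rebuilds the tool
/// registry and MCP client so the change takes effect immediately.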
pub async fn set_tool_enabled(&mut self, tool: &str, enabled: bool) -> Result<()> {
{
let mut config = self.config.lock().await;
match tool {
"web_search" => {
config.tools.web_search.enabled = enabled;
config.privacy.enable_remote_search = enabled;
}
"code_exec" => config.tools.code_exec.enabled = enabled,
other => return Err(Error::InvalidInput(format!("Unknown tool: {other}"))),
}
}
self.rebuild_tools().await
}
pub fn consent_manager(&self) -> Arc<Mutex<ConsentManager>> {
self.consent_manager.clone()
}
pub fn tool_registry(&self) -> Arc<ToolRegistry> {
self.tool_registry.clone()
}
pub fn schema_validator(&self) -> Arc<SchemaValidator> {
self.schema_validator.clone()
}
pub fn mcp_server(&self) -> crate::mcp::McpServer {
crate::mcp::McpServer::new(self.tool_registry(), self.schema_validator())
}
pub fn storage(&self) -> Arc<StorageManager> {
self.storage.clone()
}
pub fn master_key(&self) -> Option<Arc<Vec<u8>>> {
self.master_key.as_ref().map(Arc::clone)
}
pub fn vault(&self) -> Option<Arc<Mutex<VaultHandle>>> {
self.vault.as_ref().map(Arc::clone)
}
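/// Reads a file through the MCP `resources/get` tool, falling back to a direct
/// local read if the MCP call fails. The sibling helpers below follow the same
/// MCP-first, local-fallback pattern.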
pub async fn read_file(&self, path: &str) -> Result<String> {
let call = McpToolCall {
name: "resources/get".to_string(),
arguments: serde_json::json!({ "path": path }),
};
match self.mcp_client.call_tool(call).await {
Ok(response) => {
let content: String = serde_json::from_value(response.output)?;
Ok(content)
}
Err(err) => {
log::warn!("MCP file read failed ({}); falling back to local read", err);
let content = std::fs::read_to_string(path)?;
Ok(content)
}
}
}
pub async fn list_dir(&self, path: &str) -> Result<Vec<String>> {
let call = McpToolCall {
name: "resources/list".to_string(),
arguments: serde_json::json!({ "path": path }),
};
match self.mcp_client.call_tool(call).await {
Ok(response) => {
let content: Vec<String> = serde_json::from_value(response.output)?;
Ok(content)
}
Err(err) => {
log::warn!(
"MCP directory list failed ({}); falling back to local list",
err
);
let mut entries = Vec::new();
for entry in std::fs::read_dir(path)? {
let entry = entry?;
entries.push(entry.file_name().to_string_lossy().to_string());
}
Ok(entries)
}
}
}
pub async fn write_file(&self, path: &str, content: &str) -> Result<()> {
let call = McpToolCall {
name: "resources/write".to_string(),
arguments: serde_json::json!({ "path": path, "content": content }),
};
match self.mcp_client.call_tool(call).await {
Ok(_) => Ok(()),
Err(err) => {
log::warn!(
"MCP file write failed ({}); falling back to local write",
err
);
// Ensure parent directory exists
if let Some(parent) = std::path::Path::new(path).parent() {
std::fs::create_dir_all(parent)?;
}
std::fs::write(path, content)?;
Ok(())
}
}
}
pub async fn delete_file(&self, path: &str) -> Result<()> {
let call = McpToolCall {
name: "resources/delete".to_string(),
arguments: serde_json::json!({ "path": path }),
};
match self.mcp_client.call_tool(call).await {
Ok(_) => Ok(()),
Err(err) => {
log::warn!(
"MCP file delete failed ({}); falling back to local delete",
err
);
std::fs::remove_file(path)?;
Ok(())
}
}
}
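// Rebuilds the tool registry, schema validator, and permission-wrapped MCP
// client from the current configuration.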
async fn rebuild_tools(&mut self) -> Result<()> {
let (registry, validator) = build_tools(
self.config.clone(),
self.ui.clone(),
self.enable_code_tools,
self.consent_manager.clone(),
self.credential_manager.clone(),
self.vault.clone(),
)
.await?;
self.tool_registry = registry;
self.schema_validator = validator;
// Recreate MCP client with permission layer
let config = self.config.lock().await;
let factory = McpClientFactory::new(
Arc::new(config.clone()),
self.tool_registry.clone(),
self.schema_validator.clone(),
);
let base_client = factory.create()?;
let permission_client = PermissionLayer::new(base_client, Arc::new(config.clone()));
self.mcp_client = Arc::new(permission_client);
Ok(())
}
pub fn selected_model(&self) -> &str {
&self.conversation.active().model
}
pub async fn set_model(&mut self, model: String) {
self.conversation.set_model(model.clone());
let mut config = self.config.lock().await;
config.general.default_model = Some(model);
}
pub async fn models(&self, force_refresh: bool) -> Result<Vec<ModelInfo>> {
self.model_manager
.get_or_refresh(force_refresh, || async {
self.provider.list_models().await
})
.await
}
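/// Ensures a usable model is selected: keeps the configured default when it
/// matches one of `models`; when no default is configured, falls back to the
/// first available model.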
pub async fn ensure_default_model(&mut self, models: &[ModelInfo]) {
let mut config = self.config.lock().await;
if let Some(default) = config.general.default_model.clone() {
if models.iter().any(|m| m.id == default || m.name == default) {
self.conversation.set_model(default.clone());
config.general.default_model = Some(default);
}
} else if let Some(model) = models.first() {
self.conversation.set_model(model.id.clone());
config.general.default_model = Some(model.id.clone());
}
}
pub async fn switch_provider(&mut self, provider: Arc<dyn Provider>) -> Result<()> {
self.provider = provider;
self.model_manager.invalidate().await;
Ok(())
}
/// Expose the underlying LLM provider.
pub fn provider(&self) -> Arc<dyn Provider> {
self.provider.clone()
}
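/// Appends a user message and dispatches the request. Streaming is used when
/// enabled in the config or explicitly requested via `parameters.stream`.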
pub async fn send_message(
&mut self,
content: String,
mut parameters: ChatParameters,
) -> Result<SessionOutcome> {
let streaming = { self.config.lock().await.general.enable_streaming || parameters.stream };
parameters.stream = streaming;
self.conversation.push_user_message(content);
self.send_request_with_current_conversation(parameters)
.await
}
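/// Sends the current conversation to the provider. In non-streaming mode, tool
/// calls returned by the model are executed inline for up to
/// `MAX_TOOL_ITERATIONS` rounds; in streaming mode a stream handle is returned.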
pub async fn send_request_with_current_conversation(
&mut self,
mut parameters: ChatParameters,
) -> Result<SessionOutcome> {
let streaming = { self.config.lock().await.general.enable_streaming || parameters.stream };
parameters.stream = streaming;
let tools = if !self.tool_registry.all().is_empty() {
Some(
self.tool_registry
.all()
.into_iter()
.map(|tool| crate::mcp::McpToolDescriptor {
name: tool.name().to_string(),
description: tool.description().to_string(),
input_schema: tool.schema(),
requires_network: tool.requires_network(),
requires_filesystem: tool.requires_filesystem(),
})
.collect(),
)
} else {
None
};
let mut request = ChatRequest {
model: self.conversation.active().model.clone(),
messages: self.conversation.active().messages.clone(),
parameters: parameters.clone(),
tools: tools.clone(),
};
if !streaming {
const MAX_TOOL_ITERATIONS: usize = 5;
for _iteration in 0..MAX_TOOL_ITERATIONS {
match self.provider.chat(request.clone()).await {
Ok(response) => {
if response.message.has_tool_calls() {
self.conversation.push_message(response.message.clone());
if let Some(tool_calls) = &response.message.tool_calls {
for tool_call in tool_calls {
let mcp_tool_call = McpToolCall {
name: tool_call.name.clone(),
arguments: tool_call.arguments.clone(),
};
let tool_result =
self.mcp_client.call_tool(mcp_tool_call).await;
let tool_response_content = match tool_result {
Ok(result) => serde_json::to_string_pretty(&result.output)
.unwrap_or_else(|_| {
"Tool execution succeeded".to_string()
}),
Err(e) => format!("Tool execution failed: {}", e),
};
let tool_msg =
Message::tool(tool_call.id.clone(), tool_response_content);
self.conversation.push_message(tool_msg);
}
}
request.messages = self.conversation.active().messages.clone();
continue;
} else {
self.conversation.push_message(response.message.clone());
return Ok(SessionOutcome::Complete(response));
}
}
Err(err) => {
self.conversation
.push_assistant_message(format!("Error: {}", err));
return Err(err);
}
}
}
self.conversation
.push_assistant_message("Maximum tool execution iterations reached".to_string());
return Err(crate::Error::Provider(anyhow::anyhow!(
"Maximum tool execution iterations reached"
)));
}
match self.provider.chat_stream(request).await {
Ok(stream) => {
let response_id = self.conversation.start_streaming_response();
Ok(SessionOutcome::Streaming {
response_id,
stream,
})
}
Err(err) => {
self.conversation
.push_assistant_message(format!("Error starting stream: {}", err));
Err(err)
}
}
}
pub fn mark_stream_placeholder(&mut self, message_id: Uuid, text: &str) -> Result<()> {
self.conversation
.set_stream_placeholder(message_id, text.to_string())
}
pub fn apply_stream_chunk(&mut self, message_id: Uuid, chunk: &ChatResponse) -> Result<()> {
if chunk.message.has_tool_calls() {
self.conversation.set_tool_calls_on_message(
message_id,
chunk.message.tool_calls.clone().unwrap_or_default(),
)?;
}
self.conversation
.append_stream_chunk(message_id, &chunk.message.content, chunk.is_final)
}
pub fn check_streaming_tool_calls(&self, message_id: Uuid) -> Option<Vec<ToolCall>> {
self.conversation
.active()
.messages
.iter()
.find(|m| m.id == message_id)
.and_then(|m| m.tool_calls.clone())
.filter(|calls| !calls.is_empty())
}
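/// Executes tool calls collected during a streaming response, appends their
/// results as tool messages, and re-sends the conversation for a follow-up
/// answer.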
pub async fn execute_streaming_tools(
&mut self,
_message_id: Uuid,
tool_calls: Vec<ToolCall>,
) -> Result<SessionOutcome> {
for tool_call in &tool_calls {
let mcp_tool_call = McpToolCall {
name: tool_call.name.clone(),
arguments: tool_call.arguments.clone(),
};
let tool_result = self.mcp_client.call_tool(mcp_tool_call).await;
let tool_response_content = match tool_result {
Ok(result) => serde_json::to_string_pretty(&result.output)
.unwrap_or_else(|_| "Tool execution succeeded".to_string()),
Err(e) => format!("Tool execution failed: {}", e),
};
let tool_msg = Message::tool(tool_call.id.clone(), tool_response_content);
self.conversation.push_message(tool_msg);
}
let parameters = ChatParameters {
stream: self.config.lock().await.general.enable_streaming,
..Default::default()
};
self.send_request_with_current_conversation(parameters)
.await
}
pub fn history(&self) -> Vec<Conversation> {
self.conversation.history().cloned().collect()
}
pub fn start_new_conversation(&mut self, model: Option<String>, name: Option<String>) {
self.conversation.start_new(model, name);
}
pub fn clear(&mut self) {
self.conversation.clear();
}
pub async fn generate_conversation_description(&self) -> Result<String> {
// ... (implementation remains the same)
Ok("Empty conversation".to_string())
}
}