//! Core agent orchestration for the Owlen AI agent. //! //! This crate provides the central engine that coordinates Large Language Models (LLMs), //! a rich set of system tools, and user interaction through an event-driven loop. pub mod session; pub mod system_prompt; pub mod git; pub mod compact; pub mod messages; pub mod state; use color_eyre::eyre::{Result, eyre}; use futures_util::StreamExt; use llm_core::{ChatMessage, ChatOptions, LlmProvider, Tool, ToolParameters}; use permissions::{PermissionDecision, PermissionManager, Tool as PermTool}; use jsonrpc; use plugins::{ExternalToolDefinition, ExternalToolTransport}; use serde_json::{json, Value}; use std::collections::HashMap; use std::path::PathBuf; use std::sync::Arc; use tokio::sync::{mpsc, RwLock}; use tools_ask::AskSender; use tools_bash::ShellManager; use tools_todo::TodoList; pub use session::{ SessionStats, SessionHistory, ToolCallRecord, Checkpoint, CheckpointManager, FileDiff, ToolCallsBuilder, }; pub use system_prompt::{ SystemPromptBuilder, default_base_prompt, generate_tool_instructions, }; pub use git::{ GitState, GitFileStatus, detect_git_state, is_safe_git_command, is_destructive_git_command, format_git_status, }; // Re-export planning mode types pub use tools_plan::{ // Agent mode (document-based planning) AgentMode, PlanManager, PlanStatus, PlanMetadata, // Plan execution types (tool call accumulation) PlanStep, AccumulatedPlan, AccumulatedPlanStatus, PlanApproval, // Helper functions enter_plan_mode, exit_plan_mode, is_tool_allowed_in_plan_mode, }; // Re-export compaction types pub use compact::{Compactor, TokenCounter}; /// Events emitted during agent loop execution #[derive(Debug, Clone)] pub enum AgentEvent { /// LLM is generating text TextDelta(String), /// Tool execution starting ToolStart { tool_name: String, tool_id: String, }, /// Tool produced output (may be partial) ToolOutput { tool_id: String, content: String, is_error: bool, }, /// Tool execution completed ToolEnd { tool_id: String, success: bool, }, /// Agent loop completed Done { final_response: String, }, /// Error occurred Error(String), } pub type AgentEventSender = mpsc::Sender; pub type AgentEventReceiver = mpsc::Receiver; /// Creates a channel for agent events with a buffer size of 100. pub fn create_event_channel() -> (AgentEventSender, AgentEventReceiver) { mpsc::channel(100) } /// Optional context for tools that need external dependencies or shared state. #[derive(Clone)] pub struct ToolContext { /// Todo list for the `todo_write` tool. pub todo_list: Option, /// Channel for asking user questions via the `ask_user` tool. pub ask_sender: Option, /// Shell manager for handling background shells. pub shell_manager: Option, /// Plan manager for managing implementation plans in planning mode. pub plan_manager: Option>, /// Current agent mode (e.g., Normal or Planning). pub agent_mode: Arc>, /// Registry of external tools from plugins pub external_tools: Arc>, } impl Default for ToolContext { fn default() -> Self { Self { todo_list: None, ask_sender: None, shell_manager: None, plan_manager: None, agent_mode: Arc::new(RwLock::new(AgentMode::Normal)), external_tools: Arc::new(HashMap::new()), } } } impl ToolContext { /// Creates a new, empty `ToolContext`. pub fn new() -> Self { Self::default() } /// Adds a `TodoList` to the context. pub fn with_todo_list(mut self, list: TodoList) -> Self { self.todo_list = Some(list); self } /// Adds an `AskSender` to the context for user interaction. pub fn with_ask_sender(mut self, sender: AskSender) -> Self { self.ask_sender = Some(sender); self } /// Adds a `ShellManager` to the context for managing bash sessions. pub fn with_shell_manager(mut self, manager: ShellManager) -> Self { self.shell_manager = Some(manager); self } /// Adds a `PlanManager` to the context. pub fn with_plan_manager(mut self, manager: PlanManager) -> Self { self.plan_manager = Some(Arc::new(manager)); self } /// Initializes a `PlanManager` with the given project root and adds it to the context. pub fn with_project_root(mut self, project_root: PathBuf) -> Self { self.plan_manager = Some(Arc::new(PlanManager::new(project_root))); self } /// Checks if the agent is currently in planning mode. pub async fn is_planning(&self) -> bool { self.agent_mode.read().await.is_planning() } /// Returns the current `AgentMode`. pub async fn get_mode(&self) -> AgentMode { self.agent_mode.read().await.clone() } /// Sets the current `AgentMode`. pub async fn set_mode(&self, mode: AgentMode) { *self.agent_mode.write().await = mode; } /// Adds external tools from plugins to the context. pub fn with_external_tools(mut self, tools: HashMap) -> Self { self.external_tools = Arc::new(tools); self } /// Gets an external tool by name, if it exists. pub fn get_external_tool(&self, name: &str) -> Option<&ExternalToolDefinition> { self.external_tools.get(name) } /// Returns true if an external tool with the given name exists. pub fn has_external_tool(&self, name: &str) -> bool { self.external_tools.contains_key(name) } } /// Returns definitions for all available tools that the agent can use. pub fn get_tool_definitions() -> Vec { vec![ Tool::function( "read", "Read the contents of a file", ToolParameters::object( json!({ "path": { "type": "string", "description": "The path to the file to read" } }), vec!["path".to_string()], ), ), Tool::function( "glob", "Find files matching a glob pattern (e.g., '**/*.rs' for all Rust files)", ToolParameters::object( json!({ "pattern": { "type": "string", "description": "Glob pattern to match files (e.g., '**/*.toml', '*.md')" } }), vec!["pattern".to_string()], ), ), Tool::function( "grep", "Search for a pattern in files within a directory", ToolParameters::object( json!({ "root": { "type": "string", "description": "Root directory to search in" }, "pattern": { "type": "string", "description": "Pattern to search for" } }), vec!["root".to_string(), "pattern".to_string()], ), ), Tool::function( "write", "Write content to a file", ToolParameters::object( json!({ "path": { "type": "string", "description": "Path where the file should be written" }, "content": { "type": "string", "description": "Content to write to the file" } }), vec!["path".to_string(), "content".to_string()], ), ), Tool::function( "edit", "Edit a file by replacing old text with new text", ToolParameters::object( json!({ "path": { "type": "string", "description": "Path to the file to edit" }, "old_string": { "type": "string", "description": "Text to find and replace" }, "new_string": { "type": "string", "description": "Text to replace with" } }), vec!["path".to_string(), "old_string".to_string(), "new_string".to_string()], ), ), Tool::function( "bash", "Execute a bash command. Use carefully and only when necessary.", ToolParameters::object( json!({ "command": { "type": "string", "description": "The bash command to execute" } }), vec!["command".to_string()], ), ), Tool::function( "multi_edit", "Apply multiple edits to a file atomically", ToolParameters::object( json!({ "path": { "type": "string", "description": "Path to the file to edit" }, "edits": { "type": "array", "items": { "type": "object", "properties": { "old_string": { "type": "string" }, "new_string": { "type": "string" } }, "required": ["old_string", "new_string"] }, "description": "List of edit operations" } }), vec!["path".to_string(), "edits".to_string()], ), ), Tool::function( "ls", "List contents of a directory", ToolParameters::object( json!({ "path": { "type": "string", "description": "Directory path to list" }, "show_hidden": { "type": "boolean", "description": "Show hidden files", "default": false } }), vec!["path".to_string()], ), ), Tool::function( "web_search", "Search the web for information", ToolParameters::object( json!({ "query": { "type": "string", "description": "Search query" }, "max_results": { "type": "integer", "description": "Maximum results", "default": 10 } }), vec!["query".to_string()], ), ), Tool::function( "todo_write", "Update the task list", ToolParameters::object( json!({ "todos": { "type": "array", "items": { "type": "object", "properties": { "content": { "type": "string" }, "status": { "type": "string", "enum": ["pending", "in_progress", "completed"] }, "active_form": { "type": "string" } }, "required": ["content", "status", "active_form"] } } }), vec!["todos".to_string()], ), ), Tool::function( "ask_user", "Ask the user a question with options", ToolParameters::object( json!({ "questions": { "type": "array", "items": { "type": "object", "properties": { "question": { "type": "string" }, "header": { "type": "string" }, "options": { "type": "array", "items": { "type": "object", "properties": { "label": { "type": "string" }, "description": { "type": "string" } } } }, "multi_select": { "type": "boolean" } } } } }), vec!["questions".to_string()], ), ), Tool::function( "bash_output", "Get output from a background shell", ToolParameters::object( json!({ "shell_id": { "type": "string", "description": "ID of the background shell" } }), vec!["shell_id".to_string()], ), ), Tool::function( "kill_shell", "Terminate a background shell", ToolParameters::object( json!({ "shell_id": { "type": "string", "description": "ID of the shell to kill" } }), vec!["shell_id".to_string()], ), ), Tool::function( "enter_plan_mode", "Enter planning mode for complex tasks that require careful planning before implementation. In planning mode, only read-only tools are available.", ToolParameters::object( json!({}), vec![], ), ), Tool::function( "exit_plan_mode", "Exit planning mode after presenting the implementation plan. The plan will be shown to the user for approval.", ToolParameters::object( json!({}), vec![], ), ), ] } /// Converts an external tool definition to an LLM-compatible Tool definition. fn external_tool_to_llm_tool(ext: &ExternalToolDefinition) -> Tool { // Convert ExternalToolSchema to the JSON format expected by ToolParameters let properties: serde_json::Map = ext.input_schema.properties .iter() .map(|(k, v)| (k.clone(), v.clone())) .collect(); Tool::function( &ext.name, &ext.description, ToolParameters::object( Value::Object(properties), ext.input_schema.required.clone(), ), ) } /// Returns all tool definitions including external tools from plugins. /// /// This function merges the built-in tools with any external tools /// registered in the ToolContext. pub fn get_tool_definitions_with_external(ctx: &ToolContext) -> Vec { let mut tools = get_tool_definitions(); // Add external tools from plugins for ext_tool in ctx.external_tools.values() { tools.push(external_tool_to_llm_tool(ext_tool)); } tools } /// Executes a single tool call and returns its result as a string. /// /// This function handles permission checking and interacts with various tool-specific /// backends (filesystem, shell, etc.) using the provided `ToolContext`. pub async fn execute_tool( tool_name: &str, arguments: &Value, perms: &PermissionManager, ctx: &ToolContext, ) -> Result { match tool_name { "read" => { let path = arguments["path"] .as_str() .ok_or_else(|| eyre!("Missing 'path' argument"))?; // Check permission match perms.check(PermTool::Read, Some(path)) { PermissionDecision::Allow => { let content = tools_fs::read_file(path)?; Ok(content) } PermissionDecision::Ask => { Err(eyre!("Permission required: Read operation needs approval")) } PermissionDecision::Deny => { Err(eyre!("Permission denied: Read operation is blocked")) } } } "glob" => { let pattern = arguments["pattern"] .as_str() .ok_or_else(|| eyre!("Missing 'pattern' argument"))?; // Check permission match perms.check(PermTool::Glob, None) { PermissionDecision::Allow => { let files = tools_fs::glob_list(pattern)?; Ok(files.join("\n")) } PermissionDecision::Ask => { Err(eyre!("Permission required: Glob operation needs approval")) } PermissionDecision::Deny => { Err(eyre!("Permission denied: Glob operation is blocked")) } } } "grep" => { let root = arguments["root"] .as_str() .ok_or_else(|| eyre!("Missing 'root' argument"))?; let pattern = arguments["pattern"] .as_str() .ok_or_else(|| eyre!("Missing 'pattern' argument"))?; // Check permission match perms.check(PermTool::Grep, None) { PermissionDecision::Allow => { let results = tools_fs::grep(root, pattern)?; let lines: Vec = results .into_iter() .map(|(path, line_num, text)| format!("{}:{}:{}", path, line_num, text)) .collect(); Ok(lines.join("\n")) } PermissionDecision::Ask => { Err(eyre!("Permission required: Grep operation needs approval")) } PermissionDecision::Deny => { Err(eyre!("Permission denied: Grep operation is blocked")) } } } "write" => { let path = arguments["path"] .as_str() .ok_or_else(|| eyre!("Missing 'path' argument"))?; let content = arguments["content"] .as_str() .ok_or_else(|| eyre!("Missing 'content' argument"))?; // Check permission match perms.check(PermTool::Write, Some(path)) { PermissionDecision::Allow => { tools_fs::write_file(path, content)?; Ok(format!("File written successfully: {}", path)) } PermissionDecision::Ask => { Err(eyre!("Permission required: Write operation needs approval")) } PermissionDecision::Deny => { Err(eyre!("Permission denied: Write operation is blocked")) } } } "edit" => { let path = arguments["path"] .as_str() .ok_or_else(|| eyre!("Missing 'path' argument"))?; let old_string = arguments["old_string"] .as_str() .ok_or_else(|| eyre!("Missing 'old_string' argument"))?; let new_string = arguments["new_string"] .as_str() .ok_or_else(|| eyre!("Missing 'new_string' argument"))?; // Check permission match perms.check(PermTool::Edit, Some(path)) { PermissionDecision::Allow => { tools_fs::edit_file(path, old_string, new_string)?; Ok(format!("File edited successfully: {}", path)) } PermissionDecision::Ask => { Err(eyre!("Permission required: Edit operation needs approval")) } PermissionDecision::Deny => { Err(eyre!("Permission denied: Edit operation is blocked")) } } } "bash" => { let command = arguments["command"] .as_str() .ok_or_else(|| eyre!("Missing 'command' argument"))?; // Check permission match perms.check(PermTool::Bash, Some(command)) { PermissionDecision::Allow => { let mut session = tools_bash::BashSession::new().await?; let output = session.execute(command, None).await?; let result = if !output.stdout.is_empty() { output.stdout } else if !output.stderr.is_empty() { format!("stderr: {}", output.stderr) } else { "Command executed successfully with no output".to_string() }; Ok(result) } PermissionDecision::Ask => { Err(eyre!("Permission required: Bash operation needs approval")) } PermissionDecision::Deny => { Err(eyre!("Permission denied: Bash operation is blocked")) } } } "multi_edit" => { let path = arguments["path"] .as_str() .ok_or_else(|| eyre!("Missing 'path' argument"))?; let edits_value = arguments["edits"] .as_array() .ok_or_else(|| eyre!("Missing or invalid 'edits' argument"))?; // Parse edits let edits: Vec = serde_json::from_value(json!(edits_value))?; // Check permission match perms.check(PermTool::MultiEdit, Some(path)) { PermissionDecision::Allow => { let result = tools_fs::multi_edit_file(path, edits)?; Ok(result) } PermissionDecision::Ask => { Err(eyre!("Permission required: MultiEdit operation needs approval")) } PermissionDecision::Deny => { Err(eyre!("Permission denied: MultiEdit operation is blocked")) } } } "ls" => { let path = arguments["path"] .as_str() .ok_or_else(|| eyre!("Missing 'path' argument"))?; let show_hidden = arguments.get("show_hidden") .and_then(|v| v.as_bool()) .unwrap_or(false); // Check permission match perms.check(PermTool::LS, Some(path)) { PermissionDecision::Allow => { let entries = tools_fs::list_directory(path, show_hidden)?; let output = entries .into_iter() .map(|e| { let type_marker = if e.is_dir { "/" } else { "" }; let size = e.size.map(|s| format!(" ({}B)", s)).unwrap_or_default(); format!("{}{}{}", e.name, type_marker, size) }) .collect::>() .join("\n"); Ok(output) } PermissionDecision::Ask => { Err(eyre!("Permission required: LS operation needs approval")) } PermissionDecision::Deny => { Err(eyre!("Permission denied: LS operation is blocked")) } } } "web_search" => { let query = arguments["query"] .as_str() .ok_or_else(|| eyre!("Missing 'query' argument"))?; let max_results = arguments.get("max_results") .and_then(|v| v.as_u64()) .unwrap_or(10) as usize; // Check permission match perms.check(PermTool::WebSearch, None) { PermissionDecision::Allow => { // Use DuckDuckGo search provider let provider = Box::new(tools_web::DuckDuckGoSearchProvider::with_max_results(max_results)); let client = tools_web::WebSearchClient::new(provider); let results = client.search(query).await?; let formatted = tools_web::format_search_results(&results); Ok(formatted) } PermissionDecision::Ask => { Err(eyre!("Permission required: WebSearch operation needs approval")) } PermissionDecision::Deny => { Err(eyre!("Permission denied: WebSearch operation is blocked")) } } } "todo_write" => { let todo_list = ctx.todo_list.as_ref() .ok_or_else(|| eyre!("TodoList not available in this context"))?; // Check permission match perms.check(PermTool::TodoWrite, None) { PermissionDecision::Allow => { let todos = tools_todo::parse_todos(arguments)?; todo_list.write(todos); Ok(todo_list.format_display()) } PermissionDecision::Ask => { Err(eyre!("Permission required: TodoWrite operation needs approval")) } PermissionDecision::Deny => { Err(eyre!("Permission denied: TodoWrite operation is blocked")) } } } "ask_user" => { let sender = ctx.ask_sender.as_ref() .ok_or_else(|| eyre!("AskUser not available in this context"))?; // Check permission match perms.check(PermTool::AskUserQuestion, None) { PermissionDecision::Allow => { let questions = tools_ask::parse_questions(arguments)?; let answers = tools_ask::ask_user(sender, questions).await?; Ok(serde_json::to_string_pretty(&answers)?) } PermissionDecision::Ask => { Err(eyre!("Permission required: AskUser operation needs approval")) } PermissionDecision::Deny => { Err(eyre!("Permission denied: AskUser operation is blocked")) } } } "bash_output" => { let manager = ctx.shell_manager.as_ref() .ok_or_else(|| eyre!("ShellManager not available in this context"))?; let shell_id = arguments["shell_id"] .as_str() .ok_or_else(|| eyre!("Missing 'shell_id' argument"))?; // Check permission match perms.check(PermTool::BashOutput, Some(shell_id)) { PermissionDecision::Allow => { tools_bash::bash_output(manager, shell_id) } PermissionDecision::Ask => { Err(eyre!("Permission required: BashOutput operation needs approval")) } PermissionDecision::Deny => { Err(eyre!("Permission denied: BashOutput operation is blocked")) } } } "kill_shell" => { let manager = ctx.shell_manager.as_ref() .ok_or_else(|| eyre!("ShellManager not available in this context"))?; let shell_id = arguments["shell_id"] .as_str() .ok_or_else(|| eyre!("Missing 'shell_id' argument"))?; // Check permission match perms.check(PermTool::KillShell, Some(shell_id)) { PermissionDecision::Allow => { tools_bash::kill_shell(manager, shell_id) } PermissionDecision::Ask => { Err(eyre!("Permission required: KillShell operation needs approval")) } PermissionDecision::Deny => { Err(eyre!("Permission denied: KillShell operation is blocked")) } } } "enter_plan_mode" => { let plan_manager = ctx.plan_manager.as_ref() .ok_or_else(|| eyre!("PlanManager not available - cannot enter planning mode"))?; // Check permission match perms.check(PermTool::EnterPlanMode, None) { PermissionDecision::Allow => { // Check if already in planning mode if ctx.is_planning().await { return Err(eyre!("Already in planning mode")); } // Create plan file let plan_path = plan_manager.create_plan().await?; // Enter planning mode ctx.set_mode(tools_plan::enter_plan_mode(plan_path.clone())).await; Ok(format!( "Entered planning mode. Plan file created at: {}\n\n\ In planning mode, only read-only tools are available. \ Use `exit_plan_mode` when you have finished writing your plan.", plan_path.display() )) } PermissionDecision::Ask => { Err(eyre!("Permission required: EnterPlanMode operation needs approval")) } PermissionDecision::Deny => { Err(eyre!("Permission denied: EnterPlanMode operation is blocked")) } } } "exit_plan_mode" => { // Check permission match perms.check(PermTool::ExitPlanMode, None) { PermissionDecision::Allow => { // Check if in planning mode let mode = ctx.get_mode().await; match mode { AgentMode::Planning { plan_file, started_at } => { let plan_manager = ctx.plan_manager.as_ref() .ok_or_else(|| eyre!("PlanManager not available"))?; // Read the plan content let plan_content = plan_manager.read_plan(&plan_file).await .unwrap_or_else(|_| "No plan content written.".to_string()); // Update plan status to pending approval let _ = plan_manager.set_status(&plan_file, tools_plan::PlanStatus::PendingApproval).await; // Exit planning mode ctx.set_mode(tools_plan::exit_plan_mode()).await; let duration = chrono::Utc::now().signed_duration_since(started_at); let minutes = duration.num_minutes(); Ok(format!( "Exited planning mode after {} minutes.\n\n\ Plan file: {}\n\n\ ## Plan Content:\n\n{}\n\n\ The plan is now awaiting your approval.", minutes, plan_file.display(), plan_content )) } AgentMode::Normal => { Err(eyre!("Not in planning mode")) } } } PermissionDecision::Ask => { Err(eyre!("Permission required: ExitPlanMode operation needs approval")) } PermissionDecision::Deny => { Err(eyre!("Permission denied: ExitPlanMode operation is blocked")) } } } // Check for external tools from plugins _ => { // Look up the tool in the external tools registry if let Some(ext_tool) = ctx.get_external_tool(tool_name) { // Check permission for external tool execution // External tools require explicit permission (treated like Bash) match perms.check(PermTool::Bash, Some(&format!("external:{}", tool_name))) { PermissionDecision::Allow => { execute_external_tool(ext_tool, arguments).await } PermissionDecision::Ask => { Err(eyre!("Permission required: External tool '{}' needs approval", tool_name)) } PermissionDecision::Deny => { Err(eyre!("Permission denied: External tool '{}' is blocked", tool_name)) } } } else { Err(eyre!("Unknown tool: {}", tool_name)) } } } } /// Executes an external tool via JSON-RPC. async fn execute_external_tool( tool: &ExternalToolDefinition, arguments: &Value, ) -> Result { match &tool.transport { ExternalToolTransport::Stdio => { // Get command and args let command = tool.command.as_ref() .ok_or_else(|| eyre!("Stdio tool '{}' missing command", tool.name))?; // Spawn the process and call via JSON-RPC let executor = jsonrpc::ToolExecutor::with_timeout(tool.timeout_ms); let result = executor.execute_stdio( command, &tool.args, &std::collections::HashMap::new(), // TODO: env vars from plugin arguments.clone(), Some(tool.timeout_ms), ).await?; // Convert JSON result to string match result { Value::String(s) => Ok(s), other => Ok(serde_json::to_string_pretty(&other)?), } } ExternalToolTransport::Http => { let url = tool.url.as_ref() .ok_or_else(|| eyre!("HTTP tool '{}' missing url", tool.name))?; // HTTP transport not yet implemented Err(eyre!("HTTP transport for tool '{}' at {} is not yet implemented", tool.name, url)) } } } /// Runs the core agent loop for a given user prompt. /// /// This function iteratively calls the LLM provider, processes tool execution requests, /// and feeds the results back to the model until a final text response is generated /// or the iteration limit is reached. pub async fn run_agent_loop( provider: &P, user_prompt: &str, options: &ChatOptions, perms: &PermissionManager, ctx: &ToolContext, ) -> Result { let tools = get_tool_definitions(); let mut messages = vec![ChatMessage::user(user_prompt)]; let max_iterations = 10; // Prevent infinite loops let mut iteration = 0; loop { iteration += 1; if iteration > max_iterations { return Err(eyre!("Max iterations reached")); } // Call LLM with messages and tools let mut stream = provider .chat_stream(&messages, options, Some(&tools)) .await .map_err(|e| eyre!("LLM provider error: {}", e))?; let mut response_content = String::new(); let mut accumulated_tool_calls: Vec = Vec::new(); // Collect the streamed response while let Some(chunk) = stream.next().await { let chunk = chunk.map_err(|e| eyre!("Stream error: {}", e))?; if let Some(content) = chunk.content { response_content.push_str(&content); } // Accumulate tool calls from deltas if let Some(deltas) = chunk.tool_calls { for delta in deltas { // Ensure the accumulated_tool_calls vec is large enough while accumulated_tool_calls.len() <= delta.index { accumulated_tool_calls.push(llm_core::ToolCall { id: String::new(), call_type: "function".to_string(), function: llm_core::FunctionCall { name: String::new(), arguments: Value::Null, }, }); } let tool_call = &mut accumulated_tool_calls[delta.index]; if let Some(id) = delta.id { tool_call.id = id; } if let Some(name) = delta.function_name { tool_call.function.name = name; } if let Some(args_delta) = delta.arguments_delta { // Accumulate the arguments string let current_args = if tool_call.function.arguments.is_null() { String::new() } else { tool_call.function.arguments.to_string() }; let new_args = current_args + &args_delta; // Try to parse as JSON, but keep as string if incomplete tool_call.function.arguments = serde_json::from_str(&new_args) .unwrap_or(Value::String(new_args)); } } } } // Drop the stream to release the borrow on messages drop(stream); // Filter out incomplete tool calls and check if we have valid ones let valid_tool_calls: Vec<_> = accumulated_tool_calls .into_iter() .filter(|tc| !tc.id.is_empty() && !tc.function.name.is_empty()) .collect(); // Check if LLM wants to call tools if !valid_tool_calls.is_empty() { // Add assistant message with tool calls messages.push(ChatMessage { role: llm_core::Role::Assistant, content: if response_content.is_empty() { None } else { Some(response_content.clone()) }, tool_calls: Some(valid_tool_calls.clone()), tool_call_id: None, name: None, }); // Execute each tool call for call in valid_tool_calls { let tool_name = &call.function.name; let arguments = &call.function.arguments; tracing::debug!(tool = %tool_name, args = %arguments, "executing tool call"); match execute_tool(tool_name, arguments, perms, ctx).await { Ok(result) => { tracing::debug!(tool = %tool_name, result = %result, "tool call succeeded"); // Add tool result message messages.push(ChatMessage::tool_result(&call.id, result)); } Err(e) => { tracing::warn!(tool = %tool_name, error = %e, "tool call failed"); // Add error message as tool result messages.push(ChatMessage::tool_result(&call.id, format!("Error: {}", e))); } } } // Continue loop to get next response continue; } // No tool calls, we're done return Ok(response_content); } } /// Runs the agent loop and streams events back through the provided channel. /// /// This allows UIs to provide real-time feedback as the LLM generates text and /// as tools are being executed. pub async fn run_agent_loop_streaming( provider: &P, user_prompt: &str, options: &ChatOptions, perms: &PermissionManager, ctx: &ToolContext, events: AgentEventSender, ) -> Result { let tools = get_tool_definitions(); let mut messages = vec![ChatMessage::user(user_prompt)]; let max_iterations = 10; let mut iteration = 0; loop { iteration += 1; if iteration > max_iterations { let _ = events.send(AgentEvent::Error("Max iterations reached".into())).await; return Err(eyre!("Max iterations reached")); } // Stream LLM response let mut stream = provider .chat_stream(&messages, options, Some(&tools)) .await .map_err(|e| { let err_msg = format!("LLM provider error: {}", e); let _ = events.try_send(AgentEvent::Error(err_msg.clone())); eyre!(err_msg) })?; let mut response_content = String::new(); let mut tool_calls_builder = ToolCallsBuilder::new(); while let Some(chunk) = stream.next().await { let chunk = chunk.map_err(|e| { let err_msg = format!("Stream error: {}", e); let _ = events.try_send(AgentEvent::Error(err_msg.clone())); eyre!(err_msg) })?; // Send text deltas if let Some(text) = &chunk.content { let _ = events.send(AgentEvent::TextDelta(text.clone())).await; response_content.push_str(text); } // Accumulate tool calls if let Some(deltas) = &chunk.tool_calls { tool_calls_builder.add_deltas(deltas); } } // Drop stream to release borrow drop(stream); let tool_calls = tool_calls_builder.build(); if tool_calls.is_empty() { let _ = events .send(AgentEvent::Done { final_response: response_content.clone(), }) .await; return Ok(response_content); } // Add assistant message messages.push(ChatMessage { role: llm_core::Role::Assistant, content: if response_content.is_empty() { None } else { Some(response_content.clone()) }, tool_calls: Some(tool_calls.clone()), tool_call_id: None, name: None, }); // Execute tools with events for call in &tool_calls { let tool_id = call.id.clone(); let tool_name = call.function.name.clone(); let _ = events .send(AgentEvent::ToolStart { tool_name: tool_name.clone(), tool_id: tool_id.clone(), }) .await; tracing::debug!(tool = %tool_name, args = %call.function.arguments, "executing tool call"); let result = execute_tool(&tool_name, &call.function.arguments, perms, ctx).await; match &result { Ok(output) => { tracing::debug!(tool = %tool_name, result = %output, "tool call succeeded"); let _ = events .send(AgentEvent::ToolOutput { tool_id: tool_id.clone(), content: output.clone(), is_error: false, }) .await; let _ = events .send(AgentEvent::ToolEnd { tool_id: tool_id.clone(), success: true, }) .await; messages.push(ChatMessage::tool_result(&tool_id, output)); } Err(e) => { tracing::warn!(tool = %tool_name, error = %e, "tool call failed"); let error_msg = e.to_string(); let _ = events .send(AgentEvent::ToolOutput { tool_id: tool_id.clone(), content: error_msg.clone(), is_error: true, }) .await; let _ = events .send(AgentEvent::ToolEnd { tool_id: tool_id.clone(), success: false, }) .await; messages.push(ChatMessage::tool_result( &tool_id, format!("Error: {}", error_msg), )); } } } } } #[cfg(test)] mod tests { use super::*; use llm_core::ToolCallDelta; #[test] fn test_tool_calls_builder() { let mut builder = ToolCallsBuilder::new(); // Add first tool call deltas builder.add_deltas(&[ ToolCallDelta { index: 0, id: Some("call_1".to_string()), function_name: Some("read".to_string()), arguments_delta: Some("{\"path\":".to_string()), } ]); // Add second tool call deltas builder.add_deltas(&[ ToolCallDelta { index: 1, id: Some("call_2".to_string()), function_name: Some("write".to_string()), arguments_delta: Some("{\"path\":\"test.txt\"".to_string()), } ]); // Add more deltas for first tool call builder.add_deltas(&[ ToolCallDelta { index: 0, id: None, function_name: None, arguments_delta: Some("\"lib.rs\"}".to_string()), } ]); // Add more deltas for second tool call builder.add_deltas(&[ ToolCallDelta { index: 1, id: None, function_name: None, arguments_delta: Some(",\"content\":\"hello\"}".to_string()), } ]); let calls = builder.build(); assert_eq!(calls.len(), 2); assert_eq!(calls[0].id, "call_1"); assert_eq!(calls[0].function.name, "read"); assert_eq!(calls[0].function.arguments, json!({"path": "lib.rs"})); assert_eq!(calls[1].id, "call_2"); assert_eq!(calls[1].function.name, "write"); assert_eq!(calls[1].function.arguments, json!({"path": "test.txt", "content": "hello"})); } }