feat(core): Implement Permission Interceptor and async request/response

This commit is contained in:
2025-12-26 19:24:03 +01:00
parent f5d465b5a9
commit 159f2d3330
4 changed files with 153 additions and 90 deletions

View File

@@ -1,20 +1,27 @@
use std::sync::Arc;
use tokio::sync::Mutex;
use crate::state::AppState;
use tokio::sync::{mpsc, Mutex};
use crate::state::{AppState, AppMode};
use llm_core::{LlmProvider, ChatMessage, ChatOptions};
use color_eyre::eyre::Result;
use futures::StreamExt;
use crate::messages::{Message, AgentResponse};
/// Manages the lifecycle and state of the agent
pub struct AgentManager {
client: Arc<dyn LlmProvider>,
state: Arc<Mutex<AppState>>,
tx_ui: Option<mpsc::Sender<Message>>,
}
impl AgentManager {
/// Create a new AgentManager
pub fn new(client: Arc<dyn LlmProvider>, state: Arc<Mutex<AppState>>) -> Self {
Self { client, state }
Self { client, state, tx_ui: None }
}
/// Set the UI message sender
pub fn with_ui_sender(mut self, tx: mpsc::Sender<Message>) -> Self {
self.tx_ui = Some(tx);
self
}
/// Get a reference to the LLM client
@@ -27,11 +34,48 @@ impl AgentManager {
&self.state
}
/// Check if a tool execution is permitted
pub async fn check_permission(&self, tool: &str, context: Option<&str>) -> Result<bool> {
let mode = {
let guard = self.state.lock().await;
guard.mode
};
match mode {
AppMode::AcceptAll => Ok(true),
AppMode::Normal | AppMode::Plan => {
// Request permission from UI
if let Some(tx) = &self.tx_ui {
let _ = tx.send(Message::AgentResponse(AgentResponse::PermissionRequest {
tool: tool.to_string(),
context: context.map(|s| s.to_string()),
})).await;
// Wait for result
let notify = {
let guard = self.state.lock().await;
guard.permission_notify.clone()
};
notify.notified().await;
let result = {
let mut guard = self.state.lock().await;
guard.last_permission_result.take().unwrap_or(false)
};
Ok(result)
} else {
// No UI sender, default to deny for safety
Ok(false)
}
}
}
}
/// Spawn a sub-agent for a specific task
pub async fn spawn_sub_agent(&self, _task: &str) -> Result<String> {
// Basic placeholder implementation for Sub-Agent support
// In a real implementation, this would create a new AgentManager instance
// with a focused system prompt and context.
Ok("Sub-agent task completed (mock)".to_string())
}
@@ -43,16 +87,12 @@ impl AgentManager {
guard.add_message(ChatMessage::user(input.to_string()));
}
// 2. Prepare context (Sliding Window logic would go here)
// 2. Prepare context
let messages = {
let guard = self.state.lock().await;
guard.messages.clone()
};
// 3. Call LLM (Non-streaming for now to simplify step logic, or use stream internally)
// For the Reasoning Loop, we need to handle potential tool calls.
// This initial implementation just chats.
let options = ChatOptions::default();
let response = self.client.chat(&messages, &options, None).await?;
@@ -70,7 +110,7 @@ impl AgentManager {
#[cfg(test)]
mod tests {
use super::*;
use llm_core::{LlmProvider, ChatMessage, ChatOptions, Tool, ChunkStream, ChatResponse};
use llm_core::{Tool, ChunkStream, ChatResponse};
use async_trait::async_trait;
struct MockProvider;
@@ -119,4 +159,25 @@ mod tests {
let result = manager.spawn_sub_agent("Do something").await.unwrap();
assert_eq!(result, "Sub-agent task completed (mock)");
}
}
#[tokio::test]
async fn test_permission_request() {
let client = Arc::new(MockProvider);
let state = Arc::new(Mutex::new(AppState::new()));
let (tx, mut rx) = mpsc::channel(1);
let manager = AgentManager::new(client, state.clone()).with_ui_sender(tx);
let state_clone = state.clone();
tokio::spawn(async move {
// Simulate UI receiving request and granting permission
if let Some(Message::AgentResponse(AgentResponse::PermissionRequest { .. })) = rx.recv().await {
let mut guard = state_clone.lock().await;
guard.set_permission_result(true);
}
});
let allowed = manager.check_permission("bash", Some("ls")).await.unwrap();
assert!(allowed);
}
}

View File

@@ -4,6 +4,7 @@ use tokio::sync::{mpsc, Mutex};
use std::sync::Arc;
use llm_core::{LlmProvider, ChatMessage, ChatOptions};
use futures::StreamExt;
use crate::agent_manager::AgentManager;
/// The main background task that handles logic, API calls, and state updates.
pub async fn run_engine_loop(
@@ -12,52 +13,64 @@ pub async fn run_engine_loop(
client: Arc<dyn LlmProvider>,
state: Arc<Mutex<AppState>>,
) {
let agent_manager = Arc::new(AgentManager::new(client, state.clone()).with_ui_sender(tx_ui.clone()));
while let Some(msg) = rx.recv().await {
match msg {
Message::UserAction(UserAction::Input(text)) => {
// Update history with user message
let messages = {
let mut guard = state.lock().await;
guard.add_message(ChatMessage::user(text.clone()));
guard.messages.clone()
};
let tx_ui_clone = tx_ui.clone();
let agent_manager_clone = agent_manager.clone();
// Spawn a task for the agent interaction so the engine loop can
// continue receiving messages (like PermissionResult).
tokio::spawn(async move {
let messages = {
let mut guard = agent_manager_clone.state().lock().await;
guard.add_message(ChatMessage::user(text.clone()));
guard.messages.clone()
};
// Use default options for now
let options = ChatOptions::default();
let options = ChatOptions::default();
match client.chat_stream(&messages, &options, None).await {
Ok(mut stream) => {
let mut full_response = String::new();
match agent_manager_clone.client().chat_stream(&messages, &options, None).await {
Ok(mut stream) => {
let mut full_response = String::new();
while let Some(result) = stream.next().await {
match result {
Ok(chunk) => {
if let Some(content) = chunk.content {
full_response.push_str(&content);
if let Err(e) = tx_ui.send(Message::AgentResponse(AgentResponse::Token(content))).await {
eprintln!("Failed to send token to UI: {}", e);
break;
while let Some(result) = stream.next().await {
match result {
Ok(chunk) => {
if let Some(content) = chunk.content {
full_response.push_str(&content);
let _ = tx_ui_clone.send(Message::AgentResponse(AgentResponse::Token(content))).await;
}
}
}
Err(e) => {
let _ = tx_ui.send(Message::AgentResponse(AgentResponse::Error(e.to_string()))).await;
Err(e) => {
let _ = tx_ui_clone.send(Message::AgentResponse(AgentResponse::Error(e.to_string()))).await;
}
}
}
}
// Add assistant response to history
{
let mut guard = state.lock().await;
guard.add_message(ChatMessage::assistant(full_response));
}
{
let mut guard = agent_manager_clone.state().lock().await;
guard.add_message(ChatMessage::assistant(full_response));
}
let _ = tx_ui.send(Message::AgentResponse(AgentResponse::Complete)).await;
let _ = tx_ui_clone.send(Message::AgentResponse(AgentResponse::Complete)).await;
}
Err(e) => {
let _ = tx_ui_clone.send(Message::AgentResponse(AgentResponse::Error(e.to_string()))).await;
}
}
Err(e) => {
let _ = tx_ui.send(Message::AgentResponse(AgentResponse::Error(e.to_string()))).await;
}
}
});
}
Message::UserAction(UserAction::PermissionResult(res)) => {
let mut guard = state.lock().await;
guard.set_permission_result(res);
}
Message::UserAction(UserAction::Exit) => {
let mut guard = state.lock().await;
guard.running = false;
break;
}
_ => {}
}
@@ -67,8 +80,7 @@ pub async fn run_engine_loop(
#[cfg(test)]
mod tests {
use super::*;
use crate::messages::{Message, UserAction, AgentResponse};
use llm_core::{LlmProvider, LlmError, ChatMessage, ChatOptions, Tool, ChunkStream, StreamChunk};
use llm_core::{LlmError, Tool, ChunkStream, StreamChunk};
use async_trait::async_trait;
use futures::stream;
@@ -110,29 +122,21 @@ mod tests {
tx_in.send(Message::UserAction(UserAction::Input("Hi".to_string()))).await.unwrap();
// Verify streaming responses
if let Some(Message::AgentResponse(AgentResponse::Token(s))) = rx_out.recv().await {
assert_eq!(s, "Hello");
} else {
panic!("Expected Token(Hello)");
}
if let Some(Message::AgentResponse(AgentResponse::Token(s))) = rx_out.recv().await {
assert_eq!(s, " World");
} else {
panic!("Expected Token( World)");
}
if let Some(Message::AgentResponse(AgentResponse::Complete)) = rx_out.recv().await {
// OK
} else {
panic!("Expected Complete");
let mut tokens = Vec::new();
while let Some(msg) = rx_out.recv().await {
match msg {
Message::AgentResponse(AgentResponse::Token(s)) => tokens.push(s),
Message::AgentResponse(AgentResponse::Complete) => break,
_ => {}
}
}
assert_eq!(tokens, vec!["Hello", " World"]);
}
#[tokio::test]
async fn test_engine_state_update() {
let (tx_in, rx_in) = mpsc::channel(1);
let (tx_out, mut rx_out) = mpsc::channel(10);
async fn test_engine_permission_result() {
let (tx_in, rx_in) = mpsc::channel(1);
let (tx_out, _rx_out) = mpsc::channel(10);
let client = Arc::new(MockProvider);
let state = Arc::new(Mutex::new(AppState::new()));
@@ -143,26 +147,13 @@ mod tests {
run_engine_loop(rx_in, tx_out, client, state_clone).await;
});
// Send a message
tx_in.send(Message::UserAction(UserAction::Input("Hi".to_string()))).await.unwrap();
// Wait for completion
while let Some(msg) = rx_out.recv().await {
if let Message::AgentResponse(AgentResponse::Complete) = msg {
break;
}
}
// Verify state
// Send a PermissionResult
tx_in.send(Message::UserAction(UserAction::PermissionResult(true))).await.unwrap();
// Give it a moment to process
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
let guard = state.lock().await;
assert_eq!(guard.messages.len(), 2); // User + Assistant
match &guard.messages[0].role {
llm_core::Role::User => {},
_ => panic!("First message should be User"),
}
match &guard.messages[1].role {
llm_core::Role::Assistant => {},
_ => panic!("Second message should be Assistant"),
}
assert_eq!(guard.last_permission_result, Some(true));
}
}

View File

@@ -11,6 +11,7 @@ pub enum Message {
pub enum UserAction {
Input(String),
Command(String),
PermissionResult(bool),
Exit,
}
@@ -18,6 +19,7 @@ pub enum UserAction {
pub enum AgentResponse {
Token(String),
ToolCall { name: String, args: String },
PermissionRequest { tool: String, context: Option<String> },
Complete,
Error(String),
}

View File

@@ -1,5 +1,5 @@
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::sync::{Mutex, Notify};
use llm_core::ChatMessage;
use serde::{Deserialize, Serialize};
@@ -22,6 +22,8 @@ pub struct AppState {
pub messages: Vec<ChatMessage>,
pub running: bool,
pub mode: AppMode,
pub last_permission_result: Option<bool>,
pub permission_notify: Arc<Notify>,
}
impl AppState {
@@ -30,12 +32,19 @@ impl AppState {
messages: Vec::new(),
running: true,
mode: AppMode::Normal,
last_permission_result: None,
permission_notify: Arc::new(Notify::new()),
}
}
pub fn add_message(&mut self, message: ChatMessage) {
self.messages.push(message);
}
pub fn set_permission_result(&mut self, result: bool) {
self.last_permission_result = Some(result);
self.permission_notify.notify_one();
}
}
#[cfg(test)]