diff --git a/crates/app/cli/Cargo.toml b/crates/app/cli/Cargo.toml
index 9c1fbe0..bbe72db 100644
--- a/crates/app/cli/Cargo.toml
+++ b/crates/app/cli/Cargo.toml
@@ -29,6 +29,8 @@ atty = "0.2"
 futures-util = "0.3.31"
 rpassword = "7"
 open = "5"
+futures = "0.3.31"
+async-trait = "0.1.89"
 
 [dev-dependencies]
 assert_cmd = "2.0"
diff --git a/crates/app/cli/src/engine.rs b/crates/app/cli/src/engine.rs
index cc4ce8e..f9370be 100644
--- a/crates/app/cli/src/engine.rs
+++ b/crates/app/cli/src/engine.rs
@@ -1,13 +1,46 @@
-use crate::messages::Message;
+use crate::messages::{Message, UserAction, AgentResponse};
 use tokio::sync::mpsc;
+use std::sync::Arc;
+use llm_core::{LlmProvider, ChatMessage, ChatOptions};
+use futures::StreamExt;
 
 /// The main background task that handles logic, API calls, and state updates.
-pub async fn run_engine_loop(mut rx: mpsc::Receiver<Message>) {
+pub async fn run_engine_loop(
+    mut rx: mpsc::Receiver<Message>,
+    tx_ui: mpsc::Sender<Message>,
+    client: Arc<dyn LlmProvider>,
+) {
     while let Some(msg) = rx.recv().await {
         match msg {
-            Message::UserAction(action) => {
-                println!("Engine received action: {:?}", action);
-                // TODO: Process action
+            Message::UserAction(UserAction::Input(text)) => {
+                // TODO: Maintain conversation history (AppState) - next task
+                let messages = vec![ChatMessage::user(text)];
+                // Use default options for now
+                let options = ChatOptions::default();
+
+                match client.chat_stream(&messages, &options, None).await {
+                    Ok(mut stream) => {
+                        while let Some(result) = stream.next().await {
+                            match result {
+                                Ok(chunk) => {
+                                    if let Some(content) = chunk.content {
+                                        if let Err(e) = tx_ui.send(Message::AgentResponse(AgentResponse::Token(content))).await {
+                                            eprintln!("Failed to send token to UI: {}", e);
+                                            break;
+                                        }
+                                    }
+                                }
+                                Err(e) => {
+                                    let _ = tx_ui.send(Message::AgentResponse(AgentResponse::Error(e.to_string()))).await;
+                                }
+                            }
+                        }
+                        let _ = tx_ui.send(Message::AgentResponse(AgentResponse::Complete)).await;
+                    }
+                    Err(e) => {
+                        let _ = tx_ui.send(Message::AgentResponse(AgentResponse::Error(e.to_string()))).await;
+                    }
+                }
             }
             _ => {}
         }
@@ -17,23 +50,64 @@ pub async fn run_engine_loop(mut rx: mpsc::Receiver<Message>) {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::messages::{Message, UserAction};
+    use crate::messages::{Message, UserAction, AgentResponse};
+    use llm_core::{LlmProvider, LlmError, ChatMessage, ChatOptions, Tool, ChunkStream, StreamChunk};
+    use async_trait::async_trait;
+    use futures::stream;
+
+    struct MockProvider;
+
+    #[async_trait]
+    impl LlmProvider for MockProvider {
+        fn name(&self) -> &str { "mock" }
+        fn model(&self) -> &str { "mock-model" }
+
+        async fn chat_stream(
+            &self,
+            _messages: &[ChatMessage],
+            _options: &ChatOptions,
+            _tools: Option<&[Tool]>,
+        ) -> Result<ChunkStream, LlmError> {
+            let chunks = vec![
+                Ok(StreamChunk { content: Some("Hello".to_string()), tool_calls: None, done: false, usage: None }),
+                Ok(StreamChunk { content: Some(" World".to_string()), tool_calls: None, done: true, usage: None }),
+            ];
+            Ok(Box::pin(stream::iter(chunks)))
+        }
+    }
 
     #[tokio::test]
-    async fn test_engine_loop_structure() {
-        let (tx, rx) = mpsc::channel(1);
+    async fn test_engine_streaming() {
+        let (tx_in, rx_in) = mpsc::channel(1);
+        let (tx_out, mut rx_out) = mpsc::channel(10);
+        let client = Arc::new(MockProvider);
 
         // Spawn the engine loop
-        let handle = tokio::spawn(async move {
-            run_engine_loop(rx).await;
+        tokio::spawn(async move {
+            run_engine_loop(rx_in, tx_out, client).await;
         });
 
         // Send a message
-        let msg = Message::UserAction(UserAction::Input("ping".to_string()));
-        assert!(tx.send(msg).await.is_ok());
+        tx_in.send(Message::UserAction(UserAction::Input("Hi".to_string()))).await.unwrap();
 
-        // Cleanup: dropping tx should close rx and terminate the loop
-        drop(tx);
-        assert!(handle.await.is_ok());
+        // Verify streaming responses
+        if let Some(Message::AgentResponse(AgentResponse::Token(s))) = rx_out.recv().await {
+            assert_eq!(s, "Hello");
+        } else {
+            panic!("Expected Token(Hello)");
+        }
+
+        if let Some(Message::AgentResponse(AgentResponse::Token(s))) = rx_out.recv().await {
+            assert_eq!(s, " World");
+        } else {
+            panic!("Expected Token( World)");
+        }
+
+        if let Some(Message::AgentResponse(AgentResponse::Complete)) = rx_out.recv().await {
+            // OK
+        } else {
+            panic!("Expected Complete");
+        }
     }
-}
+}
\ No newline at end of file
diff --git a/crates/app/cli/src/main.rs b/crates/app/cli/src/main.rs
index c59c591..4733cba 100644
--- a/crates/app/cli/src/main.rs
+++ b/crates/app/cli/src/main.rs
@@ -334,15 +334,6 @@ async fn main() -> Result<()> {
     let session_id = generate_session_id();
     let output_format = args.output_format;
 
-    // Initialize async engine infrastructure
-    let (tx, rx) = tokio::sync::mpsc::channel::<Message>(100);
-    // Spawn the Engine Loop
-    tokio::spawn(async move {
-        engine::run_engine_loop(rx).await;
-    });
-    // Keep tx for future use (will be passed to UI/REPL)
-    let _tx = tx;
-
     if let Some(cmd) = args.cmd {
         match cmd {
             Cmd::Read { path } => {
@@ -767,6 +758,18 @@ async fn main() -> Result<()> {
     )?;
     let opts = ChatOptions::new(&model);
 
+    // Initialize async engine infrastructure
+    let (tx, rx) = tokio::sync::mpsc::channel::<Message>(100);
+    // Keep tx for future use (will be passed to UI/REPL)
+    let _tx = tx.clone();
+
+    // Spawn the Engine Loop
+    let client_clone = client.clone();
+    let tx_clone = tx.clone();
+    tokio::spawn(async move {
+        engine::run_engine_loop(rx, tx_clone, client_clone).await;
+    });
+
    // Check if interactive mode (no prompt provided)
     if args.prompt.is_empty() {
         // Use TUI mode unless --no-tui flag is set or not a TTY