feat(core): Switch LLM interaction to Streaming-only

2025-12-26 19:11:05 +01:00
parent 68e3993234
commit c54962b0e0
3 changed files with 104 additions and 25 deletions
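
Note: the Message, UserAction, and AgentResponse types used throughout live in messages.rs, which this commit does not touch. As a reading aid, here is a sketch of their likely shape, inferred purely from the variants the engine constructs and matches on below (not the file's actual contents):

// messages.rs -- hypothetical sketch inferred from usage in this diff
pub enum Message {
    UserAction(UserAction),
    AgentResponse(AgentResponse),
}

#[derive(Debug)]
pub enum UserAction {
    Input(String),
}

pub enum AgentResponse {
    Token(String),   // one streamed content chunk
    Error(String),   // stringified provider or stream error
    Complete,        // the stream finished cleanly
}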

View File

@@ -29,6 +29,8 @@ atty = "0.2"
 futures-util = "0.3.31"
 rpassword = "7"
 open = "5"
+futures = "0.3.31"
+async-trait = "0.1.89"
 
 [dev-dependencies]
 assert_cmd = "2.0"
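
The two new dependencies back the changes below: futures supplies StreamExt and stream::iter for consuming and mocking chunk streams, while async-trait makes the async LlmProvider trait usable behind Arc<dyn LlmProvider> (async fns in traits are still not dyn-compatible in stable Rust). A minimal, self-contained illustration of the pattern, using a hypothetical Greeter trait rather than anything from this codebase:

use async_trait::async_trait;
use std::sync::Arc;

// #[async_trait] desugars `async fn` to return a boxed future,
// which is what makes the trait object-safe.
#[async_trait]
trait Greeter {
    async fn greet(&self) -> String;
}

struct English;

#[async_trait]
impl Greeter for English {
    async fn greet(&self) -> String { "hello".to_string() }
}

#[tokio::main]
async fn main() {
    let g: Arc<dyn Greeter> = Arc::new(English);
    println!("{}", g.greet().await); // prints "hello"
}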

View File

@@ -1,13 +1,46 @@
-use crate::messages::Message;
+use crate::messages::{Message, UserAction, AgentResponse};
 use tokio::sync::mpsc;
+use std::sync::Arc;
+use llm_core::{LlmProvider, ChatMessage, ChatOptions};
+use futures::StreamExt;
 
 /// The main background task that handles logic, API calls, and state updates.
-pub async fn run_engine_loop(mut rx: mpsc::Receiver<Message>) {
+pub async fn run_engine_loop(
+    mut rx: mpsc::Receiver<Message>,
+    tx_ui: mpsc::Sender<Message>,
+    client: Arc<dyn LlmProvider>,
+) {
     while let Some(msg) = rx.recv().await {
         match msg {
-            Message::UserAction(action) => {
-                println!("Engine received action: {:?}", action);
-                // TODO: Process action
+            Message::UserAction(UserAction::Input(text)) => {
+                // TODO: Maintain conversation history (AppState) - next task
+                let messages = vec![ChatMessage::user(text)];
+                // Use default options for now
+                let options = ChatOptions::default();
+                match client.chat_stream(&messages, &options, None).await {
+                    Ok(mut stream) => {
+                        while let Some(result) = stream.next().await {
+                            match result {
+                                Ok(chunk) => {
+                                    if let Some(content) = chunk.content {
+                                        if let Err(e) = tx_ui.send(Message::AgentResponse(AgentResponse::Token(content))).await {
+                                            eprintln!("Failed to send token to UI: {}", e);
+                                            break;
+                                        }
+                                    }
+                                }
+                                Err(e) => {
+                                    let _ = tx_ui.send(Message::AgentResponse(AgentResponse::Error(e.to_string()))).await;
+                                }
+                            }
+                        }
+                        let _ = tx_ui.send(Message::AgentResponse(AgentResponse::Complete)).await;
+                    }
+                    Err(e) => {
+                        let _ = tx_ui.send(Message::AgentResponse(AgentResponse::Error(e.to_string()))).await;
+                    }
+                }
             }
             _ => {}
         }
@@ -17,23 +50,64 @@ pub async fn run_engine_loop(mut rx: mpsc::Receiver<Message>) {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::messages::{Message, UserAction};
+    use crate::messages::{Message, UserAction, AgentResponse};
+    use llm_core::{LlmProvider, LlmError, ChatMessage, ChatOptions, Tool, ChunkStream, StreamChunk};
+    use async_trait::async_trait;
+    use futures::stream;
+
+    struct MockProvider;
+
+    #[async_trait]
+    impl LlmProvider for MockProvider {
+        fn name(&self) -> &str { "mock" }
+        fn model(&self) -> &str { "mock-model" }
+
+        async fn chat_stream(
+            &self,
+            _messages: &[ChatMessage],
+            _options: &ChatOptions,
+            _tools: Option<&[Tool]>,
+        ) -> Result<ChunkStream, LlmError> {
+            let chunks = vec![
+                Ok(StreamChunk { content: Some("Hello".to_string()), tool_calls: None, done: false, usage: None }),
+                Ok(StreamChunk { content: Some(" World".to_string()), tool_calls: None, done: true, usage: None }),
+            ];
+            Ok(Box::pin(stream::iter(chunks)))
+        }
+    }
+
     #[tokio::test]
-    async fn test_engine_loop_structure() {
-        let (tx, rx) = mpsc::channel(1);
+    async fn test_engine_streaming() {
+        let (tx_in, rx_in) = mpsc::channel(1);
+        let (tx_out, mut rx_out) = mpsc::channel(10);
+        let client = Arc::new(MockProvider);
+
         // Spawn the engine loop
-        let handle = tokio::spawn(async move {
-            run_engine_loop(rx).await;
+        tokio::spawn(async move {
+            run_engine_loop(rx_in, tx_out, client).await;
         });
+
         // Send a message
-        let msg = Message::UserAction(UserAction::Input("ping".to_string()));
-        assert!(tx.send(msg).await.is_ok());
-        // Cleanup: dropping tx should close rx and terminate the loop
-        drop(tx);
-        assert!(handle.await.is_ok());
+        tx_in.send(Message::UserAction(UserAction::Input("Hi".to_string()))).await.unwrap();
+
+        // Verify streaming responses
+        if let Some(Message::AgentResponse(AgentResponse::Token(s))) = rx_out.recv().await {
+            assert_eq!(s, "Hello");
+        } else {
+            panic!("Expected Token(Hello)");
+        }
+        if let Some(Message::AgentResponse(AgentResponse::Token(s))) = rx_out.recv().await {
+            assert_eq!(s, " World");
+        } else {
+            panic!("Expected Token( World)");
+        }
+        if let Some(Message::AgentResponse(AgentResponse::Complete)) = rx_out.recv().await {
+            // OK
+        } else {
+            panic!("Expected Complete");
+        }
     }
 }
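
The mock's Ok(Box::pin(stream::iter(chunks))) implies that ChunkStream is a pinned, boxed stream of Result<StreamChunk, LlmError>. A sketch of the llm_core types as this diff appears to assume them -- LlmError, ToolCall, and Usage below are stand-in definitions, not the crate's own:

use futures::stream::BoxStream;

pub struct LlmError(pub String); // stand-in; the real crate defines its own error type
impl std::fmt::Display for LlmError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}

pub struct ToolCall; // placeholder
pub struct Usage;    // placeholder

pub struct StreamChunk {
    pub content: Option<String>,           // incremental text, if any
    pub tool_calls: Option<Vec<ToolCall>>, // tool-call deltas, if any
    pub done: bool,                        // provider signaled end of stream
    pub usage: Option<Usage>,              // usually present only on the final chunk
}

// BoxStream is Pin<Box<dyn Stream<Item = ...> + Send>>, so
// Box::pin(stream::iter(chunks)) coerces to it directly.
pub type ChunkStream = BoxStream<'static, Result<StreamChunk, LlmError>>;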

View File

@@ -334,15 +334,6 @@ async fn main() -> Result<()> {
     let session_id = generate_session_id();
     let output_format = args.output_format;
 
-    // Initialize async engine infrastructure
-    let (tx, rx) = tokio::sync::mpsc::channel::<messages::Message>(100);
-    // Spawn the Engine Loop
-    tokio::spawn(async move {
-        engine::run_engine_loop(rx).await;
-    });
-    // Keep tx for future use (will be passed to UI/REPL)
-    let _tx = tx;
-
     if let Some(cmd) = args.cmd {
         match cmd {
             Cmd::Read { path } => {
@@ -767,6 +758,18 @@ async fn main() -> Result<()> {
     )?;
     let opts = ChatOptions::new(&model);
 
+    // Initialize async engine infrastructure
+    let (tx, rx) = tokio::sync::mpsc::channel::<messages::Message>(100);
+    // Keep tx for future use (will be passed to UI/REPL)
+    let _tx = tx.clone();
+
+    // Spawn the Engine Loop
+    let client_clone = client.clone();
+    let tx_clone = tx.clone();
+    tokio::spawn(async move {
+        engine::run_engine_loop(rx, tx_clone, client_clone).await;
+    });
+
     // Check if interactive mode (no prompt provided)
     if args.prompt.is_empty() {
         // Use TUI mode unless --no-tui flag is set or not a TTY
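
One wiring detail worth noting: as committed, tx_clone is a clone of the sender for the same channel the engine reads from, so AgentResponse messages loop back into the engine's own receiver and are dropped by its `_ => {}` arm. The test uses two separate channels, and the parked `_tx` comment suggests the UI/REPL split is the next step. A hypothetical sketch of that consumer side, assuming a dedicated engine-to-UI channel like the test's rx_out (none of this is in the commit):

use std::io::Write;

// Forward one line of input to the engine, then print tokens as they
// stream back until Complete or Error.
async fn repl_turn(
    tx_engine: &tokio::sync::mpsc::Sender<Message>,
    rx_ui: &mut tokio::sync::mpsc::Receiver<Message>,
    line: String,
) -> anyhow::Result<()> {
    if tx_engine.send(Message::UserAction(UserAction::Input(line))).await.is_err() {
        anyhow::bail!("engine task has shut down");
    }
    while let Some(msg) = rx_ui.recv().await {
        match msg {
            Message::AgentResponse(AgentResponse::Token(t)) => {
                print!("{}", t);
                std::io::stdout().flush()?; // don't buffer partial lines
            }
            Message::AgentResponse(AgentResponse::Complete) => {
                println!();
                break;
            }
            Message::AgentResponse(AgentResponse::Error(e)) => {
                eprintln!("error: {}", e);
                break;
            }
            _ => {}
        }
    }
    Ok(())
}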