diff --git a/crates/owlen-tui/src/app/mod.rs b/crates/owlen-tui/src/app/mod.rs index b17d7f3..04227ae 100644 --- a/crates/owlen-tui/src/app/mod.rs +++ b/crates/owlen-tui/src/app/mod.rs @@ -6,21 +6,19 @@ mod worker; pub mod messages; pub use worker::background_worker; -use std::{ - io, - sync::Arc, - time::{Duration, Instant}, -}; +use std::{io, sync::Arc, time::Duration}; use anyhow::Result; use async_trait::async_trait; -use crossterm::event::{self, KeyEventKind}; +use crossterm::event; use owlen_core::{provider::ProviderManager, state::AppState}; use ratatui::{Terminal, backend::CrosstermBackend}; use tokio::{ - sync::mpsc::{self, error::TryRecvError}, - task::{AbortHandle, JoinHandle, yield_now}, + sync::mpsc, + task::{self, AbortHandle, JoinHandle}, + time::{MissedTickBehavior, interval}, }; +use tokio_util::sync::CancellationToken; use uuid::Uuid; use crate::{Event, SessionEvent, events}; @@ -28,6 +26,20 @@ use crate::{Event, SessionEvent, events}; pub use handler::MessageState; pub use messages::AppMessage; +#[derive(Debug)] +enum AppEvent { + Message(AppMessage), + Session(SessionEvent), + Ui(Event), + FrameTick, +} + +#[derive(Debug, Clone, Copy)] +enum LoopControl { + Continue, + Exit(AppState), +} + #[async_trait] pub trait UiRuntime: MessageState { async fn handle_ui_event(&mut self, event: Event) -> Result; @@ -105,109 +117,154 @@ impl App { .take() .expect("App::run called without an available message receiver"); - let poll_interval = Duration::from_millis(16); - let mut last_frame = Instant::now(); - let frame_interval = Duration::from_millis(16); + let (app_event_tx, mut app_event_rx) = mpsc::unbounded_channel::(); + let (input_cancel, input_handle) = Self::spawn_input_listener(app_event_tx.clone()); + drop(app_event_tx); + + let mut frame_interval = interval(Duration::from_millis(16)); + frame_interval.set_missed_tick_behavior(MissedTickBehavior::Skip); let mut worker_handle = Some(self.spawn_background_worker()); + let mut exit_state = AppState::Quit; - let exit_state = AppState::Quit; - 'main: loop { - state.advance_loading_animation(); + loop { + self.pump_background(state).await?; - state.process_pending_llm_request().await?; - state.process_pending_tool_execution().await?; - state.poll_controller_events()?; + let next_event = tokio::select! { + Some(event) = app_event_rx.recv() => event, + Some(message) = message_rx.recv() => AppEvent::Message(message), + Some(session_event) = session_rx.recv() => AppEvent::Session(session_event), + _ = frame_interval.tick() => AppEvent::FrameTick, + else => break, + }; - loop { - match session_rx.try_recv() { - Ok(session_event) => { - state.handle_session_event(session_event).await?; - } - Err(TryRecvError::Empty) => break, - Err(TryRecvError::Disconnected) => { - break 'main; + let is_frame_tick = matches!(next_event, AppEvent::FrameTick); + + match self.dispatch_app_event(state, next_event).await? { + LoopControl::Continue => { + if is_frame_tick { + render(terminal, state)?; } } - } - - loop { - match message_rx.try_recv() { - Ok(message) => { - if self.handle_message(state, message) { - if let Some(handle) = worker_handle.take() { - handle.abort(); - } - break 'main; - } - } - Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break, - Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => break, + LoopControl::Exit(state_value) => { + exit_state = state_value; + break; } } - - if last_frame.elapsed() >= frame_interval { - render(terminal, state)?; - last_frame = Instant::now(); - } - - if self.handle_message(state, AppMessage::Tick) { - if let Some(handle) = worker_handle.take() { - handle.abort(); - } - break 'main; - } - - match event::poll(poll_interval) { - Ok(true) => match event::read() { - Ok(raw_event) => { - if let Some(ui_event) = events::from_crossterm_event(raw_event) { - if let Event::Key(key) = &ui_event { - if key.kind == KeyEventKind::Press { - let _ = self.message_tx.send(AppMessage::KeyPress(*key)); - } - } else if let Event::Resize(width, height) = &ui_event { - let _ = self.message_tx.send(AppMessage::Resize { - width: *width, - height: *height, - }); - } - - if matches!(state.handle_ui_event(ui_event).await?, AppState::Quit) { - if let Some(handle) = worker_handle.take() { - handle.abort(); - } - break 'main; - } - } - } - Err(err) => { - if let Some(handle) = worker_handle.take() { - handle.abort(); - } - return Err(err.into()); - } - }, - Ok(false) => {} - Err(err) => { - if let Some(handle) = worker_handle.take() { - handle.abort(); - } - return Err(err.into()); - } - } - - yield_now().await; } - if let Some(handle) = worker_handle { + input_cancel.cancel(); + let _ = input_handle.await; + + if let Some(handle) = worker_handle.take() { handle.abort(); + let _ = handle.await; } self.message_rx = Some(message_rx); Ok(exit_state) } + + async fn pump_background(&mut self, state: &mut State) -> Result<()> + where + State: UiRuntime, + { + state.advance_loading_animation(); + state.process_pending_llm_request().await?; + state.process_pending_tool_execution().await?; + state.poll_controller_events()?; + Ok(()) + } + + async fn dispatch_app_event( + &mut self, + state: &mut State, + event: AppEvent, + ) -> Result + where + State: UiRuntime, + { + let control = match event { + AppEvent::Message(message) => { + if self.handle_message(state, message) { + LoopControl::Exit(AppState::Quit) + } else { + LoopControl::Continue + } + } + AppEvent::Session(session_event) => { + state.handle_session_event(session_event).await?; + LoopControl::Continue + } + AppEvent::Ui(ui_event) => { + match &ui_event { + Event::Key(key) => { + let _ = self.message_tx.send(AppMessage::KeyPress(*key)); + } + Event::Resize(width, height) => { + let _ = self.message_tx.send(AppMessage::Resize { + width: *width, + height: *height, + }); + } + Event::Tick => { + if self.handle_message(state, AppMessage::Tick) { + return Ok(LoopControl::Exit(AppState::Quit)); + } + return Ok(LoopControl::Continue); + } + _ => {} + } + + let outcome = state.handle_ui_event(ui_event).await?; + if matches!(outcome, AppState::Quit) { + LoopControl::Exit(outcome) + } else { + LoopControl::Continue + } + } + AppEvent::FrameTick => { + if self.handle_message(state, AppMessage::Tick) { + LoopControl::Exit(AppState::Quit) + } else { + LoopControl::Continue + } + } + }; + + Ok(control) + } + + fn spawn_input_listener( + sender: mpsc::UnboundedSender, + ) -> (CancellationToken, JoinHandle<()>) { + let cancellation = CancellationToken::new(); + let handle = task::spawn_blocking({ + let cancellation = cancellation.clone(); + move || { + let poll_interval = Duration::from_millis(16); + while !cancellation.is_cancelled() { + match event::poll(poll_interval) { + Ok(true) => match event::read() { + Ok(raw_event) => { + if let Some(ui_event) = events::from_crossterm_event(raw_event) { + if sender.send(AppEvent::Ui(ui_event)).is_err() { + break; + } + } + } + Err(_) => continue, + }, + Ok(false) => {} + Err(_) => {} + } + } + } + }); + + (cancellation, handle) + } } struct ActiveGeneration {