feat(tui): add AppEvent dispatch loop
This commit is contained in:
@@ -6,21 +6,19 @@ mod worker;
|
||||
pub mod messages;
|
||||
pub use worker::background_worker;
|
||||
|
||||
use std::{
|
||||
io,
|
||||
sync::Arc,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use std::{io, sync::Arc, time::Duration};
|
||||
|
||||
use anyhow::Result;
|
||||
use async_trait::async_trait;
|
||||
use crossterm::event::{self, KeyEventKind};
|
||||
use crossterm::event;
|
||||
use owlen_core::{provider::ProviderManager, state::AppState};
|
||||
use ratatui::{Terminal, backend::CrosstermBackend};
|
||||
use tokio::{
|
||||
sync::mpsc::{self, error::TryRecvError},
|
||||
task::{AbortHandle, JoinHandle, yield_now},
|
||||
sync::mpsc,
|
||||
task::{self, AbortHandle, JoinHandle},
|
||||
time::{MissedTickBehavior, interval},
|
||||
};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::{Event, SessionEvent, events};
|
||||
@@ -28,6 +26,20 @@ use crate::{Event, SessionEvent, events};
|
||||
pub use handler::MessageState;
|
||||
pub use messages::AppMessage;
|
||||
|
||||
#[derive(Debug)]
|
||||
enum AppEvent {
|
||||
Message(AppMessage),
|
||||
Session(SessionEvent),
|
||||
Ui(Event),
|
||||
FrameTick,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
enum LoopControl {
|
||||
Continue,
|
||||
Exit(AppState),
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait UiRuntime: MessageState {
|
||||
async fn handle_ui_event(&mut self, event: Event) -> Result<AppState>;
|
||||
@@ -105,109 +117,154 @@ impl App {
|
||||
.take()
|
||||
.expect("App::run called without an available message receiver");
|
||||
|
||||
let poll_interval = Duration::from_millis(16);
|
||||
let mut last_frame = Instant::now();
|
||||
let frame_interval = Duration::from_millis(16);
|
||||
let (app_event_tx, mut app_event_rx) = mpsc::unbounded_channel::<AppEvent>();
|
||||
let (input_cancel, input_handle) = Self::spawn_input_listener(app_event_tx.clone());
|
||||
drop(app_event_tx);
|
||||
|
||||
let mut frame_interval = interval(Duration::from_millis(16));
|
||||
frame_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
|
||||
|
||||
let mut worker_handle = Some(self.spawn_background_worker());
|
||||
|
||||
let exit_state = AppState::Quit;
|
||||
'main: loop {
|
||||
state.advance_loading_animation();
|
||||
|
||||
state.process_pending_llm_request().await?;
|
||||
state.process_pending_tool_execution().await?;
|
||||
state.poll_controller_events()?;
|
||||
let mut exit_state = AppState::Quit;
|
||||
|
||||
loop {
|
||||
match session_rx.try_recv() {
|
||||
Ok(session_event) => {
|
||||
state.handle_session_event(session_event).await?;
|
||||
}
|
||||
Err(TryRecvError::Empty) => break,
|
||||
Err(TryRecvError::Disconnected) => {
|
||||
break 'main;
|
||||
}
|
||||
}
|
||||
}
|
||||
self.pump_background(state).await?;
|
||||
|
||||
loop {
|
||||
match message_rx.try_recv() {
|
||||
Ok(message) => {
|
||||
if self.handle_message(state, message) {
|
||||
if let Some(handle) = worker_handle.take() {
|
||||
handle.abort();
|
||||
}
|
||||
break 'main;
|
||||
}
|
||||
}
|
||||
Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
|
||||
Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => break,
|
||||
}
|
||||
}
|
||||
let next_event = tokio::select! {
|
||||
Some(event) = app_event_rx.recv() => event,
|
||||
Some(message) = message_rx.recv() => AppEvent::Message(message),
|
||||
Some(session_event) = session_rx.recv() => AppEvent::Session(session_event),
|
||||
_ = frame_interval.tick() => AppEvent::FrameTick,
|
||||
else => break,
|
||||
};
|
||||
|
||||
if last_frame.elapsed() >= frame_interval {
|
||||
let is_frame_tick = matches!(next_event, AppEvent::FrameTick);
|
||||
|
||||
match self.dispatch_app_event(state, next_event).await? {
|
||||
LoopControl::Continue => {
|
||||
if is_frame_tick {
|
||||
render(terminal, state)?;
|
||||
last_frame = Instant::now();
|
||||
}
|
||||
}
|
||||
LoopControl::Exit(state_value) => {
|
||||
exit_state = state_value;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if self.handle_message(state, AppMessage::Tick) {
|
||||
input_cancel.cancel();
|
||||
let _ = input_handle.await;
|
||||
|
||||
if let Some(handle) = worker_handle.take() {
|
||||
handle.abort();
|
||||
}
|
||||
break 'main;
|
||||
}
|
||||
|
||||
match event::poll(poll_interval) {
|
||||
Ok(true) => match event::read() {
|
||||
Ok(raw_event) => {
|
||||
if let Some(ui_event) = events::from_crossterm_event(raw_event) {
|
||||
if let Event::Key(key) = &ui_event {
|
||||
if key.kind == KeyEventKind::Press {
|
||||
let _ = self.message_tx.send(AppMessage::KeyPress(*key));
|
||||
}
|
||||
} else if let Event::Resize(width, height) = &ui_event {
|
||||
let _ = self.message_tx.send(AppMessage::Resize {
|
||||
width: *width,
|
||||
height: *height,
|
||||
});
|
||||
}
|
||||
|
||||
if matches!(state.handle_ui_event(ui_event).await?, AppState::Quit) {
|
||||
if let Some(handle) = worker_handle.take() {
|
||||
handle.abort();
|
||||
}
|
||||
break 'main;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
if let Some(handle) = worker_handle.take() {
|
||||
handle.abort();
|
||||
}
|
||||
return Err(err.into());
|
||||
}
|
||||
},
|
||||
Ok(false) => {}
|
||||
Err(err) => {
|
||||
if let Some(handle) = worker_handle.take() {
|
||||
handle.abort();
|
||||
}
|
||||
return Err(err.into());
|
||||
}
|
||||
}
|
||||
|
||||
yield_now().await;
|
||||
}
|
||||
|
||||
if let Some(handle) = worker_handle {
|
||||
handle.abort();
|
||||
let _ = handle.await;
|
||||
}
|
||||
|
||||
self.message_rx = Some(message_rx);
|
||||
|
||||
Ok(exit_state)
|
||||
}
|
||||
|
||||
async fn pump_background<State>(&mut self, state: &mut State) -> Result<()>
|
||||
where
|
||||
State: UiRuntime,
|
||||
{
|
||||
state.advance_loading_animation();
|
||||
state.process_pending_llm_request().await?;
|
||||
state.process_pending_tool_execution().await?;
|
||||
state.poll_controller_events()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn dispatch_app_event<State>(
|
||||
&mut self,
|
||||
state: &mut State,
|
||||
event: AppEvent,
|
||||
) -> Result<LoopControl>
|
||||
where
|
||||
State: UiRuntime,
|
||||
{
|
||||
let control = match event {
|
||||
AppEvent::Message(message) => {
|
||||
if self.handle_message(state, message) {
|
||||
LoopControl::Exit(AppState::Quit)
|
||||
} else {
|
||||
LoopControl::Continue
|
||||
}
|
||||
}
|
||||
AppEvent::Session(session_event) => {
|
||||
state.handle_session_event(session_event).await?;
|
||||
LoopControl::Continue
|
||||
}
|
||||
AppEvent::Ui(ui_event) => {
|
||||
match &ui_event {
|
||||
Event::Key(key) => {
|
||||
let _ = self.message_tx.send(AppMessage::KeyPress(*key));
|
||||
}
|
||||
Event::Resize(width, height) => {
|
||||
let _ = self.message_tx.send(AppMessage::Resize {
|
||||
width: *width,
|
||||
height: *height,
|
||||
});
|
||||
}
|
||||
Event::Tick => {
|
||||
if self.handle_message(state, AppMessage::Tick) {
|
||||
return Ok(LoopControl::Exit(AppState::Quit));
|
||||
}
|
||||
return Ok(LoopControl::Continue);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
let outcome = state.handle_ui_event(ui_event).await?;
|
||||
if matches!(outcome, AppState::Quit) {
|
||||
LoopControl::Exit(outcome)
|
||||
} else {
|
||||
LoopControl::Continue
|
||||
}
|
||||
}
|
||||
AppEvent::FrameTick => {
|
||||
if self.handle_message(state, AppMessage::Tick) {
|
||||
LoopControl::Exit(AppState::Quit)
|
||||
} else {
|
||||
LoopControl::Continue
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
Ok(control)
|
||||
}
|
||||
|
||||
fn spawn_input_listener(
|
||||
sender: mpsc::UnboundedSender<AppEvent>,
|
||||
) -> (CancellationToken, JoinHandle<()>) {
|
||||
let cancellation = CancellationToken::new();
|
||||
let handle = task::spawn_blocking({
|
||||
let cancellation = cancellation.clone();
|
||||
move || {
|
||||
let poll_interval = Duration::from_millis(16);
|
||||
while !cancellation.is_cancelled() {
|
||||
match event::poll(poll_interval) {
|
||||
Ok(true) => match event::read() {
|
||||
Ok(raw_event) => {
|
||||
if let Some(ui_event) = events::from_crossterm_event(raw_event) {
|
||||
if sender.send(AppEvent::Ui(ui_event)).is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(_) => continue,
|
||||
},
|
||||
Ok(false) => {}
|
||||
Err(_) => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
(cancellation, handle)
|
||||
}
|
||||
}
|
||||
|
||||
struct ActiveGeneration {
|
||||
|
||||
Reference in New Issue
Block a user