feat(tui): add command queue and thought summaries

This commit is contained in:
2025-10-26 02:38:10 +01:00
parent 76e59c2d0e
commit 44b07c8e27
2 changed files with 513 additions and 29 deletions

View File

@@ -1,7 +1,13 @@
# Agents Upgrade Plan
- feat: implement resumable command queue, automatic thought summaries, and queued execution controls to match Codex CLI session management and Claude Codes scripted workflows
- feat: implement resumable command queue, automatic thought summaries, and queued execution controls to match Codex CLI session management and Claude Codes scripted workflows**shipped** (`:queue` commands, automatic thought toasts, resumable submissions)
- feat: add first-class prompt, agent, and sub-agent configuration via `.owlen/agents` plus reusable prompt libraries, mirroring Codex custom prompts and Claudes configurable agents
- feat: deliver official VS Code extension and browser workspace so Owlen runs alongside Codexs IDE plugin and Claude Codes app-based surfaces
- feat: support multimodal inputs (images, rich artifacts) and preview panes so non-text context matches Codex CLI image handling and Claude Codes artifact outputs
- feat: integrate repository automation (GitHub PR review, commit templating, Claude SDK-style automation APIs) to reach parity with Codex CLIs GitHub integration and Claude Codes CLI/SDK automation
- feat: implement Codex-style non-blocking TUI so commands remain usable while backend work runs:
1. Add an `AppEvent` channel and dispatch layer in `crates/owlen-tui/src/app/mod.rs` that mirrors the `tokio::select!` loop used in `codex-rs/tui/src/app.rs:190-197` to multiplex UI input, session events, and background updates without blocking redraws.
2. Refactor `ChatApp::process_pending_llm_request` and related helpers to spawn tasks that submit prompts via `SessionController` and stream results back through the new channel, following `codex-rs/tui/src/chatwidget/agent.rs:16-61` so the request lifecycle no longer stalls the UI thread.
3. Track active-turn state plus queued inputs inside `ChatApp` and surface them through the status pane—similar to `codex-rs/tui/src/chatwidget.rs:1105-1132` and `codex-rs/tui/src/bottom_pane/mod.rs:334-352,378-383`—so users can enqueue commands/slash actions while a turn is executing.
4. Introduce a frame requester/draw scheduler (accessible from `ChatApp` and background tasks) that coalesces redraws like `codex-rs/tui/src/tui.rs:234-390`, ensuring notifications, queue updates, and streaming deltas trigger renders without blocking the event loop.
5. Extend input handling and regression tests to cover concurrent queued messages, cancellation, and post-turn flushing, echoing the completion hooks in `codex-rs/tui/src/chatwidget.rs:436-455` and keeping `/help` and command palette responsive under load.

View File

@@ -113,6 +113,8 @@ const DOUBLE_CTRL_C_WINDOW: Duration = Duration::from_millis(1500);
pub(crate) const MIN_MESSAGE_CARD_WIDTH: usize = 14;
const MOUSE_SCROLL_STEP: isize = 3;
const DEFAULT_CONTEXT_WINDOW_TOKENS: u32 = 8_192;
const MAX_QUEUE_ATTEMPTS: u8 = 3;
const THOUGHT_SUMMARY_LIMIT: usize = 5;
#[derive(Clone, Copy, Debug, Default)]
pub struct ContextUsage {
@@ -821,6 +823,16 @@ pub struct ChatApp {
operating_mode: owlen_core::mode::Mode,
/// Flag indicating new messages arrived while scrolled away from tail
new_message_alert: bool,
/// Pending queue of user submissions waiting for execution
command_queue: VecDeque<QueuedCommand>,
/// Whether queued execution is paused
queue_paused: bool,
/// Metadata for the currently running command (if any)
active_command: Option<ActiveCommand>,
/// Rolling buffer of recent thought summaries
thought_summaries: VecDeque<String>,
/// Cached headline summary from the most recent turn
latest_thought_summary: Option<String>,
}
#[derive(Clone, Debug)]
@@ -833,6 +845,77 @@ pub struct ConsentDialogState {
pub tool_calls: Vec<owlen_core::types::ToolCall>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum QueueSource {
User,
Resume,
}
impl QueueSource {
fn label(self) -> &'static str {
match self {
QueueSource::User => "user",
QueueSource::Resume => "retry",
}
}
}
#[derive(Debug, Clone)]
struct QueuedCommand {
content: String,
enqueued_at: DateTime<Utc>,
source: QueueSource,
attempts: u8,
}
impl QueuedCommand {
fn new(content: String, source: QueueSource) -> Self {
Self {
content,
enqueued_at: Utc::now(),
source,
attempts: 0,
}
}
fn from_active(active: ActiveCommand) -> Option<Self> {
if active.attempts + 1 >= MAX_QUEUE_ATTEMPTS {
return None;
}
Some(Self {
content: active.content,
enqueued_at: active.enqueued_at,
source: QueueSource::Resume,
attempts: active.attempts + 1,
})
}
}
#[derive(Debug)]
struct ActiveCommand {
response_id: Option<Uuid>,
content: String,
source: QueueSource,
attempts: u8,
enqueued_at: DateTime<Utc>,
}
impl ActiveCommand {
fn new(content: String, source: QueueSource, attempts: u8, enqueued_at: DateTime<Utc>) -> Self {
Self {
response_id: None,
content,
source,
attempts,
enqueued_at,
}
}
fn record_response(&mut self, response_id: Uuid) {
self.response_id = Some(response_id);
}
}
#[derive(Clone)]
struct MessageCacheEntry {
theme_name: String,
@@ -1140,6 +1223,11 @@ impl ChatApp {
},
onboarding_step: 0,
guidance_settings,
command_queue: VecDeque::new(),
queue_paused: false,
active_command: None,
thought_summaries: VecDeque::new(),
latest_thought_summary: None,
};
app.mvu_model.composer.mode = InputMode::Normal;
@@ -2632,6 +2720,264 @@ impl ChatApp {
self.toasts.push_with_hint(message, level, shortcut_hint);
}
fn is_backend_busy(&self) -> bool {
self.pending_llm_request
|| !self.streaming.is_empty()
|| self.pending_tool_execution.is_some()
|| self.pending_consent.is_some()
}
fn should_queue_submission(&self) -> bool {
self.queue_paused
|| self.is_backend_busy()
|| self.active_command.is_some()
|| !self.command_queue.is_empty()
}
fn enqueue_submission(&mut self, content: String, source: QueueSource) {
let trimmed = content.trim();
if trimmed.is_empty() {
self.error = Some("Cannot queue empty message".to_string());
return;
}
let entry = QueuedCommand::new(trimmed.to_string(), source);
self.command_queue.push_back(entry);
let pending = self.command_queue.len();
if self.queue_paused {
self.status = format!("Queued request · {pending} pending (queue paused)");
} else {
self.status = format!("Queued request · {pending} pending");
}
self.push_toast(
ToastLevel::Info,
format!("Queued request — {pending} pending"),
);
self.start_next_queued_command();
}
fn start_user_turn_internal(
&mut self,
content: String,
source: QueueSource,
attempts: u8,
enqueued_at: DateTime<Utc>,
) {
let message_body = content.trim();
if message_body.is_empty() {
self.error = Some("Cannot send empty message".to_string());
self.status = "Message discarded".to_string();
return;
}
let mut references = Self::extract_resource_references(message_body);
references.sort();
references.dedup();
self.pending_resource_refs = references;
let _message_id = self
.controller
.conversation_mut()
.push_user_message(message_body.to_string());
self.auto_scroll.stick_to_bottom = true;
self.pending_llm_request = true;
if matches!(source, QueueSource::User) && attempts == 0 {
self.status = "Message sent".to_string();
}
self.error = None;
self.active_command = Some(ActiveCommand::new(
message_body.to_string(),
source,
attempts,
enqueued_at,
));
}
fn start_user_turn_from_queue(&mut self, entry: QueuedCommand) {
let pending = self.command_queue.len();
self.status = if pending == 0 {
"Processing queued request".to_string()
} else {
format!("Processing queued request · {pending} remaining")
};
self.start_user_turn_internal(
entry.content,
entry.source,
entry.attempts,
entry.enqueued_at,
);
}
fn start_next_queued_command(&mut self) {
if self.queue_paused || self.is_backend_busy() || self.active_command.is_some() {
return;
}
if let Some(entry) = self.command_queue.pop_front() {
self.start_user_turn_from_queue(entry);
}
}
fn mark_active_command_succeeded(&mut self) {
if let Some(active) = self.active_command.take() {
self.capture_thought_summary(active.response_id);
}
if !self.command_queue.is_empty() {
let pending = self.command_queue.len();
self.status = format!("Ready · {pending} queued");
} else if self.status.to_ascii_lowercase().contains("queued") {
self.status = "Ready".to_string();
}
self.start_next_queued_command();
}
fn mark_active_command_failed(&mut self, reason: Option<String>) {
if let Some(message) = reason.as_ref() {
self.push_toast(ToastLevel::Warning, message.clone());
}
if let Some(active) = self.active_command.take() {
if let Some(entry) = QueuedCommand::from_active(active) {
self.command_queue.push_front(entry);
let pending = self.command_queue.len();
self.status = format!("Request queued for retry · {pending} pending");
} else {
self.status =
"Request failed repeatedly and was removed from the queue".to_string();
}
}
if let Some(message) = reason {
self.error = Some(message);
}
self.start_next_queued_command();
}
fn capture_thought_summary(&mut self, response_hint: Option<Uuid>) {
let conversation = self.controller.conversation();
let maybe_message = if let Some(id) = response_hint {
conversation
.messages
.iter()
.find(|msg| msg.id == id && matches!(msg.role, Role::Assistant))
} else {
conversation
.messages
.iter()
.rev()
.find(|msg| matches!(msg.role, Role::Assistant))
};
let Some(message) = maybe_message else {
return;
};
let (_, thinking) = self.formatter().extract_thinking(&message.content);
let Some(thinking) = thinking else {
return;
};
let Some(summary) = Self::summarize_thinking(&thinking) else {
return;
};
if self.latest_thought_summary.as_deref() == Some(summary.as_str()) {
return;
}
self.latest_thought_summary = Some(summary.clone());
self.thought_summaries.push_front(summary.clone());
while self.thought_summaries.len() > THOUGHT_SUMMARY_LIMIT {
self.thought_summaries.pop_back();
}
self.push_toast_with_hint(
ToastLevel::Info,
format!("Thought summary: {}", summary),
":queue status",
);
self.set_system_status(format!("🧠 {}", summary));
}
fn queue_status_summary(&self) -> String {
let pending = self.command_queue.len();
let paused_flag = if self.queue_paused {
"paused"
} else {
"running"
};
let active_label = if let Some(active) = &self.active_command {
let attempt = active.attempts as usize + 1;
format!("active {} (attempt {})", active.source.label(), attempt)
} else {
"idle".to_string()
};
let summary = self
.latest_thought_summary
.as_deref()
.unwrap_or("no summary yet");
format!(
"Queue: {pending} pending · {paused_flag} · {active_label} · last summary: {summary}"
)
}
fn run_queue_command(&mut self, args: &[&str]) {
if args.is_empty() {
self.status = self.queue_status_summary();
self.error = None;
return;
}
match args[0].to_ascii_lowercase().as_str() {
"status" => {
self.status = self.queue_status_summary();
self.error = None;
}
"pause" => {
self.queue_paused = true;
let pending = self.command_queue.len();
self.status = format!("Queue paused · {pending} pending");
self.error = None;
}
"resume" => {
self.queue_paused = false;
let pending = self.command_queue.len();
self.status = format!("Queue resumed · {pending} pending");
self.error = None;
self.start_next_queued_command();
}
"clear" => {
let cleared = self.command_queue.len();
self.command_queue.clear();
self.status = format!("Cleared queue · removed {cleared} entries");
self.error = None;
}
"next" => {
if self.is_backend_busy() || self.active_command.is_some() {
self.status = "A request is already active; cannot start next.".to_string();
self.error = None;
} else if let Some(entry) = self.command_queue.pop_front() {
self.start_user_turn_from_queue(entry);
} else {
self.status = "Queue is empty".to_string();
self.error = None;
}
}
other => {
self.error = Some(format!(
"Unknown queue action '{}'. Use pause, resume, status, next, or clear.",
other
));
self.status = "Usage: :queue [status|pause|resume|next|clear]".to_string();
}
}
}
fn prune_toasts(&mut self) {
self.toasts.retain_active();
}
@@ -7640,6 +7986,12 @@ impl ChatApp {
self.command_palette.clear();
return Ok(AppState::Running);
}
"queue" => {
self.run_queue_command(args);
self.set_input_mode(InputMode::Normal);
self.command_palette.clear();
return Ok(AppState::Running);
}
"c" | "clear" => {
self.controller.clear();
self.chat_line_offset = 0;
@@ -9489,6 +9841,9 @@ impl ChatApp {
} => {
self.controller.apply_stream_chunk(message_id, &response)?;
self.invalidate_message_cache(&message_id);
if let Some(active) = self.active_command.as_mut() {
active.record_response(message_id);
}
// Update thinking content in real-time during streaming
self.update_thinking_from_last_message();
@@ -9525,6 +9880,7 @@ impl ChatApp {
});
} else {
self.status = "Ready".to_string();
self.mark_active_command_succeeded();
}
}
}
@@ -9542,7 +9898,8 @@ impl ChatApp {
self.stream_tasks.clear();
self.message_line_cache.clear();
}
self.error = Some(message);
self.error = Some(message.clone());
self.mark_active_command_failed(Some(message));
}
SessionEvent::ToolExecutionNeeded {
message_id,
@@ -9557,7 +9914,8 @@ impl ChatApp {
}
SessionEvent::AgentCompleted { answer } => {
// Agent finished, add final answer to conversation
self.controller
let message_id = self
.controller
.conversation_mut()
.push_assistant_message(answer);
self.notify_new_activity();
@@ -9566,13 +9924,20 @@ impl ChatApp {
self.agent_actions = None;
self.status = "Agent completed successfully".to_string();
self.stop_loading_animation();
if let Some(active) = self.active_command.as_mut() {
active.record_response(message_id);
}
self.update_thinking_from_last_message();
self.mark_active_command_succeeded();
}
SessionEvent::AgentFailed { error } => {
// Agent failed, show error
self.error = Some(format!("Agent failed: {}", error));
let message = format!("Agent failed: {}", error);
self.error = Some(message.clone());
self.agent_running = false;
self.agent_actions = None;
self.stop_loading_animation();
self.mark_active_command_failed(Some(message));
}
SessionEvent::OAuthPoll {
server,
@@ -11514,29 +11879,19 @@ impl ChatApp {
}
fn send_user_message_and_request_response(&mut self) {
let content = self.controller.input_buffer().text().trim().to_string();
let raw = self.controller.input_buffer_mut().commit_to_history();
let content = raw.trim().to_string();
if content.is_empty() {
self.error = Some("Cannot send empty message".to_string());
return;
}
// Step 1: Add user message to conversation immediately (synchronous)
let message = self.controller.input_buffer_mut().commit_to_history();
let mut references = Self::extract_resource_references(&message);
references.sort();
references.dedup();
self.pending_resource_refs = references;
self.controller
.conversation_mut()
.push_user_message(message.clone());
if self.should_queue_submission() {
self.enqueue_submission(content, QueueSource::User);
return;
}
// Auto-scroll to bottom when sending a message
self.auto_scroll.stick_to_bottom = true;
// Step 2: Set flag to process LLM request on next event loop iteration
self.pending_llm_request = true;
self.status = "Message sent".to_string();
self.error = None;
self.start_user_turn_internal(content, QueueSource::User, 0, Utc::now());
}
pub fn has_active_generation(&self) -> bool {
@@ -11585,6 +11940,17 @@ impl ChatApp {
self.status = "Generation cancelled".to_string();
self.set_system_status("Generation cancelled".to_string());
self.update_thinking_from_last_message();
if let Some(active) = self.active_command.take() {
if let Some(entry) = QueuedCommand::from_active(active) {
self.command_queue.push_front(entry);
} else {
self.push_toast(
ToastLevel::Warning,
"Cancelled request exceeded retry budget and was dropped.",
);
}
}
self.start_next_queued_command();
}
Ok(cancelled)
@@ -11598,6 +11964,10 @@ impl ChatApp {
self.pending_tool_execution = None;
self.pending_consent = None;
self.queued_consents.clear();
self.command_queue.clear();
self.active_command = None;
self.latest_thought_summary = None;
self.thought_summaries.clear();
self.keymap_state.reset();
self.visual_start = None;
self.visual_end = None;
@@ -11675,6 +12045,11 @@ impl ChatApp {
self.status = "Ready".to_string();
self.error = None;
self.refresh_usage_summary().await?;
self.update_thinking_from_last_message();
if let Some(active) = self.active_command.as_mut() {
active.record_response(response.message.id);
}
self.mark_active_command_succeeded();
Ok(())
}
Ok(Ok(SessionOutcome::Streaming {
@@ -11684,6 +12059,9 @@ impl ChatApp {
self.status = "Model loaded. Generating response... (streaming)".to_string();
self.spawn_stream(response_id, stream);
if let Some(active) = self.active_command.as_mut() {
active.record_response(response_id);
}
match self.controller.mark_stream_placeholder(response_id, "") {
Ok(_) => self.error = None,
Err(err) => {
@@ -11694,7 +12072,9 @@ impl ChatApp {
}
Ok(Err(err)) => {
self.stop_loading_animation();
if self.handle_provider_error(&err).await? {
let handled = self.handle_provider_error(&err).await?;
if handled {
self.mark_active_command_failed(Some(err.to_string()));
return Ok(());
}
let message = err.to_string();
@@ -11707,15 +12087,19 @@ impl ChatApp {
let _ = self.refresh_models().await;
self.set_input_mode(InputMode::ProviderSelection);
} else {
self.error = Some(message);
self.error = Some(message.clone());
self.status = "Request failed".to_string();
}
let failure_notice = self.error.clone().unwrap_or_else(|| message.clone());
self.mark_active_command_failed(Some(failure_notice));
Ok(())
}
Err(_) => {
self.error = Some("Request timed out. Check if Ollama is running.".to_string());
let timeout_message = "Request timed out. Check if Ollama is running.".to_string();
self.error = Some(timeout_message.clone());
self.status = "Request timed out".to_string();
self.stop_loading_animation();
self.mark_active_command_failed(Some(timeout_message));
Ok(())
}
}
@@ -11921,7 +12305,8 @@ impl ChatApp {
// Run agent
match executor.run(user_message).await {
Ok(result) => {
self.controller
let message_id = self
.controller
.conversation_mut()
.push_assistant_message(result.answer);
self.agent_running = false;
@@ -11929,14 +12314,21 @@ impl ChatApp {
self.agent_actions = None;
self.status = format!("Agent completed in {} iterations", result.iterations);
self.stop_loading_animation();
if let Some(active) = self.active_command.as_mut() {
active.record_response(message_id);
}
self.update_thinking_from_last_message();
self.mark_active_command_succeeded();
Ok(())
}
Err(e) => {
self.error = Some(format!("Agent failed: {}", e));
let message = format!("Agent failed: {}", e);
self.error = Some(message.clone());
self.agent_running = false;
self.agent_mode = false;
self.agent_actions = None;
self.stop_loading_animation();
self.mark_active_command_failed(Some(message));
Ok(())
}
}
@@ -11986,6 +12378,9 @@ impl ChatApp {
self.status = "Tool results sent. Generating response...".to_string();
self.set_system_status("✓ Tools executed successfully".to_string());
self.spawn_stream(response_id, stream);
if let Some(active) = self.active_command.as_mut() {
active.record_response(response_id);
}
match self.controller.mark_stream_placeholder(response_id, "") {
Ok(_) => self.error = None,
Err(err) => {
@@ -11994,19 +12389,26 @@ impl ChatApp {
}
Ok(())
}
Ok(SessionOutcome::Complete(_response)) => {
Ok(SessionOutcome::Complete(response)) => {
// Tool execution complete without streaming (shouldn't happen in streaming mode)
self.stop_loading_animation();
self.status = "✓ Tool execution complete".to_string();
self.set_system_status("✓ Tool execution complete".to_string());
self.error = None;
if let Some(active) = self.active_command.as_mut() {
active.record_response(response.message.id);
}
self.update_thinking_from_last_message();
self.mark_active_command_succeeded();
Ok(())
}
Err(err) => {
self.stop_loading_animation();
self.status = "Tool execution failed".to_string();
self.set_system_status(format!("❌ Tool execution failed: {}", err));
self.error = Some(format!("Tool execution failed: {}", err));
let failure_message = format!("❌ Tool execution failed: {}", err);
self.set_system_status(failure_message.clone());
self.error = Some(failure_message.clone());
self.mark_active_command_failed(Some(failure_message));
Ok(())
}
}
@@ -12475,6 +12877,52 @@ fn capitalize_first(input: &str) -> String {
}
}
impl ChatApp {
fn summarize_thinking(thinking: &str) -> Option<String> {
let mut segments = thinking
.split('\n')
.map(|line| line.trim())
.filter(|line| !line.is_empty());
let mut summary = segments.next()?.to_string();
if summary.chars().count() < 120 {
if let Some(next) = segments.next() {
if !next.is_empty() {
if !summary.ends_with('.') && !summary.ends_with('!') && !summary.ends_with('?')
{
summary.push('.');
}
summary.push(' ');
summary.push_str(next);
}
}
}
if summary.chars().count() > 160 {
summary = Self::truncate_to_chars(&summary, 160);
}
Some(summary)
}
fn truncate_to_chars(input: &str, limit: usize) -> String {
if input.chars().count() <= limit {
return input.to_string();
}
let mut result = String::new();
for (idx, ch) in input.chars().enumerate() {
if idx + 1 >= limit {
result.push('…');
break;
}
result.push(ch);
}
result
}
}
pub(crate) fn role_label_parts(role: &Role) -> (&'static str, &'static str) {
match role {
Role::User => ("👤", "You"),
@@ -14224,3 +14672,33 @@ impl UiRuntime for ChatApp {
ChatApp::streaming_count(self)
}
}
#[cfg(test)]
mod queue_tests {
use super::ChatApp;
#[test]
fn summarize_thinking_uses_first_line() {
let text = "Outline the plan.\nThen execute.";
let summary = ChatApp::summarize_thinking(text).expect("summary");
assert!(
summary.starts_with("Outline the plan"),
"summary should start with first line"
);
}
#[test]
fn summarize_thinking_truncates_long_text() {
let long_segment = "a".repeat(200);
let text = format!("{long_segment}\nNext steps");
let summary = ChatApp::summarize_thinking(&text).expect("summary");
assert!(
summary.chars().count() <= 160,
"summary should be limited to 160 characters"
);
assert!(
summary.ends_with('…'),
"summary should end with ellipsis when truncated"
);
}
}