diff --git a/agents.md b/agents.md index d79dccd..467dd5f 100644 --- a/agents.md +++ b/agents.md @@ -1,7 +1,13 @@ # Agents Upgrade Plan -- feat: implement resumable command queue, automatic thought summaries, and queued execution controls to match Codex CLI session management and Claude Code’s scripted workflows +- feat: implement resumable command queue, automatic thought summaries, and queued execution controls to match Codex CLI session management and Claude Code’s scripted workflows — **shipped** (`:queue` commands, automatic thought toasts, resumable submissions) - feat: add first-class prompt, agent, and sub-agent configuration via `.owlen/agents` plus reusable prompt libraries, mirroring Codex custom prompts and Claude’s configurable agents - feat: deliver official VS Code extension and browser workspace so Owlen runs alongside Codex’s IDE plugin and Claude Code’s app-based surfaces - feat: support multimodal inputs (images, rich artifacts) and preview panes so non-text context matches Codex CLI image handling and Claude Code’s artifact outputs - feat: integrate repository automation (GitHub PR review, commit templating, Claude SDK-style automation APIs) to reach parity with Codex CLI’s GitHub integration and Claude Code’s CLI/SDK automation +- feat: implement Codex-style non-blocking TUI so commands remain usable while backend work runs: + 1. Add an `AppEvent` channel and dispatch layer in `crates/owlen-tui/src/app/mod.rs` that mirrors the `tokio::select!` loop used in `codex-rs/tui/src/app.rs:190-197` to multiplex UI input, session events, and background updates without blocking redraws. + 2. Refactor `ChatApp::process_pending_llm_request` and related helpers to spawn tasks that submit prompts via `SessionController` and stream results back through the new channel, following `codex-rs/tui/src/chatwidget/agent.rs:16-61` so the request lifecycle no longer stalls the UI thread. + 3. Track active-turn state plus queued inputs inside `ChatApp` and surface them through the status pane—similar to `codex-rs/tui/src/chatwidget.rs:1105-1132` and `codex-rs/tui/src/bottom_pane/mod.rs:334-352,378-383`—so users can enqueue commands/slash actions while a turn is executing. + 4. Introduce a frame requester/draw scheduler (accessible from `ChatApp` and background tasks) that coalesces redraws like `codex-rs/tui/src/tui.rs:234-390`, ensuring notifications, queue updates, and streaming deltas trigger renders without blocking the event loop. + 5. Extend input handling and regression tests to cover concurrent queued messages, cancellation, and post-turn flushing, echoing the completion hooks in `codex-rs/tui/src/chatwidget.rs:436-455` and keeping `/help` and command palette responsive under load. diff --git a/crates/owlen-tui/src/chat_app.rs b/crates/owlen-tui/src/chat_app.rs index 78bf71c..97d8e56 100644 --- a/crates/owlen-tui/src/chat_app.rs +++ b/crates/owlen-tui/src/chat_app.rs @@ -113,6 +113,8 @@ const DOUBLE_CTRL_C_WINDOW: Duration = Duration::from_millis(1500); pub(crate) const MIN_MESSAGE_CARD_WIDTH: usize = 14; const MOUSE_SCROLL_STEP: isize = 3; const DEFAULT_CONTEXT_WINDOW_TOKENS: u32 = 8_192; +const MAX_QUEUE_ATTEMPTS: u8 = 3; +const THOUGHT_SUMMARY_LIMIT: usize = 5; #[derive(Clone, Copy, Debug, Default)] pub struct ContextUsage { @@ -821,6 +823,16 @@ pub struct ChatApp { operating_mode: owlen_core::mode::Mode, /// Flag indicating new messages arrived while scrolled away from tail new_message_alert: bool, + /// Pending queue of user submissions waiting for execution + command_queue: VecDeque, + /// Whether queued execution is paused + queue_paused: bool, + /// Metadata for the currently running command (if any) + active_command: Option, + /// Rolling buffer of recent thought summaries + thought_summaries: VecDeque, + /// Cached headline summary from the most recent turn + latest_thought_summary: Option, } #[derive(Clone, Debug)] @@ -833,6 +845,77 @@ pub struct ConsentDialogState { pub tool_calls: Vec, } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum QueueSource { + User, + Resume, +} + +impl QueueSource { + fn label(self) -> &'static str { + match self { + QueueSource::User => "user", + QueueSource::Resume => "retry", + } + } +} + +#[derive(Debug, Clone)] +struct QueuedCommand { + content: String, + enqueued_at: DateTime, + source: QueueSource, + attempts: u8, +} + +impl QueuedCommand { + fn new(content: String, source: QueueSource) -> Self { + Self { + content, + enqueued_at: Utc::now(), + source, + attempts: 0, + } + } + + fn from_active(active: ActiveCommand) -> Option { + if active.attempts + 1 >= MAX_QUEUE_ATTEMPTS { + return None; + } + Some(Self { + content: active.content, + enqueued_at: active.enqueued_at, + source: QueueSource::Resume, + attempts: active.attempts + 1, + }) + } +} + +#[derive(Debug)] +struct ActiveCommand { + response_id: Option, + content: String, + source: QueueSource, + attempts: u8, + enqueued_at: DateTime, +} + +impl ActiveCommand { + fn new(content: String, source: QueueSource, attempts: u8, enqueued_at: DateTime) -> Self { + Self { + response_id: None, + content, + source, + attempts, + enqueued_at, + } + } + + fn record_response(&mut self, response_id: Uuid) { + self.response_id = Some(response_id); + } +} + #[derive(Clone)] struct MessageCacheEntry { theme_name: String, @@ -1140,6 +1223,11 @@ impl ChatApp { }, onboarding_step: 0, guidance_settings, + command_queue: VecDeque::new(), + queue_paused: false, + active_command: None, + thought_summaries: VecDeque::new(), + latest_thought_summary: None, }; app.mvu_model.composer.mode = InputMode::Normal; @@ -2632,6 +2720,264 @@ impl ChatApp { self.toasts.push_with_hint(message, level, shortcut_hint); } + fn is_backend_busy(&self) -> bool { + self.pending_llm_request + || !self.streaming.is_empty() + || self.pending_tool_execution.is_some() + || self.pending_consent.is_some() + } + + fn should_queue_submission(&self) -> bool { + self.queue_paused + || self.is_backend_busy() + || self.active_command.is_some() + || !self.command_queue.is_empty() + } + + fn enqueue_submission(&mut self, content: String, source: QueueSource) { + let trimmed = content.trim(); + if trimmed.is_empty() { + self.error = Some("Cannot queue empty message".to_string()); + return; + } + + let entry = QueuedCommand::new(trimmed.to_string(), source); + self.command_queue.push_back(entry); + let pending = self.command_queue.len(); + if self.queue_paused { + self.status = format!("Queued request · {pending} pending (queue paused)"); + } else { + self.status = format!("Queued request · {pending} pending"); + } + self.push_toast( + ToastLevel::Info, + format!("Queued request — {pending} pending"), + ); + + self.start_next_queued_command(); + } + + fn start_user_turn_internal( + &mut self, + content: String, + source: QueueSource, + attempts: u8, + enqueued_at: DateTime, + ) { + let message_body = content.trim(); + if message_body.is_empty() { + self.error = Some("Cannot send empty message".to_string()); + self.status = "Message discarded".to_string(); + return; + } + + let mut references = Self::extract_resource_references(message_body); + references.sort(); + references.dedup(); + self.pending_resource_refs = references; + + let _message_id = self + .controller + .conversation_mut() + .push_user_message(message_body.to_string()); + + self.auto_scroll.stick_to_bottom = true; + self.pending_llm_request = true; + if matches!(source, QueueSource::User) && attempts == 0 { + self.status = "Message sent".to_string(); + } + self.error = None; + + self.active_command = Some(ActiveCommand::new( + message_body.to_string(), + source, + attempts, + enqueued_at, + )); + } + + fn start_user_turn_from_queue(&mut self, entry: QueuedCommand) { + let pending = self.command_queue.len(); + self.status = if pending == 0 { + "Processing queued request".to_string() + } else { + format!("Processing queued request · {pending} remaining") + }; + self.start_user_turn_internal( + entry.content, + entry.source, + entry.attempts, + entry.enqueued_at, + ); + } + + fn start_next_queued_command(&mut self) { + if self.queue_paused || self.is_backend_busy() || self.active_command.is_some() { + return; + } + if let Some(entry) = self.command_queue.pop_front() { + self.start_user_turn_from_queue(entry); + } + } + + fn mark_active_command_succeeded(&mut self) { + if let Some(active) = self.active_command.take() { + self.capture_thought_summary(active.response_id); + } + + if !self.command_queue.is_empty() { + let pending = self.command_queue.len(); + self.status = format!("Ready · {pending} queued"); + } else if self.status.to_ascii_lowercase().contains("queued") { + self.status = "Ready".to_string(); + } + + self.start_next_queued_command(); + } + + fn mark_active_command_failed(&mut self, reason: Option) { + if let Some(message) = reason.as_ref() { + self.push_toast(ToastLevel::Warning, message.clone()); + } + + if let Some(active) = self.active_command.take() { + if let Some(entry) = QueuedCommand::from_active(active) { + self.command_queue.push_front(entry); + let pending = self.command_queue.len(); + self.status = format!("Request queued for retry · {pending} pending"); + } else { + self.status = + "Request failed repeatedly and was removed from the queue".to_string(); + } + } + + if let Some(message) = reason { + self.error = Some(message); + } + + self.start_next_queued_command(); + } + + fn capture_thought_summary(&mut self, response_hint: Option) { + let conversation = self.controller.conversation(); + let maybe_message = if let Some(id) = response_hint { + conversation + .messages + .iter() + .find(|msg| msg.id == id && matches!(msg.role, Role::Assistant)) + } else { + conversation + .messages + .iter() + .rev() + .find(|msg| matches!(msg.role, Role::Assistant)) + }; + + let Some(message) = maybe_message else { + return; + }; + + let (_, thinking) = self.formatter().extract_thinking(&message.content); + let Some(thinking) = thinking else { + return; + }; + + let Some(summary) = Self::summarize_thinking(&thinking) else { + return; + }; + + if self.latest_thought_summary.as_deref() == Some(summary.as_str()) { + return; + } + + self.latest_thought_summary = Some(summary.clone()); + self.thought_summaries.push_front(summary.clone()); + while self.thought_summaries.len() > THOUGHT_SUMMARY_LIMIT { + self.thought_summaries.pop_back(); + } + + self.push_toast_with_hint( + ToastLevel::Info, + format!("Thought summary: {}", summary), + ":queue status", + ); + self.set_system_status(format!("🧠 {}", summary)); + } + + fn queue_status_summary(&self) -> String { + let pending = self.command_queue.len(); + let paused_flag = if self.queue_paused { + "paused" + } else { + "running" + }; + let active_label = if let Some(active) = &self.active_command { + let attempt = active.attempts as usize + 1; + format!("active {} (attempt {})", active.source.label(), attempt) + } else { + "idle".to_string() + }; + let summary = self + .latest_thought_summary + .as_deref() + .unwrap_or("no summary yet"); + format!( + "Queue: {pending} pending · {paused_flag} · {active_label} · last summary: {summary}" + ) + } + + fn run_queue_command(&mut self, args: &[&str]) { + if args.is_empty() { + self.status = self.queue_status_summary(); + self.error = None; + return; + } + + match args[0].to_ascii_lowercase().as_str() { + "status" => { + self.status = self.queue_status_summary(); + self.error = None; + } + "pause" => { + self.queue_paused = true; + let pending = self.command_queue.len(); + self.status = format!("Queue paused · {pending} pending"); + self.error = None; + } + "resume" => { + self.queue_paused = false; + let pending = self.command_queue.len(); + self.status = format!("Queue resumed · {pending} pending"); + self.error = None; + self.start_next_queued_command(); + } + "clear" => { + let cleared = self.command_queue.len(); + self.command_queue.clear(); + self.status = format!("Cleared queue · removed {cleared} entries"); + self.error = None; + } + "next" => { + if self.is_backend_busy() || self.active_command.is_some() { + self.status = "A request is already active; cannot start next.".to_string(); + self.error = None; + } else if let Some(entry) = self.command_queue.pop_front() { + self.start_user_turn_from_queue(entry); + } else { + self.status = "Queue is empty".to_string(); + self.error = None; + } + } + other => { + self.error = Some(format!( + "Unknown queue action '{}'. Use pause, resume, status, next, or clear.", + other + )); + self.status = "Usage: :queue [status|pause|resume|next|clear]".to_string(); + } + } + } + fn prune_toasts(&mut self) { self.toasts.retain_active(); } @@ -7640,6 +7986,12 @@ impl ChatApp { self.command_palette.clear(); return Ok(AppState::Running); } + "queue" => { + self.run_queue_command(args); + self.set_input_mode(InputMode::Normal); + self.command_palette.clear(); + return Ok(AppState::Running); + } "c" | "clear" => { self.controller.clear(); self.chat_line_offset = 0; @@ -9489,6 +9841,9 @@ impl ChatApp { } => { self.controller.apply_stream_chunk(message_id, &response)?; self.invalidate_message_cache(&message_id); + if let Some(active) = self.active_command.as_mut() { + active.record_response(message_id); + } // Update thinking content in real-time during streaming self.update_thinking_from_last_message(); @@ -9525,6 +9880,7 @@ impl ChatApp { }); } else { self.status = "Ready".to_string(); + self.mark_active_command_succeeded(); } } } @@ -9542,7 +9898,8 @@ impl ChatApp { self.stream_tasks.clear(); self.message_line_cache.clear(); } - self.error = Some(message); + self.error = Some(message.clone()); + self.mark_active_command_failed(Some(message)); } SessionEvent::ToolExecutionNeeded { message_id, @@ -9557,7 +9914,8 @@ impl ChatApp { } SessionEvent::AgentCompleted { answer } => { // Agent finished, add final answer to conversation - self.controller + let message_id = self + .controller .conversation_mut() .push_assistant_message(answer); self.notify_new_activity(); @@ -9566,13 +9924,20 @@ impl ChatApp { self.agent_actions = None; self.status = "Agent completed successfully".to_string(); self.stop_loading_animation(); + if let Some(active) = self.active_command.as_mut() { + active.record_response(message_id); + } + self.update_thinking_from_last_message(); + self.mark_active_command_succeeded(); } SessionEvent::AgentFailed { error } => { // Agent failed, show error - self.error = Some(format!("Agent failed: {}", error)); + let message = format!("Agent failed: {}", error); + self.error = Some(message.clone()); self.agent_running = false; self.agent_actions = None; self.stop_loading_animation(); + self.mark_active_command_failed(Some(message)); } SessionEvent::OAuthPoll { server, @@ -11514,29 +11879,19 @@ impl ChatApp { } fn send_user_message_and_request_response(&mut self) { - let content = self.controller.input_buffer().text().trim().to_string(); + let raw = self.controller.input_buffer_mut().commit_to_history(); + let content = raw.trim().to_string(); if content.is_empty() { self.error = Some("Cannot send empty message".to_string()); return; } - // Step 1: Add user message to conversation immediately (synchronous) - let message = self.controller.input_buffer_mut().commit_to_history(); - let mut references = Self::extract_resource_references(&message); - references.sort(); - references.dedup(); - self.pending_resource_refs = references; - self.controller - .conversation_mut() - .push_user_message(message.clone()); + if self.should_queue_submission() { + self.enqueue_submission(content, QueueSource::User); + return; + } - // Auto-scroll to bottom when sending a message - self.auto_scroll.stick_to_bottom = true; - - // Step 2: Set flag to process LLM request on next event loop iteration - self.pending_llm_request = true; - self.status = "Message sent".to_string(); - self.error = None; + self.start_user_turn_internal(content, QueueSource::User, 0, Utc::now()); } pub fn has_active_generation(&self) -> bool { @@ -11585,6 +11940,17 @@ impl ChatApp { self.status = "Generation cancelled".to_string(); self.set_system_status("Generation cancelled".to_string()); self.update_thinking_from_last_message(); + if let Some(active) = self.active_command.take() { + if let Some(entry) = QueuedCommand::from_active(active) { + self.command_queue.push_front(entry); + } else { + self.push_toast( + ToastLevel::Warning, + "Cancelled request exceeded retry budget and was dropped.", + ); + } + } + self.start_next_queued_command(); } Ok(cancelled) @@ -11598,6 +11964,10 @@ impl ChatApp { self.pending_tool_execution = None; self.pending_consent = None; self.queued_consents.clear(); + self.command_queue.clear(); + self.active_command = None; + self.latest_thought_summary = None; + self.thought_summaries.clear(); self.keymap_state.reset(); self.visual_start = None; self.visual_end = None; @@ -11675,6 +12045,11 @@ impl ChatApp { self.status = "Ready".to_string(); self.error = None; self.refresh_usage_summary().await?; + self.update_thinking_from_last_message(); + if let Some(active) = self.active_command.as_mut() { + active.record_response(response.message.id); + } + self.mark_active_command_succeeded(); Ok(()) } Ok(Ok(SessionOutcome::Streaming { @@ -11684,6 +12059,9 @@ impl ChatApp { self.status = "Model loaded. Generating response... (streaming)".to_string(); self.spawn_stream(response_id, stream); + if let Some(active) = self.active_command.as_mut() { + active.record_response(response_id); + } match self.controller.mark_stream_placeholder(response_id, "▌") { Ok(_) => self.error = None, Err(err) => { @@ -11694,7 +12072,9 @@ impl ChatApp { } Ok(Err(err)) => { self.stop_loading_animation(); - if self.handle_provider_error(&err).await? { + let handled = self.handle_provider_error(&err).await?; + if handled { + self.mark_active_command_failed(Some(err.to_string())); return Ok(()); } let message = err.to_string(); @@ -11707,15 +12087,19 @@ impl ChatApp { let _ = self.refresh_models().await; self.set_input_mode(InputMode::ProviderSelection); } else { - self.error = Some(message); + self.error = Some(message.clone()); self.status = "Request failed".to_string(); } + let failure_notice = self.error.clone().unwrap_or_else(|| message.clone()); + self.mark_active_command_failed(Some(failure_notice)); Ok(()) } Err(_) => { - self.error = Some("Request timed out. Check if Ollama is running.".to_string()); + let timeout_message = "Request timed out. Check if Ollama is running.".to_string(); + self.error = Some(timeout_message.clone()); self.status = "Request timed out".to_string(); self.stop_loading_animation(); + self.mark_active_command_failed(Some(timeout_message)); Ok(()) } } @@ -11921,7 +12305,8 @@ impl ChatApp { // Run agent match executor.run(user_message).await { Ok(result) => { - self.controller + let message_id = self + .controller .conversation_mut() .push_assistant_message(result.answer); self.agent_running = false; @@ -11929,14 +12314,21 @@ impl ChatApp { self.agent_actions = None; self.status = format!("Agent completed in {} iterations", result.iterations); self.stop_loading_animation(); + if let Some(active) = self.active_command.as_mut() { + active.record_response(message_id); + } + self.update_thinking_from_last_message(); + self.mark_active_command_succeeded(); Ok(()) } Err(e) => { - self.error = Some(format!("Agent failed: {}", e)); + let message = format!("Agent failed: {}", e); + self.error = Some(message.clone()); self.agent_running = false; self.agent_mode = false; self.agent_actions = None; self.stop_loading_animation(); + self.mark_active_command_failed(Some(message)); Ok(()) } } @@ -11986,6 +12378,9 @@ impl ChatApp { self.status = "Tool results sent. Generating response...".to_string(); self.set_system_status("✓ Tools executed successfully".to_string()); self.spawn_stream(response_id, stream); + if let Some(active) = self.active_command.as_mut() { + active.record_response(response_id); + } match self.controller.mark_stream_placeholder(response_id, "▌") { Ok(_) => self.error = None, Err(err) => { @@ -11994,19 +12389,26 @@ impl ChatApp { } Ok(()) } - Ok(SessionOutcome::Complete(_response)) => { + Ok(SessionOutcome::Complete(response)) => { // Tool execution complete without streaming (shouldn't happen in streaming mode) self.stop_loading_animation(); self.status = "✓ Tool execution complete".to_string(); self.set_system_status("✓ Tool execution complete".to_string()); self.error = None; + if let Some(active) = self.active_command.as_mut() { + active.record_response(response.message.id); + } + self.update_thinking_from_last_message(); + self.mark_active_command_succeeded(); Ok(()) } Err(err) => { self.stop_loading_animation(); self.status = "Tool execution failed".to_string(); - self.set_system_status(format!("❌ Tool execution failed: {}", err)); - self.error = Some(format!("Tool execution failed: {}", err)); + let failure_message = format!("❌ Tool execution failed: {}", err); + self.set_system_status(failure_message.clone()); + self.error = Some(failure_message.clone()); + self.mark_active_command_failed(Some(failure_message)); Ok(()) } } @@ -12475,6 +12877,52 @@ fn capitalize_first(input: &str) -> String { } } +impl ChatApp { + fn summarize_thinking(thinking: &str) -> Option { + let mut segments = thinking + .split('\n') + .map(|line| line.trim()) + .filter(|line| !line.is_empty()); + + let mut summary = segments.next()?.to_string(); + + if summary.chars().count() < 120 { + if let Some(next) = segments.next() { + if !next.is_empty() { + if !summary.ends_with('.') && !summary.ends_with('!') && !summary.ends_with('?') + { + summary.push('.'); + } + summary.push(' '); + summary.push_str(next); + } + } + } + + if summary.chars().count() > 160 { + summary = Self::truncate_to_chars(&summary, 160); + } + + Some(summary) + } + + fn truncate_to_chars(input: &str, limit: usize) -> String { + if input.chars().count() <= limit { + return input.to_string(); + } + + let mut result = String::new(); + for (idx, ch) in input.chars().enumerate() { + if idx + 1 >= limit { + result.push('…'); + break; + } + result.push(ch); + } + result + } +} + pub(crate) fn role_label_parts(role: &Role) -> (&'static str, &'static str) { match role { Role::User => ("👤", "You"), @@ -14224,3 +14672,33 @@ impl UiRuntime for ChatApp { ChatApp::streaming_count(self) } } + +#[cfg(test)] +mod queue_tests { + use super::ChatApp; + + #[test] + fn summarize_thinking_uses_first_line() { + let text = "Outline the plan.\nThen execute."; + let summary = ChatApp::summarize_thinking(text).expect("summary"); + assert!( + summary.starts_with("Outline the plan"), + "summary should start with first line" + ); + } + + #[test] + fn summarize_thinking_truncates_long_text() { + let long_segment = "a".repeat(200); + let text = format!("{long_segment}\nNext steps"); + let summary = ChatApp::summarize_thinking(&text).expect("summary"); + assert!( + summary.chars().count() <= 160, + "summary should be limited to 160 characters" + ); + assert!( + summary.ends_with('…'), + "summary should end with ellipsis when truncated" + ); + } +}