use color_eyre::eyre::{Result, eyre}; use std::collections::HashMap; use std::process::Stdio; use std::sync::Arc; use parking_lot::RwLock; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::process::{Child, Command}; use tokio::sync::Mutex; use tokio::time::{timeout, Duration}; const MAX_OUTPUT_LINES: usize = 2000; const DEFAULT_TIMEOUT_MS: u64 = 120000; // 2 minutes const COMMAND_DELIMITER: &str = "___OWLEN_CMD_END___"; #[derive(Debug, Clone)] pub struct CommandOutput { pub stdout: String, pub stderr: String, pub exit_code: i32, pub success: bool, } pub struct BashSession { child: Mutex, last_output: Option, } impl BashSession { /// Create a new persistent bash session pub async fn new() -> Result { let child = Command::new("bash") .arg("--norc") .arg("--noprofile") .stdin(Stdio::piped()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .kill_on_drop(true) .spawn()?; // Verify the process started if child.stdin.is_none() || child.stdout.is_none() || child.stderr.is_none() { return Err(eyre!("Failed to capture bash process stdio")); } Ok(Self { child: Mutex::new(child), last_output: None, }) } /// Execute a command in the persistent bash session /// /// # Arguments /// * `command` - The bash command to execute /// * `timeout_ms` - Optional timeout in milliseconds (default: 2 minutes) pub async fn execute(&mut self, command: &str, timeout_ms: Option) -> Result { let timeout_duration = Duration::from_millis(timeout_ms.unwrap_or(DEFAULT_TIMEOUT_MS)); let result = timeout(timeout_duration, self.execute_internal(command)).await; match result { Ok(output) => { // Store the output for potential retrieval via BashOutput tool let combined = format!("{}{}", output.as_ref().map(|o| o.stdout.as_str()).unwrap_or(""), output.as_ref().map(|o| o.stderr.as_str()).unwrap_or("")); self.last_output = Some(combined); output }, Err(_) => Err(eyre!("Command timed out after {}ms", timeout_duration.as_millis())), } } async fn execute_internal(&mut self, command: &str) -> Result { let mut child = self.child.lock().await; // Take ownership of stdio handles let mut stdin = child.stdin.take().ok_or_else(|| eyre!("No stdin"))?; let stdout = child.stdout.take().ok_or_else(|| eyre!("No stdout"))?; let stderr = child.stderr.take().ok_or_else(|| eyre!("No stderr"))?; // Write command with delimiter and exit code capture let full_command = format!( "{}\necho $? > /tmp/owlen_exit_code_$$.tmp\necho '{}'\n", command, COMMAND_DELIMITER ); stdin.write_all(full_command.as_bytes()).await?; stdin.flush().await?; // Read stdout until delimiter let mut stdout_reader = BufReader::new(stdout); let mut stdout_lines = Vec::new(); let mut line = String::new(); loop { line.clear(); let n = stdout_reader.read_line(&mut line).await?; if n == 0 { return Err(eyre!("Bash process terminated unexpectedly")); } if line.trim() == COMMAND_DELIMITER { break; } stdout_lines.push(line.clone()); // Truncate if too many lines if stdout_lines.len() > MAX_OUTPUT_LINES { stdout_lines.push("<<<...output truncated...>>>\n".to_string()); break; } } // Read stderr (non-blocking, best effort) let mut stderr_reader = BufReader::new(stderr); let mut stderr_lines = Vec::new(); let mut stderr_line = String::new(); // Try to read stderr without blocking indefinitely while let Ok(result) = timeout(Duration::from_millis(100), stderr_reader.read_line(&mut stderr_line)).await { match result { Ok(n) if n > 0 => { stderr_lines.push(stderr_line.clone()); stderr_line.clear(); if stderr_lines.len() > MAX_OUTPUT_LINES { stderr_lines.push("<<<...stderr truncated...>>>\n".to_string()); break; } } _ => break, } } // Read exit code let exit_code_cmd = "cat /tmp/owlen_exit_code_$$.tmp 2>/dev/null; rm -f /tmp/owlen_exit_code_$$.tmp\n"; stdin.write_all(exit_code_cmd.as_bytes()).await?; stdin.flush().await?; let mut exit_line = String::new(); stdout_reader.read_line(&mut exit_line).await?; let exit_code: i32 = exit_line.trim().parse().unwrap_or(0); // Restore stdio handles child.stdin = Some(stdin); child.stdout = Some(stdout_reader.into_inner()); child.stderr = Some(stderr_reader.into_inner()); Ok(CommandOutput { stdout: stdout_lines.join(""), stderr: stderr_lines.join(""), exit_code, success: exit_code == 0, }) } /// Close the bash session pub async fn close(self) -> Result<()> { let mut child = self.child.into_inner(); if let Some(mut stdin) = child.stdin.take() { let _ = stdin.write_all(b"exit\n").await; let _ = stdin.flush().await; } let _ = child.wait().await?; Ok(()) } } /// Manages background bash shells by ID #[derive(Clone, Default)] pub struct ShellManager { shells: Arc>>, } impl ShellManager { pub fn new() -> Self { Self::default() } /// Start a new background shell, returns shell ID pub async fn start_shell(&self) -> Result { let id = uuid::Uuid::new_v4().to_string(); let session = BashSession::new().await?; self.shells.write().insert(id.clone(), session); Ok(id) } /// Execute command in background shell pub async fn execute(&self, shell_id: &str, command: &str, timeout: Option) -> Result { // We need to handle this carefully to avoid holding the lock across await // First check if the shell exists and clone what we need let exists = self.shells.read().contains_key(shell_id); if !exists { return Err(eyre!("Shell not found: {}", shell_id)); } // For now, we need to use a more complex approach since BashSession contains async operations // We'll execute and then update in a separate critical section let timeout_ms = timeout.map(|d| d.as_millis() as u64); // Take temporary ownership for execution let mut session = { let mut shells = self.shells.write(); shells.remove(shell_id) .ok_or_else(|| eyre!("Shell not found: {}", shell_id))? }; // Execute without holding the lock let result = session.execute(command, timeout_ms).await; // Put the session back self.shells.write().insert(shell_id.to_string(), session); result } /// Get output from a shell (BashOutput tool) pub fn get_output(&self, shell_id: &str) -> Result> { let shells = self.shells.read(); let session = shells.get(shell_id) .ok_or_else(|| eyre!("Shell not found: {}", shell_id))?; // Return any buffered output Ok(session.last_output.clone()) } /// Kill a shell (KillShell tool) pub fn kill_shell(&self, shell_id: &str) -> Result<()> { let mut shells = self.shells.write(); if shells.remove(shell_id).is_some() { Ok(()) } else { Err(eyre!("Shell not found: {}", shell_id)) } } /// List active shells pub fn list_shells(&self) -> Vec { self.shells.read().keys().cloned().collect() } } /// Start a background bash command, returns shell ID pub async fn run_background(manager: &ShellManager, command: &str) -> Result { let shell_id = manager.start_shell().await?; // Execute in background (non-blocking) tokio::spawn({ let manager = manager.clone(); let command = command.to_string(); let shell_id = shell_id.clone(); async move { let _ = manager.execute(&shell_id, &command, None).await; } }); Ok(shell_id) } /// Get output from background shell (BashOutput tool) pub fn bash_output(manager: &ShellManager, shell_id: &str) -> Result { manager.get_output(shell_id)? .ok_or_else(|| eyre!("No output available")) } /// Kill a background shell (KillShell tool) pub fn kill_shell(manager: &ShellManager, shell_id: &str) -> Result { manager.kill_shell(shell_id)?; Ok(format!("Shell {} terminated", shell_id)) } #[cfg(test)] mod tests { use super::*; #[tokio::test] async fn can_create_session() { let session = BashSession::new().await; assert!(session.is_ok()); } }