From 49531864f1a9f44482ecb4b4005392b0132c72df Mon Sep 17 00:00:00 2001 From: vikingowl Date: Fri, 26 Dec 2025 23:23:36 +0100 Subject: [PATCH] feat(plan): Add DAG types and validation for Conductor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 1 of Conductor integration - DAG-based plan execution support: Types: - Add StepExecutionResult enum (Success, Failed, Skipped, Pending, InProgress) - Add SkipReason enum for tracking why steps were skipped - Add PauseReason enum for execution pause states - Add DagValidationResult enum for validation outcomes PlanStep extensions: - Add depends_on: Vec for explicit step dependencies - Add execution_result: Option for tracking - Add retry_count: u8 for failure retry tracking AccumulatedPlanStatus extensions: - Add ExecutingParallel { active_count, remaining } variant - Add Paused(PauseReason) variant - Add Aborted { reason } variant DAG validation (Kahn's algorithm): - validate_dag() returns execution order with parallelism levels - Detects cycles and missing dependencies - dependencies_satisfied() checks if step can execute - get_ready_steps() returns executable steps - skip_with_dependents() cascades failure through graph Tests: 12 new DAG validation tests covering chains, parallelism, diamonds, cycles, missing deps, and skip cascading. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- crates/app/cli/src/agent_manager.rs | 2 +- crates/app/cli/src/engine.rs | 2 +- crates/app/ui/src/components/plan_panel.rs | 33 +- crates/tools/plan/src/lib.rs | 632 ++++++++++++++++++++- 4 files changed, 656 insertions(+), 13 deletions(-) diff --git a/crates/app/cli/src/agent_manager.rs b/crates/app/cli/src/agent_manager.rs index d6b6a06..804e933 100644 --- a/crates/app/cli/src/agent_manager.rs +++ b/crates/app/cli/src/agent_manager.rs @@ -345,7 +345,7 @@ impl AgentManager { let mut guard = self.state.lock().await; if let Some(plan) = guard.current_plan_mut() { plan.finalize(); - (plan.steps.len(), plan.status) + (plan.steps.len(), plan.status.clone()) } else { (0, AccumulatedPlanStatus::Completed) } diff --git a/crates/app/cli/src/engine.rs b/crates/app/cli/src/engine.rs index ecf9c0b..26d9891 100644 --- a/crates/app/cli/src/engine.rs +++ b/crates/app/cli/src/engine.rs @@ -45,7 +45,7 @@ pub async fn run_engine_loop_dynamic( if let Some(plan) = guard.current_plan_mut() { plan.finalize(); let total_steps = plan.steps.len(); - let status = plan.status; + let status = plan.status.clone(); drop(guard); let _ = tx_ui.send(Message::AgentResponse(AgentResponse::PlanComplete { total_steps, diff --git a/crates/app/ui/src/components/plan_panel.rs b/crates/app/ui/src/components/plan_panel.rs index b0d2e9c..0b5dc94 100644 --- a/crates/app/ui/src/components/plan_panel.rs +++ b/crates/app/ui/src/components/plan_panel.rs @@ -115,19 +115,34 @@ impl PlanPanel { let mut lines: Vec = Vec::new(); // Title line with status - let status_str = match plan.status { - AccumulatedPlanStatus::Accumulating => "Accumulating", - AccumulatedPlanStatus::Reviewing => "Reviewing", - AccumulatedPlanStatus::Executing => "Executing", - AccumulatedPlanStatus::Completed => "Completed", - AccumulatedPlanStatus::Cancelled => "Cancelled", + let status_str = match &plan.status { + AccumulatedPlanStatus::Accumulating => "Accumulating".to_string(), + AccumulatedPlanStatus::Reviewing => "Reviewing".to_string(), + AccumulatedPlanStatus::Executing => "Executing".to_string(), + AccumulatedPlanStatus::ExecutingParallel { active_count, remaining } => { + format!("Executing ({} active, {} left)", active_count, remaining) + } + AccumulatedPlanStatus::Paused(reason) => { + match reason { + tools_plan::PauseReason::StepFailed { .. } => "Paused (Step Failed)".to_string(), + tools_plan::PauseReason::AwaitingPermission { .. } => "Paused (Permission)".to_string(), + } + } + AccumulatedPlanStatus::Completed => "Completed".to_string(), + AccumulatedPlanStatus::Cancelled => "Cancelled".to_string(), + AccumulatedPlanStatus::Aborted { reason } => format!("Aborted: {}", reason), }; - let status_color = match plan.status { + let status_color = match &plan.status { AccumulatedPlanStatus::Accumulating => self.theme.palette.warning, AccumulatedPlanStatus::Reviewing => self.theme.palette.info, - AccumulatedPlanStatus::Executing => self.theme.palette.success, + AccumulatedPlanStatus::Executing | AccumulatedPlanStatus::ExecutingParallel { .. } => { + self.theme.palette.success + } + AccumulatedPlanStatus::Paused(_) => self.theme.palette.warning, AccumulatedPlanStatus::Completed => self.theme.palette.success, - AccumulatedPlanStatus::Cancelled => self.theme.palette.error, + AccumulatedPlanStatus::Cancelled | AccumulatedPlanStatus::Aborted { .. } => { + self.theme.palette.error + } }; let name = plan.name.as_deref().unwrap_or("unnamed"); diff --git a/crates/tools/plan/src/lib.rs b/crates/tools/plan/src/lib.rs index cf9e1a6..752d7c5 100644 --- a/crates/tools/plan/src/lib.rs +++ b/crates/tools/plan/src/lib.rs @@ -10,6 +10,7 @@ use color_eyre::eyre::Result; use serde::{Deserialize, Serialize}; +use std::collections::{HashMap, HashSet, VecDeque}; use std::path::PathBuf; use chrono::{DateTime, Utc}; use uuid::Uuid; @@ -18,6 +19,91 @@ use uuid::Uuid; // Plan Execution Types (Multi-turn tool call accumulation) // ============================================================================ +/// Result of executing a single plan step +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub enum StepExecutionResult { + /// Step completed successfully + Success { + /// Output produced by the step + output: String, + /// Duration of execution in milliseconds + duration_ms: u64, + }, + /// Step failed with an error + Failed { + /// Error message + error: String, + /// Whether this failure was after a retry + after_retry: bool, + }, + /// Step was skipped (dependency failed or user chose to skip) + Skipped { + /// Reason for skipping + reason: SkipReason, + }, + /// Step is waiting for dependencies + Pending, + /// Step is currently executing + InProgress, +} + +/// Reason why a step was skipped +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub enum SkipReason { + /// A dependency failed and user chose not to retry + DependencyFailed { + /// ID of the step that failed + failed_step_id: String, + }, + /// User explicitly skipped this step + UserSkipped, + /// Step was rejected during approval + Rejected, +} + +/// Reason why plan execution is paused +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub enum PauseReason { + /// Step failed after automatic retry + StepFailed { + /// ID of the failed step + step_id: String, + /// Error message + error: String, + /// Number of attempts made + attempt: usize, + }, + /// Waiting for user permission + AwaitingPermission { + /// ID of the step awaiting permission + step_id: String, + /// Tool that requires permission + tool: String, + }, +} + +/// Result of DAG validation +#[derive(Debug, Clone, PartialEq)] +pub enum DagValidationResult { + /// DAG is valid + Valid { + /// Execution order: each inner Vec contains steps that can run in parallel + execution_order: Vec>, + /// Maximum number of steps that can run in parallel + max_parallelism: usize, + }, + /// DAG has a cycle + CycleDetected { + /// Steps involved in the cycle + cycle_steps: Vec, + }, + /// DAG has missing dependencies + MissingDependencies { + /// Map of step ID to missing dependency IDs + missing: HashMap>, + }, +} + /// A single proposed tool call in an accumulated plan #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PlanStep { @@ -33,6 +119,16 @@ pub struct PlanStep { pub rationale: Option, /// User's approval decision: None = pending, Some(true) = approved, Some(false) = rejected pub approved: Option, + // DAG dependency fields + /// Step IDs that must complete successfully before this step can execute + #[serde(default)] + pub depends_on: Vec, + /// Execution result (populated during execution) + #[serde(default)] + pub execution_result: Option, + /// Number of retry attempts made + #[serde(default)] + pub retry_count: u8, } impl PlanStep { @@ -45,6 +141,9 @@ impl PlanStep { args, rationale: None, approved: None, + depends_on: Vec::new(), + execution_result: None, + retry_count: 0, } } @@ -54,6 +153,12 @@ impl PlanStep { self } + /// Set the dependencies for this step + pub fn with_depends_on(mut self, depends_on: Vec) -> Self { + self.depends_on = depends_on; + self + } + /// Check if this step is pending approval pub fn is_pending(&self) -> bool { self.approved.is_none() @@ -68,22 +173,69 @@ impl PlanStep { pub fn is_rejected(&self) -> bool { self.approved == Some(false) } + + /// Check if this step has completed execution (success or failure) + pub fn is_executed(&self) -> bool { + matches!( + &self.execution_result, + Some(StepExecutionResult::Success { .. }) | Some(StepExecutionResult::Failed { .. }) + ) + } + + /// Check if this step succeeded + pub fn is_success(&self) -> bool { + matches!(&self.execution_result, Some(StepExecutionResult::Success { .. })) + } + + /// Check if this step failed + pub fn is_failed(&self) -> bool { + matches!(&self.execution_result, Some(StepExecutionResult::Failed { .. })) + } + + /// Check if this step is currently executing + pub fn is_in_progress(&self) -> bool { + matches!(&self.execution_result, Some(StepExecutionResult::InProgress)) + } + + /// Check if this step is skipped + pub fn is_skipped(&self) -> bool { + matches!(&self.execution_result, Some(StepExecutionResult::Skipped { .. })) + } + + /// Check if this step has dependencies + pub fn has_dependencies(&self) -> bool { + !self.depends_on.is_empty() + } } /// Status of an accumulated plan -#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)] pub enum AccumulatedPlanStatus { /// Agent is proposing steps (accumulating) #[default] Accumulating, /// User is reviewing the plan Reviewing, - /// Approved steps are being executed + /// Approved steps are being executed (sequential mode) Executing, + /// Approved steps are being executed in parallel (DAG mode) + ExecutingParallel { + /// Number of steps currently running + active_count: usize, + /// Total steps remaining (excluding completed) + remaining: usize, + }, + /// Execution paused waiting for user decision + Paused(PauseReason), /// All approved steps have been executed Completed, /// Plan was cancelled by user Cancelled, + /// Plan execution was aborted due to failure + Aborted { + /// Reason for aborting + reason: String, + }, } /// An accumulated plan of proposed tool calls @@ -242,6 +394,225 @@ impl AccumulatedPlan { pub fn cancel(&mut self) { self.status = AccumulatedPlanStatus::Cancelled; } + + /// Abort the plan with a reason + pub fn abort(&mut self, reason: String) { + self.status = AccumulatedPlanStatus::Aborted { reason }; + } + + /// Pause the plan for user decision + pub fn pause(&mut self, reason: PauseReason) { + self.status = AccumulatedPlanStatus::Paused(reason); + } + + /// Start parallel execution with tracking + pub fn start_parallel_execution(&mut self) { + let remaining = self.steps.iter() + .filter(|s| s.is_approved() && s.execution_result.is_none()) + .count(); + self.status = AccumulatedPlanStatus::ExecutingParallel { + active_count: 0, + remaining, + }; + } + + /// Update parallel execution progress + pub fn update_parallel_progress(&mut self, active: usize) { + let remaining = self.steps.iter() + .filter(|s| s.is_approved() && !s.is_success() && !s.is_skipped()) + .count(); + self.status = AccumulatedPlanStatus::ExecutingParallel { + active_count: active, + remaining, + }; + } + + // ======================================================================== + // DAG Validation and Execution Helpers + // ======================================================================== + + /// Validate the DAG structure of the plan using Kahn's algorithm + /// + /// Returns execution order (groups of parallel steps) if valid, + /// or error details if cycles or missing dependencies are found. + pub fn validate_dag(&self) -> DagValidationResult { + let step_ids: HashSet<&str> = self.steps.iter().map(|s| s.id.as_str()).collect(); + + // Check for missing dependencies + let mut missing: HashMap> = HashMap::new(); + for step in &self.steps { + for dep in &step.depends_on { + if !step_ids.contains(dep.as_str()) { + missing.entry(step.id.clone()) + .or_default() + .push(dep.clone()); + } + } + } + + if !missing.is_empty() { + return DagValidationResult::MissingDependencies { missing }; + } + + // Build in-degree map and adjacency list for Kahn's algorithm + let mut in_degree: HashMap<&str, usize> = HashMap::new(); + let mut dependents: HashMap<&str, Vec<&str>> = HashMap::new(); + + for step in &self.steps { + in_degree.entry(&step.id).or_insert(0); + for dep in &step.depends_on { + *in_degree.entry(&step.id).or_insert(0) += 1; + dependents.entry(dep.as_str()).or_default().push(&step.id); + } + } + + // Kahn's algorithm with level tracking for parallelism + let mut execution_order: Vec> = Vec::new(); + let mut queue: VecDeque<&str> = in_degree.iter() + .filter(|(_, deg)| **deg == 0) + .map(|(id, _)| *id) + .collect(); + + let mut processed = 0; + let mut max_parallelism = 0; + + while !queue.is_empty() { + // All items in current queue can run in parallel + let current_level: Vec = queue.drain(..).map(|s| s.to_string()).collect(); + let level_size = current_level.len(); + max_parallelism = max_parallelism.max(level_size); + + // Process current level and find next level + for step_id in ¤t_level { + processed += 1; + if let Some(deps) = dependents.get(step_id.as_str()) { + for dep in deps { + if let Some(deg) = in_degree.get_mut(dep) { + *deg -= 1; + if *deg == 0 { + queue.push_back(dep); + } + } + } + } + } + + execution_order.push(current_level); + } + + // Check for cycle (not all nodes processed) + if processed != self.steps.len() { + let cycle_steps: Vec = self.steps.iter() + .filter(|s| in_degree.get(s.id.as_str()).copied().unwrap_or(0) > 0) + .map(|s| s.id.clone()) + .collect(); + return DagValidationResult::CycleDetected { cycle_steps }; + } + + DagValidationResult::Valid { + execution_order, + max_parallelism, + } + } + + /// Check if all dependencies for a step have completed successfully + pub fn dependencies_satisfied(&self, step_id: &str) -> bool { + let step = match self.steps.iter().find(|s| s.id == step_id) { + Some(s) => s, + None => return false, + }; + + step.depends_on.iter().all(|dep_id| { + self.steps.iter() + .find(|s| s.id == *dep_id) + .map(|s| s.is_success()) + .unwrap_or(false) + }) + } + + /// Get steps that are ready to execute (approved, deps satisfied, not yet executed) + pub fn get_ready_steps(&self) -> Vec<&PlanStep> { + self.steps.iter() + .filter(|s| s.is_approved()) + .filter(|s| s.execution_result.is_none()) + .filter(|s| self.dependencies_satisfied(&s.id)) + .collect() + } + + /// Get steps that are blocked (waiting for dependencies) + pub fn get_blocked_steps(&self) -> Vec<&PlanStep> { + self.steps.iter() + .filter(|s| s.is_approved()) + .filter(|s| s.execution_result.is_none()) + .filter(|s| !self.dependencies_satisfied(&s.id)) + .collect() + } + + /// Get the IDs of steps that depend on a given step + pub fn get_dependents(&self, step_id: &str) -> Vec<&String> { + self.steps.iter() + .filter(|s| s.depends_on.contains(&step_id.to_string())) + .map(|s| &s.id) + .collect() + } + + /// Mark a step and all its dependents as skipped due to dependency failure + pub fn skip_with_dependents(&mut self, step_id: &str) { + let dependents: Vec = self.get_dependents(step_id) + .iter() + .map(|s| (*s).clone()) + .collect(); + + // Skip the step itself if not already executed + if let Some(step) = self.steps.iter_mut().find(|s| s.id == step_id) { + if step.execution_result.is_none() { + step.execution_result = Some(StepExecutionResult::Skipped { + reason: SkipReason::UserSkipped, + }); + } + } + + // Recursively skip dependents + for dep_id in dependents { + if let Some(step) = self.steps.iter_mut().find(|s| s.id == dep_id) { + if step.execution_result.is_none() { + step.execution_result = Some(StepExecutionResult::Skipped { + reason: SkipReason::DependencyFailed { + failed_step_id: step_id.to_string(), + }, + }); + } + } + // Recurse to skip transitive dependents + self.skip_with_dependents(&dep_id); + } + } + + /// Get step by ID + pub fn get_step(&self, id: &str) -> Option<&PlanStep> { + self.steps.iter().find(|s| s.id == id) + } + + /// Get mutable step by ID + pub fn get_step_mut(&mut self, id: &str) -> Option<&mut PlanStep> { + self.steps.iter_mut().find(|s| s.id == id) + } + + /// Check if execution is complete (all approved steps executed or skipped) + pub fn is_execution_complete(&self) -> bool { + self.steps.iter() + .filter(|s| s.is_approved()) + .all(|s| s.is_success() || s.is_skipped() || s.is_failed()) + } + + /// Get execution summary counts + pub fn execution_counts(&self) -> (usize, usize, usize, usize) { + let approved = self.steps.iter().filter(|s| s.is_approved()).count(); + let succeeded = self.steps.iter().filter(|s| s.is_success()).count(); + let failed = self.steps.iter().filter(|s| s.is_failed()).count(); + let skipped = self.steps.iter().filter(|s| s.is_skipped()).count(); + (approved, succeeded, failed, skipped) + } } /// User's approval decisions for plan steps @@ -878,4 +1249,261 @@ mod tests { manager.delete_accumulated_plan(&id).await.unwrap(); assert!(manager.load_accumulated_plan(&id).await.is_err()); } + + // ======================================================================== + // DAG Validation Tests + // ======================================================================== + + #[test] + fn test_dag_validation_simple_chain() { + // A -> B -> C (linear chain) + let mut plan = AccumulatedPlan::new(); + + let step_a = PlanStep::new("a".into(), 1, "read".into(), json!({})); + let step_b = PlanStep::new("b".into(), 1, "edit".into(), json!({})) + .with_depends_on(vec!["a".into()]); + let step_c = PlanStep::new("c".into(), 1, "write".into(), json!({})) + .with_depends_on(vec!["b".into()]); + + plan.add_step(step_a); + plan.add_step(step_b); + plan.add_step(step_c); + + match plan.validate_dag() { + DagValidationResult::Valid { execution_order, max_parallelism } => { + assert_eq!(execution_order.len(), 3); // 3 levels + assert_eq!(max_parallelism, 1); // No parallelism in a chain + assert!(execution_order[0].contains(&"a".to_string())); + assert!(execution_order[1].contains(&"b".to_string())); + assert!(execution_order[2].contains(&"c".to_string())); + } + other => panic!("Expected Valid, got {:?}", other), + } + } + + #[test] + fn test_dag_validation_parallel_steps() { + // A -> B, A -> C (B and C can run in parallel after A) + let mut plan = AccumulatedPlan::new(); + + let step_a = PlanStep::new("a".into(), 1, "read".into(), json!({})); + let step_b = PlanStep::new("b".into(), 1, "edit".into(), json!({})) + .with_depends_on(vec!["a".into()]); + let step_c = PlanStep::new("c".into(), 1, "write".into(), json!({})) + .with_depends_on(vec!["a".into()]); + + plan.add_step(step_a); + plan.add_step(step_b); + plan.add_step(step_c); + + match plan.validate_dag() { + DagValidationResult::Valid { execution_order, max_parallelism } => { + assert_eq!(execution_order.len(), 2); // 2 levels + assert_eq!(max_parallelism, 2); // B and C in parallel + assert!(execution_order[0].contains(&"a".to_string())); + assert!(execution_order[1].contains(&"b".to_string())); + assert!(execution_order[1].contains(&"c".to_string())); + } + other => panic!("Expected Valid, got {:?}", other), + } + } + + #[test] + fn test_dag_validation_diamond() { + // Diamond: A -> B, A -> C, B -> D, C -> D + let mut plan = AccumulatedPlan::new(); + + let step_a = PlanStep::new("a".into(), 1, "read".into(), json!({})); + let step_b = PlanStep::new("b".into(), 1, "edit".into(), json!({})) + .with_depends_on(vec!["a".into()]); + let step_c = PlanStep::new("c".into(), 1, "write".into(), json!({})) + .with_depends_on(vec!["a".into()]); + let step_d = PlanStep::new("d".into(), 1, "bash".into(), json!({})) + .with_depends_on(vec!["b".into(), "c".into()]); + + plan.add_step(step_a); + plan.add_step(step_b); + plan.add_step(step_c); + plan.add_step(step_d); + + match plan.validate_dag() { + DagValidationResult::Valid { execution_order, max_parallelism } => { + assert_eq!(execution_order.len(), 3); // 3 levels: A, then B+C, then D + assert_eq!(max_parallelism, 2); // B and C in parallel + } + other => panic!("Expected Valid, got {:?}", other), + } + } + + #[test] + fn test_dag_validation_cycle_detection() { + // Cycle: A -> B -> C -> A + let mut plan = AccumulatedPlan::new(); + + let step_a = PlanStep::new("a".into(), 1, "read".into(), json!({})) + .with_depends_on(vec!["c".into()]); + let step_b = PlanStep::new("b".into(), 1, "edit".into(), json!({})) + .with_depends_on(vec!["a".into()]); + let step_c = PlanStep::new("c".into(), 1, "write".into(), json!({})) + .with_depends_on(vec!["b".into()]); + + plan.add_step(step_a); + plan.add_step(step_b); + plan.add_step(step_c); + + match plan.validate_dag() { + DagValidationResult::CycleDetected { cycle_steps } => { + assert_eq!(cycle_steps.len(), 3); + assert!(cycle_steps.contains(&"a".to_string())); + assert!(cycle_steps.contains(&"b".to_string())); + assert!(cycle_steps.contains(&"c".to_string())); + } + other => panic!("Expected CycleDetected, got {:?}", other), + } + } + + #[test] + fn test_dag_validation_missing_dependency() { + let mut plan = AccumulatedPlan::new(); + + let step_a = PlanStep::new("a".into(), 1, "read".into(), json!({})); + let step_b = PlanStep::new("b".into(), 1, "edit".into(), json!({})) + .with_depends_on(vec!["nonexistent".into()]); + + plan.add_step(step_a); + plan.add_step(step_b); + + match plan.validate_dag() { + DagValidationResult::MissingDependencies { missing } => { + assert!(missing.contains_key("b")); + assert!(missing["b"].contains(&"nonexistent".to_string())); + } + other => panic!("Expected MissingDependencies, got {:?}", other), + } + } + + #[test] + fn test_dag_validation_no_dependencies() { + // All independent steps (max parallelism) + let mut plan = AccumulatedPlan::new(); + + plan.add_step(PlanStep::new("a".into(), 1, "read".into(), json!({}))); + plan.add_step(PlanStep::new("b".into(), 1, "read".into(), json!({}))); + plan.add_step(PlanStep::new("c".into(), 1, "read".into(), json!({}))); + + match plan.validate_dag() { + DagValidationResult::Valid { execution_order, max_parallelism } => { + assert_eq!(execution_order.len(), 1); // All in one level + assert_eq!(max_parallelism, 3); // All 3 can run in parallel + } + other => panic!("Expected Valid, got {:?}", other), + } + } + + #[test] + fn test_dependencies_satisfied() { + let mut plan = AccumulatedPlan::new(); + + let mut step_a = PlanStep::new("a".into(), 1, "read".into(), json!({})); + step_a.execution_result = Some(StepExecutionResult::Success { + output: "ok".into(), + duration_ms: 100, + }); + + let step_b = PlanStep::new("b".into(), 1, "edit".into(), json!({})) + .with_depends_on(vec!["a".into()]); + + plan.add_step(step_a); + plan.add_step(step_b); + + // A succeeded, so B's deps are satisfied + assert!(plan.dependencies_satisfied("b")); + + // A has no deps, so they're trivially satisfied + assert!(plan.dependencies_satisfied("a")); + } + + #[test] + fn test_get_ready_steps() { + let mut plan = AccumulatedPlan::new(); + + let mut step_a = PlanStep::new("a".into(), 1, "read".into(), json!({})); + step_a.approved = Some(true); + + let mut step_b = PlanStep::new("b".into(), 1, "edit".into(), json!({})); + step_b.approved = Some(true); + step_b.depends_on = vec!["a".into()]; + + plan.add_step(step_a); + plan.add_step(step_b); + + // Only A is ready (B depends on A which hasn't executed) + let ready = plan.get_ready_steps(); + assert_eq!(ready.len(), 1); + assert_eq!(ready[0].id, "a"); + } + + #[test] + fn test_skip_with_dependents() { + let mut plan = AccumulatedPlan::new(); + + plan.add_step(PlanStep::new("a".into(), 1, "read".into(), json!({}))); + plan.add_step( + PlanStep::new("b".into(), 1, "edit".into(), json!({})) + .with_depends_on(vec!["a".into()]) + ); + plan.add_step( + PlanStep::new("c".into(), 1, "write".into(), json!({})) + .with_depends_on(vec!["b".into()]) + ); + + // Skip A, should cascade to B and C + plan.skip_with_dependents("a"); + + assert!(plan.steps[0].is_skipped()); + assert!(plan.steps[1].is_skipped()); + assert!(plan.steps[2].is_skipped()); + + // Check the reason for cascaded skips + if let Some(StepExecutionResult::Skipped { reason: SkipReason::DependencyFailed { failed_step_id } }) = &plan.steps[1].execution_result { + assert_eq!(failed_step_id, "a"); + } else { + panic!("Expected DependencyFailed skip reason"); + } + } + + #[test] + fn test_execution_counts() { + let mut plan = AccumulatedPlan::new(); + + let mut step_a = PlanStep::new("a".into(), 1, "read".into(), json!({})); + step_a.approved = Some(true); + step_a.execution_result = Some(StepExecutionResult::Success { + output: "ok".into(), + duration_ms: 100, + }); + + let mut step_b = PlanStep::new("b".into(), 1, "edit".into(), json!({})); + step_b.approved = Some(true); + step_b.execution_result = Some(StepExecutionResult::Failed { + error: "oops".into(), + after_retry: true, + }); + + let mut step_c = PlanStep::new("c".into(), 1, "write".into(), json!({})); + step_c.approved = Some(true); + step_c.execution_result = Some(StepExecutionResult::Skipped { + reason: SkipReason::UserSkipped, + }); + + plan.add_step(step_a); + plan.add_step(step_b); + plan.add_step(step_c); + + let (approved, succeeded, failed, skipped) = plan.execution_counts(); + assert_eq!(approved, 3); + assert_eq!(succeeded, 1); + assert_eq!(failed, 1); + assert_eq!(skipped, 1); + } }