|
|
|
|
@@ -10,6 +10,7 @@
|
|
|
|
|
|
|
|
|
|
use color_eyre::eyre::Result;
|
|
|
|
|
use serde::{Deserialize, Serialize};
|
|
|
|
|
use std::collections::{HashMap, HashSet, VecDeque};
|
|
|
|
|
use std::path::PathBuf;
|
|
|
|
|
use chrono::{DateTime, Utc};
|
|
|
|
|
use uuid::Uuid;
|
|
|
|
|
@@ -18,6 +19,91 @@ use uuid::Uuid;
|
|
|
|
|
// Plan Execution Types (Multi-turn tool call accumulation)
|
|
|
|
|
// ============================================================================
|
|
|
|
|
|
|
|
|
|
/// Result of executing a single plan step
|
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
|
|
|
|
pub enum StepExecutionResult {
|
|
|
|
|
/// Step completed successfully
|
|
|
|
|
Success {
|
|
|
|
|
/// Output produced by the step
|
|
|
|
|
output: String,
|
|
|
|
|
/// Duration of execution in milliseconds
|
|
|
|
|
duration_ms: u64,
|
|
|
|
|
},
|
|
|
|
|
/// Step failed with an error
|
|
|
|
|
Failed {
|
|
|
|
|
/// Error message
|
|
|
|
|
error: String,
|
|
|
|
|
/// Whether this failure was after a retry
|
|
|
|
|
after_retry: bool,
|
|
|
|
|
},
|
|
|
|
|
/// Step was skipped (dependency failed or user chose to skip)
|
|
|
|
|
Skipped {
|
|
|
|
|
/// Reason for skipping
|
|
|
|
|
reason: SkipReason,
|
|
|
|
|
},
|
|
|
|
|
/// Step is waiting for dependencies
|
|
|
|
|
Pending,
|
|
|
|
|
/// Step is currently executing
|
|
|
|
|
InProgress,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Reason why a step was skipped
|
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
|
|
|
|
pub enum SkipReason {
|
|
|
|
|
/// A dependency failed and user chose not to retry
|
|
|
|
|
DependencyFailed {
|
|
|
|
|
/// ID of the step that failed
|
|
|
|
|
failed_step_id: String,
|
|
|
|
|
},
|
|
|
|
|
/// User explicitly skipped this step
|
|
|
|
|
UserSkipped,
|
|
|
|
|
/// Step was rejected during approval
|
|
|
|
|
Rejected,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Reason why plan execution is paused
|
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
|
|
|
|
pub enum PauseReason {
|
|
|
|
|
/// Step failed after automatic retry
|
|
|
|
|
StepFailed {
|
|
|
|
|
/// ID of the failed step
|
|
|
|
|
step_id: String,
|
|
|
|
|
/// Error message
|
|
|
|
|
error: String,
|
|
|
|
|
/// Number of attempts made
|
|
|
|
|
attempt: usize,
|
|
|
|
|
},
|
|
|
|
|
/// Waiting for user permission
|
|
|
|
|
AwaitingPermission {
|
|
|
|
|
/// ID of the step awaiting permission
|
|
|
|
|
step_id: String,
|
|
|
|
|
/// Tool that requires permission
|
|
|
|
|
tool: String,
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Result of DAG validation
|
|
|
|
|
#[derive(Debug, Clone, PartialEq)]
|
|
|
|
|
pub enum DagValidationResult {
|
|
|
|
|
/// DAG is valid
|
|
|
|
|
Valid {
|
|
|
|
|
/// Execution order: each inner Vec contains steps that can run in parallel
|
|
|
|
|
execution_order: Vec<Vec<String>>,
|
|
|
|
|
/// Maximum number of steps that can run in parallel
|
|
|
|
|
max_parallelism: usize,
|
|
|
|
|
},
|
|
|
|
|
/// DAG has a cycle
|
|
|
|
|
CycleDetected {
|
|
|
|
|
/// Steps involved in the cycle
|
|
|
|
|
cycle_steps: Vec<String>,
|
|
|
|
|
},
|
|
|
|
|
/// DAG has missing dependencies
|
|
|
|
|
MissingDependencies {
|
|
|
|
|
/// Map of step ID to missing dependency IDs
|
|
|
|
|
missing: HashMap<String, Vec<String>>,
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// A single proposed tool call in an accumulated plan
|
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
|
|
|
pub struct PlanStep {
|
|
|
|
|
@@ -33,6 +119,16 @@ pub struct PlanStep {
|
|
|
|
|
pub rationale: Option<String>,
|
|
|
|
|
/// User's approval decision: None = pending, Some(true) = approved, Some(false) = rejected
|
|
|
|
|
pub approved: Option<bool>,
|
|
|
|
|
// DAG dependency fields
|
|
|
|
|
/// Step IDs that must complete successfully before this step can execute
|
|
|
|
|
#[serde(default)]
|
|
|
|
|
pub depends_on: Vec<String>,
|
|
|
|
|
/// Execution result (populated during execution)
|
|
|
|
|
#[serde(default)]
|
|
|
|
|
pub execution_result: Option<StepExecutionResult>,
|
|
|
|
|
/// Number of retry attempts made
|
|
|
|
|
#[serde(default)]
|
|
|
|
|
pub retry_count: u8,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl PlanStep {
|
|
|
|
|
@@ -45,6 +141,9 @@ impl PlanStep {
|
|
|
|
|
args,
|
|
|
|
|
rationale: None,
|
|
|
|
|
approved: None,
|
|
|
|
|
depends_on: Vec::new(),
|
|
|
|
|
execution_result: None,
|
|
|
|
|
retry_count: 0,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -54,6 +153,12 @@ impl PlanStep {
|
|
|
|
|
self
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Set the dependencies for this step
|
|
|
|
|
pub fn with_depends_on(mut self, depends_on: Vec<String>) -> Self {
|
|
|
|
|
self.depends_on = depends_on;
|
|
|
|
|
self
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Check if this step is pending approval
|
|
|
|
|
pub fn is_pending(&self) -> bool {
|
|
|
|
|
self.approved.is_none()
|
|
|
|
|
@@ -68,22 +173,69 @@ impl PlanStep {
|
|
|
|
|
pub fn is_rejected(&self) -> bool {
|
|
|
|
|
self.approved == Some(false)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Check if this step has completed execution (success or failure)
|
|
|
|
|
pub fn is_executed(&self) -> bool {
|
|
|
|
|
matches!(
|
|
|
|
|
&self.execution_result,
|
|
|
|
|
Some(StepExecutionResult::Success { .. }) | Some(StepExecutionResult::Failed { .. })
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Check if this step succeeded
|
|
|
|
|
pub fn is_success(&self) -> bool {
|
|
|
|
|
matches!(&self.execution_result, Some(StepExecutionResult::Success { .. }))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Check if this step failed
|
|
|
|
|
pub fn is_failed(&self) -> bool {
|
|
|
|
|
matches!(&self.execution_result, Some(StepExecutionResult::Failed { .. }))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Check if this step is currently executing
|
|
|
|
|
pub fn is_in_progress(&self) -> bool {
|
|
|
|
|
matches!(&self.execution_result, Some(StepExecutionResult::InProgress))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Check if this step is skipped
|
|
|
|
|
pub fn is_skipped(&self) -> bool {
|
|
|
|
|
matches!(&self.execution_result, Some(StepExecutionResult::Skipped { .. }))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Check if this step has dependencies
|
|
|
|
|
pub fn has_dependencies(&self) -> bool {
|
|
|
|
|
!self.depends_on.is_empty()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Status of an accumulated plan
|
|
|
|
|
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
|
|
|
|
|
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
|
|
|
|
|
pub enum AccumulatedPlanStatus {
|
|
|
|
|
/// Agent is proposing steps (accumulating)
|
|
|
|
|
#[default]
|
|
|
|
|
Accumulating,
|
|
|
|
|
/// User is reviewing the plan
|
|
|
|
|
Reviewing,
|
|
|
|
|
/// Approved steps are being executed
|
|
|
|
|
/// Approved steps are being executed (sequential mode)
|
|
|
|
|
Executing,
|
|
|
|
|
/// Approved steps are being executed in parallel (DAG mode)
|
|
|
|
|
ExecutingParallel {
|
|
|
|
|
/// Number of steps currently running
|
|
|
|
|
active_count: usize,
|
|
|
|
|
/// Total steps remaining (excluding completed)
|
|
|
|
|
remaining: usize,
|
|
|
|
|
},
|
|
|
|
|
/// Execution paused waiting for user decision
|
|
|
|
|
Paused(PauseReason),
|
|
|
|
|
/// All approved steps have been executed
|
|
|
|
|
Completed,
|
|
|
|
|
/// Plan was cancelled by user
|
|
|
|
|
Cancelled,
|
|
|
|
|
/// Plan execution was aborted due to failure
|
|
|
|
|
Aborted {
|
|
|
|
|
/// Reason for aborting
|
|
|
|
|
reason: String,
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// An accumulated plan of proposed tool calls
|
|
|
|
|
@@ -242,6 +394,225 @@ impl AccumulatedPlan {
|
|
|
|
|
pub fn cancel(&mut self) {
|
|
|
|
|
self.status = AccumulatedPlanStatus::Cancelled;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Abort the plan with a reason
|
|
|
|
|
pub fn abort(&mut self, reason: String) {
|
|
|
|
|
self.status = AccumulatedPlanStatus::Aborted { reason };
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Pause the plan for user decision
|
|
|
|
|
pub fn pause(&mut self, reason: PauseReason) {
|
|
|
|
|
self.status = AccumulatedPlanStatus::Paused(reason);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Start parallel execution with tracking
|
|
|
|
|
pub fn start_parallel_execution(&mut self) {
|
|
|
|
|
let remaining = self.steps.iter()
|
|
|
|
|
.filter(|s| s.is_approved() && s.execution_result.is_none())
|
|
|
|
|
.count();
|
|
|
|
|
self.status = AccumulatedPlanStatus::ExecutingParallel {
|
|
|
|
|
active_count: 0,
|
|
|
|
|
remaining,
|
|
|
|
|
};
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Update parallel execution progress
|
|
|
|
|
pub fn update_parallel_progress(&mut self, active: usize) {
|
|
|
|
|
let remaining = self.steps.iter()
|
|
|
|
|
.filter(|s| s.is_approved() && !s.is_success() && !s.is_skipped())
|
|
|
|
|
.count();
|
|
|
|
|
self.status = AccumulatedPlanStatus::ExecutingParallel {
|
|
|
|
|
active_count: active,
|
|
|
|
|
remaining,
|
|
|
|
|
};
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ========================================================================
|
|
|
|
|
// DAG Validation and Execution Helpers
|
|
|
|
|
// ========================================================================
|
|
|
|
|
|
|
|
|
|
/// Validate the DAG structure of the plan using Kahn's algorithm
|
|
|
|
|
///
|
|
|
|
|
/// Returns execution order (groups of parallel steps) if valid,
|
|
|
|
|
/// or error details if cycles or missing dependencies are found.
|
|
|
|
|
pub fn validate_dag(&self) -> DagValidationResult {
|
|
|
|
|
let step_ids: HashSet<&str> = self.steps.iter().map(|s| s.id.as_str()).collect();
|
|
|
|
|
|
|
|
|
|
// Check for missing dependencies
|
|
|
|
|
let mut missing: HashMap<String, Vec<String>> = HashMap::new();
|
|
|
|
|
for step in &self.steps {
|
|
|
|
|
for dep in &step.depends_on {
|
|
|
|
|
if !step_ids.contains(dep.as_str()) {
|
|
|
|
|
missing.entry(step.id.clone())
|
|
|
|
|
.or_default()
|
|
|
|
|
.push(dep.clone());
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if !missing.is_empty() {
|
|
|
|
|
return DagValidationResult::MissingDependencies { missing };
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Build in-degree map and adjacency list for Kahn's algorithm
|
|
|
|
|
let mut in_degree: HashMap<&str, usize> = HashMap::new();
|
|
|
|
|
let mut dependents: HashMap<&str, Vec<&str>> = HashMap::new();
|
|
|
|
|
|
|
|
|
|
for step in &self.steps {
|
|
|
|
|
in_degree.entry(&step.id).or_insert(0);
|
|
|
|
|
for dep in &step.depends_on {
|
|
|
|
|
*in_degree.entry(&step.id).or_insert(0) += 1;
|
|
|
|
|
dependents.entry(dep.as_str()).or_default().push(&step.id);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Kahn's algorithm with level tracking for parallelism
|
|
|
|
|
let mut execution_order: Vec<Vec<String>> = Vec::new();
|
|
|
|
|
let mut queue: VecDeque<&str> = in_degree.iter()
|
|
|
|
|
.filter(|(_, deg)| **deg == 0)
|
|
|
|
|
.map(|(id, _)| *id)
|
|
|
|
|
.collect();
|
|
|
|
|
|
|
|
|
|
let mut processed = 0;
|
|
|
|
|
let mut max_parallelism = 0;
|
|
|
|
|
|
|
|
|
|
while !queue.is_empty() {
|
|
|
|
|
// All items in current queue can run in parallel
|
|
|
|
|
let current_level: Vec<String> = queue.drain(..).map(|s| s.to_string()).collect();
|
|
|
|
|
let level_size = current_level.len();
|
|
|
|
|
max_parallelism = max_parallelism.max(level_size);
|
|
|
|
|
|
|
|
|
|
// Process current level and find next level
|
|
|
|
|
for step_id in ¤t_level {
|
|
|
|
|
processed += 1;
|
|
|
|
|
if let Some(deps) = dependents.get(step_id.as_str()) {
|
|
|
|
|
for dep in deps {
|
|
|
|
|
if let Some(deg) = in_degree.get_mut(dep) {
|
|
|
|
|
*deg -= 1;
|
|
|
|
|
if *deg == 0 {
|
|
|
|
|
queue.push_back(dep);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
execution_order.push(current_level);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Check for cycle (not all nodes processed)
|
|
|
|
|
if processed != self.steps.len() {
|
|
|
|
|
let cycle_steps: Vec<String> = self.steps.iter()
|
|
|
|
|
.filter(|s| in_degree.get(s.id.as_str()).copied().unwrap_or(0) > 0)
|
|
|
|
|
.map(|s| s.id.clone())
|
|
|
|
|
.collect();
|
|
|
|
|
return DagValidationResult::CycleDetected { cycle_steps };
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
DagValidationResult::Valid {
|
|
|
|
|
execution_order,
|
|
|
|
|
max_parallelism,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Check if all dependencies for a step have completed successfully
|
|
|
|
|
pub fn dependencies_satisfied(&self, step_id: &str) -> bool {
|
|
|
|
|
let step = match self.steps.iter().find(|s| s.id == step_id) {
|
|
|
|
|
Some(s) => s,
|
|
|
|
|
None => return false,
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
step.depends_on.iter().all(|dep_id| {
|
|
|
|
|
self.steps.iter()
|
|
|
|
|
.find(|s| s.id == *dep_id)
|
|
|
|
|
.map(|s| s.is_success())
|
|
|
|
|
.unwrap_or(false)
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Get steps that are ready to execute (approved, deps satisfied, not yet executed)
|
|
|
|
|
pub fn get_ready_steps(&self) -> Vec<&PlanStep> {
|
|
|
|
|
self.steps.iter()
|
|
|
|
|
.filter(|s| s.is_approved())
|
|
|
|
|
.filter(|s| s.execution_result.is_none())
|
|
|
|
|
.filter(|s| self.dependencies_satisfied(&s.id))
|
|
|
|
|
.collect()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Get steps that are blocked (waiting for dependencies)
|
|
|
|
|
pub fn get_blocked_steps(&self) -> Vec<&PlanStep> {
|
|
|
|
|
self.steps.iter()
|
|
|
|
|
.filter(|s| s.is_approved())
|
|
|
|
|
.filter(|s| s.execution_result.is_none())
|
|
|
|
|
.filter(|s| !self.dependencies_satisfied(&s.id))
|
|
|
|
|
.collect()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Get the IDs of steps that depend on a given step
|
|
|
|
|
pub fn get_dependents(&self, step_id: &str) -> Vec<&String> {
|
|
|
|
|
self.steps.iter()
|
|
|
|
|
.filter(|s| s.depends_on.contains(&step_id.to_string()))
|
|
|
|
|
.map(|s| &s.id)
|
|
|
|
|
.collect()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Mark a step and all its dependents as skipped due to dependency failure
|
|
|
|
|
pub fn skip_with_dependents(&mut self, step_id: &str) {
|
|
|
|
|
let dependents: Vec<String> = self.get_dependents(step_id)
|
|
|
|
|
.iter()
|
|
|
|
|
.map(|s| (*s).clone())
|
|
|
|
|
.collect();
|
|
|
|
|
|
|
|
|
|
// Skip the step itself if not already executed
|
|
|
|
|
if let Some(step) = self.steps.iter_mut().find(|s| s.id == step_id) {
|
|
|
|
|
if step.execution_result.is_none() {
|
|
|
|
|
step.execution_result = Some(StepExecutionResult::Skipped {
|
|
|
|
|
reason: SkipReason::UserSkipped,
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Recursively skip dependents
|
|
|
|
|
for dep_id in dependents {
|
|
|
|
|
if let Some(step) = self.steps.iter_mut().find(|s| s.id == dep_id) {
|
|
|
|
|
if step.execution_result.is_none() {
|
|
|
|
|
step.execution_result = Some(StepExecutionResult::Skipped {
|
|
|
|
|
reason: SkipReason::DependencyFailed {
|
|
|
|
|
failed_step_id: step_id.to_string(),
|
|
|
|
|
},
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
// Recurse to skip transitive dependents
|
|
|
|
|
self.skip_with_dependents(&dep_id);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Get step by ID
|
|
|
|
|
pub fn get_step(&self, id: &str) -> Option<&PlanStep> {
|
|
|
|
|
self.steps.iter().find(|s| s.id == id)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Get mutable step by ID
|
|
|
|
|
pub fn get_step_mut(&mut self, id: &str) -> Option<&mut PlanStep> {
|
|
|
|
|
self.steps.iter_mut().find(|s| s.id == id)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Check if execution is complete (all approved steps executed or skipped)
|
|
|
|
|
pub fn is_execution_complete(&self) -> bool {
|
|
|
|
|
self.steps.iter()
|
|
|
|
|
.filter(|s| s.is_approved())
|
|
|
|
|
.all(|s| s.is_success() || s.is_skipped() || s.is_failed())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Get execution summary counts
|
|
|
|
|
pub fn execution_counts(&self) -> (usize, usize, usize, usize) {
|
|
|
|
|
let approved = self.steps.iter().filter(|s| s.is_approved()).count();
|
|
|
|
|
let succeeded = self.steps.iter().filter(|s| s.is_success()).count();
|
|
|
|
|
let failed = self.steps.iter().filter(|s| s.is_failed()).count();
|
|
|
|
|
let skipped = self.steps.iter().filter(|s| s.is_skipped()).count();
|
|
|
|
|
(approved, succeeded, failed, skipped)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// User's approval decisions for plan steps
|
|
|
|
|
@@ -878,4 +1249,261 @@ mod tests {
|
|
|
|
|
manager.delete_accumulated_plan(&id).await.unwrap();
|
|
|
|
|
assert!(manager.load_accumulated_plan(&id).await.is_err());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ========================================================================
|
|
|
|
|
// DAG Validation Tests
|
|
|
|
|
// ========================================================================
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn test_dag_validation_simple_chain() {
|
|
|
|
|
// A -> B -> C (linear chain)
|
|
|
|
|
let mut plan = AccumulatedPlan::new();
|
|
|
|
|
|
|
|
|
|
let step_a = PlanStep::new("a".into(), 1, "read".into(), json!({}));
|
|
|
|
|
let step_b = PlanStep::new("b".into(), 1, "edit".into(), json!({}))
|
|
|
|
|
.with_depends_on(vec!["a".into()]);
|
|
|
|
|
let step_c = PlanStep::new("c".into(), 1, "write".into(), json!({}))
|
|
|
|
|
.with_depends_on(vec!["b".into()]);
|
|
|
|
|
|
|
|
|
|
plan.add_step(step_a);
|
|
|
|
|
plan.add_step(step_b);
|
|
|
|
|
plan.add_step(step_c);
|
|
|
|
|
|
|
|
|
|
match plan.validate_dag() {
|
|
|
|
|
DagValidationResult::Valid { execution_order, max_parallelism } => {
|
|
|
|
|
assert_eq!(execution_order.len(), 3); // 3 levels
|
|
|
|
|
assert_eq!(max_parallelism, 1); // No parallelism in a chain
|
|
|
|
|
assert!(execution_order[0].contains(&"a".to_string()));
|
|
|
|
|
assert!(execution_order[1].contains(&"b".to_string()));
|
|
|
|
|
assert!(execution_order[2].contains(&"c".to_string()));
|
|
|
|
|
}
|
|
|
|
|
other => panic!("Expected Valid, got {:?}", other),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn test_dag_validation_parallel_steps() {
|
|
|
|
|
// A -> B, A -> C (B and C can run in parallel after A)
|
|
|
|
|
let mut plan = AccumulatedPlan::new();
|
|
|
|
|
|
|
|
|
|
let step_a = PlanStep::new("a".into(), 1, "read".into(), json!({}));
|
|
|
|
|
let step_b = PlanStep::new("b".into(), 1, "edit".into(), json!({}))
|
|
|
|
|
.with_depends_on(vec!["a".into()]);
|
|
|
|
|
let step_c = PlanStep::new("c".into(), 1, "write".into(), json!({}))
|
|
|
|
|
.with_depends_on(vec!["a".into()]);
|
|
|
|
|
|
|
|
|
|
plan.add_step(step_a);
|
|
|
|
|
plan.add_step(step_b);
|
|
|
|
|
plan.add_step(step_c);
|
|
|
|
|
|
|
|
|
|
match plan.validate_dag() {
|
|
|
|
|
DagValidationResult::Valid { execution_order, max_parallelism } => {
|
|
|
|
|
assert_eq!(execution_order.len(), 2); // 2 levels
|
|
|
|
|
assert_eq!(max_parallelism, 2); // B and C in parallel
|
|
|
|
|
assert!(execution_order[0].contains(&"a".to_string()));
|
|
|
|
|
assert!(execution_order[1].contains(&"b".to_string()));
|
|
|
|
|
assert!(execution_order[1].contains(&"c".to_string()));
|
|
|
|
|
}
|
|
|
|
|
other => panic!("Expected Valid, got {:?}", other),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn test_dag_validation_diamond() {
|
|
|
|
|
// Diamond: A -> B, A -> C, B -> D, C -> D
|
|
|
|
|
let mut plan = AccumulatedPlan::new();
|
|
|
|
|
|
|
|
|
|
let step_a = PlanStep::new("a".into(), 1, "read".into(), json!({}));
|
|
|
|
|
let step_b = PlanStep::new("b".into(), 1, "edit".into(), json!({}))
|
|
|
|
|
.with_depends_on(vec!["a".into()]);
|
|
|
|
|
let step_c = PlanStep::new("c".into(), 1, "write".into(), json!({}))
|
|
|
|
|
.with_depends_on(vec!["a".into()]);
|
|
|
|
|
let step_d = PlanStep::new("d".into(), 1, "bash".into(), json!({}))
|
|
|
|
|
.with_depends_on(vec!["b".into(), "c".into()]);
|
|
|
|
|
|
|
|
|
|
plan.add_step(step_a);
|
|
|
|
|
plan.add_step(step_b);
|
|
|
|
|
plan.add_step(step_c);
|
|
|
|
|
plan.add_step(step_d);
|
|
|
|
|
|
|
|
|
|
match plan.validate_dag() {
|
|
|
|
|
DagValidationResult::Valid { execution_order, max_parallelism } => {
|
|
|
|
|
assert_eq!(execution_order.len(), 3); // 3 levels: A, then B+C, then D
|
|
|
|
|
assert_eq!(max_parallelism, 2); // B and C in parallel
|
|
|
|
|
}
|
|
|
|
|
other => panic!("Expected Valid, got {:?}", other),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn test_dag_validation_cycle_detection() {
|
|
|
|
|
// Cycle: A -> B -> C -> A
|
|
|
|
|
let mut plan = AccumulatedPlan::new();
|
|
|
|
|
|
|
|
|
|
let step_a = PlanStep::new("a".into(), 1, "read".into(), json!({}))
|
|
|
|
|
.with_depends_on(vec!["c".into()]);
|
|
|
|
|
let step_b = PlanStep::new("b".into(), 1, "edit".into(), json!({}))
|
|
|
|
|
.with_depends_on(vec!["a".into()]);
|
|
|
|
|
let step_c = PlanStep::new("c".into(), 1, "write".into(), json!({}))
|
|
|
|
|
.with_depends_on(vec!["b".into()]);
|
|
|
|
|
|
|
|
|
|
plan.add_step(step_a);
|
|
|
|
|
plan.add_step(step_b);
|
|
|
|
|
plan.add_step(step_c);
|
|
|
|
|
|
|
|
|
|
match plan.validate_dag() {
|
|
|
|
|
DagValidationResult::CycleDetected { cycle_steps } => {
|
|
|
|
|
assert_eq!(cycle_steps.len(), 3);
|
|
|
|
|
assert!(cycle_steps.contains(&"a".to_string()));
|
|
|
|
|
assert!(cycle_steps.contains(&"b".to_string()));
|
|
|
|
|
assert!(cycle_steps.contains(&"c".to_string()));
|
|
|
|
|
}
|
|
|
|
|
other => panic!("Expected CycleDetected, got {:?}", other),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn test_dag_validation_missing_dependency() {
|
|
|
|
|
let mut plan = AccumulatedPlan::new();
|
|
|
|
|
|
|
|
|
|
let step_a = PlanStep::new("a".into(), 1, "read".into(), json!({}));
|
|
|
|
|
let step_b = PlanStep::new("b".into(), 1, "edit".into(), json!({}))
|
|
|
|
|
.with_depends_on(vec!["nonexistent".into()]);
|
|
|
|
|
|
|
|
|
|
plan.add_step(step_a);
|
|
|
|
|
plan.add_step(step_b);
|
|
|
|
|
|
|
|
|
|
match plan.validate_dag() {
|
|
|
|
|
DagValidationResult::MissingDependencies { missing } => {
|
|
|
|
|
assert!(missing.contains_key("b"));
|
|
|
|
|
assert!(missing["b"].contains(&"nonexistent".to_string()));
|
|
|
|
|
}
|
|
|
|
|
other => panic!("Expected MissingDependencies, got {:?}", other),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn test_dag_validation_no_dependencies() {
|
|
|
|
|
// All independent steps (max parallelism)
|
|
|
|
|
let mut plan = AccumulatedPlan::new();
|
|
|
|
|
|
|
|
|
|
plan.add_step(PlanStep::new("a".into(), 1, "read".into(), json!({})));
|
|
|
|
|
plan.add_step(PlanStep::new("b".into(), 1, "read".into(), json!({})));
|
|
|
|
|
plan.add_step(PlanStep::new("c".into(), 1, "read".into(), json!({})));
|
|
|
|
|
|
|
|
|
|
match plan.validate_dag() {
|
|
|
|
|
DagValidationResult::Valid { execution_order, max_parallelism } => {
|
|
|
|
|
assert_eq!(execution_order.len(), 1); // All in one level
|
|
|
|
|
assert_eq!(max_parallelism, 3); // All 3 can run in parallel
|
|
|
|
|
}
|
|
|
|
|
other => panic!("Expected Valid, got {:?}", other),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn test_dependencies_satisfied() {
|
|
|
|
|
let mut plan = AccumulatedPlan::new();
|
|
|
|
|
|
|
|
|
|
let mut step_a = PlanStep::new("a".into(), 1, "read".into(), json!({}));
|
|
|
|
|
step_a.execution_result = Some(StepExecutionResult::Success {
|
|
|
|
|
output: "ok".into(),
|
|
|
|
|
duration_ms: 100,
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
let step_b = PlanStep::new("b".into(), 1, "edit".into(), json!({}))
|
|
|
|
|
.with_depends_on(vec!["a".into()]);
|
|
|
|
|
|
|
|
|
|
plan.add_step(step_a);
|
|
|
|
|
plan.add_step(step_b);
|
|
|
|
|
|
|
|
|
|
// A succeeded, so B's deps are satisfied
|
|
|
|
|
assert!(plan.dependencies_satisfied("b"));
|
|
|
|
|
|
|
|
|
|
// A has no deps, so they're trivially satisfied
|
|
|
|
|
assert!(plan.dependencies_satisfied("a"));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn test_get_ready_steps() {
|
|
|
|
|
let mut plan = AccumulatedPlan::new();
|
|
|
|
|
|
|
|
|
|
let mut step_a = PlanStep::new("a".into(), 1, "read".into(), json!({}));
|
|
|
|
|
step_a.approved = Some(true);
|
|
|
|
|
|
|
|
|
|
let mut step_b = PlanStep::new("b".into(), 1, "edit".into(), json!({}));
|
|
|
|
|
step_b.approved = Some(true);
|
|
|
|
|
step_b.depends_on = vec!["a".into()];
|
|
|
|
|
|
|
|
|
|
plan.add_step(step_a);
|
|
|
|
|
plan.add_step(step_b);
|
|
|
|
|
|
|
|
|
|
// Only A is ready (B depends on A which hasn't executed)
|
|
|
|
|
let ready = plan.get_ready_steps();
|
|
|
|
|
assert_eq!(ready.len(), 1);
|
|
|
|
|
assert_eq!(ready[0].id, "a");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn test_skip_with_dependents() {
|
|
|
|
|
let mut plan = AccumulatedPlan::new();
|
|
|
|
|
|
|
|
|
|
plan.add_step(PlanStep::new("a".into(), 1, "read".into(), json!({})));
|
|
|
|
|
plan.add_step(
|
|
|
|
|
PlanStep::new("b".into(), 1, "edit".into(), json!({}))
|
|
|
|
|
.with_depends_on(vec!["a".into()])
|
|
|
|
|
);
|
|
|
|
|
plan.add_step(
|
|
|
|
|
PlanStep::new("c".into(), 1, "write".into(), json!({}))
|
|
|
|
|
.with_depends_on(vec!["b".into()])
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
// Skip A, should cascade to B and C
|
|
|
|
|
plan.skip_with_dependents("a");
|
|
|
|
|
|
|
|
|
|
assert!(plan.steps[0].is_skipped());
|
|
|
|
|
assert!(plan.steps[1].is_skipped());
|
|
|
|
|
assert!(plan.steps[2].is_skipped());
|
|
|
|
|
|
|
|
|
|
// Check the reason for cascaded skips
|
|
|
|
|
if let Some(StepExecutionResult::Skipped { reason: SkipReason::DependencyFailed { failed_step_id } }) = &plan.steps[1].execution_result {
|
|
|
|
|
assert_eq!(failed_step_id, "a");
|
|
|
|
|
} else {
|
|
|
|
|
panic!("Expected DependencyFailed skip reason");
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn test_execution_counts() {
|
|
|
|
|
let mut plan = AccumulatedPlan::new();
|
|
|
|
|
|
|
|
|
|
let mut step_a = PlanStep::new("a".into(), 1, "read".into(), json!({}));
|
|
|
|
|
step_a.approved = Some(true);
|
|
|
|
|
step_a.execution_result = Some(StepExecutionResult::Success {
|
|
|
|
|
output: "ok".into(),
|
|
|
|
|
duration_ms: 100,
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
let mut step_b = PlanStep::new("b".into(), 1, "edit".into(), json!({}));
|
|
|
|
|
step_b.approved = Some(true);
|
|
|
|
|
step_b.execution_result = Some(StepExecutionResult::Failed {
|
|
|
|
|
error: "oops".into(),
|
|
|
|
|
after_retry: true,
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
let mut step_c = PlanStep::new("c".into(), 1, "write".into(), json!({}));
|
|
|
|
|
step_c.approved = Some(true);
|
|
|
|
|
step_c.execution_result = Some(StepExecutionResult::Skipped {
|
|
|
|
|
reason: SkipReason::UserSkipped,
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
plan.add_step(step_a);
|
|
|
|
|
plan.add_step(step_b);
|
|
|
|
|
plan.add_step(step_c);
|
|
|
|
|
|
|
|
|
|
let (approved, succeeded, failed, skipped) = plan.execution_counts();
|
|
|
|
|
assert_eq!(approved, 3);
|
|
|
|
|
assert_eq!(succeeded, 1);
|
|
|
|
|
assert_eq!(failed, 1);
|
|
|
|
|
assert_eq!(skipped, 1);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|