From ace9b5f273da0ebad2820ba5c6d2a09ac79be5f4 Mon Sep 17 00:00:00 2001 From: vikingowl Date: Fri, 3 Apr 2026 21:03:51 +0200 Subject: [PATCH] feat: spawn_elfs batch tool for guaranteed parallel elf execution New spawn_elfs tool takes array of tasks, spawns all elfs simultaneously. Solves the problem of models (Mistral Small, Devstral) that serialize tool calls instead of batching them. Schema: {"tasks": [{"prompt": "...", "task_type": "..."}], "max_turns": 30} Also: - Suppress spawn_elfs tool output from chat (tree handles display) - Update M7 milestones to reflect completed deliverables - Add CC-inspired features to M8/M10: task notification system, task framework, /batch skill, coordinator mode, StreamingToolExecutor, git worktree isolation --- cmd/gnoma/main.go | 3 + docs/essentials/milestones.md | 26 ++- internal/tool/agent/batch.go | 293 ++++++++++++++++++++++++++++++++++ internal/tui/app.go | 2 +- 4 files changed, 315 insertions(+), 9 deletions(-) create mode 100644 internal/tool/agent/batch.go diff --git a/cmd/gnoma/main.go b/cmd/gnoma/main.go index e0a03ff..c2779b8 100644 --- a/cmd/gnoma/main.go +++ b/cmd/gnoma/main.go @@ -194,6 +194,9 @@ func main() { agentTool := agent.New(elfMgr) agentTool.SetProgressCh(elfProgressCh) reg.Register(agentTool) + batchTool := agent.NewBatch(elfMgr) + batchTool.SetProgressCh(elfProgressCh) + reg.Register(batchTool) // Create firewall fw := security.NewFirewall(security.FirewallConfig{ diff --git a/docs/essentials/milestones.md b/docs/essentials/milestones.md index 4303de2..63ad73d 100644 --- a/docs/essentials/milestones.md +++ b/docs/essentials/milestones.md @@ -150,15 +150,19 @@ depends_on: [vision] **Deliverables:** -- [ ] Elf interface + SyncElf + BackgroundElf implementations -- [ ] ElfManager: spawn, monitor, cancel, collect results -- [ ] Router-integrated spawning (`router.Select()` picks arm per elf) -- [ ] Parent ↔ elf communication via typed channels -- [ ] Concurrent tool execution (read-only parallel via errgroup, writes serial) +- [x] Elf interface + BackgroundElf implementation +- [x] ElfManager: spawn, monitor, cancel, collect results +- [x] Router-integrated spawning (`router.Select()` picks arm per elf) +- [x] Parent ↔ elf communication via typed channels (elf.Progress) +- [x] Concurrent tool execution (read-only parallel via WaitGroup, writes serial) +- [x] `agent` tool: single elf spawn with tree progress view +- [x] `spawn_elfs` tool: batch N elfs in one call, all run in parallel +- [x] CC-style tree view: ├─/└─ branches, tool uses, tokens, activity, Done(duration) +- [x] Elf output truncated to 2000 chars for parent context protection - [ ] Elf results feed back to router as quality signals - [ ] Coordinator mode: orchestrator dispatches to worker elfs -**Exit criteria:** Parent spawns 3 background elfs on different providers (chosen by router), collects and synthesizes results. +**Exit criteria:** Parent spawns 3 elfs via `spawn_elfs`, all run in parallel (chosen by router), tree shows live progress, results synthesized. ## M8: Extensibility @@ -175,8 +179,10 @@ depends_on: [vision] - [ ] MCP tool naming: `mcp__{server}__{tool}` - [ ] MCP tool replaceability: `replace_default` config swaps built-in tools - [ ] Plugin system: plugin.json manifest, install/enable/disable lifecycle +- [ ] `/batch` skill: decompose work into N units, spawn all via `spawn_elfs`, track progress (CC-inspired) +- [ ] Coordinator mode prompt: fan-out guidance for parallel elf dispatch, concurrency rules (read vs write) -**Exit criteria:** MCP tools appear in gnoma. `replace_default` swaps built-ins. Skills invocable. Hooks fire on tool use. +**Exit criteria:** MCP tools appear in gnoma. `replace_default` swaps built-ins. Skills invocable. Hooks fire on tool use. `/batch` decomposes and parallelizes work. ## M9: Router Advanced @@ -207,8 +213,12 @@ depends_on: [vision] - [ ] Incognito enforcement: sessions NOT persisted - [ ] Serve mode: Unix socket listener, spawn session goroutine per client - [ ] Coordinator mode: orchestrator dispatches to restricted worker elfs +- [ ] Task framework: registered tasks with lifecycle (pending/running/completed/failed), abort controllers (CC-inspired AppState.tasks) +- [ ] Task notification system: completed background elfs inject `` messages into parent conversation (CC-inspired) +- [ ] StreamingToolExecutor: concurrent-safe tool classification, sibling abort on failure (CC-inspired) +- [ ] Git worktree isolation: `isolation: "worktree"` gives each elf a separate working copy (CC-inspired) -**Exit criteria:** Resume yesterday's conversation. External client connects via serve mode. +**Exit criteria:** Resume yesterday's conversation. External client connects via serve mode. Task notifications flow from background elfs to parent. ## M11: Task Learning diff --git a/internal/tool/agent/batch.go b/internal/tool/agent/batch.go new file mode 100644 index 0000000..d5ea687 --- /dev/null +++ b/internal/tool/agent/batch.go @@ -0,0 +1,293 @@ +package agent + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "sync" + "time" + + "somegit.dev/Owlibou/gnoma/internal/elf" + "somegit.dev/Owlibou/gnoma/internal/stream" + "somegit.dev/Owlibou/gnoma/internal/tool" +) + +var batchSchema = json.RawMessage(`{ + "type": "object", + "properties": { + "tasks": { + "type": "array", + "description": "List of tasks to execute in parallel. Each elf gets its own conversation and tools.", + "items": { + "type": "object", + "properties": { + "prompt": { + "type": "string", + "description": "The task prompt for the elf" + }, + "task_type": { + "type": "string", + "description": "Task type hint for provider routing", + "enum": ["generation", "review", "refactor", "debug", "explain", "planning"] + } + }, + "required": ["prompt"] + }, + "minItems": 1, + "maxItems": 10 + }, + "max_turns": { + "type": "integer", + "description": "Maximum tool-calling rounds per elf (default 30)" + } + }, + "required": ["tasks"] +}`) + +// BatchTool spawns multiple elfs in parallel from a single tool call. +type BatchTool struct { + manager *elf.Manager + progressCh chan<- elf.Progress +} + +func NewBatch(mgr *elf.Manager) *BatchTool { + return &BatchTool{manager: mgr} +} + +func (t *BatchTool) SetProgressCh(ch chan<- elf.Progress) { + t.progressCh = ch +} + +func (t *BatchTool) Name() string { return "spawn_elfs" } +func (t *BatchTool) Description() string { return "Spawn multiple elfs (sub-agents) in parallel. Use this when you need to run 2+ independent tasks concurrently. Each elf gets its own conversation and tools. All elfs run simultaneously and results are collected when all complete." } +func (t *BatchTool) Parameters() json.RawMessage { return batchSchema } +func (t *BatchTool) IsReadOnly() bool { return true } +func (t *BatchTool) IsDestructive() bool { return false } + +type batchArgs struct { + Tasks []batchTask `json:"tasks"` + MaxTurns int `json:"max_turns,omitempty"` +} + +type batchTask struct { + Prompt string `json:"prompt"` + TaskType string `json:"task_type,omitempty"` +} + +func (t *BatchTool) Execute(ctx context.Context, args json.RawMessage) (tool.Result, error) { + var a batchArgs + if err := json.Unmarshal(args, &a); err != nil { + return tool.Result{}, fmt.Errorf("spawn_elfs: invalid args: %w", err) + } + if len(a.Tasks) == 0 { + return tool.Result{}, fmt.Errorf("spawn_elfs: at least one task required") + } + if len(a.Tasks) > 10 { + return tool.Result{}, fmt.Errorf("spawn_elfs: max 10 tasks per batch") + } + + maxTurns := a.MaxTurns + if maxTurns <= 0 { + maxTurns = 30 + } + + systemPrompt := "You are an elf — a focused sub-agent of gnoma. Complete the given task thoroughly and concisely. Use tools as needed." + + // Spawn all elfs + type elfEntry struct { + elf elf.Elf + desc string + task batchTask + } + var elfs []elfEntry + + for _, task := range a.Tasks { + taskType := parseTaskType(task.TaskType) + e, err := t.manager.Spawn(ctx, taskType, task.Prompt, systemPrompt, maxTurns) + if err != nil { + // Clean up already-spawned elfs + for _, entry := range elfs { + entry.elf.Cancel() + } + return tool.Result{Output: fmt.Sprintf("Failed to spawn elf: %v", err)}, nil + } + + desc := task.Prompt + if len(desc) > 60 { + desc = desc[:60] + "…" + } + + elfs = append(elfs, elfEntry{elf: e, desc: desc, task: task}) + + // Send initial progress + t.sendProgress(elf.Progress{ + ElfID: e.ID(), + Description: desc, + Activity: "starting…", + }) + } + + // Wait for all elfs in parallel, forwarding progress + results := make([]elf.Result, len(elfs)) + var wg sync.WaitGroup + + for i, entry := range elfs { + wg.Add(1) + go func(idx int, e elfEntry) { + defer wg.Done() + + // Forward progress events + go t.drainEvents(e.elf, e.desc) + + // Wait with timeout + done := make(chan elf.Result, 1) + go func() { done <- e.elf.Wait() }() + + select { + case r := <-done: + results[idx] = r + case <-ctx.Done(): + e.elf.Cancel() + results[idx] = elf.Result{ + ID: e.elf.ID(), + Status: elf.StatusCancelled, + Error: ctx.Err(), + } + case <-time.After(5 * time.Minute): + e.elf.Cancel() + results[idx] = elf.Result{ + ID: e.elf.ID(), + Status: elf.StatusFailed, + Error: fmt.Errorf("timed out after 5 minutes"), + } + } + + // Send done progress + r := results[idx] + t.sendProgress(elf.Progress{ + ElfID: r.ID, + Description: e.desc, + Tokens: int(r.Usage.TotalTokens()), + Done: true, + Duration: r.Duration, + Error: errString(r.Error), + }) + }(i, entry) + } + + wg.Wait() + + // Build combined result + var b strings.Builder + fmt.Fprintf(&b, "%d elfs completed\n\n", len(results)) + + for i, r := range results { + fmt.Fprintf(&b, "--- Elf %d: %s (%s, %s) ---\n", + i+1, elfs[i].desc, + r.Status, r.Duration.Round(time.Millisecond), + ) + if r.Error != nil { + fmt.Fprintf(&b, "Error: %v\n", r.Error) + } + if r.Output != "" { + output := r.Output + const maxOutputChars = 2000 + if len(output) > maxOutputChars { + output = output[:maxOutputChars] + fmt.Sprintf("\n\n[truncated — full output was %d chars]", len(r.Output)) + } + b.WriteString(output) + } + b.WriteString("\n\n") + } + + return tool.Result{ + Output: b.String(), + Metadata: map[string]any{ + "elf_count": len(results), + "total_ms": totalDuration(results).Milliseconds(), + }, + }, nil +} + +func (t *BatchTool) drainEvents(e elf.Elf, desc string) { + toolUses := 0 + tokens := 0 + lastSend := time.Now() + textChars := 0 + + for evt := range e.Events() { + if t.progressCh == nil { + continue + } + + p := elf.Progress{ + ElfID: e.ID(), + Description: desc, + ToolUses: toolUses, + Tokens: tokens, + } + + switch evt.Type { + case stream.EventTextDelta: + textChars += len(evt.Text) + if time.Since(lastSend) < 500*time.Millisecond { + continue + } + p.Activity = fmt.Sprintf("generating… (%d chars)", textChars) + case stream.EventToolCallDone: + name := evt.ToolCallName + if name == "" { + name = "tool" + } + p.Activity = fmt.Sprintf("⚙ [%s] running…", name) + case stream.EventToolResult: + toolUses++ + p.ToolUses = toolUses + out := evt.ToolOutput + if len(out) > 60 { + out = out[:60] + "…" + } + out = strings.ReplaceAll(out, "\n", " ") + p.Activity = fmt.Sprintf("→ %s", out) + case stream.EventUsage: + if evt.Usage != nil { + tokens = int(evt.Usage.TotalTokens()) + p.Tokens = tokens + } + continue + default: + continue + } + + lastSend = time.Now() + t.sendProgress(p) + } +} + +func (t *BatchTool) sendProgress(p elf.Progress) { + if t.progressCh == nil { + return + } + select { + case t.progressCh <- p: + default: + } +} + +func errString(err error) string { + if err == nil { + return "" + } + return err.Error() +} + +func totalDuration(results []elf.Result) time.Duration { + var max time.Duration + for _, r := range results { + if r.Duration > max { + max = r.Duration + } + } + return max +} diff --git a/internal/tui/app.go b/internal/tui/app.go index 45b6bd6..2fff1cc 100644 --- a/internal/tui/app.go +++ b/internal/tui/app.go @@ -498,7 +498,7 @@ func (m Model) handleStreamEvent(evt stream.Event) (tea.Model, tea.Cmd) { m.streamBuf.Reset() } case stream.EventToolCallDone: - if evt.ToolCallName == "agent" { + if evt.ToolCallName == "agent" || evt.ToolCallName == "spawn_elfs" { // Suppress tool message — elf tree view handles display m.elfToolActive = true } else {