Files
gnoma/internal/provider/subprocess/stream.go
T
vikingowl 44d0bdc032 feat(provider): subprocess CLI provider for claude, gemini, vibe
Adds internal/provider/subprocess — a provider.Provider that spawns CLI
agents (claude, gemini, vibe) as subprocesses and streams their output.

- FormatParser interface + three parsers for claude-stream-json,
  gemini-stream-json, and vibe-streaming formats; fixtures captured from
  real binaries
- subprocessStream: pull-based stream.Stream over subprocess stdout with
  bounded stderr capture (8KB) and guarded reap() to prevent double-Wait
- DiscoverCLIAgents: parallel PATH scan with 10s timeout, stable ordering
- Provider: only the last user message is passed as --prompt; all other
  request fields (history, tools, system prompt) are intentionally ignored
  (see package doc)
- main.go: discover and register CLI arms at startup; TODO(P0c) for
  tier-based routing to enforce preference order explicitly
2026-05-07 14:29:34 +02:00

159 lines
3.5 KiB
Go

package subprocess
import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"os/exec"
"strings"
"somegit.dev/Owlibou/gnoma/internal/stream"
)
// subprocessStream implements stream.Stream by reading line-delimited JSON
// from a subprocess stdout and converting lines via a FormatParser.
type subprocessStream struct {
cmd *exec.Cmd
stdout io.ReadCloser
stderrBuf *bytes.Buffer
scanner *bufio.Scanner
parser FormatParser
pending []stream.Event
current stream.Event
err error
done bool
waited bool
}
// newSubprocessStream starts cmd, attaches a stdout pipe, and returns the stream.
// The caller must call Close() to release resources.
func newSubprocessStream(ctx context.Context, cmd *exec.Cmd, parser FormatParser) (*subprocessStream, error) {
stdout, err := cmd.StdoutPipe()
if err != nil {
return nil, fmt.Errorf("subprocess: stdout pipe: %w", err)
}
// Capture stderr for error messages; bounded to 8KB.
stderrBuf := &bytes.Buffer{}
cmd.Stderr = &limitedWriter{w: stderrBuf, n: 8192}
// Explicitly close stdin so the subprocess doesn't block waiting for input.
cmd.Stdin = nil
if err := cmd.Start(); err != nil {
return nil, fmt.Errorf("subprocess: start: %w", err)
}
_ = ctx // context cancellation is handled by exec.CommandContext in the caller
scanner := bufio.NewScanner(stdout)
scanner.Buffer(make([]byte, 1024*1024), 1024*1024) // 1MB line buffer
return &subprocessStream{
cmd: cmd,
stdout: stdout,
stderrBuf: stderrBuf,
scanner: scanner,
parser: parser,
}, nil
}
func (s *subprocessStream) Next() bool {
if s.done || s.err != nil {
return false
}
for {
// Drain buffered events first.
if len(s.pending) > 0 {
s.current = s.pending[0]
s.pending = s.pending[1:]
return true
}
// Read next line from subprocess stdout.
if !s.scanner.Scan() {
// EOF — process has exited (or pipe closed).
if err := s.scanner.Err(); err != nil {
s.err = err
return false
}
// Emit final events from parser.
final := s.parser.Done()
if len(final) > 0 {
s.pending = final
}
// Wait for process to exit and surface any non-zero exit code.
s.reap()
s.done = true
if len(s.pending) > 0 {
s.current = s.pending[0]
s.pending = s.pending[1:]
return s.err == nil
}
return false
}
line := s.scanner.Bytes()
if len(line) == 0 {
continue
}
evts, err := s.parser.ParseLine(line)
if err != nil {
// Non-fatal parse error: skip the line but continue.
continue
}
s.pending = append(s.pending, evts...)
}
}
func (s *subprocessStream) Current() stream.Event { return s.current }
func (s *subprocessStream) Err() error { return s.err }
func (s *subprocessStream) Close() error {
if s.cmd.Process != nil {
_ = s.cmd.Process.Kill()
}
_ = s.stdout.Close()
s.reap()
return nil
}
// reap waits for the process exactly once. Non-zero exit is stored as stream error.
func (s *subprocessStream) reap() {
if s.waited {
return
}
s.waited = true
if err := s.cmd.Wait(); err != nil {
if s.err == nil {
msg := strings.TrimSpace(s.stderrBuf.String())
if msg != "" {
s.err = fmt.Errorf("subprocess: %w: %s", err, msg)
} else {
s.err = fmt.Errorf("subprocess: %w", err)
}
}
}
}
// limitedWriter is a writer that stops writing after n bytes.
type limitedWriter struct {
w io.Writer
n int
}
func (lw *limitedWriter) Write(p []byte) (int, error) {
if lw.n <= 0 {
return len(p), nil // silently discard
}
if len(p) > lw.n {
p = p[:lw.n]
}
n, err := lw.w.Write(p)
lw.n -= n
return n, err
}