44d0bdc032
Adds internal/provider/subprocess — a provider.Provider that spawns CLI agents (claude, gemini, vibe) as subprocesses and streams their output. - FormatParser interface + three parsers for claude-stream-json, gemini-stream-json, and vibe-streaming formats; fixtures captured from real binaries - subprocessStream: pull-based stream.Stream over subprocess stdout with bounded stderr capture (8KB) and guarded reap() to prevent double-Wait - DiscoverCLIAgents: parallel PATH scan with 10s timeout, stable ordering - Provider: only the last user message is passed as --prompt; all other request fields (history, tools, system prompt) are intentionally ignored (see package doc) - main.go: discover and register CLI arms at startup; TODO(P0c) for tier-based routing to enforce preference order explicitly
159 lines
3.5 KiB
Go
159 lines
3.5 KiB
Go
package subprocess
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"os/exec"
|
|
"strings"
|
|
|
|
"somegit.dev/Owlibou/gnoma/internal/stream"
|
|
)
|
|
|
|
// subprocessStream implements stream.Stream by reading line-delimited JSON
|
|
// from a subprocess stdout and converting lines via a FormatParser.
|
|
type subprocessStream struct {
|
|
cmd *exec.Cmd
|
|
stdout io.ReadCloser
|
|
stderrBuf *bytes.Buffer
|
|
scanner *bufio.Scanner
|
|
parser FormatParser
|
|
pending []stream.Event
|
|
current stream.Event
|
|
err error
|
|
done bool
|
|
waited bool
|
|
}
|
|
|
|
// newSubprocessStream starts cmd, attaches a stdout pipe, and returns the stream.
|
|
// The caller must call Close() to release resources.
|
|
func newSubprocessStream(ctx context.Context, cmd *exec.Cmd, parser FormatParser) (*subprocessStream, error) {
|
|
stdout, err := cmd.StdoutPipe()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("subprocess: stdout pipe: %w", err)
|
|
}
|
|
|
|
// Capture stderr for error messages; bounded to 8KB.
|
|
stderrBuf := &bytes.Buffer{}
|
|
cmd.Stderr = &limitedWriter{w: stderrBuf, n: 8192}
|
|
|
|
// Explicitly close stdin so the subprocess doesn't block waiting for input.
|
|
cmd.Stdin = nil
|
|
|
|
if err := cmd.Start(); err != nil {
|
|
return nil, fmt.Errorf("subprocess: start: %w", err)
|
|
}
|
|
|
|
_ = ctx // context cancellation is handled by exec.CommandContext in the caller
|
|
scanner := bufio.NewScanner(stdout)
|
|
scanner.Buffer(make([]byte, 1024*1024), 1024*1024) // 1MB line buffer
|
|
|
|
return &subprocessStream{
|
|
cmd: cmd,
|
|
stdout: stdout,
|
|
stderrBuf: stderrBuf,
|
|
scanner: scanner,
|
|
parser: parser,
|
|
}, nil
|
|
}
|
|
|
|
func (s *subprocessStream) Next() bool {
|
|
if s.done || s.err != nil {
|
|
return false
|
|
}
|
|
|
|
for {
|
|
// Drain buffered events first.
|
|
if len(s.pending) > 0 {
|
|
s.current = s.pending[0]
|
|
s.pending = s.pending[1:]
|
|
return true
|
|
}
|
|
|
|
// Read next line from subprocess stdout.
|
|
if !s.scanner.Scan() {
|
|
// EOF — process has exited (or pipe closed).
|
|
if err := s.scanner.Err(); err != nil {
|
|
s.err = err
|
|
return false
|
|
}
|
|
// Emit final events from parser.
|
|
final := s.parser.Done()
|
|
if len(final) > 0 {
|
|
s.pending = final
|
|
}
|
|
// Wait for process to exit and surface any non-zero exit code.
|
|
s.reap()
|
|
s.done = true
|
|
if len(s.pending) > 0 {
|
|
s.current = s.pending[0]
|
|
s.pending = s.pending[1:]
|
|
return s.err == nil
|
|
}
|
|
return false
|
|
}
|
|
|
|
line := s.scanner.Bytes()
|
|
if len(line) == 0 {
|
|
continue
|
|
}
|
|
|
|
evts, err := s.parser.ParseLine(line)
|
|
if err != nil {
|
|
// Non-fatal parse error: skip the line but continue.
|
|
continue
|
|
}
|
|
s.pending = append(s.pending, evts...)
|
|
}
|
|
}
|
|
|
|
func (s *subprocessStream) Current() stream.Event { return s.current }
|
|
func (s *subprocessStream) Err() error { return s.err }
|
|
|
|
func (s *subprocessStream) Close() error {
|
|
if s.cmd.Process != nil {
|
|
_ = s.cmd.Process.Kill()
|
|
}
|
|
_ = s.stdout.Close()
|
|
s.reap()
|
|
return nil
|
|
}
|
|
|
|
// reap waits for the process exactly once. Non-zero exit is stored as stream error.
|
|
func (s *subprocessStream) reap() {
|
|
if s.waited {
|
|
return
|
|
}
|
|
s.waited = true
|
|
if err := s.cmd.Wait(); err != nil {
|
|
if s.err == nil {
|
|
msg := strings.TrimSpace(s.stderrBuf.String())
|
|
if msg != "" {
|
|
s.err = fmt.Errorf("subprocess: %w: %s", err, msg)
|
|
} else {
|
|
s.err = fmt.Errorf("subprocess: %w", err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// limitedWriter is a writer that stops writing after n bytes.
|
|
type limitedWriter struct {
|
|
w io.Writer
|
|
n int
|
|
}
|
|
|
|
func (lw *limitedWriter) Write(p []byte) (int, error) {
|
|
if lw.n <= 0 {
|
|
return len(p), nil // silently discard
|
|
}
|
|
if len(p) > lw.n {
|
|
p = p[:lw.n]
|
|
}
|
|
n, err := lw.w.Write(p)
|
|
lw.n -= n
|
|
return n, err
|
|
}
|