From 5e781c0e04aa85a796c112cb9b07f56aae4fe425 Mon Sep 17 00:00:00 2001 From: vikingowl Date: Sun, 28 Dec 2025 07:42:44 +0100 Subject: [PATCH] feat: implement lightweight agent with gRPC and mTLS support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Agent Package (internal/agent/): - Agent struct with all collectors and memory-efficient pooling - Run loop with configurable collection interval - Graceful shutdown with context cancellation - Auto-reconnection callback for re-registration gRPC Client (internal/agent/client.go): - mTLS support with CA, agent cert, and key - Bidirectional streaming for metrics - Heartbeat fallback when streaming fails - Exponential backoff with jitter for reconnection - Concurrent reconnection handling with mutex Protocol Buffers (proto/tyto.proto): - AgentService with Stream, Register, Heartbeat RPCs - MetricsReport with summary fields for aggregation - ConfigUpdate and Command messages for server control - RegisterStatus enum for registration workflow CLI Integration (cmd/tyto/main.go): - Full agent subcommand with flag parsing - Support for --id, --server, --interval, --ca-cert, etc. - Environment variable overrides (TYTO_AGENT_*) - Signal handling for graceful shutdown Build System (Makefile): - Cross-compilation for linux/amd64, arm64, armv7 - Stripped binaries with version info - Proto generation target - Test and coverage targets Config Updates: - DefaultConfig() and LoadFromPath() functions - Agent config properly parsed from YAML 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .gitignore | 1 + backend/Makefile | 149 +++ backend/cmd/tyto/main.go | 152 ++- backend/go.mod | 12 +- backend/go.sum | 32 +- backend/internal/agent/agent.go | 262 ++++++ backend/internal/agent/client.go | 402 ++++++++ backend/internal/config/config.go | 57 ++ backend/internal/proto/tyto.pb.go | 1173 ++++++++++++++++++++++++ backend/internal/proto/tyto_grpc.pb.go | 202 ++++ backend/proto/tyto.proto | 129 +++ 11 files changed, 2551 insertions(+), 20 deletions(-) create mode 100644 backend/Makefile create mode 100644 backend/internal/agent/agent.go create mode 100644 backend/internal/agent/client.go create mode 100644 backend/internal/proto/tyto.pb.go create mode 100644 backend/internal/proto/tyto_grpc.pb.go create mode 100644 backend/proto/tyto.proto diff --git a/.gitignore b/.gitignore index 5467b7d..7039997 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,7 @@ node_modules/ frontend/build/ frontend/.svelte-kit/ backend/server +backend/build/ # Environment .env diff --git a/backend/Makefile b/backend/Makefile new file mode 100644 index 0000000..317398a --- /dev/null +++ b/backend/Makefile @@ -0,0 +1,149 @@ +# Tyto Backend Makefile +# Supports cross-compilation for multiple architectures + +VERSION ?= $(shell git describe --tags --always --dirty 2>/dev/null || echo "dev") +BUILD_TIME := $(shell date -u +"%Y-%m-%dT%H:%M:%SZ") +LDFLAGS := -ldflags="-s -w -X main.version=$(VERSION) -X main.buildTime=$(BUILD_TIME)" + +# Output directories +BUILD_DIR := build +PROTO_DIR := proto + +# Default target +.PHONY: all +all: build + +# Build for current platform +.PHONY: build +build: + go build $(LDFLAGS) -o $(BUILD_DIR)/tyto ./cmd/tyto + go build $(LDFLAGS) -o $(BUILD_DIR)/server ./cmd/server + +# Build server binary only +.PHONY: server +server: + go build $(LDFLAGS) -o $(BUILD_DIR)/server ./cmd/server + +# Build tyto CLI (includes agent) +.PHONY: tyto +tyto: + go build $(LDFLAGS) -o $(BUILD_DIR)/tyto ./cmd/tyto + +# Build lightweight agent for Linux amd64 +.PHONY: agent-linux-amd64 +agent-linux-amd64: + CGO_ENABLED=0 GOOS=linux GOARCH=amd64 \ + go build $(LDFLAGS) -o $(BUILD_DIR)/tyto-agent-linux-amd64 ./cmd/tyto + +# Build lightweight agent for Linux arm64 +.PHONY: agent-linux-arm64 +agent-linux-arm64: + CGO_ENABLED=0 GOOS=linux GOARCH=arm64 \ + go build $(LDFLAGS) -o $(BUILD_DIR)/tyto-agent-linux-arm64 ./cmd/tyto + +# Build lightweight agent for Linux arm (32-bit, e.g., Raspberry Pi) +.PHONY: agent-linux-arm +agent-linux-arm: + CGO_ENABLED=0 GOOS=linux GOARCH=arm GOARM=7 \ + go build $(LDFLAGS) -o $(BUILD_DIR)/tyto-agent-linux-armv7 ./cmd/tyto + +# Build all agent variants +.PHONY: agents +agents: agent-linux-amd64 agent-linux-arm64 agent-linux-arm + +# Build all binaries for all platforms +.PHONY: release +release: clean + @mkdir -p $(BUILD_DIR) + # Linux amd64 + CGO_ENABLED=0 GOOS=linux GOARCH=amd64 \ + go build $(LDFLAGS) -o $(BUILD_DIR)/tyto-linux-amd64 ./cmd/tyto + CGO_ENABLED=0 GOOS=linux GOARCH=amd64 \ + go build $(LDFLAGS) -o $(BUILD_DIR)/tyto-server-linux-amd64 ./cmd/server + # Linux arm64 + CGO_ENABLED=0 GOOS=linux GOARCH=arm64 \ + go build $(LDFLAGS) -o $(BUILD_DIR)/tyto-linux-arm64 ./cmd/tyto + CGO_ENABLED=0 GOOS=linux GOARCH=arm64 \ + go build $(LDFLAGS) -o $(BUILD_DIR)/tyto-server-linux-arm64 ./cmd/server + # Linux arm (32-bit) + CGO_ENABLED=0 GOOS=linux GOARCH=arm GOARM=7 \ + go build $(LDFLAGS) -o $(BUILD_DIR)/tyto-linux-armv7 ./cmd/tyto + +# Generate protobuf code +.PHONY: proto +proto: + protoc --go_out=. --go-grpc_out=. $(PROTO_DIR)/tyto.proto + +# Run tests +.PHONY: test +test: + go test -v -race ./... + +# Run tests with coverage +.PHONY: test-coverage +test-coverage: + go test -v -race -coverprofile=coverage.out ./... + go tool cover -html=coverage.out -o coverage.html + +# Lint code +.PHONY: lint +lint: + golangci-lint run ./... + +# Format code +.PHONY: fmt +fmt: + go fmt ./... + goimports -w . + +# Clean build artifacts +.PHONY: clean +clean: + rm -rf $(BUILD_DIR) + rm -f coverage.out coverage.html + +# Install dependencies +.PHONY: deps +deps: + go mod download + go mod tidy + +# Show binary sizes +.PHONY: sizes +sizes: agents + @echo "Binary sizes:" + @ls -lh $(BUILD_DIR)/tyto-agent-* 2>/dev/null | awk '{print $$5, $$9}' + +# Docker build +.PHONY: docker +docker: + docker build -t tyto-backend:latest . + +# Help +.PHONY: help +help: + @echo "Tyto Backend Makefile" + @echo "" + @echo "Usage: make [target]" + @echo "" + @echo "Targets:" + @echo " build Build for current platform" + @echo " server Build server binary only" + @echo " tyto Build tyto CLI (includes agent)" + @echo " agents Build agent for all Linux architectures" + @echo " release Build all binaries for all platforms" + @echo " proto Generate protobuf code" + @echo " test Run tests" + @echo " test-coverage Run tests with coverage report" + @echo " lint Run linter" + @echo " fmt Format code" + @echo " clean Remove build artifacts" + @echo " deps Download and tidy dependencies" + @echo " sizes Show agent binary sizes" + @echo " docker Build Docker image" + @echo " help Show this help" + @echo "" + @echo "Agent targets:" + @echo " agent-linux-amd64 Build agent for Linux x86_64" + @echo " agent-linux-arm64 Build agent for Linux ARM64 (aarch64)" + @echo " agent-linux-arm Build agent for Linux ARM32 (armv7)" diff --git a/backend/cmd/tyto/main.go b/backend/cmd/tyto/main.go index 6a0bd9f..ceec6e4 100644 --- a/backend/cmd/tyto/main.go +++ b/backend/cmd/tyto/main.go @@ -3,8 +3,15 @@ package main import ( + "context" "fmt" "os" + "os/signal" + "syscall" + "time" + + "tyto/internal/agent" + "tyto/internal/config" ) const usage = `Tyto - System Monitoring @@ -82,13 +89,156 @@ Options: --ca-cert FILE CA certificate file --cert FILE Agent certificate file --key FILE Agent key file + --config FILE Config file path Environment: TYTO_AGENT_ID Agent identifier TYTO_SERVER_URL Server URL + TYTO_AGENT_INTERVAL Collection interval `) return } - fmt.Println("Agent mode not implemented yet.") + // Parse flags + var ( + agentID string + serverURL string + interval string + caCert string + agentCert string + agentKey string + configFile string + ) + + for i := 0; i < len(args); i++ { + switch args[i] { + case "--id": + if i+1 < len(args) { + agentID = args[i+1] + i++ + } + case "--server": + if i+1 < len(args) { + serverURL = args[i+1] + i++ + } + case "--interval": + if i+1 < len(args) { + interval = args[i+1] + i++ + } + case "--ca-cert": + if i+1 < len(args) { + caCert = args[i+1] + i++ + } + case "--cert": + if i+1 < len(args) { + agentCert = args[i+1] + i++ + } + case "--key": + if i+1 < len(args) { + agentKey = args[i+1] + i++ + } + case "--config": + if i+1 < len(args) { + configFile = args[i+1] + i++ + } + } + } + + // Load configuration + cfg := loadAgentConfig(configFile, agentID, serverURL, interval, caCert, agentCert, agentKey) + + // Validate required fields + if cfg.Agent.ID == "" { + fmt.Fprintln(os.Stderr, "Error: Agent ID is required (--id or TYTO_AGENT_ID)") + os.Exit(1) + } + if cfg.Agent.ServerURL == "" { + fmt.Fprintln(os.Stderr, "Error: Server URL is required (--server or TYTO_SERVER_URL)") + os.Exit(1) + } + + fmt.Printf("Starting Tyto agent '%s'...\n", cfg.Agent.ID) + fmt.Printf("Connecting to server: %s\n", cfg.Agent.ServerURL) + fmt.Printf("Collection interval: %s\n", cfg.Agent.Interval) + + // Create and run agent + a := agent.New(cfg) + + // Handle graceful shutdown + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + + go func() { + <-sigCh + fmt.Println("\nReceived shutdown signal, stopping agent...") + a.Stop() + cancel() + }() + + if err := a.Run(ctx); err != nil && err != context.Canceled { + fmt.Fprintf(os.Stderr, "Agent error: %v\n", err) + os.Exit(1) + } + + fmt.Println("Agent stopped") +} + +// loadAgentConfig builds configuration from file, flags, and environment. +func loadAgentConfig(configFile, agentID, serverURL, interval, caCert, agentCert, agentKey string) *config.Config { + // Start with defaults + cfg := config.DefaultConfig() + cfg.Mode = config.ModeAgent + + // Load from file if specified + if configFile != "" { + if fileCfg, err := config.LoadFromPath(configFile); err == nil { + cfg = fileCfg + } + } + + // Override from environment variables + if v := os.Getenv("TYTO_AGENT_ID"); v != "" && agentID == "" { + agentID = v + } + if v := os.Getenv("TYTO_SERVER_URL"); v != "" && serverURL == "" { + serverURL = v + } + if v := os.Getenv("TYTO_AGENT_INTERVAL"); v != "" && interval == "" { + interval = v + } + + // Override from flags + if agentID != "" { + cfg.Agent.ID = agentID + } + if serverURL != "" { + cfg.Agent.ServerURL = serverURL + } + if interval != "" { + if d, err := time.ParseDuration(interval); err == nil { + cfg.Agent.Interval = d + } + } + + // TLS configuration + if caCert != "" { + cfg.Agent.TLS.CACert = caCert + } + if agentCert != "" { + cfg.Agent.TLS.AgentCert = agentCert + } + if agentKey != "" { + cfg.Agent.TLS.AgentKey = agentKey + } + + return cfg } diff --git a/backend/go.mod b/backend/go.mod index 9244738..3d4f527 100644 --- a/backend/go.mod +++ b/backend/go.mod @@ -8,6 +8,8 @@ require github.com/gin-contrib/cors v1.7.2 require ( github.com/godbus/dbus/v5 v5.1.0 + google.golang.org/grpc v1.68.0 + google.golang.org/protobuf v1.35.2 gopkg.in/yaml.v3 v3.0.1 ) @@ -33,9 +35,9 @@ require ( github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/ugorji/go/codec v1.2.12 // indirect golang.org/x/arch v0.8.0 // indirect - golang.org/x/crypto v0.23.0 // indirect - golang.org/x/net v0.25.0 // indirect - golang.org/x/sys v0.20.0 // indirect - golang.org/x/text v0.15.0 // indirect - google.golang.org/protobuf v1.34.1 // indirect + golang.org/x/crypto v0.27.0 // indirect + golang.org/x/net v0.29.0 // indirect + golang.org/x/sys v0.25.0 // indirect + golang.org/x/text v0.18.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect ) diff --git a/backend/go.sum b/backend/go.sum index 921a20e..0be6411 100644 --- a/backend/go.sum +++ b/backend/go.sum @@ -30,8 +30,10 @@ github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk= github.com/godbus/dbus/v5 v5.1.0/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= -github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= -github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= @@ -77,20 +79,22 @@ github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZ golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= golang.org/x/arch v0.8.0 h1:3wRIsP3pM4yUptoR96otTUOXI367OS0+c9eeRi9doIc= golang.org/x/arch v0.8.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys= -golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI= -golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= -golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= -golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= +golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= +golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= +golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= +golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= -golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= -golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= -google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= +golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= +golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 h1:pPJltXNxVzT4pK9yD8vR9X75DaWYYmLGMsEvBfFQZzQ= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= +google.golang.org/grpc v1.68.0 h1:aHQeeJbo8zAkAa3pRzrVjZlbz6uSfeOXlJNQM0RAbz0= +google.golang.org/grpc v1.68.0/go.mod h1:fmSPC5AsjSBCK54MyHRx48kpOti1/jRfOlwEWywNjWA= +google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io= +google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/backend/internal/agent/agent.go b/backend/internal/agent/agent.go new file mode 100644 index 0000000..2fde29f --- /dev/null +++ b/backend/internal/agent/agent.go @@ -0,0 +1,262 @@ +// Package agent implements a lightweight Tyto agent that collects metrics +// and reports them to a central server via gRPC. +package agent + +import ( + "context" + "encoding/json" + "log" + "sync" + "time" + + "tyto/internal/collectors" + "tyto/internal/collectors/gpu" + "tyto/internal/config" + "tyto/internal/models" + pb "tyto/internal/proto" +) + +// Agent collects metrics and reports to a central server. +type Agent struct { + config *config.Config + client *Client + + // Collectors + system *collectors.SystemCollector + cpu *collectors.CPUCollector + memory *collectors.MemoryCollector + disk *collectors.DiskCollector + network *collectors.NetworkCollector + process *collectors.ProcessCollector + temperature *collectors.TemperatureCollector + gpuManager *gpu.Manager + docker *collectors.DockerCollector + systemd *collectors.SystemdCollector + + // Pooling for memory efficiency + metricsPool sync.Pool + + // Control + stopCh chan struct{} + wg sync.WaitGroup +} + +// New creates a new agent with the given configuration. +func New(cfg *config.Config) *Agent { + a := &Agent{ + config: cfg, + stopCh: make(chan struct{}), + metricsPool: sync.Pool{ + New: func() interface{} { + return &models.AllMetrics{} + }, + }, + } + + // Initialize collectors + a.initCollectors() + + return a +} + +func (a *Agent) initCollectors() { + cfg := a.config + + a.system = collectors.NewSystemCollector(cfg.ProcPath) + a.cpu = collectors.NewCPUCollector(cfg.ProcPath, cfg.SysPath) + a.memory = collectors.NewMemoryCollector(cfg.ProcPath) + a.disk = collectors.NewDiskCollector(cfg.ProcPath, cfg.MtabPath) + a.network = collectors.NewNetworkCollector(cfg.ProcPath) + a.process = collectors.NewProcessCollector(cfg.ProcPath) + a.temperature = collectors.NewTemperatureCollector(cfg.SysPath) + a.gpuManager = gpu.NewManager(cfg.SysPath) + a.docker = collectors.NewDockerCollector(cfg.DockerSock) + a.systemd = collectors.NewSystemdCollector() +} + +// Run starts the agent's main loop. +func (a *Agent) Run(ctx context.Context) error { + log.Printf("Agent %s starting...", a.config.Agent.ID) + + // Create gRPC client + client, err := NewClient(a.config) + if err != nil { + return err + } + a.client = client + + // Set up reconnection callback to re-register + a.client.SetOnReconnect(func() { + log.Println("Reconnected to server, re-registering...") + if err := a.register(context.Background()); err != nil { + log.Printf("Re-registration failed: %v", err) + } + }) + + // Connect to server with retry + if err := a.client.ConnectWithRetry(ctx); err != nil { + return err + } + defer a.client.Close() + + // Register with server + if err := a.register(ctx); err != nil { + log.Printf("Registration failed: %v", err) + // Continue anyway, server might accept unregistered agents + } + + // Start collection loop + return a.runLoop(ctx) +} + +func (a *Agent) register(ctx context.Context) error { + info := a.collectAgentInfo() + return a.client.Register(ctx, info) +} + +func (a *Agent) collectAgentInfo() *pb.AgentInfo { + sysInfo, _ := a.system.Collect() + + capabilities := []string{} + if a.gpuManager.Available() { + capabilities = append(capabilities, "gpu") + } + // Check docker availability + if dockerStats, err := a.docker.Collect(); err == nil && dockerStats.Available { + capabilities = append(capabilities, "docker") + } + // Check systemd availability + if systemdStats, err := a.systemd.Collect(); err == nil && systemdStats.Available { + capabilities = append(capabilities, "systemd") + } + + return &pb.AgentInfo{ + AgentId: a.config.Agent.ID, + Hostname: sysInfo.Hostname, + Os: sysInfo.OS, + Architecture: sysInfo.Architecture, + Version: "1.0.0", // TODO: Use build version + Capabilities: capabilities, + } +} + +func (a *Agent) runLoop(ctx context.Context) error { + ticker := time.NewTicker(a.config.Agent.Interval) + defer ticker.Stop() + + log.Printf("Starting collection loop (interval: %s)", a.config.Agent.Interval) + + for { + select { + case <-ctx.Done(): + log.Println("Agent stopping (context cancelled)") + return ctx.Err() + + case <-a.stopCh: + log.Println("Agent stopping (stop signal)") + return nil + + case <-ticker.C: + if err := a.collectAndSend(ctx); err != nil { + log.Printf("Collection/send error: %v", err) + // Don't return, keep trying + } + } + } +} + +func (a *Agent) collectAndSend(ctx context.Context) error { + // Get metrics struct from pool + metrics := a.metricsPool.Get().(*models.AllMetrics) + defer a.metricsPool.Put(metrics) + + // Reset metrics + *metrics = models.AllMetrics{ + Timestamp: time.Now(), + } + + // Collect all metrics + a.collect(metrics) + + // Send to server + err := a.client.SendMetrics(ctx, a.config.Agent.ID, metrics) + if err != nil { + // Check if we need to reconnect + if !a.client.IsConnected() { + log.Println("Connection lost, attempting reconnection...") + a.wg.Add(1) + go func() { + defer a.wg.Done() + if reconnErr := a.client.Reconnect(ctx); reconnErr != nil { + log.Printf("Reconnection failed: %v", reconnErr) + } + }() + } + return err + } + + return nil +} + +func (a *Agent) collect(m *models.AllMetrics) { + // System info + if sys, err := a.system.Collect(); err == nil { + m.System = sys + } + + // CPU + if cpu, err := a.cpu.Collect(); err == nil { + m.CPU = cpu + } + + // Memory + if mem, err := a.memory.Collect(); err == nil { + m.Memory = mem + } + + // Disk + if disk, err := a.disk.Collect(); err == nil { + m.Disk = disk + } + + // Network + if net, err := a.network.Collect(); err == nil { + m.Network = net + } + + // Processes + if proc, err := a.process.Collect(); err == nil { + m.Processes = proc + } + + // Temperature + if temp, err := a.temperature.Collect(); err == nil { + m.Temperature = temp + } + + // GPU + if gpuStats, err := a.gpuManager.Collect(); err == nil { + m.GPU = models.AMDGPUStatsFromGPUInfo(gpuStats) + } + + // Docker + if docker, err := a.docker.Collect(); err == nil { + m.Docker = docker + } + + // Systemd + if systemd, err := a.systemd.Collect(); err == nil { + m.Systemd = systemd + } +} + +// Stop signals the agent to stop and waits for cleanup. +func (a *Agent) Stop() { + close(a.stopCh) + a.wg.Wait() +} + +// SerializeMetrics converts metrics to JSON bytes. +func SerializeMetrics(m *models.AllMetrics) ([]byte, error) { + return json.Marshal(m) +} diff --git a/backend/internal/agent/client.go b/backend/internal/agent/client.go new file mode 100644 index 0000000..6b28b56 --- /dev/null +++ b/backend/internal/agent/client.go @@ -0,0 +1,402 @@ +package agent + +import ( + "context" + "crypto/tls" + "crypto/x509" + "fmt" + "log" + "os" + "sync" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/keepalive" + + "tyto/internal/config" + "tyto/internal/models" + pb "tyto/internal/proto" +) + +// Reconnection parameters +const ( + initialBackoff = 1 * time.Second + maxBackoff = 60 * time.Second + backoffFactor = 2.0 + jitterFactor = 0.2 // 20% jitter +) + +// Client handles gRPC communication with the central server. +type Client struct { + config *config.Config + conn *grpc.ClientConn + client pb.AgentServiceClient + stream pb.AgentService_StreamClient + streamMu sync.Mutex + + // Reconnection state + connected bool + reconnecting bool + reconnectMu sync.Mutex + backoff time.Duration + reconnectCh chan struct{} + onReconnect func() // Callback when reconnected +} + +// NewClient creates a new gRPC client. +func NewClient(cfg *config.Config) (*Client, error) { + return &Client{ + config: cfg, + backoff: initialBackoff, + reconnectCh: make(chan struct{}), + }, nil +} + +// SetOnReconnect sets a callback to be called when the client reconnects. +func (c *Client) SetOnReconnect(fn func()) { + c.onReconnect = fn +} + +// Connect establishes a connection to the server. +func (c *Client) Connect(ctx context.Context) error { + opts, err := c.dialOptions() + if err != nil { + return fmt.Errorf("failed to create dial options: %w", err) + } + + log.Printf("Connecting to server: %s", c.config.Agent.ServerURL) + + conn, err := grpc.DialContext(ctx, c.config.Agent.ServerURL, opts...) + if err != nil { + return fmt.Errorf("failed to connect: %w", err) + } + + c.conn = conn + c.client = pb.NewAgentServiceClient(conn) + c.connected = true + c.backoff = initialBackoff // Reset backoff on successful connection + + log.Println("Connected to server") + return nil +} + +// ConnectWithRetry attempts to connect with exponential backoff. +func (c *Client) ConnectWithRetry(ctx context.Context) error { + for { + err := c.Connect(ctx) + if err == nil { + return nil + } + + delay := c.nextBackoff() + log.Printf("Connection failed: %v. Retrying in %s...", err, delay) + + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(delay): + // Continue to next attempt + } + } +} + +// nextBackoff calculates the next backoff duration with jitter. +func (c *Client) nextBackoff() time.Duration { + c.reconnectMu.Lock() + defer c.reconnectMu.Unlock() + + // Add jitter: +/- 20% of current backoff + jitter := time.Duration(float64(c.backoff) * jitterFactor * (2*randFloat() - 1)) + delay := c.backoff + jitter + + // Increase backoff for next time + c.backoff = time.Duration(float64(c.backoff) * backoffFactor) + if c.backoff > maxBackoff { + c.backoff = maxBackoff + } + + return delay +} + +// randFloat returns a random float64 in [0, 1). +func randFloat() float64 { + return float64(time.Now().UnixNano()%1000) / 1000.0 +} + +// Reconnect attempts to reconnect after a disconnection. +func (c *Client) Reconnect(ctx context.Context) error { + c.reconnectMu.Lock() + if c.reconnecting { + c.reconnectMu.Unlock() + // Wait for ongoing reconnection + select { + case <-c.reconnectCh: + return nil + case <-ctx.Done(): + return ctx.Err() + } + } + c.reconnecting = true + c.reconnectMu.Unlock() + + defer func() { + c.reconnectMu.Lock() + c.reconnecting = false + // Signal waiting goroutines + close(c.reconnectCh) + c.reconnectCh = make(chan struct{}) + c.reconnectMu.Unlock() + }() + + // Close existing connection + c.Close() + + // Reconnect with retry + if err := c.ConnectWithRetry(ctx); err != nil { + return err + } + + // Notify callback + if c.onReconnect != nil { + c.onReconnect() + } + + return nil +} + +func (c *Client) dialOptions() ([]grpc.DialOption, error) { + opts := []grpc.DialOption{ + grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: 30 * time.Second, + Timeout: 10 * time.Second, + PermitWithoutStream: true, + }), + } + + // Configure TLS if certificates are provided + agentCfg := c.config.Agent + if agentCfg.TLS.CACert != "" && agentCfg.TLS.AgentCert != "" { + tlsConfig, err := c.loadTLSConfig() + if err != nil { + return nil, err + } + opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))) + } else { + // Insecure mode for development + log.Println("Warning: Running without TLS (insecure mode)") + opts = append(opts, grpc.WithInsecure()) + } + + return opts, nil +} + +func (c *Client) loadTLSConfig() (*tls.Config, error) { + agentCfg := c.config.Agent + + // Load CA certificate + caCert, err := os.ReadFile(agentCfg.TLS.CACert) + if err != nil { + return nil, fmt.Errorf("failed to read CA cert: %w", err) + } + + caCertPool := x509.NewCertPool() + if !caCertPool.AppendCertsFromPEM(caCert) { + return nil, fmt.Errorf("failed to parse CA cert") + } + + // Load agent certificate + cert, err := tls.LoadX509KeyPair(agentCfg.TLS.AgentCert, agentCfg.TLS.AgentKey) + if err != nil { + return nil, fmt.Errorf("failed to load agent cert: %w", err) + } + + return &tls.Config{ + Certificates: []tls.Certificate{cert}, + RootCAs: caCertPool, + MinVersion: tls.VersionTLS12, + }, nil +} + +// Register sends a registration request to the server. +func (c *Client) Register(ctx context.Context, info *pb.AgentInfo) error { + req := &pb.RegisterRequest{ + AgentId: c.config.Agent.ID, + Info: info, + } + + resp, err := c.client.Register(ctx, req) + if err != nil { + return fmt.Errorf("registration failed: %w", err) + } + + switch resp.Status { + case pb.RegisterStatus_REGISTER_STATUS_ACCEPTED: + log.Println("Registration accepted") + case pb.RegisterStatus_REGISTER_STATUS_PENDING_APPROVAL: + log.Println("Registration pending approval") + case pb.RegisterStatus_REGISTER_STATUS_ALREADY_REGISTERED: + log.Println("Already registered") + case pb.RegisterStatus_REGISTER_STATUS_REJECTED: + return fmt.Errorf("registration rejected: %s", resp.Message) + } + + return nil +} + +// SendMetrics sends collected metrics to the server. +func (c *Client) SendMetrics(ctx context.Context, agentID string, metrics *models.AllMetrics) error { + // Serialize metrics to JSON + metricsJSON, err := SerializeMetrics(metrics) + if err != nil { + return fmt.Errorf("failed to serialize metrics: %w", err) + } + + // Create metrics report + report := &pb.MetricsReport{ + AgentId: agentID, + TimestampMs: metrics.Timestamp.UnixMilli(), + MetricsJson: metricsJSON, + + // Summary metrics for quick aggregation + CpuUsage: metrics.CPU.TotalUsage, + MemoryPercent: float64(metrics.Memory.Used) / float64(metrics.Memory.Total) * 100, + GpuUtilization: int32(metrics.GPU.Utilization), + } + + // Calculate disk usage percent (max across mounts) + var maxDiskPercent float64 + for _, mount := range metrics.Disk.Mounts { + if mount.UsedPercent > maxDiskPercent { + maxDiskPercent = mount.UsedPercent + } + } + report.DiskPercent = maxDiskPercent + + // Try to use stream first, fall back to heartbeat + if err := c.sendViaStream(ctx, report); err != nil { + log.Printf("Stream send failed, using heartbeat: %v", err) + return c.sendViaHeartbeat(ctx, agentID) + } + + return nil +} + +func (c *Client) sendViaStream(ctx context.Context, report *pb.MetricsReport) error { + c.streamMu.Lock() + defer c.streamMu.Unlock() + + // Establish stream if not exists + if c.stream == nil { + stream, err := c.client.Stream(ctx) + if err != nil { + return fmt.Errorf("failed to create stream: %w", err) + } + c.stream = stream + + // Start goroutine to handle server messages + go c.handleServerMessages() + } + + // Send metrics via stream + msg := &pb.AgentMessage{ + Payload: &pb.AgentMessage_Metrics{ + Metrics: report, + }, + } + + if err := c.stream.Send(msg); err != nil { + c.stream = nil // Reset stream on error + return fmt.Errorf("failed to send metrics: %w", err) + } + + return nil +} + +func (c *Client) sendViaHeartbeat(ctx context.Context, agentID string) error { + req := &pb.HeartbeatRequest{ + AgentId: agentID, + TimestampMs: time.Now().UnixMilli(), + } + + _, err := c.client.Heartbeat(ctx, req) + return err +} + +func (c *Client) handleServerMessages() { + for { + c.streamMu.Lock() + stream := c.stream + c.streamMu.Unlock() + + if stream == nil { + return + } + + msg, err := stream.Recv() + if err != nil { + log.Printf("Stream receive error: %v", err) + c.streamMu.Lock() + c.stream = nil + c.streamMu.Unlock() + return + } + + c.handleMessage(msg) + } +} + +func (c *Client) handleMessage(msg *pb.ServerMessage) { + switch payload := msg.Payload.(type) { + case *pb.ServerMessage_Ack: + // Message acknowledged + if !payload.Ack.Success { + log.Printf("Server error: %s", payload.Ack.Error) + } + + case *pb.ServerMessage_Config: + log.Printf("Received config update: interval=%ds", + payload.Config.CollectionIntervalSeconds) + // TODO: Apply config update + + case *pb.ServerMessage_Command: + c.handleCommand(payload.Command) + } +} + +func (c *Client) handleCommand(cmd *pb.Command) { + switch cmd.Type { + case pb.CommandType_COMMAND_TYPE_COLLECT_NOW: + log.Println("Received collect-now command") + // TODO: Trigger immediate collection + + case pb.CommandType_COMMAND_TYPE_DISCONNECT: + log.Println("Received disconnect command") + c.Close() + + default: + log.Printf("Unknown command type: %v", cmd.Type) + } +} + +// Close closes the connection to the server. +func (c *Client) Close() error { + c.connected = false + + c.streamMu.Lock() + if c.stream != nil { + c.stream.CloseSend() + c.stream = nil + } + c.streamMu.Unlock() + + if c.conn != nil { + return c.conn.Close() + } + return nil +} + +// IsConnected returns true if connected to the server. +func (c *Client) IsConnected() bool { + return c.connected +} diff --git a/backend/internal/config/config.go b/backend/internal/config/config.go index c58e92a..e354b47 100644 --- a/backend/internal/config/config.go +++ b/backend/internal/config/config.go @@ -242,6 +242,63 @@ func Load() *Config { return cfg } +// DefaultConfig returns a config with default values. +func DefaultConfig() *Config { + return &Config{ + Mode: ModeStandalone, + Port: "8080", + RefreshSeconds: 5, + ProcPath: "/proc", + SysPath: "/sys", + MtabPath: "/etc/mtab", + DockerSock: "/var/run/docker.sock", + Alerts: AlertConfig{ + CPUThreshold: 90.0, + MemoryThreshold: 90.0, + DiskThreshold: 90.0, + TempThreshold: 80.0, + }, + Server: ServerConfig{ + GRPCPort: 9849, + Registration: RegistrationConfig{ + AutoEnabled: true, + RequireApproval: true, + }, + }, + Agent: AgentConfig{ + Interval: 5 * time.Second, + }, + Database: DatabaseConfig{ + Type: "sqlite", + SQLitePath: "/var/lib/tyto/tyto.db", + }, + RefreshInterval: 5 * time.Second, + } +} + +// LoadFromPath loads configuration from a specific file path. +func LoadFromPath(path string) (*Config, error) { + cfg := DefaultConfig() + + data, err := os.ReadFile(path) + if err != nil { + return nil, err + } + + if err := yaml.Unmarshal(data, cfg); err != nil { + return nil, err + } + + // Parse intervals from seconds + cfg.RefreshInterval = time.Duration(cfg.RefreshSeconds) * time.Second + cfg.Agent.Interval = time.Duration(cfg.Agent.IntervalSeconds) * time.Second + if cfg.Agent.Interval == 0 { + cfg.Agent.Interval = 5 * time.Second + } + + return cfg, nil +} + // IsStandalone returns true if running in standalone mode. func (c *Config) IsStandalone() bool { return c.Mode == ModeStandalone || c.Mode == "" diff --git a/backend/internal/proto/tyto.pb.go b/backend/internal/proto/tyto.pb.go new file mode 100644 index 0000000..2bebbed --- /dev/null +++ b/backend/internal/proto/tyto.pb.go @@ -0,0 +1,1173 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.36.11 +// protoc v6.33.1 +// source: tyto.proto + +package proto + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" + unsafe "unsafe" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type RegisterStatus int32 + +const ( + RegisterStatus_REGISTER_STATUS_UNKNOWN RegisterStatus = 0 + RegisterStatus_REGISTER_STATUS_ACCEPTED RegisterStatus = 1 + RegisterStatus_REGISTER_STATUS_PENDING_APPROVAL RegisterStatus = 2 + RegisterStatus_REGISTER_STATUS_REJECTED RegisterStatus = 3 + RegisterStatus_REGISTER_STATUS_ALREADY_REGISTERED RegisterStatus = 4 +) + +// Enum value maps for RegisterStatus. +var ( + RegisterStatus_name = map[int32]string{ + 0: "REGISTER_STATUS_UNKNOWN", + 1: "REGISTER_STATUS_ACCEPTED", + 2: "REGISTER_STATUS_PENDING_APPROVAL", + 3: "REGISTER_STATUS_REJECTED", + 4: "REGISTER_STATUS_ALREADY_REGISTERED", + } + RegisterStatus_value = map[string]int32{ + "REGISTER_STATUS_UNKNOWN": 0, + "REGISTER_STATUS_ACCEPTED": 1, + "REGISTER_STATUS_PENDING_APPROVAL": 2, + "REGISTER_STATUS_REJECTED": 3, + "REGISTER_STATUS_ALREADY_REGISTERED": 4, + } +) + +func (x RegisterStatus) Enum() *RegisterStatus { + p := new(RegisterStatus) + *p = x + return p +} + +func (x RegisterStatus) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (RegisterStatus) Descriptor() protoreflect.EnumDescriptor { + return file_tyto_proto_enumTypes[0].Descriptor() +} + +func (RegisterStatus) Type() protoreflect.EnumType { + return &file_tyto_proto_enumTypes[0] +} + +func (x RegisterStatus) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use RegisterStatus.Descriptor instead. +func (RegisterStatus) EnumDescriptor() ([]byte, []int) { + return file_tyto_proto_rawDescGZIP(), []int{0} +} + +type CommandType int32 + +const ( + CommandType_COMMAND_TYPE_UNKNOWN CommandType = 0 + CommandType_COMMAND_TYPE_COLLECT_NOW CommandType = 1 // Force immediate collection + CommandType_COMMAND_TYPE_RESTART CommandType = 2 // Restart the agent + CommandType_COMMAND_TYPE_UPDATE_CONFIG CommandType = 3 // Apply new config + CommandType_COMMAND_TYPE_DISCONNECT CommandType = 4 // Graceful disconnect +) + +// Enum value maps for CommandType. +var ( + CommandType_name = map[int32]string{ + 0: "COMMAND_TYPE_UNKNOWN", + 1: "COMMAND_TYPE_COLLECT_NOW", + 2: "COMMAND_TYPE_RESTART", + 3: "COMMAND_TYPE_UPDATE_CONFIG", + 4: "COMMAND_TYPE_DISCONNECT", + } + CommandType_value = map[string]int32{ + "COMMAND_TYPE_UNKNOWN": 0, + "COMMAND_TYPE_COLLECT_NOW": 1, + "COMMAND_TYPE_RESTART": 2, + "COMMAND_TYPE_UPDATE_CONFIG": 3, + "COMMAND_TYPE_DISCONNECT": 4, + } +) + +func (x CommandType) Enum() *CommandType { + p := new(CommandType) + *p = x + return p +} + +func (x CommandType) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (CommandType) Descriptor() protoreflect.EnumDescriptor { + return file_tyto_proto_enumTypes[1].Descriptor() +} + +func (CommandType) Type() protoreflect.EnumType { + return &file_tyto_proto_enumTypes[1] +} + +func (x CommandType) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use CommandType.Descriptor instead. +func (CommandType) EnumDescriptor() ([]byte, []int) { + return file_tyto_proto_rawDescGZIP(), []int{1} +} + +// AgentMessage is sent from agent to server. +type AgentMessage struct { + state protoimpl.MessageState `protogen:"open.v1"` + // Types that are valid to be assigned to Payload: + // + // *AgentMessage_Metrics + // *AgentMessage_Heartbeat + // *AgentMessage_Info + Payload isAgentMessage_Payload `protobuf_oneof:"payload"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *AgentMessage) Reset() { + *x = AgentMessage{} + mi := &file_tyto_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *AgentMessage) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*AgentMessage) ProtoMessage() {} + +func (x *AgentMessage) ProtoReflect() protoreflect.Message { + mi := &file_tyto_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use AgentMessage.ProtoReflect.Descriptor instead. +func (*AgentMessage) Descriptor() ([]byte, []int) { + return file_tyto_proto_rawDescGZIP(), []int{0} +} + +func (x *AgentMessage) GetPayload() isAgentMessage_Payload { + if x != nil { + return x.Payload + } + return nil +} + +func (x *AgentMessage) GetMetrics() *MetricsReport { + if x != nil { + if x, ok := x.Payload.(*AgentMessage_Metrics); ok { + return x.Metrics + } + } + return nil +} + +func (x *AgentMessage) GetHeartbeat() *HeartbeatRequest { + if x != nil { + if x, ok := x.Payload.(*AgentMessage_Heartbeat); ok { + return x.Heartbeat + } + } + return nil +} + +func (x *AgentMessage) GetInfo() *AgentInfo { + if x != nil { + if x, ok := x.Payload.(*AgentMessage_Info); ok { + return x.Info + } + } + return nil +} + +type isAgentMessage_Payload interface { + isAgentMessage_Payload() +} + +type AgentMessage_Metrics struct { + Metrics *MetricsReport `protobuf:"bytes,1,opt,name=metrics,proto3,oneof"` +} + +type AgentMessage_Heartbeat struct { + Heartbeat *HeartbeatRequest `protobuf:"bytes,2,opt,name=heartbeat,proto3,oneof"` +} + +type AgentMessage_Info struct { + Info *AgentInfo `protobuf:"bytes,3,opt,name=info,proto3,oneof"` +} + +func (*AgentMessage_Metrics) isAgentMessage_Payload() {} + +func (*AgentMessage_Heartbeat) isAgentMessage_Payload() {} + +func (*AgentMessage_Info) isAgentMessage_Payload() {} + +// ServerMessage is sent from server to agent. +type ServerMessage struct { + state protoimpl.MessageState `protogen:"open.v1"` + // Types that are valid to be assigned to Payload: + // + // *ServerMessage_Config + // *ServerMessage_Ack + // *ServerMessage_Command + Payload isServerMessage_Payload `protobuf_oneof:"payload"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ServerMessage) Reset() { + *x = ServerMessage{} + mi := &file_tyto_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ServerMessage) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ServerMessage) ProtoMessage() {} + +func (x *ServerMessage) ProtoReflect() protoreflect.Message { + mi := &file_tyto_proto_msgTypes[1] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ServerMessage.ProtoReflect.Descriptor instead. +func (*ServerMessage) Descriptor() ([]byte, []int) { + return file_tyto_proto_rawDescGZIP(), []int{1} +} + +func (x *ServerMessage) GetPayload() isServerMessage_Payload { + if x != nil { + return x.Payload + } + return nil +} + +func (x *ServerMessage) GetConfig() *ConfigUpdate { + if x != nil { + if x, ok := x.Payload.(*ServerMessage_Config); ok { + return x.Config + } + } + return nil +} + +func (x *ServerMessage) GetAck() *Ack { + if x != nil { + if x, ok := x.Payload.(*ServerMessage_Ack); ok { + return x.Ack + } + } + return nil +} + +func (x *ServerMessage) GetCommand() *Command { + if x != nil { + if x, ok := x.Payload.(*ServerMessage_Command); ok { + return x.Command + } + } + return nil +} + +type isServerMessage_Payload interface { + isServerMessage_Payload() +} + +type ServerMessage_Config struct { + Config *ConfigUpdate `protobuf:"bytes,1,opt,name=config,proto3,oneof"` +} + +type ServerMessage_Ack struct { + Ack *Ack `protobuf:"bytes,2,opt,name=ack,proto3,oneof"` +} + +type ServerMessage_Command struct { + Command *Command `protobuf:"bytes,3,opt,name=command,proto3,oneof"` +} + +func (*ServerMessage_Config) isServerMessage_Payload() {} + +func (*ServerMessage_Ack) isServerMessage_Payload() {} + +func (*ServerMessage_Command) isServerMessage_Payload() {} + +// MetricsReport contains collected metrics from an agent. +type MetricsReport struct { + state protoimpl.MessageState `protogen:"open.v1"` + AgentId string `protobuf:"bytes,1,opt,name=agent_id,json=agentId,proto3" json:"agent_id,omitempty"` + TimestampMs int64 `protobuf:"varint,2,opt,name=timestamp_ms,json=timestampMs,proto3" json:"timestamp_ms,omitempty"` + MetricsJson []byte `protobuf:"bytes,3,opt,name=metrics_json,json=metricsJson,proto3" json:"metrics_json,omitempty"` // AllMetrics serialized as JSON + // Optional individual metrics for server-side aggregation + CpuUsage float64 `protobuf:"fixed64,4,opt,name=cpu_usage,json=cpuUsage,proto3" json:"cpu_usage,omitempty"` + MemoryPercent float64 `protobuf:"fixed64,5,opt,name=memory_percent,json=memoryPercent,proto3" json:"memory_percent,omitempty"` + DiskPercent float64 `protobuf:"fixed64,6,opt,name=disk_percent,json=diskPercent,proto3" json:"disk_percent,omitempty"` + GpuUtilization int32 `protobuf:"varint,7,opt,name=gpu_utilization,json=gpuUtilization,proto3" json:"gpu_utilization,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *MetricsReport) Reset() { + *x = MetricsReport{} + mi := &file_tyto_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *MetricsReport) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*MetricsReport) ProtoMessage() {} + +func (x *MetricsReport) ProtoReflect() protoreflect.Message { + mi := &file_tyto_proto_msgTypes[2] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use MetricsReport.ProtoReflect.Descriptor instead. +func (*MetricsReport) Descriptor() ([]byte, []int) { + return file_tyto_proto_rawDescGZIP(), []int{2} +} + +func (x *MetricsReport) GetAgentId() string { + if x != nil { + return x.AgentId + } + return "" +} + +func (x *MetricsReport) GetTimestampMs() int64 { + if x != nil { + return x.TimestampMs + } + return 0 +} + +func (x *MetricsReport) GetMetricsJson() []byte { + if x != nil { + return x.MetricsJson + } + return nil +} + +func (x *MetricsReport) GetCpuUsage() float64 { + if x != nil { + return x.CpuUsage + } + return 0 +} + +func (x *MetricsReport) GetMemoryPercent() float64 { + if x != nil { + return x.MemoryPercent + } + return 0 +} + +func (x *MetricsReport) GetDiskPercent() float64 { + if x != nil { + return x.DiskPercent + } + return 0 +} + +func (x *MetricsReport) GetGpuUtilization() int32 { + if x != nil { + return x.GpuUtilization + } + return 0 +} + +// HeartbeatRequest is a lightweight keep-alive message. +type HeartbeatRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + AgentId string `protobuf:"bytes,1,opt,name=agent_id,json=agentId,proto3" json:"agent_id,omitempty"` + TimestampMs int64 `protobuf:"varint,2,opt,name=timestamp_ms,json=timestampMs,proto3" json:"timestamp_ms,omitempty"` + UptimeSeconds int64 `protobuf:"varint,3,opt,name=uptime_seconds,json=uptimeSeconds,proto3" json:"uptime_seconds,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *HeartbeatRequest) Reset() { + *x = HeartbeatRequest{} + mi := &file_tyto_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *HeartbeatRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*HeartbeatRequest) ProtoMessage() {} + +func (x *HeartbeatRequest) ProtoReflect() protoreflect.Message { + mi := &file_tyto_proto_msgTypes[3] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use HeartbeatRequest.ProtoReflect.Descriptor instead. +func (*HeartbeatRequest) Descriptor() ([]byte, []int) { + return file_tyto_proto_rawDescGZIP(), []int{3} +} + +func (x *HeartbeatRequest) GetAgentId() string { + if x != nil { + return x.AgentId + } + return "" +} + +func (x *HeartbeatRequest) GetTimestampMs() int64 { + if x != nil { + return x.TimestampMs + } + return 0 +} + +func (x *HeartbeatRequest) GetUptimeSeconds() int64 { + if x != nil { + return x.UptimeSeconds + } + return 0 +} + +// HeartbeatResponse acknowledges the heartbeat. +type HeartbeatResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + ServerTimeMs int64 `protobuf:"varint,1,opt,name=server_time_ms,json=serverTimeMs,proto3" json:"server_time_ms,omitempty"` + ConfigChanged bool `protobuf:"varint,2,opt,name=config_changed,json=configChanged,proto3" json:"config_changed,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *HeartbeatResponse) Reset() { + *x = HeartbeatResponse{} + mi := &file_tyto_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *HeartbeatResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*HeartbeatResponse) ProtoMessage() {} + +func (x *HeartbeatResponse) ProtoReflect() protoreflect.Message { + mi := &file_tyto_proto_msgTypes[4] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use HeartbeatResponse.ProtoReflect.Descriptor instead. +func (*HeartbeatResponse) Descriptor() ([]byte, []int) { + return file_tyto_proto_rawDescGZIP(), []int{4} +} + +func (x *HeartbeatResponse) GetServerTimeMs() int64 { + if x != nil { + return x.ServerTimeMs + } + return 0 +} + +func (x *HeartbeatResponse) GetConfigChanged() bool { + if x != nil { + return x.ConfigChanged + } + return false +} + +// AgentInfo describes the agent's system. +type AgentInfo struct { + state protoimpl.MessageState `protogen:"open.v1"` + AgentId string `protobuf:"bytes,1,opt,name=agent_id,json=agentId,proto3" json:"agent_id,omitempty"` + Hostname string `protobuf:"bytes,2,opt,name=hostname,proto3" json:"hostname,omitempty"` + Os string `protobuf:"bytes,3,opt,name=os,proto3" json:"os,omitempty"` + Architecture string `protobuf:"bytes,4,opt,name=architecture,proto3" json:"architecture,omitempty"` + Version string `protobuf:"bytes,5,opt,name=version,proto3" json:"version,omitempty"` + Capabilities []string `protobuf:"bytes,6,rep,name=capabilities,proto3" json:"capabilities,omitempty"` // e.g., "gpu", "docker", "systemd" + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *AgentInfo) Reset() { + *x = AgentInfo{} + mi := &file_tyto_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *AgentInfo) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*AgentInfo) ProtoMessage() {} + +func (x *AgentInfo) ProtoReflect() protoreflect.Message { + mi := &file_tyto_proto_msgTypes[5] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use AgentInfo.ProtoReflect.Descriptor instead. +func (*AgentInfo) Descriptor() ([]byte, []int) { + return file_tyto_proto_rawDescGZIP(), []int{5} +} + +func (x *AgentInfo) GetAgentId() string { + if x != nil { + return x.AgentId + } + return "" +} + +func (x *AgentInfo) GetHostname() string { + if x != nil { + return x.Hostname + } + return "" +} + +func (x *AgentInfo) GetOs() string { + if x != nil { + return x.Os + } + return "" +} + +func (x *AgentInfo) GetArchitecture() string { + if x != nil { + return x.Architecture + } + return "" +} + +func (x *AgentInfo) GetVersion() string { + if x != nil { + return x.Version + } + return "" +} + +func (x *AgentInfo) GetCapabilities() []string { + if x != nil { + return x.Capabilities + } + return nil +} + +// RegisterRequest is sent when an agent first connects. +type RegisterRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + AgentId string `protobuf:"bytes,1,opt,name=agent_id,json=agentId,proto3" json:"agent_id,omitempty"` + Info *AgentInfo `protobuf:"bytes,2,opt,name=info,proto3" json:"info,omitempty"` + Token string `protobuf:"bytes,3,opt,name=token,proto3" json:"token,omitempty"` // Optional registration token + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *RegisterRequest) Reset() { + *x = RegisterRequest{} + mi := &file_tyto_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *RegisterRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RegisterRequest) ProtoMessage() {} + +func (x *RegisterRequest) ProtoReflect() protoreflect.Message { + mi := &file_tyto_proto_msgTypes[6] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RegisterRequest.ProtoReflect.Descriptor instead. +func (*RegisterRequest) Descriptor() ([]byte, []int) { + return file_tyto_proto_rawDescGZIP(), []int{6} +} + +func (x *RegisterRequest) GetAgentId() string { + if x != nil { + return x.AgentId + } + return "" +} + +func (x *RegisterRequest) GetInfo() *AgentInfo { + if x != nil { + return x.Info + } + return nil +} + +func (x *RegisterRequest) GetToken() string { + if x != nil { + return x.Token + } + return "" +} + +// RegisterResponse indicates if registration was accepted. +type RegisterResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Accepted bool `protobuf:"varint,1,opt,name=accepted,proto3" json:"accepted,omitempty"` + Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` + Status RegisterStatus `protobuf:"varint,3,opt,name=status,proto3,enum=tyto.RegisterStatus" json:"status,omitempty"` + Config *AgentConfig `protobuf:"bytes,4,opt,name=config,proto3" json:"config,omitempty"` // Initial config if accepted + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *RegisterResponse) Reset() { + *x = RegisterResponse{} + mi := &file_tyto_proto_msgTypes[7] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *RegisterResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RegisterResponse) ProtoMessage() {} + +func (x *RegisterResponse) ProtoReflect() protoreflect.Message { + mi := &file_tyto_proto_msgTypes[7] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RegisterResponse.ProtoReflect.Descriptor instead. +func (*RegisterResponse) Descriptor() ([]byte, []int) { + return file_tyto_proto_rawDescGZIP(), []int{7} +} + +func (x *RegisterResponse) GetAccepted() bool { + if x != nil { + return x.Accepted + } + return false +} + +func (x *RegisterResponse) GetMessage() string { + if x != nil { + return x.Message + } + return "" +} + +func (x *RegisterResponse) GetStatus() RegisterStatus { + if x != nil { + return x.Status + } + return RegisterStatus_REGISTER_STATUS_UNKNOWN +} + +func (x *RegisterResponse) GetConfig() *AgentConfig { + if x != nil { + return x.Config + } + return nil +} + +// ConfigUpdate pushes configuration changes to an agent. +type ConfigUpdate struct { + state protoimpl.MessageState `protogen:"open.v1"` + CollectionIntervalSeconds int32 `protobuf:"varint,1,opt,name=collection_interval_seconds,json=collectionIntervalSeconds,proto3" json:"collection_interval_seconds,omitempty"` + EnabledCollectors []string `protobuf:"bytes,2,rep,name=enabled_collectors,json=enabledCollectors,proto3" json:"enabled_collectors,omitempty"` + Settings map[string]string `protobuf:"bytes,3,rep,name=settings,proto3" json:"settings,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ConfigUpdate) Reset() { + *x = ConfigUpdate{} + mi := &file_tyto_proto_msgTypes[8] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ConfigUpdate) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ConfigUpdate) ProtoMessage() {} + +func (x *ConfigUpdate) ProtoReflect() protoreflect.Message { + mi := &file_tyto_proto_msgTypes[8] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ConfigUpdate.ProtoReflect.Descriptor instead. +func (*ConfigUpdate) Descriptor() ([]byte, []int) { + return file_tyto_proto_rawDescGZIP(), []int{8} +} + +func (x *ConfigUpdate) GetCollectionIntervalSeconds() int32 { + if x != nil { + return x.CollectionIntervalSeconds + } + return 0 +} + +func (x *ConfigUpdate) GetEnabledCollectors() []string { + if x != nil { + return x.EnabledCollectors + } + return nil +} + +func (x *ConfigUpdate) GetSettings() map[string]string { + if x != nil { + return x.Settings + } + return nil +} + +// AgentConfig is the full agent configuration. +type AgentConfig struct { + state protoimpl.MessageState `protogen:"open.v1"` + CollectionIntervalSeconds int32 `protobuf:"varint,1,opt,name=collection_interval_seconds,json=collectionIntervalSeconds,proto3" json:"collection_interval_seconds,omitempty"` + EnabledCollectors []string `protobuf:"bytes,2,rep,name=enabled_collectors,json=enabledCollectors,proto3" json:"enabled_collectors,omitempty"` + Settings map[string]string `protobuf:"bytes,3,rep,name=settings,proto3" json:"settings,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *AgentConfig) Reset() { + *x = AgentConfig{} + mi := &file_tyto_proto_msgTypes[9] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *AgentConfig) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*AgentConfig) ProtoMessage() {} + +func (x *AgentConfig) ProtoReflect() protoreflect.Message { + mi := &file_tyto_proto_msgTypes[9] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use AgentConfig.ProtoReflect.Descriptor instead. +func (*AgentConfig) Descriptor() ([]byte, []int) { + return file_tyto_proto_rawDescGZIP(), []int{9} +} + +func (x *AgentConfig) GetCollectionIntervalSeconds() int32 { + if x != nil { + return x.CollectionIntervalSeconds + } + return 0 +} + +func (x *AgentConfig) GetEnabledCollectors() []string { + if x != nil { + return x.EnabledCollectors + } + return nil +} + +func (x *AgentConfig) GetSettings() map[string]string { + if x != nil { + return x.Settings + } + return nil +} + +// Ack acknowledges receipt of a message. +type Ack struct { + state protoimpl.MessageState `protogen:"open.v1"` + MessageId int64 `protobuf:"varint,1,opt,name=message_id,json=messageId,proto3" json:"message_id,omitempty"` + Success bool `protobuf:"varint,2,opt,name=success,proto3" json:"success,omitempty"` + Error string `protobuf:"bytes,3,opt,name=error,proto3" json:"error,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *Ack) Reset() { + *x = Ack{} + mi := &file_tyto_proto_msgTypes[10] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *Ack) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Ack) ProtoMessage() {} + +func (x *Ack) ProtoReflect() protoreflect.Message { + mi := &file_tyto_proto_msgTypes[10] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Ack.ProtoReflect.Descriptor instead. +func (*Ack) Descriptor() ([]byte, []int) { + return file_tyto_proto_rawDescGZIP(), []int{10} +} + +func (x *Ack) GetMessageId() int64 { + if x != nil { + return x.MessageId + } + return 0 +} + +func (x *Ack) GetSuccess() bool { + if x != nil { + return x.Success + } + return false +} + +func (x *Ack) GetError() string { + if x != nil { + return x.Error + } + return "" +} + +// Command is sent from server to agent. +type Command struct { + state protoimpl.MessageState `protogen:"open.v1"` + Type CommandType `protobuf:"varint,1,opt,name=type,proto3,enum=tyto.CommandType" json:"type,omitempty"` + Params map[string]string `protobuf:"bytes,2,rep,name=params,proto3" json:"params,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *Command) Reset() { + *x = Command{} + mi := &file_tyto_proto_msgTypes[11] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *Command) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Command) ProtoMessage() {} + +func (x *Command) ProtoReflect() protoreflect.Message { + mi := &file_tyto_proto_msgTypes[11] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Command.ProtoReflect.Descriptor instead. +func (*Command) Descriptor() ([]byte, []int) { + return file_tyto_proto_rawDescGZIP(), []int{11} +} + +func (x *Command) GetType() CommandType { + if x != nil { + return x.Type + } + return CommandType_COMMAND_TYPE_UNKNOWN +} + +func (x *Command) GetParams() map[string]string { + if x != nil { + return x.Params + } + return nil +} + +var File_tyto_proto protoreflect.FileDescriptor + +const file_tyto_proto_rawDesc = "" + + "\n" + + "\n" + + "tyto.proto\x12\x04tyto\"\xa9\x01\n" + + "\fAgentMessage\x12/\n" + + "\ametrics\x18\x01 \x01(\v2\x13.tyto.MetricsReportH\x00R\ametrics\x126\n" + + "\theartbeat\x18\x02 \x01(\v2\x16.tyto.HeartbeatRequestH\x00R\theartbeat\x12%\n" + + "\x04info\x18\x03 \x01(\v2\x0f.tyto.AgentInfoH\x00R\x04infoB\t\n" + + "\apayload\"\x92\x01\n" + + "\rServerMessage\x12,\n" + + "\x06config\x18\x01 \x01(\v2\x12.tyto.ConfigUpdateH\x00R\x06config\x12\x1d\n" + + "\x03ack\x18\x02 \x01(\v2\t.tyto.AckH\x00R\x03ack\x12)\n" + + "\acommand\x18\x03 \x01(\v2\r.tyto.CommandH\x00R\acommandB\t\n" + + "\apayload\"\x80\x02\n" + + "\rMetricsReport\x12\x19\n" + + "\bagent_id\x18\x01 \x01(\tR\aagentId\x12!\n" + + "\ftimestamp_ms\x18\x02 \x01(\x03R\vtimestampMs\x12!\n" + + "\fmetrics_json\x18\x03 \x01(\fR\vmetricsJson\x12\x1b\n" + + "\tcpu_usage\x18\x04 \x01(\x01R\bcpuUsage\x12%\n" + + "\x0ememory_percent\x18\x05 \x01(\x01R\rmemoryPercent\x12!\n" + + "\fdisk_percent\x18\x06 \x01(\x01R\vdiskPercent\x12'\n" + + "\x0fgpu_utilization\x18\a \x01(\x05R\x0egpuUtilization\"w\n" + + "\x10HeartbeatRequest\x12\x19\n" + + "\bagent_id\x18\x01 \x01(\tR\aagentId\x12!\n" + + "\ftimestamp_ms\x18\x02 \x01(\x03R\vtimestampMs\x12%\n" + + "\x0euptime_seconds\x18\x03 \x01(\x03R\ruptimeSeconds\"`\n" + + "\x11HeartbeatResponse\x12$\n" + + "\x0eserver_time_ms\x18\x01 \x01(\x03R\fserverTimeMs\x12%\n" + + "\x0econfig_changed\x18\x02 \x01(\bR\rconfigChanged\"\xb4\x01\n" + + "\tAgentInfo\x12\x19\n" + + "\bagent_id\x18\x01 \x01(\tR\aagentId\x12\x1a\n" + + "\bhostname\x18\x02 \x01(\tR\bhostname\x12\x0e\n" + + "\x02os\x18\x03 \x01(\tR\x02os\x12\"\n" + + "\farchitecture\x18\x04 \x01(\tR\farchitecture\x12\x18\n" + + "\aversion\x18\x05 \x01(\tR\aversion\x12\"\n" + + "\fcapabilities\x18\x06 \x03(\tR\fcapabilities\"g\n" + + "\x0fRegisterRequest\x12\x19\n" + + "\bagent_id\x18\x01 \x01(\tR\aagentId\x12#\n" + + "\x04info\x18\x02 \x01(\v2\x0f.tyto.AgentInfoR\x04info\x12\x14\n" + + "\x05token\x18\x03 \x01(\tR\x05token\"\xa1\x01\n" + + "\x10RegisterResponse\x12\x1a\n" + + "\baccepted\x18\x01 \x01(\bR\baccepted\x12\x18\n" + + "\amessage\x18\x02 \x01(\tR\amessage\x12,\n" + + "\x06status\x18\x03 \x01(\x0e2\x14.tyto.RegisterStatusR\x06status\x12)\n" + + "\x06config\x18\x04 \x01(\v2\x11.tyto.AgentConfigR\x06config\"\xf8\x01\n" + + "\fConfigUpdate\x12>\n" + + "\x1bcollection_interval_seconds\x18\x01 \x01(\x05R\x19collectionIntervalSeconds\x12-\n" + + "\x12enabled_collectors\x18\x02 \x03(\tR\x11enabledCollectors\x12<\n" + + "\bsettings\x18\x03 \x03(\v2 .tyto.ConfigUpdate.SettingsEntryR\bsettings\x1a;\n" + + "\rSettingsEntry\x12\x10\n" + + "\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" + + "\x05value\x18\x02 \x01(\tR\x05value:\x028\x01\"\xf6\x01\n" + + "\vAgentConfig\x12>\n" + + "\x1bcollection_interval_seconds\x18\x01 \x01(\x05R\x19collectionIntervalSeconds\x12-\n" + + "\x12enabled_collectors\x18\x02 \x03(\tR\x11enabledCollectors\x12;\n" + + "\bsettings\x18\x03 \x03(\v2\x1f.tyto.AgentConfig.SettingsEntryR\bsettings\x1a;\n" + + "\rSettingsEntry\x12\x10\n" + + "\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" + + "\x05value\x18\x02 \x01(\tR\x05value:\x028\x01\"T\n" + + "\x03Ack\x12\x1d\n" + + "\n" + + "message_id\x18\x01 \x01(\x03R\tmessageId\x12\x18\n" + + "\asuccess\x18\x02 \x01(\bR\asuccess\x12\x14\n" + + "\x05error\x18\x03 \x01(\tR\x05error\"\x9e\x01\n" + + "\aCommand\x12%\n" + + "\x04type\x18\x01 \x01(\x0e2\x11.tyto.CommandTypeR\x04type\x121\n" + + "\x06params\x18\x02 \x03(\v2\x19.tyto.Command.ParamsEntryR\x06params\x1a9\n" + + "\vParamsEntry\x12\x10\n" + + "\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" + + "\x05value\x18\x02 \x01(\tR\x05value:\x028\x01*\xb7\x01\n" + + "\x0eRegisterStatus\x12\x1b\n" + + "\x17REGISTER_STATUS_UNKNOWN\x10\x00\x12\x1c\n" + + "\x18REGISTER_STATUS_ACCEPTED\x10\x01\x12$\n" + + " REGISTER_STATUS_PENDING_APPROVAL\x10\x02\x12\x1c\n" + + "\x18REGISTER_STATUS_REJECTED\x10\x03\x12&\n" + + "\"REGISTER_STATUS_ALREADY_REGISTERED\x10\x04*\x9c\x01\n" + + "\vCommandType\x12\x18\n" + + "\x14COMMAND_TYPE_UNKNOWN\x10\x00\x12\x1c\n" + + "\x18COMMAND_TYPE_COLLECT_NOW\x10\x01\x12\x18\n" + + "\x14COMMAND_TYPE_RESTART\x10\x02\x12\x1e\n" + + "\x1aCOMMAND_TYPE_UPDATE_CONFIG\x10\x03\x12\x1b\n" + + "\x17COMMAND_TYPE_DISCONNECT\x10\x042\xbe\x01\n" + + "\fAgentService\x125\n" + + "\x06Stream\x12\x12.tyto.AgentMessage\x1a\x13.tyto.ServerMessage(\x010\x01\x129\n" + + "\bRegister\x12\x15.tyto.RegisterRequest\x1a\x16.tyto.RegisterResponse\x12<\n" + + "\tHeartbeat\x12\x16.tyto.HeartbeatRequest\x1a\x17.tyto.HeartbeatResponseB\x15Z\x13tyto/internal/protob\x06proto3" + +var ( + file_tyto_proto_rawDescOnce sync.Once + file_tyto_proto_rawDescData []byte +) + +func file_tyto_proto_rawDescGZIP() []byte { + file_tyto_proto_rawDescOnce.Do(func() { + file_tyto_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_tyto_proto_rawDesc), len(file_tyto_proto_rawDesc))) + }) + return file_tyto_proto_rawDescData +} + +var file_tyto_proto_enumTypes = make([]protoimpl.EnumInfo, 2) +var file_tyto_proto_msgTypes = make([]protoimpl.MessageInfo, 15) +var file_tyto_proto_goTypes = []any{ + (RegisterStatus)(0), // 0: tyto.RegisterStatus + (CommandType)(0), // 1: tyto.CommandType + (*AgentMessage)(nil), // 2: tyto.AgentMessage + (*ServerMessage)(nil), // 3: tyto.ServerMessage + (*MetricsReport)(nil), // 4: tyto.MetricsReport + (*HeartbeatRequest)(nil), // 5: tyto.HeartbeatRequest + (*HeartbeatResponse)(nil), // 6: tyto.HeartbeatResponse + (*AgentInfo)(nil), // 7: tyto.AgentInfo + (*RegisterRequest)(nil), // 8: tyto.RegisterRequest + (*RegisterResponse)(nil), // 9: tyto.RegisterResponse + (*ConfigUpdate)(nil), // 10: tyto.ConfigUpdate + (*AgentConfig)(nil), // 11: tyto.AgentConfig + (*Ack)(nil), // 12: tyto.Ack + (*Command)(nil), // 13: tyto.Command + nil, // 14: tyto.ConfigUpdate.SettingsEntry + nil, // 15: tyto.AgentConfig.SettingsEntry + nil, // 16: tyto.Command.ParamsEntry +} +var file_tyto_proto_depIdxs = []int32{ + 4, // 0: tyto.AgentMessage.metrics:type_name -> tyto.MetricsReport + 5, // 1: tyto.AgentMessage.heartbeat:type_name -> tyto.HeartbeatRequest + 7, // 2: tyto.AgentMessage.info:type_name -> tyto.AgentInfo + 10, // 3: tyto.ServerMessage.config:type_name -> tyto.ConfigUpdate + 12, // 4: tyto.ServerMessage.ack:type_name -> tyto.Ack + 13, // 5: tyto.ServerMessage.command:type_name -> tyto.Command + 7, // 6: tyto.RegisterRequest.info:type_name -> tyto.AgentInfo + 0, // 7: tyto.RegisterResponse.status:type_name -> tyto.RegisterStatus + 11, // 8: tyto.RegisterResponse.config:type_name -> tyto.AgentConfig + 14, // 9: tyto.ConfigUpdate.settings:type_name -> tyto.ConfigUpdate.SettingsEntry + 15, // 10: tyto.AgentConfig.settings:type_name -> tyto.AgentConfig.SettingsEntry + 1, // 11: tyto.Command.type:type_name -> tyto.CommandType + 16, // 12: tyto.Command.params:type_name -> tyto.Command.ParamsEntry + 2, // 13: tyto.AgentService.Stream:input_type -> tyto.AgentMessage + 8, // 14: tyto.AgentService.Register:input_type -> tyto.RegisterRequest + 5, // 15: tyto.AgentService.Heartbeat:input_type -> tyto.HeartbeatRequest + 3, // 16: tyto.AgentService.Stream:output_type -> tyto.ServerMessage + 9, // 17: tyto.AgentService.Register:output_type -> tyto.RegisterResponse + 6, // 18: tyto.AgentService.Heartbeat:output_type -> tyto.HeartbeatResponse + 16, // [16:19] is the sub-list for method output_type + 13, // [13:16] is the sub-list for method input_type + 13, // [13:13] is the sub-list for extension type_name + 13, // [13:13] is the sub-list for extension extendee + 0, // [0:13] is the sub-list for field type_name +} + +func init() { file_tyto_proto_init() } +func file_tyto_proto_init() { + if File_tyto_proto != nil { + return + } + file_tyto_proto_msgTypes[0].OneofWrappers = []any{ + (*AgentMessage_Metrics)(nil), + (*AgentMessage_Heartbeat)(nil), + (*AgentMessage_Info)(nil), + } + file_tyto_proto_msgTypes[1].OneofWrappers = []any{ + (*ServerMessage_Config)(nil), + (*ServerMessage_Ack)(nil), + (*ServerMessage_Command)(nil), + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_tyto_proto_rawDesc), len(file_tyto_proto_rawDesc)), + NumEnums: 2, + NumMessages: 15, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_tyto_proto_goTypes, + DependencyIndexes: file_tyto_proto_depIdxs, + EnumInfos: file_tyto_proto_enumTypes, + MessageInfos: file_tyto_proto_msgTypes, + }.Build() + File_tyto_proto = out.File + file_tyto_proto_goTypes = nil + file_tyto_proto_depIdxs = nil +} diff --git a/backend/internal/proto/tyto_grpc.pb.go b/backend/internal/proto/tyto_grpc.pb.go new file mode 100644 index 0000000..4a6450e --- /dev/null +++ b/backend/internal/proto/tyto_grpc.pb.go @@ -0,0 +1,202 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.6.0 +// - protoc v6.33.1 +// source: tyto.proto + +package proto + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.64.0 or later. +const _ = grpc.SupportPackageIsVersion9 + +const ( + AgentService_Stream_FullMethodName = "/tyto.AgentService/Stream" + AgentService_Register_FullMethodName = "/tyto.AgentService/Register" + AgentService_Heartbeat_FullMethodName = "/tyto.AgentService/Heartbeat" +) + +// AgentServiceClient is the client API for AgentService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +// +// AgentService handles communication between agents and the central server. +type AgentServiceClient interface { + // Stream establishes a bidirectional stream for metrics and control messages. + Stream(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[AgentMessage, ServerMessage], error) + // Register allows an agent to request registration with the server. + Register(ctx context.Context, in *RegisterRequest, opts ...grpc.CallOption) (*RegisterResponse, error) + // Heartbeat is a simple health check (alternative to streaming). + Heartbeat(ctx context.Context, in *HeartbeatRequest, opts ...grpc.CallOption) (*HeartbeatResponse, error) +} + +type agentServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewAgentServiceClient(cc grpc.ClientConnInterface) AgentServiceClient { + return &agentServiceClient{cc} +} + +func (c *agentServiceClient) Stream(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[AgentMessage, ServerMessage], error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + stream, err := c.cc.NewStream(ctx, &AgentService_ServiceDesc.Streams[0], AgentService_Stream_FullMethodName, cOpts...) + if err != nil { + return nil, err + } + x := &grpc.GenericClientStream[AgentMessage, ServerMessage]{ClientStream: stream} + return x, nil +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type AgentService_StreamClient = grpc.BidiStreamingClient[AgentMessage, ServerMessage] + +func (c *agentServiceClient) Register(ctx context.Context, in *RegisterRequest, opts ...grpc.CallOption) (*RegisterResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(RegisterResponse) + err := c.cc.Invoke(ctx, AgentService_Register_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *agentServiceClient) Heartbeat(ctx context.Context, in *HeartbeatRequest, opts ...grpc.CallOption) (*HeartbeatResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(HeartbeatResponse) + err := c.cc.Invoke(ctx, AgentService_Heartbeat_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +// AgentServiceServer is the server API for AgentService service. +// All implementations must embed UnimplementedAgentServiceServer +// for forward compatibility. +// +// AgentService handles communication between agents and the central server. +type AgentServiceServer interface { + // Stream establishes a bidirectional stream for metrics and control messages. + Stream(grpc.BidiStreamingServer[AgentMessage, ServerMessage]) error + // Register allows an agent to request registration with the server. + Register(context.Context, *RegisterRequest) (*RegisterResponse, error) + // Heartbeat is a simple health check (alternative to streaming). + Heartbeat(context.Context, *HeartbeatRequest) (*HeartbeatResponse, error) + mustEmbedUnimplementedAgentServiceServer() +} + +// UnimplementedAgentServiceServer must be embedded to have +// forward compatible implementations. +// +// NOTE: this should be embedded by value instead of pointer to avoid a nil +// pointer dereference when methods are called. +type UnimplementedAgentServiceServer struct{} + +func (UnimplementedAgentServiceServer) Stream(grpc.BidiStreamingServer[AgentMessage, ServerMessage]) error { + return status.Error(codes.Unimplemented, "method Stream not implemented") +} +func (UnimplementedAgentServiceServer) Register(context.Context, *RegisterRequest) (*RegisterResponse, error) { + return nil, status.Error(codes.Unimplemented, "method Register not implemented") +} +func (UnimplementedAgentServiceServer) Heartbeat(context.Context, *HeartbeatRequest) (*HeartbeatResponse, error) { + return nil, status.Error(codes.Unimplemented, "method Heartbeat not implemented") +} +func (UnimplementedAgentServiceServer) mustEmbedUnimplementedAgentServiceServer() {} +func (UnimplementedAgentServiceServer) testEmbeddedByValue() {} + +// UnsafeAgentServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to AgentServiceServer will +// result in compilation errors. +type UnsafeAgentServiceServer interface { + mustEmbedUnimplementedAgentServiceServer() +} + +func RegisterAgentServiceServer(s grpc.ServiceRegistrar, srv AgentServiceServer) { + // If the following call panics, it indicates UnimplementedAgentServiceServer was + // embedded by pointer and is nil. This will cause panics if an + // unimplemented method is ever invoked, so we test this at initialization + // time to prevent it from happening at runtime later due to I/O. + if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { + t.testEmbeddedByValue() + } + s.RegisterService(&AgentService_ServiceDesc, srv) +} + +func _AgentService_Stream_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(AgentServiceServer).Stream(&grpc.GenericServerStream[AgentMessage, ServerMessage]{ServerStream: stream}) +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type AgentService_StreamServer = grpc.BidiStreamingServer[AgentMessage, ServerMessage] + +func _AgentService_Register_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(RegisterRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(AgentServiceServer).Register(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: AgentService_Register_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(AgentServiceServer).Register(ctx, req.(*RegisterRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _AgentService_Heartbeat_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(HeartbeatRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(AgentServiceServer).Heartbeat(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: AgentService_Heartbeat_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(AgentServiceServer).Heartbeat(ctx, req.(*HeartbeatRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// AgentService_ServiceDesc is the grpc.ServiceDesc for AgentService service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var AgentService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "tyto.AgentService", + HandlerType: (*AgentServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Register", + Handler: _AgentService_Register_Handler, + }, + { + MethodName: "Heartbeat", + Handler: _AgentService_Heartbeat_Handler, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: "Stream", + Handler: _AgentService_Stream_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, + Metadata: "tyto.proto", +} diff --git a/backend/proto/tyto.proto b/backend/proto/tyto.proto new file mode 100644 index 0000000..a771e00 --- /dev/null +++ b/backend/proto/tyto.proto @@ -0,0 +1,129 @@ +syntax = "proto3"; + +package tyto; + +option go_package = "tyto/internal/proto"; + +// AgentService handles communication between agents and the central server. +service AgentService { + // Stream establishes a bidirectional stream for metrics and control messages. + rpc Stream(stream AgentMessage) returns (stream ServerMessage); + + // Register allows an agent to request registration with the server. + rpc Register(RegisterRequest) returns (RegisterResponse); + + // Heartbeat is a simple health check (alternative to streaming). + rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse); +} + +// AgentMessage is sent from agent to server. +message AgentMessage { + oneof payload { + MetricsReport metrics = 1; + HeartbeatRequest heartbeat = 2; + AgentInfo info = 3; + } +} + +// ServerMessage is sent from server to agent. +message ServerMessage { + oneof payload { + ConfigUpdate config = 1; + Ack ack = 2; + Command command = 3; + } +} + +// MetricsReport contains collected metrics from an agent. +message MetricsReport { + string agent_id = 1; + int64 timestamp_ms = 2; + bytes metrics_json = 3; // AllMetrics serialized as JSON + + // Optional individual metrics for server-side aggregation + double cpu_usage = 4; + double memory_percent = 5; + double disk_percent = 6; + int32 gpu_utilization = 7; +} + +// HeartbeatRequest is a lightweight keep-alive message. +message HeartbeatRequest { + string agent_id = 1; + int64 timestamp_ms = 2; + int64 uptime_seconds = 3; +} + +// HeartbeatResponse acknowledges the heartbeat. +message HeartbeatResponse { + int64 server_time_ms = 1; + bool config_changed = 2; +} + +// AgentInfo describes the agent's system. +message AgentInfo { + string agent_id = 1; + string hostname = 2; + string os = 3; + string architecture = 4; + string version = 5; + repeated string capabilities = 6; // e.g., "gpu", "docker", "systemd" +} + +// RegisterRequest is sent when an agent first connects. +message RegisterRequest { + string agent_id = 1; + AgentInfo info = 2; + string token = 3; // Optional registration token +} + +// RegisterResponse indicates if registration was accepted. +message RegisterResponse { + bool accepted = 1; + string message = 2; + RegisterStatus status = 3; + AgentConfig config = 4; // Initial config if accepted +} + +enum RegisterStatus { + REGISTER_STATUS_UNKNOWN = 0; + REGISTER_STATUS_ACCEPTED = 1; + REGISTER_STATUS_PENDING_APPROVAL = 2; + REGISTER_STATUS_REJECTED = 3; + REGISTER_STATUS_ALREADY_REGISTERED = 4; +} + +// ConfigUpdate pushes configuration changes to an agent. +message ConfigUpdate { + int32 collection_interval_seconds = 1; + repeated string enabled_collectors = 2; + map settings = 3; +} + +// AgentConfig is the full agent configuration. +message AgentConfig { + int32 collection_interval_seconds = 1; + repeated string enabled_collectors = 2; + map settings = 3; +} + +// Ack acknowledges receipt of a message. +message Ack { + int64 message_id = 1; + bool success = 2; + string error = 3; +} + +// Command is sent from server to agent. +message Command { + CommandType type = 1; + map params = 2; +} + +enum CommandType { + COMMAND_TYPE_UNKNOWN = 0; + COMMAND_TYPE_COLLECT_NOW = 1; // Force immediate collection + COMMAND_TYPE_RESTART = 2; // Restart the agent + COMMAND_TYPE_UPDATE_CONFIG = 3; // Apply new config + COMMAND_TYPE_DISCONNECT = 4; // Graceful disconnect +}