feat: implement lightweight agent with gRPC and mTLS support
Agent Package (internal/agent/): - Agent struct with all collectors and memory-efficient pooling - Run loop with configurable collection interval - Graceful shutdown with context cancellation - Auto-reconnection callback for re-registration gRPC Client (internal/agent/client.go): - mTLS support with CA, agent cert, and key - Bidirectional streaming for metrics - Heartbeat fallback when streaming fails - Exponential backoff with jitter for reconnection - Concurrent reconnection handling with mutex Protocol Buffers (proto/tyto.proto): - AgentService with Stream, Register, Heartbeat RPCs - MetricsReport with summary fields for aggregation - ConfigUpdate and Command messages for server control - RegisterStatus enum for registration workflow CLI Integration (cmd/tyto/main.go): - Full agent subcommand with flag parsing - Support for --id, --server, --interval, --ca-cert, etc. - Environment variable overrides (TYTO_AGENT_*) - Signal handling for graceful shutdown Build System (Makefile): - Cross-compilation for linux/amd64, arm64, armv7 - Stripped binaries with version info - Proto generation target - Test and coverage targets Config Updates: - DefaultConfig() and LoadFromPath() functions - Agent config properly parsed from YAML 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
1
.gitignore
vendored
1
.gitignore
vendored
@@ -5,6 +5,7 @@ node_modules/
|
||||
frontend/build/
|
||||
frontend/.svelte-kit/
|
||||
backend/server
|
||||
backend/build/
|
||||
|
||||
# Environment
|
||||
.env
|
||||
|
||||
149
backend/Makefile
Normal file
149
backend/Makefile
Normal file
@@ -0,0 +1,149 @@
|
||||
# Tyto Backend Makefile
|
||||
# Supports cross-compilation for multiple architectures
|
||||
|
||||
VERSION ?= $(shell git describe --tags --always --dirty 2>/dev/null || echo "dev")
|
||||
BUILD_TIME := $(shell date -u +"%Y-%m-%dT%H:%M:%SZ")
|
||||
LDFLAGS := -ldflags="-s -w -X main.version=$(VERSION) -X main.buildTime=$(BUILD_TIME)"
|
||||
|
||||
# Output directories
|
||||
BUILD_DIR := build
|
||||
PROTO_DIR := proto
|
||||
|
||||
# Default target
|
||||
.PHONY: all
|
||||
all: build
|
||||
|
||||
# Build for current platform
|
||||
.PHONY: build
|
||||
build:
|
||||
go build $(LDFLAGS) -o $(BUILD_DIR)/tyto ./cmd/tyto
|
||||
go build $(LDFLAGS) -o $(BUILD_DIR)/server ./cmd/server
|
||||
|
||||
# Build server binary only
|
||||
.PHONY: server
|
||||
server:
|
||||
go build $(LDFLAGS) -o $(BUILD_DIR)/server ./cmd/server
|
||||
|
||||
# Build tyto CLI (includes agent)
|
||||
.PHONY: tyto
|
||||
tyto:
|
||||
go build $(LDFLAGS) -o $(BUILD_DIR)/tyto ./cmd/tyto
|
||||
|
||||
# Build lightweight agent for Linux amd64
|
||||
.PHONY: agent-linux-amd64
|
||||
agent-linux-amd64:
|
||||
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 \
|
||||
go build $(LDFLAGS) -o $(BUILD_DIR)/tyto-agent-linux-amd64 ./cmd/tyto
|
||||
|
||||
# Build lightweight agent for Linux arm64
|
||||
.PHONY: agent-linux-arm64
|
||||
agent-linux-arm64:
|
||||
CGO_ENABLED=0 GOOS=linux GOARCH=arm64 \
|
||||
go build $(LDFLAGS) -o $(BUILD_DIR)/tyto-agent-linux-arm64 ./cmd/tyto
|
||||
|
||||
# Build lightweight agent for Linux arm (32-bit, e.g., Raspberry Pi)
|
||||
.PHONY: agent-linux-arm
|
||||
agent-linux-arm:
|
||||
CGO_ENABLED=0 GOOS=linux GOARCH=arm GOARM=7 \
|
||||
go build $(LDFLAGS) -o $(BUILD_DIR)/tyto-agent-linux-armv7 ./cmd/tyto
|
||||
|
||||
# Build all agent variants
|
||||
.PHONY: agents
|
||||
agents: agent-linux-amd64 agent-linux-arm64 agent-linux-arm
|
||||
|
||||
# Build all binaries for all platforms
|
||||
.PHONY: release
|
||||
release: clean
|
||||
@mkdir -p $(BUILD_DIR)
|
||||
# Linux amd64
|
||||
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 \
|
||||
go build $(LDFLAGS) -o $(BUILD_DIR)/tyto-linux-amd64 ./cmd/tyto
|
||||
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 \
|
||||
go build $(LDFLAGS) -o $(BUILD_DIR)/tyto-server-linux-amd64 ./cmd/server
|
||||
# Linux arm64
|
||||
CGO_ENABLED=0 GOOS=linux GOARCH=arm64 \
|
||||
go build $(LDFLAGS) -o $(BUILD_DIR)/tyto-linux-arm64 ./cmd/tyto
|
||||
CGO_ENABLED=0 GOOS=linux GOARCH=arm64 \
|
||||
go build $(LDFLAGS) -o $(BUILD_DIR)/tyto-server-linux-arm64 ./cmd/server
|
||||
# Linux arm (32-bit)
|
||||
CGO_ENABLED=0 GOOS=linux GOARCH=arm GOARM=7 \
|
||||
go build $(LDFLAGS) -o $(BUILD_DIR)/tyto-linux-armv7 ./cmd/tyto
|
||||
|
||||
# Generate protobuf code
|
||||
.PHONY: proto
|
||||
proto:
|
||||
protoc --go_out=. --go-grpc_out=. $(PROTO_DIR)/tyto.proto
|
||||
|
||||
# Run tests
|
||||
.PHONY: test
|
||||
test:
|
||||
go test -v -race ./...
|
||||
|
||||
# Run tests with coverage
|
||||
.PHONY: test-coverage
|
||||
test-coverage:
|
||||
go test -v -race -coverprofile=coverage.out ./...
|
||||
go tool cover -html=coverage.out -o coverage.html
|
||||
|
||||
# Lint code
|
||||
.PHONY: lint
|
||||
lint:
|
||||
golangci-lint run ./...
|
||||
|
||||
# Format code
|
||||
.PHONY: fmt
|
||||
fmt:
|
||||
go fmt ./...
|
||||
goimports -w .
|
||||
|
||||
# Clean build artifacts
|
||||
.PHONY: clean
|
||||
clean:
|
||||
rm -rf $(BUILD_DIR)
|
||||
rm -f coverage.out coverage.html
|
||||
|
||||
# Install dependencies
|
||||
.PHONY: deps
|
||||
deps:
|
||||
go mod download
|
||||
go mod tidy
|
||||
|
||||
# Show binary sizes
|
||||
.PHONY: sizes
|
||||
sizes: agents
|
||||
@echo "Binary sizes:"
|
||||
@ls -lh $(BUILD_DIR)/tyto-agent-* 2>/dev/null | awk '{print $$5, $$9}'
|
||||
|
||||
# Docker build
|
||||
.PHONY: docker
|
||||
docker:
|
||||
docker build -t tyto-backend:latest .
|
||||
|
||||
# Help
|
||||
.PHONY: help
|
||||
help:
|
||||
@echo "Tyto Backend Makefile"
|
||||
@echo ""
|
||||
@echo "Usage: make [target]"
|
||||
@echo ""
|
||||
@echo "Targets:"
|
||||
@echo " build Build for current platform"
|
||||
@echo " server Build server binary only"
|
||||
@echo " tyto Build tyto CLI (includes agent)"
|
||||
@echo " agents Build agent for all Linux architectures"
|
||||
@echo " release Build all binaries for all platforms"
|
||||
@echo " proto Generate protobuf code"
|
||||
@echo " test Run tests"
|
||||
@echo " test-coverage Run tests with coverage report"
|
||||
@echo " lint Run linter"
|
||||
@echo " fmt Format code"
|
||||
@echo " clean Remove build artifacts"
|
||||
@echo " deps Download and tidy dependencies"
|
||||
@echo " sizes Show agent binary sizes"
|
||||
@echo " docker Build Docker image"
|
||||
@echo " help Show this help"
|
||||
@echo ""
|
||||
@echo "Agent targets:"
|
||||
@echo " agent-linux-amd64 Build agent for Linux x86_64"
|
||||
@echo " agent-linux-arm64 Build agent for Linux ARM64 (aarch64)"
|
||||
@echo " agent-linux-arm Build agent for Linux ARM32 (armv7)"
|
||||
@@ -3,8 +3,15 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"tyto/internal/agent"
|
||||
"tyto/internal/config"
|
||||
)
|
||||
|
||||
const usage = `Tyto - System Monitoring
|
||||
@@ -82,13 +89,156 @@ Options:
|
||||
--ca-cert FILE CA certificate file
|
||||
--cert FILE Agent certificate file
|
||||
--key FILE Agent key file
|
||||
--config FILE Config file path
|
||||
|
||||
Environment:
|
||||
TYTO_AGENT_ID Agent identifier
|
||||
TYTO_SERVER_URL Server URL
|
||||
TYTO_AGENT_INTERVAL Collection interval
|
||||
`)
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Println("Agent mode not implemented yet.")
|
||||
// Parse flags
|
||||
var (
|
||||
agentID string
|
||||
serverURL string
|
||||
interval string
|
||||
caCert string
|
||||
agentCert string
|
||||
agentKey string
|
||||
configFile string
|
||||
)
|
||||
|
||||
for i := 0; i < len(args); i++ {
|
||||
switch args[i] {
|
||||
case "--id":
|
||||
if i+1 < len(args) {
|
||||
agentID = args[i+1]
|
||||
i++
|
||||
}
|
||||
case "--server":
|
||||
if i+1 < len(args) {
|
||||
serverURL = args[i+1]
|
||||
i++
|
||||
}
|
||||
case "--interval":
|
||||
if i+1 < len(args) {
|
||||
interval = args[i+1]
|
||||
i++
|
||||
}
|
||||
case "--ca-cert":
|
||||
if i+1 < len(args) {
|
||||
caCert = args[i+1]
|
||||
i++
|
||||
}
|
||||
case "--cert":
|
||||
if i+1 < len(args) {
|
||||
agentCert = args[i+1]
|
||||
i++
|
||||
}
|
||||
case "--key":
|
||||
if i+1 < len(args) {
|
||||
agentKey = args[i+1]
|
||||
i++
|
||||
}
|
||||
case "--config":
|
||||
if i+1 < len(args) {
|
||||
configFile = args[i+1]
|
||||
i++
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Load configuration
|
||||
cfg := loadAgentConfig(configFile, agentID, serverURL, interval, caCert, agentCert, agentKey)
|
||||
|
||||
// Validate required fields
|
||||
if cfg.Agent.ID == "" {
|
||||
fmt.Fprintln(os.Stderr, "Error: Agent ID is required (--id or TYTO_AGENT_ID)")
|
||||
os.Exit(1)
|
||||
}
|
||||
if cfg.Agent.ServerURL == "" {
|
||||
fmt.Fprintln(os.Stderr, "Error: Server URL is required (--server or TYTO_SERVER_URL)")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
fmt.Printf("Starting Tyto agent '%s'...\n", cfg.Agent.ID)
|
||||
fmt.Printf("Connecting to server: %s\n", cfg.Agent.ServerURL)
|
||||
fmt.Printf("Collection interval: %s\n", cfg.Agent.Interval)
|
||||
|
||||
// Create and run agent
|
||||
a := agent.New(cfg)
|
||||
|
||||
// Handle graceful shutdown
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
sigCh := make(chan os.Signal, 1)
|
||||
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
|
||||
|
||||
go func() {
|
||||
<-sigCh
|
||||
fmt.Println("\nReceived shutdown signal, stopping agent...")
|
||||
a.Stop()
|
||||
cancel()
|
||||
}()
|
||||
|
||||
if err := a.Run(ctx); err != nil && err != context.Canceled {
|
||||
fmt.Fprintf(os.Stderr, "Agent error: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
fmt.Println("Agent stopped")
|
||||
}
|
||||
|
||||
// loadAgentConfig builds configuration from file, flags, and environment.
|
||||
func loadAgentConfig(configFile, agentID, serverURL, interval, caCert, agentCert, agentKey string) *config.Config {
|
||||
// Start with defaults
|
||||
cfg := config.DefaultConfig()
|
||||
cfg.Mode = config.ModeAgent
|
||||
|
||||
// Load from file if specified
|
||||
if configFile != "" {
|
||||
if fileCfg, err := config.LoadFromPath(configFile); err == nil {
|
||||
cfg = fileCfg
|
||||
}
|
||||
}
|
||||
|
||||
// Override from environment variables
|
||||
if v := os.Getenv("TYTO_AGENT_ID"); v != "" && agentID == "" {
|
||||
agentID = v
|
||||
}
|
||||
if v := os.Getenv("TYTO_SERVER_URL"); v != "" && serverURL == "" {
|
||||
serverURL = v
|
||||
}
|
||||
if v := os.Getenv("TYTO_AGENT_INTERVAL"); v != "" && interval == "" {
|
||||
interval = v
|
||||
}
|
||||
|
||||
// Override from flags
|
||||
if agentID != "" {
|
||||
cfg.Agent.ID = agentID
|
||||
}
|
||||
if serverURL != "" {
|
||||
cfg.Agent.ServerURL = serverURL
|
||||
}
|
||||
if interval != "" {
|
||||
if d, err := time.ParseDuration(interval); err == nil {
|
||||
cfg.Agent.Interval = d
|
||||
}
|
||||
}
|
||||
|
||||
// TLS configuration
|
||||
if caCert != "" {
|
||||
cfg.Agent.TLS.CACert = caCert
|
||||
}
|
||||
if agentCert != "" {
|
||||
cfg.Agent.TLS.AgentCert = agentCert
|
||||
}
|
||||
if agentKey != "" {
|
||||
cfg.Agent.TLS.AgentKey = agentKey
|
||||
}
|
||||
|
||||
return cfg
|
||||
}
|
||||
|
||||
@@ -8,6 +8,8 @@ require github.com/gin-contrib/cors v1.7.2
|
||||
|
||||
require (
|
||||
github.com/godbus/dbus/v5 v5.1.0
|
||||
google.golang.org/grpc v1.68.0
|
||||
google.golang.org/protobuf v1.35.2
|
||||
gopkg.in/yaml.v3 v3.0.1
|
||||
)
|
||||
|
||||
@@ -33,9 +35,9 @@ require (
|
||||
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
|
||||
github.com/ugorji/go/codec v1.2.12 // indirect
|
||||
golang.org/x/arch v0.8.0 // indirect
|
||||
golang.org/x/crypto v0.23.0 // indirect
|
||||
golang.org/x/net v0.25.0 // indirect
|
||||
golang.org/x/sys v0.20.0 // indirect
|
||||
golang.org/x/text v0.15.0 // indirect
|
||||
google.golang.org/protobuf v1.34.1 // indirect
|
||||
golang.org/x/crypto v0.27.0 // indirect
|
||||
golang.org/x/net v0.29.0 // indirect
|
||||
golang.org/x/sys v0.25.0 // indirect
|
||||
golang.org/x/text v0.18.0 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
|
||||
)
|
||||
|
||||
@@ -30,8 +30,10 @@ github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
|
||||
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
|
||||
github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk=
|
||||
github.com/godbus/dbus/v5 v5.1.0/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
|
||||
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
|
||||
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
|
||||
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
|
||||
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
|
||||
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
|
||||
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
|
||||
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
|
||||
@@ -77,20 +79,22 @@ github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZ
|
||||
golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8=
|
||||
golang.org/x/arch v0.8.0 h1:3wRIsP3pM4yUptoR96otTUOXI367OS0+c9eeRi9doIc=
|
||||
golang.org/x/arch v0.8.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys=
|
||||
golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI=
|
||||
golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
|
||||
golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac=
|
||||
golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM=
|
||||
golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A=
|
||||
golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70=
|
||||
golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo=
|
||||
golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0=
|
||||
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
|
||||
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk=
|
||||
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg=
|
||||
google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
|
||||
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
|
||||
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224=
|
||||
golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 h1:pPJltXNxVzT4pK9yD8vR9X75DaWYYmLGMsEvBfFQZzQ=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU=
|
||||
google.golang.org/grpc v1.68.0 h1:aHQeeJbo8zAkAa3pRzrVjZlbz6uSfeOXlJNQM0RAbz0=
|
||||
google.golang.org/grpc v1.68.0/go.mod h1:fmSPC5AsjSBCK54MyHRx48kpOti1/jRfOlwEWywNjWA=
|
||||
google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io=
|
||||
google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
|
||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
|
||||
|
||||
262
backend/internal/agent/agent.go
Normal file
262
backend/internal/agent/agent.go
Normal file
@@ -0,0 +1,262 @@
|
||||
// Package agent implements a lightweight Tyto agent that collects metrics
|
||||
// and reports them to a central server via gRPC.
|
||||
package agent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"tyto/internal/collectors"
|
||||
"tyto/internal/collectors/gpu"
|
||||
"tyto/internal/config"
|
||||
"tyto/internal/models"
|
||||
pb "tyto/internal/proto"
|
||||
)
|
||||
|
||||
// Agent collects metrics and reports to a central server.
|
||||
type Agent struct {
|
||||
config *config.Config
|
||||
client *Client
|
||||
|
||||
// Collectors
|
||||
system *collectors.SystemCollector
|
||||
cpu *collectors.CPUCollector
|
||||
memory *collectors.MemoryCollector
|
||||
disk *collectors.DiskCollector
|
||||
network *collectors.NetworkCollector
|
||||
process *collectors.ProcessCollector
|
||||
temperature *collectors.TemperatureCollector
|
||||
gpuManager *gpu.Manager
|
||||
docker *collectors.DockerCollector
|
||||
systemd *collectors.SystemdCollector
|
||||
|
||||
// Pooling for memory efficiency
|
||||
metricsPool sync.Pool
|
||||
|
||||
// Control
|
||||
stopCh chan struct{}
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
// New creates a new agent with the given configuration.
|
||||
func New(cfg *config.Config) *Agent {
|
||||
a := &Agent{
|
||||
config: cfg,
|
||||
stopCh: make(chan struct{}),
|
||||
metricsPool: sync.Pool{
|
||||
New: func() interface{} {
|
||||
return &models.AllMetrics{}
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
// Initialize collectors
|
||||
a.initCollectors()
|
||||
|
||||
return a
|
||||
}
|
||||
|
||||
func (a *Agent) initCollectors() {
|
||||
cfg := a.config
|
||||
|
||||
a.system = collectors.NewSystemCollector(cfg.ProcPath)
|
||||
a.cpu = collectors.NewCPUCollector(cfg.ProcPath, cfg.SysPath)
|
||||
a.memory = collectors.NewMemoryCollector(cfg.ProcPath)
|
||||
a.disk = collectors.NewDiskCollector(cfg.ProcPath, cfg.MtabPath)
|
||||
a.network = collectors.NewNetworkCollector(cfg.ProcPath)
|
||||
a.process = collectors.NewProcessCollector(cfg.ProcPath)
|
||||
a.temperature = collectors.NewTemperatureCollector(cfg.SysPath)
|
||||
a.gpuManager = gpu.NewManager(cfg.SysPath)
|
||||
a.docker = collectors.NewDockerCollector(cfg.DockerSock)
|
||||
a.systemd = collectors.NewSystemdCollector()
|
||||
}
|
||||
|
||||
// Run starts the agent's main loop.
|
||||
func (a *Agent) Run(ctx context.Context) error {
|
||||
log.Printf("Agent %s starting...", a.config.Agent.ID)
|
||||
|
||||
// Create gRPC client
|
||||
client, err := NewClient(a.config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
a.client = client
|
||||
|
||||
// Set up reconnection callback to re-register
|
||||
a.client.SetOnReconnect(func() {
|
||||
log.Println("Reconnected to server, re-registering...")
|
||||
if err := a.register(context.Background()); err != nil {
|
||||
log.Printf("Re-registration failed: %v", err)
|
||||
}
|
||||
})
|
||||
|
||||
// Connect to server with retry
|
||||
if err := a.client.ConnectWithRetry(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
defer a.client.Close()
|
||||
|
||||
// Register with server
|
||||
if err := a.register(ctx); err != nil {
|
||||
log.Printf("Registration failed: %v", err)
|
||||
// Continue anyway, server might accept unregistered agents
|
||||
}
|
||||
|
||||
// Start collection loop
|
||||
return a.runLoop(ctx)
|
||||
}
|
||||
|
||||
func (a *Agent) register(ctx context.Context) error {
|
||||
info := a.collectAgentInfo()
|
||||
return a.client.Register(ctx, info)
|
||||
}
|
||||
|
||||
func (a *Agent) collectAgentInfo() *pb.AgentInfo {
|
||||
sysInfo, _ := a.system.Collect()
|
||||
|
||||
capabilities := []string{}
|
||||
if a.gpuManager.Available() {
|
||||
capabilities = append(capabilities, "gpu")
|
||||
}
|
||||
// Check docker availability
|
||||
if dockerStats, err := a.docker.Collect(); err == nil && dockerStats.Available {
|
||||
capabilities = append(capabilities, "docker")
|
||||
}
|
||||
// Check systemd availability
|
||||
if systemdStats, err := a.systemd.Collect(); err == nil && systemdStats.Available {
|
||||
capabilities = append(capabilities, "systemd")
|
||||
}
|
||||
|
||||
return &pb.AgentInfo{
|
||||
AgentId: a.config.Agent.ID,
|
||||
Hostname: sysInfo.Hostname,
|
||||
Os: sysInfo.OS,
|
||||
Architecture: sysInfo.Architecture,
|
||||
Version: "1.0.0", // TODO: Use build version
|
||||
Capabilities: capabilities,
|
||||
}
|
||||
}
|
||||
|
||||
func (a *Agent) runLoop(ctx context.Context) error {
|
||||
ticker := time.NewTicker(a.config.Agent.Interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
log.Printf("Starting collection loop (interval: %s)", a.config.Agent.Interval)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Println("Agent stopping (context cancelled)")
|
||||
return ctx.Err()
|
||||
|
||||
case <-a.stopCh:
|
||||
log.Println("Agent stopping (stop signal)")
|
||||
return nil
|
||||
|
||||
case <-ticker.C:
|
||||
if err := a.collectAndSend(ctx); err != nil {
|
||||
log.Printf("Collection/send error: %v", err)
|
||||
// Don't return, keep trying
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (a *Agent) collectAndSend(ctx context.Context) error {
|
||||
// Get metrics struct from pool
|
||||
metrics := a.metricsPool.Get().(*models.AllMetrics)
|
||||
defer a.metricsPool.Put(metrics)
|
||||
|
||||
// Reset metrics
|
||||
*metrics = models.AllMetrics{
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
// Collect all metrics
|
||||
a.collect(metrics)
|
||||
|
||||
// Send to server
|
||||
err := a.client.SendMetrics(ctx, a.config.Agent.ID, metrics)
|
||||
if err != nil {
|
||||
// Check if we need to reconnect
|
||||
if !a.client.IsConnected() {
|
||||
log.Println("Connection lost, attempting reconnection...")
|
||||
a.wg.Add(1)
|
||||
go func() {
|
||||
defer a.wg.Done()
|
||||
if reconnErr := a.client.Reconnect(ctx); reconnErr != nil {
|
||||
log.Printf("Reconnection failed: %v", reconnErr)
|
||||
}
|
||||
}()
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *Agent) collect(m *models.AllMetrics) {
|
||||
// System info
|
||||
if sys, err := a.system.Collect(); err == nil {
|
||||
m.System = sys
|
||||
}
|
||||
|
||||
// CPU
|
||||
if cpu, err := a.cpu.Collect(); err == nil {
|
||||
m.CPU = cpu
|
||||
}
|
||||
|
||||
// Memory
|
||||
if mem, err := a.memory.Collect(); err == nil {
|
||||
m.Memory = mem
|
||||
}
|
||||
|
||||
// Disk
|
||||
if disk, err := a.disk.Collect(); err == nil {
|
||||
m.Disk = disk
|
||||
}
|
||||
|
||||
// Network
|
||||
if net, err := a.network.Collect(); err == nil {
|
||||
m.Network = net
|
||||
}
|
||||
|
||||
// Processes
|
||||
if proc, err := a.process.Collect(); err == nil {
|
||||
m.Processes = proc
|
||||
}
|
||||
|
||||
// Temperature
|
||||
if temp, err := a.temperature.Collect(); err == nil {
|
||||
m.Temperature = temp
|
||||
}
|
||||
|
||||
// GPU
|
||||
if gpuStats, err := a.gpuManager.Collect(); err == nil {
|
||||
m.GPU = models.AMDGPUStatsFromGPUInfo(gpuStats)
|
||||
}
|
||||
|
||||
// Docker
|
||||
if docker, err := a.docker.Collect(); err == nil {
|
||||
m.Docker = docker
|
||||
}
|
||||
|
||||
// Systemd
|
||||
if systemd, err := a.systemd.Collect(); err == nil {
|
||||
m.Systemd = systemd
|
||||
}
|
||||
}
|
||||
|
||||
// Stop signals the agent to stop and waits for cleanup.
|
||||
func (a *Agent) Stop() {
|
||||
close(a.stopCh)
|
||||
a.wg.Wait()
|
||||
}
|
||||
|
||||
// SerializeMetrics converts metrics to JSON bytes.
|
||||
func SerializeMetrics(m *models.AllMetrics) ([]byte, error) {
|
||||
return json.Marshal(m)
|
||||
}
|
||||
402
backend/internal/agent/client.go
Normal file
402
backend/internal/agent/client.go
Normal file
@@ -0,0 +1,402 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
|
||||
"tyto/internal/config"
|
||||
"tyto/internal/models"
|
||||
pb "tyto/internal/proto"
|
||||
)
|
||||
|
||||
// Reconnection parameters
|
||||
const (
|
||||
initialBackoff = 1 * time.Second
|
||||
maxBackoff = 60 * time.Second
|
||||
backoffFactor = 2.0
|
||||
jitterFactor = 0.2 // 20% jitter
|
||||
)
|
||||
|
||||
// Client handles gRPC communication with the central server.
|
||||
type Client struct {
|
||||
config *config.Config
|
||||
conn *grpc.ClientConn
|
||||
client pb.AgentServiceClient
|
||||
stream pb.AgentService_StreamClient
|
||||
streamMu sync.Mutex
|
||||
|
||||
// Reconnection state
|
||||
connected bool
|
||||
reconnecting bool
|
||||
reconnectMu sync.Mutex
|
||||
backoff time.Duration
|
||||
reconnectCh chan struct{}
|
||||
onReconnect func() // Callback when reconnected
|
||||
}
|
||||
|
||||
// NewClient creates a new gRPC client.
|
||||
func NewClient(cfg *config.Config) (*Client, error) {
|
||||
return &Client{
|
||||
config: cfg,
|
||||
backoff: initialBackoff,
|
||||
reconnectCh: make(chan struct{}),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// SetOnReconnect sets a callback to be called when the client reconnects.
|
||||
func (c *Client) SetOnReconnect(fn func()) {
|
||||
c.onReconnect = fn
|
||||
}
|
||||
|
||||
// Connect establishes a connection to the server.
|
||||
func (c *Client) Connect(ctx context.Context) error {
|
||||
opts, err := c.dialOptions()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create dial options: %w", err)
|
||||
}
|
||||
|
||||
log.Printf("Connecting to server: %s", c.config.Agent.ServerURL)
|
||||
|
||||
conn, err := grpc.DialContext(ctx, c.config.Agent.ServerURL, opts...)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to connect: %w", err)
|
||||
}
|
||||
|
||||
c.conn = conn
|
||||
c.client = pb.NewAgentServiceClient(conn)
|
||||
c.connected = true
|
||||
c.backoff = initialBackoff // Reset backoff on successful connection
|
||||
|
||||
log.Println("Connected to server")
|
||||
return nil
|
||||
}
|
||||
|
||||
// ConnectWithRetry attempts to connect with exponential backoff.
|
||||
func (c *Client) ConnectWithRetry(ctx context.Context) error {
|
||||
for {
|
||||
err := c.Connect(ctx)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
delay := c.nextBackoff()
|
||||
log.Printf("Connection failed: %v. Retrying in %s...", err, delay)
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-time.After(delay):
|
||||
// Continue to next attempt
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// nextBackoff calculates the next backoff duration with jitter.
|
||||
func (c *Client) nextBackoff() time.Duration {
|
||||
c.reconnectMu.Lock()
|
||||
defer c.reconnectMu.Unlock()
|
||||
|
||||
// Add jitter: +/- 20% of current backoff
|
||||
jitter := time.Duration(float64(c.backoff) * jitterFactor * (2*randFloat() - 1))
|
||||
delay := c.backoff + jitter
|
||||
|
||||
// Increase backoff for next time
|
||||
c.backoff = time.Duration(float64(c.backoff) * backoffFactor)
|
||||
if c.backoff > maxBackoff {
|
||||
c.backoff = maxBackoff
|
||||
}
|
||||
|
||||
return delay
|
||||
}
|
||||
|
||||
// randFloat returns a random float64 in [0, 1).
|
||||
func randFloat() float64 {
|
||||
return float64(time.Now().UnixNano()%1000) / 1000.0
|
||||
}
|
||||
|
||||
// Reconnect attempts to reconnect after a disconnection.
|
||||
func (c *Client) Reconnect(ctx context.Context) error {
|
||||
c.reconnectMu.Lock()
|
||||
if c.reconnecting {
|
||||
c.reconnectMu.Unlock()
|
||||
// Wait for ongoing reconnection
|
||||
select {
|
||||
case <-c.reconnectCh:
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
c.reconnecting = true
|
||||
c.reconnectMu.Unlock()
|
||||
|
||||
defer func() {
|
||||
c.reconnectMu.Lock()
|
||||
c.reconnecting = false
|
||||
// Signal waiting goroutines
|
||||
close(c.reconnectCh)
|
||||
c.reconnectCh = make(chan struct{})
|
||||
c.reconnectMu.Unlock()
|
||||
}()
|
||||
|
||||
// Close existing connection
|
||||
c.Close()
|
||||
|
||||
// Reconnect with retry
|
||||
if err := c.ConnectWithRetry(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Notify callback
|
||||
if c.onReconnect != nil {
|
||||
c.onReconnect()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Client) dialOptions() ([]grpc.DialOption, error) {
|
||||
opts := []grpc.DialOption{
|
||||
grpc.WithKeepaliveParams(keepalive.ClientParameters{
|
||||
Time: 30 * time.Second,
|
||||
Timeout: 10 * time.Second,
|
||||
PermitWithoutStream: true,
|
||||
}),
|
||||
}
|
||||
|
||||
// Configure TLS if certificates are provided
|
||||
agentCfg := c.config.Agent
|
||||
if agentCfg.TLS.CACert != "" && agentCfg.TLS.AgentCert != "" {
|
||||
tlsConfig, err := c.loadTLSConfig()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))
|
||||
} else {
|
||||
// Insecure mode for development
|
||||
log.Println("Warning: Running without TLS (insecure mode)")
|
||||
opts = append(opts, grpc.WithInsecure())
|
||||
}
|
||||
|
||||
return opts, nil
|
||||
}
|
||||
|
||||
func (c *Client) loadTLSConfig() (*tls.Config, error) {
|
||||
agentCfg := c.config.Agent
|
||||
|
||||
// Load CA certificate
|
||||
caCert, err := os.ReadFile(agentCfg.TLS.CACert)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to read CA cert: %w", err)
|
||||
}
|
||||
|
||||
caCertPool := x509.NewCertPool()
|
||||
if !caCertPool.AppendCertsFromPEM(caCert) {
|
||||
return nil, fmt.Errorf("failed to parse CA cert")
|
||||
}
|
||||
|
||||
// Load agent certificate
|
||||
cert, err := tls.LoadX509KeyPair(agentCfg.TLS.AgentCert, agentCfg.TLS.AgentKey)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to load agent cert: %w", err)
|
||||
}
|
||||
|
||||
return &tls.Config{
|
||||
Certificates: []tls.Certificate{cert},
|
||||
RootCAs: caCertPool,
|
||||
MinVersion: tls.VersionTLS12,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Register sends a registration request to the server.
|
||||
func (c *Client) Register(ctx context.Context, info *pb.AgentInfo) error {
|
||||
req := &pb.RegisterRequest{
|
||||
AgentId: c.config.Agent.ID,
|
||||
Info: info,
|
||||
}
|
||||
|
||||
resp, err := c.client.Register(ctx, req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("registration failed: %w", err)
|
||||
}
|
||||
|
||||
switch resp.Status {
|
||||
case pb.RegisterStatus_REGISTER_STATUS_ACCEPTED:
|
||||
log.Println("Registration accepted")
|
||||
case pb.RegisterStatus_REGISTER_STATUS_PENDING_APPROVAL:
|
||||
log.Println("Registration pending approval")
|
||||
case pb.RegisterStatus_REGISTER_STATUS_ALREADY_REGISTERED:
|
||||
log.Println("Already registered")
|
||||
case pb.RegisterStatus_REGISTER_STATUS_REJECTED:
|
||||
return fmt.Errorf("registration rejected: %s", resp.Message)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// SendMetrics sends collected metrics to the server.
|
||||
func (c *Client) SendMetrics(ctx context.Context, agentID string, metrics *models.AllMetrics) error {
|
||||
// Serialize metrics to JSON
|
||||
metricsJSON, err := SerializeMetrics(metrics)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to serialize metrics: %w", err)
|
||||
}
|
||||
|
||||
// Create metrics report
|
||||
report := &pb.MetricsReport{
|
||||
AgentId: agentID,
|
||||
TimestampMs: metrics.Timestamp.UnixMilli(),
|
||||
MetricsJson: metricsJSON,
|
||||
|
||||
// Summary metrics for quick aggregation
|
||||
CpuUsage: metrics.CPU.TotalUsage,
|
||||
MemoryPercent: float64(metrics.Memory.Used) / float64(metrics.Memory.Total) * 100,
|
||||
GpuUtilization: int32(metrics.GPU.Utilization),
|
||||
}
|
||||
|
||||
// Calculate disk usage percent (max across mounts)
|
||||
var maxDiskPercent float64
|
||||
for _, mount := range metrics.Disk.Mounts {
|
||||
if mount.UsedPercent > maxDiskPercent {
|
||||
maxDiskPercent = mount.UsedPercent
|
||||
}
|
||||
}
|
||||
report.DiskPercent = maxDiskPercent
|
||||
|
||||
// Try to use stream first, fall back to heartbeat
|
||||
if err := c.sendViaStream(ctx, report); err != nil {
|
||||
log.Printf("Stream send failed, using heartbeat: %v", err)
|
||||
return c.sendViaHeartbeat(ctx, agentID)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Client) sendViaStream(ctx context.Context, report *pb.MetricsReport) error {
|
||||
c.streamMu.Lock()
|
||||
defer c.streamMu.Unlock()
|
||||
|
||||
// Establish stream if not exists
|
||||
if c.stream == nil {
|
||||
stream, err := c.client.Stream(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create stream: %w", err)
|
||||
}
|
||||
c.stream = stream
|
||||
|
||||
// Start goroutine to handle server messages
|
||||
go c.handleServerMessages()
|
||||
}
|
||||
|
||||
// Send metrics via stream
|
||||
msg := &pb.AgentMessage{
|
||||
Payload: &pb.AgentMessage_Metrics{
|
||||
Metrics: report,
|
||||
},
|
||||
}
|
||||
|
||||
if err := c.stream.Send(msg); err != nil {
|
||||
c.stream = nil // Reset stream on error
|
||||
return fmt.Errorf("failed to send metrics: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Client) sendViaHeartbeat(ctx context.Context, agentID string) error {
|
||||
req := &pb.HeartbeatRequest{
|
||||
AgentId: agentID,
|
||||
TimestampMs: time.Now().UnixMilli(),
|
||||
}
|
||||
|
||||
_, err := c.client.Heartbeat(ctx, req)
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *Client) handleServerMessages() {
|
||||
for {
|
||||
c.streamMu.Lock()
|
||||
stream := c.stream
|
||||
c.streamMu.Unlock()
|
||||
|
||||
if stream == nil {
|
||||
return
|
||||
}
|
||||
|
||||
msg, err := stream.Recv()
|
||||
if err != nil {
|
||||
log.Printf("Stream receive error: %v", err)
|
||||
c.streamMu.Lock()
|
||||
c.stream = nil
|
||||
c.streamMu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
c.handleMessage(msg)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) handleMessage(msg *pb.ServerMessage) {
|
||||
switch payload := msg.Payload.(type) {
|
||||
case *pb.ServerMessage_Ack:
|
||||
// Message acknowledged
|
||||
if !payload.Ack.Success {
|
||||
log.Printf("Server error: %s", payload.Ack.Error)
|
||||
}
|
||||
|
||||
case *pb.ServerMessage_Config:
|
||||
log.Printf("Received config update: interval=%ds",
|
||||
payload.Config.CollectionIntervalSeconds)
|
||||
// TODO: Apply config update
|
||||
|
||||
case *pb.ServerMessage_Command:
|
||||
c.handleCommand(payload.Command)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) handleCommand(cmd *pb.Command) {
|
||||
switch cmd.Type {
|
||||
case pb.CommandType_COMMAND_TYPE_COLLECT_NOW:
|
||||
log.Println("Received collect-now command")
|
||||
// TODO: Trigger immediate collection
|
||||
|
||||
case pb.CommandType_COMMAND_TYPE_DISCONNECT:
|
||||
log.Println("Received disconnect command")
|
||||
c.Close()
|
||||
|
||||
default:
|
||||
log.Printf("Unknown command type: %v", cmd.Type)
|
||||
}
|
||||
}
|
||||
|
||||
// Close closes the connection to the server.
|
||||
func (c *Client) Close() error {
|
||||
c.connected = false
|
||||
|
||||
c.streamMu.Lock()
|
||||
if c.stream != nil {
|
||||
c.stream.CloseSend()
|
||||
c.stream = nil
|
||||
}
|
||||
c.streamMu.Unlock()
|
||||
|
||||
if c.conn != nil {
|
||||
return c.conn.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// IsConnected returns true if connected to the server.
|
||||
func (c *Client) IsConnected() bool {
|
||||
return c.connected
|
||||
}
|
||||
@@ -242,6 +242,63 @@ func Load() *Config {
|
||||
return cfg
|
||||
}
|
||||
|
||||
// DefaultConfig returns a config with default values.
|
||||
func DefaultConfig() *Config {
|
||||
return &Config{
|
||||
Mode: ModeStandalone,
|
||||
Port: "8080",
|
||||
RefreshSeconds: 5,
|
||||
ProcPath: "/proc",
|
||||
SysPath: "/sys",
|
||||
MtabPath: "/etc/mtab",
|
||||
DockerSock: "/var/run/docker.sock",
|
||||
Alerts: AlertConfig{
|
||||
CPUThreshold: 90.0,
|
||||
MemoryThreshold: 90.0,
|
||||
DiskThreshold: 90.0,
|
||||
TempThreshold: 80.0,
|
||||
},
|
||||
Server: ServerConfig{
|
||||
GRPCPort: 9849,
|
||||
Registration: RegistrationConfig{
|
||||
AutoEnabled: true,
|
||||
RequireApproval: true,
|
||||
},
|
||||
},
|
||||
Agent: AgentConfig{
|
||||
Interval: 5 * time.Second,
|
||||
},
|
||||
Database: DatabaseConfig{
|
||||
Type: "sqlite",
|
||||
SQLitePath: "/var/lib/tyto/tyto.db",
|
||||
},
|
||||
RefreshInterval: 5 * time.Second,
|
||||
}
|
||||
}
|
||||
|
||||
// LoadFromPath loads configuration from a specific file path.
|
||||
func LoadFromPath(path string) (*Config, error) {
|
||||
cfg := DefaultConfig()
|
||||
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := yaml.Unmarshal(data, cfg); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Parse intervals from seconds
|
||||
cfg.RefreshInterval = time.Duration(cfg.RefreshSeconds) * time.Second
|
||||
cfg.Agent.Interval = time.Duration(cfg.Agent.IntervalSeconds) * time.Second
|
||||
if cfg.Agent.Interval == 0 {
|
||||
cfg.Agent.Interval = 5 * time.Second
|
||||
}
|
||||
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
// IsStandalone returns true if running in standalone mode.
|
||||
func (c *Config) IsStandalone() bool {
|
||||
return c.Mode == ModeStandalone || c.Mode == ""
|
||||
|
||||
1173
backend/internal/proto/tyto.pb.go
Normal file
1173
backend/internal/proto/tyto.pb.go
Normal file
File diff suppressed because it is too large
Load Diff
202
backend/internal/proto/tyto_grpc.pb.go
Normal file
202
backend/internal/proto/tyto_grpc.pb.go
Normal file
@@ -0,0 +1,202 @@
|
||||
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
|
||||
// versions:
|
||||
// - protoc-gen-go-grpc v1.6.0
|
||||
// - protoc v6.33.1
|
||||
// source: tyto.proto
|
||||
|
||||
package proto
|
||||
|
||||
import (
|
||||
context "context"
|
||||
grpc "google.golang.org/grpc"
|
||||
codes "google.golang.org/grpc/codes"
|
||||
status "google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
// This is a compile-time assertion to ensure that this generated file
|
||||
// is compatible with the grpc package it is being compiled against.
|
||||
// Requires gRPC-Go v1.64.0 or later.
|
||||
const _ = grpc.SupportPackageIsVersion9
|
||||
|
||||
const (
|
||||
AgentService_Stream_FullMethodName = "/tyto.AgentService/Stream"
|
||||
AgentService_Register_FullMethodName = "/tyto.AgentService/Register"
|
||||
AgentService_Heartbeat_FullMethodName = "/tyto.AgentService/Heartbeat"
|
||||
)
|
||||
|
||||
// AgentServiceClient is the client API for AgentService service.
|
||||
//
|
||||
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
|
||||
//
|
||||
// AgentService handles communication between agents and the central server.
|
||||
type AgentServiceClient interface {
|
||||
// Stream establishes a bidirectional stream for metrics and control messages.
|
||||
Stream(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[AgentMessage, ServerMessage], error)
|
||||
// Register allows an agent to request registration with the server.
|
||||
Register(ctx context.Context, in *RegisterRequest, opts ...grpc.CallOption) (*RegisterResponse, error)
|
||||
// Heartbeat is a simple health check (alternative to streaming).
|
||||
Heartbeat(ctx context.Context, in *HeartbeatRequest, opts ...grpc.CallOption) (*HeartbeatResponse, error)
|
||||
}
|
||||
|
||||
type agentServiceClient struct {
|
||||
cc grpc.ClientConnInterface
|
||||
}
|
||||
|
||||
func NewAgentServiceClient(cc grpc.ClientConnInterface) AgentServiceClient {
|
||||
return &agentServiceClient{cc}
|
||||
}
|
||||
|
||||
func (c *agentServiceClient) Stream(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[AgentMessage, ServerMessage], error) {
|
||||
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||
stream, err := c.cc.NewStream(ctx, &AgentService_ServiceDesc.Streams[0], AgentService_Stream_FullMethodName, cOpts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
x := &grpc.GenericClientStream[AgentMessage, ServerMessage]{ClientStream: stream}
|
||||
return x, nil
|
||||
}
|
||||
|
||||
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
|
||||
type AgentService_StreamClient = grpc.BidiStreamingClient[AgentMessage, ServerMessage]
|
||||
|
||||
func (c *agentServiceClient) Register(ctx context.Context, in *RegisterRequest, opts ...grpc.CallOption) (*RegisterResponse, error) {
|
||||
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||
out := new(RegisterResponse)
|
||||
err := c.cc.Invoke(ctx, AgentService_Register_FullMethodName, in, out, cOpts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *agentServiceClient) Heartbeat(ctx context.Context, in *HeartbeatRequest, opts ...grpc.CallOption) (*HeartbeatResponse, error) {
|
||||
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||
out := new(HeartbeatResponse)
|
||||
err := c.cc.Invoke(ctx, AgentService_Heartbeat_FullMethodName, in, out, cOpts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// AgentServiceServer is the server API for AgentService service.
|
||||
// All implementations must embed UnimplementedAgentServiceServer
|
||||
// for forward compatibility.
|
||||
//
|
||||
// AgentService handles communication between agents and the central server.
|
||||
type AgentServiceServer interface {
|
||||
// Stream establishes a bidirectional stream for metrics and control messages.
|
||||
Stream(grpc.BidiStreamingServer[AgentMessage, ServerMessage]) error
|
||||
// Register allows an agent to request registration with the server.
|
||||
Register(context.Context, *RegisterRequest) (*RegisterResponse, error)
|
||||
// Heartbeat is a simple health check (alternative to streaming).
|
||||
Heartbeat(context.Context, *HeartbeatRequest) (*HeartbeatResponse, error)
|
||||
mustEmbedUnimplementedAgentServiceServer()
|
||||
}
|
||||
|
||||
// UnimplementedAgentServiceServer must be embedded to have
|
||||
// forward compatible implementations.
|
||||
//
|
||||
// NOTE: this should be embedded by value instead of pointer to avoid a nil
|
||||
// pointer dereference when methods are called.
|
||||
type UnimplementedAgentServiceServer struct{}
|
||||
|
||||
func (UnimplementedAgentServiceServer) Stream(grpc.BidiStreamingServer[AgentMessage, ServerMessage]) error {
|
||||
return status.Error(codes.Unimplemented, "method Stream not implemented")
|
||||
}
|
||||
func (UnimplementedAgentServiceServer) Register(context.Context, *RegisterRequest) (*RegisterResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method Register not implemented")
|
||||
}
|
||||
func (UnimplementedAgentServiceServer) Heartbeat(context.Context, *HeartbeatRequest) (*HeartbeatResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "method Heartbeat not implemented")
|
||||
}
|
||||
func (UnimplementedAgentServiceServer) mustEmbedUnimplementedAgentServiceServer() {}
|
||||
func (UnimplementedAgentServiceServer) testEmbeddedByValue() {}
|
||||
|
||||
// UnsafeAgentServiceServer may be embedded to opt out of forward compatibility for this service.
|
||||
// Use of this interface is not recommended, as added methods to AgentServiceServer will
|
||||
// result in compilation errors.
|
||||
type UnsafeAgentServiceServer interface {
|
||||
mustEmbedUnimplementedAgentServiceServer()
|
||||
}
|
||||
|
||||
func RegisterAgentServiceServer(s grpc.ServiceRegistrar, srv AgentServiceServer) {
|
||||
// If the following call panics, it indicates UnimplementedAgentServiceServer was
|
||||
// embedded by pointer and is nil. This will cause panics if an
|
||||
// unimplemented method is ever invoked, so we test this at initialization
|
||||
// time to prevent it from happening at runtime later due to I/O.
|
||||
if t, ok := srv.(interface{ testEmbeddedByValue() }); ok {
|
||||
t.testEmbeddedByValue()
|
||||
}
|
||||
s.RegisterService(&AgentService_ServiceDesc, srv)
|
||||
}
|
||||
|
||||
func _AgentService_Stream_Handler(srv interface{}, stream grpc.ServerStream) error {
|
||||
return srv.(AgentServiceServer).Stream(&grpc.GenericServerStream[AgentMessage, ServerMessage]{ServerStream: stream})
|
||||
}
|
||||
|
||||
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
|
||||
type AgentService_StreamServer = grpc.BidiStreamingServer[AgentMessage, ServerMessage]
|
||||
|
||||
func _AgentService_Register_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(RegisterRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(AgentServiceServer).Register(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: AgentService_Register_FullMethodName,
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(AgentServiceServer).Register(ctx, req.(*RegisterRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _AgentService_Heartbeat_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(HeartbeatRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(AgentServiceServer).Heartbeat(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: AgentService_Heartbeat_FullMethodName,
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(AgentServiceServer).Heartbeat(ctx, req.(*HeartbeatRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
// AgentService_ServiceDesc is the grpc.ServiceDesc for AgentService service.
|
||||
// It's only intended for direct use with grpc.RegisterService,
|
||||
// and not to be introspected or modified (even as a copy)
|
||||
var AgentService_ServiceDesc = grpc.ServiceDesc{
|
||||
ServiceName: "tyto.AgentService",
|
||||
HandlerType: (*AgentServiceServer)(nil),
|
||||
Methods: []grpc.MethodDesc{
|
||||
{
|
||||
MethodName: "Register",
|
||||
Handler: _AgentService_Register_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "Heartbeat",
|
||||
Handler: _AgentService_Heartbeat_Handler,
|
||||
},
|
||||
},
|
||||
Streams: []grpc.StreamDesc{
|
||||
{
|
||||
StreamName: "Stream",
|
||||
Handler: _AgentService_Stream_Handler,
|
||||
ServerStreams: true,
|
||||
ClientStreams: true,
|
||||
},
|
||||
},
|
||||
Metadata: "tyto.proto",
|
||||
}
|
||||
129
backend/proto/tyto.proto
Normal file
129
backend/proto/tyto.proto
Normal file
@@ -0,0 +1,129 @@
|
||||
syntax = "proto3";
|
||||
|
||||
package tyto;
|
||||
|
||||
option go_package = "tyto/internal/proto";
|
||||
|
||||
// AgentService handles communication between agents and the central server.
|
||||
service AgentService {
|
||||
// Stream establishes a bidirectional stream for metrics and control messages.
|
||||
rpc Stream(stream AgentMessage) returns (stream ServerMessage);
|
||||
|
||||
// Register allows an agent to request registration with the server.
|
||||
rpc Register(RegisterRequest) returns (RegisterResponse);
|
||||
|
||||
// Heartbeat is a simple health check (alternative to streaming).
|
||||
rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse);
|
||||
}
|
||||
|
||||
// AgentMessage is sent from agent to server.
|
||||
message AgentMessage {
|
||||
oneof payload {
|
||||
MetricsReport metrics = 1;
|
||||
HeartbeatRequest heartbeat = 2;
|
||||
AgentInfo info = 3;
|
||||
}
|
||||
}
|
||||
|
||||
// ServerMessage is sent from server to agent.
|
||||
message ServerMessage {
|
||||
oneof payload {
|
||||
ConfigUpdate config = 1;
|
||||
Ack ack = 2;
|
||||
Command command = 3;
|
||||
}
|
||||
}
|
||||
|
||||
// MetricsReport contains collected metrics from an agent.
|
||||
message MetricsReport {
|
||||
string agent_id = 1;
|
||||
int64 timestamp_ms = 2;
|
||||
bytes metrics_json = 3; // AllMetrics serialized as JSON
|
||||
|
||||
// Optional individual metrics for server-side aggregation
|
||||
double cpu_usage = 4;
|
||||
double memory_percent = 5;
|
||||
double disk_percent = 6;
|
||||
int32 gpu_utilization = 7;
|
||||
}
|
||||
|
||||
// HeartbeatRequest is a lightweight keep-alive message.
|
||||
message HeartbeatRequest {
|
||||
string agent_id = 1;
|
||||
int64 timestamp_ms = 2;
|
||||
int64 uptime_seconds = 3;
|
||||
}
|
||||
|
||||
// HeartbeatResponse acknowledges the heartbeat.
|
||||
message HeartbeatResponse {
|
||||
int64 server_time_ms = 1;
|
||||
bool config_changed = 2;
|
||||
}
|
||||
|
||||
// AgentInfo describes the agent's system.
|
||||
message AgentInfo {
|
||||
string agent_id = 1;
|
||||
string hostname = 2;
|
||||
string os = 3;
|
||||
string architecture = 4;
|
||||
string version = 5;
|
||||
repeated string capabilities = 6; // e.g., "gpu", "docker", "systemd"
|
||||
}
|
||||
|
||||
// RegisterRequest is sent when an agent first connects.
|
||||
message RegisterRequest {
|
||||
string agent_id = 1;
|
||||
AgentInfo info = 2;
|
||||
string token = 3; // Optional registration token
|
||||
}
|
||||
|
||||
// RegisterResponse indicates if registration was accepted.
|
||||
message RegisterResponse {
|
||||
bool accepted = 1;
|
||||
string message = 2;
|
||||
RegisterStatus status = 3;
|
||||
AgentConfig config = 4; // Initial config if accepted
|
||||
}
|
||||
|
||||
enum RegisterStatus {
|
||||
REGISTER_STATUS_UNKNOWN = 0;
|
||||
REGISTER_STATUS_ACCEPTED = 1;
|
||||
REGISTER_STATUS_PENDING_APPROVAL = 2;
|
||||
REGISTER_STATUS_REJECTED = 3;
|
||||
REGISTER_STATUS_ALREADY_REGISTERED = 4;
|
||||
}
|
||||
|
||||
// ConfigUpdate pushes configuration changes to an agent.
|
||||
message ConfigUpdate {
|
||||
int32 collection_interval_seconds = 1;
|
||||
repeated string enabled_collectors = 2;
|
||||
map<string, string> settings = 3;
|
||||
}
|
||||
|
||||
// AgentConfig is the full agent configuration.
|
||||
message AgentConfig {
|
||||
int32 collection_interval_seconds = 1;
|
||||
repeated string enabled_collectors = 2;
|
||||
map<string, string> settings = 3;
|
||||
}
|
||||
|
||||
// Ack acknowledges receipt of a message.
|
||||
message Ack {
|
||||
int64 message_id = 1;
|
||||
bool success = 2;
|
||||
string error = 3;
|
||||
}
|
||||
|
||||
// Command is sent from server to agent.
|
||||
message Command {
|
||||
CommandType type = 1;
|
||||
map<string, string> params = 2;
|
||||
}
|
||||
|
||||
enum CommandType {
|
||||
COMMAND_TYPE_UNKNOWN = 0;
|
||||
COMMAND_TYPE_COLLECT_NOW = 1; // Force immediate collection
|
||||
COMMAND_TYPE_RESTART = 2; // Restart the agent
|
||||
COMMAND_TYPE_UPDATE_CONFIG = 3; // Apply new config
|
||||
COMMAND_TYPE_DISCONNECT = 4; // Graceful disconnect
|
||||
}
|
||||
Reference in New Issue
Block a user