Server Package (internal/server/): - Registry: Agent registration with approval workflow, persistence - Hub: Connection manager for connected agents, message routing - GRPCServer: mTLS-enabled gRPC server with interceptors - SSEBridge: Bridges agent metrics to browser SSE clients Registry Features: - JSON file-based persistence - Agent lifecycle: pending -> approved -> connected -> offline - Revocation support for certificate-based agent removal - Automatic last-seen tracking Hub Features: - Bidirectional gRPC stream handling - MetricsSubscriber interface for metric distribution - Stale connection detection and cleanup - Broadcast and per-agent command sending gRPC Server: - Unary and stream interceptors for auth - Agent ID extraction from mTLS certificates - Delegation to Hub for business logic Agent Management API: - GET/DELETE /api/v1/agents - List/remove agents - GET /api/v1/agents/pending - Pending approvals - POST /api/v1/agents/pending/:id/approve|reject - GET /api/v1/agents/:id/metrics - Latest agent metrics - GET /api/v1/agents/connected - Connected agents Server Mode Startup: - Full initialization of registry, hub, gRPC, SSE bridge - Graceful shutdown with signal handling - Agent mode now uses the agent package 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
276 lines
5.8 KiB
Go
276 lines
5.8 KiB
Go
package server
|
|
|
|
import (
|
|
"encoding/json"
|
|
"log"
|
|
"sync"
|
|
"time"
|
|
|
|
"tyto/internal/models"
|
|
pb "tyto/internal/proto"
|
|
)
|
|
|
|
// SSEBridge bridges the Hub to SSE clients for multi-device monitoring.
|
|
// It implements MetricsSubscriber to receive agent updates.
|
|
type SSEBridge struct {
|
|
hub *Hub
|
|
clients map[chan []byte]bool
|
|
mu sync.RWMutex
|
|
broadcast chan []byte
|
|
|
|
// Cache of latest metrics per device
|
|
metricsCache map[string]*DeviceMetrics
|
|
cacheMu sync.RWMutex
|
|
|
|
stopCh chan struct{}
|
|
wg sync.WaitGroup
|
|
}
|
|
|
|
// DeviceMetrics wraps metrics with device info.
|
|
type DeviceMetrics struct {
|
|
DeviceID string `json:"deviceId"`
|
|
Hostname string `json:"hostname,omitempty"`
|
|
Status string `json:"status"`
|
|
LastUpdated time.Time `json:"lastUpdated"`
|
|
Metrics *models.AllMetrics `json:"metrics"`
|
|
}
|
|
|
|
// MultiDeviceSnapshot is sent to SSE clients.
|
|
type MultiDeviceSnapshot struct {
|
|
Type string `json:"type"`
|
|
Timestamp string `json:"timestamp"`
|
|
Devices map[string]*DeviceMetrics `json:"devices"`
|
|
}
|
|
|
|
// DeviceUpdate is sent when a single device's metrics change.
|
|
type DeviceUpdate struct {
|
|
Type string `json:"type"`
|
|
DeviceID string `json:"deviceId"`
|
|
Status string `json:"status"`
|
|
Metrics *models.AllMetrics `json:"metrics"`
|
|
}
|
|
|
|
// DeviceStatusChange is the message broadcast to SSE clients when a device
// connects or disconnects. Hostname is omitted from JSON when empty.
type DeviceStatusChange struct {
	Type     string `json:"type"`
	DeviceID string `json:"deviceId"`
	Status   string `json:"status"`
	Hostname string `json:"hostname,omitempty"`
}
|
|
|
|
// NewSSEBridge creates a new SSE bridge.
|
|
func NewSSEBridge(hub *Hub) *SSEBridge {
|
|
bridge := &SSEBridge{
|
|
hub: hub,
|
|
clients: make(map[chan []byte]bool),
|
|
broadcast: make(chan []byte, 256),
|
|
metricsCache: make(map[string]*DeviceMetrics),
|
|
stopCh: make(chan struct{}),
|
|
}
|
|
|
|
// Subscribe to hub events
|
|
hub.Subscribe(bridge)
|
|
|
|
return bridge
|
|
}
|
|
|
|
// Start begins the broadcast loop.
|
|
func (b *SSEBridge) Start() {
|
|
b.wg.Add(1)
|
|
go b.broadcastLoop()
|
|
}
|
|
|
|
// Stop stops the bridge.
|
|
func (b *SSEBridge) Stop() {
|
|
close(b.stopCh)
|
|
b.wg.Wait()
|
|
}
|
|
|
|
func (b *SSEBridge) broadcastLoop() {
|
|
defer b.wg.Done()
|
|
|
|
for {
|
|
select {
|
|
case <-b.stopCh:
|
|
return
|
|
case data := <-b.broadcast:
|
|
b.mu.RLock()
|
|
for client := range b.clients {
|
|
select {
|
|
case client <- data:
|
|
default:
|
|
// Client buffer full, skip
|
|
}
|
|
}
|
|
b.mu.RUnlock()
|
|
}
|
|
}
|
|
}
|
|
|
|
// Register adds an SSE client.
|
|
func (b *SSEBridge) Register(client chan []byte) {
|
|
b.mu.Lock()
|
|
b.clients[client] = true
|
|
b.mu.Unlock()
|
|
|
|
// Send current snapshot to new client
|
|
go b.sendSnapshot(client)
|
|
}
|
|
|
|
// Unregister removes an SSE client.
|
|
func (b *SSEBridge) Unregister(client chan []byte) {
|
|
b.mu.Lock()
|
|
delete(b.clients, client)
|
|
b.mu.Unlock()
|
|
}
|
|
|
|
func (b *SSEBridge) sendSnapshot(client chan []byte) {
|
|
b.cacheMu.RLock()
|
|
snapshot := MultiDeviceSnapshot{
|
|
Type: "snapshot",
|
|
Timestamp: time.Now().Format(time.RFC3339),
|
|
Devices: b.metricsCache,
|
|
}
|
|
b.cacheMu.RUnlock()
|
|
|
|
data, err := json.Marshal(snapshot)
|
|
if err != nil {
|
|
log.Printf("Failed to marshal snapshot: %v", err)
|
|
return
|
|
}
|
|
|
|
select {
|
|
case client <- data:
|
|
default:
|
|
}
|
|
}
|
|
|
|
// MetricsSubscriber implementation
|
|
|
|
// OnAgentMetrics is called when an agent sends metrics.
|
|
func (b *SSEBridge) OnAgentMetrics(agentID string, metrics *models.AllMetrics) {
|
|
// Update cache
|
|
b.cacheMu.Lock()
|
|
if existing, ok := b.metricsCache[agentID]; ok {
|
|
existing.Metrics = metrics
|
|
existing.LastUpdated = time.Now()
|
|
existing.Status = "online"
|
|
} else {
|
|
b.metricsCache[agentID] = &DeviceMetrics{
|
|
DeviceID: agentID,
|
|
Status: "online",
|
|
LastUpdated: time.Now(),
|
|
Metrics: metrics,
|
|
}
|
|
}
|
|
b.cacheMu.Unlock()
|
|
|
|
// Broadcast update
|
|
update := DeviceUpdate{
|
|
Type: "update",
|
|
DeviceID: agentID,
|
|
Status: "online",
|
|
Metrics: metrics,
|
|
}
|
|
|
|
data, err := json.Marshal(update)
|
|
if err != nil {
|
|
log.Printf("Failed to marshal update: %v", err)
|
|
return
|
|
}
|
|
|
|
select {
|
|
case b.broadcast <- data:
|
|
default:
|
|
log.Println("Broadcast buffer full, dropping update")
|
|
}
|
|
}
|
|
|
|
// OnAgentConnected is called when an agent connects.
|
|
func (b *SSEBridge) OnAgentConnected(agentID string, info *pb.AgentInfo) {
|
|
hostname := ""
|
|
if info != nil {
|
|
hostname = info.Hostname
|
|
}
|
|
|
|
// Update cache
|
|
b.cacheMu.Lock()
|
|
if existing, ok := b.metricsCache[agentID]; ok {
|
|
existing.Status = "online"
|
|
existing.Hostname = hostname
|
|
} else {
|
|
b.metricsCache[agentID] = &DeviceMetrics{
|
|
DeviceID: agentID,
|
|
Hostname: hostname,
|
|
Status: "online",
|
|
}
|
|
}
|
|
b.cacheMu.Unlock()
|
|
|
|
// Broadcast status change
|
|
change := DeviceStatusChange{
|
|
Type: "connected",
|
|
DeviceID: agentID,
|
|
Status: "online",
|
|
Hostname: hostname,
|
|
}
|
|
|
|
data, err := json.Marshal(change)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
select {
|
|
case b.broadcast <- data:
|
|
default:
|
|
}
|
|
}
|
|
|
|
// OnAgentDisconnected is called when an agent disconnects.
|
|
func (b *SSEBridge) OnAgentDisconnected(agentID string) {
|
|
// Update cache
|
|
b.cacheMu.Lock()
|
|
if existing, ok := b.metricsCache[agentID]; ok {
|
|
existing.Status = "offline"
|
|
}
|
|
b.cacheMu.Unlock()
|
|
|
|
// Broadcast status change
|
|
change := DeviceStatusChange{
|
|
Type: "disconnected",
|
|
DeviceID: agentID,
|
|
Status: "offline",
|
|
}
|
|
|
|
data, err := json.Marshal(change)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
select {
|
|
case b.broadcast <- data:
|
|
default:
|
|
}
|
|
}
|
|
|
|
// GetSnapshot returns the current metrics snapshot.
|
|
func (b *SSEBridge) GetSnapshot() map[string]*DeviceMetrics {
|
|
b.cacheMu.RLock()
|
|
defer b.cacheMu.RUnlock()
|
|
|
|
snapshot := make(map[string]*DeviceMetrics, len(b.metricsCache))
|
|
for k, v := range b.metricsCache {
|
|
snapshot[k] = v
|
|
}
|
|
return snapshot
|
|
}
|
|
|
|
// GetDeviceMetrics returns metrics for a specific device.
|
|
func (b *SSEBridge) GetDeviceMetrics(deviceID string) (*DeviceMetrics, bool) {
|
|
b.cacheMu.RLock()
|
|
defer b.cacheMu.RUnlock()
|
|
|
|
metrics, ok := b.metricsCache[deviceID]
|
|
return metrics, ok
|
|
}
|