Files
tyto/backend/internal/server/bridge.go
vikingowl 80f6e788f4 feat: implement server hub for multi-device agent management
Server Package (internal/server/):
- Registry: Agent registration with approval workflow, persistence
- Hub: Connection manager for connected agents, message routing
- GRPCServer: mTLS-enabled gRPC server with interceptors
- SSEBridge: Bridges agent metrics to browser SSE clients

Registry Features:
- JSON file-based persistence
- Agent lifecycle: pending -> approved -> connected -> offline
- Revocation support for certificate-based agent removal
- Automatic last-seen tracking

Hub Features:
- Bidirectional gRPC stream handling
- MetricsSubscriber interface for metric distribution
- Stale connection detection and cleanup
- Broadcast and per-agent command sending

gRPC Server:
- Unary and stream interceptors for auth
- Agent ID extraction from mTLS certificates
- Delegation to Hub for business logic

Agent Management API:
- GET/DELETE /api/v1/agents - List/remove agents
- GET /api/v1/agents/pending - Pending approvals
- POST /api/v1/agents/pending/:id/approve|reject
- GET /api/v1/agents/:id/metrics - Latest agent metrics
- GET /api/v1/agents/connected - Connected agents

Server Mode Startup:
- Full initialization of registry, hub, gRPC, SSE bridge
- Graceful shutdown with signal handling
- Agent mode now uses the agent package

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-28 07:53:23 +01:00

276 lines
5.8 KiB
Go

package server
import (
"encoding/json"
"log"
"sync"
"time"
"tyto/internal/models"
pb "tyto/internal/proto"
)
// SSEBridge relays agent events from the Hub to browser SSE clients for
// multi-device monitoring. It implements MetricsSubscriber so that the Hub
// can deliver agent updates to it.
type SSEBridge struct {
	hub *Hub

	// Registered SSE client channels; guarded by mu.
	mu      sync.RWMutex
	clients map[chan []byte]bool

	// Encoded events waiting to be fanned out by the broadcast loop.
	broadcast chan []byte

	// Latest known state per device ID; guarded by cacheMu.
	cacheMu      sync.RWMutex
	metricsCache map[string]*DeviceMetrics

	// stopCh is closed by Stop to end the broadcast loop; wg tracks that loop.
	stopCh chan struct{}
	wg     sync.WaitGroup
}
// DeviceMetrics wraps metrics with device info.
// One entry per agent ID is kept in the bridge's metrics cache and is
// serialized as part of snapshot events.
type DeviceMetrics struct {
// DeviceID is the agent ID the entry was created for.
DeviceID string `json:"deviceId"`
// Hostname is filled in from the agent info supplied on connect; it is
// omitted from JSON when empty.
Hostname string `json:"hostname,omitempty"`
// Status is set to "online" on connect or metrics, and "offline" on disconnect.
Status string `json:"status"`
// LastUpdated is the time the most recent metrics were recorded.
LastUpdated time.Time `json:"lastUpdated"`
// Metrics is the most recent metrics payload; nil until metrics arrive.
Metrics *models.AllMetrics `json:"metrics"`
}
// MultiDeviceSnapshot is sent to SSE clients.
// It carries the cached state of every known device in a single event.
type MultiDeviceSnapshot struct {
// Type is the event discriminator; the bridge sets it to "snapshot".
Type string `json:"type"`
// Timestamp is the creation time formatted as RFC 3339.
Timestamp string `json:"timestamp"`
// Devices maps device ID to that device's cached state.
Devices map[string]*DeviceMetrics `json:"devices"`
}
// DeviceUpdate is sent when a single device's metrics change.
type DeviceUpdate struct {
// Type is the event discriminator; the bridge sets it to "update".
Type string `json:"type"`
// DeviceID is the ID of the agent that reported the metrics.
DeviceID string `json:"deviceId"`
// Status is the device status; the bridge sets it to "online" for updates.
Status string `json:"status"`
// Metrics is the metrics payload that was just received.
Metrics *models.AllMetrics `json:"metrics"`
}
// DeviceStatusChange is sent when a device connects/disconnects.
type DeviceStatusChange struct {
// Type is the event discriminator: "connected" or "disconnected".
Type string `json:"type"`
// DeviceID is the ID of the agent whose status changed.
DeviceID string `json:"deviceId"`
// Status is "online" for connect events and "offline" for disconnect events.
Status string `json:"status"`
// Hostname is populated on connect events only; it is omitted from JSON
// when empty.
Hostname string `json:"hostname,omitempty"`
}
// NewSSEBridge builds an SSEBridge bound to hub and subscribes it to the
// hub's events. The broadcast loop does not run until Start is called.
func NewSSEBridge(hub *Hub) *SSEBridge {
	b := new(SSEBridge)
	b.hub = hub
	b.clients = make(map[chan []byte]bool)
	b.metricsCache = make(map[string]*DeviceMetrics)
	b.broadcast = make(chan []byte, 256)
	b.stopCh = make(chan struct{})

	// Register with the hub so agent events are delivered to this bridge.
	hub.Subscribe(b)

	return b
}
// Start launches the broadcast loop in its own goroutine. The goroutine is
// counted in the bridge's WaitGroup so Stop can wait for it to finish.
func (b *SSEBridge) Start() {
	b.wg.Add(1)
	go func() {
		b.broadcastLoop()
	}()
}
// Stop shuts the bridge down and returns once the broadcast loop has exited.
func (b *SSEBridge) Stop() {
	// Closing stopCh is the signal the broadcast loop selects on.
	close(b.stopCh)

	// Block until the loop goroutine has returned.
	b.wg.Wait()
}
// broadcastLoop drains the broadcast queue and offers each payload to every
// registered client until stopCh is closed. Sends never block: a client that
// cannot accept the payload immediately simply misses it.
func (b *SSEBridge) broadcastLoop() {
	defer b.wg.Done()

	for {
		var payload []byte

		// Wait for either shutdown or the next queued event.
		select {
		case <-b.stopCh:
			return
		case payload = <-b.broadcast:
		}

		b.mu.RLock()
		for ch := range b.clients {
			select {
			case ch <- payload:
			default:
				// Client buffer full, skip
			}
		}
		b.mu.RUnlock()
	}
}
// Register adds client to the set of SSE clients that receive broadcasts,
// then asynchronously offers it a snapshot of the current cache.
func (b *SSEBridge) Register(client chan []byte) {
	func() {
		b.mu.Lock()
		defer b.mu.Unlock()
		b.clients[client] = true
	}()

	// The snapshot is sent from a separate goroutine, outside the client lock.
	go b.sendSnapshot(client)
}
// Unregister drops client from the set of SSE clients. Removing a client
// that was never registered is a no-op.
func (b *SSEBridge) Unregister(client chan []byte) {
	b.mu.Lock()
	defer b.mu.Unlock()

	delete(b.clients, client)
}
// sendSnapshot encodes the current metrics cache as a "snapshot" event and
// offers it to client without blocking. If the client's channel cannot accept
// the event immediately, the snapshot is dropped.
func (b *SSEBridge) sendSnapshot(client chan []byte) {
	// Marshal while still holding the read lock. The snapshot references the
	// live cache map and its entries, both of which the OnAgent* callbacks
	// mutate under cacheMu; encoding after unlocking would race with those
	// writers (concurrent map iteration and write).
	b.cacheMu.RLock()
	data, err := json.Marshal(MultiDeviceSnapshot{
		Type:      "snapshot",
		Timestamp: time.Now().Format(time.RFC3339),
		Devices:   b.metricsCache,
	})
	b.cacheMu.RUnlock()

	if err != nil {
		log.Printf("Failed to marshal snapshot: %v", err)
		return
	}

	select {
	case client <- data:
	default:
		// Client is not ready to receive; skip rather than block.
	}
}
// MetricsSubscriber implementation

// OnAgentMetrics stores the metrics reported by agentID in the cache, marks
// the device online, and queues an "update" event for SSE clients. If the
// broadcast queue is full the event is dropped and a message is logged.
func (b *SSEBridge) OnAgentMetrics(agentID string, metrics *models.AllMetrics) {
	// Refresh the cache entry, creating it on first sight of this agent.
	b.cacheMu.Lock()
	entry, known := b.metricsCache[agentID]
	if !known {
		entry = &DeviceMetrics{DeviceID: agentID}
		b.metricsCache[agentID] = entry
	}
	entry.Metrics = metrics
	entry.LastUpdated = time.Now()
	entry.Status = "online"
	b.cacheMu.Unlock()

	// Encode and queue the per-device update.
	payload, err := json.Marshal(DeviceUpdate{
		Type:     "update",
		DeviceID: agentID,
		Status:   "online",
		Metrics:  metrics,
	})
	if err != nil {
		log.Printf("Failed to marshal update: %v", err)
		return
	}

	select {
	case b.broadcast <- payload:
	default:
		log.Println("Broadcast buffer full, dropping update")
	}
}
// OnAgentConnected is called when an agent connects. It marks the device
// online in the cache, recording the hostname from info when info is non-nil,
// and queues a "connected" event for SSE clients.
//
// Encoding failures and dropped events are logged, matching OnAgentMetrics,
// so that a missed status change is visible to operators.
func (b *SSEBridge) OnAgentConnected(agentID string, info *pb.AgentInfo) {
	hostname := ""
	if info != nil {
		hostname = info.Hostname
	}

	// Update cache
	b.cacheMu.Lock()
	if existing, ok := b.metricsCache[agentID]; ok {
		existing.Status = "online"
		existing.Hostname = hostname
	} else {
		b.metricsCache[agentID] = &DeviceMetrics{
			DeviceID: agentID,
			Hostname: hostname,
			Status:   "online",
		}
	}
	b.cacheMu.Unlock()

	// Broadcast status change
	data, err := json.Marshal(DeviceStatusChange{
		Type:     "connected",
		DeviceID: agentID,
		Status:   "online",
		Hostname: hostname,
	})
	if err != nil {
		log.Printf("Failed to marshal connected event for agent %s: %v", agentID, err)
		return
	}

	select {
	case b.broadcast <- data:
	default:
		log.Printf("Broadcast buffer full, dropping connected event for agent %s", agentID)
	}
}
// OnAgentDisconnected is called when an agent disconnects. It marks the
// device offline in the cache if an entry exists (unknown agents are not
// added) and queues a "disconnected" event for SSE clients.
//
// Encoding failures and dropped events are logged, matching OnAgentMetrics,
// so that a missed status change is visible to operators.
func (b *SSEBridge) OnAgentDisconnected(agentID string) {
	// Update cache
	b.cacheMu.Lock()
	if existing, ok := b.metricsCache[agentID]; ok {
		existing.Status = "offline"
	}
	b.cacheMu.Unlock()

	// Broadcast status change
	data, err := json.Marshal(DeviceStatusChange{
		Type:     "disconnected",
		DeviceID: agentID,
		Status:   "offline",
	})
	if err != nil {
		log.Printf("Failed to marshal disconnected event for agent %s: %v", agentID, err)
		return
	}

	select {
	case b.broadcast <- data:
	default:
		log.Printf("Broadcast buffer full, dropping disconnected event for agent %s", agentID)
	}
}
// GetSnapshot returns the current metrics snapshot, keyed by device ID.
//
// Both the map and each DeviceMetrics value are copies, so callers can read
// the entries' fields without holding the bridge's lock while the OnAgent*
// callbacks keep updating the cache. The copy is shallow: the Metrics pointer
// inside each entry still refers to the payload that was handed to the bridge.
func (b *SSEBridge) GetSnapshot() map[string]*DeviceMetrics {
	b.cacheMu.RLock()
	defer b.cacheMu.RUnlock()

	snapshot := make(map[string]*DeviceMetrics, len(b.metricsCache))
	for id, cached := range b.metricsCache {
		if cached == nil {
			snapshot[id] = nil
			continue
		}
		// Copy the struct so later cache mutations do not race with readers.
		entry := *cached
		snapshot[id] = &entry
	}
	return snapshot
}
// GetDeviceMetrics returns metrics for a specific device. The boolean result
// reports whether deviceID is present in the cache; when it is false the
// returned pointer is nil.
//
// The returned DeviceMetrics is a copy of the cached entry, so callers can
// read its fields without holding the bridge's lock while the OnAgent*
// callbacks keep updating the cache. The copy is shallow: the Metrics pointer
// still refers to the payload that was handed to the bridge.
func (b *SSEBridge) GetDeviceMetrics(deviceID string) (*DeviceMetrics, bool) {
	b.cacheMu.RLock()
	defer b.cacheMu.RUnlock()

	cached, ok := b.metricsCache[deviceID]
	if !ok || cached == nil {
		return cached, ok
	}
	entry := *cached
	return &entry, true
}