Files
tyto/backend/internal/database/postgres.go
vikingowl c0e678931d feat: add database layer with SQLite and PostgreSQL support
Database Package (internal/database/):
- Database interface abstraction for multiple backends
- SQLite implementation with pure Go driver (no CGO)
- PostgreSQL implementation with connection pooling
- Factory pattern for creating database from config
- Tiered retention with automatic aggregation:
  - Raw metrics: 24h (5s resolution)
  - 1-minute aggregation: 7 days
  - 5-minute aggregation: 30 days
  - Hourly aggregation: 1 year

Schema includes:
- agents: registration, status, certificates
- users: local + LDAP authentication
- roles: RBAC with permissions JSON
- sessions: token-based authentication
- metrics_*: time-series with aggregation
- alerts: triggered alerts with acknowledgment

Configuration Updates:
- DatabaseConfig with SQLite path and PostgreSQL settings
- RetentionConfig for customizing data retention
- Environment variables: TYTO_DB_*, TYTO_DB_CONNECTION_STRING
- Default SQLite at /var/lib/tyto/tyto.db

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-28 08:18:48 +01:00

1053 lines
32 KiB
Go

// Package database provides PostgreSQL implementation of the Database interface.
package database
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"strings"
"time"
"tyto/internal/models"
"github.com/lib/pq"
)
// PostgresDB implements the Database interface using PostgreSQL.
//
// It holds a pooled *sql.DB (configured in NewPostgresDB) and the
// RetentionConfig that RunRetention uses to decide when metrics are
// aggregated into coarser tables and deleted.
type PostgresDB struct {
	// db is the shared connection pool used by every method.
	db *sql.DB
	// retention supplies the per-tier cutoffs applied by RunRetention.
	retention RetentionConfig
}
// NewPostgresDB opens a PostgreSQL connection pool for connStr, verifies it
// with a ping and returns a PostgresDB that will apply the given retention
// settings. If the ping fails the pool is closed again and the ping error is
// returned (wrapped).
func NewPostgresDB(connStr string, retention RetentionConfig) (*PostgresDB, error) {
	conn, openErr := sql.Open("postgres", connStr)
	if openErr != nil {
		return nil, fmt.Errorf("open postgres: %w", openErr)
	}

	// Pool sizing: at most 25 concurrent connections, 5 kept idle, and
	// every connection recycled after five minutes.
	const (
		maxOpen     = 25
		maxIdle     = 5
		maxLifetime = 5 * time.Minute
	)
	conn.SetMaxOpenConns(maxOpen)
	conn.SetMaxIdleConns(maxIdle)
	conn.SetConnMaxLifetime(maxLifetime)

	// sql.Open does not dial; Ping forces a real round-trip so a bad DSN or
	// unreachable server is reported here rather than on first use.
	if pingErr := conn.Ping(); pingErr != nil {
		_ = conn.Close() // best effort; the ping error is the one worth reporting
		return nil, fmt.Errorf("ping postgres: %w", pingErr)
	}

	pg := &PostgresDB{retention: retention, db: conn}
	return pg, nil
}
// Close shuts down the underlying connection pool, after which new queries
// can no longer start. It returns whatever error (*sql.DB).Close reports.
func (p *PostgresDB) Close() error {
	err := p.db.Close()
	return err
}
// Migrate brings the schema up to date and then seeds the built-in roles.
//
// The schema statements run one at a time, outside any transaction, in
// dependency order (agents and users are created before the tables whose
// foreign keys reference them). The first failure stops the run and is
// reported with the 1-based position of the failing statement.
func (p *PostgresDB) Migrate() error {
	steps := [...]string{
		pgMigrationAgents,
		pgMigrationUsers,
		pgMigrationRoles,
		pgMigrationSessions,
		pgMigrationMetrics,
		pgMigrationAlerts,
	}
	for n := range steps {
		if _, err := p.db.Exec(steps[n]); err != nil {
			return fmt.Errorf("migration %d: %w", n+1, err)
		}
	}
	return p.insertDefaultRoles()
}
// PostgreSQL migration statements, executed in order by Migrate. Every
// CREATE uses IF NOT EXISTS, so re-running them against an existing schema
// is harmless.
//
// NOTE(review): because nothing here ALTERs an existing table, column changes
// made to these statements later will not reach databases that already have
// the tables.

// pgMigrationAgents creates the agents table (identity, platform details,
// certificate data, status, tags) and an index on status.
const pgMigrationAgents = `
CREATE TABLE IF NOT EXISTS agents (
id TEXT PRIMARY KEY,
name TEXT,
hostname TEXT NOT NULL,
os TEXT NOT NULL,
architecture TEXT NOT NULL,
version TEXT NOT NULL,
capabilities TEXT[] DEFAULT '{}',
status TEXT NOT NULL DEFAULT 'pending',
cert_serial TEXT,
cert_expiry TIMESTAMPTZ,
last_seen TIMESTAMPTZ,
registered_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
tags TEXT[] DEFAULT '{}'
);
CREATE INDEX IF NOT EXISTS idx_agents_status ON agents(status);
`

// pgMigrationUsers creates the users table for local and LDAP accounts.
//
// NOTE(review): the UNIQUE constraint on username already gets its own index
// in PostgreSQL, so idx_users_username is likely redundant.
const pgMigrationUsers = `
CREATE TABLE IF NOT EXISTS users (
id TEXT PRIMARY KEY,
username TEXT UNIQUE NOT NULL,
email TEXT,
password_hash BYTEA,
auth_provider TEXT NOT NULL DEFAULT 'local',
ldap_dn TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
last_login TIMESTAMPTZ,
disabled BOOLEAN NOT NULL DEFAULT FALSE
);
CREATE INDEX IF NOT EXISTS idx_users_username ON users(username);
`

// pgMigrationRoles creates the roles table (permissions stored as a JSONB
// array, is_system marking built-in roles) and the user_roles join table.
// Join rows are removed automatically when either side is deleted.
const pgMigrationRoles = `
CREATE TABLE IF NOT EXISTS roles (
id TEXT PRIMARY KEY,
name TEXT UNIQUE NOT NULL,
description TEXT,
permissions JSONB NOT NULL DEFAULT '[]',
is_system BOOLEAN NOT NULL DEFAULT FALSE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS user_roles (
user_id TEXT NOT NULL REFERENCES users(id) ON DELETE CASCADE,
role_id TEXT NOT NULL REFERENCES roles(id) ON DELETE CASCADE,
PRIMARY KEY (user_id, role_id)
);
`

// pgMigrationSessions creates the token-keyed sessions table (cascading on
// user deletion) with indexes for per-user lookup and expiry cleanup.
const pgMigrationSessions = `
CREATE TABLE IF NOT EXISTS sessions (
token TEXT PRIMARY KEY,
user_id TEXT NOT NULL REFERENCES users(id) ON DELETE CASCADE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
expires_at TIMESTAMPTZ NOT NULL,
ip_address TEXT,
user_agent TEXT
);
CREATE INDEX IF NOT EXISTS idx_sessions_user ON sessions(user_id);
CREATE INDEX IF NOT EXISTS idx_sessions_expires ON sessions(expires_at);
`

// pgMigrationMetrics creates the metrics tiers: metrics_raw stores each
// snapshot as JSONB, and metrics_1min / metrics_5min / metrics_hourly share
// one aggregate layout keyed by (agent_id, timestamp).
//
// NOTE(review): RunRetention deletes from each tier with "timestamp < $1";
// the indexes here all lead with agent_id, so those deletes likely scan the
// whole table. A timestamp-only index per tier may be worth adding.
const pgMigrationMetrics = `
-- Raw metrics (high resolution, short retention)
CREATE TABLE IF NOT EXISTS metrics_raw (
id BIGSERIAL PRIMARY KEY,
agent_id TEXT NOT NULL REFERENCES agents(id) ON DELETE CASCADE,
timestamp TIMESTAMPTZ NOT NULL,
data JSONB NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_metrics_raw_agent_time ON metrics_raw(agent_id, timestamp);
-- 1-minute aggregations
CREATE TABLE IF NOT EXISTS metrics_1min (
agent_id TEXT NOT NULL REFERENCES agents(id) ON DELETE CASCADE,
timestamp TIMESTAMPTZ NOT NULL,
cpu_avg DOUBLE PRECISION, cpu_min DOUBLE PRECISION, cpu_max DOUBLE PRECISION,
mem_avg DOUBLE PRECISION, mem_min DOUBLE PRECISION, mem_max DOUBLE PRECISION,
disk_avg DOUBLE PRECISION,
gpu_avg DOUBLE PRECISION,
sample_count INTEGER NOT NULL DEFAULT 1,
PRIMARY KEY (agent_id, timestamp)
);
-- 5-minute aggregations
CREATE TABLE IF NOT EXISTS metrics_5min (
agent_id TEXT NOT NULL REFERENCES agents(id) ON DELETE CASCADE,
timestamp TIMESTAMPTZ NOT NULL,
cpu_avg DOUBLE PRECISION, cpu_min DOUBLE PRECISION, cpu_max DOUBLE PRECISION,
mem_avg DOUBLE PRECISION, mem_min DOUBLE PRECISION, mem_max DOUBLE PRECISION,
disk_avg DOUBLE PRECISION,
gpu_avg DOUBLE PRECISION,
sample_count INTEGER NOT NULL DEFAULT 1,
PRIMARY KEY (agent_id, timestamp)
);
-- Hourly aggregations
CREATE TABLE IF NOT EXISTS metrics_hourly (
agent_id TEXT NOT NULL REFERENCES agents(id) ON DELETE CASCADE,
timestamp TIMESTAMPTZ NOT NULL,
cpu_avg DOUBLE PRECISION, cpu_min DOUBLE PRECISION, cpu_max DOUBLE PRECISION,
mem_avg DOUBLE PRECISION, mem_min DOUBLE PRECISION, mem_max DOUBLE PRECISION,
disk_avg DOUBLE PRECISION,
gpu_avg DOUBLE PRECISION,
sample_count INTEGER NOT NULL DEFAULT 1,
PRIMARY KEY (agent_id, timestamp)
);
-- Create hypertable-like partitioning hint (for future TimescaleDB upgrade)
-- COMMENT ON TABLE metrics_raw IS 'timescale:hypertable';
`

// pgMigrationAlerts creates the alerts table (cascading on agent deletion)
// with indexes on agent, trigger time and severity for QueryAlerts filters.
const pgMigrationAlerts = `
CREATE TABLE IF NOT EXISTS alerts (
id TEXT PRIMARY KEY,
agent_id TEXT NOT NULL REFERENCES agents(id) ON DELETE CASCADE,
type TEXT NOT NULL,
severity TEXT NOT NULL,
message TEXT NOT NULL,
value DOUBLE PRECISION NOT NULL,
threshold DOUBLE PRECISION NOT NULL,
triggered_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
resolved_at TIMESTAMPTZ,
acknowledged BOOLEAN NOT NULL DEFAULT FALSE
);
CREATE INDEX IF NOT EXISTS idx_alerts_agent ON alerts(agent_id);
CREATE INDEX IF NOT EXISTS idx_alerts_triggered ON alerts(triggered_at);
CREATE INDEX IF NOT EXISTS idx_alerts_severity ON alerts(severity);
`
// insertDefaultRoles seeds the built-in system roles: admin, operator and
// viewer. Each is inserted with is_system = TRUE and ON CONFLICT (id) DO
// NOTHING, so a row that already exists with the same id is left untouched
// and the call is safe to repeat on every Migrate.
func (p *PostgresDB) insertDefaultRoles() error {
	defaultRoles := []struct {
		id, name, desc string
		perms          []string
	}{
		{"admin", "Administrator", "Full system access", []string{"*"}},
		{"operator", "Operator", "Manage agents and alerts", []string{
			"dashboard:view", "agents:view", "agents:manage",
			"alerts:view", "alerts:acknowledge", "alerts:configure",
			"metrics:export", "metrics:query",
		}},
		{"viewer", "Viewer", "Read-only access", []string{
			"dashboard:view", "agents:view", "alerts:view",
		}},
	}
	for _, r := range defaultRoles {
		// Permissions are stored as a JSONB array.
		perms, err := json.Marshal(r.perms)
		if err != nil {
			return fmt.Errorf("marshal permissions for role %s: %w", r.id, err)
		}
		if _, err := p.db.Exec(`
INSERT INTO roles (id, name, description, permissions, is_system)
VALUES ($1, $2, $3, $4, TRUE)
ON CONFLICT (id) DO NOTHING
`, r.id, r.name, r.desc, perms); err != nil {
			return fmt.Errorf("insert role %s: %w", r.id, err)
		}
	}
	return nil
}
// ============================================================================
// Metrics Storage
// ============================================================================
// StoreMetrics serialises metrics to JSON and appends it to metrics_raw for
// agentID. The row is stamped with the server's current UTC time. A marshal
// failure is wrapped; insert errors are returned unchanged.
func (p *PostgresDB) StoreMetrics(ctx context.Context, agentID string, metrics *models.AllMetrics) error {
	payload, marshalErr := json.Marshal(metrics)
	if marshalErr != nil {
		return fmt.Errorf("marshal metrics: %w", marshalErr)
	}
	receivedAt := time.Now().UTC()
	_, execErr := p.db.ExecContext(ctx, `
INSERT INTO metrics_raw (agent_id, timestamp, data)
VALUES ($1, $2, $3)
`, agentID, receivedAt, payload)
	return execErr
}
// QueryMetrics returns metric points for agentID with from <= timestamp <= to,
// oldest first.
//
// resolution selects the source: "raw" decodes metrics_raw snapshots via
// queryRawMetrics; "1min", "5min" and "hourly" read the matching aggregate
// table; any other value lets selectResolution pick a table from the width
// of the requested range.
//
// Every aggregate column is nullable in the schema (for example cpu_avg is
// NULL when the raw JSON lacked the CPU field during roll-up), so all of them
// are scanned through sql.NullFloat64. A NULL is reported as 0 instead of
// failing the whole query.
//
// NOTE(review): RunRetention only rolls raw data into metrics_1min once it is
// older than the raw retention window, so the automatic table choice can
// return no points for recent ranges — confirm this is intended.
func (p *PostgresDB) QueryMetrics(ctx context.Context, agentID string, from, to time.Time, resolution string) ([]MetricPoint, error) {
	var table string
	switch resolution {
	case "raw":
		return p.queryRawMetrics(ctx, agentID, from, to)
	case "1min":
		table = "metrics_1min"
	case "5min":
		table = "metrics_5min"
	case "hourly":
		table = "metrics_hourly"
	default:
		table = p.selectResolution(from, to)
	}

	// table is always one of the fixed names above, never caller input, so
	// interpolating it into the SQL text is safe.
	query := fmt.Sprintf(`
SELECT timestamp, cpu_avg, cpu_min, cpu_max, mem_avg, mem_min, mem_max, disk_avg, gpu_avg
FROM %s
WHERE agent_id = $1 AND timestamp >= $2 AND timestamp <= $3
ORDER BY timestamp ASC
`, table)
	rows, err := p.db.QueryContext(ctx, query, agentID, from, to)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var points []MetricPoint
	for rows.Next() {
		var (
			cpuAvg, cpuMin, cpuMax sql.NullFloat64
			memAvg, memMin, memMax sql.NullFloat64
			diskAvg, gpuAvg        sql.NullFloat64
		)
		pt := MetricPoint{AgentID: agentID}
		if err := rows.Scan(&pt.Timestamp, &cpuAvg, &cpuMin, &cpuMax,
			&memAvg, &memMin, &memMax, &diskAvg, &gpuAvg); err != nil {
			return nil, err
		}
		// NullFloat64.Scan sets Float64 to 0 for NULL, so these assignments
		// fall back to 0 for missing values.
		pt.CPUAvg = cpuAvg.Float64
		pt.CPUMin = cpuMin.Float64
		pt.CPUMax = cpuMax.Float64
		pt.MemoryAvg = memAvg.Float64
		pt.MemoryMin = memMin.Float64
		pt.MemoryMax = memMax.Float64
		pt.DiskAvg = diskAvg.Float64
		pt.GPUAvg = gpuAvg.Float64
		points = append(points, pt)
	}
	return points, rows.Err()
}
// queryRawMetrics reads metrics_raw rows for agentID with from <= timestamp
// <= to (oldest first) and converts each JSON snapshot into a MetricPoint.
// A raw row is a single sample, so avg/min/max all receive the same value.
// Memory and disk are reported as used/total percentages (left at 0 when the
// total is 0). GPU is only filled in when the snapshot marks it available.
// Rows whose JSON cannot be decoded are skipped rather than failing the query.
func (p *PostgresDB) queryRawMetrics(ctx context.Context, agentID string, from, to time.Time) ([]MetricPoint, error) {
	rows, err := p.db.QueryContext(ctx, `
SELECT timestamp, data FROM metrics_raw
WHERE agent_id = $1 AND timestamp >= $2 AND timestamp <= $3
ORDER BY timestamp ASC
`, agentID, from, to)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var points []MetricPoint
	for rows.Next() {
		var (
			sampledAt time.Time
			raw       []byte
		)
		if err := rows.Scan(&sampledAt, &raw); err != nil {
			return nil, err
		}

		var snap models.AllMetrics
		if json.Unmarshal(raw, &snap) != nil {
			continue // best effort: ignore undecodable snapshots
		}

		cpu := snap.CPU.TotalUsage
		pt := MetricPoint{
			Timestamp: sampledAt,
			AgentID:   agentID,
			CPUAvg:    cpu,
			CPUMin:    cpu,
			CPUMax:    cpu,
		}

		if total := snap.Memory.Total; total > 0 {
			pct := float64(snap.Memory.Used) / float64(total) * 100
			pt.MemoryAvg, pt.MemoryMin, pt.MemoryMax = pct, pct, pct
		}

		// Disk usage is aggregated across all mounts; an empty mount list
		// leaves both sums at 0 and DiskAvg untouched.
		var usedBytes, totalBytes uint64
		for _, mount := range snap.Disk.Mounts {
			usedBytes += mount.Used
			totalBytes += mount.Total
		}
		if totalBytes > 0 {
			pt.DiskAvg = float64(usedBytes) / float64(totalBytes) * 100
		}

		if snap.GPU.Available {
			pt.GPUAvg = float64(snap.GPU.Utilization)
		}

		points = append(points, pt)
	}
	return points, rows.Err()
}
// selectResolution picks an aggregate table from the width of [from, to]:
// at most 2h uses metrics_1min, at most 24h uses metrics_5min, and anything
// longer uses metrics_hourly. Both thresholds are inclusive.
func (p *PostgresDB) selectResolution(from, to time.Time) string {
	span := to.Sub(from)
	if span <= 2*time.Hour {
		return "metrics_1min"
	}
	if span <= 24*time.Hour {
		return "metrics_5min"
	}
	return "metrics_hourly"
}
// GetLatestMetrics returns the newest metrics_raw snapshot stored for
// agentID, or (nil, nil) when the agent has none. A stored payload that
// cannot be decoded is returned as an error.
func (p *PostgresDB) GetLatestMetrics(ctx context.Context, agentID string) (*models.AllMetrics, error) {
	var payload []byte
	row := p.db.QueryRowContext(ctx, `
SELECT data FROM metrics_raw
WHERE agent_id = $1
ORDER BY timestamp DESC
LIMIT 1
`, agentID)
	switch err := row.Scan(&payload); {
	case errors.Is(err, sql.ErrNoRows):
		return nil, nil
	case err != nil:
		return nil, err
	}

	latest := new(models.AllMetrics)
	if err := json.Unmarshal(payload, latest); err != nil {
		return nil, err
	}
	return latest, nil
}
// ============================================================================
// Agents
// ============================================================================
// StoreAgent upserts agent by ID. On conflict every column except id and
// registered_at is overwritten, so the first registration time is kept.
// Capabilities and Tags are written as PostgreSQL TEXT[] via pq.Array.
func (p *PostgresDB) StoreAgent(ctx context.Context, agent *Agent) error {
	args := []interface{}{
		agent.ID, agent.Name, agent.Hostname, agent.OS, agent.Architecture, agent.Version,
		pq.Array(agent.Capabilities), agent.Status, agent.CertSerial, agent.CertExpiry,
		agent.LastSeen, agent.RegisteredAt, pq.Array(agent.Tags),
	}
	_, err := p.db.ExecContext(ctx, `
INSERT INTO agents (id, name, hostname, os, architecture, version, capabilities, status, cert_serial, cert_expiry, last_seen, registered_at, tags)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
ON CONFLICT (id) DO UPDATE SET
name = EXCLUDED.name,
hostname = EXCLUDED.hostname,
os = EXCLUDED.os,
architecture = EXCLUDED.architecture,
version = EXCLUDED.version,
capabilities = EXCLUDED.capabilities,
status = EXCLUDED.status,
cert_serial = EXCLUDED.cert_serial,
cert_expiry = EXCLUDED.cert_expiry,
last_seen = EXCLUDED.last_seen,
tags = EXCLUDED.tags
`, args...)
	return err
}
// GetAgent loads the agent with the given ID, or returns (nil, nil) when no
// such agent exists. NULL cert_expiry / last_seen columns leave the matching
// fields at their zero value.
func (p *PostgresDB) GetAgent(ctx context.Context, id string) (*Agent, error) {
	agent := &Agent{}
	var expiry, seen sql.NullTime
	row := p.db.QueryRowContext(ctx, `
SELECT id, name, hostname, os, architecture, version, capabilities, status, cert_serial, cert_expiry, last_seen, registered_at, tags
FROM agents WHERE id = $1
`, id)
	err := row.Scan(&agent.ID, &agent.Name, &agent.Hostname, &agent.OS, &agent.Architecture, &agent.Version,
		pq.Array(&agent.Capabilities), &agent.Status, &agent.CertSerial, &expiry, &seen, &agent.RegisteredAt, pq.Array(&agent.Tags))
	switch {
	case errors.Is(err, sql.ErrNoRows):
		return nil, nil
	case err != nil:
		return nil, err
	}

	if expiry.Valid {
		agent.CertExpiry = expiry.Time
	}
	if seen.Valid {
		agent.LastSeen = seen.Time
	}
	return agent, nil
}
// ListAgents returns every agent, most recently registered first. NULL
// cert_expiry / last_seen values leave the matching fields at their zero
// value. A nil slice is returned when no agents exist.
func (p *PostgresDB) ListAgents(ctx context.Context) ([]*Agent, error) {
	rows, err := p.db.QueryContext(ctx, `
SELECT id, name, hostname, os, architecture, version, capabilities, status, cert_serial, cert_expiry, last_seen, registered_at, tags
FROM agents ORDER BY registered_at DESC
`)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var agents []*Agent
	for rows.Next() {
		a := new(Agent)
		var expiry, seen sql.NullTime
		if err := rows.Scan(&a.ID, &a.Name, &a.Hostname, &a.OS, &a.Architecture, &a.Version,
			pq.Array(&a.Capabilities), &a.Status, &a.CertSerial, &expiry, &seen, &a.RegisteredAt, pq.Array(&a.Tags)); err != nil {
			return nil, err
		}
		if expiry.Valid {
			a.CertExpiry = expiry.Time
		}
		if seen.Valid {
			a.LastSeen = seen.Time
		}
		agents = append(agents, a)
	}
	return agents, rows.Err()
}
// UpdateAgentStatus sets the status and last_seen columns of agent id.
// It returns an error when no agent with that id exists, or when the driver
// cannot report how many rows were affected.
func (p *PostgresDB) UpdateAgentStatus(ctx context.Context, id string, status AgentStatus, lastSeen time.Time) error {
	result, err := p.db.ExecContext(ctx, `
UPDATE agents SET status = $1, last_seen = $2 WHERE id = $3
`, status, lastSeen, id)
	if err != nil {
		return err
	}
	rows, err := result.RowsAffected()
	if err != nil {
		return fmt.Errorf("rows affected: %w", err)
	}
	if rows == 0 {
		return fmt.Errorf("agent not found: %s", id)
	}
	return nil
}
// DeleteAgent deletes the agent row with the given id. Rows in the metrics
// and alerts tables that reference it are removed by their ON DELETE CASCADE
// foreign keys. Deleting an unknown id is not an error.
func (p *PostgresDB) DeleteAgent(ctx context.Context, id string) error {
	const stmt = `DELETE FROM agents WHERE id = $1`
	if _, err := p.db.ExecContext(ctx, stmt, id); err != nil {
		return err
	}
	return nil
}
// ============================================================================
// Users
// ============================================================================
// CreateUser inserts a new users row from user. last_login is not written,
// so it starts as NULL. A duplicate id or username surfaces as the driver's
// constraint-violation error.
func (p *PostgresDB) CreateUser(ctx context.Context, user *User) error {
	if _, err := p.db.ExecContext(ctx, `
INSERT INTO users (id, username, email, password_hash, auth_provider, ldap_dn, created_at, updated_at, disabled)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
`,
		user.ID, user.Username, user.Email, user.PasswordHash, user.AuthProvider,
		user.LDAPDN, user.CreatedAt, user.UpdatedAt, user.Disabled,
	); err != nil {
		return err
	}
	return nil
}
// GetUser looks a user up by primary key and returns (nil, nil) when no
// user has that id. Row decoding is delegated to scanUser.
func (p *PostgresDB) GetUser(ctx context.Context, id string) (*User, error) {
	row := p.db.QueryRowContext(ctx, `
SELECT id, username, email, password_hash, auth_provider, ldap_dn, created_at, updated_at, last_login, disabled
FROM users WHERE id = $1
`, id)
	return p.scanUser(row)
}
// GetUserByUsername looks a user up by its unique username and returns
// (nil, nil) when no such user exists. Row decoding is delegated to scanUser.
func (p *PostgresDB) GetUserByUsername(ctx context.Context, username string) (*User, error) {
	row := p.db.QueryRowContext(ctx, `
SELECT id, username, email, password_hash, auth_provider, ldap_dn, created_at, updated_at, last_login, disabled
FROM users WHERE username = $1
`, username)
	return p.scanUser(row)
}
// scanUser decodes a single users row selected in the column order
// id, username, email, password_hash, auth_provider, ldap_dn, created_at,
// updated_at, last_login, disabled. sql.ErrNoRows becomes (nil, nil). A NULL
// email yields "" and a NULL last_login yields the zero time.
func (p *PostgresDB) scanUser(row *sql.Row) (*User, error) {
	var (
		u         User
		email     sql.NullString
		lastLogin sql.NullTime
	)
	err := row.Scan(&u.ID, &u.Username, &email, &u.PasswordHash, &u.AuthProvider,
		&u.LDAPDN, &u.CreatedAt, &u.UpdatedAt, &lastLogin, &u.Disabled)
	if err != nil {
		if errors.Is(err, sql.ErrNoRows) {
			return nil, nil
		}
		return nil, err
	}

	if email.Valid {
		u.Email = email.String
	}
	if lastLogin.Valid {
		u.LastLogin = lastLogin.Time
	}
	return &u, nil
}
// UpdateUser overwrites the mutable columns of the user identified by
// user.ID. updated_at is always set to the current UTC time (user.UpdatedAt
// is ignored), and last_login is written from user.LastLogin as-is. Updating
// an id that does not exist is not reported as an error.
func (p *PostgresDB) UpdateUser(ctx context.Context, user *User) error {
	updatedAt := time.Now().UTC()
	if _, err := p.db.ExecContext(ctx, `
UPDATE users SET
username = $1, email = $2, password_hash = $3, auth_provider = $4,
ldap_dn = $5, updated_at = $6, last_login = $7, disabled = $8
WHERE id = $9
`, user.Username, user.Email, user.PasswordHash, user.AuthProvider,
		user.LDAPDN, updatedAt, user.LastLogin, user.Disabled, user.ID); err != nil {
		return err
	}
	return nil
}
// DeleteUser deletes the user with the given id. The user's sessions and
// role assignments go with it via ON DELETE CASCADE. An unknown id is not an
// error.
func (p *PostgresDB) DeleteUser(ctx context.Context, id string) error {
	const stmt = `DELETE FROM users WHERE id = $1`
	_, err := p.db.ExecContext(ctx, stmt, id)
	return err
}
// ListUsers returns every user ordered by username. A NULL email yields ""
// and a NULL last_login yields the zero time. A nil slice is returned when
// there are no users.
func (p *PostgresDB) ListUsers(ctx context.Context) ([]*User, error) {
	rows, err := p.db.QueryContext(ctx, `
SELECT id, username, email, password_hash, auth_provider, ldap_dn, created_at, updated_at, last_login, disabled
FROM users ORDER BY username
`)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var users []*User
	for rows.Next() {
		u := new(User)
		var (
			email     sql.NullString
			lastLogin sql.NullTime
		)
		if err := rows.Scan(&u.ID, &u.Username, &email, &u.PasswordHash, &u.AuthProvider,
			&u.LDAPDN, &u.CreatedAt, &u.UpdatedAt, &lastLogin, &u.Disabled); err != nil {
			return nil, err
		}
		if email.Valid {
			u.Email = email.String
		}
		if lastLogin.Valid {
			u.LastLogin = lastLogin.Time
		}
		users = append(users, u)
	}
	return users, rows.Err()
}
// ============================================================================
// Roles
// ============================================================================
// CreateRole inserts role, storing Permissions as a JSON document in the
// JSONB permissions column. IsSystem and CreatedAt are written exactly as the
// caller supplies them. Marshal failures are wrapped with the role id.
func (p *PostgresDB) CreateRole(ctx context.Context, role *Role) error {
	perms, err := json.Marshal(role.Permissions)
	if err != nil {
		return fmt.Errorf("marshal permissions for role %s: %w", role.ID, err)
	}
	_, err = p.db.ExecContext(ctx, `
INSERT INTO roles (id, name, description, permissions, is_system, created_at)
VALUES ($1, $2, $3, $4, $5, $6)
`, role.ID, role.Name, role.Description, perms, role.IsSystem, role.CreatedAt)
	return err
}
// GetRole loads the role with the given id and decodes its JSONB permissions
// into role.Permissions. It returns (nil, nil) when the role does not exist,
// and an error if the stored permissions cannot be decoded.
func (p *PostgresDB) GetRole(ctx context.Context, id string) (*Role, error) {
	var role Role
	var perms []byte
	err := p.db.QueryRowContext(ctx, `
SELECT id, name, description, permissions, is_system, created_at
FROM roles WHERE id = $1
`, id).Scan(&role.ID, &role.Name, &role.Description, &perms, &role.IsSystem, &role.CreatedAt)
	if errors.Is(err, sql.ErrNoRows) {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}
	// Report a decode failure instead of handing back a role whose
	// permissions are silently empty or partial.
	if err := json.Unmarshal(perms, &role.Permissions); err != nil {
		return nil, fmt.Errorf("decode permissions for role %s: %w", role.ID, err)
	}
	return &role, nil
}
// ListRoles returns every role ordered by name, with permissions decoded from
// JSONB. A row whose permissions cannot be decoded aborts the listing with an
// error instead of yielding a role with silently-empty permissions.
func (p *PostgresDB) ListRoles(ctx context.Context) ([]*Role, error) {
	rows, err := p.db.QueryContext(ctx, `
SELECT id, name, description, permissions, is_system, created_at
FROM roles ORDER BY name
`)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var roles []*Role
	for rows.Next() {
		var role Role
		var perms []byte
		if err := rows.Scan(&role.ID, &role.Name, &role.Description, &perms, &role.IsSystem, &role.CreatedAt); err != nil {
			return nil, err
		}
		if err := json.Unmarshal(perms, &role.Permissions); err != nil {
			return nil, fmt.Errorf("decode permissions for role %s: %w", role.ID, err)
		}
		roles = append(roles, &role)
	}
	return roles, rows.Err()
}
// UpdateRole overwrites the name, description and permissions of a custom
// (non-system) role. Like DeleteRole, it returns an error when no row matches,
// i.e. the id does not exist or names a system role, instead of silently
// reporting success. Marshal failures are wrapped with the role id.
func (p *PostgresDB) UpdateRole(ctx context.Context, role *Role) error {
	perms, err := json.Marshal(role.Permissions)
	if err != nil {
		return fmt.Errorf("marshal permissions for role %s: %w", role.ID, err)
	}
	result, err := p.db.ExecContext(ctx, `
UPDATE roles SET name = $1, description = $2, permissions = $3
WHERE id = $4 AND is_system = FALSE
`, role.Name, role.Description, perms, role.ID)
	if err != nil {
		return err
	}
	// PostgreSQL counts matched rows for UPDATE, so an update that leaves the
	// values unchanged still reports 1 here.
	n, err := result.RowsAffected()
	if err != nil {
		return fmt.Errorf("rows affected: %w", err)
	}
	if n == 0 {
		return errors.New("cannot update system role or role not found")
	}
	return nil
}
// DeleteRole deletes a custom (non-system) role. Its user_roles assignments
// are removed by the ON DELETE CASCADE foreign key. It returns an error when
// no row matched (the id does not exist or names a system role), or when the
// driver cannot report the affected row count.
func (p *PostgresDB) DeleteRole(ctx context.Context, id string) error {
	result, err := p.db.ExecContext(ctx, `DELETE FROM roles WHERE id = $1 AND is_system = FALSE`, id)
	if err != nil {
		return err
	}
	rows, err := result.RowsAffected()
	if err != nil {
		return fmt.Errorf("rows affected: %w", err)
	}
	if rows == 0 {
		return errors.New("cannot delete system role or role not found")
	}
	return nil
}
// GetUserRoles returns every role assigned to userID through user_roles, in
// no particular order. An unknown user yields a nil slice. A role whose
// permissions cannot be decoded aborts the call with an error rather than
// granting a silently-empty permission set.
func (p *PostgresDB) GetUserRoles(ctx context.Context, userID string) ([]*Role, error) {
	rows, err := p.db.QueryContext(ctx, `
SELECT r.id, r.name, r.description, r.permissions, r.is_system, r.created_at
FROM roles r
JOIN user_roles ur ON r.id = ur.role_id
WHERE ur.user_id = $1
`, userID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var roles []*Role
	for rows.Next() {
		var role Role
		var perms []byte
		if err := rows.Scan(&role.ID, &role.Name, &role.Description, &perms, &role.IsSystem, &role.CreatedAt); err != nil {
			return nil, err
		}
		if err := json.Unmarshal(perms, &role.Permissions); err != nil {
			return nil, fmt.Errorf("decode permissions for role %s: %w", role.ID, err)
		}
		roles = append(roles, &role)
	}
	return roles, rows.Err()
}
// AssignRole links roleID to userID. Assigning a role the user already has
// is a no-op (ON CONFLICT DO NOTHING). An unknown user or role fails with the
// driver's foreign-key error.
func (p *PostgresDB) AssignRole(ctx context.Context, userID, roleID string) error {
	const stmt = `
INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2)
ON CONFLICT DO NOTHING
`
	if _, err := p.db.ExecContext(ctx, stmt, userID, roleID); err != nil {
		return err
	}
	return nil
}
// RemoveRole unlinks roleID from userID. Removing an assignment that does
// not exist is not an error.
func (p *PostgresDB) RemoveRole(ctx context.Context, userID, roleID string) error {
	const stmt = `
DELETE FROM user_roles WHERE user_id = $1 AND role_id = $2
`
	_, err := p.db.ExecContext(ctx, stmt, userID, roleID)
	return err
}
// ============================================================================
// Sessions
// ============================================================================
// CreateSession inserts session keyed by its token, using the caller's
// CreatedAt/ExpiresAt values as-is. A duplicate token or unknown user
// surfaces as the driver's constraint error.
func (p *PostgresDB) CreateSession(ctx context.Context, session *Session) error {
	if _, err := p.db.ExecContext(ctx, `
INSERT INTO sessions (token, user_id, created_at, expires_at, ip_address, user_agent)
VALUES ($1, $2, $3, $4, $5, $6)
`,
		session.Token, session.UserID, session.CreatedAt,
		session.ExpiresAt, session.IPAddress, session.UserAgent,
	); err != nil {
		return err
	}
	return nil
}
// GetSession returns the session for token only while it is still valid,
// i.e. expires_at is strictly after the current UTC time. A missing or
// expired session yields (nil, nil).
func (p *PostgresDB) GetSession(ctx context.Context, token string) (*Session, error) {
	now := time.Now().UTC()
	s := new(Session)
	row := p.db.QueryRowContext(ctx, `
SELECT token, user_id, created_at, expires_at, ip_address, user_agent
FROM sessions WHERE token = $1 AND expires_at > $2
`, token, now)
	err := row.Scan(&s.Token, &s.UserID, &s.CreatedAt, &s.ExpiresAt, &s.IPAddress, &s.UserAgent)
	switch {
	case errors.Is(err, sql.ErrNoRows):
		return nil, nil
	case err != nil:
		return nil, err
	default:
		return s, nil
	}
}
// DeleteSession removes the session with the given token (e.g. on logout).
// An unknown token is not an error.
func (p *PostgresDB) DeleteSession(ctx context.Context, token string) error {
	const stmt = `DELETE FROM sessions WHERE token = $1`
	if _, err := p.db.ExecContext(ctx, stmt, token); err != nil {
		return err
	}
	return nil
}
// DeleteUserSessions removes every session belonging to userID, expired or
// not. A user with no sessions is not an error.
func (p *PostgresDB) DeleteUserSessions(ctx context.Context, userID string) error {
	const stmt = `DELETE FROM sessions WHERE user_id = $1`
	_, err := p.db.ExecContext(ctx, stmt, userID)
	return err
}
// CleanupExpiredSessions deletes every session whose expires_at is before
// the current UTC time.
func (p *PostgresDB) CleanupExpiredSessions(ctx context.Context) error {
	cutoff := time.Now().UTC()
	_, err := p.db.ExecContext(ctx, `DELETE FROM sessions WHERE expires_at < $1`, cutoff)
	return err
}
// ============================================================================
// Alerts
// ============================================================================
// StoreAlert inserts a new alert row exactly as given. ResolvedAt is passed
// through as-is, so a nil pointer is stored as NULL. A duplicate id or
// unknown agent surfaces as the driver's constraint error.
func (p *PostgresDB) StoreAlert(ctx context.Context, alert *Alert) error {
	if _, err := p.db.ExecContext(ctx, `
INSERT INTO alerts (id, agent_id, type, severity, message, value, threshold, triggered_at, resolved_at, acknowledged)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
`,
		alert.ID, alert.AgentID, alert.Type, alert.Severity, alert.Message,
		alert.Value, alert.Threshold, alert.TriggeredAt, alert.ResolvedAt, alert.Acknowledged,
	); err != nil {
		return err
	}
	return nil
}
// GetAlert loads the alert with the given id, or returns (nil, nil) when it
// does not exist. A NULL resolved_at leaves ResolvedAt nil.
func (p *PostgresDB) GetAlert(ctx context.Context, id string) (*Alert, error) {
	a := new(Alert)
	var resolved sql.NullTime
	row := p.db.QueryRowContext(ctx, `
SELECT id, agent_id, type, severity, message, value, threshold, triggered_at, resolved_at, acknowledged
FROM alerts WHERE id = $1
`, id)
	err := row.Scan(&a.ID, &a.AgentID, &a.Type, &a.Severity, &a.Message,
		&a.Value, &a.Threshold, &a.TriggeredAt, &resolved, &a.Acknowledged)
	if err != nil {
		if errors.Is(err, sql.ErrNoRows) {
			return nil, nil
		}
		return nil, err
	}

	if resolved.Valid {
		a.ResolvedAt = &resolved.Time
	}
	return a, nil
}
// QueryAlerts returns alerts matching every non-empty field of filter, newest
// first. String filters match exactly, Acknowledged applies only when
// non-nil, and From/To bound triggered_at inclusively when non-zero. All
// filter values are bound as parameters. Limit and Offset are applied only
// when positive.
func (p *PostgresDB) QueryAlerts(ctx context.Context, filter AlertFilter) ([]*Alert, error) {
	var (
		where []string
		args  []interface{}
	)
	// add records one predicate. Its placeholder number is the position of
	// value in args, so predicates and arguments cannot drift apart.
	add := func(expr string, value interface{}) {
		args = append(args, value)
		where = append(where, fmt.Sprintf(expr, len(args)))
	}

	if filter.AgentID != "" {
		add("agent_id = $%d", filter.AgentID)
	}
	if filter.Type != "" {
		add("type = $%d", filter.Type)
	}
	if filter.Severity != "" {
		add("severity = $%d", filter.Severity)
	}
	if filter.Acknowledged != nil {
		add("acknowledged = $%d", *filter.Acknowledged)
	}
	if !filter.From.IsZero() {
		add("triggered_at >= $%d", filter.From)
	}
	if !filter.To.IsZero() {
		add("triggered_at <= $%d", filter.To)
	}

	var q strings.Builder
	q.WriteString("SELECT id, agent_id, type, severity, message, value, threshold, triggered_at, resolved_at, acknowledged FROM alerts")
	if len(where) > 0 {
		q.WriteString(" WHERE ")
		q.WriteString(strings.Join(where, " AND "))
	}
	q.WriteString(" ORDER BY triggered_at DESC")
	if filter.Limit > 0 {
		fmt.Fprintf(&q, " LIMIT %d", filter.Limit)
	}
	if filter.Offset > 0 {
		fmt.Fprintf(&q, " OFFSET %d", filter.Offset)
	}

	rows, err := p.db.QueryContext(ctx, q.String(), args...)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var alerts []*Alert
	for rows.Next() {
		a := new(Alert)
		var resolved sql.NullTime
		if err := rows.Scan(&a.ID, &a.AgentID, &a.Type, &a.Severity, &a.Message,
			&a.Value, &a.Threshold, &a.TriggeredAt, &resolved, &a.Acknowledged); err != nil {
			return nil, err
		}
		if resolved.Valid {
			a.ResolvedAt = &resolved.Time
		}
		alerts = append(alerts, a)
	}
	return alerts, rows.Err()
}
// AcknowledgeAlert sets acknowledged = TRUE on alert id. Acknowledging an
// unknown or already-acknowledged alert is not an error.
func (p *PostgresDB) AcknowledgeAlert(ctx context.Context, id string) error {
	const stmt = `UPDATE alerts SET acknowledged = TRUE WHERE id = $1`
	_, err := p.db.ExecContext(ctx, stmt, id)
	return err
}
// ============================================================================
// Retention
// ============================================================================
// RunRetention applies the tiered retention policy to the metrics tables:
//
//  1. raw rows older than RawRetention are rolled up into metrics_1min, then deleted;
//  2. 1-minute rows older than OneMinuteRetention are rolled up into metrics_5min, then deleted;
//  3. 5-minute rows older than FiveMinuteRetention are rolled up into metrics_hourly, then deleted;
//  4. hourly rows older than HourlyRetention are deleted.
//
// Each roll-up cutoff is truncated down to the bucket size of the table it
// feeds (minute, 5 minutes, hour). The roll-ups overwrite a bucket on
// conflict, so every bucket must be aggregated in one pass. With an
// unaligned cutoff, the first part of a straddling bucket would be
// aggregated and deleted in one run, and the remainder would overwrite that
// bucket in a later run, silently losing samples. Aligned cutoffs also make
// a retried run (e.g. after a failure between a roll-up and its delete)
// reproduce the same buckets.
//
// NOTE(review): the SQL buckets with date_trunc in the session time zone;
// the hour alignment below matches that only when the session TimeZone is
// UTC or a whole-hour offset — confirm the connection's TimeZone setting.
//
// The steps do not share a transaction. The first failing step aborts the
// run and its error is returned with context.
func (p *PostgresDB) RunRetention(ctx context.Context) error {
	now := time.Now().UTC()

	// raw -> 1min, cutoff on a whole minute.
	rawCutoff := now.Add(-p.retention.RawRetention).Truncate(time.Minute)
	if err := p.aggregateRawTo1Min(ctx, rawCutoff); err != nil {
		return fmt.Errorf("aggregate raw->1min: %w", err)
	}
	if _, err := p.db.ExecContext(ctx, `DELETE FROM metrics_raw WHERE timestamp < $1`, rawCutoff); err != nil {
		return fmt.Errorf("delete old raw: %w", err)
	}

	// 1min -> 5min, cutoff on a 5-minute boundary.
	oneMinCutoff := now.Add(-p.retention.OneMinuteRetention).Truncate(5 * time.Minute)
	if err := p.aggregate1MinTo5Min(ctx, oneMinCutoff); err != nil {
		return fmt.Errorf("aggregate 1min->5min: %w", err)
	}
	if _, err := p.db.ExecContext(ctx, `DELETE FROM metrics_1min WHERE timestamp < $1`, oneMinCutoff); err != nil {
		return fmt.Errorf("delete old 1min: %w", err)
	}

	// 5min -> hourly, cutoff on a whole hour.
	fiveMinCutoff := now.Add(-p.retention.FiveMinuteRetention).Truncate(time.Hour)
	if err := p.aggregate5MinToHourly(ctx, fiveMinCutoff); err != nil {
		return fmt.Errorf("aggregate 5min->hourly: %w", err)
	}
	if _, err := p.db.ExecContext(ctx, `DELETE FROM metrics_5min WHERE timestamp < $1`, fiveMinCutoff); err != nil {
		return fmt.Errorf("delete old 5min: %w", err)
	}

	// Hourly is the last tier: rows past HourlyRetention are simply dropped.
	hourlyCutoff := now.Add(-p.retention.HourlyRetention)
	if _, err := p.db.ExecContext(ctx, `DELETE FROM metrics_hourly WHERE timestamp < $1`, hourlyCutoff); err != nil {
		return fmt.Errorf("delete old hourly: %w", err)
	}

	return nil
}
// aggregateRawTo1Min rolls every metrics_raw row with timestamp < before up
// into one metrics_1min row per (agent, minute). Each row holds avg/min/max
// of CPU and of memory-used percentage, plus the sample count. Existing
// buckets are overwritten on conflict (CPU, memory and sample_count columns).
// disk_avg and gpu_avg are written as NULL at this tier.
//
// There is deliberately no lower time bound. RunRetention deletes every raw
// row older than the same cutoff right after this call, so a row left below
// the cutoff (for instance because retention did not run for more than a
// day) must be aggregated now or it is lost for good.
//
// NOTE(review): the JSON paths (cpu.usagePercent, memory.used, memory.total)
// must match the json tags on models.AllMetrics. queryRawMetrics reads CPU
// from m.CPU.TotalUsage, so confirm its tag really is "usagePercent";
// otherwise the cpu_* columns come out NULL. The memory percentage falls back
// to 0 when memory.total is missing or zero.
func (p *PostgresDB) aggregateRawTo1Min(ctx context.Context, before time.Time) error {
	_, err := p.db.ExecContext(ctx, `
INSERT INTO metrics_1min (agent_id, timestamp, cpu_avg, cpu_min, cpu_max, mem_avg, mem_min, mem_max, disk_avg, gpu_avg, sample_count)
SELECT
agent_id,
date_trunc('minute', timestamp) as ts,
AVG((data->'cpu'->>'usagePercent')::float),
MIN((data->'cpu'->>'usagePercent')::float),
MAX((data->'cpu'->>'usagePercent')::float),
AVG(CASE WHEN (data->'memory'->>'total')::bigint > 0
THEN (data->'memory'->>'used')::float / (data->'memory'->>'total')::float * 100
ELSE 0 END),
MIN(CASE WHEN (data->'memory'->>'total')::bigint > 0
THEN (data->'memory'->>'used')::float / (data->'memory'->>'total')::float * 100
ELSE 0 END),
MAX(CASE WHEN (data->'memory'->>'total')::bigint > 0
THEN (data->'memory'->>'used')::float / (data->'memory'->>'total')::float * 100
ELSE 0 END),
NULL,
NULL,
COUNT(*)
FROM metrics_raw
WHERE timestamp < $1
GROUP BY agent_id, ts
ON CONFLICT (agent_id, timestamp) DO UPDATE SET
cpu_avg = EXCLUDED.cpu_avg,
cpu_min = EXCLUDED.cpu_min,
cpu_max = EXCLUDED.cpu_max,
mem_avg = EXCLUDED.mem_avg,
mem_min = EXCLUDED.mem_min,
mem_max = EXCLUDED.mem_max,
sample_count = EXCLUDED.sample_count
`, before)
	return err
}
// aggregate1MinTo5Min rolls metrics_1min rows with timestamp < before up into
// metrics_5min. The bucket start is the start of the hour plus
// 5 minutes * floor(minute / 5). Averages are plain averages of the 1-minute
// averages (sample_count is summed but not used as a weight). min/max take
// the extremes. Existing buckets are overwritten on conflict.
func (p *PostgresDB) aggregate1MinTo5Min(ctx context.Context, before time.Time) error {
	const upsert = `
INSERT INTO metrics_5min (agent_id, timestamp, cpu_avg, cpu_min, cpu_max, mem_avg, mem_min, mem_max, disk_avg, gpu_avg, sample_count)
SELECT
agent_id,
date_trunc('hour', timestamp) +
INTERVAL '5 min' * (EXTRACT(MINUTE FROM timestamp)::int / 5) as ts,
AVG(cpu_avg), MIN(cpu_min), MAX(cpu_max),
AVG(mem_avg), MIN(mem_min), MAX(mem_max),
AVG(disk_avg), AVG(gpu_avg),
SUM(sample_count)
FROM metrics_1min
WHERE timestamp < $1
GROUP BY agent_id, ts
ON CONFLICT (agent_id, timestamp) DO UPDATE SET
cpu_avg = EXCLUDED.cpu_avg,
cpu_min = EXCLUDED.cpu_min,
cpu_max = EXCLUDED.cpu_max,
mem_avg = EXCLUDED.mem_avg,
mem_min = EXCLUDED.mem_min,
mem_max = EXCLUDED.mem_max,
disk_avg = EXCLUDED.disk_avg,
gpu_avg = EXCLUDED.gpu_avg,
sample_count = EXCLUDED.sample_count
`
	if _, err := p.db.ExecContext(ctx, upsert, before); err != nil {
		return err
	}
	return nil
}
// aggregate5MinToHourly rolls metrics_5min rows with timestamp < before up
// into metrics_hourly, bucketing on date_trunc('hour', timestamp). Averages
// are plain averages of the 5-minute averages (sample_count is summed but not
// used as a weight). min/max take the extremes. Existing buckets are
// overwritten on conflict.
func (p *PostgresDB) aggregate5MinToHourly(ctx context.Context, before time.Time) error {
	const upsert = `
INSERT INTO metrics_hourly (agent_id, timestamp, cpu_avg, cpu_min, cpu_max, mem_avg, mem_min, mem_max, disk_avg, gpu_avg, sample_count)
SELECT
agent_id,
date_trunc('hour', timestamp) as ts,
AVG(cpu_avg), MIN(cpu_min), MAX(cpu_max),
AVG(mem_avg), MIN(mem_min), MAX(mem_max),
AVG(disk_avg), AVG(gpu_avg),
SUM(sample_count)
FROM metrics_5min
WHERE timestamp < $1
GROUP BY agent_id, ts
ON CONFLICT (agent_id, timestamp) DO UPDATE SET
cpu_avg = EXCLUDED.cpu_avg,
cpu_min = EXCLUDED.cpu_min,
cpu_max = EXCLUDED.cpu_max,
mem_avg = EXCLUDED.mem_avg,
mem_min = EXCLUDED.mem_min,
mem_max = EXCLUDED.mem_max,
disk_avg = EXCLUDED.disk_avg,
gpu_avg = EXCLUDED.gpu_avg,
sample_count = EXCLUDED.sample_count
`
	if _, err := p.db.ExecContext(ctx, upsert, before); err != nil {
		return err
	}
	return nil
}