Files
tyto/backend/internal/database/sqlite.go
vikingowl c0e678931d feat: add database layer with SQLite and PostgreSQL support
Database Package (internal/database/):
- Database interface abstraction for multiple backends
- SQLite implementation with pure Go driver (no CGO)
- PostgreSQL implementation with connection pooling
- Factory pattern for creating database from config
- Tiered retention with automatic aggregation:
  - Raw metrics: 24h (5s resolution)
  - 1-minute aggregation: 7 days
  - 5-minute aggregation: 30 days
  - Hourly aggregation: 1 year

Schema includes:
- agents: registration, status, certificates
- users: local + LDAP authentication
- roles: RBAC with permissions JSON
- sessions: token-based authentication
- metrics_*: time-series with aggregation
- alerts: triggered alerts with acknowledgment

Configuration Updates:
- DatabaseConfig with SQLite path and PostgreSQL settings
- RetentionConfig for customizing data retention
- Environment variables: TYTO_DB_*, TYTO_DB_CONNECTION_STRING
- Default SQLite at /var/lib/tyto/tyto.db

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-28 08:18:48 +01:00

1135 lines
33 KiB
Go

// Package database provides SQLite implementation of the Database interface.
package database
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"strings"
"time"
"tyto/internal/models"
_ "modernc.org/sqlite"
)
// SQLiteDB implements the Database interface using SQLite.
// It pairs a database/sql handle with the retention settings that
// RunRetention reads when computing its cutoffs.
type SQLiteDB struct {
db *sql.DB // handle opened by NewSQLiteDB with the "sqlite" driver
retention RetentionConfig // retention windows consulted by RunRetention
}
// NewSQLiteDB opens the SQLite database at path and returns a SQLiteDB that
// uses the given retention settings.
//
// The connection is configured through DSN parameters: WAL journaling,
// foreign-key enforcement and a 5 second busy timeout. time.Time values are
// written in SQLite's own date/time text format (_time_format=sqlite) so that
// the SQL date functions used by the retention roll-ups (strftime, datetime)
// can parse stored timestamps; the driver's default time.Time.String() layout
// is not understood by those functions.
//
// If path already carries DSN query parameters, the settings above are
// appended to them rather than starting a second query string.
//
// The connection is verified with a ping before returning. On failure the
// handle is closed and a wrapped error is returned.
func NewSQLiteDB(path string, retention RetentionConfig) (*SQLiteDB, error) {
	// Append to an existing query string instead of introducing a second '?'.
	sep := "?"
	if strings.Contains(path, "?") {
		sep = "&"
	}
	dsn := path + sep +
		"_pragma=journal_mode(WAL)&_pragma=foreign_keys(ON)&_pragma=busy_timeout(5000)" +
		"&_time_format=sqlite"

	db, err := sql.Open("sqlite", dsn)
	if err != nil {
		return nil, fmt.Errorf("open sqlite: %w", err)
	}

	// A single, never-expiring connection: SQLite handles one writer at a time.
	db.SetMaxOpenConns(1)
	db.SetMaxIdleConns(1)
	db.SetConnMaxLifetime(0)

	if err := db.Ping(); err != nil {
		// Best-effort cleanup; the ping failure is the error worth reporting.
		_ = db.Close()
		return nil, fmt.Errorf("ping sqlite: %w", err)
	}

	return &SQLiteDB{
		db:        db,
		retention: retention,
	}, nil
}
// Close shuts down the underlying database handle and returns whatever error
// that produces.
func (s *SQLiteDB) Close() error {
	closeErr := s.db.Close()
	return closeErr
}
// Migrate executes the schema migrations in a fixed order (agents, users,
// roles, sessions, metrics, alerts) and then seeds the default roles.
// The first failing migration aborts the run; its 1-based position is
// included in the returned error.
func (s *SQLiteDB) Migrate() error {
	steps := [...]string{
		migrationAgents,
		migrationUsers,
		migrationRoles,
		migrationSessions,
		migrationMetrics,
		migrationAlerts,
	}

	for idx := range steps {
		_, err := s.db.Exec(steps[idx])
		if err == nil {
			continue
		}
		return fmt.Errorf("migration %d: %w", idx+1, err)
	}

	// Seed the built-in roles; existing rows are left alone.
	return s.insertDefaultRoles()
}
// SQL migration statements

// migrationAgents creates the agents table (one row per agent, keyed by id,
// with capabilities and tags stored as JSON text) and an index on status.
// Both statements use IF NOT EXISTS, so re-running the migration is harmless.
const migrationAgents = `
CREATE TABLE IF NOT EXISTS agents (
id TEXT PRIMARY KEY,
name TEXT,
hostname TEXT NOT NULL,
os TEXT NOT NULL,
architecture TEXT NOT NULL,
version TEXT NOT NULL,
capabilities TEXT, -- JSON array
status TEXT NOT NULL DEFAULT 'pending',
cert_serial TEXT,
cert_expiry TIMESTAMP,
last_seen TIMESTAMP,
registered_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
tags TEXT -- JSON array
);
CREATE INDEX IF NOT EXISTS idx_agents_status ON agents(status);
`
// migrationUsers creates the users table (unique username, optional email,
// password hash and LDAP DN, auth provider defaulting to 'local', and a
// disabled flag stored as an integer) plus an index on username.
const migrationUsers = `
CREATE TABLE IF NOT EXISTS users (
id TEXT PRIMARY KEY,
username TEXT UNIQUE NOT NULL,
email TEXT,
password_hash BLOB,
auth_provider TEXT NOT NULL DEFAULT 'local',
ldap_dn TEXT,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_login TIMESTAMP,
disabled INTEGER NOT NULL DEFAULT 0
);
CREATE INDEX IF NOT EXISTS idx_users_username ON users(username);
`
// migrationRoles creates the roles table (permissions stored as a JSON array
// in a text column, is_system flag stored as an integer) and the user_roles
// join table. user_roles rows are removed by cascade when either the
// referenced user or the referenced role is deleted.
const migrationRoles = `
CREATE TABLE IF NOT EXISTS roles (
id TEXT PRIMARY KEY,
name TEXT UNIQUE NOT NULL,
description TEXT,
permissions TEXT NOT NULL, -- JSON array
is_system INTEGER NOT NULL DEFAULT 0,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS user_roles (
user_id TEXT NOT NULL,
role_id TEXT NOT NULL,
PRIMARY KEY (user_id, role_id),
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE,
FOREIGN KEY (role_id) REFERENCES roles(id) ON DELETE CASCADE
);
`
// migrationSessions creates the sessions table, keyed by token and tied to a
// user through a cascading foreign key, with indexes on user_id and
// expires_at.
const migrationSessions = `
CREATE TABLE IF NOT EXISTS sessions (
token TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
expires_at TIMESTAMP NOT NULL,
ip_address TEXT,
user_agent TEXT,
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_sessions_user ON sessions(user_id);
CREATE INDEX IF NOT EXISTS idx_sessions_expires ON sessions(expires_at);
`
// migrationMetrics creates the four metrics tables:
//
//   - metrics_raw: one row per stored sample, holding the serialized sample
//     in a BLOB column, indexed by (agent_id, timestamp).
//   - metrics_1min, metrics_5min, metrics_hourly: aggregate tables with an
//     identical column layout (CPU/memory avg, min and max, disk and GPU
//     averages, and a sample count), keyed by (agent_id, timestamp).
//
// Every table references agents(id) with ON DELETE CASCADE.
const migrationMetrics = `
-- Raw metrics (high resolution, short retention)
CREATE TABLE IF NOT EXISTS metrics_raw (
id INTEGER PRIMARY KEY AUTOINCREMENT,
agent_id TEXT NOT NULL,
timestamp TIMESTAMP NOT NULL,
data BLOB NOT NULL, -- Compressed JSON
FOREIGN KEY (agent_id) REFERENCES agents(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_metrics_raw_agent_time ON metrics_raw(agent_id, timestamp);
-- 1-minute aggregations
CREATE TABLE IF NOT EXISTS metrics_1min (
agent_id TEXT NOT NULL,
timestamp TIMESTAMP NOT NULL,
cpu_avg REAL, cpu_min REAL, cpu_max REAL,
mem_avg REAL, mem_min REAL, mem_max REAL,
disk_avg REAL,
gpu_avg REAL,
sample_count INTEGER NOT NULL DEFAULT 1,
PRIMARY KEY (agent_id, timestamp),
FOREIGN KEY (agent_id) REFERENCES agents(id) ON DELETE CASCADE
);
-- 5-minute aggregations
CREATE TABLE IF NOT EXISTS metrics_5min (
agent_id TEXT NOT NULL,
timestamp TIMESTAMP NOT NULL,
cpu_avg REAL, cpu_min REAL, cpu_max REAL,
mem_avg REAL, mem_min REAL, mem_max REAL,
disk_avg REAL,
gpu_avg REAL,
sample_count INTEGER NOT NULL DEFAULT 1,
PRIMARY KEY (agent_id, timestamp),
FOREIGN KEY (agent_id) REFERENCES agents(id) ON DELETE CASCADE
);
-- Hourly aggregations
CREATE TABLE IF NOT EXISTS metrics_hourly (
agent_id TEXT NOT NULL,
timestamp TIMESTAMP NOT NULL,
cpu_avg REAL, cpu_min REAL, cpu_max REAL,
mem_avg REAL, mem_min REAL, mem_max REAL,
disk_avg REAL,
gpu_avg REAL,
sample_count INTEGER NOT NULL DEFAULT 1,
PRIMARY KEY (agent_id, timestamp),
FOREIGN KEY (agent_id) REFERENCES agents(id) ON DELETE CASCADE
);
`
// migrationAlerts creates the alerts table (one row per triggered alert, with
// the observed value, the threshold, an optional resolution time and an
// acknowledged flag stored as an integer) and indexes on agent_id,
// triggered_at and severity. Alerts are removed by cascade when their agent
// is deleted.
const migrationAlerts = `
CREATE TABLE IF NOT EXISTS alerts (
id TEXT PRIMARY KEY,
agent_id TEXT NOT NULL,
type TEXT NOT NULL,
severity TEXT NOT NULL,
message TEXT NOT NULL,
value REAL NOT NULL,
threshold REAL NOT NULL,
triggered_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
resolved_at TIMESTAMP,
acknowledged INTEGER NOT NULL DEFAULT 0,
FOREIGN KEY (agent_id) REFERENCES agents(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_alerts_agent ON alerts(agent_id);
CREATE INDEX IF NOT EXISTS idx_alerts_triggered ON alerts(triggered_at);
CREATE INDEX IF NOT EXISTS idx_alerts_severity ON alerts(severity);
`
// insertDefaultRoles seeds the three built-in roles (admin, operator, viewer)
// and marks them as system roles. INSERT OR IGNORE is used, so rows that
// already exist are left unchanged. The first failing insert aborts the
// seeding and is returned wrapped with the role id.
func (s *SQLiteDB) insertDefaultRoles() error {
	type seedRole struct {
		id, name, desc string
		perms          []string
	}

	seeds := []seedRole{
		{id: "admin", name: "Administrator", desc: "Full system access", perms: []string{"*"}},
		{id: "operator", name: "Operator", desc: "Manage agents and alerts", perms: []string{
			"dashboard:view", "agents:view", "agents:manage",
			"alerts:view", "alerts:acknowledge", "alerts:configure",
			"metrics:export", "metrics:query",
		}},
		{id: "viewer", name: "Viewer", desc: "Read-only access", perms: []string{
			"dashboard:view", "agents:view", "alerts:view",
		}},
	}

	for i := range seeds {
		role := seeds[i]
		// The marshal error is deliberately dropped: the input is a plain []string.
		encoded, _ := json.Marshal(role.perms)
		if _, err := s.db.Exec(`
INSERT OR IGNORE INTO roles (id, name, description, permissions, is_system)
VALUES (?, ?, ?, ?, 1)
`, role.id, role.name, role.desc, string(encoded)); err != nil {
			return fmt.Errorf("insert role %s: %w", role.id, err)
		}
	}
	return nil
}
// ============================================================================
// Metrics Storage
// ============================================================================
// StoreMetrics serializes metrics to JSON and appends it to metrics_raw for
// agentID, stamped with the current UTC time. A marshalling failure is
// returned wrapped; a database failure is returned as-is.
func (s *SQLiteDB) StoreMetrics(ctx context.Context, agentID string, metrics *models.AllMetrics) error {
	payload, marshalErr := json.Marshal(metrics)
	if marshalErr != nil {
		return fmt.Errorf("marshal metrics: %w", marshalErr)
	}

	const insertRaw = `
INSERT INTO metrics_raw (agent_id, timestamp, data)
VALUES (?, ?, ?)
`
	if _, err := s.db.ExecContext(ctx, insertRaw, agentID, time.Now().UTC(), payload); err != nil {
		return err
	}
	return nil
}
// QueryMetrics returns the metric points for agentID whose timestamp lies in
// [from, to], ordered oldest first.
//
// resolution selects the source: "raw" decodes the stored samples, while
// "1min", "5min" and "hourly" read the matching aggregate table. Any other
// value lets selectResolution pick an aggregate table from the width of the
// requested range.
func (s *SQLiteDB) QueryMetrics(ctx context.Context, agentID string, from, to time.Time, resolution string) ([]MetricPoint, error) {
	if resolution == "raw" {
		return s.queryRawMetrics(ctx, agentID, from, to)
	}

	table, known := map[string]string{
		"1min":   "metrics_1min",
		"5min":   "metrics_5min",
		"hourly": "metrics_hourly",
	}[resolution]
	if !known {
		// Unrecognized resolution: choose one from the time range.
		table = s.selectResolution(from, to)
	}

	// table only ever comes from the fixed set of names above.
	query := `
SELECT timestamp, cpu_avg, cpu_min, cpu_max, mem_avg, mem_min, mem_max, disk_avg, gpu_avg
FROM ` + table + `
WHERE agent_id = ? AND timestamp >= ? AND timestamp <= ?
ORDER BY timestamp ASC
`
	rows, err := s.db.QueryContext(ctx, query, agentID, from, to)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var result []MetricPoint
	for rows.Next() {
		point := MetricPoint{AgentID: agentID}
		scanErr := rows.Scan(
			&point.Timestamp,
			&point.CPUAvg, &point.CPUMin, &point.CPUMax,
			&point.MemoryAvg, &point.MemoryMin, &point.MemoryMax,
			&point.DiskAvg, &point.GPUAvg,
		)
		if scanErr != nil {
			return nil, scanErr
		}
		result = append(result, point)
	}
	return result, rows.Err()
}
// queryRawMetrics reads the raw samples of agentID in [from, to], oldest
// first, and converts each one into a MetricPoint. Because a raw sample is a
// single observation, its avg/min/max fields all carry the same value.
// Rows whose payload cannot be decoded are skipped.
func (s *SQLiteDB) queryRawMetrics(ctx context.Context, agentID string, from, to time.Time) ([]MetricPoint, error) {
	const selectRaw = `
SELECT timestamp, data FROM metrics_raw
WHERE agent_id = ? AND timestamp >= ? AND timestamp <= ?
ORDER BY timestamp ASC
`
	rows, err := s.db.QueryContext(ctx, selectRaw, agentID, from, to)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var result []MetricPoint
	for rows.Next() {
		var (
			stamp   time.Time
			payload []byte
		)
		if err := rows.Scan(&stamp, &payload); err != nil {
			return nil, err
		}

		var sample models.AllMetrics
		if json.Unmarshal(payload, &sample) != nil {
			// Corrupted entry: leave it out of the result.
			continue
		}

		cpu := sample.CPU.TotalUsage
		point := MetricPoint{
			Timestamp: stamp,
			AgentID:   agentID,
			CPUAvg:    cpu,
			CPUMin:    cpu,
			CPUMax:    cpu,
		}

		// Memory as a percentage of total; left at zero when total is unknown.
		if sample.Memory.Total > 0 {
			memPct := float64(sample.Memory.Used) / float64(sample.Memory.Total) * 100
			point.MemoryAvg, point.MemoryMin, point.MemoryMax = memPct, memPct, memPct
		}

		// Disk usage is summed over all mounts before taking the percentage.
		var diskUsed, diskSize uint64
		for _, mount := range sample.Disk.Mounts {
			diskUsed += mount.Used
			diskSize += mount.Total
		}
		if diskSize > 0 {
			point.DiskAvg = float64(diskUsed) / float64(diskSize) * 100
		}

		if sample.GPU.Available {
			point.GPUAvg = float64(sample.GPU.Utilization)
		}

		result = append(result, point)
	}
	return result, rows.Err()
}
// selectResolution returns the aggregate table to query for the range
// from..to: metrics_1min for spans up to 2 hours, metrics_5min for spans up
// to 24 hours, and metrics_hourly for anything longer.
func (s *SQLiteDB) selectResolution(from, to time.Time) string {
	span := to.Sub(from)
	if span <= 2*time.Hour {
		return "metrics_1min"
	}
	if span <= 24*time.Hour {
		return "metrics_5min"
	}
	return "metrics_hourly"
}
// GetLatestMetrics returns the newest raw sample stored for agentID.
// It returns (nil, nil) when the agent has no samples, and an error when the
// query fails or the stored payload cannot be decoded.
func (s *SQLiteDB) GetLatestMetrics(ctx context.Context, agentID string) (*models.AllMetrics, error) {
	row := s.db.QueryRowContext(ctx, `
SELECT data FROM metrics_raw
WHERE agent_id = ?
ORDER BY timestamp DESC
LIMIT 1
`, agentID)

	var payload []byte
	switch err := row.Scan(&payload); {
	case errors.Is(err, sql.ErrNoRows):
		return nil, nil
	case err != nil:
		return nil, err
	}

	latest := new(models.AllMetrics)
	if err := json.Unmarshal(payload, latest); err != nil {
		return nil, err
	}
	return latest, nil
}
// ============================================================================
// Agents
// ============================================================================
// StoreAgent inserts agent, or, when a row with the same id already exists,
// overwrites every column except registered_at with the new values.
// Capabilities and tags are persisted as JSON text; errors from marshalling
// them are ignored.
func (s *SQLiteDB) StoreAgent(ctx context.Context, agent *Agent) error {
	capsJSON, _ := json.Marshal(agent.Capabilities)
	tagsJSON, _ := json.Marshal(agent.Tags)

	const upsert = `
INSERT INTO agents (id, name, hostname, os, architecture, version, capabilities, status, cert_serial, cert_expiry, last_seen, registered_at, tags)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(id) DO UPDATE SET
name = excluded.name,
hostname = excluded.hostname,
os = excluded.os,
architecture = excluded.architecture,
version = excluded.version,
capabilities = excluded.capabilities,
status = excluded.status,
cert_serial = excluded.cert_serial,
cert_expiry = excluded.cert_expiry,
last_seen = excluded.last_seen,
tags = excluded.tags
`
	params := []interface{}{
		agent.ID, agent.Name, agent.Hostname, agent.OS, agent.Architecture, agent.Version,
		string(capsJSON), agent.Status, agent.CertSerial, agent.CertExpiry,
		agent.LastSeen, agent.RegisteredAt, string(tagsJSON),
	}
	if _, err := s.db.ExecContext(ctx, upsert, params...); err != nil {
		return err
	}
	return nil
}
// GetAgent loads the agent with the given id. It returns (nil, nil) when no
// such agent exists. NULL cert_expiry / last_seen values leave the matching
// fields at their zero value, and capabilities/tags that fail to decode are
// left empty rather than reported.
func (s *SQLiteDB) GetAgent(ctx context.Context, id string) (*Agent, error) {
	const lookup = `
SELECT id, name, hostname, os, architecture, version, capabilities, status, cert_serial, cert_expiry, last_seen, registered_at, tags
FROM agents WHERE id = ?
`
	var (
		found              Agent
		capsJSON, tagsJSON string
		expiry, seen       sql.NullTime
	)
	err := s.db.QueryRowContext(ctx, lookup, id).Scan(
		&found.ID, &found.Name, &found.Hostname, &found.OS, &found.Architecture, &found.Version,
		&capsJSON, &found.Status, &found.CertSerial, &expiry, &seen, &found.RegisteredAt, &tagsJSON,
	)
	switch {
	case errors.Is(err, sql.ErrNoRows):
		return nil, nil
	case err != nil:
		return nil, err
	}

	if expiry.Valid {
		found.CertExpiry = expiry.Time
	}
	if seen.Valid {
		found.LastSeen = seen.Time
	}
	// Decode failures are tolerated; the slices simply stay unset.
	_ = json.Unmarshal([]byte(capsJSON), &found.Capabilities)
	_ = json.Unmarshal([]byte(tagsJSON), &found.Tags)
	return &found, nil
}
// ListAgents returns every agent, most recently registered first.
// NULL cert_expiry / last_seen values leave the matching fields at their zero
// value; capabilities/tags that fail to decode are left empty.
func (s *SQLiteDB) ListAgents(ctx context.Context) ([]*Agent, error) {
	const listAll = `
SELECT id, name, hostname, os, architecture, version, capabilities, status, cert_serial, cert_expiry, last_seen, registered_at, tags
FROM agents ORDER BY registered_at DESC
`
	rows, err := s.db.QueryContext(ctx, listAll)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var result []*Agent
	for rows.Next() {
		var (
			capsJSON, tagsJSON string
			expiry, seen       sql.NullTime
		)
		item := new(Agent)
		if err := rows.Scan(
			&item.ID, &item.Name, &item.Hostname, &item.OS, &item.Architecture, &item.Version,
			&capsJSON, &item.Status, &item.CertSerial, &expiry, &seen, &item.RegisteredAt, &tagsJSON,
		); err != nil {
			return nil, err
		}

		if expiry.Valid {
			item.CertExpiry = expiry.Time
		}
		if seen.Valid {
			item.LastSeen = seen.Time
		}
		// Decode failures are tolerated; the slices simply stay unset.
		_ = json.Unmarshal([]byte(capsJSON), &item.Capabilities)
		_ = json.Unmarshal([]byte(tagsJSON), &item.Tags)

		result = append(result, item)
	}
	return result, rows.Err()
}
// UpdateAgentStatus sets the status and last_seen columns of the agent with
// the given id. It returns an "agent not found" error when no row was
// updated.
func (s *SQLiteDB) UpdateAgentStatus(ctx context.Context, id string, status AgentStatus, lastSeen time.Time) error {
	const stmt = `
UPDATE agents SET status = ?, last_seen = ? WHERE id = ?
`
	res, err := s.db.ExecContext(ctx, stmt, status, lastSeen, id)
	if err != nil {
		return err
	}
	// An error from RowsAffected is ignored; the count is then zero.
	if affected, _ := res.RowsAffected(); affected == 0 {
		return fmt.Errorf("agent not found: %s", id)
	}
	return nil
}
// DeleteAgent deletes the agent row with the given id. Tables that reference
// agents(id) declare ON DELETE CASCADE, so dependent rows go with it when
// foreign keys are enforced. Deleting an unknown id is not an error.
func (s *SQLiteDB) DeleteAgent(ctx context.Context, id string) error {
	if _, err := s.db.ExecContext(ctx, `DELETE FROM agents WHERE id = ?`, id); err != nil {
		return err
	}
	return nil
}
// ============================================================================
// Users
// ============================================================================
// CreateUser inserts user as a new row. last_login is not written here.
// Any database error (for example a duplicate username) is returned as-is.
func (s *SQLiteDB) CreateUser(ctx context.Context, user *User) error {
	const insert = `
INSERT INTO users (id, username, email, password_hash, auth_provider, ldap_dn, created_at, updated_at, disabled)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
`
	params := []interface{}{
		user.ID, user.Username, user.Email, user.PasswordHash, user.AuthProvider,
		user.LDAPDN, user.CreatedAt, user.UpdatedAt, user.Disabled,
	}
	if _, err := s.db.ExecContext(ctx, insert, params...); err != nil {
		return err
	}
	return nil
}
// GetUser looks a user up by id and returns (nil, nil) when none matches.
func (s *SQLiteDB) GetUser(ctx context.Context, id string) (*User, error) {
	row := s.db.QueryRowContext(ctx, `
SELECT id, username, email, password_hash, auth_provider, ldap_dn, created_at, updated_at, last_login, disabled
FROM users WHERE id = ?
`, id)
	return s.scanUser(row)
}
// GetUserByUsername looks a user up by username and returns (nil, nil) when
// none matches.
func (s *SQLiteDB) GetUserByUsername(ctx context.Context, username string) (*User, error) {
	row := s.db.QueryRowContext(ctx, `
SELECT id, username, email, password_hash, auth_provider, ldap_dn, created_at, updated_at, last_login, disabled
FROM users WHERE username = ?
`, username)
	return s.scanUser(row)
}
// scanUser decodes a single users row in the column order used by GetUser
// and GetUserByUsername. It returns (nil, nil) when the row is empty.
// NULL email / last_login values leave the matching fields at their zero
// value.
func (s *SQLiteDB) scanUser(row *sql.Row) (*User, error) {
	var (
		result      User
		mail        sql.NullString
		lastLoginAt sql.NullTime
	)
	err := row.Scan(
		&result.ID, &result.Username, &mail, &result.PasswordHash, &result.AuthProvider,
		&result.LDAPDN, &result.CreatedAt, &result.UpdatedAt, &lastLoginAt, &result.Disabled,
	)
	switch {
	case errors.Is(err, sql.ErrNoRows):
		return nil, nil
	case err != nil:
		return nil, err
	}

	if mail.Valid {
		result.Email = mail.String
	}
	if lastLoginAt.Valid {
		result.LastLogin = lastLoginAt.Time
	}
	return &result, nil
}
// UpdateUser overwrites the mutable columns of the row identified by user.ID.
// updated_at is always set to the current UTC time, regardless of
// user.UpdatedAt. Updating an unknown id is not reported as an error.
func (s *SQLiteDB) UpdateUser(ctx context.Context, user *User) error {
	const update = `
UPDATE users SET
username = ?, email = ?, password_hash = ?, auth_provider = ?,
ldap_dn = ?, updated_at = ?, last_login = ?, disabled = ?
WHERE id = ?
`
	params := []interface{}{
		user.Username, user.Email, user.PasswordHash, user.AuthProvider,
		user.LDAPDN, time.Now().UTC(), user.LastLogin, user.Disabled, user.ID,
	}
	if _, err := s.db.ExecContext(ctx, update, params...); err != nil {
		return err
	}
	return nil
}
// DeleteUser deletes the user row with the given id. Deleting an unknown id
// is not an error.
func (s *SQLiteDB) DeleteUser(ctx context.Context, id string) error {
	if _, err := s.db.ExecContext(ctx, `DELETE FROM users WHERE id = ?`, id); err != nil {
		return err
	}
	return nil
}
// ListUsers returns every user ordered by username. NULL email / last_login
// values leave the matching fields at their zero value.
func (s *SQLiteDB) ListUsers(ctx context.Context) ([]*User, error) {
	const listAll = `
SELECT id, username, email, password_hash, auth_provider, ldap_dn, created_at, updated_at, last_login, disabled
FROM users ORDER BY username
`
	rows, err := s.db.QueryContext(ctx, listAll)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var result []*User
	for rows.Next() {
		var (
			mail        sql.NullString
			lastLoginAt sql.NullTime
		)
		item := new(User)
		if err := rows.Scan(
			&item.ID, &item.Username, &mail, &item.PasswordHash, &item.AuthProvider,
			&item.LDAPDN, &item.CreatedAt, &item.UpdatedAt, &lastLoginAt, &item.Disabled,
		); err != nil {
			return nil, err
		}
		if mail.Valid {
			item.Email = mail.String
		}
		if lastLoginAt.Valid {
			item.LastLogin = lastLoginAt.Time
		}
		result = append(result, item)
	}
	return result, rows.Err()
}
// ============================================================================
// Roles
// ============================================================================
// CreateRole inserts role as a new row, storing its permissions as JSON text.
// An error from marshalling the permissions is ignored; database errors are
// returned as-is.
func (s *SQLiteDB) CreateRole(ctx context.Context, role *Role) error {
	permsJSON, _ := json.Marshal(role.Permissions)

	const insert = `
INSERT INTO roles (id, name, description, permissions, is_system, created_at)
VALUES (?, ?, ?, ?, ?, ?)
`
	if _, err := s.db.ExecContext(ctx, insert,
		role.ID, role.Name, role.Description, string(permsJSON), role.IsSystem, role.CreatedAt,
	); err != nil {
		return err
	}
	return nil
}
// GetRole loads the role with the given id and returns (nil, nil) when none
// matches. Permissions that fail to decode are left empty rather than
// reported.
func (s *SQLiteDB) GetRole(ctx context.Context, id string) (*Role, error) {
	const lookup = `
SELECT id, name, description, permissions, is_system, created_at
FROM roles WHERE id = ?
`
	var (
		found     Role
		permsJSON string
	)
	err := s.db.QueryRowContext(ctx, lookup, id).Scan(
		&found.ID, &found.Name, &found.Description, &permsJSON, &found.IsSystem, &found.CreatedAt,
	)
	switch {
	case errors.Is(err, sql.ErrNoRows):
		return nil, nil
	case err != nil:
		return nil, err
	}

	_ = json.Unmarshal([]byte(permsJSON), &found.Permissions)
	return &found, nil
}
// ListRoles returns every role ordered by name. Permissions that fail to
// decode are left empty rather than reported.
func (s *SQLiteDB) ListRoles(ctx context.Context) ([]*Role, error) {
	const listAll = `
SELECT id, name, description, permissions, is_system, created_at
FROM roles ORDER BY name
`
	rows, err := s.db.QueryContext(ctx, listAll)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var result []*Role
	for rows.Next() {
		var permsJSON string
		item := new(Role)
		scanErr := rows.Scan(&item.ID, &item.Name, &item.Description, &permsJSON, &item.IsSystem, &item.CreatedAt)
		if scanErr != nil {
			return nil, scanErr
		}
		_ = json.Unmarshal([]byte(permsJSON), &item.Permissions)
		result = append(result, item)
	}
	return result, rows.Err()
}
// UpdateRole rewrites the name, description and permissions of the role
// identified by role.ID. Only rows with is_system = 0 are matched, so system
// roles are left untouched. No error is reported when nothing was updated.
func (s *SQLiteDB) UpdateRole(ctx context.Context, role *Role) error {
	permsJSON, _ := json.Marshal(role.Permissions)

	const update = `
UPDATE roles SET name = ?, description = ?, permissions = ?
WHERE id = ? AND is_system = 0
`
	if _, err := s.db.ExecContext(ctx, update, role.Name, role.Description, string(permsJSON), role.ID); err != nil {
		return err
	}
	return nil
}
// DeleteRole deletes the non-system role with the given id. When no row was
// deleted — the id is unknown or belongs to a system role — an error is
// returned.
func (s *SQLiteDB) DeleteRole(ctx context.Context, id string) error {
	res, err := s.db.ExecContext(ctx, `DELETE FROM roles WHERE id = ? AND is_system = 0`, id)
	if err != nil {
		return err
	}
	// An error from RowsAffected is ignored; the count is then zero.
	if affected, _ := res.RowsAffected(); affected == 0 {
		return errors.New("cannot delete system role or role not found")
	}
	return nil
}
// GetUserRoles returns the roles linked to userID through user_roles.
// Permissions that fail to decode are left empty rather than reported.
func (s *SQLiteDB) GetUserRoles(ctx context.Context, userID string) ([]*Role, error) {
	const byUser = `
SELECT r.id, r.name, r.description, r.permissions, r.is_system, r.created_at
FROM roles r
JOIN user_roles ur ON r.id = ur.role_id
WHERE ur.user_id = ?
`
	rows, err := s.db.QueryContext(ctx, byUser, userID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var result []*Role
	for rows.Next() {
		var permsJSON string
		item := new(Role)
		scanErr := rows.Scan(&item.ID, &item.Name, &item.Description, &permsJSON, &item.IsSystem, &item.CreatedAt)
		if scanErr != nil {
			return nil, scanErr
		}
		_ = json.Unmarshal([]byte(permsJSON), &item.Permissions)
		result = append(result, item)
	}
	return result, rows.Err()
}
// AssignRole links roleID to userID. The insert uses OR IGNORE, so assigning
// a role the user already has is a no-op.
func (s *SQLiteDB) AssignRole(ctx context.Context, userID, roleID string) error {
	const link = `
INSERT OR IGNORE INTO user_roles (user_id, role_id) VALUES (?, ?)
`
	if _, err := s.db.ExecContext(ctx, link, userID, roleID); err != nil {
		return err
	}
	return nil
}
// RemoveRole unlinks roleID from userID. Removing an assignment that does not
// exist is not an error.
func (s *SQLiteDB) RemoveRole(ctx context.Context, userID, roleID string) error {
	const unlink = `
DELETE FROM user_roles WHERE user_id = ? AND role_id = ?
`
	if _, err := s.db.ExecContext(ctx, unlink, userID, roleID); err != nil {
		return err
	}
	return nil
}
// ============================================================================
// Sessions
// ============================================================================
// CreateSession inserts session as a new row keyed by its token. Database
// errors are returned as-is.
func (s *SQLiteDB) CreateSession(ctx context.Context, session *Session) error {
	const insert = `
INSERT INTO sessions (token, user_id, created_at, expires_at, ip_address, user_agent)
VALUES (?, ?, ?, ?, ?, ?)
`
	if _, err := s.db.ExecContext(ctx, insert,
		session.Token, session.UserID, session.CreatedAt,
		session.ExpiresAt, session.IPAddress, session.UserAgent,
	); err != nil {
		return err
	}
	return nil
}
// GetSession loads the session for token, but only if it has not expired
// relative to the current UTC time. It returns (nil, nil) when the token is
// unknown or the session has expired.
func (s *SQLiteDB) GetSession(ctx context.Context, token string) (*Session, error) {
	const lookup = `
SELECT token, user_id, created_at, expires_at, ip_address, user_agent
FROM sessions WHERE token = ? AND expires_at > ?
`
	var found Session
	err := s.db.QueryRowContext(ctx, lookup, token, time.Now().UTC()).Scan(
		&found.Token, &found.UserID, &found.CreatedAt,
		&found.ExpiresAt, &found.IPAddress, &found.UserAgent,
	)
	switch {
	case errors.Is(err, sql.ErrNoRows):
		return nil, nil
	case err != nil:
		return nil, err
	}
	return &found, nil
}
// DeleteSession deletes the session identified by token. Deleting an unknown
// token is not an error.
func (s *SQLiteDB) DeleteSession(ctx context.Context, token string) error {
	if _, err := s.db.ExecContext(ctx, `DELETE FROM sessions WHERE token = ?`, token); err != nil {
		return err
	}
	return nil
}
// DeleteUserSessions deletes every session that belongs to userID.
func (s *SQLiteDB) DeleteUserSessions(ctx context.Context, userID string) error {
	if _, err := s.db.ExecContext(ctx, `DELETE FROM sessions WHERE user_id = ?`, userID); err != nil {
		return err
	}
	return nil
}
// CleanupExpiredSessions deletes every session whose expires_at lies before
// the current UTC time.
func (s *SQLiteDB) CleanupExpiredSessions(ctx context.Context) error {
	cutoff := time.Now().UTC()
	if _, err := s.db.ExecContext(ctx, `DELETE FROM sessions WHERE expires_at < ?`, cutoff); err != nil {
		return err
	}
	return nil
}
// ============================================================================
// Alerts
// ============================================================================
// StoreAlert inserts alert as a new row. Database errors (for example a
// duplicate id) are returned as-is.
func (s *SQLiteDB) StoreAlert(ctx context.Context, alert *Alert) error {
	const insert = `
INSERT INTO alerts (id, agent_id, type, severity, message, value, threshold, triggered_at, resolved_at, acknowledged)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`
	params := []interface{}{
		alert.ID, alert.AgentID, alert.Type, alert.Severity, alert.Message,
		alert.Value, alert.Threshold, alert.TriggeredAt, alert.ResolvedAt, alert.Acknowledged,
	}
	if _, err := s.db.ExecContext(ctx, insert, params...); err != nil {
		return err
	}
	return nil
}
// GetAlert loads the alert with the given id and returns (nil, nil) when none
// matches. ResolvedAt is set only when the stored resolved_at is not NULL.
func (s *SQLiteDB) GetAlert(ctx context.Context, id string) (*Alert, error) {
	const lookup = `
SELECT id, agent_id, type, severity, message, value, threshold, triggered_at, resolved_at, acknowledged
FROM alerts WHERE id = ?
`
	var (
		found    Alert
		resolved sql.NullTime
	)
	err := s.db.QueryRowContext(ctx, lookup, id).Scan(
		&found.ID, &found.AgentID, &found.Type, &found.Severity, &found.Message,
		&found.Value, &found.Threshold, &found.TriggeredAt, &resolved, &found.Acknowledged,
	)
	switch {
	case errors.Is(err, sql.ErrNoRows):
		return nil, nil
	case err != nil:
		return nil, err
	}

	if resolved.Valid {
		when := resolved.Time
		found.ResolvedAt = &when
	}
	return &found, nil
}
// QueryAlerts returns the alerts matching filter, newest first.
//
// Every filter field that is set narrows the result: AgentID, Type and
// Severity match exactly when non-empty, Acknowledged matches when non-nil,
// and From / To bound triggered_at (inclusive) when non-zero. Limit and
// Offset apply paging when positive. An Offset without a Limit is honoured:
// SQLite only accepts OFFSET as part of a LIMIT clause, so "LIMIT -1"
// (no upper bound) is emitted in that case.
//
// ResolvedAt is set only on alerts whose stored resolved_at is not NULL.
func (s *SQLiteDB) QueryAlerts(ctx context.Context, filter AlertFilter) ([]*Alert, error) {
	var (
		conditions []string
		args       []interface{}
	)
	// where records one predicate together with its bound value.
	where := func(cond string, arg interface{}) {
		conditions = append(conditions, cond)
		args = append(args, arg)
	}

	if filter.AgentID != "" {
		where("agent_id = ?", filter.AgentID)
	}
	if filter.Type != "" {
		where("type = ?", filter.Type)
	}
	if filter.Severity != "" {
		where("severity = ?", filter.Severity)
	}
	if filter.Acknowledged != nil {
		// The column is an INTEGER flag.
		flag := 0
		if *filter.Acknowledged {
			flag = 1
		}
		where("acknowledged = ?", flag)
	}
	if !filter.From.IsZero() {
		where("triggered_at >= ?", filter.From)
	}
	if !filter.To.IsZero() {
		where("triggered_at <= ?", filter.To)
	}

	var query strings.Builder
	query.WriteString("SELECT id, agent_id, type, severity, message, value, threshold, triggered_at, resolved_at, acknowledged FROM alerts")
	if len(conditions) > 0 {
		query.WriteString(" WHERE " + strings.Join(conditions, " AND "))
	}
	query.WriteString(" ORDER BY triggered_at DESC")

	switch {
	case filter.Limit > 0:
		fmt.Fprintf(&query, " LIMIT %d", filter.Limit)
	case filter.Offset > 0:
		// OFFSET is only valid after LIMIT; a negative LIMIT means unbounded.
		query.WriteString(" LIMIT -1")
	}
	if filter.Offset > 0 {
		fmt.Fprintf(&query, " OFFSET %d", filter.Offset)
	}

	rows, err := s.db.QueryContext(ctx, query.String(), args...)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var alerts []*Alert
	for rows.Next() {
		var alert Alert
		var resolvedAt sql.NullTime
		err := rows.Scan(&alert.ID, &alert.AgentID, &alert.Type, &alert.Severity, &alert.Message,
			&alert.Value, &alert.Threshold, &alert.TriggeredAt, &resolvedAt, &alert.Acknowledged)
		if err != nil {
			return nil, err
		}
		if resolvedAt.Valid {
			alert.ResolvedAt = &resolvedAt.Time
		}
		alerts = append(alerts, &alert)
	}
	return alerts, rows.Err()
}
// AcknowledgeAlert sets the acknowledged flag on the alert with the given id.
// No error is reported when the id does not exist.
func (s *SQLiteDB) AcknowledgeAlert(ctx context.Context, id string) error {
	if _, err := s.db.ExecContext(ctx, `UPDATE alerts SET acknowledged = 1 WHERE id = ?`, id); err != nil {
		return err
	}
	return nil
}
// ============================================================================
// Retention
// ============================================================================
// RunRetention applies the retention policy tier by tier, from the finest
// resolution to the coarsest. For each tier the cutoff is "now minus the
// tier's retention window"; rows older than the cutoff are first rolled up
// into the next coarser table and then deleted. The hourly tier has nothing
// coarser, so its old rows are only deleted.
//
// The steps run one after another without an enclosing transaction, and the
// first failure stops the run and is returned wrapped with the step name.
func (s *SQLiteDB) RunRetention(ctx context.Context) error {
	now := time.Now().UTC()

	tiers := []struct {
		name   string                                 // tier label used in error messages
		table  string                                 // table the tier's rows live in
		keep   time.Duration                          // how long rows stay in this tier
		into   string                                 // label of the next coarser tier
		rollup func(context.Context, time.Time) error // nil when there is no coarser tier
	}{
		{name: "raw", table: "metrics_raw", keep: s.retention.RawRetention, into: "1min", rollup: s.aggregateRawTo1Min},
		{name: "1min", table: "metrics_1min", keep: s.retention.OneMinuteRetention, into: "5min", rollup: s.aggregate1MinTo5Min},
		{name: "5min", table: "metrics_5min", keep: s.retention.FiveMinuteRetention, into: "hourly", rollup: s.aggregate5MinToHourly},
		{name: "hourly", table: "metrics_hourly", keep: s.retention.HourlyRetention},
	}

	for _, tier := range tiers {
		cutoff := now.Add(-tier.keep)

		// Roll up before deleting so the data survives at lower resolution.
		if tier.rollup != nil {
			if err := tier.rollup(ctx, cutoff); err != nil {
				return fmt.Errorf("aggregate %s->%s: %w", tier.name, tier.into, err)
			}
		}

		// tier.table only ever holds one of the fixed names listed above.
		if _, err := s.db.ExecContext(ctx, "DELETE FROM "+tier.table+" WHERE timestamp < ?", cutoff); err != nil {
			return fmt.Errorf("delete old %s: %w", tier.name, err)
		}
	}
	return nil
}
// aggregateRawTo1Min rolls the raw samples older than before up into
// metrics_1min, one agent at a time.
//
// Every agent that has at least one raw row older than before is processed.
// No lower time bound is applied: RunRetention deletes all raw rows older
// than the cutoff right after this step, so skipping an agent here would
// discard its samples without ever aggregating them.
//
// The agent list is read completely and the result set closed before any
// per-agent work starts, because the pool is limited to a single connection
// and the per-agent step needs it. An iteration error while reading the list
// is returned rather than silently working from a truncated list.
func (s *SQLiteDB) aggregateRawTo1Min(ctx context.Context, before time.Time) error {
	rows, err := s.db.QueryContext(ctx, `
SELECT DISTINCT agent_id FROM metrics_raw
WHERE timestamp < ?
`, before)
	if err != nil {
		return err
	}

	var agents []string
	for rows.Next() {
		var agentID string
		if err := rows.Scan(&agentID); err != nil {
			rows.Close()
			return err
		}
		agents = append(agents, agentID)
	}
	// Capture the iteration error, then release the connection either way.
	iterErr := rows.Err()
	rows.Close()
	if iterErr != nil {
		return iterErr
	}

	for _, agentID := range agents {
		if err := s.aggregateRawTo1MinForAgent(ctx, agentID, before); err != nil {
			return err
		}
	}
	return nil
}
// aggregateRawTo1MinForAgent groups the raw samples of agentID that are older
// than before into one-minute buckets and writes one metrics_1min row per
// bucket, replacing any row already stored for that minute.
//
// Each sample contributes its CPU usage, memory usage percentage, disk usage
// percentage (summed over all mounts) and GPU utilization, derived the same
// way queryRawMetrics derives them. Samples whose payload cannot be decoded
// are skipped. Bucket keys are normalised to UTC so that samples from the
// same minute always share a bucket regardless of the location attached to
// the scanned time value.
//
// Nothing is written if reading the samples fails part-way: an iteration
// error is returned before any aggregate is stored.
func (s *SQLiteDB) aggregateRawTo1MinForAgent(ctx context.Context, agentID string, before time.Time) error {
	rows, err := s.db.QueryContext(ctx, `
SELECT timestamp, data FROM metrics_raw
WHERE agent_id = ? AND timestamp < ?
ORDER BY timestamp
`, agentID, before)
	if err != nil {
		return err
	}
	defer rows.Close()

	// Group samples by the minute they fall into.
	buckets := make(map[time.Time][]MetricPoint)
	for rows.Next() {
		var ts time.Time
		var data []byte
		if err := rows.Scan(&ts, &data); err != nil {
			return err
		}
		var m models.AllMetrics
		if err := json.Unmarshal(data, &m); err != nil {
			continue // skip corrupted entries
		}

		p := MetricPoint{Timestamp: ts, AgentID: agentID}
		p.CPUAvg = m.CPU.TotalUsage
		p.CPUMin = m.CPU.TotalUsage
		p.CPUMax = m.CPU.TotalUsage
		if m.Memory.Total > 0 {
			pct := float64(m.Memory.Used) / float64(m.Memory.Total) * 100
			p.MemoryAvg, p.MemoryMin, p.MemoryMax = pct, pct, pct
		}
		// Disk usage across all mounts, as a percentage of total capacity.
		var diskUsed, diskTotal uint64
		for _, d := range m.Disk.Mounts {
			diskUsed += d.Used
			diskTotal += d.Total
		}
		if diskTotal > 0 {
			p.DiskAvg = float64(diskUsed) / float64(diskTotal) * 100
		}
		if m.GPU.Available {
			p.GPUAvg = float64(m.GPU.Utilization)
		}

		// time.Time map keys compare location pointers too; UTC makes them canonical.
		minute := ts.UTC().Truncate(time.Minute)
		buckets[minute] = append(buckets[minute], p)
	}
	if err := rows.Err(); err != nil {
		return err
	}
	// Release the (single) pooled connection before issuing the writes.
	rows.Close()

	for minute, points := range buckets {
		agg := aggregatePoints(points)
		_, err := s.db.ExecContext(ctx, `
INSERT OR REPLACE INTO metrics_1min (agent_id, timestamp, cpu_avg, cpu_min, cpu_max, mem_avg, mem_min, mem_max, disk_avg, gpu_avg, sample_count)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`, agentID, minute, agg.CPUAvg, agg.CPUMin, agg.CPUMax, agg.MemoryAvg, agg.MemoryMin, agg.MemoryMax, agg.DiskAvg, agg.GPUAvg, len(points))
		if err != nil {
			return err
		}
	}
	return nil
}
// aggregate1MinTo5Min rolls every metrics_1min row older than before up into
// metrics_5min in a single statement. Rows are grouped per agent into
// 300-second buckets; averages are averaged, minima and maxima are carried
// through, and sample counts are summed. An existing metrics_5min row for the
// same agent and bucket is replaced.
func (s *SQLiteDB) aggregate1MinTo5Min(ctx context.Context, before time.Time) error {
	const rollup = `
INSERT OR REPLACE INTO metrics_5min (agent_id, timestamp, cpu_avg, cpu_min, cpu_max, mem_avg, mem_min, mem_max, disk_avg, gpu_avg, sample_count)
SELECT
agent_id,
datetime((strftime('%s', timestamp) / 300) * 300, 'unixepoch') as ts,
AVG(cpu_avg), MIN(cpu_min), MAX(cpu_max),
AVG(mem_avg), MIN(mem_min), MAX(mem_max),
AVG(disk_avg), AVG(gpu_avg),
SUM(sample_count)
FROM metrics_1min
WHERE timestamp < ?
GROUP BY agent_id, ts
`
	if _, err := s.db.ExecContext(ctx, rollup, before); err != nil {
		return err
	}
	return nil
}
// aggregate5MinToHourly rolls every metrics_5min row older than before up
// into metrics_hourly in a single statement. Rows are grouped per agent into
// 3600-second buckets; averages are averaged, minima and maxima are carried
// through, and sample counts are summed. An existing metrics_hourly row for
// the same agent and bucket is replaced.
func (s *SQLiteDB) aggregate5MinToHourly(ctx context.Context, before time.Time) error {
	const rollup = `
INSERT OR REPLACE INTO metrics_hourly (agent_id, timestamp, cpu_avg, cpu_min, cpu_max, mem_avg, mem_min, mem_max, disk_avg, gpu_avg, sample_count)
SELECT
agent_id,
datetime((strftime('%s', timestamp) / 3600) * 3600, 'unixepoch') as ts,
AVG(cpu_avg), MIN(cpu_min), MAX(cpu_max),
AVG(mem_avg), MIN(mem_min), MAX(mem_max),
AVG(disk_avg), AVG(gpu_avg),
SUM(sample_count)
FROM metrics_5min
WHERE timestamp < ?
GROUP BY agent_id, ts
`
	if _, err := s.db.ExecContext(ctx, rollup, before); err != nil {
		return err
	}
	return nil
}
// aggregatePoints folds points into a single MetricPoint: the Avg fields are
// the arithmetic mean of the inputs' Avg fields, the Min fields the smallest
// input Min, and the Max fields the largest input Max. All other fields of
// the result are left at their zero value. An empty input yields the zero
// MetricPoint.
//
// Minima and maxima are both seeded from the first point, so the result is
// correct for any value range rather than only for non-negative maxima.
func aggregatePoints(points []MetricPoint) MetricPoint {
	if len(points) == 0 {
		return MetricPoint{}
	}

	first := points[0]
	agg := MetricPoint{
		CPUMin:    first.CPUMin,
		CPUMax:    first.CPUMax,
		MemoryMin: first.MemoryMin,
		MemoryMax: first.MemoryMax,
	}

	for i := range points {
		p := &points[i]

		// Accumulate sums; they are turned into means below.
		agg.CPUAvg += p.CPUAvg
		agg.MemoryAvg += p.MemoryAvg
		agg.DiskAvg += p.DiskAvg
		agg.GPUAvg += p.GPUAvg

		if p.CPUMin < agg.CPUMin {
			agg.CPUMin = p.CPUMin
		}
		if p.CPUMax > agg.CPUMax {
			agg.CPUMax = p.CPUMax
		}
		if p.MemoryMin < agg.MemoryMin {
			agg.MemoryMin = p.MemoryMin
		}
		if p.MemoryMax > agg.MemoryMax {
			agg.MemoryMax = p.MemoryMax
		}
	}

	n := float64(len(points))
	agg.CPUAvg /= n
	agg.MemoryAvg /= n
	agg.DiskAvg /= n
	agg.GPUAvg /= n
	return agg
}