Log Collectors (backend/internal/collectors/logs/): - LogEntry model with level, source, message, fields - Manager for coordinating multiple collectors - JournalCollector: systemd journal via journalctl CLI - FileCollector: tail log files with format parsing (plain, json, nginx) - DockerCollector: docker container logs via docker CLI - All collectors are pure Go (no CGO dependencies) Database Storage: - Add logs table with indexes for efficient querying - StoreLogs: batch insert log entries - QueryLogs: filter by agent, source, level, time, full-text search - DeleteOldLogs: retention cleanup - Implementations for both SQLite and PostgreSQL Frontend Log Viewer: - Log types and level color definitions - Logs API client with streaming support - /logs route with search, level filters, source filters - Live streaming mode for real-time log tailing - Paginated loading with load more 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1253 lines
36 KiB
Go
1253 lines
36 KiB
Go
// Package database provides PostgreSQL implementation of the Database interface.
|
|
package database
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
"tyto/internal/models"
|
|
|
|
"github.com/lib/pq"
|
|
)
|
|
|
|
// PostgresDB implements the Database interface using PostgreSQL.
// It is safe for concurrent use to the extent *sql.DB is; all methods go
// through the pooled connection configured in NewPostgresDB.
type PostgresDB struct {
	db        *sql.DB         // pooled connection handle
	retention RetentionConfig // per-resolution retention windows consumed by RunRetention
}
|
|
|
|
// NewPostgresDB creates a new PostgreSQL database connection.
|
|
func NewPostgresDB(connStr string, retention RetentionConfig) (*PostgresDB, error) {
|
|
db, err := sql.Open("postgres", connStr)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("open postgres: %w", err)
|
|
}
|
|
|
|
// Configure connection pool
|
|
db.SetMaxOpenConns(25)
|
|
db.SetMaxIdleConns(5)
|
|
db.SetConnMaxLifetime(5 * time.Minute)
|
|
|
|
// Verify connection
|
|
if err := db.Ping(); err != nil {
|
|
db.Close()
|
|
return nil, fmt.Errorf("ping postgres: %w", err)
|
|
}
|
|
|
|
return &PostgresDB{
|
|
db: db,
|
|
retention: retention,
|
|
}, nil
|
|
}
|
|
|
|
// Close closes the database connection pool and releases its resources.
// After Close, all other methods will return errors.
func (p *PostgresDB) Close() error {
	return p.db.Close()
}
|
|
|
|
// Migrate runs database migrations.
|
|
func (p *PostgresDB) Migrate() error {
|
|
migrations := []string{
|
|
pgMigrationAgents,
|
|
pgMigrationUsers,
|
|
pgMigrationRoles,
|
|
pgMigrationSessions,
|
|
pgMigrationMetrics,
|
|
pgMigrationAlerts,
|
|
pgMigrationLogs,
|
|
}
|
|
|
|
for i, m := range migrations {
|
|
if _, err := p.db.Exec(m); err != nil {
|
|
return fmt.Errorf("migration %d: %w", i+1, err)
|
|
}
|
|
}
|
|
|
|
return p.insertDefaultRoles()
|
|
}
|
|
|
|
// PostgreSQL migration statements. Each constant is one idempotent DDL
// batch (CREATE TABLE/INDEX IF NOT EXISTS) executed in order by Migrate.

// pgMigrationAgents: agent inventory. capabilities and tags use native
// Postgres text arrays; status is indexed for dashboard filtering.
const pgMigrationAgents = `
CREATE TABLE IF NOT EXISTS agents (
    id TEXT PRIMARY KEY,
    name TEXT,
    hostname TEXT NOT NULL,
    os TEXT NOT NULL,
    architecture TEXT NOT NULL,
    version TEXT NOT NULL,
    capabilities TEXT[] DEFAULT '{}',
    status TEXT NOT NULL DEFAULT 'pending',
    cert_serial TEXT,
    cert_expiry TIMESTAMPTZ,
    last_seen TIMESTAMPTZ,
    registered_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    tags TEXT[] DEFAULT '{}'
);
CREATE INDEX IF NOT EXISTS idx_agents_status ON agents(status);
`

// pgMigrationUsers: local and external (LDAP) accounts. password_hash is
// BYTEA and may be NULL for externally-authenticated users.
const pgMigrationUsers = `
CREATE TABLE IF NOT EXISTS users (
    id TEXT PRIMARY KEY,
    username TEXT UNIQUE NOT NULL,
    email TEXT,
    password_hash BYTEA,
    auth_provider TEXT NOT NULL DEFAULT 'local',
    ldap_dn TEXT,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    last_login TIMESTAMPTZ,
    disabled BOOLEAN NOT NULL DEFAULT FALSE
);
CREATE INDEX IF NOT EXISTS idx_users_username ON users(username);
`

// pgMigrationRoles: RBAC roles (permissions stored as a JSONB array) plus
// the user<->role join table. Deleting a user or role cascades.
const pgMigrationRoles = `
CREATE TABLE IF NOT EXISTS roles (
    id TEXT PRIMARY KEY,
    name TEXT UNIQUE NOT NULL,
    description TEXT,
    permissions JSONB NOT NULL DEFAULT '[]',
    is_system BOOLEAN NOT NULL DEFAULT FALSE,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE TABLE IF NOT EXISTS user_roles (
    user_id TEXT NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    role_id TEXT NOT NULL REFERENCES roles(id) ON DELETE CASCADE,
    PRIMARY KEY (user_id, role_id)
);
`

// pgMigrationSessions: bearer-token sessions keyed by opaque token.
// expires_at is indexed for CleanupExpiredSessions.
const pgMigrationSessions = `
CREATE TABLE IF NOT EXISTS sessions (
    token TEXT PRIMARY KEY,
    user_id TEXT NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    expires_at TIMESTAMPTZ NOT NULL,
    ip_address TEXT,
    user_agent TEXT
);
CREATE INDEX IF NOT EXISTS idx_sessions_user ON sessions(user_id);
CREATE INDEX IF NOT EXISTS idx_sessions_expires ON sessions(expires_at);
`

// pgMigrationMetrics: tiered metric storage — raw JSONB samples plus
// 1-minute, 5-minute and hourly rollup tables. RunRetention aggregates
// downward and prunes each tier on its own schedule.
const pgMigrationMetrics = `
-- Raw metrics (high resolution, short retention)
CREATE TABLE IF NOT EXISTS metrics_raw (
    id BIGSERIAL PRIMARY KEY,
    agent_id TEXT NOT NULL REFERENCES agents(id) ON DELETE CASCADE,
    timestamp TIMESTAMPTZ NOT NULL,
    data JSONB NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_metrics_raw_agent_time ON metrics_raw(agent_id, timestamp);

-- 1-minute aggregations
CREATE TABLE IF NOT EXISTS metrics_1min (
    agent_id TEXT NOT NULL REFERENCES agents(id) ON DELETE CASCADE,
    timestamp TIMESTAMPTZ NOT NULL,
    cpu_avg DOUBLE PRECISION, cpu_min DOUBLE PRECISION, cpu_max DOUBLE PRECISION,
    mem_avg DOUBLE PRECISION, mem_min DOUBLE PRECISION, mem_max DOUBLE PRECISION,
    disk_avg DOUBLE PRECISION,
    gpu_avg DOUBLE PRECISION,
    sample_count INTEGER NOT NULL DEFAULT 1,
    PRIMARY KEY (agent_id, timestamp)
);

-- 5-minute aggregations
CREATE TABLE IF NOT EXISTS metrics_5min (
    agent_id TEXT NOT NULL REFERENCES agents(id) ON DELETE CASCADE,
    timestamp TIMESTAMPTZ NOT NULL,
    cpu_avg DOUBLE PRECISION, cpu_min DOUBLE PRECISION, cpu_max DOUBLE PRECISION,
    mem_avg DOUBLE PRECISION, mem_min DOUBLE PRECISION, mem_max DOUBLE PRECISION,
    disk_avg DOUBLE PRECISION,
    gpu_avg DOUBLE PRECISION,
    sample_count INTEGER NOT NULL DEFAULT 1,
    PRIMARY KEY (agent_id, timestamp)
);

-- Hourly aggregations
CREATE TABLE IF NOT EXISTS metrics_hourly (
    agent_id TEXT NOT NULL REFERENCES agents(id) ON DELETE CASCADE,
    timestamp TIMESTAMPTZ NOT NULL,
    cpu_avg DOUBLE PRECISION, cpu_min DOUBLE PRECISION, cpu_max DOUBLE PRECISION,
    mem_avg DOUBLE PRECISION, mem_min DOUBLE PRECISION, mem_max DOUBLE PRECISION,
    disk_avg DOUBLE PRECISION,
    gpu_avg DOUBLE PRECISION,
    sample_count INTEGER NOT NULL DEFAULT 1,
    PRIMARY KEY (agent_id, timestamp)
);

-- Create hypertable-like partitioning hint (for future TimescaleDB upgrade)
-- COMMENT ON TABLE metrics_raw IS 'timescale:hypertable';
`

// pgMigrationAlerts: threshold-breach alerts per agent, indexed by agent,
// trigger time and severity for the common dashboard queries.
const pgMigrationAlerts = `
CREATE TABLE IF NOT EXISTS alerts (
    id TEXT PRIMARY KEY,
    agent_id TEXT NOT NULL REFERENCES agents(id) ON DELETE CASCADE,
    type TEXT NOT NULL,
    severity TEXT NOT NULL,
    message TEXT NOT NULL,
    value DOUBLE PRECISION NOT NULL,
    threshold DOUBLE PRECISION NOT NULL,
    triggered_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    resolved_at TIMESTAMPTZ,
    acknowledged BOOLEAN NOT NULL DEFAULT FALSE
);
CREATE INDEX IF NOT EXISTS idx_alerts_agent ON alerts(agent_id);
CREATE INDEX IF NOT EXISTS idx_alerts_triggered ON alerts(triggered_at);
CREATE INDEX IF NOT EXISTS idx_alerts_severity ON alerts(severity);
`

// pgMigrationLogs: collected log entries. Indexes support the log viewer's
// filters (agent+time, level, source) with DESC time ordering for tailing.
const pgMigrationLogs = `
CREATE TABLE IF NOT EXISTS logs (
    id BIGSERIAL PRIMARY KEY,
    agent_id TEXT NOT NULL REFERENCES agents(id) ON DELETE CASCADE,
    timestamp TIMESTAMPTZ NOT NULL,
    source TEXT NOT NULL,
    source_name TEXT,
    level TEXT NOT NULL,
    message TEXT NOT NULL,
    fields JSONB
);
CREATE INDEX IF NOT EXISTS idx_logs_agent_time ON logs(agent_id, timestamp DESC);
CREATE INDEX IF NOT EXISTS idx_logs_level ON logs(level);
CREATE INDEX IF NOT EXISTS idx_logs_source ON logs(source, source_name);
CREATE INDEX IF NOT EXISTS idx_logs_timestamp ON logs(timestamp DESC);
`
|
|
|
|
func (p *PostgresDB) insertDefaultRoles() error {
|
|
defaultRoles := []struct {
|
|
id, name, desc string
|
|
perms []string
|
|
}{
|
|
{"admin", "Administrator", "Full system access", []string{"*"}},
|
|
{"operator", "Operator", "Manage agents and alerts", []string{
|
|
"dashboard:view", "agents:view", "agents:manage",
|
|
"alerts:view", "alerts:acknowledge", "alerts:configure",
|
|
"metrics:export", "metrics:query",
|
|
}},
|
|
{"viewer", "Viewer", "Read-only access", []string{
|
|
"dashboard:view", "agents:view", "alerts:view",
|
|
}},
|
|
}
|
|
|
|
for _, r := range defaultRoles {
|
|
perms, _ := json.Marshal(r.perms)
|
|
_, err := p.db.Exec(`
|
|
INSERT INTO roles (id, name, description, permissions, is_system)
|
|
VALUES ($1, $2, $3, $4, TRUE)
|
|
ON CONFLICT (id) DO NOTHING
|
|
`, r.id, r.name, r.desc, perms)
|
|
if err != nil {
|
|
return fmt.Errorf("insert role %s: %w", r.id, err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ============================================================================
|
|
// Metrics Storage
|
|
// ============================================================================
|
|
|
|
// StoreMetrics stores raw metrics from an agent.
|
|
func (p *PostgresDB) StoreMetrics(ctx context.Context, agentID string, metrics *models.AllMetrics) error {
|
|
data, err := json.Marshal(metrics)
|
|
if err != nil {
|
|
return fmt.Errorf("marshal metrics: %w", err)
|
|
}
|
|
|
|
_, err = p.db.ExecContext(ctx, `
|
|
INSERT INTO metrics_raw (agent_id, timestamp, data)
|
|
VALUES ($1, $2, $3)
|
|
`, agentID, time.Now().UTC(), data)
|
|
|
|
return err
|
|
}
|
|
|
|
// QueryMetrics queries metrics with the specified resolution.
|
|
func (p *PostgresDB) QueryMetrics(ctx context.Context, agentID string, from, to time.Time, resolution string) ([]MetricPoint, error) {
|
|
var table string
|
|
switch resolution {
|
|
case "raw":
|
|
return p.queryRawMetrics(ctx, agentID, from, to)
|
|
case "1min":
|
|
table = "metrics_1min"
|
|
case "5min":
|
|
table = "metrics_5min"
|
|
case "hourly":
|
|
table = "metrics_hourly"
|
|
default:
|
|
table = p.selectResolution(from, to)
|
|
}
|
|
|
|
query := fmt.Sprintf(`
|
|
SELECT timestamp, cpu_avg, cpu_min, cpu_max, mem_avg, mem_min, mem_max, disk_avg, gpu_avg
|
|
FROM %s
|
|
WHERE agent_id = $1 AND timestamp >= $2 AND timestamp <= $3
|
|
ORDER BY timestamp ASC
|
|
`, table)
|
|
|
|
rows, err := p.db.QueryContext(ctx, query, agentID, from, to)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var points []MetricPoint
|
|
for rows.Next() {
|
|
var pt MetricPoint
|
|
pt.AgentID = agentID
|
|
var diskAvg, gpuAvg sql.NullFloat64
|
|
err := rows.Scan(&pt.Timestamp, &pt.CPUAvg, &pt.CPUMin, &pt.CPUMax,
|
|
&pt.MemoryAvg, &pt.MemoryMin, &pt.MemoryMax, &diskAvg, &gpuAvg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if diskAvg.Valid {
|
|
pt.DiskAvg = diskAvg.Float64
|
|
}
|
|
if gpuAvg.Valid {
|
|
pt.GPUAvg = gpuAvg.Float64
|
|
}
|
|
points = append(points, pt)
|
|
}
|
|
return points, rows.Err()
|
|
}
|
|
|
|
func (p *PostgresDB) queryRawMetrics(ctx context.Context, agentID string, from, to time.Time) ([]MetricPoint, error) {
|
|
rows, err := p.db.QueryContext(ctx, `
|
|
SELECT timestamp, data FROM metrics_raw
|
|
WHERE agent_id = $1 AND timestamp >= $2 AND timestamp <= $3
|
|
ORDER BY timestamp ASC
|
|
`, agentID, from, to)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var points []MetricPoint
|
|
for rows.Next() {
|
|
var ts time.Time
|
|
var data []byte
|
|
if err := rows.Scan(&ts, &data); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var m models.AllMetrics
|
|
if err := json.Unmarshal(data, &m); err != nil {
|
|
continue
|
|
}
|
|
|
|
pt := MetricPoint{Timestamp: ts, AgentID: agentID}
|
|
|
|
pt.CPUAvg = m.CPU.TotalUsage
|
|
pt.CPUMin = m.CPU.TotalUsage
|
|
pt.CPUMax = m.CPU.TotalUsage
|
|
|
|
if m.Memory.Total > 0 {
|
|
usedPct := float64(m.Memory.Used) / float64(m.Memory.Total) * 100
|
|
pt.MemoryAvg = usedPct
|
|
pt.MemoryMin = usedPct
|
|
pt.MemoryMax = usedPct
|
|
}
|
|
|
|
if len(m.Disk.Mounts) > 0 {
|
|
var totalUsed, totalTotal uint64
|
|
for _, d := range m.Disk.Mounts {
|
|
totalUsed += d.Used
|
|
totalTotal += d.Total
|
|
}
|
|
if totalTotal > 0 {
|
|
pt.DiskAvg = float64(totalUsed) / float64(totalTotal) * 100
|
|
}
|
|
}
|
|
|
|
if m.GPU.Available {
|
|
pt.GPUAvg = float64(m.GPU.Utilization)
|
|
}
|
|
|
|
points = append(points, pt)
|
|
}
|
|
return points, rows.Err()
|
|
}
|
|
|
|
func (p *PostgresDB) selectResolution(from, to time.Time) string {
|
|
duration := to.Sub(from)
|
|
switch {
|
|
case duration <= 2*time.Hour:
|
|
return "metrics_1min"
|
|
case duration <= 24*time.Hour:
|
|
return "metrics_5min"
|
|
default:
|
|
return "metrics_hourly"
|
|
}
|
|
}
|
|
|
|
// GetLatestMetrics returns the most recent metrics for an agent.
|
|
func (p *PostgresDB) GetLatestMetrics(ctx context.Context, agentID string) (*models.AllMetrics, error) {
|
|
var data []byte
|
|
err := p.db.QueryRowContext(ctx, `
|
|
SELECT data FROM metrics_raw
|
|
WHERE agent_id = $1
|
|
ORDER BY timestamp DESC
|
|
LIMIT 1
|
|
`, agentID).Scan(&data)
|
|
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
return nil, nil
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var m models.AllMetrics
|
|
if err := json.Unmarshal(data, &m); err != nil {
|
|
return nil, err
|
|
}
|
|
return &m, nil
|
|
}
|
|
|
|
// ============================================================================
|
|
// Agents
|
|
// ============================================================================
|
|
|
|
// StoreAgent stores or updates an agent record.
|
|
func (p *PostgresDB) StoreAgent(ctx context.Context, agent *Agent) error {
|
|
_, err := p.db.ExecContext(ctx, `
|
|
INSERT INTO agents (id, name, hostname, os, architecture, version, capabilities, status, cert_serial, cert_expiry, last_seen, registered_at, tags)
|
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
|
|
ON CONFLICT (id) DO UPDATE SET
|
|
name = EXCLUDED.name,
|
|
hostname = EXCLUDED.hostname,
|
|
os = EXCLUDED.os,
|
|
architecture = EXCLUDED.architecture,
|
|
version = EXCLUDED.version,
|
|
capabilities = EXCLUDED.capabilities,
|
|
status = EXCLUDED.status,
|
|
cert_serial = EXCLUDED.cert_serial,
|
|
cert_expiry = EXCLUDED.cert_expiry,
|
|
last_seen = EXCLUDED.last_seen,
|
|
tags = EXCLUDED.tags
|
|
`, agent.ID, agent.Name, agent.Hostname, agent.OS, agent.Architecture, agent.Version,
|
|
pq.Array(agent.Capabilities), agent.Status, agent.CertSerial, agent.CertExpiry, agent.LastSeen, agent.RegisteredAt, pq.Array(agent.Tags))
|
|
|
|
return err
|
|
}
|
|
|
|
// GetAgent retrieves an agent by ID.
|
|
func (p *PostgresDB) GetAgent(ctx context.Context, id string) (*Agent, error) {
|
|
var agent Agent
|
|
var certExpiry, lastSeen sql.NullTime
|
|
|
|
err := p.db.QueryRowContext(ctx, `
|
|
SELECT id, name, hostname, os, architecture, version, capabilities, status, cert_serial, cert_expiry, last_seen, registered_at, tags
|
|
FROM agents WHERE id = $1
|
|
`, id).Scan(&agent.ID, &agent.Name, &agent.Hostname, &agent.OS, &agent.Architecture, &agent.Version,
|
|
pq.Array(&agent.Capabilities), &agent.Status, &agent.CertSerial, &certExpiry, &lastSeen, &agent.RegisteredAt, pq.Array(&agent.Tags))
|
|
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
return nil, nil
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if certExpiry.Valid {
|
|
agent.CertExpiry = certExpiry.Time
|
|
}
|
|
if lastSeen.Valid {
|
|
agent.LastSeen = lastSeen.Time
|
|
}
|
|
return &agent, nil
|
|
}
|
|
|
|
// ListAgents returns all registered agents.
|
|
func (p *PostgresDB) ListAgents(ctx context.Context) ([]*Agent, error) {
|
|
rows, err := p.db.QueryContext(ctx, `
|
|
SELECT id, name, hostname, os, architecture, version, capabilities, status, cert_serial, cert_expiry, last_seen, registered_at, tags
|
|
FROM agents ORDER BY registered_at DESC
|
|
`)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var agents []*Agent
|
|
for rows.Next() {
|
|
var agent Agent
|
|
var certExpiry, lastSeen sql.NullTime
|
|
|
|
err := rows.Scan(&agent.ID, &agent.Name, &agent.Hostname, &agent.OS, &agent.Architecture, &agent.Version,
|
|
pq.Array(&agent.Capabilities), &agent.Status, &agent.CertSerial, &certExpiry, &lastSeen, &agent.RegisteredAt, pq.Array(&agent.Tags))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if certExpiry.Valid {
|
|
agent.CertExpiry = certExpiry.Time
|
|
}
|
|
if lastSeen.Valid {
|
|
agent.LastSeen = lastSeen.Time
|
|
}
|
|
agents = append(agents, &agent)
|
|
}
|
|
return agents, rows.Err()
|
|
}
|
|
|
|
// UpdateAgentStatus updates an agent's status and last seen time.
|
|
func (p *PostgresDB) UpdateAgentStatus(ctx context.Context, id string, status AgentStatus, lastSeen time.Time) error {
|
|
result, err := p.db.ExecContext(ctx, `
|
|
UPDATE agents SET status = $1, last_seen = $2 WHERE id = $3
|
|
`, status, lastSeen, id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
rows, _ := result.RowsAffected()
|
|
if rows == 0 {
|
|
return fmt.Errorf("agent not found: %s", id)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// DeleteAgent removes an agent and all its data. Dependent rows (metrics,
// alerts, logs) are removed via ON DELETE CASCADE on their foreign keys.
// Deleting a non-existent agent is not an error.
func (p *PostgresDB) DeleteAgent(ctx context.Context, id string) error {
	_, err := p.db.ExecContext(ctx, `DELETE FROM agents WHERE id = $1`, id)
	return err
}
|
|
|
|
// ============================================================================
|
|
// Users
|
|
// ============================================================================
|
|
|
|
// CreateUser creates a new user.
|
|
func (p *PostgresDB) CreateUser(ctx context.Context, user *User) error {
|
|
_, err := p.db.ExecContext(ctx, `
|
|
INSERT INTO users (id, username, email, password_hash, auth_provider, ldap_dn, created_at, updated_at, disabled)
|
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
|
|
`, user.ID, user.Username, user.Email, user.PasswordHash, user.AuthProvider,
|
|
user.LDAPDN, user.CreatedAt, user.UpdatedAt, user.Disabled)
|
|
return err
|
|
}
|
|
|
|
// GetUser retrieves a user by ID. Returns (nil, nil) when no such user
// exists (see scanUser).
func (p *PostgresDB) GetUser(ctx context.Context, id string) (*User, error) {
	return p.scanUser(p.db.QueryRowContext(ctx, `
		SELECT id, username, email, password_hash, auth_provider, ldap_dn, created_at, updated_at, last_login, disabled
		FROM users WHERE id = $1
	`, id))
}
|
|
|
|
// GetUserByUsername retrieves a user by username (exact, case-sensitive
// match). Returns (nil, nil) when no such user exists (see scanUser).
func (p *PostgresDB) GetUserByUsername(ctx context.Context, username string) (*User, error) {
	return p.scanUser(p.db.QueryRowContext(ctx, `
		SELECT id, username, email, password_hash, auth_provider, ldap_dn, created_at, updated_at, last_login, disabled
		FROM users WHERE username = $1
	`, username))
}
|
|
|
|
func (p *PostgresDB) scanUser(row *sql.Row) (*User, error) {
|
|
var user User
|
|
var lastLogin sql.NullTime
|
|
var email sql.NullString
|
|
|
|
err := row.Scan(&user.ID, &user.Username, &email, &user.PasswordHash, &user.AuthProvider,
|
|
&user.LDAPDN, &user.CreatedAt, &user.UpdatedAt, &lastLogin, &user.Disabled)
|
|
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
return nil, nil
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if email.Valid {
|
|
user.Email = email.String
|
|
}
|
|
if lastLogin.Valid {
|
|
user.LastLogin = lastLogin.Time
|
|
}
|
|
return &user, nil
|
|
}
|
|
|
|
// UpdateUser updates a user record.
|
|
func (p *PostgresDB) UpdateUser(ctx context.Context, user *User) error {
|
|
_, err := p.db.ExecContext(ctx, `
|
|
UPDATE users SET
|
|
username = $1, email = $2, password_hash = $3, auth_provider = $4,
|
|
ldap_dn = $5, updated_at = $6, last_login = $7, disabled = $8
|
|
WHERE id = $9
|
|
`, user.Username, user.Email, user.PasswordHash, user.AuthProvider,
|
|
user.LDAPDN, time.Now().UTC(), user.LastLogin, user.Disabled, user.ID)
|
|
return err
|
|
}
|
|
|
|
// DeleteUser removes a user. The user's sessions and role assignments are
// removed via ON DELETE CASCADE. Deleting a non-existent user is not an
// error.
func (p *PostgresDB) DeleteUser(ctx context.Context, id string) error {
	_, err := p.db.ExecContext(ctx, `DELETE FROM users WHERE id = $1`, id)
	return err
}
|
|
|
|
// ListUsers returns all users.
|
|
func (p *PostgresDB) ListUsers(ctx context.Context) ([]*User, error) {
|
|
rows, err := p.db.QueryContext(ctx, `
|
|
SELECT id, username, email, password_hash, auth_provider, ldap_dn, created_at, updated_at, last_login, disabled
|
|
FROM users ORDER BY username
|
|
`)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var users []*User
|
|
for rows.Next() {
|
|
var user User
|
|
var lastLogin sql.NullTime
|
|
var email sql.NullString
|
|
|
|
err := rows.Scan(&user.ID, &user.Username, &email, &user.PasswordHash, &user.AuthProvider,
|
|
&user.LDAPDN, &user.CreatedAt, &user.UpdatedAt, &lastLogin, &user.Disabled)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if email.Valid {
|
|
user.Email = email.String
|
|
}
|
|
if lastLogin.Valid {
|
|
user.LastLogin = lastLogin.Time
|
|
}
|
|
users = append(users, &user)
|
|
}
|
|
return users, rows.Err()
|
|
}
|
|
|
|
// ============================================================================
|
|
// Roles
|
|
// ============================================================================
|
|
|
|
// CreateRole creates a new role.
|
|
func (p *PostgresDB) CreateRole(ctx context.Context, role *Role) error {
|
|
perms, _ := json.Marshal(role.Permissions)
|
|
_, err := p.db.ExecContext(ctx, `
|
|
INSERT INTO roles (id, name, description, permissions, is_system, created_at)
|
|
VALUES ($1, $2, $3, $4, $5, $6)
|
|
`, role.ID, role.Name, role.Description, perms, role.IsSystem, role.CreatedAt)
|
|
return err
|
|
}
|
|
|
|
// GetRole retrieves a role by ID.
|
|
func (p *PostgresDB) GetRole(ctx context.Context, id string) (*Role, error) {
|
|
var role Role
|
|
var perms []byte
|
|
|
|
err := p.db.QueryRowContext(ctx, `
|
|
SELECT id, name, description, permissions, is_system, created_at
|
|
FROM roles WHERE id = $1
|
|
`, id).Scan(&role.ID, &role.Name, &role.Description, &perms, &role.IsSystem, &role.CreatedAt)
|
|
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
return nil, nil
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
json.Unmarshal(perms, &role.Permissions)
|
|
return &role, nil
|
|
}
|
|
|
|
// ListRoles returns all roles.
|
|
func (p *PostgresDB) ListRoles(ctx context.Context) ([]*Role, error) {
|
|
rows, err := p.db.QueryContext(ctx, `
|
|
SELECT id, name, description, permissions, is_system, created_at
|
|
FROM roles ORDER BY name
|
|
`)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var roles []*Role
|
|
for rows.Next() {
|
|
var role Role
|
|
var perms []byte
|
|
|
|
if err := rows.Scan(&role.ID, &role.Name, &role.Description, &perms, &role.IsSystem, &role.CreatedAt); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
json.Unmarshal(perms, &role.Permissions)
|
|
roles = append(roles, &role)
|
|
}
|
|
return roles, rows.Err()
|
|
}
|
|
|
|
// UpdateRole updates a role.
|
|
func (p *PostgresDB) UpdateRole(ctx context.Context, role *Role) error {
|
|
perms, _ := json.Marshal(role.Permissions)
|
|
_, err := p.db.ExecContext(ctx, `
|
|
UPDATE roles SET name = $1, description = $2, permissions = $3
|
|
WHERE id = $4 AND is_system = FALSE
|
|
`, role.Name, role.Description, perms, role.ID)
|
|
return err
|
|
}
|
|
|
|
// DeleteRole removes a custom role.
|
|
func (p *PostgresDB) DeleteRole(ctx context.Context, id string) error {
|
|
result, err := p.db.ExecContext(ctx, `DELETE FROM roles WHERE id = $1 AND is_system = FALSE`, id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
rows, _ := result.RowsAffected()
|
|
if rows == 0 {
|
|
return errors.New("cannot delete system role or role not found")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// GetUserRoles returns all roles assigned to a user.
|
|
func (p *PostgresDB) GetUserRoles(ctx context.Context, userID string) ([]*Role, error) {
|
|
rows, err := p.db.QueryContext(ctx, `
|
|
SELECT r.id, r.name, r.description, r.permissions, r.is_system, r.created_at
|
|
FROM roles r
|
|
JOIN user_roles ur ON r.id = ur.role_id
|
|
WHERE ur.user_id = $1
|
|
`, userID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var roles []*Role
|
|
for rows.Next() {
|
|
var role Role
|
|
var perms []byte
|
|
|
|
if err := rows.Scan(&role.ID, &role.Name, &role.Description, &perms, &role.IsSystem, &role.CreatedAt); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
json.Unmarshal(perms, &role.Permissions)
|
|
roles = append(roles, &role)
|
|
}
|
|
return roles, rows.Err()
|
|
}
|
|
|
|
// AssignRole assigns a role to a user. Re-assigning an already-held role
// is a no-op (ON CONFLICT DO NOTHING), so the call is idempotent. Fails
// with a foreign-key violation if either ID does not exist.
func (p *PostgresDB) AssignRole(ctx context.Context, userID, roleID string) error {
	_, err := p.db.ExecContext(ctx, `
		INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2)
		ON CONFLICT DO NOTHING
	`, userID, roleID)
	return err
}
|
|
|
|
// RemoveRole removes a role from a user. Removing an assignment that does
// not exist is not an error.
func (p *PostgresDB) RemoveRole(ctx context.Context, userID, roleID string) error {
	_, err := p.db.ExecContext(ctx, `
		DELETE FROM user_roles WHERE user_id = $1 AND role_id = $2
	`, userID, roleID)
	return err
}
|
|
|
|
// ============================================================================
|
|
// Sessions
|
|
// ============================================================================
|
|
|
|
// CreateSession creates a new session.
|
|
func (p *PostgresDB) CreateSession(ctx context.Context, session *Session) error {
|
|
_, err := p.db.ExecContext(ctx, `
|
|
INSERT INTO sessions (token, user_id, created_at, expires_at, ip_address, user_agent)
|
|
VALUES ($1, $2, $3, $4, $5, $6)
|
|
`, session.Token, session.UserID, session.CreatedAt, session.ExpiresAt, session.IPAddress, session.UserAgent)
|
|
return err
|
|
}
|
|
|
|
// GetSession retrieves a session by token.
|
|
func (p *PostgresDB) GetSession(ctx context.Context, token string) (*Session, error) {
|
|
var session Session
|
|
|
|
err := p.db.QueryRowContext(ctx, `
|
|
SELECT token, user_id, created_at, expires_at, ip_address, user_agent
|
|
FROM sessions WHERE token = $1 AND expires_at > $2
|
|
`, token, time.Now().UTC()).Scan(&session.Token, &session.UserID, &session.CreatedAt,
|
|
&session.ExpiresAt, &session.IPAddress, &session.UserAgent)
|
|
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
return nil, nil
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &session, nil
|
|
}
|
|
|
|
// DeleteSession removes a session (logout). Deleting an unknown token is
// not an error.
func (p *PostgresDB) DeleteSession(ctx context.Context, token string) error {
	_, err := p.db.ExecContext(ctx, `DELETE FROM sessions WHERE token = $1`, token)
	return err
}
|
|
|
|
// DeleteUserSessions removes all sessions for a user — e.g. after a
// password change or when the account is disabled.
func (p *PostgresDB) DeleteUserSessions(ctx context.Context, userID string) error {
	_, err := p.db.ExecContext(ctx, `DELETE FROM sessions WHERE user_id = $1`, userID)
	return err
}
|
|
|
|
// CleanupExpiredSessions removes all sessions whose expires_at is in the
// past. Intended to be run periodically by a background job.
func (p *PostgresDB) CleanupExpiredSessions(ctx context.Context) error {
	_, err := p.db.ExecContext(ctx, `DELETE FROM sessions WHERE expires_at < $1`, time.Now().UTC())
	return err
}
|
|
|
|
// ============================================================================
|
|
// Alerts
|
|
// ============================================================================
|
|
|
|
// StoreAlert stores a new alert.
|
|
func (p *PostgresDB) StoreAlert(ctx context.Context, alert *Alert) error {
|
|
_, err := p.db.ExecContext(ctx, `
|
|
INSERT INTO alerts (id, agent_id, type, severity, message, value, threshold, triggered_at, resolved_at, acknowledged)
|
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
|
|
`, alert.ID, alert.AgentID, alert.Type, alert.Severity, alert.Message,
|
|
alert.Value, alert.Threshold, alert.TriggeredAt, alert.ResolvedAt, alert.Acknowledged)
|
|
return err
|
|
}
|
|
|
|
// GetAlert retrieves an alert by ID.
|
|
func (p *PostgresDB) GetAlert(ctx context.Context, id string) (*Alert, error) {
|
|
var alert Alert
|
|
var resolvedAt sql.NullTime
|
|
|
|
err := p.db.QueryRowContext(ctx, `
|
|
SELECT id, agent_id, type, severity, message, value, threshold, triggered_at, resolved_at, acknowledged
|
|
FROM alerts WHERE id = $1
|
|
`, id).Scan(&alert.ID, &alert.AgentID, &alert.Type, &alert.Severity, &alert.Message,
|
|
&alert.Value, &alert.Threshold, &alert.TriggeredAt, &resolvedAt, &alert.Acknowledged)
|
|
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
return nil, nil
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if resolvedAt.Valid {
|
|
alert.ResolvedAt = &resolvedAt.Time
|
|
}
|
|
return &alert, nil
|
|
}
|
|
|
|
// QueryAlerts queries alerts with filters.
//
// Each non-zero field of filter adds one ANDed predicate. Results are
// ordered newest-first by triggered_at, with optional Limit/Offset paging.
// Filter values travel as bind parameters ($1..$n) — Sprintf only builds
// the parameter *numbers*, so the query is not injectable; Limit/Offset
// are ints formatted directly, which is likewise safe.
func (p *PostgresDB) QueryAlerts(ctx context.Context, filter AlertFilter) ([]*Alert, error) {
	var conditions []string
	var args []interface{}
	argNum := 1 // next bind-parameter number; must stay in lockstep with args

	if filter.AgentID != "" {
		conditions = append(conditions, fmt.Sprintf("agent_id = $%d", argNum))
		args = append(args, filter.AgentID)
		argNum++
	}
	if filter.Type != "" {
		conditions = append(conditions, fmt.Sprintf("type = $%d", argNum))
		args = append(args, filter.Type)
		argNum++
	}
	if filter.Severity != "" {
		conditions = append(conditions, fmt.Sprintf("severity = $%d", argNum))
		args = append(args, filter.Severity)
		argNum++
	}
	// Acknowledged is a *bool so that "unset" and "false" are distinguishable.
	if filter.Acknowledged != nil {
		conditions = append(conditions, fmt.Sprintf("acknowledged = $%d", argNum))
		args = append(args, *filter.Acknowledged)
		argNum++
	}
	if !filter.From.IsZero() {
		conditions = append(conditions, fmt.Sprintf("triggered_at >= $%d", argNum))
		args = append(args, filter.From)
		argNum++
	}
	if !filter.To.IsZero() {
		conditions = append(conditions, fmt.Sprintf("triggered_at <= $%d", argNum))
		args = append(args, filter.To)
		argNum++
	}

	query := "SELECT id, agent_id, type, severity, message, value, threshold, triggered_at, resolved_at, acknowledged FROM alerts"
	if len(conditions) > 0 {
		query += " WHERE " + strings.Join(conditions, " AND ")
	}
	query += " ORDER BY triggered_at DESC"

	if filter.Limit > 0 {
		query += fmt.Sprintf(" LIMIT %d", filter.Limit)
	}
	if filter.Offset > 0 {
		query += fmt.Sprintf(" OFFSET %d", filter.Offset)
	}

	rows, err := p.db.QueryContext(ctx, query, args...)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var alerts []*Alert
	for rows.Next() {
		var alert Alert
		var resolvedAt sql.NullTime // NULL while the alert is still open

		err := rows.Scan(&alert.ID, &alert.AgentID, &alert.Type, &alert.Severity, &alert.Message,
			&alert.Value, &alert.Threshold, &alert.TriggeredAt, &resolvedAt, &alert.Acknowledged)
		if err != nil {
			return nil, err
		}

		if resolvedAt.Valid {
			alert.ResolvedAt = &resolvedAt.Time
		}
		alerts = append(alerts, &alert)
	}
	return alerts, rows.Err()
}
|
|
|
|
// AcknowledgeAlert marks an alert as acknowledged. Acknowledging an
// unknown or already-acknowledged alert is not an error.
func (p *PostgresDB) AcknowledgeAlert(ctx context.Context, id string) error {
	_, err := p.db.ExecContext(ctx, `UPDATE alerts SET acknowledged = TRUE WHERE id = $1`, id)
	return err
}
|
|
|
|
// ============================================================================
|
|
// Retention
|
|
// ============================================================================
|
|
|
|
// RunRetention runs the retention policy, aggregating and deleting old data.
//
// The tiers are processed in order — raw -> 1min -> 5min -> hourly — and
// within each tier data older than its cutoff is first aggregated into the
// next coarser table, then deleted. The aggregate-before-delete ordering is
// essential: deleting first would lose the samples. Each step is a separate
// statement (no enclosing transaction), so a mid-run failure leaves earlier
// tiers already processed; the whole routine is safe to re-run.
func (p *PostgresDB) RunRetention(ctx context.Context) error {
	now := time.Now().UTC()

	// Aggregate raw -> 1min for data older than raw retention
	rawCutoff := now.Add(-p.retention.RawRetention)
	if err := p.aggregateRawTo1Min(ctx, rawCutoff); err != nil {
		return fmt.Errorf("aggregate raw->1min: %w", err)
	}

	// Delete raw data older than retention
	if _, err := p.db.ExecContext(ctx, `DELETE FROM metrics_raw WHERE timestamp < $1`, rawCutoff); err != nil {
		return fmt.Errorf("delete old raw: %w", err)
	}

	// Aggregate 1min -> 5min
	oneMinCutoff := now.Add(-p.retention.OneMinuteRetention)
	if err := p.aggregate1MinTo5Min(ctx, oneMinCutoff); err != nil {
		return fmt.Errorf("aggregate 1min->5min: %w", err)
	}

	// Delete 1min data older than retention
	if _, err := p.db.ExecContext(ctx, `DELETE FROM metrics_1min WHERE timestamp < $1`, oneMinCutoff); err != nil {
		return fmt.Errorf("delete old 1min: %w", err)
	}

	// Aggregate 5min -> hourly
	fiveMinCutoff := now.Add(-p.retention.FiveMinuteRetention)
	if err := p.aggregate5MinToHourly(ctx, fiveMinCutoff); err != nil {
		return fmt.Errorf("aggregate 5min->hourly: %w", err)
	}

	// Delete 5min data older than retention
	if _, err := p.db.ExecContext(ctx, `DELETE FROM metrics_5min WHERE timestamp < $1`, fiveMinCutoff); err != nil {
		return fmt.Errorf("delete old 5min: %w", err)
	}

	// Delete hourly data older than retention (the coarsest tier is simply
	// dropped — there is nothing further to aggregate into)
	hourlyCutoff := now.Add(-p.retention.HourlyRetention)
	if _, err := p.db.ExecContext(ctx, `DELETE FROM metrics_hourly WHERE timestamp < $1`, hourlyCutoff); err != nil {
		return fmt.Errorf("delete old hourly: %w", err)
	}

	return nil
}
|
|
|
|
// aggregateRawTo1Min rolls raw JSONB metric samples up into 1-minute buckets
// in metrics_1min. Only rows in the window [before-24h, before) are scanned,
// which bounds the work per run; disk_avg and gpu_avg are stored as NULL at
// this tier (the raw extraction for them is not implemented here).
// Re-running is idempotent: ON CONFLICT overwrites a bucket's stats.
//
// NOTE(review): a minute bucket that straddles `before` is aggregated from
// only the rows < before; after those raw rows are deleted, a later run will
// recompute (and overwrite) the bucket from just the surviving rows — the
// earlier samples' contribution is lost. Confirm this is acceptable, or align
// the cutoff to a minute boundary in the caller.
func (p *PostgresDB) aggregateRawTo1Min(ctx context.Context, before time.Time) error {
	// Plain INSERT ... SELECT with GROUP BY per (agent, minute) bucket.
	// CPU/memory percentages are extracted from the JSONB `data` column;
	// memory usage guards against division by zero when total is 0.
	_, err := p.db.ExecContext(ctx, `
		INSERT INTO metrics_1min (agent_id, timestamp, cpu_avg, cpu_min, cpu_max, mem_avg, mem_min, mem_max, disk_avg, gpu_avg, sample_count)
		SELECT
			agent_id,
			date_trunc('minute', timestamp) as ts,
			AVG((data->'cpu'->>'usagePercent')::float),
			MIN((data->'cpu'->>'usagePercent')::float),
			MAX((data->'cpu'->>'usagePercent')::float),
			AVG(CASE WHEN (data->'memory'->>'total')::bigint > 0
				THEN (data->'memory'->>'used')::float / (data->'memory'->>'total')::float * 100
				ELSE 0 END),
			MIN(CASE WHEN (data->'memory'->>'total')::bigint > 0
				THEN (data->'memory'->>'used')::float / (data->'memory'->>'total')::float * 100
				ELSE 0 END),
			MAX(CASE WHEN (data->'memory'->>'total')::bigint > 0
				THEN (data->'memory'->>'used')::float / (data->'memory'->>'total')::float * 100
				ELSE 0 END),
			NULL,
			NULL,
			COUNT(*)
		FROM metrics_raw
		WHERE timestamp < $1 AND timestamp >= $2
		GROUP BY agent_id, ts
		ON CONFLICT (agent_id, timestamp) DO UPDATE SET
			cpu_avg = EXCLUDED.cpu_avg,
			cpu_min = EXCLUDED.cpu_min,
			cpu_max = EXCLUDED.cpu_max,
			mem_avg = EXCLUDED.mem_avg,
			mem_min = EXCLUDED.mem_min,
			mem_max = EXCLUDED.mem_max,
			sample_count = EXCLUDED.sample_count
	`, before, before.Add(-24*time.Hour))
	return err
}
|
|
|
|
// aggregate1MinTo5Min rolls 1-minute buckets older than `before` up into
// 5-minute buckets in metrics_5min. The bucket timestamp is the containing
// hour plus the largest 5-minute multiple <= the source minute. Unlike the
// raw tier there is no lower time bound: all 1-minute rows before the cutoff
// are (re)aggregated, which is safe because the run is idempotent and the
// caller deletes the source rows afterwards.
func (p *PostgresDB) aggregate1MinTo5Min(ctx context.Context, before time.Time) error {
	// min/max are taken over the tier's own min/max so extremes survive
	// the rollup; sample_count sums the underlying raw sample counts.
	_, err := p.db.ExecContext(ctx, `
		INSERT INTO metrics_5min (agent_id, timestamp, cpu_avg, cpu_min, cpu_max, mem_avg, mem_min, mem_max, disk_avg, gpu_avg, sample_count)
		SELECT
			agent_id,
			date_trunc('hour', timestamp) +
				INTERVAL '5 min' * (EXTRACT(MINUTE FROM timestamp)::int / 5) as ts,
			AVG(cpu_avg), MIN(cpu_min), MAX(cpu_max),
			AVG(mem_avg), MIN(mem_min), MAX(mem_max),
			AVG(disk_avg), AVG(gpu_avg),
			SUM(sample_count)
		FROM metrics_1min
		WHERE timestamp < $1
		GROUP BY agent_id, ts
		ON CONFLICT (agent_id, timestamp) DO UPDATE SET
			cpu_avg = EXCLUDED.cpu_avg,
			cpu_min = EXCLUDED.cpu_min,
			cpu_max = EXCLUDED.cpu_max,
			mem_avg = EXCLUDED.mem_avg,
			mem_min = EXCLUDED.mem_min,
			mem_max = EXCLUDED.mem_max,
			disk_avg = EXCLUDED.disk_avg,
			gpu_avg = EXCLUDED.gpu_avg,
			sample_count = EXCLUDED.sample_count
	`, before)
	return err
}
|
|
|
|
// aggregate5MinToHourly rolls 5-minute buckets older than `before` up into
// hourly buckets in metrics_hourly, bucketed on the containing hour.
// Idempotent: ON CONFLICT overwrites previously computed hourly stats, so
// it is safe to rerun before the caller deletes the 5-minute source rows.
func (p *PostgresDB) aggregate5MinToHourly(ctx context.Context, before time.Time) error {
	// Same rollup shape as the lower tiers: avg of avgs, min of mins,
	// max of maxes, and sample_count summed from the source buckets.
	_, err := p.db.ExecContext(ctx, `
		INSERT INTO metrics_hourly (agent_id, timestamp, cpu_avg, cpu_min, cpu_max, mem_avg, mem_min, mem_max, disk_avg, gpu_avg, sample_count)
		SELECT
			agent_id,
			date_trunc('hour', timestamp) as ts,
			AVG(cpu_avg), MIN(cpu_min), MAX(cpu_max),
			AVG(mem_avg), MIN(mem_min), MAX(mem_max),
			AVG(disk_avg), AVG(gpu_avg),
			SUM(sample_count)
		FROM metrics_5min
		WHERE timestamp < $1
		GROUP BY agent_id, ts
		ON CONFLICT (agent_id, timestamp) DO UPDATE SET
			cpu_avg = EXCLUDED.cpu_avg,
			cpu_min = EXCLUDED.cpu_min,
			cpu_max = EXCLUDED.cpu_max,
			mem_avg = EXCLUDED.mem_avg,
			mem_min = EXCLUDED.mem_min,
			mem_max = EXCLUDED.mem_max,
			disk_avg = EXCLUDED.disk_avg,
			gpu_avg = EXCLUDED.gpu_avg,
			sample_count = EXCLUDED.sample_count
	`, before)
	return err
}
|
|
|
|
// =====================
|
|
// Log Storage Methods
|
|
// =====================
|
|
|
|
// StoreLogs stores multiple log entries in a batch.
|
|
func (p *PostgresDB) StoreLogs(ctx context.Context, entries []LogEntry) error {
|
|
if len(entries) == 0 {
|
|
return nil
|
|
}
|
|
|
|
tx, err := p.db.BeginTx(ctx, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("begin tx: %w", err)
|
|
}
|
|
defer tx.Rollback()
|
|
|
|
stmt, err := tx.PrepareContext(ctx, `
|
|
INSERT INTO logs (agent_id, timestamp, source, source_name, level, message, fields)
|
|
VALUES ($1, $2, $3, $4, $5, $6, $7)
|
|
`)
|
|
if err != nil {
|
|
return fmt.Errorf("prepare stmt: %w", err)
|
|
}
|
|
defer stmt.Close()
|
|
|
|
for _, e := range entries {
|
|
var fieldsJSON []byte
|
|
if len(e.Fields) > 0 {
|
|
fieldsJSON, _ = json.Marshal(e.Fields)
|
|
}
|
|
|
|
_, err := stmt.ExecContext(ctx,
|
|
e.AgentID,
|
|
e.Timestamp,
|
|
e.Source,
|
|
e.SourceName,
|
|
string(e.Level),
|
|
e.Message,
|
|
fieldsJSON,
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("insert log: %w", err)
|
|
}
|
|
}
|
|
|
|
return tx.Commit()
|
|
}
|
|
|
|
// QueryLogs queries logs with filtering and pagination.
|
|
// Returns entries, total count, and error.
|
|
func (p *PostgresDB) QueryLogs(ctx context.Context, filter LogFilter) ([]LogEntry, int, error) {
|
|
var conditions []string
|
|
var args []interface{}
|
|
argNum := 1
|
|
|
|
if filter.AgentID != "" {
|
|
conditions = append(conditions, fmt.Sprintf("agent_id = $%d", argNum))
|
|
args = append(args, filter.AgentID)
|
|
argNum++
|
|
}
|
|
|
|
if filter.Source != "" {
|
|
conditions = append(conditions, fmt.Sprintf("source = $%d", argNum))
|
|
args = append(args, filter.Source)
|
|
argNum++
|
|
}
|
|
|
|
if filter.SourceName != "" {
|
|
conditions = append(conditions, fmt.Sprintf("source_name = $%d", argNum))
|
|
args = append(args, filter.SourceName)
|
|
argNum++
|
|
}
|
|
|
|
if len(filter.Level) > 0 {
|
|
placeholders := make([]string, len(filter.Level))
|
|
for i, l := range filter.Level {
|
|
placeholders[i] = fmt.Sprintf("$%d", argNum)
|
|
args = append(args, string(l))
|
|
argNum++
|
|
}
|
|
conditions = append(conditions, fmt.Sprintf("level IN (%s)", strings.Join(placeholders, ",")))
|
|
}
|
|
|
|
if filter.Query != "" {
|
|
conditions = append(conditions, fmt.Sprintf("message ILIKE $%d", argNum))
|
|
args = append(args, "%"+filter.Query+"%")
|
|
argNum++
|
|
}
|
|
|
|
if !filter.From.IsZero() {
|
|
conditions = append(conditions, fmt.Sprintf("timestamp >= $%d", argNum))
|
|
args = append(args, filter.From)
|
|
argNum++
|
|
}
|
|
|
|
if !filter.To.IsZero() {
|
|
conditions = append(conditions, fmt.Sprintf("timestamp <= $%d", argNum))
|
|
args = append(args, filter.To)
|
|
argNum++
|
|
}
|
|
|
|
whereClause := ""
|
|
if len(conditions) > 0 {
|
|
whereClause = "WHERE " + strings.Join(conditions, " AND ")
|
|
}
|
|
|
|
// Get total count
|
|
countQuery := fmt.Sprintf("SELECT COUNT(*) FROM logs %s", whereClause)
|
|
var total int
|
|
if err := p.db.QueryRowContext(ctx, countQuery, args...).Scan(&total); err != nil {
|
|
return nil, 0, fmt.Errorf("count logs: %w", err)
|
|
}
|
|
|
|
// Get entries with pagination
|
|
limit := filter.Limit
|
|
if limit <= 0 {
|
|
limit = 100
|
|
}
|
|
|
|
query := fmt.Sprintf(`
|
|
SELECT id, agent_id, timestamp, source, source_name, level, message, fields
|
|
FROM logs %s
|
|
ORDER BY timestamp DESC
|
|
LIMIT $%d OFFSET $%d
|
|
`, whereClause, argNum, argNum+1)
|
|
|
|
args = append(args, limit, filter.Offset)
|
|
|
|
rows, err := p.db.QueryContext(ctx, query, args...)
|
|
if err != nil {
|
|
return nil, 0, fmt.Errorf("query logs: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var entries []LogEntry
|
|
for rows.Next() {
|
|
var e LogEntry
|
|
var level string
|
|
var fieldsJSON sql.NullString
|
|
|
|
err := rows.Scan(
|
|
&e.ID,
|
|
&e.AgentID,
|
|
&e.Timestamp,
|
|
&e.Source,
|
|
&e.SourceName,
|
|
&level,
|
|
&e.Message,
|
|
&fieldsJSON,
|
|
)
|
|
if err != nil {
|
|
return nil, 0, fmt.Errorf("scan log: %w", err)
|
|
}
|
|
|
|
e.Level = LogLevel(level)
|
|
|
|
if fieldsJSON.Valid && fieldsJSON.String != "" {
|
|
json.Unmarshal([]byte(fieldsJSON.String), &e.Fields)
|
|
}
|
|
|
|
entries = append(entries, e)
|
|
}
|
|
|
|
return entries, total, nil
|
|
}
|
|
|
|
// DeleteOldLogs deletes logs older than the specified time.
|
|
// Returns the number of deleted entries.
|
|
func (p *PostgresDB) DeleteOldLogs(ctx context.Context, before time.Time) (int, error) {
|
|
result, err := p.db.ExecContext(ctx, "DELETE FROM logs WHERE timestamp < $1", before)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("delete old logs: %w", err)
|
|
}
|
|
|
|
affected, err := result.RowsAffected()
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
return int(affected), nil
|
|
}
|