diff --git a/backend/internal/collectors/logs/collector.go b/backend/internal/collectors/logs/collector.go new file mode 100644 index 0000000..33f5467 --- /dev/null +++ b/backend/internal/collectors/logs/collector.go @@ -0,0 +1,254 @@ +// Package logs provides log collection from various sources. +package logs + +import ( + "context" + "sync" + "time" +) + +// LogLevel represents log severity levels. +type LogLevel string + +const ( + LogLevelDebug LogLevel = "debug" + LogLevelInfo LogLevel = "info" + LogLevelWarning LogLevel = "warning" + LogLevelError LogLevel = "error" + LogLevelFatal LogLevel = "fatal" +) + +// LogEntry represents a single log entry from any source. +type LogEntry struct { + Timestamp time.Time `json:"timestamp"` + AgentID string `json:"agentId,omitempty"` + Source string `json:"source"` // "journal", "file", "docker" + SourceName string `json:"sourceName"` // Unit name, filename, container name + Level LogLevel `json:"level"` + Message string `json:"message"` + Fields map[string]string `json:"fields,omitempty"` +} + +// Collector is the interface that all log collectors must implement. +type Collector interface { + // Name returns the collector name for identification. + Name() string + + // Start begins collecting logs and sends them to the provided channel. + Start(ctx context.Context, entries chan<- LogEntry) error + + // Stop gracefully stops the collector. + Stop() error +} + +// Config holds the configuration for the log collection system. +type Config struct { + Enabled bool `yaml:"enabled"` + BufferSize int `yaml:"buffer_size"` + FlushInterval time.Duration `yaml:"flush_interval"` + + Journal JournalConfig `yaml:"journal"` + Files FilesConfig `yaml:"files"` + Docker DockerConfig `yaml:"docker"` +} + +// JournalConfig holds systemd journal collector configuration. +type JournalConfig struct { + Enabled bool `yaml:"enabled"` + Units []string `yaml:"units"` // Filter by unit names (empty = all) + Priority int `yaml:"priority"` // 0=emerg to 7=debug, collect <= priority +} + +// FilesConfig holds file-based log collector configuration. +type FilesConfig struct { + Enabled bool `yaml:"enabled"` + Files []FileSpec `yaml:"files"` +} + +// FileSpec defines a single file to tail. +type FileSpec struct { + Path string `yaml:"path"` // File path or glob pattern + Format string `yaml:"format"` // "plain", "json", "nginx", "nginx_error" +} + +// DockerConfig holds Docker log collector configuration. +type DockerConfig struct { + Enabled bool `yaml:"enabled"` + Containers []string `yaml:"containers"` // Container names/IDs (empty = all) + Since string `yaml:"since"` // Only logs from this duration ago on start (e.g., "5m") +} + +// Manager manages multiple log collectors and aggregates their output. +type Manager struct { + collectors []Collector + config *Config + entries chan LogEntry + handlers []Handler + mu sync.RWMutex + stopCh chan struct{} + wg sync.WaitGroup +} + +// Handler receives log entries for processing. +type Handler interface { + HandleLogEntry(entry LogEntry) error +} + +// HandlerFunc is a function adapter for Handler. +type HandlerFunc func(LogEntry) error + +func (f HandlerFunc) HandleLogEntry(entry LogEntry) error { + return f(entry) +} + +// NewManager creates a new log collection manager. +func NewManager(config *Config) *Manager { + bufSize := config.BufferSize + if bufSize <= 0 { + bufSize = 1000 + } + + return &Manager{ + config: config, + collectors: make([]Collector, 0), + entries: make(chan LogEntry, bufSize), + handlers: make([]Handler, 0), + stopCh: make(chan struct{}), + } +} + +// AddCollector registers a new collector with the manager. +func (m *Manager) AddCollector(c Collector) { + m.mu.Lock() + defer m.mu.Unlock() + m.collectors = append(m.collectors, c) +} + +// AddHandler registers a handler to receive log entries. +func (m *Manager) AddHandler(h Handler) { + m.mu.Lock() + defer m.mu.Unlock() + m.handlers = append(m.handlers, h) +} + +// Start begins all collectors and the entry processing loop. +func (m *Manager) Start(ctx context.Context) error { + // Start all collectors + for _, c := range m.collectors { + collector := c + m.wg.Add(1) + go func() { + defer m.wg.Done() + if err := collector.Start(ctx, m.entries); err != nil { + // Log error but continue with other collectors + } + }() + } + + // Start the processing loop + m.wg.Add(1) + go m.processEntries(ctx) + + return nil +} + +// Stop gracefully stops all collectors and processing. +func (m *Manager) Stop() error { + close(m.stopCh) + + // Stop all collectors + for _, c := range m.collectors { + c.Stop() + } + + // Wait for all goroutines to finish + m.wg.Wait() + + // Close entries channel + close(m.entries) + + return nil +} + +// processEntries reads from the entries channel and dispatches to handlers. +func (m *Manager) processEntries(ctx context.Context) { + defer m.wg.Done() + + flushInterval := m.config.FlushInterval + if flushInterval <= 0 { + flushInterval = 5 * time.Second + } + + ticker := time.NewTicker(flushInterval) + defer ticker.Stop() + + var batch []LogEntry + + for { + select { + case <-ctx.Done(): + m.flushBatch(batch) + return + case <-m.stopCh: + m.flushBatch(batch) + return + case entry, ok := <-m.entries: + if !ok { + m.flushBatch(batch) + return + } + batch = append(batch, entry) + + // If batch is large enough, flush immediately + if len(batch) >= 100 { + m.flushBatch(batch) + batch = nil + } + case <-ticker.C: + if len(batch) > 0 { + m.flushBatch(batch) + batch = nil + } + } + } +} + +// flushBatch sends all entries in the batch to handlers. +func (m *Manager) flushBatch(batch []LogEntry) { + if len(batch) == 0 { + return + } + + m.mu.RLock() + handlers := m.handlers + m.mu.RUnlock() + + for _, entry := range batch { + for _, h := range handlers { + h.HandleLogEntry(entry) + } + } +} + +// DefaultConfig returns a sensible default configuration. +func DefaultConfig() *Config { + return &Config{ + Enabled: false, + BufferSize: 1000, + FlushInterval: 5 * time.Second, + Journal: JournalConfig{ + Enabled: true, + Units: []string{}, // All units + Priority: 6, // Info and above + }, + Files: FilesConfig{ + Enabled: false, + Files: []FileSpec{}, + }, + Docker: DockerConfig{ + Enabled: false, + Containers: []string{}, + Since: "5m", + }, + } +} diff --git a/backend/internal/collectors/logs/docker.go b/backend/internal/collectors/logs/docker.go new file mode 100644 index 0000000..3f4652d --- /dev/null +++ b/backend/internal/collectors/logs/docker.go @@ -0,0 +1,359 @@ +package logs + +import ( + "bufio" + "context" + "encoding/json" + "os/exec" + "strings" + "sync" + "time" +) + +// DockerCollector collects logs from Docker containers. +// Uses docker CLI for pure-Go implementation (no Docker SDK required). +type DockerCollector struct { + config DockerConfig + stopCh chan struct{} + cmds []*exec.Cmd + mu sync.Mutex + wg sync.WaitGroup +} + +// NewDockerCollector creates a new Docker log collector. +func NewDockerCollector(config DockerConfig) *DockerCollector { + return &DockerCollector{ + config: config, + stopCh: make(chan struct{}), + cmds: make([]*exec.Cmd, 0), + } +} + +// Name returns the collector name. +func (c *DockerCollector) Name() string { + return "docker" +} + +// Start begins collecting logs from Docker containers. +func (c *DockerCollector) Start(ctx context.Context, entries chan<- LogEntry) error { + if !c.config.Enabled { + return nil + } + + // Check if docker is available + if !IsDockerAvailable() { + return nil + } + + // Get list of containers to monitor + containers, err := c.getContainers() + if err != nil { + return err + } + + // Filter containers if specified + if len(c.config.Containers) > 0 { + containers = filterContainers(containers, c.config.Containers) + } + + // Start tailing each container + for _, container := range containers { + c.wg.Add(1) + go c.tailContainer(ctx, container, entries) + } + + return nil +} + +// Stop stops all container log tailers. +func (c *DockerCollector) Stop() error { + close(c.stopCh) + + c.mu.Lock() + for _, cmd := range c.cmds { + if cmd != nil && cmd.Process != nil { + cmd.Process.Kill() + } + } + c.mu.Unlock() + + c.wg.Wait() + return nil +} + +// ContainerInfo holds container identification info. +type ContainerInfo struct { + ID string + Name string + Image string +} + +// getContainers lists running Docker containers. +func (c *DockerCollector) getContainers() ([]ContainerInfo, error) { + cmd := exec.Command("docker", "ps", "--format", "{{.ID}}\t{{.Names}}\t{{.Image}}") + output, err := cmd.Output() + if err != nil { + return nil, err + } + + var containers []ContainerInfo + for _, line := range strings.Split(string(output), "\n") { + line = strings.TrimSpace(line) + if line == "" { + continue + } + + parts := strings.Split(line, "\t") + if len(parts) >= 3 { + containers = append(containers, ContainerInfo{ + ID: parts[0], + Name: parts[1], + Image: parts[2], + }) + } + } + + return containers, nil +} + +// filterContainers filters containers by name or ID. +func filterContainers(all []ContainerInfo, filter []string) []ContainerInfo { + filterSet := make(map[string]bool) + for _, f := range filter { + filterSet[strings.ToLower(f)] = true + } + + var result []ContainerInfo + for _, c := range all { + if filterSet[strings.ToLower(c.Name)] || filterSet[strings.ToLower(c.ID)] { + result = append(result, c) + } + } + return result +} + +// tailContainer tails logs from a single container. +func (c *DockerCollector) tailContainer(ctx context.Context, container ContainerInfo, entries chan<- LogEntry) { + defer c.wg.Done() + + args := []string{ + "logs", + "--follow", + "--timestamps", + } + + // Add since filter + if c.config.Since != "" { + args = append(args, "--since="+c.config.Since) + } + + args = append(args, container.ID) + + c.mu.Lock() + cmd := exec.CommandContext(ctx, "docker", args...) + c.cmds = append(c.cmds, cmd) + c.mu.Unlock() + + // Docker logs go to stderr for stderr stream, stdout for stdout + // We'll capture both by merging them + stdout, err := cmd.StdoutPipe() + if err != nil { + return + } + stderr, err := cmd.StderrPipe() + if err != nil { + return + } + + if err := cmd.Start(); err != nil { + return + } + + // Create a combined reader + done := make(chan struct{}) + + // Read stdout + go func() { + scanner := bufio.NewScanner(stdout) + buf := make([]byte, 0, 64*1024) + scanner.Buffer(buf, 1024*1024) + + for scanner.Scan() { + select { + case <-ctx.Done(): + return + case <-c.stopCh: + return + case <-done: + return + default: + } + + entry := c.parseDockerLog(scanner.Text(), container.Name, "stdout") + select { + case entries <- entry: + default: + } + } + }() + + // Read stderr + go func() { + scanner := bufio.NewScanner(stderr) + buf := make([]byte, 0, 64*1024) + scanner.Buffer(buf, 1024*1024) + + for scanner.Scan() { + select { + case <-ctx.Done(): + return + case <-c.stopCh: + return + case <-done: + return + default: + } + + entry := c.parseDockerLog(scanner.Text(), container.Name, "stderr") + // Stderr often contains errors + if entry.Level == LogLevelInfo { + entry.Level = LogLevelWarning + } + select { + case entries <- entry: + default: + } + } + }() + + // Wait for command to finish + cmd.Wait() + close(done) +} + +// parseDockerLog parses a docker log line with timestamp prefix. +func (c *DockerCollector) parseDockerLog(line string, containerName string, stream string) LogEntry { + entry := LogEntry{ + Timestamp: time.Now(), + Source: "docker", + SourceName: containerName, + Level: LogLevelInfo, + Fields: map[string]string{ + "stream": stream, + }, + } + + // Docker timestamps are in RFC3339Nano format at the start of the line + // Format: 2024-01-15T10:30:45.123456789Z message + if len(line) > 30 && line[4] == '-' && line[7] == '-' && line[10] == 'T' { + // Find the space after timestamp + spaceIdx := strings.Index(line, " ") + if spaceIdx > 20 { + tsStr := line[:spaceIdx] + message := line[spaceIdx+1:] + + if ts, err := time.Parse(time.RFC3339Nano, tsStr); err == nil { + entry.Timestamp = ts + } + line = message + } + } + + // Try to parse as JSON (common for container logs) + var jsonLog map[string]interface{} + if err := json.Unmarshal([]byte(line), &jsonLog); err == nil { + // Extract message + if msg, ok := jsonLog["message"].(string); ok { + entry.Message = msg + } else if msg, ok := jsonLog["msg"].(string); ok { + entry.Message = msg + } else if log, ok := jsonLog["log"].(string); ok { + entry.Message = log + } else { + entry.Message = line + } + + // Extract level + if level, ok := jsonLog["level"].(string); ok { + entry.Level = stringToLevel(level) + } else if level, ok := jsonLog["severity"].(string); ok { + entry.Level = stringToLevel(level) + } + + // Add other fields + for k, v := range jsonLog { + switch k { + case "message", "msg", "log", "level", "severity", "time", "timestamp": + continue + default: + if s, ok := v.(string); ok { + entry.Fields[k] = s + } + } + } + } else { + // Plain text log + entry.Message = line + entry.Level = detectLevel(line) + } + + return entry +} + +// IsDockerAvailable checks if Docker CLI is available. +func IsDockerAvailable() bool { + cmd := exec.Command("docker", "info") + return cmd.Run() == nil +} + +// ListContainers returns a list of running Docker containers. +func ListContainers() ([]ContainerInfo, error) { + collector := &DockerCollector{ + config: DockerConfig{Enabled: true}, + } + return collector.getContainers() +} + +// DockerTailer provides a simple interface for tailing container logs. +type DockerTailer struct { + containerID string + since string + entries chan LogEntry + stopCh chan struct{} + cmd *exec.Cmd +} + +// NewDockerTailer creates a new Docker log tailer for a specific container. +func NewDockerTailer(containerID string, since string) *DockerTailer { + return &DockerTailer{ + containerID: containerID, + since: since, + entries: make(chan LogEntry, 100), + stopCh: make(chan struct{}), + } +} + +// Entries returns the channel for receiving log entries. +func (t *DockerTailer) Entries() <-chan LogEntry { + return t.entries +} + +// Start begins tailing the container. +func (t *DockerTailer) Start(ctx context.Context) error { + config := DockerConfig{ + Enabled: true, + Containers: []string{t.containerID}, + Since: t.since, + } + + collector := NewDockerCollector(config) + return collector.Start(ctx, t.entries) +} + +// Stop stops the tailer. +func (t *DockerTailer) Stop() { + close(t.stopCh) + if t.cmd != nil && t.cmd.Process != nil { + t.cmd.Process.Kill() + } +} diff --git a/backend/internal/collectors/logs/file.go b/backend/internal/collectors/logs/file.go new file mode 100644 index 0000000..aa8a2da --- /dev/null +++ b/backend/internal/collectors/logs/file.go @@ -0,0 +1,384 @@ +package logs + +import ( + "bufio" + "context" + "encoding/json" + "io" + "os" + "path/filepath" + "regexp" + "strings" + "sync" + "time" +) + +// FileCollector tails log files and collects entries. +type FileCollector struct { + config FilesConfig + stopCh chan struct{} + wg sync.WaitGroup +} + +// NewFileCollector creates a new file-based log collector. +func NewFileCollector(config FilesConfig) *FileCollector { + return &FileCollector{ + config: config, + stopCh: make(chan struct{}), + } +} + +// Name returns the collector name. +func (c *FileCollector) Name() string { + return "file" +} + +// Start begins tailing configured log files. +func (c *FileCollector) Start(ctx context.Context, entries chan<- LogEntry) error { + if !c.config.Enabled { + return nil + } + + // Expand globs and start tailing each file + for _, spec := range c.config.Files { + files, err := filepath.Glob(spec.Path) + if err != nil { + continue + } + + for _, file := range files { + c.wg.Add(1) + go c.tailFile(ctx, file, spec.Format, entries) + } + } + + return nil +} + +// Stop stops all file tailers. +func (c *FileCollector) Stop() error { + close(c.stopCh) + c.wg.Wait() + return nil +} + +// tailFile tails a single file and sends entries to the channel. +func (c *FileCollector) tailFile(ctx context.Context, path string, format string, entries chan<- LogEntry) { + defer c.wg.Done() + + parser := newLogParser(format) + + // Open file + file, err := os.Open(path) + if err != nil { + return + } + defer file.Close() + + // Seek to end to only get new entries + file.Seek(0, io.SeekEnd) + + reader := bufio.NewReader(file) + baseName := filepath.Base(path) + + pollInterval := 100 * time.Millisecond + ticker := time.NewTicker(pollInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-c.stopCh: + return + case <-ticker.C: + // Try to read new lines + for { + line, err := reader.ReadString('\n') + if err != nil { + if err == io.EOF { + // Check if file was truncated or rotated + if info, statErr := file.Stat(); statErr == nil { + pos, _ := file.Seek(0, io.SeekCurrent) + if info.Size() < pos { + // File was truncated, seek to start + file.Seek(0, io.SeekStart) + reader.Reset(file) + } + } + } + break // No more lines available + } + + line = strings.TrimRight(line, "\r\n") + if line == "" { + continue + } + + entry := parser.Parse(line, baseName) + + select { + case entries <- entry: + case <-ctx.Done(): + return + case <-c.stopCh: + return + default: + // Drop if channel full + } + } + } + } +} + +// LogParser parses log lines based on format. +type LogParser interface { + Parse(line string, sourceName string) LogEntry +} + +// newLogParser creates a parser for the given format. +func newLogParser(format string) LogParser { + switch format { + case "json": + return &JSONParser{} + case "nginx": + return &NginxParser{} + case "nginx_error": + return &NginxErrorParser{} + default: + return &PlainParser{} + } +} + +// PlainParser treats each line as a plain text message. +type PlainParser struct{} + +func (p *PlainParser) Parse(line string, sourceName string) LogEntry { + level := detectLevel(line) + + return LogEntry{ + Timestamp: time.Now(), + Source: "file", + SourceName: sourceName, + Level: level, + Message: line, + } +} + +// JSONParser parses JSON-formatted log lines. +type JSONParser struct{} + +func (p *JSONParser) Parse(line string, sourceName string) LogEntry { + var data map[string]interface{} + if err := json.Unmarshal([]byte(line), &data); err != nil { + // Fall back to plain text + return (&PlainParser{}).Parse(line, sourceName) + } + + entry := LogEntry{ + Timestamp: time.Now(), + Source: "file", + SourceName: sourceName, + Level: LogLevelInfo, + Fields: make(map[string]string), + } + + // Extract common fields + if msg, ok := data["message"].(string); ok { + entry.Message = msg + } else if msg, ok := data["msg"].(string); ok { + entry.Message = msg + } else { + // Use whole line as message + entry.Message = line + } + + // Extract level + if level, ok := data["level"].(string); ok { + entry.Level = stringToLevel(level) + } else if level, ok := data["severity"].(string); ok { + entry.Level = stringToLevel(level) + } + + // Extract timestamp + if ts, ok := data["timestamp"].(string); ok { + if parsed, err := time.Parse(time.RFC3339, ts); err == nil { + entry.Timestamp = parsed + } + } else if ts, ok := data["time"].(string); ok { + if parsed, err := time.Parse(time.RFC3339, ts); err == nil { + entry.Timestamp = parsed + } + } + + // Store other fields + for k, v := range data { + switch k { + case "message", "msg", "level", "severity", "timestamp", "time": + continue + default: + if s, ok := v.(string); ok { + entry.Fields[k] = s + } + } + } + + return entry +} + +// NginxParser parses nginx access log format. +type NginxParser struct { + re *regexp.Regexp +} + +func (p *NginxParser) Parse(line string, sourceName string) LogEntry { + if p.re == nil { + // Combined log format: $remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent" + p.re = regexp.MustCompile(`^(\S+) - (\S+) \[([^\]]+)\] "([^"]*)" (\d+) (\d+) "([^"]*)" "([^"]*)"`) + } + + matches := p.re.FindStringSubmatch(line) + if matches == nil { + return (&PlainParser{}).Parse(line, sourceName) + } + + entry := LogEntry{ + Timestamp: time.Now(), + Source: "file", + SourceName: sourceName, + Level: LogLevelInfo, + Message: matches[4], // Request line + Fields: map[string]string{ + "remote_addr": matches[1], + "remote_user": matches[2], + "status": matches[5], + "bytes": matches[6], + "referer": matches[7], + "user_agent": matches[8], + }, + } + + // Parse timestamp + if ts, err := time.Parse("02/Jan/2006:15:04:05 -0700", matches[3]); err == nil { + entry.Timestamp = ts + } + + // Determine level based on status code + status := matches[5] + if len(status) > 0 { + switch status[0] { + case '5': + entry.Level = LogLevelError + case '4': + entry.Level = LogLevelWarning + } + } + + return entry +} + +// NginxErrorParser parses nginx error log format. +type NginxErrorParser struct { + re *regexp.Regexp +} + +func (p *NginxErrorParser) Parse(line string, sourceName string) LogEntry { + if p.re == nil { + // Error log format: YYYY/MM/DD HH:MM:SS [level] PID#TID: *CID message + p.re = regexp.MustCompile(`^(\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) \[(\w+)\] (\d+#\d+): (.*)`) + } + + matches := p.re.FindStringSubmatch(line) + if matches == nil { + return (&PlainParser{}).Parse(line, sourceName) + } + + entry := LogEntry{ + Timestamp: time.Now(), + Source: "file", + SourceName: sourceName, + Level: stringToLevel(matches[2]), + Message: matches[4], + Fields: map[string]string{ + "process": matches[3], + }, + } + + // Parse timestamp + if ts, err := time.Parse("2006/01/02 15:04:05", matches[1]); err == nil { + entry.Timestamp = ts + } + + return entry +} + +// detectLevel attempts to detect log level from message content. +func detectLevel(message string) LogLevel { + lower := strings.ToLower(message) + + switch { + case strings.Contains(lower, "fatal") || strings.Contains(lower, "panic"): + return LogLevelFatal + case strings.Contains(lower, "error") || strings.Contains(lower, "err"): + return LogLevelError + case strings.Contains(lower, "warn"): + return LogLevelWarning + case strings.Contains(lower, "debug"): + return LogLevelDebug + default: + return LogLevelInfo + } +} + +// stringToLevel converts a string level to LogLevel. +func stringToLevel(s string) LogLevel { + switch strings.ToLower(s) { + case "debug", "trace": + return LogLevelDebug + case "info", "notice": + return LogLevelInfo + case "warn", "warning": + return LogLevelWarning + case "error", "err": + return LogLevelError + case "fatal", "critical", "crit", "emerg", "alert", "panic": + return LogLevelFatal + default: + return LogLevelInfo + } +} + +// FileTailer provides a simple interface for tailing a single file. +type FileTailer struct { + path string + format string + entries chan LogEntry + stopCh chan struct{} +} + +// NewFileTailer creates a new file tailer. +func NewFileTailer(path, format string) *FileTailer { + return &FileTailer{ + path: path, + format: format, + entries: make(chan LogEntry, 100), + stopCh: make(chan struct{}), + } +} + +// Entries returns the channel for receiving log entries. +func (t *FileTailer) Entries() <-chan LogEntry { + return t.entries +} + +// Start begins tailing the file. +func (t *FileTailer) Start(ctx context.Context) error { + collector := &FileCollector{} + go collector.tailFile(ctx, t.path, t.format, t.entries) + return nil +} + +// Stop stops the tailer. +func (t *FileTailer) Stop() { + close(t.stopCh) +} diff --git a/backend/internal/collectors/logs/journal.go b/backend/internal/collectors/logs/journal.go new file mode 100644 index 0000000..4406b20 --- /dev/null +++ b/backend/internal/collectors/logs/journal.go @@ -0,0 +1,304 @@ +package logs + +import ( + "bufio" + "context" + "encoding/json" + "os/exec" + "strconv" + "strings" + "sync" + "time" +) + +// JournalCollector collects logs from systemd journal. +// Uses journalctl command for pure-Go implementation (no CGO required). +type JournalCollector struct { + config JournalConfig + cmd *exec.Cmd + stopCh chan struct{} + mu sync.Mutex +} + +// NewJournalCollector creates a new systemd journal collector. +func NewJournalCollector(config JournalConfig) *JournalCollector { + return &JournalCollector{ + config: config, + stopCh: make(chan struct{}), + } +} + +// Name returns the collector name. +func (c *JournalCollector) Name() string { + return "journal" +} + +// Start begins collecting journal entries. +func (c *JournalCollector) Start(ctx context.Context, entries chan<- LogEntry) error { + if !c.config.Enabled { + return nil + } + + // Check if journalctl is available + if _, err := exec.LookPath("journalctl"); err != nil { + return nil // Skip silently if journalctl not available + } + + // Build journalctl command arguments + args := []string{ + "--follow", // Follow new entries + "--output=json", // JSON output for structured parsing + "--no-pager", // Don't use pager + "--since=now", // Start from now (don't replay old logs) + "--output-fields=MESSAGE,PRIORITY,SYSLOG_IDENTIFIER,_SYSTEMD_UNIT,_HOSTNAME,_PID", + } + + // Add priority filter + if c.config.Priority >= 0 && c.config.Priority <= 7 { + args = append(args, "--priority="+strconv.Itoa(c.config.Priority)) + } + + // Add unit filters + for _, unit := range c.config.Units { + args = append(args, "--unit="+unit) + } + + c.mu.Lock() + c.cmd = exec.CommandContext(ctx, "journalctl", args...) + stdout, err := c.cmd.StdoutPipe() + if err != nil { + c.mu.Unlock() + return err + } + c.mu.Unlock() + + if err := c.cmd.Start(); err != nil { + return err + } + + // Read and parse journal entries + scanner := bufio.NewScanner(stdout) + // Increase buffer size for long log lines + buf := make([]byte, 0, 64*1024) + scanner.Buffer(buf, 1024*1024) + + go func() { + for scanner.Scan() { + select { + case <-ctx.Done(): + return + case <-c.stopCh: + return + default: + } + + line := scanner.Text() + if line == "" { + continue + } + + entry, ok := c.parseJournalEntry(line) + if !ok { + continue + } + + select { + case entries <- entry: + case <-ctx.Done(): + return + case <-c.stopCh: + return + default: + // Drop entry if channel is full (backpressure) + } + } + }() + + // Wait for command to finish + go func() { + c.cmd.Wait() + }() + + return nil +} + +// Stop stops the journal collector. +func (c *JournalCollector) Stop() error { + close(c.stopCh) + + c.mu.Lock() + defer c.mu.Unlock() + + if c.cmd != nil && c.cmd.Process != nil { + c.cmd.Process.Kill() + } + + return nil +} + +// JournalEntry represents a raw journal entry in JSON format. +type JournalEntry struct { + Message string `json:"MESSAGE"` + Priority string `json:"PRIORITY"` + SyslogIdentifier string `json:"SYSLOG_IDENTIFIER"` + SystemdUnit string `json:"_SYSTEMD_UNIT"` + Hostname string `json:"_HOSTNAME"` + PID string `json:"_PID"` + RealtimeTimestamp string `json:"__REALTIME_TIMESTAMP"` +} + +// parseJournalEntry parses a JSON journal entry line. +func (c *JournalCollector) parseJournalEntry(line string) (LogEntry, bool) { + var je JournalEntry + if err := json.Unmarshal([]byte(line), &je); err != nil { + return LogEntry{}, false + } + + // Parse timestamp + ts := time.Now() + if je.RealtimeTimestamp != "" { + // Journal timestamp is in microseconds since epoch + if usec, err := strconv.ParseInt(je.RealtimeTimestamp, 10, 64); err == nil { + ts = time.UnixMicro(usec) + } + } + + // Map priority to level + level := priorityToLevel(je.Priority) + + // Determine source name (unit or syslog identifier) + sourceName := je.SystemdUnit + if sourceName == "" { + sourceName = je.SyslogIdentifier + } + if sourceName == "" { + sourceName = "unknown" + } + + // Build extra fields + fields := make(map[string]string) + if je.Hostname != "" { + fields["hostname"] = je.Hostname + } + if je.PID != "" { + fields["pid"] = je.PID + } + + return LogEntry{ + Timestamp: ts, + Source: "journal", + SourceName: sourceName, + Level: level, + Message: je.Message, + Fields: fields, + }, true +} + +// priorityToLevel converts syslog priority to LogLevel. +func priorityToLevel(priority string) LogLevel { + p, err := strconv.Atoi(priority) + if err != nil { + return LogLevelInfo + } + + switch p { + case 0, 1, 2: // emerg, alert, crit + return LogLevelFatal + case 3: // error + return LogLevelError + case 4: // warning + return LogLevelWarning + case 5, 6: // notice, info + return LogLevelInfo + case 7: // debug + return LogLevelDebug + default: + return LogLevelInfo + } +} + +// IsJournalAvailable checks if systemd journal is available on this system. +func IsJournalAvailable() bool { + _, err := exec.LookPath("journalctl") + return err == nil +} + +// JournalTailer provides a simpler interface for tailing journal entries. +type JournalTailer struct { + config JournalConfig + entries chan LogEntry +} + +// NewJournalTailer creates a new journal tailer. +func NewJournalTailer(config JournalConfig) *JournalTailer { + return &JournalTailer{ + config: config, + entries: make(chan LogEntry, 100), + } +} + +// QueryRecentEntries queries recent journal entries without following. +func QueryRecentEntries(units []string, since time.Duration, limit int) ([]LogEntry, error) { + if !IsJournalAvailable() { + return nil, nil + } + + args := []string{ + "--output=json", + "--no-pager", + "--since=" + formatDuration(since), + } + + if limit > 0 { + args = append(args, "--lines="+strconv.Itoa(limit)) + } + + for _, unit := range units { + args = append(args, "--unit="+unit) + } + + cmd := exec.Command("journalctl", args...) + output, err := cmd.Output() + if err != nil { + return nil, err + } + + var entries []LogEntry + collector := &JournalCollector{} + + for _, line := range strings.Split(string(output), "\n") { + if line == "" { + continue + } + if entry, ok := collector.parseJournalEntry(line); ok { + entries = append(entries, entry) + } + } + + return entries, nil +} + +// formatDuration formats a duration for journalctl --since flag. +func formatDuration(d time.Duration) string { + if d <= 0 { + return "today" + } + + secs := int(d.Seconds()) + if secs < 60 { + return strconv.Itoa(secs) + " seconds ago" + } + + mins := secs / 60 + if mins < 60 { + return strconv.Itoa(mins) + " minutes ago" + } + + hours := mins / 60 + if hours < 24 { + return strconv.Itoa(hours) + " hours ago" + } + + days := hours / 24 + return strconv.Itoa(days) + " days ago" +} diff --git a/backend/internal/database/database.go b/backend/internal/database/database.go index 3e83c5f..0e72481 100644 --- a/backend/internal/database/database.go +++ b/backend/internal/database/database.go @@ -60,6 +60,11 @@ type Database interface { // Retention RunRetention(ctx context.Context) error + + // Logs + StoreLogs(ctx context.Context, entries []LogEntry) error + QueryLogs(ctx context.Context, filter LogFilter) ([]LogEntry, int, error) + DeleteOldLogs(ctx context.Context, before time.Time) (int, error) } // MetricPoint represents a single metric data point. @@ -182,6 +187,42 @@ type AlertFilter struct { Offset int } +// LogLevel represents log severity. +type LogLevel string + +const ( + LogLevelDebug LogLevel = "debug" + LogLevelInfo LogLevel = "info" + LogLevelWarning LogLevel = "warning" + LogLevelError LogLevel = "error" + LogLevelFatal LogLevel = "fatal" +) + +// LogEntry represents a stored log entry. +type LogEntry struct { + ID int64 `json:"id"` + AgentID string `json:"agentId"` + Timestamp time.Time `json:"timestamp"` + Source string `json:"source"` // "journal", "file", "docker" + SourceName string `json:"sourceName"` // Unit name, filename, container + Level LogLevel `json:"level"` + Message string `json:"message"` + Fields map[string]string `json:"fields,omitempty"` +} + +// LogFilter specifies criteria for querying logs. +type LogFilter struct { + AgentID string // Filter by agent + Source string // Filter by source type (journal, file, docker) + SourceName string // Filter by source name + Level []LogLevel // Filter by levels + Query string // Full-text search query + From time.Time + To time.Time + Limit int + Offset int +} + // RetentionConfig defines data retention policies. type RetentionConfig struct { // Raw metrics retention (default: 24 hours) @@ -195,6 +236,9 @@ type RetentionConfig struct { // Hourly aggregation retention (default: 1 year) HourlyRetention time.Duration + + // Log retention (default: 7 days) + LogRetention time.Duration } // DefaultRetentionConfig returns default retention settings. @@ -204,5 +248,6 @@ func DefaultRetentionConfig() RetentionConfig { OneMinuteRetention: 7 * 24 * time.Hour, FiveMinuteRetention: 30 * 24 * time.Hour, HourlyRetention: 365 * 24 * time.Hour, + LogRetention: 7 * 24 * time.Hour, } } diff --git a/backend/internal/database/postgres.go b/backend/internal/database/postgres.go index 0845052..5d90d28 100644 --- a/backend/internal/database/postgres.go +++ b/backend/internal/database/postgres.go @@ -59,6 +59,7 @@ func (p *PostgresDB) Migrate() error { pgMigrationSessions, pgMigrationMetrics, pgMigrationAlerts, + pgMigrationLogs, } for i, m := range migrations { @@ -204,6 +205,23 @@ CREATE INDEX IF NOT EXISTS idx_alerts_triggered ON alerts(triggered_at); CREATE INDEX IF NOT EXISTS idx_alerts_severity ON alerts(severity); ` +const pgMigrationLogs = ` +CREATE TABLE IF NOT EXISTS logs ( + id BIGSERIAL PRIMARY KEY, + agent_id TEXT NOT NULL REFERENCES agents(id) ON DELETE CASCADE, + timestamp TIMESTAMPTZ NOT NULL, + source TEXT NOT NULL, + source_name TEXT, + level TEXT NOT NULL, + message TEXT NOT NULL, + fields JSONB +); +CREATE INDEX IF NOT EXISTS idx_logs_agent_time ON logs(agent_id, timestamp DESC); +CREATE INDEX IF NOT EXISTS idx_logs_level ON logs(level); +CREATE INDEX IF NOT EXISTS idx_logs_source ON logs(source, source_name); +CREATE INDEX IF NOT EXISTS idx_logs_timestamp ON logs(timestamp DESC); +` + func (p *PostgresDB) insertDefaultRoles() error { defaultRoles := []struct { id, name, desc string @@ -1050,3 +1068,185 @@ func (p *PostgresDB) aggregate5MinToHourly(ctx context.Context, before time.Time `, before) return err } + +// ===================== +// Log Storage Methods +// ===================== + +// StoreLogs stores multiple log entries in a batch. +func (p *PostgresDB) StoreLogs(ctx context.Context, entries []LogEntry) error { + if len(entries) == 0 { + return nil + } + + tx, err := p.db.BeginTx(ctx, nil) + if err != nil { + return fmt.Errorf("begin tx: %w", err) + } + defer tx.Rollback() + + stmt, err := tx.PrepareContext(ctx, ` + INSERT INTO logs (agent_id, timestamp, source, source_name, level, message, fields) + VALUES ($1, $2, $3, $4, $5, $6, $7) + `) + if err != nil { + return fmt.Errorf("prepare stmt: %w", err) + } + defer stmt.Close() + + for _, e := range entries { + var fieldsJSON []byte + if len(e.Fields) > 0 { + fieldsJSON, _ = json.Marshal(e.Fields) + } + + _, err := stmt.ExecContext(ctx, + e.AgentID, + e.Timestamp, + e.Source, + e.SourceName, + string(e.Level), + e.Message, + fieldsJSON, + ) + if err != nil { + return fmt.Errorf("insert log: %w", err) + } + } + + return tx.Commit() +} + +// QueryLogs queries logs with filtering and pagination. +// Returns entries, total count, and error. +func (p *PostgresDB) QueryLogs(ctx context.Context, filter LogFilter) ([]LogEntry, int, error) { + var conditions []string + var args []interface{} + argNum := 1 + + if filter.AgentID != "" { + conditions = append(conditions, fmt.Sprintf("agent_id = $%d", argNum)) + args = append(args, filter.AgentID) + argNum++ + } + + if filter.Source != "" { + conditions = append(conditions, fmt.Sprintf("source = $%d", argNum)) + args = append(args, filter.Source) + argNum++ + } + + if filter.SourceName != "" { + conditions = append(conditions, fmt.Sprintf("source_name = $%d", argNum)) + args = append(args, filter.SourceName) + argNum++ + } + + if len(filter.Level) > 0 { + placeholders := make([]string, len(filter.Level)) + for i, l := range filter.Level { + placeholders[i] = fmt.Sprintf("$%d", argNum) + args = append(args, string(l)) + argNum++ + } + conditions = append(conditions, fmt.Sprintf("level IN (%s)", strings.Join(placeholders, ","))) + } + + if filter.Query != "" { + conditions = append(conditions, fmt.Sprintf("message ILIKE $%d", argNum)) + args = append(args, "%"+filter.Query+"%") + argNum++ + } + + if !filter.From.IsZero() { + conditions = append(conditions, fmt.Sprintf("timestamp >= $%d", argNum)) + args = append(args, filter.From) + argNum++ + } + + if !filter.To.IsZero() { + conditions = append(conditions, fmt.Sprintf("timestamp <= $%d", argNum)) + args = append(args, filter.To) + argNum++ + } + + whereClause := "" + if len(conditions) > 0 { + whereClause = "WHERE " + strings.Join(conditions, " AND ") + } + + // Get total count + countQuery := fmt.Sprintf("SELECT COUNT(*) FROM logs %s", whereClause) + var total int + if err := p.db.QueryRowContext(ctx, countQuery, args...).Scan(&total); err != nil { + return nil, 0, fmt.Errorf("count logs: %w", err) + } + + // Get entries with pagination + limit := filter.Limit + if limit <= 0 { + limit = 100 + } + + query := fmt.Sprintf(` + SELECT id, agent_id, timestamp, source, source_name, level, message, fields + FROM logs %s + ORDER BY timestamp DESC + LIMIT $%d OFFSET $%d + `, whereClause, argNum, argNum+1) + + args = append(args, limit, filter.Offset) + + rows, err := p.db.QueryContext(ctx, query, args...) + if err != nil { + return nil, 0, fmt.Errorf("query logs: %w", err) + } + defer rows.Close() + + var entries []LogEntry + for rows.Next() { + var e LogEntry + var level string + var fieldsJSON sql.NullString + + err := rows.Scan( + &e.ID, + &e.AgentID, + &e.Timestamp, + &e.Source, + &e.SourceName, + &level, + &e.Message, + &fieldsJSON, + ) + if err != nil { + return nil, 0, fmt.Errorf("scan log: %w", err) + } + + e.Level = LogLevel(level) + + if fieldsJSON.Valid && fieldsJSON.String != "" { + json.Unmarshal([]byte(fieldsJSON.String), &e.Fields) + } + + entries = append(entries, e) + } + + return entries, total, nil +} + +// DeleteOldLogs deletes logs older than the specified time. +// Returns the number of deleted entries. +func (p *PostgresDB) DeleteOldLogs(ctx context.Context, before time.Time) (int, error) { + result, err := p.db.ExecContext(ctx, "DELETE FROM logs WHERE timestamp < $1", before) + if err != nil { + return 0, fmt.Errorf("delete old logs: %w", err) + } + + affected, err := result.RowsAffected() + if err != nil { + return 0, err + } + + return int(affected), nil +} diff --git a/backend/internal/database/sqlite.go b/backend/internal/database/sqlite.go index 3026b19..ca1a6c4 100644 --- a/backend/internal/database/sqlite.go +++ b/backend/internal/database/sqlite.go @@ -62,6 +62,7 @@ func (s *SQLiteDB) Migrate() error { migrationSessions, migrationMetrics, migrationAlerts, + migrationLogs, } for i, m := range migrations { @@ -213,6 +214,24 @@ CREATE INDEX IF NOT EXISTS idx_alerts_triggered ON alerts(triggered_at); CREATE INDEX IF NOT EXISTS idx_alerts_severity ON alerts(severity); ` +const migrationLogs = ` +CREATE TABLE IF NOT EXISTS logs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + agent_id TEXT NOT NULL, + timestamp TIMESTAMP NOT NULL, + source TEXT NOT NULL, + source_name TEXT, + level TEXT NOT NULL, + message TEXT NOT NULL, + fields TEXT, -- JSON object + FOREIGN KEY (agent_id) REFERENCES agents(id) ON DELETE CASCADE +); +CREATE INDEX IF NOT EXISTS idx_logs_agent_time ON logs(agent_id, timestamp DESC); +CREATE INDEX IF NOT EXISTS idx_logs_level ON logs(level); +CREATE INDEX IF NOT EXISTS idx_logs_source ON logs(source, source_name); +CREATE INDEX IF NOT EXISTS idx_logs_timestamp ON logs(timestamp DESC); +` + func (s *SQLiteDB) insertDefaultRoles() error { defaultRoles := []struct { id, name, desc string @@ -1132,3 +1151,177 @@ func aggregatePoints(points []MetricPoint) MetricPoint { return agg } + +// ===================== +// Log Storage Methods +// ===================== + +// StoreLogs stores multiple log entries in a batch. +func (s *SQLiteDB) StoreLogs(ctx context.Context, entries []LogEntry) error { + if len(entries) == 0 { + return nil + } + + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return fmt.Errorf("begin tx: %w", err) + } + defer tx.Rollback() + + stmt, err := tx.PrepareContext(ctx, ` + INSERT INTO logs (agent_id, timestamp, source, source_name, level, message, fields) + VALUES (?, ?, ?, ?, ?, ?, ?) + `) + if err != nil { + return fmt.Errorf("prepare stmt: %w", err) + } + defer stmt.Close() + + for _, e := range entries { + var fieldsJSON []byte + if len(e.Fields) > 0 { + fieldsJSON, _ = json.Marshal(e.Fields) + } + + _, err := stmt.ExecContext(ctx, + e.AgentID, + e.Timestamp, + e.Source, + e.SourceName, + string(e.Level), + e.Message, + fieldsJSON, + ) + if err != nil { + return fmt.Errorf("insert log: %w", err) + } + } + + return tx.Commit() +} + +// QueryLogs queries logs with filtering and pagination. +// Returns entries, total count, and error. +func (s *SQLiteDB) QueryLogs(ctx context.Context, filter LogFilter) ([]LogEntry, int, error) { + var conditions []string + var args []interface{} + + if filter.AgentID != "" { + conditions = append(conditions, "agent_id = ?") + args = append(args, filter.AgentID) + } + + if filter.Source != "" { + conditions = append(conditions, "source = ?") + args = append(args, filter.Source) + } + + if filter.SourceName != "" { + conditions = append(conditions, "source_name = ?") + args = append(args, filter.SourceName) + } + + if len(filter.Level) > 0 { + placeholders := make([]string, len(filter.Level)) + for i, l := range filter.Level { + placeholders[i] = "?" + args = append(args, string(l)) + } + conditions = append(conditions, fmt.Sprintf("level IN (%s)", strings.Join(placeholders, ","))) + } + + if filter.Query != "" { + conditions = append(conditions, "message LIKE ?") + args = append(args, "%"+filter.Query+"%") + } + + if !filter.From.IsZero() { + conditions = append(conditions, "timestamp >= ?") + args = append(args, filter.From) + } + + if !filter.To.IsZero() { + conditions = append(conditions, "timestamp <= ?") + args = append(args, filter.To) + } + + whereClause := "" + if len(conditions) > 0 { + whereClause = "WHERE " + strings.Join(conditions, " AND ") + } + + // Get total count + countQuery := fmt.Sprintf("SELECT COUNT(*) FROM logs %s", whereClause) + var total int + if err := s.db.QueryRowContext(ctx, countQuery, args...).Scan(&total); err != nil { + return nil, 0, fmt.Errorf("count logs: %w", err) + } + + // Get entries with pagination + limit := filter.Limit + if limit <= 0 { + limit = 100 + } + + query := fmt.Sprintf(` + SELECT id, agent_id, timestamp, source, source_name, level, message, fields + FROM logs %s + ORDER BY timestamp DESC + LIMIT ? OFFSET ? + `, whereClause) + + args = append(args, limit, filter.Offset) + + rows, err := s.db.QueryContext(ctx, query, args...) + if err != nil { + return nil, 0, fmt.Errorf("query logs: %w", err) + } + defer rows.Close() + + var entries []LogEntry + for rows.Next() { + var e LogEntry + var level string + var fieldsJSON sql.NullString + + err := rows.Scan( + &e.ID, + &e.AgentID, + &e.Timestamp, + &e.Source, + &e.SourceName, + &level, + &e.Message, + &fieldsJSON, + ) + if err != nil { + return nil, 0, fmt.Errorf("scan log: %w", err) + } + + e.Level = LogLevel(level) + + if fieldsJSON.Valid && fieldsJSON.String != "" { + json.Unmarshal([]byte(fieldsJSON.String), &e.Fields) + } + + entries = append(entries, e) + } + + return entries, total, nil +} + +// DeleteOldLogs deletes logs older than the specified time. +// Returns the number of deleted entries. +func (s *SQLiteDB) DeleteOldLogs(ctx context.Context, before time.Time) (int, error) { + result, err := s.db.ExecContext(ctx, "DELETE FROM logs WHERE timestamp < ?", before) + if err != nil { + return 0, fmt.Errorf("delete old logs: %w", err) + } + + affected, err := result.RowsAffected() + if err != nil { + return 0, err + } + + return int(affected), nil +} diff --git a/frontend/src/lib/api/logs.ts b/frontend/src/lib/api/logs.ts new file mode 100644 index 0000000..c4cc503 --- /dev/null +++ b/frontend/src/lib/api/logs.ts @@ -0,0 +1,111 @@ +// Logs API client + +import { browser } from '$app/environment'; +import { authStore } from '$lib/stores/auth'; +import { get } from 'svelte/store'; +import type { LogFilter, LogResponse, LogEntry } from '$lib/types/logs'; + +const API_BASE = '/api/v1'; + +// Get auth token from store +function getToken(): string | null { + if (!browser) return null; + const auth = get(authStore); + return auth.token; +} + +// Make authenticated API request +async function authFetch(path: string, options: RequestInit = {}): Promise { + const token = getToken(); + const headers: HeadersInit = { + 'Content-Type': 'application/json', + ...(options.headers || {}) + }; + + if (token) { + (headers as Record)['Authorization'] = `Bearer ${token}`; + } + + return fetch(`${API_BASE}${path}`, { + ...options, + headers + }); +} + +// Build query string from filter +function buildQueryString(filter: LogFilter): string { + const params = new URLSearchParams(); + + if (filter.agentId) params.set('agent_id', filter.agentId); + if (filter.source) params.set('source', filter.source); + if (filter.sourceName) params.set('source_name', filter.sourceName); + if (filter.query) params.set('q', filter.query); + if (filter.from) params.set('from', filter.from); + if (filter.to) params.set('to', filter.to); + if (filter.limit) params.set('limit', filter.limit.toString()); + if (filter.offset) params.set('offset', filter.offset.toString()); + + if (filter.level && filter.level.length > 0) { + params.set('level', filter.level.join(',')); + } + + const qs = params.toString(); + return qs ? `?${qs}` : ''; +} + +export const logsApi = { + // Query logs with filters + async query(filter: LogFilter = {}): Promise { + const queryString = buildQueryString(filter); + const response = await authFetch(`/logs${queryString}`); + + if (!response.ok) { + throw new Error('Failed to fetch logs'); + } + + return response.json(); + }, + + // Get available sources + async getSources(agentId?: string): Promise<{ source: string; sourceName: string }[]> { + const params = agentId ? `?agent_id=${agentId}` : ''; + const response = await authFetch(`/logs/sources${params}`); + + if (!response.ok) { + throw new Error('Failed to fetch log sources'); + } + + return response.json(); + }, + + // Stream logs via SSE (for live tailing) + streamLogs( + filter: LogFilter, + onEntry: (entry: LogEntry) => void, + onError: (error: Error) => void + ): () => void { + const queryString = buildQueryString(filter); + const token = getToken(); + const url = `${API_BASE}/logs/stream${queryString}`; + + const eventSource = new EventSource(url + (token ? `&token=${token}` : '')); + + eventSource.onmessage = (event) => { + try { + const entry = JSON.parse(event.data) as LogEntry; + onEntry(entry); + } catch { + // Ignore parse errors + } + }; + + eventSource.onerror = () => { + onError(new Error('Log stream disconnected')); + }; + + // Return cleanup function + return () => { + eventSource.close(); + }; + } +}; diff --git a/frontend/src/lib/types/logs.ts b/frontend/src/lib/types/logs.ts new file mode 100644 index 0000000..b73c36f --- /dev/null +++ b/frontend/src/lib/types/logs.ts @@ -0,0 +1,47 @@ +// Log entry types + +export type LogLevel = 'debug' | 'info' | 'warning' | 'error' | 'fatal'; + +export interface LogEntry { + id: number; + agentId: string; + timestamp: string; + source: string; // 'journal', 'file', 'docker' + sourceName: string; // Unit name, filename, container name + level: LogLevel; + message: string; + fields?: Record; +} + +export interface LogFilter { + agentId?: string; + source?: string; + sourceName?: string; + level?: LogLevel[]; + query?: string; + from?: string; + to?: string; + limit?: number; + offset?: number; +} + +export interface LogResponse { + entries: LogEntry[]; + total: number; +} + +// Level colors for display +export const levelColors: Record = { + debug: { bg: 'bg-slate-500/20', text: 'text-slate-400', border: 'border-slate-500/30' }, + info: { bg: 'bg-blue-500/20', text: 'text-blue-400', border: 'border-blue-500/30' }, + warning: { bg: 'bg-yellow-500/20', text: 'text-yellow-400', border: 'border-yellow-500/30' }, + error: { bg: 'bg-red-500/20', text: 'text-red-400', border: 'border-red-500/30' }, + fatal: { bg: 'bg-purple-500/20', text: 'text-purple-400', border: 'border-purple-500/30' } +}; + +// Source icons +export const sourceIcons: Record = { + journal: '📜', + file: '📄', + docker: '🐳' +}; diff --git a/frontend/src/routes/logs/+page.svelte b/frontend/src/routes/logs/+page.svelte new file mode 100644 index 0000000..d5cc033 --- /dev/null +++ b/frontend/src/routes/logs/+page.svelte @@ -0,0 +1,556 @@ + + + + Logs - Tyto + + +
+ + + +
+ + +
+ Level: +
+ {#each allLevels as level} + + {/each} +
+
+ +
+ Source: + +
+
+ + {#if error} +
{error}
+ {/if} + + {#if isLoading && entries.length === 0} +
Loading logs...
+ {:else if entries.length === 0} +
+ + + +

No logs found

+ Adjust your filters or wait for new log entries +
+ {:else} +
+ {#each entries as entry (entry.id)} +
+
+ + {entry.level} + + + {sourceIcons[entry.source] || '📋'} {entry.sourceName} + + + {formatRelativeTime(entry.timestamp)} + +
+
{entry.message}
+ {#if entry.fields && Object.keys(entry.fields).length > 0} +
+ {#each Object.entries(entry.fields) as [key, value]} + {key}: {value} + {/each} +
+ {/if} +
+ {/each} + + {#if entries.length < total && !isLive} + + {/if} +
+ {/if} +
+ +