From c0e678931dcbe70bc11f517fb715557945badfaf Mon Sep 17 00:00:00 2001 From: vikingowl Date: Sun, 28 Dec 2025 08:18:48 +0100 Subject: [PATCH] feat: add database layer with SQLite and PostgreSQL support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Database Package (internal/database/): - Database interface abstraction for multiple backends - SQLite implementation with pure Go driver (no CGO) - PostgreSQL implementation with connection pooling - Factory pattern for creating database from config - Tiered retention with automatic aggregation: - Raw metrics: 24h (5s resolution) - 1-minute aggregation: 7 days - 5-minute aggregation: 30 days - Hourly aggregation: 1 year Schema includes: - agents: registration, status, certificates - users: local + LDAP authentication - roles: RBAC with permissions JSON - sessions: token-based authentication - metrics_*: time-series with aggregation - alerts: triggered alerts with acknowledgment Configuration Updates: - DatabaseConfig with SQLite path and PostgreSQL settings - RetentionConfig for customizing data retention - Environment variables: TYTO_DB_*, TYTO_DB_CONNECTION_STRING - Default SQLite at /var/lib/tyto/tyto.db 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- backend/cmd/server/main.go | 4 +- backend/go.mod | 13 + backend/go.sum | 46 + backend/internal/config/config.go | 55 +- backend/internal/database/database.go | 208 +++++ backend/internal/database/factory.go | 90 ++ backend/internal/database/postgres.go | 1052 +++++++++++++++++++++++ backend/internal/database/sqlite.go | 1134 +++++++++++++++++++++++++ 8 files changed, 2588 insertions(+), 14 deletions(-) create mode 100644 backend/internal/database/database.go create mode 100644 backend/internal/database/factory.go create mode 100644 backend/internal/database/postgres.go create mode 100644 backend/internal/database/sqlite.go diff --git a/backend/cmd/server/main.go 
b/backend/cmd/server/main.go index 3abd07c..70fe033 100644 --- a/backend/cmd/server/main.go +++ b/backend/cmd/server/main.go @@ -75,8 +75,8 @@ func runServer(cfg *config.Config) { // Initialize agent registry registryPath := "/var/lib/tyto/agents.json" - if cfg.Database.SQLitePath != "" { - registryPath = cfg.Database.SQLitePath + ".agents.json" + if cfg.Database.Path != "" { + registryPath = cfg.Database.Path + ".agents.json" } registry := server.NewRegistry(registryPath) log.Printf("Agent registry initialized: %s", registryPath) diff --git a/backend/go.mod b/backend/go.mod index 3d4f527..8174ec0 100644 --- a/backend/go.mod +++ b/backend/go.mod @@ -8,9 +8,11 @@ require github.com/gin-contrib/cors v1.7.2 require ( github.com/godbus/dbus/v5 v5.1.0 + github.com/lib/pq v1.10.9 google.golang.org/grpc v1.68.0 google.golang.org/protobuf v1.35.2 gopkg.in/yaml.v3 v3.0.1 + modernc.org/sqlite v1.34.4 ) require ( @@ -18,12 +20,15 @@ require ( github.com/bytedance/sonic/loader v0.1.1 // indirect github.com/cloudwego/base64x v0.1.4 // indirect github.com/cloudwego/iasm v0.2.0 // indirect + github.com/dustin/go-humanize v1.0.1 // indirect github.com/gabriel-vasile/mimetype v1.4.3 // indirect github.com/gin-contrib/sse v0.1.0 // indirect github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect github.com/go-playground/validator/v10 v10.20.0 // indirect github.com/goccy/go-json v0.10.2 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/cpuid/v2 v2.2.7 // indirect github.com/kr/text v0.2.0 // indirect @@ -31,7 +36,9 @@ require ( github.com/mattn/go-isatty v0.0.20 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/ncruces/go-strftime v0.1.9 // indirect github.com/pelletier/go-toml/v2 v2.2.2 // 
indirect + github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/ugorji/go/codec v1.2.12 // indirect golang.org/x/arch v0.8.0 // indirect @@ -40,4 +47,10 @@ require ( golang.org/x/sys v0.25.0 // indirect golang.org/x/text v0.18.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect + modernc.org/gc/v3 v3.0.0-20240107210532-573471604cb6 // indirect + modernc.org/libc v1.55.3 // indirect + modernc.org/mathutil v1.6.0 // indirect + modernc.org/memory v1.8.0 // indirect + modernc.org/strutil v1.2.0 // indirect + modernc.org/token v1.1.0 // indirect ) diff --git a/backend/go.sum b/backend/go.sum index 0be6411..82aa7ce 100644 --- a/backend/go.sum +++ b/backend/go.sum @@ -10,6 +10,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/gabriel-vasile/mimetype v1.4.3 h1:in2uUcidCuFcDKtdcBxlR0rJ1+fsokWf+uqxgUFjbI0= github.com/gabriel-vasile/mimetype v1.4.3/go.mod h1:d8uq/6HKRL6CGdk+aubisF/M5GcPfT7nKyLpA0lbSSk= github.com/gin-contrib/cors v1.7.2 h1:oLDHxdg8W/XDoN/8zamqk/Drgt4oVZDvaV0YmvVICQw= @@ -35,6 +37,12 @@ github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6 github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/pprof 
v0.0.0-20240409012703-83162a5b38cd h1:gbpYu9NMq8jhDVbvlGkMFWCjLFlqqEZjEmObmhUy6Vo= +github.com/google/pprof v0.0.0-20240409012703-83162a5b38cd/go.mod h1:kf6iHlnVGwgKolg33glAes7Yg/8iWP8ukqeldJSO7jw= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= @@ -47,6 +55,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -54,10 +64,14 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod 
h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4= +github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUAtL9R8= github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -81,14 +95,20 @@ golang.org/x/arch v0.8.0 h1:3wRIsP3pM4yUptoR96otTUOXI367OS0+c9eeRi9doIc= golang.org/x/arch v0.8.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys= golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= +golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA= +golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys 
v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg= +golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk= google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 h1:pPJltXNxVzT4pK9yD8vR9X75DaWYYmLGMsEvBfFQZzQ= google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= google.golang.org/grpc v1.68.0 h1:aHQeeJbo8zAkAa3pRzrVjZlbz6uSfeOXlJNQM0RAbz0= @@ -101,5 +121,31 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EV gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +modernc.org/cc/v4 v4.21.4 h1:3Be/Rdo1fpr8GrQ7IVw9OHtplU4gWbb+wNgeoBMmGLQ= +modernc.org/cc/v4 v4.21.4/go.mod h1:HM7VJTZbUCR3rV8EYBi9wxnJ0ZBRiGE5OeGXNA0IsLQ= +modernc.org/ccgo/v4 v4.19.2 h1:lwQZgvboKD0jBwdaeVCTouxhxAyN6iawF3STraAal8Y= +modernc.org/ccgo/v4 v4.19.2/go.mod h1:ysS3mxiMV38XGRTTcgo0DQTeTmAO4oCmJl1nX9VFI3s= +modernc.org/fileutil v1.3.0 h1:gQ5SIzK3H9kdfai/5x41oQiKValumqNTDXMvKo62HvE= +modernc.org/fileutil v1.3.0/go.mod h1:XatxS8fZi3pS8/hKG2GH/ArUogfxjpEKs3Ku3aK4JyQ= +modernc.org/gc/v2 v2.4.1 h1:9cNzOqPyMJBvrUipmynX0ZohMhcxPtMccYgGOJdOiBw= +modernc.org/gc/v2 v2.4.1/go.mod h1:wzN5dK1AzVGoH6XOzc3YZ+ey/jPgYHLuVckd62P0GYU= +modernc.org/gc/v3 v3.0.0-20240107210532-573471604cb6 
h1:5D53IMaUuA5InSeMu9eJtlQXS2NxAhyWQvkKEgXZhHI= +modernc.org/gc/v3 v3.0.0-20240107210532-573471604cb6/go.mod h1:Qz0X07sNOR1jWYCrJMEnbW/X55x206Q7Vt4mz6/wHp4= +modernc.org/libc v1.55.3 h1:AzcW1mhlPNrRtjS5sS+eW2ISCgSOLLNyFzRh/V3Qj/U= +modernc.org/libc v1.55.3/go.mod h1:qFXepLhz+JjFThQ4kzwzOjA/y/artDeg+pcYnY+Q83w= +modernc.org/mathutil v1.6.0 h1:fRe9+AmYlaej+64JsEEhoWuAYBkOtQiMEU7n/XgfYi4= +modernc.org/mathutil v1.6.0/go.mod h1:Ui5Q9q1TR2gFm0AQRqQUaBWFLAhQpCwNcuhBOSedWPo= +modernc.org/memory v1.8.0 h1:IqGTL6eFMaDZZhEWwcREgeMXYwmW83LYW8cROZYkg+E= +modernc.org/memory v1.8.0/go.mod h1:XPZ936zp5OMKGWPqbD3JShgd/ZoQ7899TUuQqxY+peU= +modernc.org/opt v0.1.3 h1:3XOZf2yznlhC+ibLltsDGzABUGVx8J6pnFMS3E4dcq4= +modernc.org/opt v0.1.3/go.mod h1:WdSiB5evDcignE70guQKxYUl14mgWtbClRi5wmkkTX0= +modernc.org/sortutil v1.2.0 h1:jQiD3PfS2REGJNzNCMMaLSp/wdMNieTbKX920Cqdgqc= +modernc.org/sortutil v1.2.0/go.mod h1:TKU2s7kJMf1AE84OoiGppNHJwvB753OYfNl2WRb++Ss= +modernc.org/sqlite v1.34.4 h1:sjdARozcL5KJBvYQvLlZEmctRgW9xqIZc2ncN7PU0P8= +modernc.org/sqlite v1.34.4/go.mod h1:3QQFCG2SEMtc2nv+Wq4cQCH7Hjcg+p/RMlS1XK+zwbk= +modernc.org/strutil v1.2.0 h1:agBi9dp1I+eOnxXeiZawM8F4LawKv4NzGWSaLfyeNZA= +modernc.org/strutil v1.2.0/go.mod h1:/mdcBmfOibveCTBxUl5B5l6W+TTH1FXPLHZE6bTosX0= +modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y= +modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM= nullprogram.com/x/optparse v1.0.0/go.mod h1:KdyPE+Igbe0jQUrVfMqDMeJQIJZEuyV7pjYmp6pbG50= rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= diff --git a/backend/internal/config/config.go b/backend/internal/config/config.go index e354b47..8decd6e 100644 --- a/backend/internal/config/config.go +++ b/backend/internal/config/config.go @@ -115,15 +115,27 @@ type DatabaseConfig struct { Type string `yaml:"type"` // SQLite settings - SQLitePath string `yaml:"sqlite_path"` + Path string `yaml:"path"` // Path to SQLite database file // PostgreSQL settings - 
PostgresHost string `yaml:"postgres_host"` - PostgresPort int `yaml:"postgres_port"` - PostgresUser string `yaml:"postgres_user"` - PostgresPassword string `yaml:"postgres_password"` - PostgresDatabase string `yaml:"postgres_database"` - PostgresSSLMode string `yaml:"postgres_sslmode"` + Host string `yaml:"host"` + Port int `yaml:"port"` + User string `yaml:"user"` + Password string `yaml:"password"` + Database string `yaml:"database"` + SSLMode string `yaml:"sslmode"` + ConnectionString string `yaml:"connection_string"` // Full connection string (overrides individual fields) + + // Retention settings + Retention RetentionConfig `yaml:"retention"` +} + +// RetentionConfig defines data retention policies. +type RetentionConfig struct { + RawRetention time.Duration `yaml:"raw"` // Raw metrics retention (default: 24h) + OneMinuteRetention time.Duration `yaml:"one_minute"` // 1-minute aggregation (default: 7d) + FiveMinuteRetention time.Duration `yaml:"five_minute"` // 5-minute aggregation (default: 30d) + HourlyRetention time.Duration `yaml:"hourly"` // Hourly aggregation (default: 1y) } type AlertConfig struct { @@ -160,8 +172,10 @@ func Load() *Config { IntervalSeconds: 5, }, Database: DatabaseConfig{ - Type: "sqlite", - SQLitePath: "/var/lib/tyto/tyto.db", + Type: "sqlite", + Path: "/var/lib/tyto/tyto.db", + Port: 5432, + SSLMode: "disable", }, } @@ -214,7 +228,22 @@ func Load() *Config { cfg.Database.Type = val } if val := os.Getenv("TYTO_DB_PATH"); val != "" { - cfg.Database.SQLitePath = val + cfg.Database.Path = val + } + if val := os.Getenv("TYTO_DB_HOST"); val != "" { + cfg.Database.Host = val + } + if val := os.Getenv("TYTO_DB_USER"); val != "" { + cfg.Database.User = val + } + if val := os.Getenv("TYTO_DB_PASSWORD"); val != "" { + cfg.Database.Password = val + } + if val := os.Getenv("TYTO_DB_NAME"); val != "" { + cfg.Database.Database = val + } + if val := os.Getenv("TYTO_DB_CONNECTION_STRING"); val != "" { + cfg.Database.ConnectionString = val } // Agent 
configuration @@ -269,8 +298,10 @@ func DefaultConfig() *Config { Interval: 5 * time.Second, }, Database: DatabaseConfig{ - Type: "sqlite", - SQLitePath: "/var/lib/tyto/tyto.db", + Type: "sqlite", + Path: "/var/lib/tyto/tyto.db", + Port: 5432, + SSLMode: "disable", }, RefreshInterval: 5 * time.Second, } diff --git a/backend/internal/database/database.go b/backend/internal/database/database.go new file mode 100644 index 0000000..3e83c5f --- /dev/null +++ b/backend/internal/database/database.go @@ -0,0 +1,208 @@ +// Package database provides database abstraction for Tyto. +// Supports both SQLite (default) and PostgreSQL backends. +package database + +import ( + "context" + "time" + + "tyto/internal/models" +) + +// Database defines the interface for all database operations. +type Database interface { + // Lifecycle + Close() error + Migrate() error + + // Metrics storage + StoreMetrics(ctx context.Context, agentID string, metrics *models.AllMetrics) error + QueryMetrics(ctx context.Context, agentID string, from, to time.Time, resolution string) ([]MetricPoint, error) + GetLatestMetrics(ctx context.Context, agentID string) (*models.AllMetrics, error) + + // Agents (extends registry with persistence) + StoreAgent(ctx context.Context, agent *Agent) error + GetAgent(ctx context.Context, id string) (*Agent, error) + ListAgents(ctx context.Context) ([]*Agent, error) + UpdateAgentStatus(ctx context.Context, id string, status AgentStatus, lastSeen time.Time) error + DeleteAgent(ctx context.Context, id string) error + + // Users + CreateUser(ctx context.Context, user *User) error + GetUser(ctx context.Context, id string) (*User, error) + GetUserByUsername(ctx context.Context, username string) (*User, error) + UpdateUser(ctx context.Context, user *User) error + DeleteUser(ctx context.Context, id string) error + ListUsers(ctx context.Context) ([]*User, error) + + // Roles + CreateRole(ctx context.Context, role *Role) error + GetRole(ctx context.Context, id string) (*Role, 
error) + ListRoles(ctx context.Context) ([]*Role, error) + UpdateRole(ctx context.Context, role *Role) error + DeleteRole(ctx context.Context, id string) error + GetUserRoles(ctx context.Context, userID string) ([]*Role, error) + AssignRole(ctx context.Context, userID, roleID string) error + RemoveRole(ctx context.Context, userID, roleID string) error + + // Sessions + CreateSession(ctx context.Context, session *Session) error + GetSession(ctx context.Context, token string) (*Session, error) + DeleteSession(ctx context.Context, token string) error + DeleteUserSessions(ctx context.Context, userID string) error + CleanupExpiredSessions(ctx context.Context) error + + // Alerts + StoreAlert(ctx context.Context, alert *Alert) error + GetAlert(ctx context.Context, id string) (*Alert, error) + QueryAlerts(ctx context.Context, filter AlertFilter) ([]*Alert, error) + AcknowledgeAlert(ctx context.Context, id string) error + + // Retention + RunRetention(ctx context.Context) error +} + +// MetricPoint represents a single metric data point. +type MetricPoint struct { + Timestamp time.Time `json:"timestamp"` + AgentID string `json:"agentId"` + + // Aggregated values + CPUAvg float64 `json:"cpuAvg"` + CPUMin float64 `json:"cpuMin"` + CPUMax float64 `json:"cpuMax"` + MemoryAvg float64 `json:"memoryAvg"` + MemoryMin float64 `json:"memoryMin"` + MemoryMax float64 `json:"memoryMax"` + DiskAvg float64 `json:"diskAvg,omitempty"` + GPUAvg float64 `json:"gpuAvg,omitempty"` +} + +// AgentStatus represents agent connection state. +type AgentStatus string + +const ( + AgentStatusPending AgentStatus = "pending" + AgentStatusApproved AgentStatus = "approved" + AgentStatusConnected AgentStatus = "connected" + AgentStatusOffline AgentStatus = "offline" + AgentStatusRevoked AgentStatus = "revoked" +) + +// Agent represents a registered monitoring agent. 
+type Agent struct { + ID string `json:"id"` + Name string `json:"name,omitempty"` + Hostname string `json:"hostname"` + OS string `json:"os"` + Architecture string `json:"architecture"` + Version string `json:"version"` + Capabilities []string `json:"capabilities,omitempty"` + Status AgentStatus `json:"status"` + CertSerial string `json:"certSerial,omitempty"` + CertExpiry time.Time `json:"certExpiry,omitempty"` + LastSeen time.Time `json:"lastSeen,omitempty"` + RegisteredAt time.Time `json:"registeredAt"` + Tags []string `json:"tags,omitempty"` +} + +// AuthProvider indicates how a user authenticates. +type AuthProvider string + +const ( + AuthProviderLocal AuthProvider = "local" + AuthProviderLDAP AuthProvider = "ldap" +) + +// User represents a system user. +type User struct { + ID string `json:"id"` + Username string `json:"username"` + Email string `json:"email,omitempty"` + PasswordHash []byte `json:"-"` + AuthProvider AuthProvider `json:"authProvider"` + LDAPDN string `json:"ldapDn,omitempty"` + CreatedAt time.Time `json:"createdAt"` + UpdatedAt time.Time `json:"updatedAt"` + LastLogin time.Time `json:"lastLogin,omitempty"` + Disabled bool `json:"disabled"` +} + +// Role represents a set of permissions. +type Role struct { + ID string `json:"id"` + Name string `json:"name"` + Description string `json:"description,omitempty"` + Permissions []string `json:"permissions"` + IsSystem bool `json:"isSystem"` + CreatedAt time.Time `json:"createdAt"` +} + +// Session represents an authenticated user session. +type Session struct { + Token string `json:"token"` + UserID string `json:"userId"` + CreatedAt time.Time `json:"createdAt"` + ExpiresAt time.Time `json:"expiresAt"` + IPAddress string `json:"ipAddress,omitempty"` + UserAgent string `json:"userAgent,omitempty"` +} + +// AlertSeverity indicates alert severity level. 
+type AlertSeverity string + +const ( + AlertSeverityWarning AlertSeverity = "warning" + AlertSeverityCritical AlertSeverity = "critical" +) + +// Alert represents a triggered alert. +type Alert struct { + ID string `json:"id"` + AgentID string `json:"agentId"` + Type string `json:"type"` + Severity AlertSeverity `json:"severity"` + Message string `json:"message"` + Value float64 `json:"value"` + Threshold float64 `json:"threshold"` + TriggeredAt time.Time `json:"triggeredAt"` + ResolvedAt *time.Time `json:"resolvedAt,omitempty"` + Acknowledged bool `json:"acknowledged"` +} + +// AlertFilter specifies criteria for querying alerts. +type AlertFilter struct { + AgentID string + Type string + Severity AlertSeverity + Acknowledged *bool + From time.Time + To time.Time + Limit int + Offset int +} + +// RetentionConfig defines data retention policies. +type RetentionConfig struct { + // Raw metrics retention (default: 24 hours) + RawRetention time.Duration + + // 1-minute aggregation retention (default: 7 days) + OneMinuteRetention time.Duration + + // 5-minute aggregation retention (default: 30 days) + FiveMinuteRetention time.Duration + + // Hourly aggregation retention (default: 1 year) + HourlyRetention time.Duration +} + +// DefaultRetentionConfig returns default retention settings. +func DefaultRetentionConfig() RetentionConfig { + return RetentionConfig{ + RawRetention: 24 * time.Hour, + OneMinuteRetention: 7 * 24 * time.Hour, + FiveMinuteRetention: 30 * 24 * time.Hour, + HourlyRetention: 365 * 24 * time.Hour, + } +} diff --git a/backend/internal/database/factory.go b/backend/internal/database/factory.go new file mode 100644 index 0000000..0a8ee9a --- /dev/null +++ b/backend/internal/database/factory.go @@ -0,0 +1,90 @@ +// Package database provides database factory for creating the appropriate backend. +package database + +import ( + "fmt" + "os" + "path/filepath" + + "tyto/internal/config" +) + +// New creates a new database connection based on configuration. 
+func New(cfg *config.DatabaseConfig) (Database, error) { + retention := DefaultRetentionConfig() + + // Override retention from config if provided + if cfg.Retention.RawRetention > 0 { + retention.RawRetention = cfg.Retention.RawRetention + } + if cfg.Retention.OneMinuteRetention > 0 { + retention.OneMinuteRetention = cfg.Retention.OneMinuteRetention + } + if cfg.Retention.FiveMinuteRetention > 0 { + retention.FiveMinuteRetention = cfg.Retention.FiveMinuteRetention + } + if cfg.Retention.HourlyRetention > 0 { + retention.HourlyRetention = cfg.Retention.HourlyRetention + } + + switch cfg.Type { + case "sqlite", "": + return newSQLiteFromConfig(cfg, retention) + case "postgres", "postgresql": + return newPostgresFromConfig(cfg, retention) + default: + return nil, fmt.Errorf("unsupported database type: %s", cfg.Type) + } +} + +func newSQLiteFromConfig(cfg *config.DatabaseConfig, retention RetentionConfig) (*SQLiteDB, error) { + path := cfg.Path + if path == "" { + path = "tyto.db" + } + + // Ensure directory exists + dir := filepath.Dir(path) + if dir != "" && dir != "." 
{ + if err := os.MkdirAll(dir, 0755); err != nil { + return nil, fmt.Errorf("create database directory: %w", err) + } + } + + db, err := NewSQLiteDB(path, retention) + if err != nil { + return nil, err + } + + // Run migrations + if err := db.Migrate(); err != nil { + db.Close() + return nil, fmt.Errorf("migrate: %w", err) + } + + return db, nil +} + +func newPostgresFromConfig(cfg *config.DatabaseConfig, retention RetentionConfig) (*PostgresDB, error) { + connStr := cfg.ConnectionString + if connStr == "" { + // Build connection string from individual fields + connStr = fmt.Sprintf( + "host=%s port=%d user=%s password=%s dbname=%s sslmode=%s", + cfg.Host, cfg.Port, cfg.User, cfg.Password, cfg.Database, cfg.SSLMode, + ) + } + + db, err := NewPostgresDB(connStr, retention) + if err != nil { + return nil, err + } + + // Run migrations + if err := db.Migrate(); err != nil { + db.Close() + return nil, fmt.Errorf("migrate: %w", err) + } + + return db, nil +} diff --git a/backend/internal/database/postgres.go b/backend/internal/database/postgres.go new file mode 100644 index 0000000..0845052 --- /dev/null +++ b/backend/internal/database/postgres.go @@ -0,0 +1,1052 @@ +// Package database provides PostgreSQL implementation of the Database interface. +package database + +import ( + "context" + "database/sql" + "encoding/json" + "errors" + "fmt" + "strings" + "time" + + "tyto/internal/models" + + "github.com/lib/pq" +) + +// PostgresDB implements the Database interface using PostgreSQL. +type PostgresDB struct { + db *sql.DB + retention RetentionConfig +} + +// NewPostgresDB creates a new PostgreSQL database connection. 
+func NewPostgresDB(connStr string, retention RetentionConfig) (*PostgresDB, error) { + db, err := sql.Open("postgres", connStr) + if err != nil { + return nil, fmt.Errorf("open postgres: %w", err) + } + + // Configure connection pool + db.SetMaxOpenConns(25) + db.SetMaxIdleConns(5) + db.SetConnMaxLifetime(5 * time.Minute) + + // Verify connection + if err := db.Ping(); err != nil { + db.Close() + return nil, fmt.Errorf("ping postgres: %w", err) + } + + return &PostgresDB{ + db: db, + retention: retention, + }, nil +} + +// Close closes the database connection. +func (p *PostgresDB) Close() error { + return p.db.Close() +} + +// Migrate runs database migrations. +func (p *PostgresDB) Migrate() error { + migrations := []string{ + pgMigrationAgents, + pgMigrationUsers, + pgMigrationRoles, + pgMigrationSessions, + pgMigrationMetrics, + pgMigrationAlerts, + } + + for i, m := range migrations { + if _, err := p.db.Exec(m); err != nil { + return fmt.Errorf("migration %d: %w", i+1, err) + } + } + + return p.insertDefaultRoles() +} + +// PostgreSQL migration statements +const pgMigrationAgents = ` +CREATE TABLE IF NOT EXISTS agents ( + id TEXT PRIMARY KEY, + name TEXT, + hostname TEXT NOT NULL, + os TEXT NOT NULL, + architecture TEXT NOT NULL, + version TEXT NOT NULL, + capabilities TEXT[] DEFAULT '{}', + status TEXT NOT NULL DEFAULT 'pending', + cert_serial TEXT, + cert_expiry TIMESTAMPTZ, + last_seen TIMESTAMPTZ, + registered_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + tags TEXT[] DEFAULT '{}' +); +CREATE INDEX IF NOT EXISTS idx_agents_status ON agents(status); +` + +const pgMigrationUsers = ` +CREATE TABLE IF NOT EXISTS users ( + id TEXT PRIMARY KEY, + username TEXT UNIQUE NOT NULL, + email TEXT, + password_hash BYTEA, + auth_provider TEXT NOT NULL DEFAULT 'local', + ldap_dn TEXT, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + last_login TIMESTAMPTZ, + disabled BOOLEAN NOT NULL DEFAULT FALSE +); +CREATE INDEX IF NOT 
EXISTS idx_users_username ON users(username);
`

// pgMigrationRoles creates the RBAC tables: roles (permissions stored as a
// JSONB array of permission strings) and the user_roles join table.
const pgMigrationRoles = `
CREATE TABLE IF NOT EXISTS roles (
    id TEXT PRIMARY KEY,
    name TEXT UNIQUE NOT NULL,
    description TEXT,
    permissions JSONB NOT NULL DEFAULT '[]',
    is_system BOOLEAN NOT NULL DEFAULT FALSE,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE TABLE IF NOT EXISTS user_roles (
    user_id TEXT NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    role_id TEXT NOT NULL REFERENCES roles(id) ON DELETE CASCADE,
    PRIMARY KEY (user_id, role_id)
);
`

// pgMigrationSessions creates the token-keyed sessions table used for
// bearer-token authentication; expired rows are purged by CleanupExpiredSessions.
const pgMigrationSessions = `
CREATE TABLE IF NOT EXISTS sessions (
    token TEXT PRIMARY KEY,
    user_id TEXT NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    expires_at TIMESTAMPTZ NOT NULL,
    ip_address TEXT,
    user_agent TEXT
);
CREATE INDEX IF NOT EXISTS idx_sessions_user ON sessions(user_id);
CREATE INDEX IF NOT EXISTS idx_sessions_expires ON sessions(expires_at);
`

// pgMigrationMetrics creates the tiered time-series tables: metrics_raw holds
// full JSONB samples, while metrics_1min/metrics_5min/metrics_hourly hold
// pre-aggregated rollups keyed by (agent_id, timestamp).
const pgMigrationMetrics = `
-- Raw metrics (high resolution, short retention)
CREATE TABLE IF NOT EXISTS metrics_raw (
    id BIGSERIAL PRIMARY KEY,
    agent_id TEXT NOT NULL REFERENCES agents(id) ON DELETE CASCADE,
    timestamp TIMESTAMPTZ NOT NULL,
    data JSONB NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_metrics_raw_agent_time ON metrics_raw(agent_id, timestamp);

-- 1-minute aggregations
CREATE TABLE IF NOT EXISTS metrics_1min (
    agent_id TEXT NOT NULL REFERENCES agents(id) ON DELETE CASCADE,
    timestamp TIMESTAMPTZ NOT NULL,
    cpu_avg DOUBLE PRECISION, cpu_min DOUBLE PRECISION, cpu_max DOUBLE PRECISION,
    mem_avg DOUBLE PRECISION, mem_min DOUBLE PRECISION, mem_max DOUBLE PRECISION,
    disk_avg DOUBLE PRECISION,
    gpu_avg DOUBLE PRECISION,
    sample_count INTEGER NOT NULL DEFAULT 1,
    PRIMARY KEY (agent_id, timestamp)
);

-- 5-minute aggregations
CREATE TABLE IF NOT EXISTS metrics_5min (
    agent_id TEXT NOT NULL REFERENCES agents(id) ON DELETE CASCADE,
    timestamp TIMESTAMPTZ NOT NULL,
    cpu_avg DOUBLE PRECISION, cpu_min DOUBLE PRECISION, cpu_max DOUBLE PRECISION,
    mem_avg DOUBLE PRECISION, mem_min DOUBLE PRECISION, mem_max DOUBLE PRECISION,
    disk_avg DOUBLE PRECISION,
    gpu_avg DOUBLE PRECISION,
    sample_count INTEGER NOT NULL DEFAULT 1,
    PRIMARY KEY (agent_id, timestamp)
);

-- Hourly aggregations
CREATE TABLE IF NOT EXISTS metrics_hourly (
    agent_id TEXT NOT NULL REFERENCES agents(id) ON DELETE CASCADE,
    timestamp TIMESTAMPTZ NOT NULL,
    cpu_avg DOUBLE PRECISION, cpu_min DOUBLE PRECISION, cpu_max DOUBLE PRECISION,
    mem_avg DOUBLE PRECISION, mem_min DOUBLE PRECISION, mem_max DOUBLE PRECISION,
    disk_avg DOUBLE PRECISION,
    gpu_avg DOUBLE PRECISION,
    sample_count INTEGER NOT NULL DEFAULT 1,
    PRIMARY KEY (agent_id, timestamp)
);

-- Create hypertable-like partitioning hint (for future TimescaleDB upgrade)
-- COMMENT ON TABLE metrics_raw IS 'timescale:hypertable';
`

// pgMigrationAlerts creates the alerts table; resolved_at stays NULL until an
// alert clears, and acknowledged is toggled by AcknowledgeAlert.
const pgMigrationAlerts = `
CREATE TABLE IF NOT EXISTS alerts (
    id TEXT PRIMARY KEY,
    agent_id TEXT NOT NULL REFERENCES agents(id) ON DELETE CASCADE,
    type TEXT NOT NULL,
    severity TEXT NOT NULL,
    message TEXT NOT NULL,
    value DOUBLE PRECISION NOT NULL,
    threshold DOUBLE PRECISION NOT NULL,
    triggered_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    resolved_at TIMESTAMPTZ,
    acknowledged BOOLEAN NOT NULL DEFAULT FALSE
);
CREATE INDEX IF NOT EXISTS idx_alerts_agent ON alerts(agent_id);
CREATE INDEX IF NOT EXISTS idx_alerts_triggered ON alerts(triggered_at);
CREATE INDEX IF NOT EXISTS idx_alerts_severity ON alerts(severity);
`

// insertDefaultRoles seeds the three built-in system roles (admin, operator,
// viewer). Existing rows are left untouched via ON CONFLICT DO NOTHING, so
// the call is idempotent across restarts.
func (p *PostgresDB) insertDefaultRoles() error {
	defaultRoles := []struct {
		id, name, desc string
		perms          []string
	}{
		{"admin", "Administrator", "Full system access", []string{"*"}},
		{"operator", "Operator", "Manage agents and alerts", []string{
			"dashboard:view", "agents:view", "agents:manage",
			"alerts:view", "alerts:acknowledge", "alerts:configure",
			"metrics:export", "metrics:query",
		}},
		{"viewer", "Viewer", "Read-only access", []string{
			"dashboard:view", "agents:view", "alerts:view",
		}},
	}

	for _, r := range defaultRoles {
		// json.Marshal of a []string cannot fail; error intentionally ignored.
		perms, _ := json.Marshal(r.perms)
		_, err := p.db.Exec(`
			INSERT INTO roles (id, name, description, permissions, is_system)
			VALUES ($1, $2, $3, $4, TRUE)
			ON CONFLICT (id) DO NOTHING
		`, r.id, r.name, r.desc, perms)
		if err != nil {
			return fmt.Errorf("insert role %s: %w", r.id, err)
		}
	}
	return nil
}

// ============================================================================
// Metrics Storage
// ============================================================================

// StoreMetrics stores raw metrics from an agent.
// The full sample is serialized to JSON and inserted into metrics_raw with a
// server-side UTC timestamp; rollup tables are populated later by RunRetention.
func (p *PostgresDB) StoreMetrics(ctx context.Context, agentID string, metrics *models.AllMetrics) error {
	data, err := json.Marshal(metrics)
	if err != nil {
		return fmt.Errorf("marshal metrics: %w", err)
	}

	_, err = p.db.ExecContext(ctx, `
		INSERT INTO metrics_raw (agent_id, timestamp, data)
		VALUES ($1, $2, $3)
	`, agentID, time.Now().UTC(), data)

	return err
}

// QueryMetrics queries metrics with the specified resolution.
func (p *PostgresDB) QueryMetrics(ctx context.Context, agentID string, from, to time.Time, resolution string) ([]MetricPoint, error) {
	var table string
	switch resolution {
	case "raw":
		// Raw samples are stored as JSONB blobs and need per-row decoding.
		return p.queryRawMetrics(ctx, agentID, from, to)
	case "1min":
		table = "metrics_1min"
	case "5min":
		table = "metrics_5min"
	case "hourly":
		table = "metrics_hourly"
	default:
		// Unknown/empty resolution: pick a rollup table from the time span.
		table = p.selectResolution(from, to)
	}

	// Safe use of Sprintf: `table` only ever holds one of the fixed names
	// chosen above, never caller-supplied text.
	query := fmt.Sprintf(`
		SELECT timestamp, cpu_avg, cpu_min, cpu_max, mem_avg, mem_min, mem_max, disk_avg, gpu_avg
		FROM %s
		WHERE agent_id = $1 AND timestamp >= $2 AND timestamp <= $3
		ORDER BY timestamp ASC
	`, table)

	rows, err := p.db.QueryContext(ctx, query, agentID, from, to)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var points []MetricPoint
	for rows.Next() {
		var pt MetricPoint
		pt.AgentID = agentID
		// disk_avg/gpu_avg may be NULL (the raw->1min aggregation inserts
		// NULL for them), so scan through NullFloat64.
		var diskAvg, gpuAvg sql.NullFloat64
		err := rows.Scan(&pt.Timestamp, &pt.CPUAvg, &pt.CPUMin, &pt.CPUMax,
			&pt.MemoryAvg, &pt.MemoryMin, &pt.MemoryMax, &diskAvg, &gpuAvg)
		if err != nil {
			return nil, err
		}
		if diskAvg.Valid {
			pt.DiskAvg = diskAvg.Float64
		}
		if gpuAvg.Valid {
			pt.GPUAvg = gpuAvg.Float64
		}
		points = append(points, pt)
	}
	return points, rows.Err()
}

// queryRawMetrics reads full JSONB samples from metrics_raw and flattens each
// one into a MetricPoint (avg==min==max per sample, since each row is a single
// observation). Rows whose JSON fails to decode are skipped, not fatal.
func (p *PostgresDB) queryRawMetrics(ctx context.Context, agentID string, from, to time.Time) ([]MetricPoint, error) {
	rows, err := p.db.QueryContext(ctx, `
		SELECT timestamp, data FROM metrics_raw
		WHERE agent_id = $1 AND timestamp >= $2 AND timestamp <= $3
		ORDER BY timestamp ASC
	`, agentID, from, to)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var points []MetricPoint
	for rows.Next() {
		var ts time.Time
		var data []byte
		if err := rows.Scan(&ts, &data); err != nil {
			return nil, err
		}

		var m models.AllMetrics
		if err := json.Unmarshal(data, &m); err != nil {
			continue // skip corrupted entries rather than failing the query
		}

		pt := MetricPoint{Timestamp: ts, AgentID: agentID}

		pt.CPUAvg = m.CPU.TotalUsage
		pt.CPUMin = m.CPU.TotalUsage
		pt.CPUMax = m.CPU.TotalUsage

		// Memory as a used percentage; guarded so Total==0 cannot divide by zero.
		if m.Memory.Total > 0 {
			usedPct := float64(m.Memory.Used) / float64(m.Memory.Total) * 100
			pt.MemoryAvg = usedPct
			pt.MemoryMin = usedPct
			pt.MemoryMax = usedPct
		}

		// Disk usage aggregated over all mounts into one percentage.
		if len(m.Disk.Mounts) > 0 {
			var totalUsed, totalTotal uint64
			for _, d := range m.Disk.Mounts {
				totalUsed += d.Used
				totalTotal += d.Total
			}
			if totalTotal > 0 {
				pt.DiskAvg = float64(totalUsed) / float64(totalTotal) * 100
			}
		}

		if m.GPU.Available {
			pt.GPUAvg = float64(m.GPU.Utilization)
		}

		points = append(points, pt)
	}
	return points, rows.Err()
}

// selectResolution picks a rollup table by query span: <=2h -> 1min buckets,
// <=24h -> 5min buckets, otherwise hourly.
func (p *PostgresDB) selectResolution(from, to time.Time) string {
	duration := to.Sub(from)
	switch {
	case duration <= 2*time.Hour:
		return "metrics_1min"
	case duration <= 24*time.Hour:
		return "metrics_5min"
	default:
		return "metrics_hourly"
	}
}

// GetLatestMetrics returns the most recent metrics for an agent.
// Returns (nil, nil) when the agent has no stored samples.
func (p *PostgresDB) GetLatestMetrics(ctx context.Context, agentID string) (*models.AllMetrics, error) {
	var data []byte
	err := p.db.QueryRowContext(ctx, `
		SELECT data FROM metrics_raw
		WHERE agent_id = $1
		ORDER BY timestamp DESC
		LIMIT 1
	`, agentID).Scan(&data)

	if errors.Is(err, sql.ErrNoRows) {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}

	var m models.AllMetrics
	if err := json.Unmarshal(data, &m); err != nil {
		return nil, err
	}
	return &m, nil
}

// ============================================================================
// Agents
// ============================================================================

// StoreAgent stores or updates an agent record.
+func (p *PostgresDB) StoreAgent(ctx context.Context, agent *Agent) error { + _, err := p.db.ExecContext(ctx, ` + INSERT INTO agents (id, name, hostname, os, architecture, version, capabilities, status, cert_serial, cert_expiry, last_seen, registered_at, tags) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) + ON CONFLICT (id) DO UPDATE SET + name = EXCLUDED.name, + hostname = EXCLUDED.hostname, + os = EXCLUDED.os, + architecture = EXCLUDED.architecture, + version = EXCLUDED.version, + capabilities = EXCLUDED.capabilities, + status = EXCLUDED.status, + cert_serial = EXCLUDED.cert_serial, + cert_expiry = EXCLUDED.cert_expiry, + last_seen = EXCLUDED.last_seen, + tags = EXCLUDED.tags + `, agent.ID, agent.Name, agent.Hostname, agent.OS, agent.Architecture, agent.Version, + pq.Array(agent.Capabilities), agent.Status, agent.CertSerial, agent.CertExpiry, agent.LastSeen, agent.RegisteredAt, pq.Array(agent.Tags)) + + return err +} + +// GetAgent retrieves an agent by ID. +func (p *PostgresDB) GetAgent(ctx context.Context, id string) (*Agent, error) { + var agent Agent + var certExpiry, lastSeen sql.NullTime + + err := p.db.QueryRowContext(ctx, ` + SELECT id, name, hostname, os, architecture, version, capabilities, status, cert_serial, cert_expiry, last_seen, registered_at, tags + FROM agents WHERE id = $1 + `, id).Scan(&agent.ID, &agent.Name, &agent.Hostname, &agent.OS, &agent.Architecture, &agent.Version, + pq.Array(&agent.Capabilities), &agent.Status, &agent.CertSerial, &certExpiry, &lastSeen, &agent.RegisteredAt, pq.Array(&agent.Tags)) + + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + if err != nil { + return nil, err + } + + if certExpiry.Valid { + agent.CertExpiry = certExpiry.Time + } + if lastSeen.Valid { + agent.LastSeen = lastSeen.Time + } + return &agent, nil +} + +// ListAgents returns all registered agents. 
+func (p *PostgresDB) ListAgents(ctx context.Context) ([]*Agent, error) { + rows, err := p.db.QueryContext(ctx, ` + SELECT id, name, hostname, os, architecture, version, capabilities, status, cert_serial, cert_expiry, last_seen, registered_at, tags + FROM agents ORDER BY registered_at DESC + `) + if err != nil { + return nil, err + } + defer rows.Close() + + var agents []*Agent + for rows.Next() { + var agent Agent + var certExpiry, lastSeen sql.NullTime + + err := rows.Scan(&agent.ID, &agent.Name, &agent.Hostname, &agent.OS, &agent.Architecture, &agent.Version, + pq.Array(&agent.Capabilities), &agent.Status, &agent.CertSerial, &certExpiry, &lastSeen, &agent.RegisteredAt, pq.Array(&agent.Tags)) + if err != nil { + return nil, err + } + + if certExpiry.Valid { + agent.CertExpiry = certExpiry.Time + } + if lastSeen.Valid { + agent.LastSeen = lastSeen.Time + } + agents = append(agents, &agent) + } + return agents, rows.Err() +} + +// UpdateAgentStatus updates an agent's status and last seen time. +func (p *PostgresDB) UpdateAgentStatus(ctx context.Context, id string, status AgentStatus, lastSeen time.Time) error { + result, err := p.db.ExecContext(ctx, ` + UPDATE agents SET status = $1, last_seen = $2 WHERE id = $3 + `, status, lastSeen, id) + if err != nil { + return err + } + + rows, _ := result.RowsAffected() + if rows == 0 { + return fmt.Errorf("agent not found: %s", id) + } + return nil +} + +// DeleteAgent removes an agent and all its data. +func (p *PostgresDB) DeleteAgent(ctx context.Context, id string) error { + _, err := p.db.ExecContext(ctx, `DELETE FROM agents WHERE id = $1`, id) + return err +} + +// ============================================================================ +// Users +// ============================================================================ + +// CreateUser creates a new user. 
+func (p *PostgresDB) CreateUser(ctx context.Context, user *User) error { + _, err := p.db.ExecContext(ctx, ` + INSERT INTO users (id, username, email, password_hash, auth_provider, ldap_dn, created_at, updated_at, disabled) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) + `, user.ID, user.Username, user.Email, user.PasswordHash, user.AuthProvider, + user.LDAPDN, user.CreatedAt, user.UpdatedAt, user.Disabled) + return err +} + +// GetUser retrieves a user by ID. +func (p *PostgresDB) GetUser(ctx context.Context, id string) (*User, error) { + return p.scanUser(p.db.QueryRowContext(ctx, ` + SELECT id, username, email, password_hash, auth_provider, ldap_dn, created_at, updated_at, last_login, disabled + FROM users WHERE id = $1 + `, id)) +} + +// GetUserByUsername retrieves a user by username. +func (p *PostgresDB) GetUserByUsername(ctx context.Context, username string) (*User, error) { + return p.scanUser(p.db.QueryRowContext(ctx, ` + SELECT id, username, email, password_hash, auth_provider, ldap_dn, created_at, updated_at, last_login, disabled + FROM users WHERE username = $1 + `, username)) +} + +func (p *PostgresDB) scanUser(row *sql.Row) (*User, error) { + var user User + var lastLogin sql.NullTime + var email sql.NullString + + err := row.Scan(&user.ID, &user.Username, &email, &user.PasswordHash, &user.AuthProvider, + &user.LDAPDN, &user.CreatedAt, &user.UpdatedAt, &lastLogin, &user.Disabled) + + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + if err != nil { + return nil, err + } + + if email.Valid { + user.Email = email.String + } + if lastLogin.Valid { + user.LastLogin = lastLogin.Time + } + return &user, nil +} + +// UpdateUser updates a user record. 
+func (p *PostgresDB) UpdateUser(ctx context.Context, user *User) error { + _, err := p.db.ExecContext(ctx, ` + UPDATE users SET + username = $1, email = $2, password_hash = $3, auth_provider = $4, + ldap_dn = $5, updated_at = $6, last_login = $7, disabled = $8 + WHERE id = $9 + `, user.Username, user.Email, user.PasswordHash, user.AuthProvider, + user.LDAPDN, time.Now().UTC(), user.LastLogin, user.Disabled, user.ID) + return err +} + +// DeleteUser removes a user. +func (p *PostgresDB) DeleteUser(ctx context.Context, id string) error { + _, err := p.db.ExecContext(ctx, `DELETE FROM users WHERE id = $1`, id) + return err +} + +// ListUsers returns all users. +func (p *PostgresDB) ListUsers(ctx context.Context) ([]*User, error) { + rows, err := p.db.QueryContext(ctx, ` + SELECT id, username, email, password_hash, auth_provider, ldap_dn, created_at, updated_at, last_login, disabled + FROM users ORDER BY username + `) + if err != nil { + return nil, err + } + defer rows.Close() + + var users []*User + for rows.Next() { + var user User + var lastLogin sql.NullTime + var email sql.NullString + + err := rows.Scan(&user.ID, &user.Username, &email, &user.PasswordHash, &user.AuthProvider, + &user.LDAPDN, &user.CreatedAt, &user.UpdatedAt, &lastLogin, &user.Disabled) + if err != nil { + return nil, err + } + + if email.Valid { + user.Email = email.String + } + if lastLogin.Valid { + user.LastLogin = lastLogin.Time + } + users = append(users, &user) + } + return users, rows.Err() +} + +// ============================================================================ +// Roles +// ============================================================================ + +// CreateRole creates a new role. 
+func (p *PostgresDB) CreateRole(ctx context.Context, role *Role) error { + perms, _ := json.Marshal(role.Permissions) + _, err := p.db.ExecContext(ctx, ` + INSERT INTO roles (id, name, description, permissions, is_system, created_at) + VALUES ($1, $2, $3, $4, $5, $6) + `, role.ID, role.Name, role.Description, perms, role.IsSystem, role.CreatedAt) + return err +} + +// GetRole retrieves a role by ID. +func (p *PostgresDB) GetRole(ctx context.Context, id string) (*Role, error) { + var role Role + var perms []byte + + err := p.db.QueryRowContext(ctx, ` + SELECT id, name, description, permissions, is_system, created_at + FROM roles WHERE id = $1 + `, id).Scan(&role.ID, &role.Name, &role.Description, &perms, &role.IsSystem, &role.CreatedAt) + + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + if err != nil { + return nil, err + } + + json.Unmarshal(perms, &role.Permissions) + return &role, nil +} + +// ListRoles returns all roles. +func (p *PostgresDB) ListRoles(ctx context.Context) ([]*Role, error) { + rows, err := p.db.QueryContext(ctx, ` + SELECT id, name, description, permissions, is_system, created_at + FROM roles ORDER BY name + `) + if err != nil { + return nil, err + } + defer rows.Close() + + var roles []*Role + for rows.Next() { + var role Role + var perms []byte + + if err := rows.Scan(&role.ID, &role.Name, &role.Description, &perms, &role.IsSystem, &role.CreatedAt); err != nil { + return nil, err + } + + json.Unmarshal(perms, &role.Permissions) + roles = append(roles, &role) + } + return roles, rows.Err() +} + +// UpdateRole updates a role. +func (p *PostgresDB) UpdateRole(ctx context.Context, role *Role) error { + perms, _ := json.Marshal(role.Permissions) + _, err := p.db.ExecContext(ctx, ` + UPDATE roles SET name = $1, description = $2, permissions = $3 + WHERE id = $4 AND is_system = FALSE + `, role.Name, role.Description, perms, role.ID) + return err +} + +// DeleteRole removes a custom role. 
+func (p *PostgresDB) DeleteRole(ctx context.Context, id string) error { + result, err := p.db.ExecContext(ctx, `DELETE FROM roles WHERE id = $1 AND is_system = FALSE`, id) + if err != nil { + return err + } + rows, _ := result.RowsAffected() + if rows == 0 { + return errors.New("cannot delete system role or role not found") + } + return nil +} + +// GetUserRoles returns all roles assigned to a user. +func (p *PostgresDB) GetUserRoles(ctx context.Context, userID string) ([]*Role, error) { + rows, err := p.db.QueryContext(ctx, ` + SELECT r.id, r.name, r.description, r.permissions, r.is_system, r.created_at + FROM roles r + JOIN user_roles ur ON r.id = ur.role_id + WHERE ur.user_id = $1 + `, userID) + if err != nil { + return nil, err + } + defer rows.Close() + + var roles []*Role + for rows.Next() { + var role Role + var perms []byte + + if err := rows.Scan(&role.ID, &role.Name, &role.Description, &perms, &role.IsSystem, &role.CreatedAt); err != nil { + return nil, err + } + + json.Unmarshal(perms, &role.Permissions) + roles = append(roles, &role) + } + return roles, rows.Err() +} + +// AssignRole assigns a role to a user. +func (p *PostgresDB) AssignRole(ctx context.Context, userID, roleID string) error { + _, err := p.db.ExecContext(ctx, ` + INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2) + ON CONFLICT DO NOTHING + `, userID, roleID) + return err +} + +// RemoveRole removes a role from a user. +func (p *PostgresDB) RemoveRole(ctx context.Context, userID, roleID string) error { + _, err := p.db.ExecContext(ctx, ` + DELETE FROM user_roles WHERE user_id = $1 AND role_id = $2 + `, userID, roleID) + return err +} + +// ============================================================================ +// Sessions +// ============================================================================ + +// CreateSession creates a new session. 
+func (p *PostgresDB) CreateSession(ctx context.Context, session *Session) error { + _, err := p.db.ExecContext(ctx, ` + INSERT INTO sessions (token, user_id, created_at, expires_at, ip_address, user_agent) + VALUES ($1, $2, $3, $4, $5, $6) + `, session.Token, session.UserID, session.CreatedAt, session.ExpiresAt, session.IPAddress, session.UserAgent) + return err +} + +// GetSession retrieves a session by token. +func (p *PostgresDB) GetSession(ctx context.Context, token string) (*Session, error) { + var session Session + + err := p.db.QueryRowContext(ctx, ` + SELECT token, user_id, created_at, expires_at, ip_address, user_agent + FROM sessions WHERE token = $1 AND expires_at > $2 + `, token, time.Now().UTC()).Scan(&session.Token, &session.UserID, &session.CreatedAt, + &session.ExpiresAt, &session.IPAddress, &session.UserAgent) + + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + if err != nil { + return nil, err + } + return &session, nil +} + +// DeleteSession removes a session. +func (p *PostgresDB) DeleteSession(ctx context.Context, token string) error { + _, err := p.db.ExecContext(ctx, `DELETE FROM sessions WHERE token = $1`, token) + return err +} + +// DeleteUserSessions removes all sessions for a user. +func (p *PostgresDB) DeleteUserSessions(ctx context.Context, userID string) error { + _, err := p.db.ExecContext(ctx, `DELETE FROM sessions WHERE user_id = $1`, userID) + return err +} + +// CleanupExpiredSessions removes all expired sessions. +func (p *PostgresDB) CleanupExpiredSessions(ctx context.Context) error { + _, err := p.db.ExecContext(ctx, `DELETE FROM sessions WHERE expires_at < $1`, time.Now().UTC()) + return err +} + +// ============================================================================ +// Alerts +// ============================================================================ + +// StoreAlert stores a new alert. 
+func (p *PostgresDB) StoreAlert(ctx context.Context, alert *Alert) error { + _, err := p.db.ExecContext(ctx, ` + INSERT INTO alerts (id, agent_id, type, severity, message, value, threshold, triggered_at, resolved_at, acknowledged) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) + `, alert.ID, alert.AgentID, alert.Type, alert.Severity, alert.Message, + alert.Value, alert.Threshold, alert.TriggeredAt, alert.ResolvedAt, alert.Acknowledged) + return err +} + +// GetAlert retrieves an alert by ID. +func (p *PostgresDB) GetAlert(ctx context.Context, id string) (*Alert, error) { + var alert Alert + var resolvedAt sql.NullTime + + err := p.db.QueryRowContext(ctx, ` + SELECT id, agent_id, type, severity, message, value, threshold, triggered_at, resolved_at, acknowledged + FROM alerts WHERE id = $1 + `, id).Scan(&alert.ID, &alert.AgentID, &alert.Type, &alert.Severity, &alert.Message, + &alert.Value, &alert.Threshold, &alert.TriggeredAt, &resolvedAt, &alert.Acknowledged) + + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + if err != nil { + return nil, err + } + + if resolvedAt.Valid { + alert.ResolvedAt = &resolvedAt.Time + } + return &alert, nil +} + +// QueryAlerts queries alerts with filters. 
func (p *PostgresDB) QueryAlerts(ctx context.Context, filter AlertFilter) ([]*Alert, error) {
	// Build the WHERE clause dynamically. argNum tracks the next $N
	// placeholder index and must stay in lockstep with appends to args.
	var conditions []string
	var args []interface{}
	argNum := 1

	if filter.AgentID != "" {
		conditions = append(conditions, fmt.Sprintf("agent_id = $%d", argNum))
		args = append(args, filter.AgentID)
		argNum++
	}
	if filter.Type != "" {
		conditions = append(conditions, fmt.Sprintf("type = $%d", argNum))
		args = append(args, filter.Type)
		argNum++
	}
	if filter.Severity != "" {
		conditions = append(conditions, fmt.Sprintf("severity = $%d", argNum))
		args = append(args, filter.Severity)
		argNum++
	}
	if filter.Acknowledged != nil {
		conditions = append(conditions, fmt.Sprintf("acknowledged = $%d", argNum))
		args = append(args, *filter.Acknowledged)
		argNum++
	}
	if !filter.From.IsZero() {
		conditions = append(conditions, fmt.Sprintf("triggered_at >= $%d", argNum))
		args = append(args, filter.From)
		argNum++
	}
	if !filter.To.IsZero() {
		conditions = append(conditions, fmt.Sprintf("triggered_at <= $%d", argNum))
		args = append(args, filter.To)
		argNum++
	}

	query := "SELECT id, agent_id, type, severity, message, value, threshold, triggered_at, resolved_at, acknowledged FROM alerts"
	if len(conditions) > 0 {
		query += " WHERE " + strings.Join(conditions, " AND ")
	}
	query += " ORDER BY triggered_at DESC"

	// LIMIT/OFFSET are embedded as formatted ints, not placeholders; safe
	// because they come from integer struct fields, never raw strings.
	if filter.Limit > 0 {
		query += fmt.Sprintf(" LIMIT %d", filter.Limit)
	}
	if filter.Offset > 0 {
		query += fmt.Sprintf(" OFFSET %d", filter.Offset)
	}

	rows, err := p.db.QueryContext(ctx, query, args...)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var alerts []*Alert
	for rows.Next() {
		var alert Alert
		// resolved_at is NULL for alerts that have not cleared yet.
		var resolvedAt sql.NullTime

		err := rows.Scan(&alert.ID, &alert.AgentID, &alert.Type, &alert.Severity, &alert.Message,
			&alert.Value, &alert.Threshold, &alert.TriggeredAt, &resolvedAt, &alert.Acknowledged)
		if err != nil {
			return nil, err
		}

		if resolvedAt.Valid {
			alert.ResolvedAt = &resolvedAt.Time
		}
		alerts = append(alerts, &alert)
	}
	return alerts, rows.Err()
}

// AcknowledgeAlert marks an alert as acknowledged.
// Acknowledging a nonexistent alert is a silent no-op.
func (p *PostgresDB) AcknowledgeAlert(ctx context.Context, id string) error {
	_, err := p.db.ExecContext(ctx, `UPDATE alerts SET acknowledged = TRUE WHERE id = $1`, id)
	return err
}

// ============================================================================
// Retention
// ============================================================================

// RunRetention runs the retention policy, aggregating and deleting old data.
func (p *PostgresDB) RunRetention(ctx context.Context) error {
	now := time.Now().UTC()

	// Order matters: each tier is aggregated into the next BEFORE the source
	// rows are deleted, so data is rolled up rather than lost.

	// Aggregate raw -> 1min for data older than raw retention
	rawCutoff := now.Add(-p.retention.RawRetention)
	if err := p.aggregateRawTo1Min(ctx, rawCutoff); err != nil {
		return fmt.Errorf("aggregate raw->1min: %w", err)
	}

	// Delete raw data older than retention
	if _, err := p.db.ExecContext(ctx, `DELETE FROM metrics_raw WHERE timestamp < $1`, rawCutoff); err != nil {
		return fmt.Errorf("delete old raw: %w", err)
	}

	// Aggregate 1min -> 5min
	oneMinCutoff := now.Add(-p.retention.OneMinuteRetention)
	if err := p.aggregate1MinTo5Min(ctx, oneMinCutoff); err != nil {
		return fmt.Errorf("aggregate 1min->5min: %w", err)
	}

	// Delete 1min data older than retention
	if _, err := p.db.ExecContext(ctx, `DELETE FROM metrics_1min WHERE timestamp < $1`, oneMinCutoff); err != nil {
		return fmt.Errorf("delete old 1min: %w", err)
	}

	// Aggregate 5min -> hourly
	fiveMinCutoff := now.Add(-p.retention.FiveMinuteRetention)
	if err := p.aggregate5MinToHourly(ctx, fiveMinCutoff); err != nil {
		return fmt.Errorf("aggregate 5min->hourly: %w", err)
	}

	// Delete 5min data older than retention
	if _, err := p.db.ExecContext(ctx, `DELETE FROM metrics_5min WHERE timestamp < $1`, fiveMinCutoff); err != nil {
		return fmt.Errorf("delete old 5min: %w", err)
	}

	// Delete hourly data older than retention (final tier: no further rollup)
	hourlyCutoff := now.Add(-p.retention.HourlyRetention)
	if _, err := p.db.ExecContext(ctx, `DELETE FROM metrics_hourly WHERE timestamp < $1`, hourlyCutoff); err != nil {
		return fmt.Errorf("delete old hourly: %w", err)
	}

	return nil
}

// aggregateRawTo1Min rolls raw JSONB samples older than `before` into 1-minute
// buckets. Only a 24h window ending at `before` is scanned ($2 lower bound),
// which bounds the work per run; disk/gpu averages are not computed from raw
// JSON and are inserted as NULL.
func (p *PostgresDB) aggregateRawTo1Min(ctx context.Context, before time.Time) error {
	// Use CTE to aggregate raw metrics into 1-minute buckets
	_, err := p.db.ExecContext(ctx, `
		INSERT INTO metrics_1min (agent_id, timestamp, cpu_avg, cpu_min, cpu_max, mem_avg, mem_min, mem_max, disk_avg, gpu_avg, sample_count)
		SELECT
			agent_id,
			date_trunc('minute', timestamp) as ts,
			AVG((data->'cpu'->>'usagePercent')::float),
			MIN((data->'cpu'->>'usagePercent')::float),
			MAX((data->'cpu'->>'usagePercent')::float),
			AVG(CASE WHEN (data->'memory'->>'total')::bigint > 0
				THEN (data->'memory'->>'used')::float / (data->'memory'->>'total')::float * 100
				ELSE 0 END),
			MIN(CASE WHEN (data->'memory'->>'total')::bigint > 0
				THEN (data->'memory'->>'used')::float / (data->'memory'->>'total')::float * 100
				ELSE 0 END),
			MAX(CASE WHEN (data->'memory'->>'total')::bigint > 0
				THEN (data->'memory'->>'used')::float / (data->'memory'->>'total')::float * 100
				ELSE 0 END),
			NULL,
			NULL,
			COUNT(*)
		FROM metrics_raw
		WHERE timestamp < $1 AND timestamp >= $2
		GROUP BY agent_id, ts
		ON CONFLICT (agent_id, timestamp) DO UPDATE SET
			cpu_avg = EXCLUDED.cpu_avg,
			cpu_min = EXCLUDED.cpu_min,
			cpu_max = EXCLUDED.cpu_max,
			mem_avg = EXCLUDED.mem_avg,
			mem_min = EXCLUDED.mem_min,
			mem_max = EXCLUDED.mem_max,
			sample_count = EXCLUDED.sample_count
	`, before, before.Add(-24*time.Hour))
	return err
}

// aggregate1MinTo5Min rolls 1-minute buckets older than `before` into 5-minute
// buckets; the bucket timestamp is the hour plus the minute floored to a
// multiple of 5.
func (p *PostgresDB) aggregate1MinTo5Min(ctx context.Context, before time.Time) error {
	_, err := p.db.ExecContext(ctx, `
		INSERT INTO metrics_5min (agent_id, timestamp, cpu_avg, cpu_min, cpu_max, mem_avg, mem_min, mem_max, disk_avg, gpu_avg, sample_count)
		SELECT
			agent_id,
			date_trunc('hour', timestamp) +
				INTERVAL '5 min' * (EXTRACT(MINUTE FROM timestamp)::int / 5) as ts,
			AVG(cpu_avg), MIN(cpu_min), MAX(cpu_max),
			AVG(mem_avg), MIN(mem_min), MAX(mem_max),
			AVG(disk_avg), AVG(gpu_avg),
			SUM(sample_count)
		FROM metrics_1min
		WHERE timestamp < $1
		GROUP BY agent_id, ts
		ON CONFLICT (agent_id, timestamp) DO UPDATE SET
			cpu_avg = EXCLUDED.cpu_avg,
			cpu_min = EXCLUDED.cpu_min,
			cpu_max = EXCLUDED.cpu_max,
			mem_avg = EXCLUDED.mem_avg,
			mem_min = EXCLUDED.mem_min,
			mem_max = EXCLUDED.mem_max,
			disk_avg = EXCLUDED.disk_avg,
			gpu_avg = EXCLUDED.gpu_avg,
			sample_count = EXCLUDED.sample_count
	`, before)
	return err
}

// aggregate5MinToHourly rolls 5-minute buckets older than `before` into
// hourly buckets (the final retention tier).
func (p *PostgresDB) aggregate5MinToHourly(ctx context.Context, before time.Time) error {
	_, err := p.db.ExecContext(ctx, `
		INSERT INTO metrics_hourly (agent_id, timestamp, cpu_avg, cpu_min, cpu_max, mem_avg, mem_min, mem_max, disk_avg, gpu_avg, sample_count)
		SELECT
			agent_id,
			date_trunc('hour', timestamp) as ts,
			AVG(cpu_avg), MIN(cpu_min), MAX(cpu_max),
			AVG(mem_avg), MIN(mem_min), MAX(mem_max),
			AVG(disk_avg), AVG(gpu_avg),
			SUM(sample_count)
		FROM metrics_5min
		WHERE timestamp < $1
		GROUP BY agent_id, ts
		ON CONFLICT (agent_id, timestamp) DO UPDATE SET
			cpu_avg = EXCLUDED.cpu_avg,
			cpu_min = EXCLUDED.cpu_min,
			cpu_max = EXCLUDED.cpu_max,
			mem_avg = EXCLUDED.mem_avg,
			mem_min = EXCLUDED.mem_min,
			mem_max = EXCLUDED.mem_max,
			disk_avg = EXCLUDED.disk_avg,
			gpu_avg = EXCLUDED.gpu_avg,
			sample_count = EXCLUDED.sample_count
	`, before)
	return err
}
diff --git a/backend/internal/database/sqlite.go b/backend/internal/database/sqlite.go
new file mode 100644
index 0000000..3026b19
--- /dev/null
+++ b/backend/internal/database/sqlite.go
@@ -0,0 +1,1134 @@
// Package database provides SQLite implementation of the Database interface.
package database

import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"strings"
	"time"

	"tyto/internal/models"

	_ "modernc.org/sqlite"
)

// SQLiteDB implements the Database interface using SQLite.
type SQLiteDB struct {
	db        *sql.DB         // single-writer SQLite handle (MaxOpenConns == 1)
	retention RetentionConfig // tiered retention windows used by RunRetention
}

// NewSQLiteDB creates a new SQLite database connection.
+func NewSQLiteDB(path string, retention RetentionConfig) (*SQLiteDB, error) { + // Enable WAL mode and foreign keys via connection string + dsn := fmt.Sprintf("%s?_pragma=journal_mode(WAL)&_pragma=foreign_keys(ON)&_pragma=busy_timeout(5000)", path) + + db, err := sql.Open("sqlite", dsn) + if err != nil { + return nil, fmt.Errorf("open sqlite: %w", err) + } + + // Set connection pool settings + db.SetMaxOpenConns(1) // SQLite handles one writer at a time + db.SetMaxIdleConns(1) + db.SetConnMaxLifetime(0) // Keep connection open + + // Verify connection + if err := db.Ping(); err != nil { + db.Close() + return nil, fmt.Errorf("ping sqlite: %w", err) + } + + return &SQLiteDB{ + db: db, + retention: retention, + }, nil +} + +// Close closes the database connection. +func (s *SQLiteDB) Close() error { + return s.db.Close() +} + +// Migrate runs database migrations. +func (s *SQLiteDB) Migrate() error { + migrations := []string{ + migrationAgents, + migrationUsers, + migrationRoles, + migrationSessions, + migrationMetrics, + migrationAlerts, + } + + for i, m := range migrations { + if _, err := s.db.Exec(m); err != nil { + return fmt.Errorf("migration %d: %w", i+1, err) + } + } + + // Insert default roles if they don't exist + return s.insertDefaultRoles() +} + +// SQL migration statements +const migrationAgents = ` +CREATE TABLE IF NOT EXISTS agents ( + id TEXT PRIMARY KEY, + name TEXT, + hostname TEXT NOT NULL, + os TEXT NOT NULL, + architecture TEXT NOT NULL, + version TEXT NOT NULL, + capabilities TEXT, -- JSON array + status TEXT NOT NULL DEFAULT 'pending', + cert_serial TEXT, + cert_expiry TIMESTAMP, + last_seen TIMESTAMP, + registered_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + tags TEXT -- JSON array +); +CREATE INDEX IF NOT EXISTS idx_agents_status ON agents(status); +` + +const migrationUsers = ` +CREATE TABLE IF NOT EXISTS users ( + id TEXT PRIMARY KEY, + username TEXT UNIQUE NOT NULL, + email TEXT, + password_hash BLOB, + auth_provider TEXT NOT NULL 
DEFAULT 'local',
	ldap_dn TEXT,
	created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
	updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
	last_login TIMESTAMP,
	disabled INTEGER NOT NULL DEFAULT 0
);
CREATE INDEX IF NOT EXISTS idx_users_username ON users(username);
`

// migrationRoles creates the RBAC tables: roles (permission strings stored
// as a JSON array in a TEXT column) and the user_roles join table. Both
// join-table foreign keys cascade on delete.
const migrationRoles = `
CREATE TABLE IF NOT EXISTS roles (
	id TEXT PRIMARY KEY,
	name TEXT UNIQUE NOT NULL,
	description TEXT,
	permissions TEXT NOT NULL, -- JSON array
	is_system INTEGER NOT NULL DEFAULT 0,
	created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE IF NOT EXISTS user_roles (
	user_id TEXT NOT NULL,
	role_id TEXT NOT NULL,
	PRIMARY KEY (user_id, role_id),
	FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE,
	FOREIGN KEY (role_id) REFERENCES roles(id) ON DELETE CASCADE
);
`

// migrationSessions creates the token-keyed sessions table; a user's
// sessions are removed with the user via ON DELETE CASCADE.
const migrationSessions = `
CREATE TABLE IF NOT EXISTS sessions (
	token TEXT PRIMARY KEY,
	user_id TEXT NOT NULL,
	created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
	expires_at TIMESTAMP NOT NULL,
	ip_address TEXT,
	user_agent TEXT,
	FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_sessions_user ON sessions(user_id);
CREATE INDEX IF NOT EXISTS idx_sessions_expires ON sessions(expires_at);
`

// migrationMetrics creates the tiered time-series tables: metrics_raw keeps
// the full serialized payload as a BLOB, while the 1min/5min/hourly tables
// keep pre-aggregated scalar columns keyed by (agent_id, timestamp).
const migrationMetrics = `
-- Raw metrics (high resolution, short retention)
CREATE TABLE IF NOT EXISTS metrics_raw (
	id INTEGER PRIMARY KEY AUTOINCREMENT,
	agent_id TEXT NOT NULL,
	timestamp TIMESTAMP NOT NULL,
	data BLOB NOT NULL, -- Compressed JSON
	FOREIGN KEY (agent_id) REFERENCES agents(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_metrics_raw_agent_time ON metrics_raw(agent_id, timestamp);

-- 1-minute aggregations
CREATE TABLE IF NOT EXISTS metrics_1min (
	agent_id TEXT NOT NULL,
	timestamp TIMESTAMP NOT NULL,
	cpu_avg REAL, cpu_min REAL, cpu_max REAL,
	mem_avg REAL, mem_min REAL, mem_max REAL,
	disk_avg REAL,
	gpu_avg REAL,
	sample_count INTEGER NOT NULL DEFAULT 1,
	PRIMARY KEY (agent_id, timestamp),
	FOREIGN KEY (agent_id) REFERENCES agents(id) ON DELETE CASCADE
);

-- 5-minute aggregations
CREATE TABLE IF NOT EXISTS metrics_5min (
	agent_id TEXT NOT NULL,
	timestamp TIMESTAMP NOT NULL,
	cpu_avg REAL, cpu_min REAL, cpu_max REAL,
	mem_avg REAL, mem_min REAL, mem_max REAL,
	disk_avg REAL,
	gpu_avg REAL,
	sample_count INTEGER NOT NULL DEFAULT 1,
	PRIMARY KEY (agent_id, timestamp),
	FOREIGN KEY (agent_id) REFERENCES agents(id) ON DELETE CASCADE
);

-- Hourly aggregations
CREATE TABLE IF NOT EXISTS metrics_hourly (
	agent_id TEXT NOT NULL,
	timestamp TIMESTAMP NOT NULL,
	cpu_avg REAL, cpu_min REAL, cpu_max REAL,
	mem_avg REAL, mem_min REAL, mem_max REAL,
	disk_avg REAL,
	gpu_avg REAL,
	sample_count INTEGER NOT NULL DEFAULT 1,
	PRIMARY KEY (agent_id, timestamp),
	FOREIGN KEY (agent_id) REFERENCES agents(id) ON DELETE CASCADE
);
`

// migrationAlerts creates the alerts table plus lookup indexes on agent,
// trigger time, and severity.
const migrationAlerts = `
CREATE TABLE IF NOT EXISTS alerts (
	id TEXT PRIMARY KEY,
	agent_id TEXT NOT NULL,
	type TEXT NOT NULL,
	severity TEXT NOT NULL,
	message TEXT NOT NULL,
	value REAL NOT NULL,
	threshold REAL NOT NULL,
	triggered_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
	resolved_at TIMESTAMP,
	acknowledged INTEGER NOT NULL DEFAULT 0,
	FOREIGN KEY (agent_id) REFERENCES agents(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_alerts_agent ON alerts(agent_id);
CREATE INDEX IF NOT EXISTS idx_alerts_triggered ON alerts(triggered_at);
CREATE INDEX IF NOT EXISTS idx_alerts_severity ON alerts(severity);
`

// insertDefaultRoles seeds the built-in admin/operator/viewer roles.
// INSERT OR IGNORE leaves existing rows untouched, so this is safe to run
// on every startup.
func (s *SQLiteDB) insertDefaultRoles() error {
	defaultRoles := []struct {
		id, name, desc string
		perms          []string
	}{
		{"admin", "Administrator", "Full system access", []string{"*"}},
		{"operator", "Operator", "Manage agents and alerts", []string{
			"dashboard:view", "agents:view", "agents:manage",
			"alerts:view", "alerts:acknowledge", "alerts:configure",
			"metrics:export", "metrics:query",
		}},
		{"viewer", "Viewer", "Read-only access", []string{
			"dashboard:view", "agents:view", "alerts:view",
		}},
	}

	for _, r := range defaultRoles {
		perms, _ := json.Marshal(r.perms) // marshaling a []string cannot fail
		_, err := s.db.Exec(`
			INSERT OR IGNORE INTO roles (id, name, description, permissions, is_system)
			VALUES (?, ?, ?, ?, 1)
		`, r.id, r.name, r.desc, string(perms))
		if err != nil {
			return fmt.Errorf("insert role %s: %w", r.id, err)
		}
	}
	return nil
}

// ============================================================================
// Metrics Storage
// ============================================================================

// StoreMetrics stores raw metrics from an agent.
// NOTE(review): the row timestamp is the server-side receive time
// (time.Now), not any timestamp carried inside the payload — confirm this
// is intended for late-arriving samples.
func (s *SQLiteDB) StoreMetrics(ctx context.Context, agentID string, metrics *models.AllMetrics) error {
	data, err := json.Marshal(metrics)
	if err != nil {
		return fmt.Errorf("marshal metrics: %w", err)
	}

	_, err = s.db.ExecContext(ctx, `
		INSERT INTO metrics_raw (agent_id, timestamp, data)
		VALUES (?, ?, ?)
	`, agentID, time.Now().UTC(), data)

	return err
}

// QueryMetrics queries metrics with the specified resolution.
// Resolution is one of "raw", "1min", "5min", "hourly"; any other value
// auto-selects an aggregate table from the requested time span.
func (s *SQLiteDB) QueryMetrics(ctx context.Context, agentID string, from, to time.Time, resolution string) ([]MetricPoint, error) {
	var table string
	switch resolution {
	case "raw":
		return s.queryRawMetrics(ctx, agentID, from, to)
	case "1min":
		table = "metrics_1min"
	case "5min":
		table = "metrics_5min"
	case "hourly":
		table = "metrics_hourly"
	default:
		// Auto-select based on time range
		table = s.selectResolution(from, to)
	}

	// Sprintf is injection-safe here: table comes only from the fixed
	// whitelist above.
	query := fmt.Sprintf(`
		SELECT timestamp, cpu_avg, cpu_min, cpu_max, mem_avg, mem_min, mem_max, disk_avg, gpu_avg
		FROM %s
		WHERE agent_id = ? AND timestamp >= ? AND timestamp <= ?
		ORDER BY timestamp ASC
	`, table)

	rows, err := s.db.QueryContext(ctx, query, agentID, from, to)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var points []MetricPoint
	for rows.Next() {
		var p MetricPoint
		p.AgentID = agentID
		err := rows.Scan(&p.Timestamp, &p.CPUAvg, &p.CPUMin, &p.CPUMax,
			&p.MemoryAvg, &p.MemoryMin, &p.MemoryMax, &p.DiskAvg, &p.GPUAvg)
		if err != nil {
			return nil, err
		}
		points = append(points, p)
	}
	return points, rows.Err()
}

// queryRawMetrics reads full payloads from metrics_raw and flattens each
// into a MetricPoint (instantaneous values, so min = avg = max per sample).
func (s *SQLiteDB) queryRawMetrics(ctx context.Context, agentID string, from, to time.Time) ([]MetricPoint, error) {
	rows, err := s.db.QueryContext(ctx, `
		SELECT timestamp, data FROM metrics_raw
		WHERE agent_id = ? AND timestamp >= ? AND timestamp <= ?
		ORDER BY timestamp ASC
	`, agentID, from, to)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var points []MetricPoint
	for rows.Next() {
		var ts time.Time
		var data []byte
		if err := rows.Scan(&ts, &data); err != nil {
			return nil, err
		}

		var m models.AllMetrics
		if err := json.Unmarshal(data, &m); err != nil {
			continue // Skip corrupted entries
		}

		p := MetricPoint{
			Timestamp: ts,
			AgentID:   agentID,
		}

		// Extract CPU usage
		p.CPUAvg = m.CPU.TotalUsage
		p.CPUMin = m.CPU.TotalUsage
		p.CPUMax = m.CPU.TotalUsage

		// Extract memory usage
		if m.Memory.Total > 0 {
			usedPct := float64(m.Memory.Used) / float64(m.Memory.Total) * 100
			p.MemoryAvg = usedPct
			p.MemoryMin = usedPct
			p.MemoryMax = usedPct
		}

		// Extract disk usage (aggregate all mounts)
		if len(m.Disk.Mounts) > 0 {
			var totalUsed, totalTotal uint64
			for _, d := range m.Disk.Mounts {
				totalUsed += d.Used
				totalTotal += d.Total
			}
			if totalTotal > 0 {
				p.DiskAvg = float64(totalUsed) / float64(totalTotal) * 100
			}
		}

		// Extract GPU usage
		if m.GPU.Available {
			p.GPUAvg = float64(m.GPU.Utilization)
		}

		points = append(points, p)
	}
	return points, rows.Err()
}

func (s *SQLiteDB) 
selectResolution(from, to time.Time) string {
	// NOTE(review): the aggregate tables only receive data once it ages past
	// the raw-retention window, so recent ranges served from metrics_1min may
	// come back empty — confirm callers query "raw" for near-real-time views.
	duration := to.Sub(from)
	switch {
	case duration <= 2*time.Hour:
		return "metrics_1min"
	case duration <= 24*time.Hour:
		return "metrics_5min"
	default:
		return "metrics_hourly"
	}
}

// GetLatestMetrics returns the most recent metrics for an agent.
// Returns (nil, nil) when the agent has no stored metrics.
func (s *SQLiteDB) GetLatestMetrics(ctx context.Context, agentID string) (*models.AllMetrics, error) {
	var data []byte
	err := s.db.QueryRowContext(ctx, `
		SELECT data FROM metrics_raw
		WHERE agent_id = ?
		ORDER BY timestamp DESC
		LIMIT 1
	`, agentID).Scan(&data)

	if errors.Is(err, sql.ErrNoRows) {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}

	var m models.AllMetrics
	if err := json.Unmarshal(data, &m); err != nil {
		return nil, err
	}
	return &m, nil
}

// ============================================================================
// Agents
// ============================================================================

// StoreAgent stores or updates an agent record (upsert keyed on id).
// registered_at is preserved on update; every other column is overwritten.
func (s *SQLiteDB) StoreAgent(ctx context.Context, agent *Agent) error {
	// Marshal of slices cannot fail; errors deliberately ignored.
	caps, _ := json.Marshal(agent.Capabilities)
	tags, _ := json.Marshal(agent.Tags)

	_, err := s.db.ExecContext(ctx, `
		INSERT INTO agents (id, name, hostname, os, architecture, version, capabilities, status, cert_serial, cert_expiry, last_seen, registered_at, tags)
		VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
		ON CONFLICT(id) DO UPDATE SET
			name = excluded.name,
			hostname = excluded.hostname,
			os = excluded.os,
			architecture = excluded.architecture,
			version = excluded.version,
			capabilities = excluded.capabilities,
			status = excluded.status,
			cert_serial = excluded.cert_serial,
			cert_expiry = excluded.cert_expiry,
			last_seen = excluded.last_seen,
			tags = excluded.tags
	`, agent.ID, agent.Name, agent.Hostname, agent.OS, agent.Architecture, agent.Version,
		string(caps), agent.Status, agent.CertSerial, agent.CertExpiry, agent.LastSeen, agent.RegisteredAt, string(tags))

	return err
}

// GetAgent retrieves an agent by ID. Returns (nil, nil) when not found.
func (s *SQLiteDB) GetAgent(ctx context.Context, id string) (*Agent, error) {
	var agent Agent
	var caps, tags string
	var certExpiry, lastSeen sql.NullTime

	err := s.db.QueryRowContext(ctx, `
		SELECT id, name, hostname, os, architecture, version, capabilities, status, cert_serial, cert_expiry, last_seen, registered_at, tags
		FROM agents WHERE id = ?
	`, id).Scan(&agent.ID, &agent.Name, &agent.Hostname, &agent.OS, &agent.Architecture, &agent.Version,
		&caps, &agent.Status, &agent.CertSerial, &certExpiry, &lastSeen, &agent.RegisteredAt, &tags)

	if errors.Is(err, sql.ErrNoRows) {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}

	if certExpiry.Valid {
		agent.CertExpiry = certExpiry.Time
	}
	if lastSeen.Valid {
		agent.LastSeen = lastSeen.Time
	}
	// Unmarshal errors ignored: malformed JSON just leaves the field empty.
	json.Unmarshal([]byte(caps), &agent.Capabilities)
	json.Unmarshal([]byte(tags), &agent.Tags)

	return &agent, nil
}

// ListAgents returns all registered agents.
func (s *SQLiteDB) ListAgents(ctx context.Context) ([]*Agent, error) {
	rows, err := s.db.QueryContext(ctx, `
		SELECT id, name, hostname, os, architecture, version, capabilities, status, cert_serial, cert_expiry, last_seen, registered_at, tags
		FROM agents ORDER BY registered_at DESC
	`)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var agents []*Agent
	for rows.Next() {
		var agent Agent
		var caps, tags string
		var certExpiry, lastSeen sql.NullTime

		err := rows.Scan(&agent.ID, &agent.Name, &agent.Hostname, &agent.OS, &agent.Architecture, &agent.Version,
			&caps, &agent.Status, &agent.CertSerial, &certExpiry, &lastSeen, &agent.RegisteredAt, &tags)
		if err != nil {
			return nil, err
		}

		if certExpiry.Valid {
			agent.CertExpiry = certExpiry.Time
		}
		if lastSeen.Valid {
			agent.LastSeen = lastSeen.Time
		}
		// Unmarshal errors ignored: malformed JSON just leaves the field empty.
		json.Unmarshal([]byte(caps), &agent.Capabilities)
		json.Unmarshal([]byte(tags), &agent.Tags)

		agents = append(agents, &agent)
	}
	return agents, rows.Err()
}

// UpdateAgentStatus updates an agent's status and last seen time.
// Returns an error when no row matched the given id.
func (s *SQLiteDB) UpdateAgentStatus(ctx context.Context, id string, status AgentStatus, lastSeen time.Time) error {
	result, err := s.db.ExecContext(ctx, `
		UPDATE agents SET status = ?, last_seen = ? WHERE id = ?
	`, status, lastSeen, id)
	if err != nil {
		return err
	}

	rows, _ := result.RowsAffected() // SQLite driver does not fail here
	if rows == 0 {
		return fmt.Errorf("agent not found: %s", id)
	}
	return nil
}

// DeleteAgent removes an agent and all its data.
// Metrics and alerts follow via ON DELETE CASCADE on agent_id.
func (s *SQLiteDB) DeleteAgent(ctx context.Context, id string) error {
	_, err := s.db.ExecContext(ctx, `DELETE FROM agents WHERE id = ?`, id)
	return err
}

// ============================================================================
// Users
// ============================================================================

// CreateUser creates a new user.
func (s *SQLiteDB) CreateUser(ctx context.Context, user *User) error {
	_, err := s.db.ExecContext(ctx, `
		INSERT INTO users (id, username, email, password_hash, auth_provider, ldap_dn, created_at, updated_at, disabled)
		VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
	`, user.ID, user.Username, user.Email, user.PasswordHash, user.AuthProvider,
		user.LDAPDN, user.CreatedAt, user.UpdatedAt, user.Disabled)
	return err
}

// GetUser retrieves a user by ID. Returns (nil, nil) when not found.
func (s *SQLiteDB) GetUser(ctx context.Context, id string) (*User, error) {
	return s.scanUser(s.db.QueryRowContext(ctx, `
		SELECT id, username, email, password_hash, auth_provider, ldap_dn, created_at, updated_at, last_login, disabled
		FROM users WHERE id = ?
	`, id))
}

// GetUserByUsername retrieves a user by username. Returns (nil, nil) when
// not found.
func (s *SQLiteDB) GetUserByUsername(ctx context.Context, username string) (*User, error) {
	return s.scanUser(s.db.QueryRowContext(ctx, `
		SELECT id, username, email, password_hash, auth_provider, ldap_dn, created_at, updated_at, last_login, disabled
		FROM users WHERE username = ?
	`, username))
}

// scanUser maps one users row (in the column order used by the queries
// above) into a *User, translating sql.ErrNoRows into (nil, nil) and
// NULLable columns (email, last_login) into zero values.
func (s *SQLiteDB) scanUser(row *sql.Row) (*User, error) {
	var user User
	var lastLogin sql.NullTime
	var email sql.NullString

	err := row.Scan(&user.ID, &user.Username, &email, &user.PasswordHash, &user.AuthProvider,
		&user.LDAPDN, &user.CreatedAt, &user.UpdatedAt, &lastLogin, &user.Disabled)

	if errors.Is(err, sql.ErrNoRows) {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}

	if email.Valid {
		user.Email = email.String
	}
	if lastLogin.Valid {
		user.LastLogin = lastLogin.Time
	}
	return &user, nil
}

// UpdateUser updates a user record.
// Note: updated_at is stamped with the current time here; user.UpdatedAt is
// ignored on write.
func (s *SQLiteDB) UpdateUser(ctx context.Context, user *User) error {
	_, err := s.db.ExecContext(ctx, `
		UPDATE users SET
			username = ?, email = ?, password_hash = ?, auth_provider = ?,
			ldap_dn = ?, updated_at = ?, last_login = ?, disabled = ?
		WHERE id = ?
	`, user.Username, user.Email, user.PasswordHash, user.AuthProvider,
		user.LDAPDN, time.Now().UTC(), user.LastLogin, user.Disabled, user.ID)
	return err
}

// DeleteUser removes a user.
// Sessions and user_roles rows follow via ON DELETE CASCADE.
func (s *SQLiteDB) DeleteUser(ctx context.Context, id string) error {
	_, err := s.db.ExecContext(ctx, `DELETE FROM users WHERE id = ?`, id)
	return err
}

// ListUsers returns all users, ordered by username.
func (s *SQLiteDB) ListUsers(ctx context.Context) ([]*User, error) {
	rows, err := s.db.QueryContext(ctx, `
		SELECT id, username, email, password_hash, auth_provider, ldap_dn, created_at, updated_at, last_login, disabled
		FROM users ORDER BY username
	`)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var users []*User
	for rows.Next() {
		var user User
		var lastLogin sql.NullTime
		var email sql.NullString

		err := rows.Scan(&user.ID, &user.Username, &email, &user.PasswordHash, &user.AuthProvider,
			&user.LDAPDN, &user.CreatedAt, &user.UpdatedAt, &lastLogin, &user.Disabled)
		if err != nil {
			return nil, err
		}

		if email.Valid {
			user.Email = email.String
		}
		if lastLogin.Valid {
			user.LastLogin = lastLogin.Time
		}
		users = append(users, &user)
	}
	return users, rows.Err()
}

// ============================================================================
// Roles
// ============================================================================

// CreateRole creates a new role. Permissions are persisted as a JSON array.
func (s *SQLiteDB) CreateRole(ctx context.Context, role *Role) error {
	perms, _ := json.Marshal(role.Permissions) // marshaling a []string cannot fail
	_, err := s.db.ExecContext(ctx, `
		INSERT INTO roles (id, name, description, permissions, is_system, created_at)
		VALUES (?, ?, ?, ?, ?, ?)
	`, role.ID, role.Name, role.Description, string(perms), role.IsSystem, role.CreatedAt)
	return err
}

// GetRole retrieves a role by ID. Returns (nil, nil) when not found.
func (s *SQLiteDB) GetRole(ctx context.Context, id string) (*Role, error) {
	var role Role
	var perms string

	err := s.db.QueryRowContext(ctx, `
		SELECT id, name, description, permissions, is_system, created_at
		FROM roles WHERE id = ?
	`, id).Scan(&role.ID, &role.Name, &role.Description, &perms, &role.IsSystem, &role.CreatedAt)

	if errors.Is(err, sql.ErrNoRows) {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}

	// Unmarshal errors ignored: malformed JSON just leaves permissions empty.
	json.Unmarshal([]byte(perms), &role.Permissions)
	return &role, nil
}

// ListRoles returns all roles, ordered by name.
func (s *SQLiteDB) ListRoles(ctx context.Context) ([]*Role, error) {
	rows, err := s.db.QueryContext(ctx, `
		SELECT id, name, description, permissions, is_system, created_at
		FROM roles ORDER BY name
	`)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var roles []*Role
	for rows.Next() {
		var role Role
		var perms string

		if err := rows.Scan(&role.ID, &role.Name, &role.Description, &perms, &role.IsSystem, &role.CreatedAt); err != nil {
			return nil, err
		}

		json.Unmarshal([]byte(perms), &role.Permissions)
		roles = append(roles, &role)
	}
	return roles, rows.Err()
}

// UpdateRole updates a role. System roles (is_system = 1) are protected by
// the WHERE clause; note that, unlike DeleteRole, a no-op update is NOT
// reported as an error — NOTE(review): consider aligning the two.
func (s *SQLiteDB) UpdateRole(ctx context.Context, role *Role) error {
	perms, _ := json.Marshal(role.Permissions) // marshaling a []string cannot fail
	_, err := s.db.ExecContext(ctx, `
		UPDATE roles SET name = ?, description = ?, permissions = ?
		WHERE id = ? AND is_system = 0
	`, role.Name, role.Description, string(perms), role.ID)
	return err
}

// DeleteRole removes a custom role; system roles cannot be deleted.
func (s *SQLiteDB) DeleteRole(ctx context.Context, id string) error {
	result, err := s.db.ExecContext(ctx, `DELETE FROM roles WHERE id = ? AND is_system = 0`, id)
	if err != nil {
		return err
	}
	rows, _ := result.RowsAffected() // SQLite driver does not fail here
	if rows == 0 {
		return errors.New("cannot delete system role or role not found")
	}
	return nil
}

// GetUserRoles returns all roles assigned to a user.
func (s *SQLiteDB) GetUserRoles(ctx context.Context, userID string) ([]*Role, error) {
	rows, err := s.db.QueryContext(ctx, `
		SELECT r.id, r.name, r.description, r.permissions, r.is_system, r.created_at
		FROM roles r
		JOIN user_roles ur ON r.id = ur.role_id
		WHERE ur.user_id = ?
	`, userID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var roles []*Role
	for rows.Next() {
		var role Role
		var perms string

		if err := rows.Scan(&role.ID, &role.Name, &role.Description, &perms, &role.IsSystem, &role.CreatedAt); err != nil {
			return nil, err
		}

		// Unmarshal errors ignored: malformed JSON leaves permissions empty.
		json.Unmarshal([]byte(perms), &role.Permissions)
		roles = append(roles, &role)
	}
	return roles, rows.Err()
}

// AssignRole assigns a role to a user; assigning an already-held role is a
// no-op (INSERT OR IGNORE).
func (s *SQLiteDB) AssignRole(ctx context.Context, userID, roleID string) error {
	_, err := s.db.ExecContext(ctx, `
		INSERT OR IGNORE INTO user_roles (user_id, role_id) VALUES (?, ?)
	`, userID, roleID)
	return err
}

// RemoveRole removes a role from a user.
func (s *SQLiteDB) RemoveRole(ctx context.Context, userID, roleID string) error {
	_, err := s.db.ExecContext(ctx, `
		DELETE FROM user_roles WHERE user_id = ? AND role_id = ?
	`, userID, roleID)
	return err
}

// ============================================================================
// Sessions
// ============================================================================

// CreateSession creates a new session.
func (s *SQLiteDB) CreateSession(ctx context.Context, session *Session) error {
	_, err := s.db.ExecContext(ctx, `
		INSERT INTO sessions (token, user_id, created_at, expires_at, ip_address, user_agent)
		VALUES (?, ?, ?, ?, ?, ?)
	`, session.Token, session.UserID, session.CreatedAt, session.ExpiresAt, session.IPAddress, session.UserAgent)
	return err
}

// GetSession retrieves a session by token. Expired sessions are filtered in
// SQL, so they come back as (nil, nil) even before cleanup runs.
// NOTE(review): ip_address/user_agent are nullable columns scanned into
// plain strings; safe only because CreateSession always writes values (never
// NULL) — confirm no other writer inserts session rows.
func (s *SQLiteDB) GetSession(ctx context.Context, token string) (*Session, error) {
	var session Session

	err := s.db.QueryRowContext(ctx, `
		SELECT token, user_id, created_at, expires_at, ip_address, user_agent
		FROM sessions WHERE token = ? AND expires_at > ?
	`, token, time.Now().UTC()).Scan(&session.Token, &session.UserID, &session.CreatedAt,
		&session.ExpiresAt, &session.IPAddress, &session.UserAgent)

	if errors.Is(err, sql.ErrNoRows) {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}
	return &session, nil
}

// DeleteSession removes a session.
func (s *SQLiteDB) DeleteSession(ctx context.Context, token string) error {
	_, err := s.db.ExecContext(ctx, `DELETE FROM sessions WHERE token = ?`, token)
	return err
}

// DeleteUserSessions removes all sessions for a user.
func (s *SQLiteDB) DeleteUserSessions(ctx context.Context, userID string) error {
	_, err := s.db.ExecContext(ctx, `DELETE FROM sessions WHERE user_id = ?`, userID)
	return err
}

// CleanupExpiredSessions removes all expired sessions.
func (s *SQLiteDB) CleanupExpiredSessions(ctx context.Context) error {
	_, err := s.db.ExecContext(ctx, `DELETE FROM sessions WHERE expires_at < ?`, time.Now().UTC())
	return err
}

// ============================================================================
// Alerts
// ============================================================================

// StoreAlert stores a new alert.
func (s *SQLiteDB) StoreAlert(ctx context.Context, alert *Alert) error {
	_, err := s.db.ExecContext(ctx, `
		INSERT INTO alerts (id, agent_id, type, severity, message, value, threshold, triggered_at, resolved_at, acknowledged)
		VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
	`, alert.ID, alert.AgentID, alert.Type, alert.Severity, alert.Message,
		alert.Value, alert.Threshold, alert.TriggeredAt, alert.ResolvedAt, alert.Acknowledged)
	return err
}

// GetAlert retrieves an alert by ID.
func (s *SQLiteDB) GetAlert(ctx context.Context, id string) (*Alert, error) {
	var alert Alert
	var resolvedAt sql.NullTime

	err := s.db.QueryRowContext(ctx, `
		SELECT id, agent_id, type, severity, message, value, threshold, triggered_at, resolved_at, acknowledged
		FROM alerts WHERE id = ?
	`, id).Scan(&alert.ID, &alert.AgentID, &alert.Type, &alert.Severity, &alert.Message,
		&alert.Value, &alert.Threshold, &alert.TriggeredAt, &resolvedAt, &alert.Acknowledged)

	if errors.Is(err, sql.ErrNoRows) {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}

	if resolvedAt.Valid {
		alert.ResolvedAt = &resolvedAt.Time
	}
	return &alert, nil
}

// QueryAlerts queries alerts with filters. Zero-valued filter fields are
// skipped; all filter values go through placeholders. LIMIT/OFFSET are
// formatted from ints, which is injection-safe.
func (s *SQLiteDB) QueryAlerts(ctx context.Context, filter AlertFilter) ([]*Alert, error) {
	var conditions []string
	var args []interface{}

	if filter.AgentID != "" {
		conditions = append(conditions, "agent_id = ?")
		args = append(args, filter.AgentID)
	}
	if filter.Type != "" {
		conditions = append(conditions, "type = ?")
		args = append(args, filter.Type)
	}
	if filter.Severity != "" {
		conditions = append(conditions, "severity = ?")
		args = append(args, filter.Severity)
	}
	if filter.Acknowledged != nil {
		conditions = append(conditions, "acknowledged = ?")
		// Column is INTEGER 0/1, so convert the *bool explicitly.
		if *filter.Acknowledged {
			args = append(args, 1)
		} else {
			args = append(args, 0)
		}
	}
	if !filter.From.IsZero() {
		conditions = append(conditions, "triggered_at >= ?")
		args = append(args, filter.From)
	}
	if !filter.To.IsZero() {
		conditions = append(conditions, "triggered_at <= ?")
		args = append(args, filter.To)
	}

	query := "SELECT id, agent_id, type, severity, message, value, threshold, triggered_at, resolved_at, acknowledged FROM alerts"
	if len(conditions) > 0 {
		query += " WHERE " + strings.Join(conditions, " AND ")
	}
	query += " ORDER BY triggered_at DESC"

	if filter.Limit > 0 {
		query += fmt.Sprintf(" LIMIT %d", filter.Limit)
	}
	if filter.Offset > 0 {
		query += fmt.Sprintf(" OFFSET %d", filter.Offset)
	}

	rows, err := s.db.QueryContext(ctx, query, args...)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var alerts []*Alert
	for rows.Next() {
		var alert Alert
		var resolvedAt sql.NullTime

		err := rows.Scan(&alert.ID, &alert.AgentID, &alert.Type, &alert.Severity, &alert.Message,
			&alert.Value, &alert.Threshold, &alert.TriggeredAt, &resolvedAt, &alert.Acknowledged)
		if err != nil {
			return nil, err
		}

		if resolvedAt.Valid {
			alert.ResolvedAt = &resolvedAt.Time
		}
		alerts = append(alerts, &alert)
	}
	return alerts, rows.Err()
}

// AcknowledgeAlert marks an alert as acknowledged.
func (s *SQLiteDB) AcknowledgeAlert(ctx context.Context, id string) error {
	_, err := s.db.ExecContext(ctx, `UPDATE alerts SET acknowledged = 1 WHERE id = ?`, id)
	return err
}

// ============================================================================
// Retention
// ============================================================================

// RunRetention runs the retention policy, aggregating and deleting old data.
+func (s *SQLiteDB) RunRetention(ctx context.Context) error { + now := time.Now().UTC() + + // Aggregate raw -> 1min for data older than raw retention + rawCutoff := now.Add(-s.retention.RawRetention) + if err := s.aggregateRawTo1Min(ctx, rawCutoff); err != nil { + return fmt.Errorf("aggregate raw->1min: %w", err) + } + + // Delete raw data older than retention + if _, err := s.db.ExecContext(ctx, `DELETE FROM metrics_raw WHERE timestamp < ?`, rawCutoff); err != nil { + return fmt.Errorf("delete old raw: %w", err) + } + + // Aggregate 1min -> 5min for data older than 1min retention + oneMinCutoff := now.Add(-s.retention.OneMinuteRetention) + if err := s.aggregate1MinTo5Min(ctx, oneMinCutoff); err != nil { + return fmt.Errorf("aggregate 1min->5min: %w", err) + } + + // Delete 1min data older than retention + if _, err := s.db.ExecContext(ctx, `DELETE FROM metrics_1min WHERE timestamp < ?`, oneMinCutoff); err != nil { + return fmt.Errorf("delete old 1min: %w", err) + } + + // Aggregate 5min -> hourly for data older than 5min retention + fiveMinCutoff := now.Add(-s.retention.FiveMinuteRetention) + if err := s.aggregate5MinToHourly(ctx, fiveMinCutoff); err != nil { + return fmt.Errorf("aggregate 5min->hourly: %w", err) + } + + // Delete 5min data older than retention + if _, err := s.db.ExecContext(ctx, `DELETE FROM metrics_5min WHERE timestamp < ?`, fiveMinCutoff); err != nil { + return fmt.Errorf("delete old 5min: %w", err) + } + + // Delete hourly data older than retention + hourlyCutoff := now.Add(-s.retention.HourlyRetention) + if _, err := s.db.ExecContext(ctx, `DELETE FROM metrics_hourly WHERE timestamp < ?`, hourlyCutoff); err != nil { + return fmt.Errorf("delete old hourly: %w", err) + } + + return nil +} + +func (s *SQLiteDB) aggregateRawTo1Min(ctx context.Context, before time.Time) error { + // Get distinct agents with raw data to aggregate + rows, err := s.db.QueryContext(ctx, ` + SELECT DISTINCT agent_id FROM metrics_raw + WHERE timestamp < ? 
AND timestamp >= ? + `, before, before.Add(-24*time.Hour)) // Only look back 24h max + if err != nil { + return err + } + + var agents []string + for rows.Next() { + var agentID string + if err := rows.Scan(&agentID); err != nil { + rows.Close() + return err + } + agents = append(agents, agentID) + } + rows.Close() + + for _, agentID := range agents { + if err := s.aggregateRawTo1MinForAgent(ctx, agentID, before); err != nil { + return err + } + } + return nil +} + +func (s *SQLiteDB) aggregateRawTo1MinForAgent(ctx context.Context, agentID string, before time.Time) error { + // Fetch raw metrics and aggregate by minute + rows, err := s.db.QueryContext(ctx, ` + SELECT timestamp, data FROM metrics_raw + WHERE agent_id = ? AND timestamp < ? + ORDER BY timestamp + `, agentID, before) + if err != nil { + return err + } + defer rows.Close() + + // Group by minute + buckets := make(map[time.Time][]MetricPoint) + + for rows.Next() { + var ts time.Time + var data []byte + if err := rows.Scan(&ts, &data); err != nil { + return err + } + + var m models.AllMetrics + if err := json.Unmarshal(data, &m); err != nil { + continue + } + + // Truncate to minute + minute := ts.Truncate(time.Minute) + + p := MetricPoint{Timestamp: ts, AgentID: agentID} + p.CPUAvg = m.CPU.TotalUsage + p.CPUMin = m.CPU.TotalUsage + p.CPUMax = m.CPU.TotalUsage + if m.Memory.Total > 0 { + pct := float64(m.Memory.Used) / float64(m.Memory.Total) * 100 + p.MemoryAvg, p.MemoryMin, p.MemoryMax = pct, pct, pct + } + + buckets[minute] = append(buckets[minute], p) + } + + // Insert aggregated data + for minute, points := range buckets { + agg := aggregatePoints(points) + _, err := s.db.ExecContext(ctx, ` + INSERT OR REPLACE INTO metrics_1min (agent_id, timestamp, cpu_avg, cpu_min, cpu_max, mem_avg, mem_min, mem_max, disk_avg, gpu_avg, sample_count) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
+ `, agentID, minute, agg.CPUAvg, agg.CPUMin, agg.CPUMax, agg.MemoryAvg, agg.MemoryMin, agg.MemoryMax, agg.DiskAvg, agg.GPUAvg, len(points)) + if err != nil { + return err + } + } + + return nil +} + +func (s *SQLiteDB) aggregate1MinTo5Min(ctx context.Context, before time.Time) error { + _, err := s.db.ExecContext(ctx, ` + INSERT OR REPLACE INTO metrics_5min (agent_id, timestamp, cpu_avg, cpu_min, cpu_max, mem_avg, mem_min, mem_max, disk_avg, gpu_avg, sample_count) + SELECT + agent_id, + datetime((strftime('%s', timestamp) / 300) * 300, 'unixepoch') as ts, + AVG(cpu_avg), MIN(cpu_min), MAX(cpu_max), + AVG(mem_avg), MIN(mem_min), MAX(mem_max), + AVG(disk_avg), AVG(gpu_avg), + SUM(sample_count) + FROM metrics_1min + WHERE timestamp < ? + GROUP BY agent_id, ts + `, before) + return err +} + +func (s *SQLiteDB) aggregate5MinToHourly(ctx context.Context, before time.Time) error { + _, err := s.db.ExecContext(ctx, ` + INSERT OR REPLACE INTO metrics_hourly (agent_id, timestamp, cpu_avg, cpu_min, cpu_max, mem_avg, mem_min, mem_max, disk_avg, gpu_avg, sample_count) + SELECT + agent_id, + datetime((strftime('%s', timestamp) / 3600) * 3600, 'unixepoch') as ts, + AVG(cpu_avg), MIN(cpu_min), MAX(cpu_max), + AVG(mem_avg), MIN(mem_min), MAX(mem_max), + AVG(disk_avg), AVG(gpu_avg), + SUM(sample_count) + FROM metrics_5min + WHERE timestamp < ? 
+ GROUP BY agent_id, ts + `, before) + return err +} + +func aggregatePoints(points []MetricPoint) MetricPoint { + if len(points) == 0 { + return MetricPoint{} + } + + var agg MetricPoint + agg.CPUMin = points[0].CPUMin + agg.MemoryMin = points[0].MemoryMin + + for _, p := range points { + agg.CPUAvg += p.CPUAvg + agg.MemoryAvg += p.MemoryAvg + agg.DiskAvg += p.DiskAvg + agg.GPUAvg += p.GPUAvg + + if p.CPUMin < agg.CPUMin { + agg.CPUMin = p.CPUMin + } + if p.CPUMax > agg.CPUMax { + agg.CPUMax = p.CPUMax + } + if p.MemoryMin < agg.MemoryMin { + agg.MemoryMin = p.MemoryMin + } + if p.MemoryMax > agg.MemoryMax { + agg.MemoryMax = p.MemoryMax + } + } + + n := float64(len(points)) + agg.CPUAvg /= n + agg.MemoryAvg /= n + agg.DiskAvg /= n + agg.GPUAvg /= n + + return agg +}