diff --git a/backend/deploy/helm/templates/ci-secrets.yaml b/backend/deploy/helm/templates/ci-secrets.yaml index ef4f5ad..ea4e1bc 100644 --- a/backend/deploy/helm/templates/ci-secrets.yaml +++ b/backend/deploy/helm/templates/ci-secrets.yaml @@ -1,4 +1,4 @@ -{{- if or .Values.ai.apiKey .Values.turnstile.secretKey }} +{{- if or .Values.ai.apiKey .Values.turnstile.secretKey .Values.discovery.token }} apiVersion: v1 kind: Secret metadata: @@ -17,4 +17,7 @@ stringData: {{- if .Values.turnstile.secretKey }} TURNSTILE_SECRET_KEY: {{ .Values.turnstile.secretKey | quote }} {{- end }} + {{- if .Values.discovery.token }} + DISCOVERY_TOKEN: {{ .Values.discovery.token | quote }} + {{- end }} {{- end }} diff --git a/backend/deploy/helm/templates/deployment.yaml b/backend/deploy/helm/templates/deployment.yaml index 7ef22b9..6610ab1 100644 --- a/backend/deploy/helm/templates/deployment.yaml +++ b/backend/deploy/helm/templates/deployment.yaml @@ -51,8 +51,8 @@ spec: - secretRef: name: {{ include "marktvogt-backend.fullname" . }}-smtp {{- end }} - {{- if or .Values.ai.apiKey .Values.turnstile.secretKey }} - # AI + Turnstile credentials (Helm-managed, passed via CI) + {{- if or .Values.ai.apiKey .Values.turnstile.secretKey .Values.discovery.token }} + # AI, Turnstile + Discovery credentials (Helm-managed, passed via CI) - secretRef: name: {{ include "marktvogt-backend.fullname" . }}-ci-secrets {{- end }} @@ -92,6 +92,19 @@ spec: secretKeyRef: name: {{ include "marktvogt-backend.pgAppSecret" . }} key: dbname + - name: DISCOVERY_TOKEN + valueFrom: + secretKeyRef: + name: {{ include "marktvogt-backend.fullname" . 
}}-ci-secrets + key: DISCOVERY_TOKEN + - name: AI_AGENT_DISCOVERY + value: {{ .Values.ai.agentDiscovery | quote }} + - name: AI_RATE_LIMIT_RPS + value: {{ .Values.ai.rateLimitRps | default 1 | quote }} + - name: DISCOVERY_BATCH_SIZE + value: {{ .Values.discovery.batchSize | default 4 | quote }} + - name: DISCOVERY_FORWARD_MONTHS + value: {{ .Values.discovery.forwardMonths | default 12 | quote }} startupProbe: httpGet: path: /healthz diff --git a/backend/deploy/helm/templates/discovery-cron.yaml b/backend/deploy/helm/templates/discovery-cron.yaml new file mode 100644 index 0000000..531a978 --- /dev/null +++ b/backend/deploy/helm/templates/discovery-cron.yaml @@ -0,0 +1,37 @@ +{{- if .Values.discovery.enabled }} +apiVersion: batch/v1 +kind: CronJob +metadata: + name: {{ include "marktvogt-backend.fullname" . }}-discovery-tick + namespace: {{ .Release.Namespace }} + labels: + {{- include "marktvogt-backend.labels" . | nindent 4 }} +spec: + schedule: {{ .Values.discovery.schedule | quote }} + concurrencyPolicy: Forbid + successfulJobsHistoryLimit: 3 + failedJobsHistoryLimit: 3 + jobTemplate: + spec: + backoffLimit: 2 + template: + spec: + restartPolicy: OnFailure + containers: + - name: tick + image: curlimages/curl:8.9.1 + command: + - sh + - -c + - | + curl -fsS --retry 2 --retry-delay 10 \ + -X POST \ + -H "Authorization: Bearer $DISCOVERY_TOKEN" \ + "http://{{ include "marktvogt-backend.fullname" . }}:8080/api/v1/admin/discovery/tick" + env: + - name: DISCOVERY_TOKEN + valueFrom: + secretKeyRef: + name: {{ include "marktvogt-backend.fullname" . 
}}-ci-secrets + key: DISCOVERY_TOKEN +{{- end }} diff --git a/backend/deploy/helm/values.yaml b/backend/deploy/helm/values.yaml index 712184d..09fed88 100644 --- a/backend/deploy/helm/values.yaml +++ b/backend/deploy/helm/values.yaml @@ -96,6 +96,16 @@ smtp: ai: apiKey: "" agentSimple: "" + agentDiscovery: "" # ag_019d9ec1702675dbbd80e526c8957ce2 in production + rateLimitRps: 1 + +# Discovery cron — token passed via CI secrets during deploy. +discovery: + enabled: true + schedule: "0 */4 * * *" + token: "" + batchSize: 4 + forwardMonths: 12 # Cloudflare Turnstile — passed via Woodpecker secrets during deploy. turnstile: diff --git a/backend/internal/config/config.go b/backend/internal/config/config.go index 7f4c362..a5e4a7b 100644 --- a/backend/internal/config/config.go +++ b/backend/internal/config/config.go @@ -2,6 +2,7 @@ package config import ( "fmt" + "log/slog" "os" "strconv" "time" @@ -21,12 +22,21 @@ type Config struct { Turnstile TurnstileConfig Notification NotificationConfig AI AIConfig + Discovery DiscoveryConfig +} + +type DiscoveryConfig struct { + Token string // bearer token for /tick endpoint + BatchSize int // buckets per tick (default 4) + ForwardMonths int // forward window in months (default 12) } type AIConfig struct { - APIKey string - AgentSimple string // Pre-created Mistral agent ID for Pass 1 (extraction + web search) - ModelComplex string // Model for Pass 2 (description + retry, e.g. mistral-large-latest) + APIKey string + AgentSimple string // Pre-created Mistral agent ID for Pass 1 (extraction + web search) + AgentDiscovery string // Agent ID for discovery pipeline (Task 7) + ModelComplex string // Model for Pass 2 (description + retry, e.g. 
mistral-large-latest) + RateLimitRPS float64 // Max requests per second to Mistral (0 = disabled) } type AppConfig struct { @@ -169,6 +179,26 @@ func Load() (*Config, error) { return nil, fmt.Errorf("SMTP_PORT: %w", err) } + rpsAI, err := envFloat("AI_RATE_LIMIT_RPS", 1.0) + if err != nil { + return nil, fmt.Errorf("AI_RATE_LIMIT_RPS: %w", err) + } + + batchSize, err := envInt("DISCOVERY_BATCH_SIZE", 4) + if err != nil { + return nil, fmt.Errorf("DISCOVERY_BATCH_SIZE: %w", err) + } + + forwardMonths, err := envInt("DISCOVERY_FORWARD_MONTHS", 12) + if err != nil { + return nil, fmt.Errorf("DISCOVERY_FORWARD_MONTHS: %w", err) + } + + discoveryToken := envStr("DISCOVERY_TOKEN", "") + if discoveryToken == "" { + slog.Warn("DISCOVERY_TOKEN is empty; /api/v1/admin/discovery/tick is disabled") + } + jwtSecret := envStr("JWT_SECRET", "") if jwtSecret == "" { return nil, fmt.Errorf("JWT_SECRET is required") @@ -248,9 +278,16 @@ func Load() (*Config, error) { FrontendURL: envStr("FRONTEND_URL", "http://localhost:5173"), }, AI: AIConfig{ - APIKey: envStr("AI_API_KEY", ""), - AgentSimple: envStr("AI_AGENT_SIMPLE", ""), - ModelComplex: envStr("AI_MODEL_COMPLEX", "mistral-large-latest"), + APIKey: envStr("AI_API_KEY", ""), + AgentSimple: envStr("AI_AGENT_SIMPLE", ""), + AgentDiscovery: envStr("AI_AGENT_DISCOVERY", ""), + ModelComplex: envStr("AI_MODEL_COMPLEX", "mistral-large-latest"), + RateLimitRPS: rpsAI, + }, + Discovery: DiscoveryConfig{ + Token: discoveryToken, + BatchSize: batchSize, + ForwardMonths: forwardMonths, }, }, nil } diff --git a/backend/internal/domain/discovery/agent_client.go b/backend/internal/domain/discovery/agent_client.go new file mode 100644 index 0000000..fac0f31 --- /dev/null +++ b/backend/internal/domain/discovery/agent_client.go @@ -0,0 +1,115 @@ +package discovery + +import ( + "context" + "encoding/json" + "fmt" + "strings" + + "marktvogt.de/backend/internal/pkg/ai" +) + +// AgentClient wraps the Mistral Pass 0 agent for discovery. 
+type AgentClient struct { + ai *ai.Client + agentID string +} + +func NewAgentClient(aiClient *ai.Client, agentID string) *AgentClient { + return &AgentClient{ai: aiClient, agentID: agentID} +} + +func (c *AgentClient) Enabled() bool { + return c.ai != nil && c.agentID != "" +} + +// Discover runs Pass 0 for the given bucket. The agent's full instructions +// are set in the Mistral console (see spec §6.2). We only inject the bucket +// parameters here. +func (c *AgentClient) Discover(ctx context.Context, b Bucket) (Pass0Response, error) { + if !c.Enabled() { + return Pass0Response{}, fmt.Errorf("discovery agent not configured") + } + prompt := fmt.Sprintf( + "Bucket:\nland: %s\nregion: %s\njahr_monat: %s\n\nFinde alle Maerkte in diesem Bucket und antworte im vorgegebenen JSON-Format.", + b.Land, b.Region, b.YearMonth, + ) + result, err := c.ai.Pass0(ctx, c.agentID, prompt) + if err != nil { + return Pass0Response{}, fmt.Errorf("mistral pass0: %w", err) + } + return parsePass0Response(result.Content) +} + +func parsePass0Response(raw string) (Pass0Response, error) { + cleaned := extractJSON(raw) + cleaned = stripJSONComments(cleaned) + var out Pass0Response + if err := json.Unmarshal([]byte(cleaned), &out); err != nil { + return Pass0Response{}, fmt.Errorf("unmarshal pass0: %w (raw first 500: %q)", err, truncate(raw, 500)) + } + return out, nil +} + +// --- JSON helpers (independent copy; logic mirrors domain/market/research.go). +// Do not import from the market package — keeping packages decoupled. 
+ +func extractJSON(s string) string { + start := strings.IndexByte(s, '{') + if start < 0 { + return s + } + s = s[start:] + depth := 0 + for i := 0; i < len(s); i++ { + switch s[i] { + case '{': + depth++ + case '}': + depth-- + if depth == 0 { + return s[:i+1] + } + } + } + return s +} + +func stripJSONComments(s string) string { + var result []byte + inString := false + escaped := false + for i := 0; i < len(s); i++ { + c := s[i] + if escaped { + result = append(result, c) + escaped = false + continue + } + if c == '\\' && inString { + result = append(result, c) + escaped = true + continue + } + if c == '"' { + inString = !inString + result = append(result, c) + continue + } + if !inString && c == '/' && i+1 < len(s) && s[i+1] == '/' { + for i < len(s) && s[i] != '\n' { + i++ + } + continue + } + result = append(result, c) + } + return string(result) +} + +func truncate(s string, n int) string { + if len(s) <= n { + return s + } + return s[:n] +} diff --git a/backend/internal/domain/discovery/agent_client_test.go b/backend/internal/domain/discovery/agent_client_test.go new file mode 100644 index 0000000..7932881 --- /dev/null +++ b/backend/internal/domain/discovery/agent_client_test.go @@ -0,0 +1,74 @@ +package discovery + +import "testing" + +func TestParsePass0_Valid(t *testing.T) { + raw := `{ + "bucket": {"land": "Deutschland", "region": "Bayern", "jahr_monat": "2026-09"}, + "recherche_datum": "2026-04-18", + "quellen_gesamt": ["https://a.example", "https://b.example"], + "maerkte": [ + { + "markt_name": "Mittelaltermarkt Trostberg", + "stadt": "Trostberg", + "bundesland": "Bayern", + "start_datum": "2026-09-12", + "end_datum": "2026-09-14", + "website": "https://trostberg.example", + "quellen": ["https://a.example"], + "extraktion": "verbatim", + "hinweis": null + } + ] +}` + got, err := parsePass0Response(raw) + if err != nil { + t.Fatalf("parse err: %v", err) + } + if got.Bucket.Region != "Bayern" { + t.Errorf("region = %q, want Bayern", 
got.Bucket.Region) + } + if len(got.Maerkte) != 1 || got.Maerkte[0].MarktName != "Mittelaltermarkt Trostberg" { + t.Errorf("unexpected markets: %+v", got.Maerkte) + } +} + +func TestParsePass0_WithCommentsAndTrailingText(t *testing.T) { + raw := `Here is the JSON: +{ + "bucket": {"land": "Deutschland", "region": "Bayern", "jahr_monat": "2026-09"}, + // a comment the agent added + "recherche_datum": "2026-04-18", + "quellen_gesamt": [], + "maerkte": [] +} +end.` + got, err := parsePass0Response(raw) + if err != nil { + t.Fatalf("parse err: %v", err) + } + if got.Bucket.Region != "Bayern" || len(got.Maerkte) != 0 { + t.Errorf("unexpected: %+v", got) + } +} + +func TestParsePass0_Malformed(t *testing.T) { + raw := `not JSON at all` + if _, err := parsePass0Response(raw); err == nil { + t.Error("expected error on non-JSON input") + } +} + +func TestParsePass0_EmptyMaerkte(t *testing.T) { + raw := `{"bucket":{"land":"Deutschland","region":"Bayern","jahr_monat":"2026-09"},"recherche_datum":"","quellen_gesamt":[],"maerkte":[]}` + got, err := parsePass0Response(raw) + if err != nil { + t.Fatalf("parse err: %v", err) + } + if got.Maerkte == nil { + got.Maerkte = []Pass0Market{} // nil vs empty is fine + } + if len(got.Maerkte) != 0 { + t.Errorf("expected empty, got %+v", got.Maerkte) + } +} diff --git a/backend/internal/domain/discovery/handler.go b/backend/internal/domain/discovery/handler.go new file mode 100644 index 0000000..90e978c --- /dev/null +++ b/backend/internal/domain/discovery/handler.go @@ -0,0 +1,118 @@ +package discovery + +import ( + "log/slog" + "net/http" + "strconv" + + "github.com/gin-gonic/gin" + "github.com/google/uuid" + + "marktvogt.de/backend/internal/pkg/apierror" +) + +type Handler struct { + service *Service +} + +func NewHandler(s *Service) *Handler { + return &Handler{service: s} +} + +func (h *Handler) Tick(c *gin.Context) { + summary, err := h.service.Tick(c.Request.Context()) + if err != nil { + slog.ErrorContext(c.Request.Context(), 
"discovery tick failed", "error", err) + apiErr := apierror.Internal("tick failed") + c.JSON(apiErr.Status, apierror.NewResponse(apiErr)) + return + } + c.JSON(http.StatusOK, gin.H{"data": summary}) +} + +func (h *Handler) ListQueue(c *gin.Context) { + limit, _ := strconv.Atoi(c.DefaultQuery("limit", "50")) + offset, _ := strconv.Atoi(c.DefaultQuery("offset", "0")) + if limit < 1 || limit > 200 { + limit = 50 + } + if offset < 0 { + offset = 0 + } + rows, err := h.service.ListPendingQueue(c.Request.Context(), limit, offset) + if err != nil { + apiErr := apierror.Internal("list queue failed") + c.JSON(apiErr.Status, apierror.NewResponse(apiErr)) + return + } + c.JSON(http.StatusOK, gin.H{"data": rows}) +} + +func (h *Handler) Accept(c *gin.Context) { + id, err := uuid.Parse(c.Param("id")) + if err != nil { + apiErr := apierror.BadRequest("invalid_id", "invalid queue id") + c.JSON(apiErr.Status, apierror.NewResponse(apiErr)) + return + } + reviewer, ok := currentUserID(c) + if !ok { + apiErr := apierror.Unauthorized("no user in context") + c.JSON(apiErr.Status, apierror.NewResponse(apiErr)) + return + } + seriesID, editionID, err := h.service.Accept(c.Request.Context(), id, reviewer) + if err != nil { + slog.WarnContext(c.Request.Context(), "accept failed", "queue_id", id, "error", err) + apiErr := apierror.Internal("accept failed") + c.JSON(apiErr.Status, apierror.NewResponse(apiErr)) + return + } + c.JSON(http.StatusOK, gin.H{"data": gin.H{"series_id": seriesID, "edition_id": editionID}}) +} + +type rejectRequest struct { + Reason string `json:"reason" validate:"max=2000"` +} + +func (h *Handler) Reject(c *gin.Context) { + id, err := uuid.Parse(c.Param("id")) + if err != nil { + apiErr := apierror.BadRequest("invalid_id", "invalid queue id") + c.JSON(apiErr.Status, apierror.NewResponse(apiErr)) + return + } + var req rejectRequest + // Empty body is OK (reason is optional). 
Non-EOF parse errors are tolerated; + // we fall through with zero-value req in all error cases. + _ = c.ShouldBindJSON(&req) + reviewer, ok := currentUserID(c) + if !ok { + apiErr := apierror.Unauthorized("no user in context") + c.JSON(apiErr.Status, apierror.NewResponse(apiErr)) + return + } + if err := h.service.Reject(c.Request.Context(), id, reviewer, req.Reason); err != nil { + slog.WarnContext(c.Request.Context(), "reject failed", "queue_id", id, "error", err) + apiErr := apierror.Internal("reject failed") + c.JSON(apiErr.Status, apierror.NewResponse(apiErr)) + return + } + c.Status(http.StatusNoContent) +} + +func currentUserID(c *gin.Context) (uuid.UUID, bool) { + raw, exists := c.Get("user_id") + if !exists { + return uuid.Nil, false + } + switch v := raw.(type) { + case uuid.UUID: + return v, true + case string: + id, err := uuid.Parse(v) + return id, err == nil + default: + return uuid.Nil, false + } +} diff --git a/backend/internal/domain/discovery/mock_repo_test.go b/backend/internal/domain/discovery/mock_repo_test.go new file mode 100644 index 0000000..e90a7e2 --- /dev/null +++ b/backend/internal/domain/discovery/mock_repo_test.go @@ -0,0 +1,82 @@ +package discovery + +import ( + "context" + "time" + + "github.com/google/uuid" + "github.com/jackc/pgx/v5" +) + +// Compile-time guard: mockRepo must fully satisfy Repository. 
+var _ Repository = (*mockRepo)(nil) + +type mockRepo struct { + pickStaleFn func(ctx context.Context, forwardMonths, limit int) ([]Bucket, error) + updateBucketFn func(ctx context.Context, id uuid.UUID, errMsg string) error + listSeriesFn func(ctx context.Context, cityNormalized string) ([]SeriesCandidate, error) + editionExistsFn func(ctx context.Context, seriesID uuid.UUID, year int) (bool, error) + insertDiscFn func(ctx context.Context, d DiscoveredMarket) (uuid.UUID, error) + isRejectedFn func(ctx context.Context, nameNormalized, stadt string, year int) (bool, error) + queuePendingFn func(ctx context.Context, nameNormalized, stadt string, startDatum *time.Time) (bool, error) + getDiscoveredFn func(ctx context.Context, id uuid.UUID) (DiscoveredMarket, error) + beginTxFn func(ctx context.Context) (pgx.Tx, error) + markAcceptedFn func(ctx context.Context, tx pgx.Tx, id, eid, r uuid.UUID) error + markRejectedFn func(ctx context.Context, tx pgx.Tx, id, r uuid.UUID) error + insertRejFn func(ctx context.Context, tx pgx.Tx, rej RejectedDiscovery) error +} + +func (m *mockRepo) PickStaleBuckets(ctx context.Context, fm, lim int) ([]Bucket, error) { + return m.pickStaleFn(ctx, fm, lim) +} +func (m *mockRepo) UpdateBucketQueried(ctx context.Context, id uuid.UUID, errMsg string) error { + return m.updateBucketFn(ctx, id, errMsg) +} +func (m *mockRepo) ListSeriesByCity(ctx context.Context, c string) ([]SeriesCandidate, error) { + return m.listSeriesFn(ctx, c) +} +func (m *mockRepo) EditionExists(ctx context.Context, sid uuid.UUID, y int) (bool, error) { + return m.editionExistsFn(ctx, sid, y) +} +func (m *mockRepo) InsertDiscovered(ctx context.Context, d DiscoveredMarket) (uuid.UUID, error) { + return m.insertDiscFn(ctx, d) +} +func (m *mockRepo) IsRejected(ctx context.Context, n, s string, y int) (bool, error) { + return m.isRejectedFn(ctx, n, s, y) +} +func (m *mockRepo) QueueHasPending(ctx context.Context, n, s string, sd *time.Time) (bool, error) { + return 
m.queuePendingFn(ctx, n, s, sd) +} +func (m *mockRepo) ListQueue(ctx context.Context, status string, l, o int) ([]DiscoveredMarket, error) { + return nil, nil +} +func (m *mockRepo) GetDiscovered(ctx context.Context, id uuid.UUID) (DiscoveredMarket, error) { + if m.getDiscoveredFn != nil { + return m.getDiscoveredFn(ctx, id) + } + return DiscoveredMarket{}, nil +} +func (m *mockRepo) BeginTx(ctx context.Context) (pgx.Tx, error) { + if m.beginTxFn != nil { + return m.beginTxFn(ctx) + } + return nil, nil +} +func (m *mockRepo) MarkAccepted(ctx context.Context, tx pgx.Tx, id, eid, r uuid.UUID) error { + if m.markAcceptedFn != nil { + return m.markAcceptedFn(ctx, tx, id, eid, r) + } + return nil +} +func (m *mockRepo) MarkRejected(ctx context.Context, tx pgx.Tx, id, r uuid.UUID) error { + if m.markRejectedFn != nil { + return m.markRejectedFn(ctx, tx, id, r) + } + return nil +} +func (m *mockRepo) InsertRejection(ctx context.Context, tx pgx.Tx, r RejectedDiscovery) error { + if m.insertRejFn != nil { + return m.insertRejFn(ctx, tx, r) + } + return nil +} diff --git a/backend/internal/domain/discovery/model.go b/backend/internal/domain/discovery/model.go new file mode 100644 index 0000000..df4809a --- /dev/null +++ b/backend/internal/domain/discovery/model.go @@ -0,0 +1,86 @@ +// backend/internal/domain/discovery/model.go +package discovery + +import ( + "time" + + "github.com/google/uuid" +) + +// Bucket is a scheduler row: one (land, region, year_month) tuple. +type Bucket struct { + ID uuid.UUID + Land string + Region string + YearMonth string // 'YYYY-MM' + LastQueriedAt *time.Time + LastError string + CreatedAt time.Time +} + +// DiscoveredMarket is a queue entry awaiting admin review. 
+type DiscoveredMarket struct { + ID uuid.UUID + BucketID uuid.UUID + MarktName string + Stadt string + Bundesland string + Land string + StartDatum *time.Time + EndDatum *time.Time + Website string + Quellen []string + Extraktion string + Hinweis string + NameNormalized string + MatchedSeriesID *uuid.UUID + Status string // 'pending' | 'accepted' | 'rejected' + DiscoveredAt time.Time + ReviewedAt *time.Time + ReviewedBy *uuid.UUID + CreatedEditionID *uuid.UUID +} + +// RejectedDiscovery stores a sticky rejection scoped to (normalized_name, city, year). +type RejectedDiscovery struct { + ID uuid.UUID + NameNormalized string + Stadt string + Year int + RejectedAt time.Time + RejectedBy *uuid.UUID + Reason string +} + +// Pass0Response is the parsed shape of the Mistral Pass 0 agent reply. +type Pass0Response struct { + Bucket Pass0Bucket `json:"bucket"` + RechercheDatum string `json:"recherche_datum"` + QuellenGesamt []string `json:"quellen_gesamt"` + Maerkte []Pass0Market `json:"maerkte"` +} + +type Pass0Bucket struct { + Land string `json:"land"` + Region string `json:"region"` + JahrMonat string `json:"jahr_monat"` +} + +type Pass0Market struct { + MarktName string `json:"markt_name"` + Stadt string `json:"stadt"` + Bundesland string `json:"bundesland"` + StartDatum string `json:"start_datum"` // 'YYYY-MM-DD' or "" + EndDatum string `json:"end_datum"` + Website string `json:"website"` + Quellen []string `json:"quellen"` + Extraktion string `json:"extraktion"` + Hinweis string `json:"hinweis"` +} + +// Status constants for discovered_markets. 
+const ( + StatusPending = "pending" + StatusAccepted = "accepted" + StatusRejected = "rejected" +) diff --git a/backend/internal/domain/discovery/normalize.go b/backend/internal/domain/discovery/normalize.go new file mode 100644 index 0000000..1496678 --- /dev/null +++ b/backend/internal/domain/discovery/normalize.go @@ -0,0 +1,136 @@ +package discovery + +import ( + "strings" + "unicode" +) + +// umlautMap expands German diacritics. +var umlautMap = map[rune]string{ + 'ä': "ae", 'ö': "oe", 'ü': "ue", 'ß': "ss", + 'Ä': "ae", 'Ö': "oe", 'Ü': "ue", +} + +// stripWords are removed from the start and end of normalized market names. +// Kept short intentionally — over-aggressive stripping collides unrelated markets. +var stripWords = map[string]struct{}{ + "mittelaltermarkt": {}, + "mittelalterlicher": {}, + "markt": {}, + "zu": {}, + "am": {}, + "der": {}, + "die": {}, + "das": {}, + "auf": {}, + "dem": {}, + "den": {}, + "in": {}, + "im": {}, +} + +// NormalizeName returns a stable, dedup-safe form of a market name: +// lowercase, umlauts expanded, punctuation stripped, whitespace collapsed, +// leading/trailing filler words removed. +// +// Used exclusively for matching. NOT suitable for slugs — see slugify() in service.go +// for the URL-safe form that preserves identifying words. +func NormalizeName(s string) string { + s = expandUmlauts(s) + s = toLowerAlnumSpace(s) + return trimStripWords(s) +} + +// NormalizeCity lowercases, expands umlauts, and collapses internal whitespace. +// Keeps hyphens (Baden-Baden) and punctuation (St. Wendel). Used for +// pre-filtering series candidates by city. 
+func NormalizeCity(s string) string { + s = expandUmlauts(s) + s = strings.ToLower(s) + return strings.Join(strings.Fields(s), " ") +} + +func expandUmlauts(s string) string { + var b strings.Builder + b.Grow(len(s)) + for _, r := range s { + if rep, ok := umlautMap[r]; ok { + b.WriteString(rep) + } else { + b.WriteRune(r) + } + } + return b.String() +} + +func toLowerAlnumSpace(s string) string { + var b strings.Builder + b.Grow(len(s)) + for _, r := range s { + switch { + case unicode.IsLetter(r) || unicode.IsDigit(r): + b.WriteRune(unicode.ToLower(r)) + default: + b.WriteRune(' ') + } + } + return b.String() +} + +func trimStripWords(s string) string { + tokens := strings.Fields(s) + // trim from left + for len(tokens) > 0 { + if _, ok := stripWords[tokens[0]]; !ok { + break + } + if shouldStopStripping(tokens[1:]) { + break + } + tokens = tokens[1:] + } + // trim from right + for len(tokens) > 0 { + if _, ok := stripWords[tokens[len(tokens)-1]]; !ok { + break + } + if shouldStopStripping(tokens[:len(tokens)-1]) { + break + } + tokens = tokens[:len(tokens)-1] + } + return strings.Join(tokens, " ") +} + +// shouldStopStripping reports whether the tokens remaining after a hypothetical +// edge-strip would be "meaningless" — i.e., contain only stripwords and purely +// numeric tokens. When true, the caller should preserve the edge stripword to +// avoid destroying identifying content (e.g. "Markt 2026" → "2026"). When the +// remaining set is entirely stripwords (no numerics), stripping continues; +// that's what lets "der die das" reduce to "". 
+func shouldStopStripping(remaining []string) bool { + nonStripwordCount := 0 + allNumeric := true + for _, t := range remaining { + if _, ok := stripWords[t]; ok { + continue + } + nonStripwordCount++ + if !isNumericOnly(t) { + allNumeric = false + } + } + return nonStripwordCount > 0 && allNumeric +} + +func isNumericOnly(s string) bool { + if s == "" { + return false + } + for _, r := range s { + if !unicode.IsDigit(r) { + return false + } + } + return true +} diff --git a/backend/internal/domain/discovery/normalize_test.go b/backend/internal/domain/discovery/normalize_test.go new file mode 100644 index 0000000..ab94ca8 --- /dev/null +++ b/backend/internal/domain/discovery/normalize_test.go @@ -0,0 +1,51 @@ +package discovery + +import "testing" + +func TestNormalizeName(t *testing.T) { + tests := []struct { + name, in, want string + }{ + {"lowercase", "Trostberg", "trostberg"}, + {"umlauts", "Rüdesheim", "ruedesheim"}, + {"eszett", "Straßburg", "strassburg"}, + {"punctuation", "St. Goar!", "st goar"}, + {"collapse_whitespace", "Bad Muskau", "bad muskau"}, + {"strip_market_prefix", "Mittelaltermarkt zu Trostberg", "trostberg"}, + {"strip_market_suffix", "Trostberg Mittelaltermarkt", "trostberg"}, + {"strip_der", "Markt der Ritter", "ritter"}, + {"preserve_numbers", "Markt 2026", "markt 2026"}, + {"empty", "", ""}, + {"only_stripwords", "der die das", ""}, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + got := NormalizeName(tc.in) + if got != tc.want { + t.Errorf("NormalizeName(%q) = %q, want %q", tc.in, got, tc.want) + } + }) + } +} + +func TestNormalizeCity(t *testing.T) { + tests := []struct { + name, in, want string + }{ + {"umlaut", "München", "muenchen"}, + {"hyphen_preserved", "Baden-Baden", "baden-baden"}, + {"whitespace_trim", " Trostberg ", "trostberg"}, + {"empty", "", ""}, + {"eszett", "Großenhain", "grossenhain"}, + {"internal_double_whitespace", "Bad Muskau", "bad muskau"}, + {"punctuation_preserved", "St. Wendel", "st. 
wendel"}, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + got := NormalizeCity(tc.in) + if got != tc.want { + t.Errorf("NormalizeCity(%q) = %q, want %q", tc.in, got, tc.want) + } + }) + } +} diff --git a/backend/internal/domain/discovery/repository.go b/backend/internal/domain/discovery/repository.go new file mode 100644 index 0000000..4425ab9 --- /dev/null +++ b/backend/internal/domain/discovery/repository.go @@ -0,0 +1,216 @@ +// backend/internal/domain/discovery/repository.go +package discovery + +import ( + "context" + "fmt" + "time" + + "github.com/google/uuid" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" +) + +type Repository interface { + PickStaleBuckets(ctx context.Context, forwardMonths, limit int) ([]Bucket, error) + UpdateBucketQueried(ctx context.Context, id uuid.UUID, errMsg string) error + ListSeriesByCity(ctx context.Context, cityNormalized string) ([]SeriesCandidate, error) + EditionExists(ctx context.Context, seriesID uuid.UUID, year int) (bool, error) + InsertDiscovered(ctx context.Context, d DiscoveredMarket) (uuid.UUID, error) + IsRejected(ctx context.Context, nameNormalized, stadt string, year int) (bool, error) + QueueHasPending(ctx context.Context, nameNormalized, stadt string, startDatum *time.Time) (bool, error) + ListQueue(ctx context.Context, status string, limit, offset int) ([]DiscoveredMarket, error) + GetDiscovered(ctx context.Context, id uuid.UUID) (DiscoveredMarket, error) + MarkAccepted(ctx context.Context, tx pgx.Tx, id, editionID, reviewer uuid.UUID) error + MarkRejected(ctx context.Context, tx pgx.Tx, id uuid.UUID, reviewer uuid.UUID) error + InsertRejection(ctx context.Context, tx pgx.Tx, r RejectedDiscovery) error + BeginTx(ctx context.Context) (pgx.Tx, error) +} + +// SeriesCandidate is a minimal projection used for name-normalization comparison in Go. 
+type SeriesCandidate struct { + ID uuid.UUID + Name string + City string +} + +type pgRepository struct { + pool *pgxpool.Pool +} + +func NewRepository(pool *pgxpool.Pool) Repository { + return &pgRepository{pool: pool} +} + +func (r *pgRepository) PickStaleBuckets(ctx context.Context, forwardMonths, limit int) ([]Bucket, error) { + q := ` +SELECT id, land, region, year_month, last_queried_at, coalesce(last_error, ''), created_at +FROM discovery_buckets +WHERE year_month >= to_char(date_trunc('month', now()), 'YYYY-MM') + AND year_month <= to_char(date_trunc('month', now()) + ($1 * interval '1 month'), 'YYYY-MM') + AND (last_queried_at IS NULL OR last_queried_at < now() - interval '7 days') +ORDER BY last_queried_at NULLS FIRST, year_month +LIMIT $2` + rows, err := r.pool.Query(ctx, q, forwardMonths, limit) + if err != nil { + return nil, fmt.Errorf("pick buckets: %w", err) + } + defer rows.Close() + var out []Bucket + for rows.Next() { + var b Bucket + if err := rows.Scan(&b.ID, &b.Land, &b.Region, &b.YearMonth, &b.LastQueriedAt, &b.LastError, &b.CreatedAt); err != nil { + return nil, err + } + out = append(out, b) + } + return out, rows.Err() +} + +func (r *pgRepository) UpdateBucketQueried(ctx context.Context, id uuid.UUID, errMsg string) error { + var errValue any + if errMsg == "" { + errValue = nil + } else { + errValue = errMsg + } + _, err := r.pool.Exec(ctx, + `UPDATE discovery_buckets SET last_queried_at = now(), last_error = $2 WHERE id = $1`, + id, errValue) + return err +} + +func (r *pgRepository) ListSeriesByCity(ctx context.Context, cityNormalized string) ([]SeriesCandidate, error) { + rows, err := r.pool.Query(ctx, + `SELECT id, name, city FROM market_series WHERE LOWER(city) = $1`, + cityNormalized) + if err != nil { + return nil, err + } + defer rows.Close() + var out []SeriesCandidate + for rows.Next() { + var c SeriesCandidate + if err := rows.Scan(&c.ID, &c.Name, &c.City); err != nil { + return nil, err + } + out = append(out, c) + } + return 
out, rows.Err() +} + +func (r *pgRepository) EditionExists(ctx context.Context, seriesID uuid.UUID, year int) (bool, error) { + var exists bool + err := r.pool.QueryRow(ctx, + `SELECT EXISTS(SELECT 1 FROM market_editions WHERE series_id = $1 AND year = $2)`, + seriesID, year).Scan(&exists) + return exists, err +} + +func (r *pgRepository) InsertDiscovered(ctx context.Context, d DiscoveredMarket) (uuid.UUID, error) { + var id uuid.UUID + err := r.pool.QueryRow(ctx, ` +INSERT INTO discovered_markets + (bucket_id, markt_name, stadt, bundesland, land, start_datum, end_datum, website, + quellen, extraktion, hinweis, name_normalized, matched_series_id) +VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13) +RETURNING id`, + d.BucketID, d.MarktName, d.Stadt, d.Bundesland, d.Land, d.StartDatum, d.EndDatum, d.Website, + d.Quellen, d.Extraktion, d.Hinweis, d.NameNormalized, d.MatchedSeriesID).Scan(&id) + return id, err +} + +func (r *pgRepository) IsRejected(ctx context.Context, nameNormalized, stadt string, year int) (bool, error) { + var exists bool + err := r.pool.QueryRow(ctx, + `SELECT EXISTS(SELECT 1 FROM rejected_discoveries WHERE name_normalized=$1 AND stadt=$2 AND year=$3)`, + nameNormalized, stadt, year).Scan(&exists) + return exists, err +} + +func (r *pgRepository) QueueHasPending(ctx context.Context, nameNormalized, stadt string, startDatum *time.Time) (bool, error) { + var exists bool + err := r.pool.QueryRow(ctx, ` +SELECT EXISTS( + SELECT 1 FROM discovered_markets + WHERE status='pending' + AND name_normalized=$1 AND stadt=$2 + AND start_datum IS NOT DISTINCT FROM $3 +)`, nameNormalized, stadt, startDatum).Scan(&exists) + return exists, err +} + +func (r *pgRepository) ListQueue(ctx context.Context, status string, limit, offset int) ([]DiscoveredMarket, error) { + rows, err := r.pool.Query(ctx, ` +SELECT id, bucket_id, markt_name, stadt, coalesce(bundesland,''), land, + start_datum, end_datum, coalesce(website,''), quellen, coalesce(extraktion,''), + 
coalesce(hinweis,''), name_normalized, matched_series_id, status, + discovered_at, reviewed_at, reviewed_by, created_edition_id +FROM discovered_markets +WHERE status = $1 +ORDER BY discovered_at DESC +LIMIT $2 OFFSET $3`, status, limit, offset) + if err != nil { + return nil, err + } + defer rows.Close() + var out []DiscoveredMarket + for rows.Next() { + var d DiscoveredMarket + if err := rows.Scan( + &d.ID, &d.BucketID, &d.MarktName, &d.Stadt, &d.Bundesland, &d.Land, + &d.StartDatum, &d.EndDatum, &d.Website, &d.Quellen, &d.Extraktion, + &d.Hinweis, &d.NameNormalized, &d.MatchedSeriesID, &d.Status, + &d.DiscoveredAt, &d.ReviewedAt, &d.ReviewedBy, &d.CreatedEditionID, + ); err != nil { + return nil, err + } + out = append(out, d) + } + return out, rows.Err() +} + +func (r *pgRepository) GetDiscovered(ctx context.Context, id uuid.UUID) (DiscoveredMarket, error) { + var d DiscoveredMarket + err := r.pool.QueryRow(ctx, ` +SELECT id, bucket_id, markt_name, stadt, coalesce(bundesland,''), land, + start_datum, end_datum, coalesce(website,''), quellen, coalesce(extraktion,''), + coalesce(hinweis,''), name_normalized, matched_series_id, status, + discovered_at, reviewed_at, reviewed_by, created_edition_id +FROM discovered_markets WHERE id = $1`, id).Scan( + &d.ID, &d.BucketID, &d.MarktName, &d.Stadt, &d.Bundesland, &d.Land, + &d.StartDatum, &d.EndDatum, &d.Website, &d.Quellen, &d.Extraktion, + &d.Hinweis, &d.NameNormalized, &d.MatchedSeriesID, &d.Status, + &d.DiscoveredAt, &d.ReviewedAt, &d.ReviewedBy, &d.CreatedEditionID, + ) + return d, err +} + +func (r *pgRepository) MarkAccepted(ctx context.Context, tx pgx.Tx, id, editionID, reviewer uuid.UUID) error { + _, err := tx.Exec(ctx, ` +UPDATE discovered_markets +SET status='accepted', reviewed_at=now(), reviewed_by=$2, created_edition_id=$3 +WHERE id = $1 AND status='pending'`, id, reviewer, editionID) + return err +} + +func (r *pgRepository) MarkRejected(ctx context.Context, tx pgx.Tx, id uuid.UUID, reviewer uuid.UUID) 
error { + _, err := tx.Exec(ctx, ` +UPDATE discovered_markets +SET status='rejected', reviewed_at=now(), reviewed_by=$2 +WHERE id = $1 AND status='pending'`, id, reviewer) + return err +} + +func (r *pgRepository) InsertRejection(ctx context.Context, tx pgx.Tx, rej RejectedDiscovery) error { + _, err := tx.Exec(ctx, ` +INSERT INTO rejected_discoveries (name_normalized, stadt, year, rejected_by, reason) +VALUES ($1, $2, $3, $4, $5) +ON CONFLICT (name_normalized, stadt, year) DO NOTHING`, + rej.NameNormalized, rej.Stadt, rej.Year, rej.RejectedBy, rej.Reason) + return err +} + +func (r *pgRepository) BeginTx(ctx context.Context) (pgx.Tx, error) { + return r.pool.BeginTx(ctx, pgx.TxOptions{}) +} diff --git a/backend/internal/domain/discovery/routes.go b/backend/internal/domain/discovery/routes.go new file mode 100644 index 0000000..6aba4ce --- /dev/null +++ b/backend/internal/domain/discovery/routes.go @@ -0,0 +1,22 @@ +package discovery + +import "github.com/gin-gonic/gin" + +// RegisterRoutes mounts both the admin-session routes (queue mgmt) and the +// bearer-token route (tick). The two middlewares are passed in separately. +func RegisterRoutes( + rg *gin.RouterGroup, + h *Handler, + requireAuth, requireAdmin, requireTickToken gin.HandlerFunc, +) { + // Machine-driven tick (bearer token). + rg.POST("/admin/discovery/tick", requireTickToken, h.Tick) + + // Admin-session queue routes. 
+ admin := rg.Group("/admin/discovery", requireAuth, requireAdmin) + { + admin.GET("/queue", h.ListQueue) + admin.POST("/queue/:id/accept", h.Accept) + admin.POST("/queue/:id/reject", h.Reject) + } +} diff --git a/backend/internal/domain/discovery/service.go b/backend/internal/domain/discovery/service.go new file mode 100644 index 0000000..c42f09c --- /dev/null +++ b/backend/internal/domain/discovery/service.go @@ -0,0 +1,316 @@ +package discovery + +import ( + "context" + "errors" + "fmt" + "log/slog" + "time" + + "github.com/google/uuid" + + "marktvogt.de/backend/internal/domain/market" +) + +type marketCreator interface { + Create(ctx context.Context, req market.CreateMarketRequest) (market.Market, error) + CreateEditionForSeries(ctx context.Context, seriesID uuid.UUID, req market.CreateEditionRequest) (market.Market, error) +} + +// Service orchestrates bucket scheduling, agent invocation, and queue management. +type Service struct { + repo Repository + agent *AgentClient + marketCreator marketCreator + batchSize int + forwardMonths int +} + +// NewService constructs a Service. batchSize and forwardMonths configure bucket picking. +func NewService(repo Repository, agent *AgentClient, mc marketCreator, batchSize, forwardMonths int) *Service { + return &Service{ + repo: repo, + agent: agent, + marketCreator: mc, + batchSize: batchSize, + forwardMonths: forwardMonths, + } +} + +// PickBuckets returns stale buckets eligible for the next discovery run. +func (s *Service) PickBuckets(ctx context.Context) ([]Bucket, error) { + return s.repo.PickStaleBuckets(ctx, s.forwardMonths, s.batchSize) +} + +// TickSummary reports what happened in one tick. 
+type TickSummary struct { + BucketsProcessed int `json:"buckets_processed"` + Discovered int `json:"markets_discovered"` + DedupedExisting int `json:"deduped_existing"` + DedupedRejected int `json:"deduped_rejected"` + DedupedQueue int `json:"deduped_queue"` + Errors int `json:"errors"` +} + +// Tick picks N stale buckets and runs Pass 0 for each, writing net-new discoveries. +func (s *Service) Tick(ctx context.Context) (TickSummary, error) { + if s.agent == nil || !s.agent.Enabled() { + return TickSummary{}, errors.New("discovery agent not configured") + } + buckets, err := s.PickBuckets(ctx) + if err != nil { + return TickSummary{}, fmt.Errorf("pick buckets: %w", err) + } + var summary TickSummary + summary.BucketsProcessed = len(buckets) + for _, b := range buckets { + s.processOneBucket(ctx, b, &summary) + } + return summary, nil +} + +func (s *Service) processOneBucket(ctx context.Context, b Bucket, summary *TickSummary) { + resp, err := s.agent.Discover(ctx, b) + if err != nil { + // Retry once after 2s per spec §9. 
+ select { + case <-ctx.Done(): + return + case <-time.After(2 * time.Second): + } + resp, err = s.agent.Discover(ctx, b) + if err != nil { + slog.WarnContext(ctx, "pass0 failed twice", "bucket_id", b.ID, "error", err) + _ = s.repo.UpdateBucketQueried(ctx, b.ID, err.Error()) + summary.Errors++ + return + } + } + sub := s.processBucketResponse(ctx, b, resp) + summary.Discovered += sub.Discovered + summary.DedupedExisting += sub.DedupedExisting + summary.DedupedRejected += sub.DedupedRejected + summary.DedupedQueue += sub.DedupedQueue + if err := s.repo.UpdateBucketQueried(ctx, b.ID, ""); err != nil { + slog.ErrorContext(ctx, "update bucket queried", "bucket_id", b.ID, "error", err) + } +} + +func (s *Service) processBucketResponse(ctx context.Context, b Bucket, resp Pass0Response) TickSummary { + var summary TickSummary + seen := make(map[string]bool) // in-request dedup + for _, m := range resp.Maerkte { + key := NormalizeName(m.MarktName) + "|" + NormalizeCity(m.Stadt) + "|" + m.StartDatum + if seen[key] { + continue + } + seen[key] = true + + // Series candidates pre-filtered by city. + candidates, err := s.repo.ListSeriesByCity(ctx, NormalizeCity(m.Stadt)) + if err != nil { + slog.WarnContext(ctx, "list series by city", "city", m.Stadt, "error", err) + continue + } + matchedSeriesID := findSeriesMatch(m.MarktName, candidates) + + startDatum, endDatum := parseOptionalDate(m.StartDatum), parseOptionalDate(m.EndDatum) + year := 0 + if startDatum != nil { + year = startDatum.Year() + } + + // Dedup checks in order (spec §7.1.e). 
+ if matchedSeriesID != nil && year > 0 { + exists, err := s.repo.EditionExists(ctx, *matchedSeriesID, year) + if err != nil { + slog.WarnContext(ctx, "edition exists check", "error", err) + continue + } + if exists { + summary.DedupedExisting++ + continue + } + } + nameNorm := NormalizeName(m.MarktName) + if year > 0 { + rejected, err := s.repo.IsRejected(ctx, nameNorm, m.Stadt, year) + if err != nil { + slog.WarnContext(ctx, "is rejected check", "error", err) + continue + } + if rejected { + summary.DedupedRejected++ + continue + } + } + pending, err := s.repo.QueueHasPending(ctx, nameNorm, m.Stadt, startDatum) + if err != nil { + slog.WarnContext(ctx, "queue pending check", "error", err) + continue + } + if pending { + summary.DedupedQueue++ + continue + } + + dm := DiscoveredMarket{ + BucketID: b.ID, + MarktName: m.MarktName, + Stadt: m.Stadt, + Bundesland: m.Bundesland, + Land: b.Land, + StartDatum: startDatum, + EndDatum: endDatum, + Website: m.Website, + Quellen: m.Quellen, + Extraktion: m.Extraktion, + Hinweis: m.Hinweis, + NameNormalized: nameNorm, + MatchedSeriesID: matchedSeriesID, + } + if _, err := s.repo.InsertDiscovered(ctx, dm); err != nil { + slog.WarnContext(ctx, "insert discovered", "error", err) + continue + } + summary.Discovered++ + } + return summary +} + +func parseOptionalDate(s string) *time.Time { + if s == "" { + return nil + } + t, err := time.Parse("2006-01-02", s) + if err != nil { + return nil + } + return &t +} + +// Accept transitions a pending queue entry into a market edition. +// Returns (seriesID, editionID, error). 
+func (s *Service) Accept(ctx context.Context, queueID, reviewerID uuid.UUID) (uuid.UUID, uuid.UUID, error) { + d, err := s.repo.GetDiscovered(ctx, queueID) + if err != nil { + return uuid.Nil, uuid.Nil, fmt.Errorf("load queue entry: %w", err) + } + if d.Status != StatusPending { + return uuid.Nil, uuid.Nil, fmt.Errorf("queue entry is %s, expected pending", d.Status) + } + if d.StartDatum == nil || d.EndDatum == nil { + return uuid.Nil, uuid.Nil, errors.New("cannot accept entry without start/end date") + } + + var created market.Market + if d.MatchedSeriesID != nil { + req := market.CreateEditionRequest{ + Name: d.MarktName, + City: d.Stadt, + State: d.Bundesland, + Country: landToISO(d.Land), + StartDate: d.StartDatum.Format("2006-01-02"), + EndDate: d.EndDatum.Format("2006-01-02"), + Website: d.Website, + } + created, err = s.marketCreator.CreateEditionForSeries(ctx, *d.MatchedSeriesID, req) + } else { + req := market.CreateMarketRequest{ + Name: d.MarktName, + City: d.Stadt, + State: d.Bundesland, + Country: landToISO(d.Land), + StartDate: d.StartDatum.Format("2006-01-02"), + EndDate: d.EndDatum.Format("2006-01-02"), + Website: d.Website, + } + created, err = s.marketCreator.Create(ctx, req) + } + if err != nil { + return uuid.Nil, uuid.Nil, fmt.Errorf("create market: %w", err) + } + + // Discovery-side transition. + tx, err := s.repo.BeginTx(ctx) + if err != nil { + return created.SeriesID, created.ID, fmt.Errorf("begin tx: %w", err) + } + defer func() { _ = tx.Rollback(ctx) }() + if err := s.repo.MarkAccepted(ctx, tx, queueID, created.ID, reviewerID); err != nil { + return created.SeriesID, created.ID, fmt.Errorf("mark accepted: %w", err) + } + if err := tx.Commit(ctx); err != nil { + return created.SeriesID, created.ID, fmt.Errorf("commit accept tx: %w", err) + } + return created.SeriesID, created.ID, nil +} + +// Reject marks a pending entry rejected and records a sticky rejection. 
+func (s *Service) Reject(ctx context.Context, queueID, reviewerID uuid.UUID, reason string) error { + d, err := s.repo.GetDiscovered(ctx, queueID) + if err != nil { + return fmt.Errorf("load queue entry: %w", err) + } + if d.Status != StatusPending { + return fmt.Errorf("queue entry is %s, expected pending", d.Status) + } + year := time.Now().Year() + if d.StartDatum != nil { + year = d.StartDatum.Year() + } + + tx, err := s.repo.BeginTx(ctx) + if err != nil { + return fmt.Errorf("begin tx: %w", err) + } + defer func() { _ = tx.Rollback(ctx) }() + if err := s.repo.MarkRejected(ctx, tx, queueID, reviewerID); err != nil { + return fmt.Errorf("mark rejected: %w", err) + } + rejection := RejectedDiscovery{ + NameNormalized: d.NameNormalized, + Stadt: d.Stadt, + Year: year, + RejectedBy: &reviewerID, + Reason: reason, + } + if err := s.repo.InsertRejection(ctx, tx, rejection); err != nil { + return fmt.Errorf("insert rejection: %w", err) + } + return tx.Commit(ctx) +} + +// landToISO maps the Pass 0 land string to ISO-2 codes required by market.CreateMarketRequest. +func landToISO(land string) string { + switch land { + case "Deutschland": + return "DE" + case "Österreich", "Oesterreich": + return "AT" + case "Schweiz": + return "CH" + } + return "" +} + +// ListPendingQueue exposes queue listing for the handler layer. +func (s *Service) ListPendingQueue(ctx context.Context, limit, offset int) ([]DiscoveredMarket, error) { + return s.repo.ListQueue(ctx, StatusPending, limit, offset) +} + +// findSeriesMatch returns the ID of the first candidate whose normalized name matches +// incomingName after normalization. Candidates are expected to be pre-filtered by city. 
+func findSeriesMatch(incomingName string, candidates []SeriesCandidate) *uuid.UUID { + target := NormalizeName(incomingName) + if target == "" { + return nil + } + for _, c := range candidates { + if NormalizeName(c.Name) == target { + id := c.ID + return &id + } + } + return nil +} diff --git a/backend/internal/domain/discovery/service_test.go b/backend/internal/domain/discovery/service_test.go new file mode 100644 index 0000000..6177274 --- /dev/null +++ b/backend/internal/domain/discovery/service_test.go @@ -0,0 +1,219 @@ +package discovery + +import ( + "context" + "testing" + "time" + + "github.com/google/uuid" + "github.com/jackc/pgx/v5" + + "marktvogt.de/backend/internal/domain/market" +) + +func TestFindSeriesMatch(t *testing.T) { + target := SeriesCandidate{ + ID: uuid.MustParse("11111111-1111-1111-1111-111111111111"), + Name: "Mittelaltermarkt Trostberg", + City: "Trostberg", + } + other := SeriesCandidate{ + ID: uuid.MustParse("22222222-2222-2222-2222-222222222222"), + Name: "Ritterfest Straßburg", + City: "Straßburg", + } + tests := []struct { + name string + incoming string + cands []SeriesCandidate + wantID *uuid.UUID + }{ + {"exact match", "Mittelaltermarkt Trostberg", []SeriesCandidate{target, other}, &target.ID}, + {"suffix variant matches", "Trostberg Mittelaltermarkt", []SeriesCandidate{target}, &target.ID}, + {"umlaut match", "Ritterfest Strassburg", []SeriesCandidate{other}, &other.ID}, + {"no match", "Ganz anderer Markt", []SeriesCandidate{target, other}, nil}, + {"empty candidates", "Mittelaltermarkt Trostberg", nil, nil}, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + got := findSeriesMatch(tc.incoming, tc.cands) + switch { + case tc.wantID == nil && got == nil: + // ok + case tc.wantID != nil && got != nil && *got == *tc.wantID: + // ok + default: + t.Errorf("got %v, want %v", got, tc.wantID) + } + }) + } +} + +func TestPickBucketsPassesConfigToRepo(t *testing.T) { + var gotFM, gotLim int + m := &mockRepo{ + 
pickStaleFn: func(_ context.Context, fm, lim int) ([]Bucket, error) { + gotFM, gotLim = fm, lim + return []Bucket{{ID: uuid.New(), Land: "Deutschland", Region: "Bayern", YearMonth: "2026-09"}}, nil + }, + } + svc := NewService(m, nil, nil, 4, 12) + got, err := svc.PickBuckets(context.Background()) + if err != nil { + t.Fatalf("err: %v", err) + } + if len(got) != 1 { + t.Errorf("got %d buckets, want 1", len(got)) + } + if gotFM != 12 || gotLim != 4 { + t.Errorf("forwarded fm=%d lim=%d, want 12,4", gotFM, gotLim) + } +} + +func TestProcessBucket_DedupsExisting(t *testing.T) { + bucket := Bucket{ + ID: uuid.New(), Land: "Deutschland", Region: "Bayern", YearMonth: "2026-09", + } + seriesID := uuid.New() + var inserted []DiscoveredMarket + m := &mockRepo{ + listSeriesFn: func(_ context.Context, city string) ([]SeriesCandidate, error) { + return []SeriesCandidate{{ID: seriesID, Name: "Mittelaltermarkt Trostberg", City: city}}, nil + }, + editionExistsFn: func(_ context.Context, _ uuid.UUID, year int) (bool, error) { + return year == 2026, nil // 2026 edition exists → dedup + }, + isRejectedFn: func(_ context.Context, _, _ string, _ int) (bool, error) { return false, nil }, + queuePendingFn: func(_ context.Context, _, _ string, _ *time.Time) (bool, error) { return false, nil }, + insertDiscFn: func(_ context.Context, d DiscoveredMarket) (uuid.UUID, error) { + inserted = append(inserted, d) + return uuid.New(), nil + }, + updateBucketFn: func(_ context.Context, _ uuid.UUID, _ string) error { return nil }, + } + svc := NewService(m, nil, nil, 4, 12) + + resp := Pass0Response{ + Bucket: Pass0Bucket{Land: "Deutschland", Region: "Bayern", JahrMonat: "2026-09"}, + Maerkte: []Pass0Market{ + {MarktName: "Mittelaltermarkt Trostberg", Stadt: "Trostberg", StartDatum: "2026-09-12", EndDatum: "2026-09-14", Quellen: []string{"https://x"}}, + }, + } + summary := svc.processBucketResponse(context.Background(), bucket, resp) + if summary.Discovered != 0 { + t.Errorf("expected 0 
discovered, got %d", summary.Discovered) + } + if summary.DedupedExisting != 1 { + t.Errorf("expected 1 deduped_existing, got %d", summary.DedupedExisting) + } + if len(inserted) != 0 { + t.Errorf("expected no inserts, got %d", len(inserted)) + } +} + +func TestProcessBucket_InsertsNetNew(t *testing.T) { + bucket := Bucket{ + ID: uuid.New(), Land: "Deutschland", Region: "Bayern", YearMonth: "2026-09", + } + var inserted []DiscoveredMarket + m := &mockRepo{ + listSeriesFn: func(_ context.Context, _ string) ([]SeriesCandidate, error) { return nil, nil }, + editionExistsFn: func(_ context.Context, _ uuid.UUID, _ int) (bool, error) { return false, nil }, + isRejectedFn: func(_ context.Context, _, _ string, _ int) (bool, error) { return false, nil }, + queuePendingFn: func(_ context.Context, _, _ string, _ *time.Time) (bool, error) { return false, nil }, + insertDiscFn: func(_ context.Context, d DiscoveredMarket) (uuid.UUID, error) { + inserted = append(inserted, d) + return uuid.New(), nil + }, + updateBucketFn: func(_ context.Context, _ uuid.UUID, _ string) error { return nil }, + } + svc := NewService(m, nil, nil, 4, 12) + + resp := Pass0Response{ + Bucket: Pass0Bucket{Land: "Deutschland", Region: "Bayern", JahrMonat: "2026-09"}, + Maerkte: []Pass0Market{ + {MarktName: "Neuer Markt", Stadt: "Passau", StartDatum: "2026-09-20", EndDatum: "2026-09-22", Quellen: []string{"https://x"}}, + }, + } + summary := svc.processBucketResponse(context.Background(), bucket, resp) + if summary.Discovered != 1 { + t.Errorf("expected 1 discovered, got %d", summary.Discovered) + } + if len(inserted) != 1 { + t.Errorf("expected 1 insert, got %d", len(inserted)) + } + if inserted[0].NameNormalized != "neuer" { + t.Errorf("name_normalized = %q, want 'neuer'", inserted[0].NameNormalized) + } +} + +type stubCreator struct { + createCalls int + createEditionForSeriesCalls int + returnErr error +} + +func (c *stubCreator) Create(_ context.Context, _ market.CreateMarketRequest) (market.Market, 
error) { + c.createCalls++ + if c.returnErr != nil { + return market.Market{}, c.returnErr + } + return market.Market{ID: uuid.New(), SeriesID: uuid.New()}, nil +} + +func (c *stubCreator) CreateEditionForSeries(_ context.Context, sid uuid.UUID, _ market.CreateEditionRequest) (market.Market, error) { + c.createEditionForSeriesCalls++ + if c.returnErr != nil { + return market.Market{}, c.returnErr + } + return market.Market{ID: uuid.New(), SeriesID: sid}, nil +} + +type noopTx struct{ pgx.Tx } + +func (noopTx) Commit(_ context.Context) error { return nil } +func (noopTx) Rollback(_ context.Context) error { return nil } + +func TestAccept_NewSeries_CallsCreate(t *testing.T) { + start := time.Date(2026, 9, 12, 0, 0, 0, 0, time.UTC) + end := time.Date(2026, 9, 14, 0, 0, 0, 0, time.UTC) + qID := uuid.New() + m := &mockRepo{ + getDiscoveredFn: func(_ context.Context, _ uuid.UUID) (DiscoveredMarket, error) { + return DiscoveredMarket{ID: qID, Status: StatusPending, MarktName: "X", Stadt: "Y", Land: "Deutschland", StartDatum: &start, EndDatum: &end}, nil + }, + beginTxFn: func(_ context.Context) (pgx.Tx, error) { return noopTx{}, nil }, + markAcceptedFn: func(_ context.Context, _ pgx.Tx, _, _, _ uuid.UUID) error { return nil }, + } + mc := &stubCreator{} + svc := NewService(m, nil, mc, 4, 12) + _, _, err := svc.Accept(context.Background(), qID, uuid.New()) + if err != nil { + t.Fatalf("accept err: %v", err) + } + if mc.createCalls != 1 || mc.createEditionForSeriesCalls != 0 { + t.Errorf("expected Create=1 CreateEdition=0, got %d/%d", mc.createCalls, mc.createEditionForSeriesCalls) + } +} + +func TestAccept_ExistingSeries_CallsCreateEditionForSeries(t *testing.T) { + start := time.Date(2026, 9, 12, 0, 0, 0, 0, time.UTC) + end := time.Date(2026, 9, 14, 0, 0, 0, 0, time.UTC) + sid := uuid.New() + m := &mockRepo{ + getDiscoveredFn: func(_ context.Context, _ uuid.UUID) (DiscoveredMarket, error) { + return DiscoveredMarket{Status: StatusPending, MarktName: "X", Stadt: "Y", Land: 
"Deutschland", StartDatum: &start, EndDatum: &end, MatchedSeriesID: &sid}, nil + }, + beginTxFn: func(_ context.Context) (pgx.Tx, error) { return noopTx{}, nil }, + markAcceptedFn: func(_ context.Context, _ pgx.Tx, _, _, _ uuid.UUID) error { return nil }, + } + mc := &stubCreator{} + svc := NewService(m, nil, mc, 4, 12) + _, _, err := svc.Accept(context.Background(), uuid.New(), uuid.New()) + if err != nil { + t.Fatalf("accept err: %v", err) + } + if mc.createCalls != 0 || mc.createEditionForSeriesCalls != 1 { + t.Errorf("expected Create=0 CreateEdition=1, got %d/%d", mc.createCalls, mc.createEditionForSeriesCalls) + } +} diff --git a/backend/internal/middleware/bearer_token.go b/backend/internal/middleware/bearer_token.go new file mode 100644 index 0000000..b524590 --- /dev/null +++ b/backend/internal/middleware/bearer_token.go @@ -0,0 +1,38 @@ +package middleware + +import ( + "crypto/subtle" + "strings" + + "github.com/gin-gonic/gin" + + "marktvogt.de/backend/internal/pkg/apierror" +) + +// RequireBearerToken validates Authorization: Bearer against the given +// secret using constant-time compare. An empty secret disables the route (all +// requests are rejected) — a safety default for dev environments that haven't +// set the token. 
+func RequireBearerToken(secret string) gin.HandlerFunc { + return func(c *gin.Context) { + if secret == "" { + apiErr := apierror.Unauthorized("endpoint disabled") + c.AbortWithStatusJSON(apiErr.Status, apierror.NewResponse(apiErr)) + return + } + auth := c.GetHeader("Authorization") + const prefix = "Bearer " + if !strings.HasPrefix(auth, prefix) { + apiErr := apierror.Unauthorized("missing bearer token") + c.AbortWithStatusJSON(apiErr.Status, apierror.NewResponse(apiErr)) + return + } + provided := strings.TrimPrefix(auth, prefix) + if subtle.ConstantTimeCompare([]byte(provided), []byte(secret)) != 1 { + apiErr := apierror.Unauthorized("invalid bearer token") + c.AbortWithStatusJSON(apiErr.Status, apierror.NewResponse(apiErr)) + return + } + c.Next() + } +} diff --git a/backend/internal/middleware/bearer_token_test.go b/backend/internal/middleware/bearer_token_test.go new file mode 100644 index 0000000..46b0526 --- /dev/null +++ b/backend/internal/middleware/bearer_token_test.go @@ -0,0 +1,57 @@ +package middleware + +import ( + "net/http" + "net/http/httptest" + "testing" + + "github.com/gin-gonic/gin" +) + +func TestRequireBearerToken(t *testing.T) { + gin.SetMode(gin.TestMode) + secret := "s3cret" + mw := RequireBearerToken(secret) + + tests := []struct { + name string + header string + wantStatus int + }{ + {"missing", "", http.StatusUnauthorized}, + {"wrong scheme", "Basic abcd", http.StatusUnauthorized}, + {"wrong token", "Bearer wrong", http.StatusUnauthorized}, + {"correct", "Bearer s3cret", http.StatusOK}, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + w := httptest.NewRecorder() + _, r := gin.CreateTestContext(w) + r.Use(mw) + r.GET("/t", func(c *gin.Context) { c.Status(http.StatusOK) }) + req := httptest.NewRequest(http.MethodGet, "/t", nil) + if tc.header != "" { + req.Header.Set("Authorization", tc.header) + } + r.ServeHTTP(w, req) + if w.Code != tc.wantStatus { + t.Errorf("status=%d want %d", w.Code, tc.wantStatus) + } + 
}) + } +} + +func TestRequireBearerToken_EmptySecretRejectsAll(t *testing.T) { + gin.SetMode(gin.TestMode) + mw := RequireBearerToken("") // feature disabled → all requests 401 + w := httptest.NewRecorder() + _, r := gin.CreateTestContext(w) + r.Use(mw) + r.GET("/t", func(c *gin.Context) { c.Status(http.StatusOK) }) + req := httptest.NewRequest(http.MethodGet, "/t", nil) + req.Header.Set("Authorization", "Bearer anything") + r.ServeHTTP(w, req) + if w.Code != http.StatusUnauthorized { + t.Errorf("expected 401 when secret is empty, got %d", w.Code) + } +} diff --git a/backend/internal/pkg/ai/client.go b/backend/internal/pkg/ai/client.go index f78da70..4d85d5d 100644 --- a/backend/internal/pkg/ai/client.go +++ b/backend/internal/pkg/ai/client.go @@ -3,6 +3,7 @@ package ai import ( "context" "fmt" + "sync" "time" "github.com/VikingOwl91/mistral-go-sdk" @@ -10,13 +11,46 @@ import ( "github.com/VikingOwl91/mistral-go-sdk/conversation" ) +// rateLimiter enforces a minimum interval between calls. Set rps<=0 to disable. +// Mirrors backend/internal/pkg/geocode/nominatim.go's lastReq gate. +type rateLimiter struct { + mu sync.Mutex + lastReq time.Time + minInterval time.Duration +} + +func newRateLimiter(rps float64) *rateLimiter { + if rps <= 0 { + return &rateLimiter{minInterval: 0} + } + return &rateLimiter{minInterval: time.Duration(float64(time.Second) / rps)} +} + +// TODO: wait() does not honor context cancellation — a cancelled caller will +// block up to minInterval while holding a mutex, and queued callers block up +// to N*minInterval. Mirror pkg/geocode/nominatim.go which has the same gap. +// Fix both sites together by taking ctx and using time.NewTimer + select. 
+func (rl *rateLimiter) wait() { + if rl.minInterval == 0 { + return + } + rl.mu.Lock() + defer rl.mu.Unlock() + since := time.Since(rl.lastReq) + if since < rl.minInterval { + time.Sleep(rl.minInterval - since) + } + rl.lastReq = time.Now() +} + type Client struct { sdk *mistral.Client agentSimple string modelComplex string + limiter *rateLimiter } -func New(apiKey, agentSimple, modelComplex string) *Client { +func New(apiKey, agentSimple, modelComplex string, rps float64) *Client { if modelComplex == "" { modelComplex = "mistral-large-latest" } @@ -33,6 +67,7 @@ func New(apiKey, agentSimple, modelComplex string) *Client { sdk: sdk, agentSimple: agentSimple, modelComplex: modelComplex, + limiter: newRateLimiter(rps), } } @@ -54,6 +89,7 @@ type PassResult struct { // Pass1 uses the Conversations API to call the pre-created agent (with web search). func (c *Client) Pass1(ctx context.Context, prompt string) (PassResult, error) { + c.limiter.wait() storeFalse := false resp, err := c.sdk.StartConversation(ctx, &conversation.StartRequest{ AgentID: c.agentSimple, @@ -76,8 +112,39 @@ func (c *Client) Pass1(ctx context.Context, prompt string) (PassResult, error) { }, nil } +// Pass0 uses the Conversations API to call a discovery agent identified by agentID. +// The agent ID is passed explicitly so the discovery domain can configure its own +// agent independently of the agentSimple field used by Pass1. 
+func (c *Client) Pass0(ctx context.Context, agentID, prompt string) (PassResult, error) { + c.limiter.wait() + if c.sdk == nil || agentID == "" { + return PassResult{}, fmt.Errorf("pass0: ai client not configured (sdk=%v agentID=%q)", c.sdk != nil, agentID) + } + storeFalse := false + resp, err := c.sdk.StartConversation(ctx, &conversation.StartRequest{ + AgentID: agentID, + Inputs: conversation.TextInputs(prompt), + Store: &storeFalse, + }) + if err != nil { + return PassResult{}, fmt.Errorf("pass0 conversation: %w", err) + } + + content := extractConversationContent(resp) + if content == "" { + return PassResult{}, fmt.Errorf("pass0: no assistant message in response") + } + + return PassResult{ + Content: content, + Usage: convertConvUsage(resp.Usage), + Model: "agent:" + agentID, + }, nil +} + // Pass2 uses chat completions for description generation + retry fields. func (c *Client) Pass2(ctx context.Context, systemPrompt, userPrompt string) (PassResult, error) { + c.limiter.wait() resp, err := c.sdk.ChatComplete(ctx, &chat.CompletionRequest{ Model: c.modelComplex, Messages: []chat.Message{ diff --git a/backend/internal/pkg/ai/rate_limiter_test.go b/backend/internal/pkg/ai/rate_limiter_test.go new file mode 100644 index 0000000..9629be4 --- /dev/null +++ b/backend/internal/pkg/ai/rate_limiter_test.go @@ -0,0 +1,48 @@ +package ai + +import ( + "sort" + "sync" + "testing" + "time" +) + +func TestRateLimiterSerializesCalls(t *testing.T) { + rl := newRateLimiter(2.0) // 2 req/s → minInterval 500ms + var ( + mu sync.Mutex + times []time.Time + ) + var wg sync.WaitGroup + for i := 0; i < 3; i++ { + wg.Add(1) + go func() { + defer wg.Done() + rl.wait() + mu.Lock() + times = append(times, time.Now()) + mu.Unlock() + }() + } + wg.Wait() + + // Sort times; gaps between consecutive must be >= 500ms - small tolerance. 
+ sort.Slice(times, func(i, j int) bool { return times[i].Before(times[j]) }) + if gap := times[1].Sub(times[0]); gap < 400*time.Millisecond { + t.Errorf("gap[0->1] = %v, want >= 400ms", gap) + } + if gap := times[2].Sub(times[1]); gap < 400*time.Millisecond { + t.Errorf("gap[1->2] = %v, want >= 400ms", gap) + } +} + +func TestRateLimiterDisabledWhenRPSZero(t *testing.T) { + rl := newRateLimiter(0) // disabled + start := time.Now() + for i := 0; i < 5; i++ { + rl.wait() + } + if elapsed := time.Since(start); elapsed > 50*time.Millisecond { + t.Errorf("expected no throttling when rps=0, elapsed %v", elapsed) + } +} diff --git a/backend/internal/server/routes.go b/backend/internal/server/routes.go index 9d54155..1ddb409 100644 --- a/backend/internal/server/routes.go +++ b/backend/internal/server/routes.go @@ -6,6 +6,7 @@ import ( "github.com/gin-gonic/gin" "marktvogt.de/backend/internal/domain/auth" + "marktvogt.de/backend/internal/domain/discovery" "marktvogt.de/backend/internal/domain/market" "marktvogt.de/backend/internal/domain/user" "marktvogt.de/backend/internal/middleware" @@ -64,10 +65,24 @@ func (s *Server) registerRoutes() { // Admin market routes adminMarketHandler := market.NewAdminHandler(marketSvc) - aiClient := ai.New(s.cfg.AI.APIKey, s.cfg.AI.AgentSimple, s.cfg.AI.ModelComplex) + aiClient := ai.New(s.cfg.AI.APIKey, s.cfg.AI.AgentSimple, s.cfg.AI.ModelComplex, s.cfg.AI.RateLimitRPS) researchHandler := market.NewResearchHandler(marketSvc, aiClient) requireAdmin := middleware.RequireRole("admin") market.RegisterAdminRoutes(v1, adminMarketHandler, researchHandler, requireAuth, requireAdmin) + + // Discovery routes + discoveryRepo := discovery.NewRepository(s.db) + discoveryAgent := discovery.NewAgentClient(aiClient, s.cfg.AI.AgentDiscovery) + discoveryService := discovery.NewService( + discoveryRepo, + discoveryAgent, + marketSvc, + s.cfg.Discovery.BatchSize, + s.cfg.Discovery.ForwardMonths, + ) + discoveryHandler := discovery.NewHandler(discoveryService) 
+ requireTickToken := middleware.RequireBearerToken(s.cfg.Discovery.Token) + discovery.RegisterRoutes(v1, discoveryHandler, requireAuth, requireAdmin, requireTickToken) } func (s *Server) healthz(c *gin.Context) { diff --git a/backend/migrations/000011_discovery_buckets.down.sql b/backend/migrations/000011_discovery_buckets.down.sql new file mode 100644 index 0000000..4268c71 --- /dev/null +++ b/backend/migrations/000011_discovery_buckets.down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS discovery_buckets; diff --git a/backend/migrations/000011_discovery_buckets.up.sql b/backend/migrations/000011_discovery_buckets.up.sql new file mode 100644 index 0000000..0b9736a --- /dev/null +++ b/backend/migrations/000011_discovery_buckets.up.sql @@ -0,0 +1,13 @@ +CREATE TABLE discovery_buckets ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + land text NOT NULL, + region text NOT NULL, + year_month char(7) NOT NULL, + last_queried_at timestamptz, + last_error text, + created_at timestamptz NOT NULL DEFAULT now(), + UNIQUE (land, region, year_month) +); + +CREATE INDEX idx_discovery_buckets_stale + ON discovery_buckets (last_queried_at NULLS FIRST); diff --git a/backend/migrations/000012_discovered_markets.down.sql b/backend/migrations/000012_discovered_markets.down.sql new file mode 100644 index 0000000..af5140d --- /dev/null +++ b/backend/migrations/000012_discovered_markets.down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS discovered_markets; diff --git a/backend/migrations/000012_discovered_markets.up.sql b/backend/migrations/000012_discovered_markets.up.sql new file mode 100644 index 0000000..0f72c55 --- /dev/null +++ b/backend/migrations/000012_discovered_markets.up.sql @@ -0,0 +1,30 @@ +CREATE TABLE discovered_markets ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + bucket_id uuid NOT NULL REFERENCES discovery_buckets(id) ON DELETE CASCADE, + markt_name text NOT NULL, + stadt text NOT NULL, + bundesland text, + land text NOT NULL, + start_datum date, + end_datum date, + 
website text,
+	quellen text[] NOT NULL DEFAULT '{}',
+	extraktion text,
+	hinweis text,
+	name_normalized text NOT NULL,
+	matched_series_id uuid REFERENCES market_series(id) ON DELETE SET NULL,
+	status text NOT NULL DEFAULT 'pending'
+		CHECK (status IN ('pending', 'accepted', 'rejected')),
+	discovered_at timestamptz NOT NULL DEFAULT now(),
+	reviewed_at timestamptz,
+	reviewed_by uuid REFERENCES users(id) ON DELETE SET NULL,
+	created_edition_id uuid REFERENCES market_editions(id) ON DELETE SET NULL
+);
+
+CREATE INDEX idx_discovered_markets_pending
+	ON discovered_markets (status, discovered_at)
+	WHERE status = 'pending';
+
+CREATE INDEX idx_discovered_markets_dedup
+	ON discovered_markets (name_normalized, stadt, start_datum)
+	WHERE status = 'pending';
diff --git a/backend/migrations/000013_rejected_discoveries.down.sql b/backend/migrations/000013_rejected_discoveries.down.sql
new file mode 100644
index 0000000..6f6e210
--- /dev/null
+++ b/backend/migrations/000013_rejected_discoveries.down.sql
@@ -0,0 +1 @@
+DROP TABLE IF EXISTS rejected_discoveries;
diff --git a/backend/migrations/000013_rejected_discoveries.up.sql b/backend/migrations/000013_rejected_discoveries.up.sql
new file mode 100644
index 0000000..47fb1b8
--- /dev/null
+++ b/backend/migrations/000013_rejected_discoveries.up.sql
@@ -0,0 +1,14 @@
+CREATE TABLE rejected_discoveries (
+	id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
+	name_normalized text NOT NULL,
+	stadt text NOT NULL,
+	year int NOT NULL,
+	rejected_at timestamptz NOT NULL DEFAULT now(),
+	rejected_by uuid REFERENCES users(id) ON DELETE SET NULL,
+	reason text,
+	UNIQUE (name_normalized, stadt, year)
+);
+
+-- No separate lookup index: the UNIQUE constraint above already creates a
+-- unique btree index on (name_normalized, stadt, year), so an additional
+-- index on the same columns would be pure write/storage overhead.
diff --git a/backend/migrations/000014_seed_discovery_buckets.down.sql b/backend/migrations/000014_seed_discovery_buckets.down.sql
new file mode 100644
index 0000000..1c8ad88
--- /dev/null
+++ 
b/backend/migrations/000014_seed_discovery_buckets.down.sql @@ -0,0 +1,3 @@ +-- Do not delete rows on rollback — `last_queried_at` and `last_error` may carry +-- operational state. The table itself is dropped by 000011's down migration. +SELECT 1; diff --git a/backend/migrations/000014_seed_discovery_buckets.up.sql b/backend/migrations/000014_seed_discovery_buckets.up.sql new file mode 100644 index 0000000..4a2e9d3 --- /dev/null +++ b/backend/migrations/000014_seed_discovery_buckets.up.sql @@ -0,0 +1,38 @@ +-- Seed buckets for the 24-month window starting current month. +WITH regions(land, region) AS ( + VALUES + ('Deutschland', 'Baden-Württemberg'), + ('Deutschland', 'Bayern'), + ('Deutschland', 'Berlin'), + ('Deutschland', 'Brandenburg'), + ('Deutschland', 'Bremen'), + ('Deutschland', 'Hamburg'), + ('Deutschland', 'Hessen'), + ('Deutschland', 'Mecklenburg-Vorpommern'), + ('Deutschland', 'Niedersachsen'), + ('Deutschland', 'Nordrhein-Westfalen'), + ('Deutschland', 'Rheinland-Pfalz'), + ('Deutschland', 'Saarland'), + ('Deutschland', 'Sachsen'), + ('Deutschland', 'Sachsen-Anhalt'), + ('Deutschland', 'Schleswig-Holstein'), + ('Deutschland', 'Thüringen'), + ('Österreich', 'Burgenland'), + ('Österreich', 'Kärnten'), + ('Österreich', 'Niederösterreich'), + ('Österreich', 'Oberösterreich'), + ('Österreich', 'Salzburg'), + ('Österreich', 'Steiermark'), + ('Österreich', 'Tirol'), + ('Österreich', 'Vorarlberg'), + ('Österreich', 'Wien'), + ('Schweiz', 'Schweiz') +), +months AS ( + SELECT to_char(date_trunc('month', now())::date + (n || ' month')::interval, 'YYYY-MM') AS year_month + FROM generate_series(0, 23) AS n +) +INSERT INTO discovery_buckets (land, region, year_month) +SELECT r.land, r.region, m.year_month +FROM regions r CROSS JOIN months m +ON CONFLICT (land, region, year_month) DO NOTHING; diff --git a/web/src/routes/admin/discovery/+page.server.ts b/web/src/routes/admin/discovery/+page.server.ts new file mode 100644 index 0000000..cf0427e --- /dev/null +++ 
b/web/src/routes/admin/discovery/+page.server.ts @@ -0,0 +1,76 @@ +import type { PageServerLoad, Actions } from './$types.js'; +import { redirect, fail } from '@sveltejs/kit'; +import { serverFetch } from '$lib/api/client.server.js'; + +type DiscoveredMarket = { + id: string; + markt_name: string; + stadt: string; + bundesland: string; + land: string; + start_datum: string | null; + end_datum: string | null; + website: string; + quellen: string[]; + extraktion: string; + hinweis: string; + matched_series_id: string | null; + discovered_at: string; +}; + +export const load: PageServerLoad = async ({ cookies, url }) => { + const limit = Number(url.searchParams.get('limit') ?? 50); + const offset = Number(url.searchParams.get('offset') ?? 0); + const res = await serverFetch( + `/admin/discovery/queue?limit=${limit}&offset=${offset}`, + cookies + ); + return { queue: res.data, limit, offset }; +}; + +export const actions: Actions = { + accept: async ({ request, cookies, fetch }) => { + const form = await request.formData(); + const id = String(form.get('id') ?? ''); + if (!id) return fail(400, { error: 'missing id' }); + + try { + const res = await serverFetch<{ series_id: string; edition_id: string }>( + `/admin/discovery/queue/${id}/accept`, + cookies, + { method: 'POST', fetch } + ); + // Fire research asynchronously — don't await, the edit page handles streaming suggestions. + void serverFetch(`/admin/markets/${res.data.edition_id}/research`, cookies, { + method: 'POST', + fetch + }).catch(() => { + // Silent: the edit page shows a toast if research hasn't completed. + }); + redirect(303, `/admin/maerkte/${res.data.edition_id}/edit`); + } catch (err) { + if (err instanceof Response || (err as { status?: number })?.status === 303) throw err; + const message = err instanceof Error ? 
err.message : 'Accept fehlgeschlagen.'; + return fail(500, { error: message }); + } + }, + + reject: async ({ request, cookies, fetch }) => { + const form = await request.formData(); + const id = String(form.get('id') ?? ''); + const reason = String(form.get('reason') ?? ''); + if (!id) return fail(400, { error: 'missing id' }); + + try { + await serverFetch(`/admin/discovery/queue/${id}/reject`, cookies, { + method: 'POST', + body: JSON.stringify({ reason }), + fetch + }); + return { success: true }; + } catch (err) { + const message = err instanceof Error ? err.message : 'Reject fehlgeschlagen.'; + return fail(500, { error: message }); + } + } +}; diff --git a/web/src/routes/admin/discovery/+page.svelte b/web/src/routes/admin/discovery/+page.svelte new file mode 100644 index 0000000..85622de --- /dev/null +++ b/web/src/routes/admin/discovery/+page.svelte @@ -0,0 +1,96 @@ + + + + Admin · Discovery Queue + + +
+

Discovery Queue

+

+ {data.queue.length} pending · showing from offset {data.offset} +

+ + {#if data.queue.length === 0} +

+ Keine Einträge in der Warteschlange. +

+ {:else} + + + + + + + + + + + + + + + + {#each data.queue as row (row.id)} + + + + + + + + + + + + {/each} + +
LandRegionMarktStadtDatumWebsiteQuellenExtraktionAktion
{row.land}{row.bundesland}{row.markt_name}{row.stadt} + {#if row.start_datum} + {row.start_datum}{row.end_datum ? ` – ${row.end_datum}` : ''} + {:else} + + {/if} + + {#if row.website} + link + {:else} + + {/if} + {row.quellen?.length ?? 0} + + {row.extraktion || '—'} + + +
+ + +
+
+ + +
+
+ {/if} +