Merge branch 'feature/dach-market-discovery' into 'main'
feat(discovery): DACH market discovery pipeline See merge request vikingowl/marktvogt.de!3
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
{{- if or .Values.ai.apiKey .Values.turnstile.secretKey }}
|
||||
{{- if or .Values.ai.apiKey .Values.turnstile.secretKey .Values.discovery.token }}
|
||||
apiVersion: v1
|
||||
kind: Secret
|
||||
metadata:
|
||||
@@ -17,4 +17,7 @@ stringData:
|
||||
{{- if .Values.turnstile.secretKey }}
|
||||
TURNSTILE_SECRET_KEY: {{ .Values.turnstile.secretKey | quote }}
|
||||
{{- end }}
|
||||
{{- if .Values.discovery.token }}
|
||||
DISCOVERY_TOKEN: {{ .Values.discovery.token | quote }}
|
||||
{{- end }}
|
||||
{{- end }}
|
||||
|
||||
@@ -51,8 +51,8 @@ spec:
|
||||
- secretRef:
|
||||
name: {{ include "marktvogt-backend.fullname" . }}-smtp
|
||||
{{- end }}
|
||||
{{- if or .Values.ai.apiKey .Values.turnstile.secretKey }}
|
||||
# AI + Turnstile credentials (Helm-managed, passed via CI)
|
||||
{{- if or .Values.ai.apiKey .Values.turnstile.secretKey .Values.discovery.token }}
|
||||
# AI, Turnstile + Discovery credentials (Helm-managed, passed via CI)
|
||||
- secretRef:
|
||||
name: {{ include "marktvogt-backend.fullname" . }}-ci-secrets
|
||||
{{- end }}
|
||||
@@ -92,6 +92,19 @@ spec:
|
||||
secretKeyRef:
|
||||
name: {{ include "marktvogt-backend.pgAppSecret" . }}
|
||||
key: dbname
|
||||
- name: DISCOVERY_TOKEN
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: {{ include "marktvogt-backend.fullname" . }}-ci-secrets
|
||||
key: DISCOVERY_TOKEN
|
||||
- name: AI_AGENT_DISCOVERY
|
||||
value: {{ .Values.ai.agentDiscovery | quote }}
|
||||
- name: AI_RATE_LIMIT_RPS
|
||||
value: {{ .Values.ai.rateLimitRps | default 1 | quote }}
|
||||
- name: DISCOVERY_BATCH_SIZE
|
||||
value: {{ .Values.discovery.batchSize | default 4 | quote }}
|
||||
- name: DISCOVERY_FORWARD_MONTHS
|
||||
value: {{ .Values.discovery.forwardMonths | default 12 | quote }}
|
||||
startupProbe:
|
||||
httpGet:
|
||||
path: /healthz
|
||||
|
||||
37
backend/deploy/helm/templates/discovery-cron.yaml
Normal file
37
backend/deploy/helm/templates/discovery-cron.yaml
Normal file
@@ -0,0 +1,37 @@
|
||||
{{- if .Values.discovery.enabled }}
# CronJob that periodically triggers the backend's discovery pipeline by
# POSTing to the admin tick endpoint, authenticated with the CI-provided
# bearer token (same -ci-secrets Secret the Deployment consumes).
apiVersion: batch/v1
kind: CronJob
metadata:
  name: {{ include "marktvogt-backend.fullname" . }}-discovery-tick
  namespace: {{ .Release.Namespace }}
  labels:
    {{- include "marktvogt-backend.labels" . | nindent 4 }}
spec:
  schedule: {{ .Values.discovery.schedule | quote }}
  # Forbid overlap: a slow tick must finish before the next one may start.
  concurrencyPolicy: Forbid
  successfulJobsHistoryLimit: 3
  failedJobsHistoryLimit: 3
  jobTemplate:
    spec:
      backoffLimit: 2
      template:
        spec:
          restartPolicy: OnFailure
          containers:
            - name: tick
              image: curlimages/curl:8.9.1
              command:
                - sh
                - -c
                - |
                  curl -fsS --retry 2 --retry-delay 10 \
                    -X POST \
                    -H "Authorization: Bearer $DISCOVERY_TOKEN" \
                    "http://{{ include "marktvogt-backend.fullname" . }}:8080/api/v1/admin/discovery/tick"
              env:
                - name: DISCOVERY_TOKEN
                  valueFrom:
                    secretKeyRef:
                      name: {{ include "marktvogt-backend.fullname" . }}-ci-secrets
                      key: DISCOVERY_TOKEN
{{- end }}
|
||||
@@ -96,6 +96,16 @@ smtp:
|
||||
ai:
|
||||
apiKey: ""
|
||||
agentSimple: ""
|
||||
agentDiscovery: "" # ag_019d9ec1702675dbbd80e526c8957ce2 in production
|
||||
rateLimitRps: 1
|
||||
|
||||
# Discovery cron — token passed via CI secrets during deploy.
|
||||
discovery:
|
||||
enabled: true
|
||||
schedule: "0 */4 * * *"
|
||||
token: ""
|
||||
batchSize: 4
|
||||
forwardMonths: 12
|
||||
|
||||
# Cloudflare Turnstile — passed via Woodpecker secrets during deploy.
|
||||
turnstile:
|
||||
|
||||
@@ -2,6 +2,7 @@ package config
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
@@ -21,12 +22,21 @@ type Config struct {
|
||||
Turnstile TurnstileConfig
|
||||
Notification NotificationConfig
|
||||
AI AIConfig
|
||||
Discovery DiscoveryConfig
|
||||
}
|
||||
|
||||
type DiscoveryConfig struct {
|
||||
Token string // bearer token for /tick endpoint
|
||||
BatchSize int // buckets per tick (default 4)
|
||||
ForwardMonths int // forward window in months (default 12)
|
||||
}
|
||||
|
||||
type AIConfig struct {
|
||||
APIKey string
|
||||
AgentSimple string // Pre-created Mistral agent ID for Pass 1 (extraction + web search)
|
||||
ModelComplex string // Model for Pass 2 (description + retry, e.g. mistral-large-latest)
|
||||
APIKey string
|
||||
AgentSimple string // Pre-created Mistral agent ID for Pass 1 (extraction + web search)
|
||||
AgentDiscovery string // Agent ID for discovery pipeline (Task 7)
|
||||
ModelComplex string // Model for Pass 2 (description + retry, e.g. mistral-large-latest)
|
||||
RateLimitRPS float64 // Max requests per second to Mistral (0 = disabled)
|
||||
}
|
||||
|
||||
type AppConfig struct {
|
||||
@@ -169,6 +179,26 @@ func Load() (*Config, error) {
|
||||
return nil, fmt.Errorf("SMTP_PORT: %w", err)
|
||||
}
|
||||
|
||||
rpsAI, err := envFloat("AI_RATE_LIMIT_RPS", 1.0)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("AI_RATE_LIMIT_RPS: %w", err)
|
||||
}
|
||||
|
||||
batchSize, err := envInt("DISCOVERY_BATCH_SIZE", 4)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("DISCOVERY_BATCH_SIZE: %w", err)
|
||||
}
|
||||
|
||||
forwardMonths, err := envInt("DISCOVERY_FORWARD_MONTHS", 12)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("DISCOVERY_FORWARD_MONTHS: %w", err)
|
||||
}
|
||||
|
||||
discoveryToken := envStr("DISCOVERY_TOKEN", "")
|
||||
if discoveryToken == "" {
|
||||
slog.Warn("DISCOVERY_TOKEN is empty; /api/v1/admin/discovery/tick is disabled")
|
||||
}
|
||||
|
||||
jwtSecret := envStr("JWT_SECRET", "")
|
||||
if jwtSecret == "" {
|
||||
return nil, fmt.Errorf("JWT_SECRET is required")
|
||||
@@ -248,9 +278,16 @@ func Load() (*Config, error) {
|
||||
FrontendURL: envStr("FRONTEND_URL", "http://localhost:5173"),
|
||||
},
|
||||
AI: AIConfig{
|
||||
APIKey: envStr("AI_API_KEY", ""),
|
||||
AgentSimple: envStr("AI_AGENT_SIMPLE", ""),
|
||||
ModelComplex: envStr("AI_MODEL_COMPLEX", "mistral-large-latest"),
|
||||
APIKey: envStr("AI_API_KEY", ""),
|
||||
AgentSimple: envStr("AI_AGENT_SIMPLE", ""),
|
||||
AgentDiscovery: envStr("AI_AGENT_DISCOVERY", ""),
|
||||
ModelComplex: envStr("AI_MODEL_COMPLEX", "mistral-large-latest"),
|
||||
RateLimitRPS: rpsAI,
|
||||
},
|
||||
Discovery: DiscoveryConfig{
|
||||
Token: discoveryToken,
|
||||
BatchSize: batchSize,
|
||||
ForwardMonths: forwardMonths,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
115
backend/internal/domain/discovery/agent_client.go
Normal file
115
backend/internal/domain/discovery/agent_client.go
Normal file
@@ -0,0 +1,115 @@
|
||||
package discovery
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"marktvogt.de/backend/internal/pkg/ai"
|
||||
)
|
||||
|
||||
// AgentClient wraps the Mistral Pass 0 agent for discovery.
type AgentClient struct {
	ai      *ai.Client // underlying Mistral client; nil disables discovery
	agentID string     // pre-created agent ID (instructions live in the Mistral console)
}

// NewAgentClient builds an AgentClient around the shared AI client and the
// configured agent ID. Either value may be zero; see Enabled.
func NewAgentClient(aiClient *ai.Client, agentID string) *AgentClient {
	return &AgentClient{ai: aiClient, agentID: agentID}
}

// Enabled reports whether both the AI client and the agent ID are configured,
// i.e. whether Discover can be called.
func (c *AgentClient) Enabled() bool {
	return c.ai != nil && c.agentID != ""
}
|
||||
|
||||
// Discover runs Pass 0 for the given bucket. The agent's full instructions
// are set in the Mistral console (see spec §6.2). We only inject the bucket
// parameters here.
//
// It returns the parsed agent reply, or an error when the client is not
// configured, the Mistral call fails, or the reply cannot be parsed as JSON.
func (c *AgentClient) Discover(ctx context.Context, b Bucket) (Pass0Response, error) {
	if !c.Enabled() {
		return Pass0Response{}, fmt.Errorf("discovery agent not configured")
	}
	// The prompt is German by design: the console-side agent instructions and
	// the expected JSON keys (see Pass0Response) are specified in German.
	prompt := fmt.Sprintf(
		"Bucket:\nland: %s\nregion: %s\njahr_monat: %s\n\nFinde alle Maerkte in diesem Bucket und antworte im vorgegebenen JSON-Format.",
		b.Land, b.Region, b.YearMonth,
	)
	result, err := c.ai.Pass0(ctx, c.agentID, prompt)
	if err != nil {
		return Pass0Response{}, fmt.Errorf("mistral pass0: %w", err)
	}
	return parsePass0Response(result.Content)
}
|
||||
|
||||
func parsePass0Response(raw string) (Pass0Response, error) {
|
||||
cleaned := extractJSON(raw)
|
||||
cleaned = stripJSONComments(cleaned)
|
||||
var out Pass0Response
|
||||
if err := json.Unmarshal([]byte(cleaned), &out); err != nil {
|
||||
return Pass0Response{}, fmt.Errorf("unmarshal pass0: %w (raw first 500: %q)", err, truncate(raw, 500))
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// --- JSON helpers (independent copy; logic mirrors domain/market/research.go).
|
||||
// Do not import from the market package — keeping packages decoupled.
|
||||
|
||||
// extractJSON returns the first balanced top-level JSON object found in s,
// or s unchanged when no opening brace (or no balancing close) is found.
//
// Braces inside string literals are ignored while balancing: the original
// depth counter treated every '{'/'}' byte as structural, so an agent reply
// such as {"note": "}"} was truncated mid-string and failed to unmarshal.
// This version tracks string/escape state (diverging slightly from the
// mirrored helper in domain/market/research.go, which shares the old bug).
func extractJSON(s string) string {
	start := strings.IndexByte(s, '{')
	if start < 0 {
		return s
	}
	s = s[start:]
	depth := 0
	inString := false
	escaped := false
	for i := 0; i < len(s); i++ {
		c := s[i]
		if escaped {
			escaped = false
			continue
		}
		switch {
		case inString && c == '\\':
			escaped = true
		case c == '"':
			inString = !inString
		case !inString && c == '{':
			depth++
		case !inString && c == '}':
			depth--
			if depth == 0 {
				return s[:i+1]
			}
		}
	}
	// Unbalanced input: fall back to everything from the first brace on.
	return s
}
|
||||
|
||||
// stripJSONComments removes // line comments (including their trailing
// newline) from s while leaving string literals — and any // inside them —
// untouched. Escape sequences inside strings are copied verbatim so an
// escaped quote cannot terminate the string early.
func stripJSONComments(s string) string {
	out := make([]byte, 0, len(s))
	inStr := false
	for i := 0; i < len(s); i++ {
		c := s[i]
		switch {
		case inStr:
			out = append(out, c)
			if c == '\\' {
				// Copy the escaped byte through without interpreting it.
				if i+1 < len(s) {
					i++
					out = append(out, s[i])
				}
			} else if c == '"' {
				inStr = false
			}
		case c == '"':
			inStr = true
			out = append(out, c)
		case c == '/' && i+1 < len(s) && s[i+1] == '/':
			// Skip to end of line; the loop increment then drops the
			// newline itself, matching the original behavior.
			for i < len(s) && s[i] != '\n' {
				i++
			}
		default:
			out = append(out, c)
		}
	}
	return string(out)
}
|
||||
|
||||
// truncate caps s at n bytes (not runes — a multi-byte rune may be split);
// shorter strings are returned unchanged.
func truncate(s string, n int) string {
	if len(s) > n {
		return s[:n]
	}
	return s
}
|
||||
74
backend/internal/domain/discovery/agent_client_test.go
Normal file
74
backend/internal/domain/discovery/agent_client_test.go
Normal file
@@ -0,0 +1,74 @@
|
||||
package discovery
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestParsePass0_Valid(t *testing.T) {
|
||||
raw := `{
|
||||
"bucket": {"land": "Deutschland", "region": "Bayern", "jahr_monat": "2026-09"},
|
||||
"recherche_datum": "2026-04-18",
|
||||
"quellen_gesamt": ["https://a.example", "https://b.example"],
|
||||
"maerkte": [
|
||||
{
|
||||
"markt_name": "Mittelaltermarkt Trostberg",
|
||||
"stadt": "Trostberg",
|
||||
"bundesland": "Bayern",
|
||||
"start_datum": "2026-09-12",
|
||||
"end_datum": "2026-09-14",
|
||||
"website": "https://trostberg.example",
|
||||
"quellen": ["https://a.example"],
|
||||
"extraktion": "verbatim",
|
||||
"hinweis": null
|
||||
}
|
||||
]
|
||||
}`
|
||||
got, err := parsePass0Response(raw)
|
||||
if err != nil {
|
||||
t.Fatalf("parse err: %v", err)
|
||||
}
|
||||
if got.Bucket.Region != "Bayern" {
|
||||
t.Errorf("region = %q, want Bayern", got.Bucket.Region)
|
||||
}
|
||||
if len(got.Maerkte) != 1 || got.Maerkte[0].MarktName != "Mittelaltermarkt Trostberg" {
|
||||
t.Errorf("unexpected markets: %+v", got.Maerkte)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParsePass0_WithCommentsAndTrailingText(t *testing.T) {
|
||||
raw := `Here is the JSON:
|
||||
{
|
||||
"bucket": {"land": "Deutschland", "region": "Bayern", "jahr_monat": "2026-09"},
|
||||
// a comment the agent added
|
||||
"recherche_datum": "2026-04-18",
|
||||
"quellen_gesamt": [],
|
||||
"maerkte": []
|
||||
}
|
||||
end.`
|
||||
got, err := parsePass0Response(raw)
|
||||
if err != nil {
|
||||
t.Fatalf("parse err: %v", err)
|
||||
}
|
||||
if got.Bucket.Region != "Bayern" || len(got.Maerkte) != 0 {
|
||||
t.Errorf("unexpected: %+v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParsePass0_Malformed(t *testing.T) {
|
||||
raw := `not JSON at all`
|
||||
if _, err := parsePass0Response(raw); err == nil {
|
||||
t.Error("expected error on non-JSON input")
|
||||
}
|
||||
}
|
||||
|
||||
func TestParsePass0_EmptyMaerkte(t *testing.T) {
|
||||
raw := `{"bucket":{"land":"Deutschland","region":"Bayern","jahr_monat":"2026-09"},"recherche_datum":"","quellen_gesamt":[],"maerkte":[]}`
|
||||
got, err := parsePass0Response(raw)
|
||||
if err != nil {
|
||||
t.Fatalf("parse err: %v", err)
|
||||
}
|
||||
if got.Maerkte == nil {
|
||||
got.Maerkte = []Pass0Market{} // nil vs empty is fine
|
||||
}
|
||||
if len(got.Maerkte) != 0 {
|
||||
t.Errorf("expected empty, got %+v", got.Maerkte)
|
||||
}
|
||||
}
|
||||
118
backend/internal/domain/discovery/handler.go
Normal file
118
backend/internal/domain/discovery/handler.go
Normal file
@@ -0,0 +1,118 @@
|
||||
package discovery
|
||||
|
||||
import (
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"strconv"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/google/uuid"
|
||||
|
||||
"marktvogt.de/backend/internal/pkg/apierror"
|
||||
)
|
||||
|
||||
// Handler exposes the discovery admin endpoints (tick, queue listing,
// accept/reject) over Gin.
type Handler struct {
	service *Service // discovery business logic
}

// NewHandler returns a Handler backed by the given service.
func NewHandler(s *Service) *Handler {
	return &Handler{service: s}
}
|
||||
|
||||
// Tick runs one discovery cycle synchronously and returns its summary as
// JSON. Invoked by the discovery CronJob via POST. Failures are logged with
// detail server-side but reported to the caller as an opaque 500.
func (h *Handler) Tick(c *gin.Context) {
	summary, err := h.service.Tick(c.Request.Context())
	if err != nil {
		slog.ErrorContext(c.Request.Context(), "discovery tick failed", "error", err)
		apiErr := apierror.Internal("tick failed")
		c.JSON(apiErr.Status, apierror.NewResponse(apiErr))
		return
	}
	c.JSON(http.StatusOK, gin.H{"data": summary})
}
|
||||
|
||||
func (h *Handler) ListQueue(c *gin.Context) {
|
||||
limit, _ := strconv.Atoi(c.DefaultQuery("limit", "50"))
|
||||
offset, _ := strconv.Atoi(c.DefaultQuery("offset", "0"))
|
||||
if limit < 1 || limit > 200 {
|
||||
limit = 50
|
||||
}
|
||||
if offset < 0 {
|
||||
offset = 0
|
||||
}
|
||||
rows, err := h.service.ListPendingQueue(c.Request.Context(), limit, offset)
|
||||
if err != nil {
|
||||
apiErr := apierror.Internal("list queue failed")
|
||||
c.JSON(apiErr.Status, apierror.NewResponse(apiErr))
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusOK, gin.H{"data": rows})
|
||||
}
|
||||
|
||||
// Accept approves a pending queue entry identified by the :id path param.
// The service performs the accept transition and the response carries the
// affected series and edition ids.
func (h *Handler) Accept(c *gin.Context) {
	id, err := uuid.Parse(c.Param("id"))
	if err != nil {
		apiErr := apierror.BadRequest("invalid_id", "invalid queue id")
		c.JSON(apiErr.Status, apierror.NewResponse(apiErr))
		return
	}
	// The reviewing admin is recorded with the transition for auditability.
	reviewer, ok := currentUserID(c)
	if !ok {
		apiErr := apierror.Unauthorized("no user in context")
		c.JSON(apiErr.Status, apierror.NewResponse(apiErr))
		return
	}
	seriesID, editionID, err := h.service.Accept(c.Request.Context(), id, reviewer)
	if err != nil {
		// Warn (not Error): typically a bad/stale queue id rather than an
		// infrastructure fault — TODO confirm against Service.Accept.
		slog.WarnContext(c.Request.Context(), "accept failed", "queue_id", id, "error", err)
		apiErr := apierror.Internal("accept failed")
		c.JSON(apiErr.Status, apierror.NewResponse(apiErr))
		return
	}
	c.JSON(http.StatusOK, gin.H{"data": gin.H{"series_id": seriesID, "edition_id": editionID}})
}
|
||||
|
||||
// rejectRequest is the optional JSON body for Reject.
type rejectRequest struct {
	Reason string `json:"reason" validate:"max=2000"` // optional free-text rejection reason
}

// Reject marks a pending queue entry rejected, optionally recording a reason
// from the request body. Responds 204 No Content on success.
func (h *Handler) Reject(c *gin.Context) {
	id, err := uuid.Parse(c.Param("id"))
	if err != nil {
		apiErr := apierror.BadRequest("invalid_id", "invalid queue id")
		c.JSON(apiErr.Status, apierror.NewResponse(apiErr))
		return
	}
	var req rejectRequest
	// Empty body is OK (reason is optional). Non-EOF parse errors are tolerated;
	// we fall through with zero-value req in all error cases.
	_ = c.ShouldBindJSON(&req)
	reviewer, ok := currentUserID(c)
	if !ok {
		apiErr := apierror.Unauthorized("no user in context")
		c.JSON(apiErr.Status, apierror.NewResponse(apiErr))
		return
	}
	if err := h.service.Reject(c.Request.Context(), id, reviewer, req.Reason); err != nil {
		slog.WarnContext(c.Request.Context(), "reject failed", "queue_id", id, "error", err)
		apiErr := apierror.Internal("reject failed")
		c.JSON(apiErr.Status, apierror.NewResponse(apiErr))
		return
	}
	c.Status(http.StatusNoContent)
}
|
||||
|
||||
func currentUserID(c *gin.Context) (uuid.UUID, bool) {
|
||||
raw, exists := c.Get("user_id")
|
||||
if !exists {
|
||||
return uuid.Nil, false
|
||||
}
|
||||
switch v := raw.(type) {
|
||||
case uuid.UUID:
|
||||
return v, true
|
||||
case string:
|
||||
id, err := uuid.Parse(v)
|
||||
return id, err == nil
|
||||
default:
|
||||
return uuid.Nil, false
|
||||
}
|
||||
}
|
||||
82
backend/internal/domain/discovery/mock_repo_test.go
Normal file
82
backend/internal/domain/discovery/mock_repo_test.go
Normal file
@@ -0,0 +1,82 @@
|
||||
package discovery
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/jackc/pgx/v5"
|
||||
)
|
||||
|
||||
// Compile-time guard: mockRepo must fully satisfy Repository.
var _ Repository = (*mockRepo)(nil)

// mockRepo is a function-field test double for Repository. Tests populate
// only the fields they need; see the method bodies for which fields are
// required (called unconditionally) versus optional (nil falls back to a
// zero-value result).
type mockRepo struct {
	pickStaleFn    func(ctx context.Context, forwardMonths, limit int) ([]Bucket, error)
	updateBucketFn func(ctx context.Context, id uuid.UUID, errMsg string) error
	listSeriesFn   func(ctx context.Context, cityNormalized string) ([]SeriesCandidate, error)
	editionExistsFn func(ctx context.Context, seriesID uuid.UUID, year int) (bool, error)
	insertDiscFn   func(ctx context.Context, d DiscoveredMarket) (uuid.UUID, error)
	isRejectedFn   func(ctx context.Context, nameNormalized, stadt string, year int) (bool, error)
	queuePendingFn func(ctx context.Context, nameNormalized, stadt string, startDatum *time.Time) (bool, error)
	getDiscoveredFn func(ctx context.Context, id uuid.UUID) (DiscoveredMarket, error)
	beginTxFn      func(ctx context.Context) (pgx.Tx, error)
	markAcceptedFn func(ctx context.Context, tx pgx.Tx, id, eid, r uuid.UUID) error
	markRejectedFn func(ctx context.Context, tx pgx.Tx, id, r uuid.UUID) error
	insertRejFn    func(ctx context.Context, tx pgx.Tx, rej RejectedDiscovery) error
}
|
||||
|
||||
func (m *mockRepo) PickStaleBuckets(ctx context.Context, fm, lim int) ([]Bucket, error) {
|
||||
return m.pickStaleFn(ctx, fm, lim)
|
||||
}
|
||||
func (m *mockRepo) UpdateBucketQueried(ctx context.Context, id uuid.UUID, errMsg string) error {
|
||||
return m.updateBucketFn(ctx, id, errMsg)
|
||||
}
|
||||
func (m *mockRepo) ListSeriesByCity(ctx context.Context, c string) ([]SeriesCandidate, error) {
|
||||
return m.listSeriesFn(ctx, c)
|
||||
}
|
||||
func (m *mockRepo) EditionExists(ctx context.Context, sid uuid.UUID, y int) (bool, error) {
|
||||
return m.editionExistsFn(ctx, sid, y)
|
||||
}
|
||||
func (m *mockRepo) InsertDiscovered(ctx context.Context, d DiscoveredMarket) (uuid.UUID, error) {
|
||||
return m.insertDiscFn(ctx, d)
|
||||
}
|
||||
func (m *mockRepo) IsRejected(ctx context.Context, n, s string, y int) (bool, error) {
|
||||
return m.isRejectedFn(ctx, n, s, y)
|
||||
}
|
||||
func (m *mockRepo) QueueHasPending(ctx context.Context, n, s string, sd *time.Time) (bool, error) {
|
||||
return m.queuePendingFn(ctx, n, s, sd)
|
||||
}
|
||||
func (m *mockRepo) ListQueue(ctx context.Context, status string, l, o int) ([]DiscoveredMarket, error) {
|
||||
return nil, nil
|
||||
}
|
||||
func (m *mockRepo) GetDiscovered(ctx context.Context, id uuid.UUID) (DiscoveredMarket, error) {
|
||||
if m.getDiscoveredFn != nil {
|
||||
return m.getDiscoveredFn(ctx, id)
|
||||
}
|
||||
return DiscoveredMarket{}, nil
|
||||
}
|
||||
func (m *mockRepo) BeginTx(ctx context.Context) (pgx.Tx, error) {
|
||||
if m.beginTxFn != nil {
|
||||
return m.beginTxFn(ctx)
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
func (m *mockRepo) MarkAccepted(ctx context.Context, tx pgx.Tx, id, eid, r uuid.UUID) error {
|
||||
if m.markAcceptedFn != nil {
|
||||
return m.markAcceptedFn(ctx, tx, id, eid, r)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func (m *mockRepo) MarkRejected(ctx context.Context, tx pgx.Tx, id, r uuid.UUID) error {
|
||||
if m.markRejectedFn != nil {
|
||||
return m.markRejectedFn(ctx, tx, id, r)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func (m *mockRepo) InsertRejection(ctx context.Context, tx pgx.Tx, r RejectedDiscovery) error {
|
||||
if m.insertRejFn != nil {
|
||||
return m.insertRejFn(ctx, tx, r)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
86
backend/internal/domain/discovery/model.go
Normal file
86
backend/internal/domain/discovery/model.go
Normal file
@@ -0,0 +1,86 @@
|
||||
// backend/internal/domain/discovery/model.go
|
||||
package discovery
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
// Bucket is a scheduler row: one (land, region, year_month) tuple.
type Bucket struct {
	ID            uuid.UUID
	Land          string
	Region        string
	YearMonth     string     // 'YYYY-MM'
	LastQueriedAt *time.Time // nil until first queried
	LastError     string     // last tick error for this bucket, "" when clean
	CreatedAt     time.Time
}

// DiscoveredMarket is a queue entry awaiting admin review.
type DiscoveredMarket struct {
	ID               uuid.UUID
	BucketID         uuid.UUID // bucket whose tick produced this entry
	MarktName        string
	Stadt            string
	Bundesland       string
	Land             string
	StartDatum       *time.Time // nil when the agent gave no date
	EndDatum         *time.Time
	Website          string
	Quellen          []string // source URLs backing this entry
	Extraktion       string
	Hinweis          string
	NameNormalized   string     // NormalizeName(MarktName), used for dedup/matching
	MatchedSeriesID  *uuid.UUID // set when a likely existing series was found
	Status           string     // 'pending' | 'accepted' | 'rejected'
	DiscoveredAt     time.Time
	ReviewedAt       *time.Time // set on accept/reject
	ReviewedBy       *uuid.UUID
	CreatedEditionID *uuid.UUID // edition created on accept, if any
}

// RejectedDiscovery stores a sticky rejection scoped to (normalized_name, city, year).
type RejectedDiscovery struct {
	ID             uuid.UUID
	NameNormalized string
	Stadt          string
	Year           int
	RejectedAt     time.Time
	RejectedBy     *uuid.UUID
	Reason         string
}
|
||||
|
||||
// Pass0Response is the parsed shape of the Mistral Pass 0 agent reply.
// JSON keys are German to match the console-side agent instructions.
type Pass0Response struct {
	Bucket         Pass0Bucket   `json:"bucket"`
	RechercheDatum string        `json:"recherche_datum"`
	QuellenGesamt  []string      `json:"quellen_gesamt"` // all sources consulted
	Maerkte        []Pass0Market `json:"maerkte"`
}

// Pass0Bucket echoes the bucket parameters the agent was prompted with.
type Pass0Bucket struct {
	Land      string `json:"land"`
	Region    string `json:"region"`
	JahrMonat string `json:"jahr_monat"`
}

// Pass0Market is one market candidate reported by the agent.
type Pass0Market struct {
	MarktName  string   `json:"markt_name"`
	Stadt      string   `json:"stadt"`
	Bundesland string   `json:"bundesland"`
	StartDatum string   `json:"start_datum"` // 'YYYY-MM-DD' or ""
	EndDatum   string   `json:"end_datum"`
	Website    string   `json:"website"`
	Quellen    []string `json:"quellen"` // sources backing this specific market
	Extraktion string   `json:"extraktion"`
	Hinweis    string   `json:"hinweis"`
}

// Status constants for discovered_markets.
const (
	StatusPending  = "pending"
	StatusAccepted = "accepted"
	StatusRejected = "rejected"
)
|
||||
136
backend/internal/domain/discovery/normalize.go
Normal file
136
backend/internal/domain/discovery/normalize.go
Normal file
@@ -0,0 +1,136 @@
|
||||
package discovery
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"unicode"
|
||||
)
|
||||
|
||||
// umlautMap expands German diacritics. Both cases map to lowercase
// replacements because NormalizeName/NormalizeCity lowercase afterwards
// anyway; there is no uppercase ß.
var umlautMap = map[rune]string{
	'ä': "ae", 'ö': "oe", 'ü': "ue", 'ß': "ss",
	'Ä': "ae", 'Ö': "oe", 'Ü': "ue",
}

// stripWords are removed from the start and end of normalized market names.
// Kept short intentionally — over-aggressive stripping collides unrelated markets.
var stripWords = map[string]struct{}{
	"mittelaltermarkt":  {},
	"mittelalterlicher": {},
	"markt":             {},
	"zu":                {},
	"am":                {},
	"der":               {},
	"die":               {},
	"das":               {},
	"auf":               {},
	"dem":               {},
	"den":               {},
	"in":                {},
	"im":                {},
}
|
||||
|
||||
// NormalizeName returns a stable, dedup-safe form of a market name:
|
||||
// lowercase, umlauts expanded, punctuation stripped, whitespace collapsed,
|
||||
// leading/trailing filler words removed.
|
||||
//
|
||||
// Used exclusively for matching. NOT suitable for slugs — see slugify() in service.go
|
||||
// for the URL-safe form that preserves identifying words.
|
||||
func NormalizeName(s string) string {
|
||||
s = expandUmlauts(s)
|
||||
s = toLowerAlnumSpace(s)
|
||||
return trimStripWords(s)
|
||||
}
|
||||
|
||||
// NormalizeCity lowercases, expands umlauts, and collapses internal whitespace.
|
||||
// Keeps hyphens (Baden-Baden) and punctuation (St. Wendel). Used for
|
||||
// pre-filtering series candidates by city.
|
||||
func NormalizeCity(s string) string {
|
||||
s = expandUmlauts(s)
|
||||
s = strings.ToLower(s)
|
||||
return strings.Join(strings.Fields(s), " ")
|
||||
}
|
||||
|
||||
func expandUmlauts(s string) string {
|
||||
var b strings.Builder
|
||||
b.Grow(len(s))
|
||||
for _, r := range s {
|
||||
if rep, ok := umlautMap[r]; ok {
|
||||
b.WriteString(rep)
|
||||
} else {
|
||||
b.WriteRune(r)
|
||||
}
|
||||
}
|
||||
return b.String()
|
||||
}
|
||||
|
||||
// toLowerAlnumSpace lowercases letters and digits and replaces every other
// rune with a single space. It does not collapse the resulting runs of
// spaces — callers split on whitespace afterwards.
func toLowerAlnumSpace(s string) string {
	var out strings.Builder
	out.Grow(len(s))
	for _, r := range s {
		if unicode.IsLetter(r) || unicode.IsDigit(r) {
			out.WriteRune(unicode.ToLower(r))
			continue
		}
		out.WriteRune(' ')
	}
	return out.String()
}
|
||||
|
||||
// trimStripWords removes filler words (see stripWords) from both ends of a
// whitespace-separated token list, but stops early when removing the next
// edge word would leave only stripwords plus purely numeric tokens (see
// shouldStopStripping). Left edge is trimmed before the right edge; the
// order matters because each trim changes what "remaining" means for the
// stop check.
func trimStripWords(s string) string {
	tokens := strings.Fields(s)
	// trim from left
	for len(tokens) > 0 {
		if _, ok := stripWords[tokens[0]]; !ok {
			break
		}
		// Would dropping this word leave nothing identifying? Keep it.
		if shouldStopStripping(tokens[1:]) {
			break
		}
		tokens = tokens[1:]
	}
	// trim from right
	for len(tokens) > 0 {
		if _, ok := stripWords[tokens[len(tokens)-1]]; !ok {
			break
		}
		if shouldStopStripping(tokens[:len(tokens)-1]) {
			break
		}
		tokens = tokens[:len(tokens)-1]
	}
	return strings.Join(tokens, " ")
}
|
||||
|
||||
// shouldStopStripping reports whether the tokens remaining after a hypothetical
|
||||
// edge-strip would be "meaningless" — i.e., contain only stripwords and purely
|
||||
// numeric tokens. When true, the caller should preserve the edge stripword to
|
||||
// avoid destroying identifying content (e.g. "Markt 2026" → "2026"). When the
|
||||
// remaining set is entirely stripwords (no numerics), stripping continues;
|
||||
// that's what lets "der die das" reduce to "".
|
||||
func shouldStopStripping(remaining []string) bool {
|
||||
nonStripwordCount := 0
|
||||
allNumeric := true
|
||||
for _, t := range remaining {
|
||||
if _, ok := stripWords[t]; ok {
|
||||
continue
|
||||
}
|
||||
nonStripwordCount++
|
||||
if !isNumericOnly(t) {
|
||||
allNumeric = false
|
||||
}
|
||||
}
|
||||
return nonStripwordCount > 0 && allNumeric
|
||||
}
|
||||
|
||||
// isNumericOnly reports whether s is non-empty and consists solely of
// Unicode digits (unicode.IsDigit — not just ASCII 0-9).
func isNumericOnly(s string) bool {
	if len(s) == 0 {
		return false
	}
	for _, r := range s {
		if unicode.IsDigit(r) {
			continue
		}
		return false
	}
	return true
}
|
||||
51
backend/internal/domain/discovery/normalize_test.go
Normal file
51
backend/internal/domain/discovery/normalize_test.go
Normal file
@@ -0,0 +1,51 @@
|
||||
package discovery
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestNormalizeName(t *testing.T) {
|
||||
tests := []struct {
|
||||
name, in, want string
|
||||
}{
|
||||
{"lowercase", "Trostberg", "trostberg"},
|
||||
{"umlauts", "Rüdesheim", "ruedesheim"},
|
||||
{"eszett", "Straßburg", "strassburg"},
|
||||
{"punctuation", "St. Goar!", "st goar"},
|
||||
{"collapse_whitespace", "Bad Muskau", "bad muskau"},
|
||||
{"strip_market_prefix", "Mittelaltermarkt zu Trostberg", "trostberg"},
|
||||
{"strip_market_suffix", "Trostberg Mittelaltermarkt", "trostberg"},
|
||||
{"strip_der", "Markt der Ritter", "ritter"},
|
||||
{"preserve_numbers", "Markt 2026", "markt 2026"},
|
||||
{"empty", "", ""},
|
||||
{"only_stripwords", "der die das", ""},
|
||||
}
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
got := NormalizeName(tc.in)
|
||||
if got != tc.want {
|
||||
t.Errorf("NormalizeName(%q) = %q, want %q", tc.in, got, tc.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestNormalizeCity(t *testing.T) {
|
||||
tests := []struct {
|
||||
name, in, want string
|
||||
}{
|
||||
{"umlaut", "München", "muenchen"},
|
||||
{"hyphen_preserved", "Baden-Baden", "baden-baden"},
|
||||
{"whitespace_trim", " Trostberg ", "trostberg"},
|
||||
{"empty", "", ""},
|
||||
{"eszett", "Großenhain", "grossenhain"},
|
||||
{"internal_double_whitespace", "Bad Muskau", "bad muskau"},
|
||||
{"punctuation_preserved", "St. Wendel", "st. wendel"},
|
||||
}
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
got := NormalizeCity(tc.in)
|
||||
if got != tc.want {
|
||||
t.Errorf("NormalizeCity(%q) = %q, want %q", tc.in, got, tc.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
216
backend/internal/domain/discovery/repository.go
Normal file
216
backend/internal/domain/discovery/repository.go
Normal file
@@ -0,0 +1,216 @@
|
||||
// backend/internal/domain/discovery/repository.go
|
||||
package discovery
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
)
|
||||
|
||||
// Repository abstracts all persistence the discovery pipeline needs:
// bucket scheduling, queue writes/reads, dedup checks, and the
// transactional accept/reject transitions.
type Repository interface {
	// PickStaleBuckets returns up to limit buckets inside the forward window
	// that are due for (re-)querying.
	PickStaleBuckets(ctx context.Context, forwardMonths, limit int) ([]Bucket, error)
	// UpdateBucketQueried stamps a bucket as just queried, recording errMsg
	// ("" clears any previous error).
	UpdateBucketQueried(ctx context.Context, id uuid.UUID, errMsg string) error
	// ListSeriesByCity returns series candidates for name matching.
	ListSeriesByCity(ctx context.Context, cityNormalized string) ([]SeriesCandidate, error)
	EditionExists(ctx context.Context, seriesID uuid.UUID, year int) (bool, error)
	InsertDiscovered(ctx context.Context, d DiscoveredMarket) (uuid.UUID, error)
	// IsRejected checks the sticky rejection list (see RejectedDiscovery).
	IsRejected(ctx context.Context, nameNormalized, stadt string, year int) (bool, error)
	QueueHasPending(ctx context.Context, nameNormalized, stadt string, startDatum *time.Time) (bool, error)
	ListQueue(ctx context.Context, status string, limit, offset int) ([]DiscoveredMarket, error)
	GetDiscovered(ctx context.Context, id uuid.UUID) (DiscoveredMarket, error)
	// MarkAccepted/MarkRejected/InsertRejection run inside a caller-managed
	// transaction obtained from BeginTx.
	MarkAccepted(ctx context.Context, tx pgx.Tx, id, editionID, reviewer uuid.UUID) error
	MarkRejected(ctx context.Context, tx pgx.Tx, id uuid.UUID, reviewer uuid.UUID) error
	InsertRejection(ctx context.Context, tx pgx.Tx, r RejectedDiscovery) error
	BeginTx(ctx context.Context) (pgx.Tx, error)
}

// SeriesCandidate is a minimal projection used for name-normalization comparison in Go.
type SeriesCandidate struct {
	ID   uuid.UUID
	Name string
	City string
}
|
||||
|
||||
// pgRepository is the PostgreSQL-backed Repository implementation.
type pgRepository struct {
	pool *pgxpool.Pool
}

// NewRepository returns a Repository backed by the given pgx connection pool.
func NewRepository(pool *pgxpool.Pool) Repository {
	return &pgRepository{pool: pool}
}
|
||||
|
||||
// PickStaleBuckets selects up to limit buckets whose year_month lies between
// the current month and current month + forwardMonths (inclusive, compared
// as 'YYYY-MM' strings) and that have either never been queried or were last
// queried more than 7 days ago. Never-queried buckets sort first, then by
// year_month ascending.
func (r *pgRepository) PickStaleBuckets(ctx context.Context, forwardMonths, limit int) ([]Bucket, error) {
	q := `
		SELECT id, land, region, year_month, last_queried_at, coalesce(last_error, ''), created_at
		FROM discovery_buckets
		WHERE year_month >= to_char(date_trunc('month', now()), 'YYYY-MM')
		  AND year_month <= to_char(date_trunc('month', now()) + ($1 * interval '1 month'), 'YYYY-MM')
		  AND (last_queried_at IS NULL OR last_queried_at < now() - interval '7 days')
		ORDER BY last_queried_at NULLS FIRST, year_month
		LIMIT $2`
	rows, err := r.pool.Query(ctx, q, forwardMonths, limit)
	if err != nil {
		return nil, fmt.Errorf("pick buckets: %w", err)
	}
	defer rows.Close()
	var out []Bucket
	for rows.Next() {
		var b Bucket
		if err := rows.Scan(&b.ID, &b.Land, &b.Region, &b.YearMonth, &b.LastQueriedAt, &b.LastError, &b.CreatedAt); err != nil {
			return nil, err
		}
		out = append(out, b)
	}
	return out, rows.Err()
}
|
||||
|
||||
func (r *pgRepository) UpdateBucketQueried(ctx context.Context, id uuid.UUID, errMsg string) error {
|
||||
var errValue any
|
||||
if errMsg == "" {
|
||||
errValue = nil
|
||||
} else {
|
||||
errValue = errMsg
|
||||
}
|
||||
_, err := r.pool.Exec(ctx,
|
||||
`UPDATE discovery_buckets SET last_queried_at = now(), last_error = $2 WHERE id = $1`,
|
||||
id, errValue)
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *pgRepository) ListSeriesByCity(ctx context.Context, cityNormalized string) ([]SeriesCandidate, error) {
|
||||
rows, err := r.pool.Query(ctx,
|
||||
`SELECT id, name, city FROM market_series WHERE LOWER(city) = $1`,
|
||||
cityNormalized)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var out []SeriesCandidate
|
||||
for rows.Next() {
|
||||
var c SeriesCandidate
|
||||
if err := rows.Scan(&c.ID, &c.Name, &c.City); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
out = append(out, c)
|
||||
}
|
||||
return out, rows.Err()
|
||||
}
|
||||
|
||||
func (r *pgRepository) EditionExists(ctx context.Context, seriesID uuid.UUID, year int) (bool, error) {
|
||||
var exists bool
|
||||
err := r.pool.QueryRow(ctx,
|
||||
`SELECT EXISTS(SELECT 1 FROM market_editions WHERE series_id = $1 AND year = $2)`,
|
||||
seriesID, year).Scan(&exists)
|
||||
return exists, err
|
||||
}
|
||||
|
||||
func (r *pgRepository) InsertDiscovered(ctx context.Context, d DiscoveredMarket) (uuid.UUID, error) {
|
||||
var id uuid.UUID
|
||||
err := r.pool.QueryRow(ctx, `
|
||||
INSERT INTO discovered_markets
|
||||
(bucket_id, markt_name, stadt, bundesland, land, start_datum, end_datum, website,
|
||||
quellen, extraktion, hinweis, name_normalized, matched_series_id)
|
||||
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13)
|
||||
RETURNING id`,
|
||||
d.BucketID, d.MarktName, d.Stadt, d.Bundesland, d.Land, d.StartDatum, d.EndDatum, d.Website,
|
||||
d.Quellen, d.Extraktion, d.Hinweis, d.NameNormalized, d.MatchedSeriesID).Scan(&id)
|
||||
return id, err
|
||||
}
|
||||
|
||||
func (r *pgRepository) IsRejected(ctx context.Context, nameNormalized, stadt string, year int) (bool, error) {
|
||||
var exists bool
|
||||
err := r.pool.QueryRow(ctx,
|
||||
`SELECT EXISTS(SELECT 1 FROM rejected_discoveries WHERE name_normalized=$1 AND stadt=$2 AND year=$3)`,
|
||||
nameNormalized, stadt, year).Scan(&exists)
|
||||
return exists, err
|
||||
}
|
||||
|
||||
func (r *pgRepository) QueueHasPending(ctx context.Context, nameNormalized, stadt string, startDatum *time.Time) (bool, error) {
|
||||
var exists bool
|
||||
err := r.pool.QueryRow(ctx, `
|
||||
SELECT EXISTS(
|
||||
SELECT 1 FROM discovered_markets
|
||||
WHERE status='pending'
|
||||
AND name_normalized=$1 AND stadt=$2
|
||||
AND start_datum IS NOT DISTINCT FROM $3
|
||||
)`, nameNormalized, stadt, startDatum).Scan(&exists)
|
||||
return exists, err
|
||||
}
|
||||
|
||||
func (r *pgRepository) ListQueue(ctx context.Context, status string, limit, offset int) ([]DiscoveredMarket, error) {
|
||||
rows, err := r.pool.Query(ctx, `
|
||||
SELECT id, bucket_id, markt_name, stadt, coalesce(bundesland,''), land,
|
||||
start_datum, end_datum, coalesce(website,''), quellen, coalesce(extraktion,''),
|
||||
coalesce(hinweis,''), name_normalized, matched_series_id, status,
|
||||
discovered_at, reviewed_at, reviewed_by, created_edition_id
|
||||
FROM discovered_markets
|
||||
WHERE status = $1
|
||||
ORDER BY discovered_at DESC
|
||||
LIMIT $2 OFFSET $3`, status, limit, offset)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var out []DiscoveredMarket
|
||||
for rows.Next() {
|
||||
var d DiscoveredMarket
|
||||
if err := rows.Scan(
|
||||
&d.ID, &d.BucketID, &d.MarktName, &d.Stadt, &d.Bundesland, &d.Land,
|
||||
&d.StartDatum, &d.EndDatum, &d.Website, &d.Quellen, &d.Extraktion,
|
||||
&d.Hinweis, &d.NameNormalized, &d.MatchedSeriesID, &d.Status,
|
||||
&d.DiscoveredAt, &d.ReviewedAt, &d.ReviewedBy, &d.CreatedEditionID,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
out = append(out, d)
|
||||
}
|
||||
return out, rows.Err()
|
||||
}
|
||||
|
||||
func (r *pgRepository) GetDiscovered(ctx context.Context, id uuid.UUID) (DiscoveredMarket, error) {
|
||||
var d DiscoveredMarket
|
||||
err := r.pool.QueryRow(ctx, `
|
||||
SELECT id, bucket_id, markt_name, stadt, coalesce(bundesland,''), land,
|
||||
start_datum, end_datum, coalesce(website,''), quellen, coalesce(extraktion,''),
|
||||
coalesce(hinweis,''), name_normalized, matched_series_id, status,
|
||||
discovered_at, reviewed_at, reviewed_by, created_edition_id
|
||||
FROM discovered_markets WHERE id = $1`, id).Scan(
|
||||
&d.ID, &d.BucketID, &d.MarktName, &d.Stadt, &d.Bundesland, &d.Land,
|
||||
&d.StartDatum, &d.EndDatum, &d.Website, &d.Quellen, &d.Extraktion,
|
||||
&d.Hinweis, &d.NameNormalized, &d.MatchedSeriesID, &d.Status,
|
||||
&d.DiscoveredAt, &d.ReviewedAt, &d.ReviewedBy, &d.CreatedEditionID,
|
||||
)
|
||||
return d, err
|
||||
}
|
||||
|
||||
func (r *pgRepository) MarkAccepted(ctx context.Context, tx pgx.Tx, id, editionID, reviewer uuid.UUID) error {
|
||||
_, err := tx.Exec(ctx, `
|
||||
UPDATE discovered_markets
|
||||
SET status='accepted', reviewed_at=now(), reviewed_by=$2, created_edition_id=$3
|
||||
WHERE id = $1 AND status='pending'`, id, reviewer, editionID)
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *pgRepository) MarkRejected(ctx context.Context, tx pgx.Tx, id uuid.UUID, reviewer uuid.UUID) error {
|
||||
_, err := tx.Exec(ctx, `
|
||||
UPDATE discovered_markets
|
||||
SET status='rejected', reviewed_at=now(), reviewed_by=$2
|
||||
WHERE id = $1 AND status='pending'`, id, reviewer)
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *pgRepository) InsertRejection(ctx context.Context, tx pgx.Tx, rej RejectedDiscovery) error {
|
||||
_, err := tx.Exec(ctx, `
|
||||
INSERT INTO rejected_discoveries (name_normalized, stadt, year, rejected_by, reason)
|
||||
VALUES ($1, $2, $3, $4, $5)
|
||||
ON CONFLICT (name_normalized, stadt, year) DO NOTHING`,
|
||||
rej.NameNormalized, rej.Stadt, rej.Year, rej.RejectedBy, rej.Reason)
|
||||
return err
|
||||
}
|
||||
|
||||
// BeginTx opens a transaction with default pgx options on the shared pool.
func (r *pgRepository) BeginTx(ctx context.Context) (pgx.Tx, error) {
	return r.pool.BeginTx(ctx, pgx.TxOptions{})
}
|
||||
22
backend/internal/domain/discovery/routes.go
Normal file
22
backend/internal/domain/discovery/routes.go
Normal file
@@ -0,0 +1,22 @@
|
||||
package discovery
|
||||
|
||||
import "github.com/gin-gonic/gin"
|
||||
|
||||
// RegisterRoutes mounts both the admin-session routes (queue mgmt) and the
|
||||
// bearer-token route (tick). The two middlewares are passed in separately.
|
||||
func RegisterRoutes(
|
||||
rg *gin.RouterGroup,
|
||||
h *Handler,
|
||||
requireAuth, requireAdmin, requireTickToken gin.HandlerFunc,
|
||||
) {
|
||||
// Machine-driven tick (bearer token).
|
||||
rg.POST("/admin/discovery/tick", requireTickToken, h.Tick)
|
||||
|
||||
// Admin-session queue routes.
|
||||
admin := rg.Group("/admin/discovery", requireAuth, requireAdmin)
|
||||
{
|
||||
admin.GET("/queue", h.ListQueue)
|
||||
admin.POST("/queue/:id/accept", h.Accept)
|
||||
admin.POST("/queue/:id/reject", h.Reject)
|
||||
}
|
||||
}
|
||||
316
backend/internal/domain/discovery/service.go
Normal file
316
backend/internal/domain/discovery/service.go
Normal file
@@ -0,0 +1,316 @@
|
||||
package discovery
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
|
||||
"marktvogt.de/backend/internal/domain/market"
|
||||
)
|
||||
|
||||
// marketCreator is the consumer-side seam into the market domain used by
// Accept: create a brand-new market, or append an edition to an existing
// series. Stubbed in tests.
type marketCreator interface {
	Create(ctx context.Context, req market.CreateMarketRequest) (market.Market, error)
	CreateEditionForSeries(ctx context.Context, seriesID uuid.UUID, req market.CreateEditionRequest) (market.Market, error)
}
|
||||
|
||||
// Service orchestrates bucket scheduling, agent invocation, and queue management.
type Service struct {
	repo          Repository
	agent         *AgentClient  // nil or disabled agent makes Tick return an error
	marketCreator marketCreator // market-domain seam used by Accept
	batchSize     int           // max buckets processed per tick
	forwardMonths int           // how far past the current month buckets are picked
}
|
||||
|
||||
// NewService constructs a Service. batchSize and forwardMonths configure bucket picking.
func NewService(repo Repository, agent *AgentClient, mc marketCreator, batchSize, forwardMonths int) *Service {
	return &Service{
		repo:          repo,
		agent:         agent,
		marketCreator: mc,
		batchSize:     batchSize,
		forwardMonths: forwardMonths,
	}
}
|
||||
|
||||
// PickBuckets returns stale buckets eligible for the next discovery run,
// limited by the configured batch size and forward-months window.
func (s *Service) PickBuckets(ctx context.Context) ([]Bucket, error) {
	return s.repo.PickStaleBuckets(ctx, s.forwardMonths, s.batchSize)
}
|
||||
|
||||
// TickSummary reports what happened in one tick.
type TickSummary struct {
	BucketsProcessed int `json:"buckets_processed"`
	Discovered       int `json:"markets_discovered"`
	DedupedExisting  int `json:"deduped_existing"` // skipped: edition exists for matched series+year
	DedupedRejected  int `json:"deduped_rejected"` // skipped: sticky rejection on name/city/year
	DedupedQueue     int `json:"deduped_queue"`    // skipped: identical pending queue entry
	Errors           int `json:"errors"`           // buckets whose Pass 0 call failed twice
}
|
||||
|
||||
// Tick picks N stale buckets and runs Pass 0 for each, writing net-new discoveries.
|
||||
func (s *Service) Tick(ctx context.Context) (TickSummary, error) {
|
||||
if s.agent == nil || !s.agent.Enabled() {
|
||||
return TickSummary{}, errors.New("discovery agent not configured")
|
||||
}
|
||||
buckets, err := s.PickBuckets(ctx)
|
||||
if err != nil {
|
||||
return TickSummary{}, fmt.Errorf("pick buckets: %w", err)
|
||||
}
|
||||
var summary TickSummary
|
||||
summary.BucketsProcessed = len(buckets)
|
||||
for _, b := range buckets {
|
||||
s.processOneBucket(ctx, b, &summary)
|
||||
}
|
||||
return summary, nil
|
||||
}
|
||||
|
||||
func (s *Service) processOneBucket(ctx context.Context, b Bucket, summary *TickSummary) {
|
||||
resp, err := s.agent.Discover(ctx, b)
|
||||
if err != nil {
|
||||
// Retry once after 2s per spec §9.
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-time.After(2 * time.Second):
|
||||
}
|
||||
resp, err = s.agent.Discover(ctx, b)
|
||||
if err != nil {
|
||||
slog.WarnContext(ctx, "pass0 failed twice", "bucket_id", b.ID, "error", err)
|
||||
_ = s.repo.UpdateBucketQueried(ctx, b.ID, err.Error())
|
||||
summary.Errors++
|
||||
return
|
||||
}
|
||||
}
|
||||
sub := s.processBucketResponse(ctx, b, resp)
|
||||
summary.Discovered += sub.Discovered
|
||||
summary.DedupedExisting += sub.DedupedExisting
|
||||
summary.DedupedRejected += sub.DedupedRejected
|
||||
summary.DedupedQueue += sub.DedupedQueue
|
||||
if err := s.repo.UpdateBucketQueried(ctx, b.ID, ""); err != nil {
|
||||
slog.ErrorContext(ctx, "update bucket queried", "bucket_id", b.ID, "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
// processBucketResponse dedups and persists the markets from one Pass 0
// response. It returns a partial TickSummary carrying only the per-bucket
// counters (Discovered / Deduped*); BucketsProcessed and Errors are the
// caller's responsibility. Per-market failures are logged and skipped so one
// bad entry does not abort the rest of the bucket.
func (s *Service) processBucketResponse(ctx context.Context, b Bucket, resp Pass0Response) TickSummary {
	var summary TickSummary
	seen := make(map[string]bool) // in-request dedup
	for _, m := range resp.Maerkte {
		// Composite key: normalized name + normalized city + raw start date string.
		key := NormalizeName(m.MarktName) + "|" + NormalizeCity(m.Stadt) + "|" + m.StartDatum
		if seen[key] {
			continue
		}
		seen[key] = true

		// Series candidates pre-filtered by city.
		candidates, err := s.repo.ListSeriesByCity(ctx, NormalizeCity(m.Stadt))
		if err != nil {
			slog.WarnContext(ctx, "list series by city", "city", m.Stadt, "error", err)
			continue
		}
		matchedSeriesID := findSeriesMatch(m.MarktName, candidates)

		startDatum, endDatum := parseOptionalDate(m.StartDatum), parseOptionalDate(m.EndDatum)
		// year == 0 means "no usable start date"; the year-keyed dedup checks
		// below are skipped in that case.
		year := 0
		if startDatum != nil {
			year = startDatum.Year()
		}

		// Dedup checks in order (spec §7.1.e).
		// 1) An edition for the matched series already exists in that year.
		if matchedSeriesID != nil && year > 0 {
			exists, err := s.repo.EditionExists(ctx, *matchedSeriesID, year)
			if err != nil {
				slog.WarnContext(ctx, "edition exists check", "error", err)
				continue
			}
			if exists {
				summary.DedupedExisting++
				continue
			}
		}
		nameNorm := NormalizeName(m.MarktName)
		// 2) The same name/city/year was previously rejected (sticky rejection).
		if year > 0 {
			rejected, err := s.repo.IsRejected(ctx, nameNorm, m.Stadt, year)
			if err != nil {
				slog.WarnContext(ctx, "is rejected check", "error", err)
				continue
			}
			if rejected {
				summary.DedupedRejected++
				continue
			}
		}
		// 3) An identical entry is already pending in the review queue.
		pending, err := s.repo.QueueHasPending(ctx, nameNorm, m.Stadt, startDatum)
		if err != nil {
			slog.WarnContext(ctx, "queue pending check", "error", err)
			continue
		}
		if pending {
			summary.DedupedQueue++
			continue
		}

		// Net-new: persist as a pending queue entry for human review.
		dm := DiscoveredMarket{
			BucketID:        b.ID,
			MarktName:       m.MarktName,
			Stadt:           m.Stadt,
			Bundesland:      m.Bundesland,
			Land:            b.Land,
			StartDatum:      startDatum,
			EndDatum:        endDatum,
			Website:         m.Website,
			Quellen:         m.Quellen,
			Extraktion:      m.Extraktion,
			Hinweis:         m.Hinweis,
			NameNormalized:  nameNorm,
			MatchedSeriesID: matchedSeriesID,
		}
		if _, err := s.repo.InsertDiscovered(ctx, dm); err != nil {
			slog.WarnContext(ctx, "insert discovered", "error", err)
			continue
		}
		summary.Discovered++
	}
	return summary
}
|
||||
|
||||
// parseOptionalDate parses a YYYY-MM-DD string. Empty or malformed input
// yields nil — agent-supplied dates are best-effort.
func parseOptionalDate(s string) *time.Time {
	if s == "" {
		return nil
	}
	parsed, err := time.Parse("2006-01-02", s)
	if err != nil {
		return nil
	}
	return &parsed
}
|
||||
|
||||
// Accept transitions a pending queue entry into a market edition.
// Returns (seriesID, editionID, error).
//
// NOTE(review): market creation and the queue-status transition are NOT
// atomic — if MarkAccepted or the commit fails after Create succeeds, the
// market exists but the queue entry stays pending. The created IDs are
// returned alongside the error in that case so callers can reconcile.
func (s *Service) Accept(ctx context.Context, queueID, reviewerID uuid.UUID) (uuid.UUID, uuid.UUID, error) {
	d, err := s.repo.GetDiscovered(ctx, queueID)
	if err != nil {
		return uuid.Nil, uuid.Nil, fmt.Errorf("load queue entry: %w", err)
	}
	if d.Status != StatusPending {
		return uuid.Nil, uuid.Nil, fmt.Errorf("queue entry is %s, expected pending", d.Status)
	}
	if d.StartDatum == nil || d.EndDatum == nil {
		return uuid.Nil, uuid.Nil, errors.New("cannot accept entry without start/end date")
	}

	// Either append an edition to the matched series, or create a new market.
	var created market.Market
	if d.MatchedSeriesID != nil {
		req := market.CreateEditionRequest{
			Name:      d.MarktName,
			City:      d.Stadt,
			State:     d.Bundesland,
			Country:   landToISO(d.Land),
			StartDate: d.StartDatum.Format("2006-01-02"),
			EndDate:   d.EndDatum.Format("2006-01-02"),
			Website:   d.Website,
		}
		created, err = s.marketCreator.CreateEditionForSeries(ctx, *d.MatchedSeriesID, req)
	} else {
		req := market.CreateMarketRequest{
			Name:      d.MarktName,
			City:      d.Stadt,
			State:     d.Bundesland,
			Country:   landToISO(d.Land),
			StartDate: d.StartDatum.Format("2006-01-02"),
			EndDate:   d.EndDatum.Format("2006-01-02"),
			Website:   d.Website,
		}
		created, err = s.marketCreator.Create(ctx, req)
	}
	if err != nil {
		return uuid.Nil, uuid.Nil, fmt.Errorf("create market: %w", err)
	}

	// Discovery-side transition.
	tx, err := s.repo.BeginTx(ctx)
	if err != nil {
		return created.SeriesID, created.ID, fmt.Errorf("begin tx: %w", err)
	}
	// Rollback after a successful commit is a no-op.
	defer func() { _ = tx.Rollback(ctx) }()
	if err := s.repo.MarkAccepted(ctx, tx, queueID, created.ID, reviewerID); err != nil {
		return created.SeriesID, created.ID, fmt.Errorf("mark accepted: %w", err)
	}
	if err := tx.Commit(ctx); err != nil {
		return created.SeriesID, created.ID, fmt.Errorf("commit accept tx: %w", err)
	}
	return created.SeriesID, created.ID, nil
}
|
||||
|
||||
// Reject marks a pending entry rejected and records a sticky rejection.
|
||||
func (s *Service) Reject(ctx context.Context, queueID, reviewerID uuid.UUID, reason string) error {
|
||||
d, err := s.repo.GetDiscovered(ctx, queueID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("load queue entry: %w", err)
|
||||
}
|
||||
if d.Status != StatusPending {
|
||||
return fmt.Errorf("queue entry is %s, expected pending", d.Status)
|
||||
}
|
||||
year := time.Now().Year()
|
||||
if d.StartDatum != nil {
|
||||
year = d.StartDatum.Year()
|
||||
}
|
||||
|
||||
tx, err := s.repo.BeginTx(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("begin tx: %w", err)
|
||||
}
|
||||
defer func() { _ = tx.Rollback(ctx) }()
|
||||
if err := s.repo.MarkRejected(ctx, tx, queueID, reviewerID); err != nil {
|
||||
return fmt.Errorf("mark rejected: %w", err)
|
||||
}
|
||||
rejection := RejectedDiscovery{
|
||||
NameNormalized: d.NameNormalized,
|
||||
Stadt: d.Stadt,
|
||||
Year: year,
|
||||
RejectedBy: &reviewerID,
|
||||
Reason: reason,
|
||||
}
|
||||
if err := s.repo.InsertRejection(ctx, tx, rejection); err != nil {
|
||||
return fmt.Errorf("insert rejection: %w", err)
|
||||
}
|
||||
return tx.Commit(ctx)
|
||||
}
|
||||
|
||||
// landToISO maps the Pass 0 land string to the ISO-2 codes required by
// market.CreateMarketRequest. Any unrecognized value maps to "".
func landToISO(land string) string {
	switch land {
	case "Deutschland":
		return "DE"
	case "Österreich", "Oesterreich":
		return "AT"
	case "Schweiz":
		return "CH"
	default:
		return ""
	}
}
|
||||
|
||||
// ListPendingQueue exposes queue listing for the handler layer, fixed to the
// pending status.
func (s *Service) ListPendingQueue(ctx context.Context, limit, offset int) ([]DiscoveredMarket, error) {
	return s.repo.ListQueue(ctx, StatusPending, limit, offset)
}
|
||||
|
||||
// findSeriesMatch returns the ID of the first candidate whose normalized name matches
|
||||
// incomingName after normalization. Candidates are expected to be pre-filtered by city.
|
||||
func findSeriesMatch(incomingName string, candidates []SeriesCandidate) *uuid.UUID {
|
||||
target := NormalizeName(incomingName)
|
||||
if target == "" {
|
||||
return nil
|
||||
}
|
||||
for _, c := range candidates {
|
||||
if NormalizeName(c.Name) == target {
|
||||
id := c.ID
|
||||
return &id
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
219
backend/internal/domain/discovery/service_test.go
Normal file
219
backend/internal/domain/discovery/service_test.go
Normal file
@@ -0,0 +1,219 @@
|
||||
package discovery
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/jackc/pgx/v5"
|
||||
|
||||
"marktvogt.de/backend/internal/domain/market"
|
||||
)
|
||||
|
||||
// TestFindSeriesMatch covers exact, normalized-variant, and non-matching
// lookups against pre-filtered candidates.
func TestFindSeriesMatch(t *testing.T) {
	target := SeriesCandidate{
		ID:   uuid.MustParse("11111111-1111-1111-1111-111111111111"),
		Name: "Mittelaltermarkt Trostberg",
		City: "Trostberg",
	}
	other := SeriesCandidate{
		ID:   uuid.MustParse("22222222-2222-2222-2222-222222222222"),
		Name: "Ritterfest Straßburg",
		City: "Straßburg",
	}
	tests := []struct {
		name     string
		incoming string
		cands    []SeriesCandidate
		wantID   *uuid.UUID
	}{
		{"exact match", "Mittelaltermarkt Trostberg", []SeriesCandidate{target, other}, &target.ID},
		// NormalizeName is expected to treat this word-order variant as equal — see its own tests.
		{"suffix variant matches", "Trostberg Mittelaltermarkt", []SeriesCandidate{target}, &target.ID},
		// "ß" and "ss" are expected to normalize to the same form.
		{"umlaut match", "Ritterfest Strassburg", []SeriesCandidate{other}, &other.ID},
		{"no match", "Ganz anderer Markt", []SeriesCandidate{target, other}, nil},
		{"empty candidates", "Mittelaltermarkt Trostberg", nil, nil},
	}
	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			got := findSeriesMatch(tc.incoming, tc.cands)
			switch {
			case tc.wantID == nil && got == nil:
				// ok
			case tc.wantID != nil && got != nil && *got == *tc.wantID:
				// ok
			default:
				t.Errorf("got %v, want %v", got, tc.wantID)
			}
		})
	}
}
|
||||
|
||||
// TestPickBucketsPassesConfigToRepo verifies the service forwards its
// configured forwardMonths/batchSize (12/4) to the repository unchanged.
func TestPickBucketsPassesConfigToRepo(t *testing.T) {
	var gotFM, gotLim int
	m := &mockRepo{
		pickStaleFn: func(_ context.Context, fm, lim int) ([]Bucket, error) {
			gotFM, gotLim = fm, lim
			return []Bucket{{ID: uuid.New(), Land: "Deutschland", Region: "Bayern", YearMonth: "2026-09"}}, nil
		},
	}
	svc := NewService(m, nil, nil, 4, 12)
	got, err := svc.PickBuckets(context.Background())
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if len(got) != 1 {
		t.Errorf("got %d buckets, want 1", len(got))
	}
	if gotFM != 12 || gotLim != 4 {
		t.Errorf("forwarded fm=%d lim=%d, want 12,4", gotFM, gotLim)
	}
}
|
||||
|
||||
// TestProcessBucket_DedupsExisting: a discovered market whose matched series
// already has an edition for the start-date year must be counted as
// deduped_existing and never inserted.
func TestProcessBucket_DedupsExisting(t *testing.T) {
	bucket := Bucket{
		ID: uuid.New(), Land: "Deutschland", Region: "Bayern", YearMonth: "2026-09",
	}
	seriesID := uuid.New()
	var inserted []DiscoveredMarket
	m := &mockRepo{
		listSeriesFn: func(_ context.Context, city string) ([]SeriesCandidate, error) {
			return []SeriesCandidate{{ID: seriesID, Name: "Mittelaltermarkt Trostberg", City: city}}, nil
		},
		editionExistsFn: func(_ context.Context, _ uuid.UUID, year int) (bool, error) {
			return year == 2026, nil // 2026 edition exists → dedup
		},
		isRejectedFn:   func(_ context.Context, _, _ string, _ int) (bool, error) { return false, nil },
		queuePendingFn: func(_ context.Context, _, _ string, _ *time.Time) (bool, error) { return false, nil },
		insertDiscFn: func(_ context.Context, d DiscoveredMarket) (uuid.UUID, error) {
			inserted = append(inserted, d)
			return uuid.New(), nil
		},
		updateBucketFn: func(_ context.Context, _ uuid.UUID, _ string) error { return nil },
	}
	svc := NewService(m, nil, nil, 4, 12)

	resp := Pass0Response{
		Bucket: Pass0Bucket{Land: "Deutschland", Region: "Bayern", JahrMonat: "2026-09"},
		Maerkte: []Pass0Market{
			{MarktName: "Mittelaltermarkt Trostberg", Stadt: "Trostberg", StartDatum: "2026-09-12", EndDatum: "2026-09-14", Quellen: []string{"https://x"}},
		},
	}
	summary := svc.processBucketResponse(context.Background(), bucket, resp)
	if summary.Discovered != 0 {
		t.Errorf("expected 0 discovered, got %d", summary.Discovered)
	}
	if summary.DedupedExisting != 1 {
		t.Errorf("expected 1 deduped_existing, got %d", summary.DedupedExisting)
	}
	if len(inserted) != 0 {
		t.Errorf("expected no inserts, got %d", len(inserted))
	}
}
|
||||
|
||||
// TestProcessBucket_InsertsNetNew: with no series match and all dedup checks
// negative, the market is inserted exactly once with its normalized name.
func TestProcessBucket_InsertsNetNew(t *testing.T) {
	bucket := Bucket{
		ID: uuid.New(), Land: "Deutschland", Region: "Bayern", YearMonth: "2026-09",
	}
	var inserted []DiscoveredMarket
	m := &mockRepo{
		listSeriesFn:    func(_ context.Context, _ string) ([]SeriesCandidate, error) { return nil, nil },
		editionExistsFn: func(_ context.Context, _ uuid.UUID, _ int) (bool, error) { return false, nil },
		isRejectedFn:    func(_ context.Context, _, _ string, _ int) (bool, error) { return false, nil },
		queuePendingFn:  func(_ context.Context, _, _ string, _ *time.Time) (bool, error) { return false, nil },
		insertDiscFn: func(_ context.Context, d DiscoveredMarket) (uuid.UUID, error) {
			inserted = append(inserted, d)
			return uuid.New(), nil
		},
		updateBucketFn: func(_ context.Context, _ uuid.UUID, _ string) error { return nil },
	}
	svc := NewService(m, nil, nil, 4, 12)

	resp := Pass0Response{
		Bucket: Pass0Bucket{Land: "Deutschland", Region: "Bayern", JahrMonat: "2026-09"},
		Maerkte: []Pass0Market{
			{MarktName: "Neuer Markt", Stadt: "Passau", StartDatum: "2026-09-20", EndDatum: "2026-09-22", Quellen: []string{"https://x"}},
		},
	}
	summary := svc.processBucketResponse(context.Background(), bucket, resp)
	if summary.Discovered != 1 {
		t.Errorf("expected 1 discovered, got %d", summary.Discovered)
	}
	if len(inserted) != 1 {
		t.Errorf("expected 1 insert, got %d", len(inserted))
	}
	// NormalizeName apparently lowercases and drops the generic "Markt" token,
	// leaving just "neuer" — confirm against NormalizeName's own tests.
	if inserted[0].NameNormalized != "neuer" {
		t.Errorf("name_normalized = %q, want 'neuer'", inserted[0].NameNormalized)
	}
}
|
||||
|
||||
// stubCreator is a test double for the marketCreator seam. It counts calls
// per method and optionally fails every call with returnErr.
type stubCreator struct {
	createCalls                 int
	createEditionForSeriesCalls int
	returnErr                   error
}
|
||||
|
||||
func (c *stubCreator) Create(_ context.Context, _ market.CreateMarketRequest) (market.Market, error) {
|
||||
c.createCalls++
|
||||
if c.returnErr != nil {
|
||||
return market.Market{}, c.returnErr
|
||||
}
|
||||
return market.Market{ID: uuid.New(), SeriesID: uuid.New()}, nil
|
||||
}
|
||||
|
||||
func (c *stubCreator) CreateEditionForSeries(_ context.Context, sid uuid.UUID, _ market.CreateEditionRequest) (market.Market, error) {
|
||||
c.createEditionForSeriesCalls++
|
||||
if c.returnErr != nil {
|
||||
return market.Market{}, c.returnErr
|
||||
}
|
||||
return market.Market{ID: uuid.New(), SeriesID: sid}, nil
|
||||
}
|
||||
|
||||
// noopTx embeds a nil pgx.Tx and overrides only Commit/Rollback — any other
// method call would panic, which is sufficient for tests that just commit or
// roll back.
type noopTx struct{ pgx.Tx }

func (noopTx) Commit(_ context.Context) error   { return nil }
func (noopTx) Rollback(_ context.Context) error { return nil }
|
||||
|
||||
// TestAccept_NewSeries_CallsCreate: an entry with no MatchedSeriesID must go
// through marketCreator.Create (new market), not CreateEditionForSeries.
func TestAccept_NewSeries_CallsCreate(t *testing.T) {
	start := time.Date(2026, 9, 12, 0, 0, 0, 0, time.UTC)
	end := time.Date(2026, 9, 14, 0, 0, 0, 0, time.UTC)
	qID := uuid.New()
	m := &mockRepo{
		getDiscoveredFn: func(_ context.Context, _ uuid.UUID) (DiscoveredMarket, error) {
			return DiscoveredMarket{ID: qID, Status: StatusPending, MarktName: "X", Stadt: "Y", Land: "Deutschland", StartDatum: &start, EndDatum: &end}, nil
		},
		beginTxFn:      func(_ context.Context) (pgx.Tx, error) { return noopTx{}, nil },
		markAcceptedFn: func(_ context.Context, _ pgx.Tx, _, _, _ uuid.UUID) error { return nil },
	}
	mc := &stubCreator{}
	svc := NewService(m, nil, mc, 4, 12)
	_, _, err := svc.Accept(context.Background(), qID, uuid.New())
	if err != nil {
		t.Fatalf("accept err: %v", err)
	}
	if mc.createCalls != 1 || mc.createEditionForSeriesCalls != 0 {
		t.Errorf("expected Create=1 CreateEdition=0, got %d/%d", mc.createCalls, mc.createEditionForSeriesCalls)
	}
}
|
||||
|
||||
// TestAccept_ExistingSeries_CallsCreateEditionForSeries: an entry carrying a
// MatchedSeriesID must append an edition to that series, not create a market.
func TestAccept_ExistingSeries_CallsCreateEditionForSeries(t *testing.T) {
	start := time.Date(2026, 9, 12, 0, 0, 0, 0, time.UTC)
	end := time.Date(2026, 9, 14, 0, 0, 0, 0, time.UTC)
	sid := uuid.New()
	m := &mockRepo{
		getDiscoveredFn: func(_ context.Context, _ uuid.UUID) (DiscoveredMarket, error) {
			return DiscoveredMarket{Status: StatusPending, MarktName: "X", Stadt: "Y", Land: "Deutschland", StartDatum: &start, EndDatum: &end, MatchedSeriesID: &sid}, nil
		},
		beginTxFn:      func(_ context.Context) (pgx.Tx, error) { return noopTx{}, nil },
		markAcceptedFn: func(_ context.Context, _ pgx.Tx, _, _, _ uuid.UUID) error { return nil },
	}
	mc := &stubCreator{}
	svc := NewService(m, nil, mc, 4, 12)
	_, _, err := svc.Accept(context.Background(), uuid.New(), uuid.New())
	if err != nil {
		t.Fatalf("accept err: %v", err)
	}
	if mc.createCalls != 0 || mc.createEditionForSeriesCalls != 1 {
		t.Errorf("expected Create=0 CreateEdition=1, got %d/%d", mc.createCalls, mc.createEditionForSeriesCalls)
	}
}
|
||||
38
backend/internal/middleware/bearer_token.go
Normal file
38
backend/internal/middleware/bearer_token.go
Normal file
@@ -0,0 +1,38 @@
|
||||
package middleware
|
||||
|
||||
import (
|
||||
"crypto/subtle"
|
||||
"strings"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
|
||||
"marktvogt.de/backend/internal/pkg/apierror"
|
||||
)
|
||||
|
||||
// RequireBearerToken validates Authorization: Bearer <token> against the given
|
||||
// secret using constant-time compare. An empty secret disables the route (all
|
||||
// requests are rejected) — a safety default for dev environments that haven't
|
||||
// set the token.
|
||||
func RequireBearerToken(secret string) gin.HandlerFunc {
|
||||
return func(c *gin.Context) {
|
||||
if secret == "" {
|
||||
apiErr := apierror.Unauthorized("endpoint disabled")
|
||||
c.AbortWithStatusJSON(apiErr.Status, apierror.NewResponse(apiErr))
|
||||
return
|
||||
}
|
||||
auth := c.GetHeader("Authorization")
|
||||
const prefix = "Bearer "
|
||||
if !strings.HasPrefix(auth, prefix) {
|
||||
apiErr := apierror.Unauthorized("missing bearer token")
|
||||
c.AbortWithStatusJSON(apiErr.Status, apierror.NewResponse(apiErr))
|
||||
return
|
||||
}
|
||||
provided := strings.TrimPrefix(auth, prefix)
|
||||
if subtle.ConstantTimeCompare([]byte(provided), []byte(secret)) != 1 {
|
||||
apiErr := apierror.Unauthorized("invalid bearer token")
|
||||
c.AbortWithStatusJSON(apiErr.Status, apierror.NewResponse(apiErr))
|
||||
return
|
||||
}
|
||||
c.Next()
|
||||
}
|
||||
}
|
||||
57
backend/internal/middleware/bearer_token_test.go
Normal file
57
backend/internal/middleware/bearer_token_test.go
Normal file
@@ -0,0 +1,57 @@
|
||||
package middleware
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// TestRequireBearerToken table-tests the four header shapes: absent header,
// non-Bearer scheme, wrong token (all 401), and the correct token (200).
func TestRequireBearerToken(t *testing.T) {
	gin.SetMode(gin.TestMode)
	secret := "s3cret"
	mw := RequireBearerToken(secret)

	tests := []struct {
		name       string
		header     string
		wantStatus int
	}{
		{"missing", "", http.StatusUnauthorized},
		{"wrong scheme", "Basic abcd", http.StatusUnauthorized},
		{"wrong token", "Bearer wrong", http.StatusUnauthorized},
		{"correct", "Bearer s3cret", http.StatusOK},
	}
	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			w := httptest.NewRecorder()
			_, r := gin.CreateTestContext(w)
			r.Use(mw)
			r.GET("/t", func(c *gin.Context) { c.Status(http.StatusOK) })
			req := httptest.NewRequest(http.MethodGet, "/t", nil)
			if tc.header != "" {
				req.Header.Set("Authorization", tc.header)
			}
			r.ServeHTTP(w, req)
			if w.Code != tc.wantStatus {
				t.Errorf("status=%d want %d", w.Code, tc.wantStatus)
			}
		})
	}
}
|
||||
|
||||
// TestRequireBearerToken_EmptySecretRejectsAll: with an empty secret the
// middleware rejects every request, even ones carrying a Bearer header.
func TestRequireBearerToken_EmptySecretRejectsAll(t *testing.T) {
	gin.SetMode(gin.TestMode)
	mw := RequireBearerToken("") // feature disabled → all requests 401
	w := httptest.NewRecorder()
	_, r := gin.CreateTestContext(w)
	r.Use(mw)
	r.GET("/t", func(c *gin.Context) { c.Status(http.StatusOK) })
	req := httptest.NewRequest(http.MethodGet, "/t", nil)
	req.Header.Set("Authorization", "Bearer anything")
	r.ServeHTTP(w, req)
	if w.Code != http.StatusUnauthorized {
		t.Errorf("expected 401 when secret is empty, got %d", w.Code)
	}
}
|
||||
@@ -3,6 +3,7 @@ package ai
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/VikingOwl91/mistral-go-sdk"
|
||||
@@ -10,13 +11,46 @@ import (
|
||||
"github.com/VikingOwl91/mistral-go-sdk/conversation"
|
||||
)
|
||||
|
||||
// rateLimiter enforces a minimum interval between calls. Set rps<=0 to disable.
|
||||
// Mirrors backend/internal/pkg/geocode/nominatim.go's lastReq gate.
|
||||
type rateLimiter struct {
|
||||
mu sync.Mutex
|
||||
lastReq time.Time
|
||||
minInterval time.Duration
|
||||
}
|
||||
|
||||
func newRateLimiter(rps float64) *rateLimiter {
|
||||
if rps <= 0 {
|
||||
return &rateLimiter{minInterval: 0}
|
||||
}
|
||||
return &rateLimiter{minInterval: time.Duration(float64(time.Second) / rps)}
|
||||
}
|
||||
|
||||
// TODO: wait() does not honor context cancellation — a cancelled caller will
|
||||
// block up to minInterval while holding a mutex, and queued callers block up
|
||||
// to N*minInterval. Mirror pkg/geocode/nominatim.go which has the same gap.
|
||||
// Fix both sites together by taking ctx and using time.NewTimer + select.
|
||||
func (rl *rateLimiter) wait() {
|
||||
if rl.minInterval == 0 {
|
||||
return
|
||||
}
|
||||
rl.mu.Lock()
|
||||
defer rl.mu.Unlock()
|
||||
since := time.Since(rl.lastReq)
|
||||
if since < rl.minInterval {
|
||||
time.Sleep(rl.minInterval - since)
|
||||
}
|
||||
rl.lastReq = time.Now()
|
||||
}
|
||||
|
||||
// Client wraps the Mistral SDK with the agent/model identifiers and the
// throttling shared by all AI passes.
type Client struct {
	sdk          *mistral.Client // underlying SDK client; nil when not configured (checked by Pass0)
	agentSimple  string          // pre-created agent ID used by Pass1's conversation call
	modelComplex string          // chat-completions model used by Pass2
	limiter      *rateLimiter    // paces outbound SDK calls (wait() before each pass)
}
|
||||
|
||||
func New(apiKey, agentSimple, modelComplex string) *Client {
|
||||
func New(apiKey, agentSimple, modelComplex string, rps float64) *Client {
|
||||
if modelComplex == "" {
|
||||
modelComplex = "mistral-large-latest"
|
||||
}
|
||||
@@ -33,6 +67,7 @@ func New(apiKey, agentSimple, modelComplex string) *Client {
|
||||
sdk: sdk,
|
||||
agentSimple: agentSimple,
|
||||
modelComplex: modelComplex,
|
||||
limiter: newRateLimiter(rps),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -54,6 +89,7 @@ type PassResult struct {
|
||||
|
||||
// Pass1 uses the Conversations API to call the pre-created agent (with web search).
|
||||
func (c *Client) Pass1(ctx context.Context, prompt string) (PassResult, error) {
|
||||
c.limiter.wait()
|
||||
storeFalse := false
|
||||
resp, err := c.sdk.StartConversation(ctx, &conversation.StartRequest{
|
||||
AgentID: c.agentSimple,
|
||||
@@ -76,8 +112,39 @@ func (c *Client) Pass1(ctx context.Context, prompt string) (PassResult, error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Pass0 uses the Conversations API to call a discovery agent identified by agentID.
|
||||
// The agent ID is passed explicitly so the discovery domain can configure its own
|
||||
// agent independently of the agentSimple field used by Pass1.
|
||||
func (c *Client) Pass0(ctx context.Context, agentID, prompt string) (PassResult, error) {
|
||||
c.limiter.wait()
|
||||
if c.sdk == nil || agentID == "" {
|
||||
return PassResult{}, fmt.Errorf("pass0: ai client not configured (sdk=%v agentID=%q)", c.sdk != nil, agentID)
|
||||
}
|
||||
storeFalse := false
|
||||
resp, err := c.sdk.StartConversation(ctx, &conversation.StartRequest{
|
||||
AgentID: agentID,
|
||||
Inputs: conversation.TextInputs(prompt),
|
||||
Store: &storeFalse,
|
||||
})
|
||||
if err != nil {
|
||||
return PassResult{}, fmt.Errorf("pass0 conversation: %w", err)
|
||||
}
|
||||
|
||||
content := extractConversationContent(resp)
|
||||
if content == "" {
|
||||
return PassResult{}, fmt.Errorf("pass0: no assistant message in response")
|
||||
}
|
||||
|
||||
return PassResult{
|
||||
Content: content,
|
||||
Usage: convertConvUsage(resp.Usage),
|
||||
Model: "agent:" + agentID,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Pass2 uses chat completions for description generation + retry fields.
|
||||
func (c *Client) Pass2(ctx context.Context, systemPrompt, userPrompt string) (PassResult, error) {
|
||||
c.limiter.wait()
|
||||
resp, err := c.sdk.ChatComplete(ctx, &chat.CompletionRequest{
|
||||
Model: c.modelComplex,
|
||||
Messages: []chat.Message{
|
||||
|
||||
48
backend/internal/pkg/ai/rate_limiter_test.go
Normal file
48
backend/internal/pkg/ai/rate_limiter_test.go
Normal file
@@ -0,0 +1,48 @@
|
||||
package ai
|
||||
|
||||
import (
|
||||
"sort"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestRateLimiterSerializesCalls(t *testing.T) {
|
||||
rl := newRateLimiter(2.0) // 2 req/s → minInterval 500ms
|
||||
var (
|
||||
mu sync.Mutex
|
||||
times []time.Time
|
||||
)
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < 3; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
rl.wait()
|
||||
mu.Lock()
|
||||
times = append(times, time.Now())
|
||||
mu.Unlock()
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
// Sort times; gaps between consecutive must be >= 500ms - small tolerance.
|
||||
sort.Slice(times, func(i, j int) bool { return times[i].Before(times[j]) })
|
||||
if gap := times[1].Sub(times[0]); gap < 400*time.Millisecond {
|
||||
t.Errorf("gap[0->1] = %v, want >= 400ms", gap)
|
||||
}
|
||||
if gap := times[2].Sub(times[1]); gap < 400*time.Millisecond {
|
||||
t.Errorf("gap[1->2] = %v, want >= 400ms", gap)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRateLimiterDisabledWhenRPSZero(t *testing.T) {
|
||||
rl := newRateLimiter(0) // disabled
|
||||
start := time.Now()
|
||||
for i := 0; i < 5; i++ {
|
||||
rl.wait()
|
||||
}
|
||||
if elapsed := time.Since(start); elapsed > 50*time.Millisecond {
|
||||
t.Errorf("expected no throttling when rps=0, elapsed %v", elapsed)
|
||||
}
|
||||
}
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"github.com/gin-gonic/gin"
|
||||
|
||||
"marktvogt.de/backend/internal/domain/auth"
|
||||
"marktvogt.de/backend/internal/domain/discovery"
|
||||
"marktvogt.de/backend/internal/domain/market"
|
||||
"marktvogt.de/backend/internal/domain/user"
|
||||
"marktvogt.de/backend/internal/middleware"
|
||||
@@ -64,10 +65,24 @@ func (s *Server) registerRoutes() {
|
||||
|
||||
// Admin market routes
|
||||
adminMarketHandler := market.NewAdminHandler(marketSvc)
|
||||
aiClient := ai.New(s.cfg.AI.APIKey, s.cfg.AI.AgentSimple, s.cfg.AI.ModelComplex)
|
||||
aiClient := ai.New(s.cfg.AI.APIKey, s.cfg.AI.AgentSimple, s.cfg.AI.ModelComplex, s.cfg.AI.RateLimitRPS)
|
||||
researchHandler := market.NewResearchHandler(marketSvc, aiClient)
|
||||
requireAdmin := middleware.RequireRole("admin")
|
||||
market.RegisterAdminRoutes(v1, adminMarketHandler, researchHandler, requireAuth, requireAdmin)
|
||||
|
||||
// Discovery routes
|
||||
discoveryRepo := discovery.NewRepository(s.db)
|
||||
discoveryAgent := discovery.NewAgentClient(aiClient, s.cfg.AI.AgentDiscovery)
|
||||
discoveryService := discovery.NewService(
|
||||
discoveryRepo,
|
||||
discoveryAgent,
|
||||
marketSvc,
|
||||
s.cfg.Discovery.BatchSize,
|
||||
s.cfg.Discovery.ForwardMonths,
|
||||
)
|
||||
discoveryHandler := discovery.NewHandler(discoveryService)
|
||||
requireTickToken := middleware.RequireBearerToken(s.cfg.Discovery.Token)
|
||||
discovery.RegisterRoutes(v1, discoveryHandler, requireAuth, requireAdmin, requireTickToken)
|
||||
}
|
||||
|
||||
func (s *Server) healthz(c *gin.Context) {
|
||||
|
||||
1
backend/migrations/000011_discovery_buckets.down.sql
Normal file
1
backend/migrations/000011_discovery_buckets.down.sql
Normal file
@@ -0,0 +1 @@
|
||||
-- Rollback for 000011: remove the discovery work-queue table.
DROP TABLE IF EXISTS discovery_buckets;
|
||||
13
backend/migrations/000011_discovery_buckets.up.sql
Normal file
13
backend/migrations/000011_discovery_buckets.up.sql
Normal file
@@ -0,0 +1,13 @@
|
||||
-- discovery_buckets is the work queue for the market-discovery pipeline:
-- one row per (country, region, month) slot to be queried.
CREATE TABLE discovery_buckets (
    id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
    land text NOT NULL,            -- country, e.g. 'Deutschland' (see 000014 seed)
    region text NOT NULL,          -- Bundesland/state, or country-wide region
    year_month char(7) NOT NULL,   -- target month formatted 'YYYY-MM'
    last_queried_at timestamptz,   -- NULL until the bucket has been processed once
    last_error text,               -- most recent failure message, if any
    created_at timestamptz NOT NULL DEFAULT now(),
    UNIQUE (land, region, year_month)
);

-- NULLS FIRST so never-queried buckets sort ahead of stale ones when
-- scanning for the next bucket to process.
CREATE INDEX idx_discovery_buckets_stale
    ON discovery_buckets (last_queried_at NULLS FIRST);
|
||||
1
backend/migrations/000012_discovered_markets.down.sql
Normal file
1
backend/migrations/000012_discovered_markets.down.sql
Normal file
@@ -0,0 +1 @@
|
||||
-- Rollback for 000012: remove staged discovery results.
DROP TABLE IF EXISTS discovered_markets;
|
||||
30
backend/migrations/000012_discovered_markets.up.sql
Normal file
30
backend/migrations/000012_discovered_markets.up.sql
Normal file
@@ -0,0 +1,30 @@
|
||||
-- discovered_markets stages AI-discovered market candidates for admin review
-- (accept/reject) before they become real series/editions.
CREATE TABLE discovered_markets (
    id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
    -- Owning work-queue slot; cascade so clearing a bucket clears its results.
    bucket_id uuid NOT NULL REFERENCES discovery_buckets(id) ON DELETE CASCADE,
    markt_name text NOT NULL,
    stadt text NOT NULL,
    bundesland text,
    land text NOT NULL,
    start_datum date,
    end_datum date,
    website text,
    quellen text[] NOT NULL DEFAULT '{}',   -- source URLs backing the discovery
    extraktion text,                        -- extraction mode marker (e.g. 'verbatim')
    hinweis text,                           -- free-form note
    name_normalized text NOT NULL,          -- normalized name used for dedup/matching
    matched_series_id uuid REFERENCES market_series(id) ON DELETE SET NULL,
    -- Review lifecycle: rows start 'pending' and move to 'accepted'/'rejected'.
    status text NOT NULL DEFAULT 'pending'
        CHECK (status IN ('pending', 'accepted', 'rejected')),
    discovered_at timestamptz NOT NULL DEFAULT now(),
    reviewed_at timestamptz,
    reviewed_by uuid REFERENCES users(id) ON DELETE SET NULL,
    created_edition_id uuid REFERENCES market_editions(id) ON DELETE SET NULL
);

-- Partial index for the admin queue listing (pending rows, oldest first).
CREATE INDEX idx_discovered_markets_pending
    ON discovered_markets (status, discovered_at)
    WHERE status = 'pending';

-- Partial index supporting duplicate checks against pending rows.
CREATE INDEX idx_discovered_markets_dedup
    ON discovered_markets (name_normalized, stadt, start_datum)
    WHERE status = 'pending';
|
||||
1
backend/migrations/000013_rejected_discoveries.down.sql
Normal file
1
backend/migrations/000013_rejected_discoveries.down.sql
Normal file
@@ -0,0 +1 @@
|
||||
-- Rollback for 000013: remove the rejection memory table.
DROP TABLE IF EXISTS rejected_discoveries;
|
||||
13
backend/migrations/000013_rejected_discoveries.up.sql
Normal file
13
backend/migrations/000013_rejected_discoveries.up.sql
Normal file
@@ -0,0 +1,13 @@
|
||||
-- rejected_discoveries remembers admin rejections per (name, city, year) so
-- the same candidate is not re-surfaced after its queue row is gone.
CREATE TABLE rejected_discoveries (
    id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
    name_normalized text NOT NULL,
    stadt text NOT NULL,
    year int NOT NULL,
    rejected_at timestamptz NOT NULL DEFAULT now(),
    rejected_by uuid REFERENCES users(id) ON DELETE SET NULL,
    reason text,
    UNIQUE (name_normalized, stadt, year)
);

-- NOTE(review): the UNIQUE constraint above already creates a btree index on
-- exactly these columns, so this extra index looks redundant — confirm and
-- consider dropping it in a follow-up migration.
CREATE INDEX idx_rejected_discoveries_lookup
    ON rejected_discoveries (name_normalized, stadt, year);
|
||||
@@ -0,0 +1,3 @@
|
||||
-- Down migration for the seed (000014): intentionally a no-op.
-- Do not delete rows on rollback — `last_queried_at` and `last_error` may carry
-- operational state. The table itself is dropped by 000011's down migration.
SELECT 1;
|
||||
38
backend/migrations/000014_seed_discovery_buckets.up.sql
Normal file
38
backend/migrations/000014_seed_discovery_buckets.up.sql
Normal file
@@ -0,0 +1,38 @@
|
||||
-- Seed buckets for the 24-month window starting current month.
-- One bucket per (country, region, month); re-running is safe because of the
-- ON CONFLICT DO NOTHING against the (land, region, year_month) unique key.
WITH regions(land, region) AS (
    VALUES
        -- 16 German Bundesländer
        ('Deutschland', 'Baden-Württemberg'),
        ('Deutschland', 'Bayern'),
        ('Deutschland', 'Berlin'),
        ('Deutschland', 'Brandenburg'),
        ('Deutschland', 'Bremen'),
        ('Deutschland', 'Hamburg'),
        ('Deutschland', 'Hessen'),
        ('Deutschland', 'Mecklenburg-Vorpommern'),
        ('Deutschland', 'Niedersachsen'),
        ('Deutschland', 'Nordrhein-Westfalen'),
        ('Deutschland', 'Rheinland-Pfalz'),
        ('Deutschland', 'Saarland'),
        ('Deutschland', 'Sachsen'),
        ('Deutschland', 'Sachsen-Anhalt'),
        ('Deutschland', 'Schleswig-Holstein'),
        ('Deutschland', 'Thüringen'),
        -- 9 Austrian Bundesländer
        ('Österreich', 'Burgenland'),
        ('Österreich', 'Kärnten'),
        ('Österreich', 'Niederösterreich'),
        ('Österreich', 'Oberösterreich'),
        ('Österreich', 'Salzburg'),
        ('Österreich', 'Steiermark'),
        ('Österreich', 'Tirol'),
        ('Österreich', 'Vorarlberg'),
        ('Österreich', 'Wien'),
        -- Switzerland is treated as a single region
        ('Schweiz', 'Schweiz')
),
-- Months 0..23 relative to the first day of the current month, as 'YYYY-MM'.
months AS (
    SELECT to_char(date_trunc('month', now())::date + (n || ' month')::interval, 'YYYY-MM') AS year_month
    FROM generate_series(0, 23) AS n
)
INSERT INTO discovery_buckets (land, region, year_month)
SELECT r.land, r.region, m.year_month
FROM regions r CROSS JOIN months m
ON CONFLICT (land, region, year_month) DO NOTHING;
|
||||
76
web/src/routes/admin/discovery/+page.server.ts
Normal file
76
web/src/routes/admin/discovery/+page.server.ts
Normal file
@@ -0,0 +1,76 @@
|
||||
import type { PageServerLoad, Actions } from './$types.js';
|
||||
import { redirect, fail } from '@sveltejs/kit';
|
||||
import { serverFetch } from '$lib/api/client.server.js';
|
||||
|
||||
// Shape of one pending row returned by GET /admin/discovery/queue.
// Field names mirror the backend's German domain vocabulary.
type DiscoveredMarket = {
	id: string;
	markt_name: string; // display name of the market
	stadt: string; // city
	bundesland: string; // state/region; may be empty for country-wide entries
	land: string; // country
	start_datum: string | null; // date string, or null when unknown
	end_datum: string | null;
	website: string;
	quellen: string[]; // source URLs backing the discovery
	extraktion: string; // extraction mode marker; 'verbatim' is highlighted in the UI
	hinweis: string; // free-form note
	matched_series_id: string | null; // candidate match against an existing series
	discovered_at: string;
};
|
||||
|
||||
export const load: PageServerLoad = async ({ cookies, url }) => {
|
||||
const limit = Number(url.searchParams.get('limit') ?? 50);
|
||||
const offset = Number(url.searchParams.get('offset') ?? 0);
|
||||
const res = await serverFetch<DiscoveredMarket[]>(
|
||||
`/admin/discovery/queue?limit=${limit}&offset=${offset}`,
|
||||
cookies
|
||||
);
|
||||
return { queue: res.data, limit, offset };
|
||||
};
|
||||
|
||||
export const actions: Actions = {
|
||||
accept: async ({ request, cookies, fetch }) => {
|
||||
const form = await request.formData();
|
||||
const id = String(form.get('id') ?? '');
|
||||
if (!id) return fail(400, { error: 'missing id' });
|
||||
|
||||
try {
|
||||
const res = await serverFetch<{ series_id: string; edition_id: string }>(
|
||||
`/admin/discovery/queue/${id}/accept`,
|
||||
cookies,
|
||||
{ method: 'POST', fetch }
|
||||
);
|
||||
// Fire research asynchronously — don't await, the edit page handles streaming suggestions.
|
||||
void serverFetch(`/admin/markets/${res.data.edition_id}/research`, cookies, {
|
||||
method: 'POST',
|
||||
fetch
|
||||
}).catch(() => {
|
||||
// Silent: the edit page shows a toast if research hasn't completed.
|
||||
});
|
||||
redirect(303, `/admin/maerkte/${res.data.edition_id}/edit`);
|
||||
} catch (err) {
|
||||
if (err instanceof Response || (err as { status?: number })?.status === 303) throw err;
|
||||
const message = err instanceof Error ? err.message : 'Accept fehlgeschlagen.';
|
||||
return fail(500, { error: message });
|
||||
}
|
||||
},
|
||||
|
||||
reject: async ({ request, cookies, fetch }) => {
|
||||
const form = await request.formData();
|
||||
const id = String(form.get('id') ?? '');
|
||||
const reason = String(form.get('reason') ?? '');
|
||||
if (!id) return fail(400, { error: 'missing id' });
|
||||
|
||||
try {
|
||||
await serverFetch(`/admin/discovery/queue/${id}/reject`, cookies, {
|
||||
method: 'POST',
|
||||
body: JSON.stringify({ reason }),
|
||||
fetch
|
||||
});
|
||||
return { success: true };
|
||||
} catch (err) {
|
||||
const message = err instanceof Error ? err.message : 'Reject fehlgeschlagen.';
|
||||
return fail(500, { error: message });
|
||||
}
|
||||
}
|
||||
};
|
||||
96
web/src/routes/admin/discovery/+page.svelte
Normal file
96
web/src/routes/admin/discovery/+page.svelte
Normal file
@@ -0,0 +1,96 @@
|
||||
<script lang="ts">
	// Admin review UI for the discovery queue: renders the pending rows loaded
	// by +page.server.ts and posts accept/reject form actions back to it.
	import { enhance } from '$app/forms';
	let { data } = $props();
</script>

<svelte:head>
	<title>Admin · Discovery Queue</title>
</svelte:head>

<div class="mx-auto max-w-7xl px-4 py-8">
	<h1 class="text-2xl font-bold">Discovery Queue</h1>
	<p class="mt-1 text-sm text-stone-500">
		{data.queue.length} pending · showing from offset {data.offset}
	</p>

	{#if data.queue.length === 0}
		<!-- Empty state: nothing pending at this offset. -->
		<p class="mt-8 rounded border border-stone-200 bg-stone-50 p-6 text-center text-stone-600">
			Keine Einträge in der Warteschlange.
		</p>
	{:else}
		<table class="mt-6 w-full text-left text-sm">
			<thead class="border-b border-stone-200 text-stone-500">
				<tr>
					<th class="py-2">Land</th>
					<th>Region</th>
					<th>Markt</th>
					<th>Stadt</th>
					<th>Datum</th>
					<th>Website</th>
					<th>Quellen</th>
					<th>Extraktion</th>
					<th class="text-right">Aktion</th>
				</tr>
			</thead>
			<tbody>
				{#each data.queue as row (row.id)}
					<tr class="border-b border-stone-100">
						<td class="py-2">{row.land}</td>
						<td>{row.bundesland}</td>
						<td class="font-medium">{row.markt_name}</td>
						<td>{row.stadt}</td>
						<!-- Date range: start[–end], or an em-dash placeholder when unknown. -->
						<td>
							{#if row.start_datum}
								{row.start_datum}{row.end_datum ? ` – ${row.end_datum}` : ''}
							{:else}
								<span class="text-stone-400">—</span>
							{/if}
						</td>
						<td>
							{#if row.website}
								<a
									href={row.website}
									target="_blank"
									rel="noreferrer noopener"
									class="text-blue-600 underline">link</a
								>
							{:else}
								<span class="text-stone-400">—</span>
							{/if}
						</td>
						<!-- Only the source count is shown, not the URLs themselves. -->
						<td>{row.quellen?.length ?? 0}</td>
						<!-- Extraction badge: green for 'verbatim', amber otherwise. -->
						<td>
							<span
								class="inline-block rounded px-2 py-0.5 text-xs {row.extraktion === 'verbatim'
									? 'bg-emerald-100 text-emerald-700'
									: 'bg-amber-100 text-amber-700'}"
							>
								{row.extraktion || '—'}
							</span>
						</td>
						<!-- Accept / Reject post to the named form actions in +page.server.ts. -->
						<td class="text-right">
							<form method="POST" action="?/accept" use:enhance class="inline">
								<input type="hidden" name="id" value={row.id} />
								<button
									type="submit"
									class="rounded bg-emerald-600 px-2 py-1 text-xs text-white hover:bg-emerald-700"
								>
									Accept
								</button>
							</form>
							<form method="POST" action="?/reject" use:enhance class="inline">
								<input type="hidden" name="id" value={row.id} />
								<button
									type="submit"
									class="ml-1 rounded bg-stone-200 px-2 py-1 text-xs text-stone-700 hover:bg-stone-300"
								>
									Reject
								</button>
							</form>
						</td>
					</tr>
				{/each}
			</tbody>
		</table>
	{/if}
</div>
|
||||
Reference in New Issue
Block a user