diff --git a/backend/internal/domain/discovery/enrich/similarity.go b/backend/internal/domain/discovery/enrich/similarity.go new file mode 100644 index 0000000..cd58e1d --- /dev/null +++ b/backend/internal/domain/discovery/enrich/similarity.go @@ -0,0 +1,181 @@ +package enrich + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "errors" + "fmt" + "strings" + "time" +) + +// SimilarityRow carries the minimal identifying fields the classifier reads. +// Callers adapt their domain row into this shape — the enrich package stays +// free of a reverse import on discovery. +type SimilarityRow struct { + NameNormalized string + Stadt string + // Year is StartDatum.Year() (or 0 when unknown). Two rows differing only + // in year are almost always the same series in different editions — + // useful signal but not identity. + Year int + // Display fields — shown to the LLM verbatim so it can reason about + // casing / diacritics that NameNormalized strips. + Name string + // Quellen are passed in but not scraped — the classifier works from + // name/city/date alone. Included in the prompt only as metadata. + Quellen []string +} + +// Verdict is the classifier's answer about one pair. +type Verdict struct { + Same bool `json:"same"` + Confidence float64 `json:"confidence"` // 0..1 + Reason string `json:"reason"` + Model string `json:"model,omitempty"` + ClassifiedAt time.Time `json:"classified_at,omitempty"` +} + +// SimilarityClassifier decides whether two queue rows refer to the same +// underlying market. Implementations should be deterministic across calls +// on the same input — callers cache the result. +type SimilarityClassifier interface { + Classify(ctx context.Context, a, b SimilarityRow) (Verdict, error) +} + +// NoopSimilarityClassifier returns a zero-confidence verdict without calling +// anything. Used as a fallback when AI is disabled; callers should check +// ai.Enabled() and fall back to this type rather than passing nil. 
+type NoopSimilarityClassifier struct{} + +func (NoopSimilarityClassifier) Classify(_ context.Context, _, _ SimilarityRow) (Verdict, error) { + return Verdict{}, errors.New("similarity classifier not configured") +} + +// SimilarityPairKey derives a content-hash cache key over the two rows' +// identifying tuples. Ordering-independent: Classify(a,b) and Classify(b,a) +// must hit the same cache entry. +// +// Uses NameNormalized + lowered Stadt + Year so the key survives casing +// drift, whitespace, and umlaut normalisation (NameNormalized already did +// the heavy lifting). +func SimilarityPairKey(a, b SimilarityRow) string { + keyA := fmt.Sprintf("%s|%s|%d", a.NameNormalized, lowerASCII(a.Stadt), a.Year) + keyB := fmt.Sprintf("%s|%s|%d", b.NameNormalized, lowerASCII(b.Stadt), b.Year) + // Sort lexicographically for symmetry. + var raw string + if keyA <= keyB { + raw = keyA + "||" + keyB + } else { + raw = keyB + "||" + keyA + } + sum := sha256.Sum256([]byte(raw)) + return hex.EncodeToString(sum[:]) +} + +// DefaultSimilarityCacheTTL: 30 days matches the enrichment cache TTL — +// same reasoning (amortise cost across re-crawls, source edits eventually +// propagate). +const DefaultSimilarityCacheTTL = 30 * 24 * time.Hour + +// MistralSimilarityClassifier implements SimilarityClassifier by sending a +// JSON-formatted comparison prompt to Mistral's chat endpoint. +type MistralSimilarityClassifier struct { + Client aiPass2 +} + +// NewMistralSimilarityClassifier binds a Mistral ai.Client. client must be +// non-nil; routes.go falls back to NoopSimilarityClassifier when AI is off. +func NewMistralSimilarityClassifier(client aiPass2) *MistralSimilarityClassifier { + return &MistralSimilarityClassifier{Client: client} +} + +// simResponse is the JSON shape we instruct Mistral to return. Confidence +// must be parseable as a float 0..1; anything outside that range is clamped. 
+type simResponse struct { + SameMarket bool `json:"same_market"` + Confidence float64 `json:"confidence"` + Reason string `json:"reason"` +} + +// Classify sends the paired metadata to Mistral and parses the JSON response. +// No web scraping — the classifier works from name/city/year alone, which is +// enough for the common cases (same venue listed on two different calendars, +// editing typos, cross-year recurrence). +func (m *MistralSimilarityClassifier) Classify(ctx context.Context, a, b SimilarityRow) (Verdict, error) { + if m.Client == nil { + return Verdict{}, errors.New("mistral similarity classifier not configured") + } + systemPrompt := simSystemPrompt() + userPrompt := simUserPrompt(a, b) + + result, err := m.Client.Pass2(ctx, systemPrompt, userPrompt) + if err != nil { + return Verdict{}, fmt.Errorf("pass2: %w", err) + } + + var parsed simResponse + if err := json.Unmarshal([]byte(result.Content), &parsed); err != nil { + return Verdict{}, fmt.Errorf("parse response: %w (content=%q)", err, result.Content) + } + + // Clamp confidence to [0,1]; the model occasionally returns 1.2 or -0.1. + conf := parsed.Confidence + if conf < 0 { + conf = 0 + } + if conf > 1 { + conf = 1 + } + + return Verdict{ + Same: parsed.SameMarket, + Confidence: conf, + Reason: strings.TrimSpace(parsed.Reason), + Model: result.Model, + ClassifiedAt: time.Now().UTC(), + }, nil +} + +func simSystemPrompt() string { + return strings.TrimSpace(` +You decide whether two candidate entries refer to the same medieval market +(Mittelaltermarkt) in the DACH region. Input: two objects each with a name, +city, and year. Output a single JSON object: + + { + "same_market": true|false, + "confidence": 0.0-1.0, // how sure you are + "reason": "..." // short German justification (<= 140 chars) + } + +Rules: +- Return ONLY the JSON object. No prose, no code fences. +- "Same market" means same recurring event — same venue, same organiser, + same audience. 
A market and its anniversary edition in a later year ARE + the same market (just different editions). +- Different cities = different markets, even if the name matches. +- Rephrasings, typos, and umlaut differences (Dresden vs Straßburg vs + Strassburg) are the same market if the underlying identifiers align. +- If the evidence is weak, return same_market=false with low confidence + rather than guessing. Low confidence is more useful than a wrong guess. +`) +} + +func simUserPrompt(a, b SimilarityRow) string { + // Keep the JSON compact; the model handles inline JSON better than + // pretty-printed when the task is "read two records". + ja, _ := json.Marshal(map[string]any{ + "name": a.Name, + "city": a.Stadt, + "year": a.Year, + }) + jb, _ := json.Marshal(map[string]any{ + "name": b.Name, + "city": b.Stadt, + "year": b.Year, + }) + return fmt.Sprintf("A: %s\nB: %s", ja, jb) +} diff --git a/backend/internal/domain/discovery/enrich/similarity_test.go b/backend/internal/domain/discovery/enrich/similarity_test.go new file mode 100644 index 0000000..ff374be --- /dev/null +++ b/backend/internal/domain/discovery/enrich/similarity_test.go @@ -0,0 +1,118 @@ +package enrich + +import ( + "context" + "errors" + "strings" + "testing" + + "marktvogt.de/backend/internal/pkg/ai" +) + +func TestSimilarityPairKey_Symmetric(t *testing.T) { + a := SimilarityRow{NameNormalized: "ritterfest dresden", Stadt: "Dresden", Year: 2026} + b := SimilarityRow{NameNormalized: "ritterfest leipzig", Stadt: "Leipzig", Year: 2026} + + if SimilarityPairKey(a, b) != SimilarityPairKey(b, a) { + t.Error("pair key must be symmetric: (a,b) and (b,a) should produce identical keys") + } +} + +func TestSimilarityPairKey_DifferentInputsDifferentKeys(t *testing.T) { + a := SimilarityRow{NameNormalized: "ritterfest dresden", Stadt: "Dresden", Year: 2026} + b := SimilarityRow{NameNormalized: "ritterfest leipzig", Stadt: "Leipzig", Year: 2026} + c := SimilarityRow{NameNormalized: "ritterfest dresden", Stadt: 
"Dresden", Year: 2027} + + if SimilarityPairKey(a, b) == SimilarityPairKey(a, c) { + t.Error("different pairs must produce different keys") + } + // Stadt casing must not change the key. + d := SimilarityRow{NameNormalized: "ritterfest dresden", Stadt: "DRESDEN", Year: 2026} + if SimilarityPairKey(a, b) != SimilarityPairKey(d, b) { + t.Error("stadt casing drift must not change the key") + } +} + +func TestMistralSimilarity_HappyPath(t *testing.T) { + client := &stubPass2{ + result: ai.PassResult{ + Content: `{"same_market":true,"confidence":0.82,"reason":"Gleicher Name, gleiche Stadt, gleiches Jahr."}`, + Model: "mistral-large-latest", + }, + } + c := NewMistralSimilarityClassifier(client) + + got, err := c.Classify(context.Background(), + SimilarityRow{Name: "Mittelaltermarkt Dresden", Stadt: "Dresden", Year: 2026}, + SimilarityRow{Name: "Mittelaltermarkt Dresden 2026", Stadt: "Dresden", Year: 2026}, + ) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !got.Same { + t.Errorf("same = false; want true") + } + if got.Confidence < 0.8 || got.Confidence > 0.85 { + t.Errorf("confidence = %v; want ~0.82", got.Confidence) + } + if got.Reason == "" { + t.Error("reason missing") + } + if got.Model != "mistral-large-latest" { + t.Errorf("model = %q", got.Model) + } + // Prompt must carry both rows' identifying fields for the LLM to reason on. 
+ if !strings.Contains(client.lastUser, "Mittelaltermarkt Dresden") { + t.Error("user prompt missing A.name") + } + if !strings.Contains(client.lastSystem, "same_market") { + t.Error("system prompt should describe the JSON schema (same_market key)") + } +} + +func TestMistralSimilarity_ClampsConfidence(t *testing.T) { + tests := []struct { + name string + raw string + wantConf float64 + }{ + {"above 1 clamps to 1", `{"same_market":true,"confidence":1.4,"reason":"x"}`, 1.0}, + {"below 0 clamps to 0", `{"same_market":false,"confidence":-0.3,"reason":"x"}`, 0.0}, + {"in range passes through", `{"same_market":true,"confidence":0.5,"reason":"x"}`, 0.5}, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + c := NewMistralSimilarityClassifier(&stubPass2{result: ai.PassResult{Content: tc.raw}}) + got, err := c.Classify(context.Background(), SimilarityRow{}, SimilarityRow{}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got.Confidence != tc.wantConf { + t.Errorf("confidence = %v; want %v", got.Confidence, tc.wantConf) + } + }) + } +} + +func TestMistralSimilarity_PropagatesPass2Error(t *testing.T) { + c := NewMistralSimilarityClassifier(&stubPass2{err: errors.New("mistral down")}) + _, err := c.Classify(context.Background(), SimilarityRow{}, SimilarityRow{}) + if err == nil { + t.Fatal("expected error; got nil") + } +} + +func TestMistralSimilarity_RejectsBadJSON(t *testing.T) { + c := NewMistralSimilarityClassifier(&stubPass2{result: ai.PassResult{Content: "not json at all"}}) + _, err := c.Classify(context.Background(), SimilarityRow{}, SimilarityRow{}) + if err == nil { + t.Fatal("expected parse error; got nil") + } +} + +func TestNoopSimilarityClassifier_Errors(t *testing.T) { + _, err := NoopSimilarityClassifier{}.Classify(context.Background(), SimilarityRow{}, SimilarityRow{}) + if err == nil { + t.Error("NoopSimilarityClassifier should return error — it's the fallback when AI is disabled") + } +} diff --git 
a/backend/internal/domain/discovery/handler.go b/backend/internal/domain/discovery/handler.go index e51cd50..0c1a275 100644 --- a/backend/internal/domain/discovery/handler.go +++ b/backend/internal/domain/discovery/handler.go @@ -386,6 +386,36 @@ func (h *Handler) EnrichLLM(c *gin.Context) { c.JSON(http.StatusOK, gin.H{"data": payload}) } +// ClassifySimilarPair runs the LLM duplicate-tiebreaker on the two queue +// rows identified by URL params :aid and :bid. Synchronous, 15s deadline — +// the call is short (no scraping) so the operator can click and immediately +// see the verdict. +func (h *Handler) ClassifySimilarPair(c *gin.Context) { + aID, err := uuid.Parse(c.Param("aid")) + if err != nil { + apiErr := apierror.BadRequest("invalid_id", "invalid queue id A") + c.JSON(apiErr.Status, apierror.NewResponse(apiErr)) + return + } + bID, err := uuid.Parse(c.Param("bid")) + if err != nil { + apiErr := apierror.BadRequest("invalid_id", "invalid queue id B") + c.JSON(apiErr.Status, apierror.NewResponse(apiErr)) + return + } + ctx, cancel := context.WithTimeout(c.Request.Context(), 15*time.Second) + defer cancel() + + verdict, err := h.service.ClassifySimilarPair(ctx, aID, bID) + if err != nil { + slog.WarnContext(ctx, "classify similar failed", "a", aID, "b", bID, "error", err) + apiErr := apierror.Internal("classify failed: " + err.Error()) + c.JSON(apiErr.Status, apierror.NewResponse(apiErr)) + return + } + c.JSON(http.StatusOK, gin.H{"data": verdict}) +} + func currentUserID(c *gin.Context) (uuid.UUID, bool) { raw, exists := c.Get("user_id") if !exists { diff --git a/backend/internal/domain/discovery/handler_test.go b/backend/internal/domain/discovery/handler_test.go index 226c0c8..7919e50 100644 --- a/backend/internal/domain/discovery/handler_test.go +++ b/backend/internal/domain/discovery/handler_test.go @@ -57,6 +57,7 @@ func TestCrawlHandlerReturns202AndStartsCrawl(t *testing.T) { noopMarketCreator{}, nil, nil, + nil, ) h := NewHandler(svc, 0) // rate limit 
disabled @@ -91,7 +92,7 @@ func TestCrawlHandlerReturns202AndStartsCrawl(t *testing.T) { // TestCrawlStatusInitialState verifies the zero state of a freshly constructed Handler. func TestCrawlStatusInitialState(t *testing.T) { - svc := NewService(newMockRepo(), &stubCrawlerRunner{}, noopLinkVerifier{}, noopMarketCreator{}, nil, nil) + svc := NewService(newMockRepo(), &stubCrawlerRunner{}, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, nil) h := NewHandler(svc, 0) w := httptest.NewRecorder() @@ -141,7 +142,7 @@ func TestCrawlHandlerConcurrentReturnsTooManyRequests(t *testing.T) { started: make(chan struct{}), release: make(chan struct{}), } - svc := NewService(newMockRepo(), bc, noopLinkVerifier{}, noopMarketCreator{}, nil, nil) + svc := NewService(newMockRepo(), bc, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, nil) h := NewHandler(svc, 0) // rate limit disabled // First request — returns 202 and spawns goroutine. @@ -184,6 +185,7 @@ func TestCrawlHandlerRateLimit(t *testing.T) { noopMarketCreator{}, nil, nil, + nil, ) // 1 per hour window. h := NewHandler(svc, 1) diff --git a/backend/internal/domain/discovery/mock_repo_test.go b/backend/internal/domain/discovery/mock_repo_test.go index f943804..9d3b120 100644 --- a/backend/internal/domain/discovery/mock_repo_test.go +++ b/backend/internal/domain/discovery/mock_repo_test.go @@ -36,6 +36,10 @@ type mockRepo struct { getCacheFn func(key string) (enrich.Enrichment, bool, error) setCacheFn func(key string, payload enrich.Enrichment, ttl time.Duration) error listPendingEnrichFn func(limit int) ([]DiscoveredMarket, error) + + // Similarity AI cache hooks. 
+ getSimCacheFn func(pairKey string) (enrich.Verdict, bool, error) + setSimCacheFn func(pairKey string, v enrich.Verdict, ttl time.Duration) error } func (m *mockRepo) ListSeriesByCity(ctx context.Context, c string) ([]SeriesCandidate, error) { @@ -122,6 +126,18 @@ func (m *mockRepo) ListPendingEnrichment(_ context.Context, limit int) ([]Discov } return nil, nil } +func (m *mockRepo) GetSimilarityCache(_ context.Context, pairKey string) (enrich.Verdict, bool, error) { + if m.getSimCacheFn != nil { + return m.getSimCacheFn(pairKey) + } + return enrich.Verdict{}, false, nil +} +func (m *mockRepo) SetSimilarityCache(_ context.Context, pairKey string, v enrich.Verdict, ttl time.Duration) error { + if m.setSimCacheFn != nil { + return m.setSimCacheFn(pairKey, v, ttl) + } + return nil +} // noopLinkVerifier passes every URL — used by tests to isolate from network. type noopLinkVerifier struct{} diff --git a/backend/internal/domain/discovery/repository.go b/backend/internal/domain/discovery/repository.go index f332c23..be74341 100644 --- a/backend/internal/domain/discovery/repository.go +++ b/backend/internal/domain/discovery/repository.go @@ -40,6 +40,10 @@ type Repository interface { // Cache operations keyed on sha256(name_normalized|stadt|year). GetEnrichmentCache(ctx context.Context, key string) (enrich.Enrichment, bool, error) SetEnrichmentCache(ctx context.Context, key string, payload enrich.Enrichment, ttl time.Duration) error + + // Similarity AI cache — keyed on enrich.SimilarityPairKey. + GetSimilarityCache(ctx context.Context, pairKey string) (enrich.Verdict, bool, error) + SetSimilarityCache(ctx context.Context, pairKey string, v enrich.Verdict, ttl time.Duration) error } // SeriesCandidate is a minimal projection used for name-normalization comparison in Go. @@ -426,3 +430,43 @@ ON CONFLICT (cache_key) DO UPDATE } return nil } + +// GetSimilarityCache returns a cached AI verdict for a (pairKey) or +// (zero, false, nil) on miss/expiry. 
Expired entries are treated as misses. +func (r *pgRepository) GetSimilarityCache(ctx context.Context, pairKey string) (enrich.Verdict, bool, error) { + var v enrich.Verdict + err := r.pool.QueryRow(ctx, ` +SELECT same, confidence, reason, model FROM similarity_ai_cache +WHERE pair_key = $1 + AND (expires_at IS NULL OR expires_at > now())`, pairKey).Scan(&v.Same, &v.Confidence, &v.Reason, &v.Model) + if errors.Is(err, pgx.ErrNoRows) { + return enrich.Verdict{}, false, nil + } + if err != nil { + return enrich.Verdict{}, false, fmt.Errorf("similarity cache get: %w", err) + } + return v, true, nil +} + +// SetSimilarityCache upserts a verdict. ttl=0 means "no expiry" (nullable). +func (r *pgRepository) SetSimilarityCache(ctx context.Context, pairKey string, v enrich.Verdict, ttl time.Duration) error { + var expiresAt *time.Time + if ttl > 0 { + t := time.Now().Add(ttl) + expiresAt = &t + } + _, err := r.pool.Exec(ctx, ` +INSERT INTO similarity_ai_cache (pair_key, same, confidence, reason, model, expires_at) +VALUES ($1, $2, $3, $4, $5, $6) +ON CONFLICT (pair_key) DO UPDATE + SET same = EXCLUDED.same, + confidence = EXCLUDED.confidence, + reason = EXCLUDED.reason, + model = EXCLUDED.model, + created_at = now(), + expires_at = EXCLUDED.expires_at`, pairKey, v.Same, v.Confidence, v.Reason, v.Model, expiresAt) + if err != nil { + return fmt.Errorf("similarity cache set: %w", err) + } + return nil +} diff --git a/backend/internal/domain/discovery/routes.go b/backend/internal/domain/discovery/routes.go index ee91e8e..e884b77 100644 --- a/backend/internal/domain/discovery/routes.go +++ b/backend/internal/domain/discovery/routes.go @@ -26,6 +26,8 @@ func RegisterRoutes( admin.GET("/queue/:id/similar", h.Similar) // Per-row LLM enrichment (MR 3b). Synchronous — operator waits. admin.POST("/queue/:id/enrich", h.EnrichLLM) + // Per-pair AI similarity tiebreak (MR 4). Synchronous; short call. 
+ admin.POST("/queue/:aid/similar/:bid/classify", h.ClassifySimilarPair) // Manual crawl trigger — subject to hourly rate limit. admin.POST("/crawl-manual", h.Crawl) // Async crawl status polling. diff --git a/backend/internal/domain/discovery/service.go b/backend/internal/domain/discovery/service.go index 872b269..208540b 100644 --- a/backend/internal/domain/discovery/service.go +++ b/backend/internal/domain/discovery/service.go @@ -57,13 +57,17 @@ type Service struct { // llmEnricher is the AI-backed fallback pass. Nil-safe via NoopLLMEnricher // in test wiring; production code passes a real MistralLLMEnricher. llmEnricher enrich.LLMEnricher + // simClassifier is the AI-backed duplicate tiebreaker. Nil-safe via + // NoopSimilarityClassifier. + simClassifier enrich.SimilarityClassifier } // NewService constructs a Service wired for the crawler-driven Crawl path. // geocoder may be nil; in that case crawl-enrich runs consolidation only and // skips the lat/lng step. llm may be nil; per-row LLM enrichment then -// returns an error instead of attempting a call. -func NewService(repo Repository, cr crawlerRunner, lc linkVerifier, mc marketCreator, gc Geocoder, llm enrich.LLMEnricher) *Service { +// returns an error instead of attempting a call. simClassifier may be nil; +// ClassifySimilarPair returns an error rather than a zero-confidence answer. 
+func NewService(repo Repository, cr crawlerRunner, lc linkVerifier, mc marketCreator, gc Geocoder, llm enrich.LLMEnricher, sim enrich.SimilarityClassifier) *Service { return &Service{ repo: repo, crawler: cr, @@ -71,6 +75,7 @@ func NewService(repo Repository, cr crawlerRunner, lc linkVerifier, mc marketCre linkChecker: lc, geocoder: gc, llmEnricher: llm, + simClassifier: sim, } } @@ -595,6 +600,68 @@ func (s *Service) RunLLMEnrichOne(ctx context.Context, queueID uuid.UUID) (enric return merged, nil } +// ClassifySimilarPair runs the AI classifier on the two queue rows identified +// by aID and bID, returning a verdict about whether they're the same market. +// Cache-first: a content-keyed entry (enrich.SimilarityPairKey) shortcuts the +// LLM call when the same pair has been classified before. +// +// Intended for operator-triggered "AI tiebreak" on ambiguous similarity +// matches. The crawl-time auto-merge in MR 7 will call this on its own. +func (s *Service) ClassifySimilarPair(ctx context.Context, aID, bID uuid.UUID) (enrich.Verdict, error) { + if s.simClassifier == nil { + return enrich.Verdict{}, errors.New("similarity classifier not configured") + } + if aID == bID { + return enrich.Verdict{}, errors.New("cannot classify a row against itself") + } + a, err := s.repo.GetDiscovered(ctx, aID) + if err != nil { + return enrich.Verdict{}, fmt.Errorf("load row A: %w", err) + } + b, err := s.repo.GetDiscovered(ctx, bID) + if err != nil { + return enrich.Verdict{}, fmt.Errorf("load row B: %w", err) + } + + rowA := rowToSimilarity(a) + rowB := rowToSimilarity(b) + pairKey := enrich.SimilarityPairKey(rowA, rowB) + + if cached, hit, err := s.repo.GetSimilarityCache(ctx, pairKey); err != nil { + slog.WarnContext(ctx, "similarity cache get failed; continuing", + "pair_key", pairKey, "error", err) + } else if hit { + return cached, nil + } + + verdict, err := s.simClassifier.Classify(ctx, rowA, rowB) + if err != nil { + return enrich.Verdict{}, fmt.Errorf("classify: %w", 
err) + } + + if err := s.repo.SetSimilarityCache(ctx, pairKey, verdict, enrich.DefaultSimilarityCacheTTL); err != nil { + slog.WarnContext(ctx, "similarity cache set failed; continuing", + "pair_key", pairKey, "error", err) + } + return verdict, nil +} + +// rowToSimilarity adapts a DiscoveredMarket to the narrow SimilarityRow the +// enrich package consumes. Year comes from StartDatum (0 when unknown). +func rowToSimilarity(r DiscoveredMarket) enrich.SimilarityRow { + year := 0 + if r.StartDatum != nil { + year = r.StartDatum.Year() + } + return enrich.SimilarityRow{ + NameNormalized: r.NameNormalized, + Stadt: r.Stadt, + Year: year, + Name: r.MarktName, + Quellen: r.Quellen, + } +} + // CrawlEnrichSummary reports the outcome of one RunCrawlEnrichAll pass. // Mirrors CrawlSummary's shape so the admin UI can reuse its render path. type CrawlEnrichSummary struct { diff --git a/backend/internal/domain/discovery/service_test.go b/backend/internal/domain/discovery/service_test.go index b55971c..06f2a00 100644 --- a/backend/internal/domain/discovery/service_test.go +++ b/backend/internal/domain/discovery/service_test.go @@ -147,7 +147,7 @@ func TestAccept_NewSeries_CallsCreate(t *testing.T) { markAcceptedFn: func(_ context.Context, _ pgx.Tx, _, _, _ uuid.UUID) error { return nil }, } mc := &stubCreator{} - svc := NewService(m, nil, noopLinkVerifier{}, mc, nil, nil) + svc := NewService(m, nil, noopLinkVerifier{}, mc, nil, nil, nil) _, _, err := svc.Accept(context.Background(), qID, uuid.New()) if err != nil { t.Fatalf("accept err: %v", err) @@ -169,7 +169,7 @@ func TestAccept_ExistingSeries_CallsCreateEditionForSeries(t *testing.T) { markAcceptedFn: func(_ context.Context, _ pgx.Tx, _, _, _ uuid.UUID) error { return nil }, } mc := &stubCreator{} - svc := NewService(m, nil, noopLinkVerifier{}, mc, nil, nil) + svc := NewService(m, nil, noopLinkVerifier{}, mc, nil, nil, nil) _, _, err := svc.Accept(context.Background(), uuid.New(), uuid.New()) if err != nil { 
t.Fatalf("accept err: %v", err) @@ -200,7 +200,7 @@ func TestServiceCrawlHappyPath(t *testing.T) { PerSourceMS: map[string]int64{"marktkalendarium": 1}, }, } - svc := NewService(repo, sc, lc, noopMarketCreator{}, nil, nil) + svc := NewService(repo, sc, lc, noopMarketCreator{}, nil, nil, nil) summary, err := svc.Crawl(context.Background()) if err != nil { @@ -241,7 +241,7 @@ func TestServiceCrawlDedupQueue(t *testing.T) { }, }, } - svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{}, nil, nil) + svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, nil) summary, err := svc.Crawl(context.Background()) if err != nil { @@ -272,7 +272,7 @@ func TestServiceCrawlDefaultsEndDate(t *testing.T) { }, }, } - svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{}, nil, nil) + svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, nil) if _, err := svc.Crawl(context.Background()); err != nil { t.Fatal(err) @@ -318,7 +318,7 @@ func TestServiceCrawlDetachesInsertContextFromRequestCtx(t *testing.T) { }, }, } - svc := NewService(repo, sc, ctxAwareLinkVerifier{}, noopMarketCreator{}, nil, nil) + svc := NewService(repo, sc, ctxAwareLinkVerifier{}, noopMarketCreator{}, nil, nil, nil) // Cancel the context BEFORE Crawl runs — simulates gateway timeout // that fires while the handler is still mid-run. 
@@ -343,7 +343,7 @@ func TestListPendingQueuePaged_ReturnsBothRowsAndTotal(t *testing.T) { m := &mockRepo{ countQueueFn: func(_ context.Context, _ string) (int, error) { return 42, nil }, } - svc := NewService(m, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, nil) + svc := NewService(m, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, nil) rows, total, err := svc.ListPendingQueuePaged(context.Background(), 50, 0) if err != nil { t.Fatalf("unexpected error: %v", err) @@ -375,7 +375,7 @@ func TestServiceCrawlPersistsSourcesAndContributions(t *testing.T) { }, }, } - svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{}, nil, nil) + svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, nil) if _, err := svc.Crawl(context.Background()); err != nil { t.Fatal(err) @@ -424,7 +424,7 @@ func TestServiceCrawlMultiSourceHighKonfidenz(t *testing.T) { }, }, } - svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{}, nil, nil) + svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, nil) summary, err := svc.Crawl(context.Background()) if err != nil { @@ -486,7 +486,7 @@ func TestRunCrawlEnrichAll_HappyPath(t *testing.T) { }, } gc := stubGeocoder{lat: ptrFloat(51.05), lng: ptrFloat(13.74)} - svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, gc, nil) + svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, gc, nil, nil) summary, err := svc.RunCrawlEnrichAll(context.Background()) if err != nil { @@ -547,7 +547,7 @@ func TestRunCrawlEnrichAll_SetEnrichmentFailure(t *testing.T) { return nil }, } - svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, nil) + svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, nil) summary, err := svc.RunCrawlEnrichAll(context.Background()) if err != nil { @@ -639,7 +639,7 @@ func TestRunLLMEnrichOne_HappyPath(t *testing.T) { }, } llm := &stubLLMEnricher{result: llmResult} - svc := 
NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, llm) + svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, llm, nil) got, err := svc.RunLLMEnrichOne(context.Background(), rowID) if err != nil { @@ -709,7 +709,7 @@ func TestRunLLMEnrichOne_CacheHitSkipsLLM(t *testing.T) { }, } llm := &stubLLMEnricher{} - svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, llm) + svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, llm, nil) got, err := svc.RunLLMEnrichOne(context.Background(), rowID) if err != nil { @@ -738,7 +738,7 @@ func TestRunLLMEnrichOne_LLMErrorMarksFailed(t *testing.T) { }, } llm := &stubLLMEnricher{err: errors.New("mistral down")} - svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, llm) + svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, llm, nil) _, err := svc.RunLLMEnrichOne(context.Background(), rowID) if err == nil { @@ -761,7 +761,7 @@ func TestRunCrawlEnrichAll_EmptyQueueNoOp(t *testing.T) { return nil }, } - svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, nil) + svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, nil) summary, err := svc.RunCrawlEnrichAll(context.Background()) if err != nil { @@ -774,3 +774,129 @@ func TestRunCrawlEnrichAll_EmptyQueueNoOp(t *testing.T) { t.Errorf("SetEnrichment called %d times on empty queue", writes) } } + +// stubSimilarityClassifier returns canned verdicts or errors. +type stubSimilarityClassifier struct { + result enrich.Verdict + err error + calls int +} + +func (s *stubSimilarityClassifier) Classify(_ context.Context, _, _ enrich.SimilarityRow) (enrich.Verdict, error) { + s.calls++ + return s.result, s.err +} + +// TestClassifySimilarPair_HappyPath: cache miss → LLM → cache write → return. 
+func TestClassifySimilarPair_HappyPath(t *testing.T) { + aID, bID := uuid.New(), uuid.New() + start := time.Date(2026, 5, 1, 0, 0, 0, 0, time.UTC) + rowA := DiscoveredMarket{ID: aID, MarktName: "Ritterfest Dresden", Stadt: "Dresden", NameNormalized: "ritterfest dresden", StartDatum: &start} + rowB := DiscoveredMarket{ID: bID, MarktName: "Mittelaltermarkt Dresden 2026", Stadt: "Dresden", NameNormalized: "mittelaltermarkt dresden", StartDatum: &start} + + var cacheSet struct { + called bool + verdict enrich.Verdict + } + repo := &mockRepo{ + getDiscoveredFn: func(_ context.Context, id uuid.UUID) (DiscoveredMarket, error) { + if id == aID { + return rowA, nil + } + return rowB, nil + }, + setSimCacheFn: func(_ string, v enrich.Verdict, _ time.Duration) error { + cacheSet.called = true + cacheSet.verdict = v + return nil + }, + } + sim := &stubSimilarityClassifier{ + result: enrich.Verdict{Same: false, Confidence: 0.6, Reason: "Unterschiedliche Namen."}, + } + svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, sim) + + got, err := svc.ClassifySimilarPair(context.Background(), aID, bID) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if sim.calls != 1 { + t.Errorf("classifier called %d times; want 1", sim.calls) + } + if got.Same || got.Confidence != 0.6 { + t.Errorf("verdict = %+v; want same=false, confidence=0.6", got) + } + if !cacheSet.called { + t.Error("expected SetSimilarityCache to be called on a cache miss") + } + if cacheSet.verdict.Confidence != 0.6 { + t.Errorf("cached verdict lost confidence: %+v", cacheSet.verdict) + } +} + +// TestClassifySimilarPair_CacheHitSkipsLLM: cache hit returns directly, +// no classifier call. 
+func TestClassifySimilarPair_CacheHitSkipsLLM(t *testing.T) { + aID, bID := uuid.New(), uuid.New() + cached := enrich.Verdict{Same: true, Confidence: 0.9, Reason: "same venue"} + + repo := &mockRepo{ + getDiscoveredFn: func(context.Context, uuid.UUID) (DiscoveredMarket, error) { + return DiscoveredMarket{}, nil + }, + getSimCacheFn: func(string) (enrich.Verdict, bool, error) { + return cached, true, nil + }, + } + sim := &stubSimilarityClassifier{} + svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, sim) + + got, err := svc.ClassifySimilarPair(context.Background(), aID, bID) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if sim.calls != 0 { + t.Errorf("classifier called %d times on cache hit; want 0", sim.calls) + } + if !got.Same || got.Confidence != 0.9 { + t.Errorf("cached verdict not returned: %+v", got) + } +} + +// TestClassifySimilarPair_RejectsSelfComparison: the pair-key scheme would +// collapse (a,a) to a single key which never tells you anything useful. +func TestClassifySimilarPair_RejectsSelfComparison(t *testing.T) { + id := uuid.New() + svc := NewService(&mockRepo{}, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, &stubSimilarityClassifier{}) + + _, err := svc.ClassifySimilarPair(context.Background(), id, id) + if err == nil { + t.Error("expected error on self-comparison; got nil") + } +} + +// TestClassifySimilarPair_LLMErrorPropagates: classifier errors surface; +// cache is not written. 
+func TestClassifySimilarPair_LLMErrorPropagates(t *testing.T) { + aID, bID := uuid.New(), uuid.New() + cacheWritten := false + repo := &mockRepo{ + getDiscoveredFn: func(context.Context, uuid.UUID) (DiscoveredMarket, error) { + return DiscoveredMarket{}, nil + }, + setSimCacheFn: func(string, enrich.Verdict, time.Duration) error { + cacheWritten = true + return nil + }, + } + sim := &stubSimilarityClassifier{err: errors.New("mistral 500")} + svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, sim) + + _, err := svc.ClassifySimilarPair(context.Background(), aID, bID) + if err == nil { + t.Fatal("expected error; got nil") + } + if cacheWritten { + t.Error("cache should not be written when classifier errors") + } +} diff --git a/backend/internal/server/routes.go b/backend/internal/server/routes.go index b425804..7610d0b 100644 --- a/backend/internal/server/routes.go +++ b/backend/internal/server/routes.go @@ -79,11 +79,13 @@ func (s *Server) registerRoutes() { // Per-row LLM enrichment (MR 3b). Operator-triggered only; disabled rows // fall through via NoopLLMEnricher when AI isn't configured. 
 	var llmEnricher enrich.LLMEnricher = enrich.NoopLLMEnricher{}
+	var simClassifier enrich.SimilarityClassifier = enrich.NoopSimilarityClassifier{} // replaced below when AI is enabled
 	if aiClient.Enabled() {
 		scraper := scrape.New(s.cfg.Discovery.CrawlerUserAgent)
 		llmEnricher = enrich.NewMistralLLMEnricher(aiClient, scraper)
+		simClassifier = enrich.NewMistralSimilarityClassifier(aiClient)
 	}
-	discoveryService := discovery.NewService(discoveryRepo, crawlerInstance, discovery.NewLinkChecker(), marketSvc, geocoder, llmEnricher)
+	discoveryService := discovery.NewService(discoveryRepo, crawlerInstance, discovery.NewLinkChecker(), marketSvc, geocoder, llmEnricher, simClassifier)
 	discoveryHandler := discovery.NewHandler(discoveryService, s.cfg.Discovery.CrawlerManualRateLimitPerHour)
 	requireTickToken := middleware.RequireBearerToken(s.cfg.Discovery.Token)
 	discovery.RegisterRoutes(v1, discoveryHandler, requireAuth, requireAdmin, requireTickToken)
diff --git a/backend/migrations/000020_similarity_ai_cache.down.sql b/backend/migrations/000020_similarity_ai_cache.down.sql
new file mode 100644
index 0000000..13a5cb9
--- /dev/null
+++ b/backend/migrations/000020_similarity_ai_cache.down.sql
@@ -0,0 +1,2 @@
+DROP INDEX IF EXISTS idx_similarity_ai_cache_expires_at;
+DROP TABLE IF EXISTS similarity_ai_cache;
diff --git a/backend/migrations/000020_similarity_ai_cache.up.sql b/backend/migrations/000020_similarity_ai_cache.up.sql
new file mode 100644
index 0000000..51e8884
--- /dev/null
+++ b/backend/migrations/000020_similarity_ai_cache.up.sql
@@ -0,0 +1,18 @@
+-- Cache for LLM-based similarity verdicts. Pair key is a sha256 over the
+-- content tuple (normalized_name|stadt|year for both rows, sorted) — NOT
+-- over queue UUIDs, so the cache survives queue row deletion (accept/reject)
+-- while still invalidating correctly when a row's identifying fields change.
+CREATE TABLE similarity_ai_cache (
+    pair_key text PRIMARY KEY, -- sha256 over both rows' sorted content tuples
+    same bool NOT NULL, -- classifier verdict: do the two rows describe the same market
+    confidence real NOT NULL, -- classifier confidence, expected in 0..1 (not enforced here)
+    reason text NOT NULL DEFAULT '',
+    model text NOT NULL DEFAULT '',
+    created_at timestamptz NOT NULL DEFAULT now(),
+    expires_at timestamptz -- nullable; presumably NULL = no expiry (TODO confirm against the repo read path)
+);
+
+-- Opportunistic pruning; workers or manual jobs SELECT WHERE expires_at < now().
+CREATE INDEX idx_similarity_ai_cache_expires_at
+    ON similarity_ai_cache (expires_at)
+    WHERE expires_at IS NOT NULL; -- partial index: rows without an expiry are left out
diff --git a/web/src/routes/admin/discovery/+page.svelte b/web/src/routes/admin/discovery/+page.svelte
index aee378c..8303e7b 100644
--- a/web/src/routes/admin/discovery/+page.svelte
+++ b/web/src/routes/admin/discovery/+page.svelte
@@ -99,6 +99,7 @@
 		similarOpenId = id;
 		similarLoading = true;
 		similarEntries = [];
+		similarVerdicts = {};
 		try {
 			const res = await fetch(`/admin/discovery/queue/${id}/similar`);
 			if (!res.ok) {
@@ -114,6 +115,53 @@
 		}
 	}
 
+	// Per-pair AI similarity verdict. Keyed on the candidate's queue id since
+	// the "anchor" row (similarOpenId) is already known from context.
+	type SimilarityVerdict = {
+		same: boolean;
+		confidence: number;
+		reason: string;
+		model?: string;
+		classified_at?: string;
+	};
+	let similarVerdicts = $state<Record<string, SimilarityVerdict>>({});
+	let similarClassifying = $state(new Set<string>());
+
+	// Asks the backend to classify the (anchor, candidate) pair and stores the
+	// verdict under the candidate's id. Failures become a synthetic zero-confidence
+	// "not same" verdict whose reason carries the HTTP status or error message.
+	async function classifySimilar(anchorId: string, candidateId: string) {
+		if (similarClassifying.has(candidateId)) return; // one in-flight request per candidate
+		const next = new Set(similarClassifying);
+		next.add(candidateId);
+		similarClassifying = next;
+		// Verdicts are keyed by candidate only, so drop a response that arrives
+		// after the operator opened a different anchor row; otherwise it would be
+		// shown against the wrong pair.
+		const store = (verdict: SimilarityVerdict) => {
+			if (similarOpenId !== anchorId) return;
+			similarVerdicts = { ...similarVerdicts, [candidateId]: verdict };
+		};
+		try {
+			const res = await fetch(
+				`/admin/discovery/queue/${anchorId}/similar/${candidateId}/classify`,
+				{ method: 'POST' }
+			);
+			if (!res.ok) {
+				store({ same: false, confidence: 0, reason: `HTTP ${res.status}` });
+				return;
+			}
+			const body = await res.json();
+			store(body.data);
+		} catch (err) {
+			store({ same: false, confidence: 0, reason: err instanceof Error ? err.message : 'Fehler' });
+		} finally {
+			const afterNext = new Set(similarClassifying);
+			afterNext.delete(candidateId);
+			similarClassifying = afterNext;
+		}
+	}
+
 	// Pagination helpers.
 	const totalPages = $derived(Math.ceil((data.total ?? 0) / data.limit));
 
@@ -777,7 +825,8 @@ Markt Stadt Datum - Konfidenz + Konfidenz + AI-Verdict @@ -791,7 +840,7 @@ {fmtDate(m.entry.start_datum)} - + + {#if similarVerdicts[m.entry.id]} {@const v = similarVerdicts[m.entry.id]} + {v.same ? '✓ same' : '✗ diff'} {(v.confidence * 100).toFixed(0)}% + {:else} + {/if} + {/each}