feat(discovery): AI tiebreak for ambiguous similarity matches

Ship 2 MR 4. Adds per-pair AI-backed classification for operator use
inside the existing Similar panel: an "AI?" button next to each
candidate asks Mistral whether the two queue rows refer to the same
underlying market. Result shown inline as a green "✓ same N%" or
grey "✗ diff N%" chip with the LLM's reason on hover.

No scraping — the classifier works from (name, city, year) alone,
which is enough for the common cases (same venue on two calendars,
typos, cross-year recurrence). The call is short (usually <3s), so
the handler is synchronous with a 15s deadline.

Caching
- Migration 000020 adds similarity_ai_cache keyed on a content hash
  over (normalized_name|stadt|year) for both rows, sorted for
  symmetry. Survives queue row accept/reject because the hash is
  about markt-content, not queue-row lifecycle.
- enrich.SimilarityPairKey computes the key. Classify(a,b) and
  Classify(b,a) hit the same entry. Stadt casing drift doesn't
  invalidate.
- Repo methods GetSimilarityCache / SetSimilarityCache + corresponding
  mock hooks. DefaultSimilarityCacheTTL=30d.
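
The symmetry property the cache relies on can be sketched as follows. This
is a standalone illustration, not the shipped enrich.SimilarityPairKey: the
function name is hypothetical and strings.ToLower stands in for the real
normalization, but the mechanism (one tuple per row, sort, hash) is the same:

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"strings"
)

// pairKey builds one content tuple per row, sorts the two tuples, and
// hashes the concatenation. Sorting makes pairKey(a,b) == pairKey(b,a);
// lower-casing the city makes casing drift a no-op.
func pairKey(nameA, cityA string, yearA int, nameB, cityB string, yearB int) string {
	ka := fmt.Sprintf("%s|%s|%d", nameA, strings.ToLower(cityA), yearA)
	kb := fmt.Sprintf("%s|%s|%d", nameB, strings.ToLower(cityB), yearB)
	if ka > kb {
		ka, kb = kb, ka // sort lexicographically for symmetry
	}
	sum := sha256.Sum256([]byte(ka + "||" + kb))
	return hex.EncodeToString(sum[:])
}

func main() {
	ab := pairKey("ritterfest dresden", "Dresden", 2026, "ritterfest leipzig", "Leipzig", 2026)
	ba := pairKey("ritterfest leipzig", "Leipzig", 2026, "ritterfest dresden", "Dresden", 2026)
	fmt.Println(ab == ba) // symmetric by construction
}
```

Because the key is derived only from market content, accepting or rejecting
either queue row later does not invalidate the cached verdict.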

Mistral integration
- enrich.MistralSimilarityClassifier reuses the same aiPass2
  interface as the enricher. English system prompt asks for
  JSON-only output with {same_market, confidence 0..1, reason}.
  Confidence clamped to [0,1] because models occasionally return
  1.2 or -0.1. Reason is short German justification.
- NoopSimilarityClassifier returns an error — callers must check
  ai.Enabled() before deciding which binding to pass.
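
The clamp-and-parse step can be sketched in isolation. This is illustrative
only (the shipped logic lives inside MistralSimilarityClassifier.Classify,
and the helper name here is made up); it shows the tolerant handling of the
model's JSON reply:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// simResponse mirrors the JSON shape the system prompt asks for.
type simResponse struct {
	SameMarket bool    `json:"same_market"`
	Confidence float64 `json:"confidence"`
	Reason     string  `json:"reason"`
}

// parseVerdict rejects non-JSON output and clamps confidence into [0,1],
// because models occasionally return values like 1.2 or -0.1.
func parseVerdict(content string) (simResponse, error) {
	var r simResponse
	if err := json.Unmarshal([]byte(content), &r); err != nil {
		return simResponse{}, fmt.Errorf("parse response: %w", err)
	}
	if r.Confidence < 0 {
		r.Confidence = 0
	}
	if r.Confidence > 1 {
		r.Confidence = 1
	}
	return r, nil
}

func main() {
	v, _ := parseVerdict(`{"same_market":true,"confidence":1.4,"reason":"x"}`)
	fmt.Println(v.Confidence) // 1.4 clamps to 1
}
```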

Service.ClassifySimilarPair loads both rows, computes pair key,
cache-first, calls classifier on miss, writes cache, returns
verdict. Rejects self-comparison (pair-key collapses). Handler
POST /admin/discovery/queue/:aid/similar/:bid/classify.

UI: new AI? column inside the Similar panel. Per-candidate pending
state via Set<string>, disabled button while in-flight, inline
verdict chip after response. Tooltip shows the LLM's reason.

Tests: pair-key symmetry + differentiation + casing tolerance;
Mistral classifier happy path, clamping edge cases, error
propagation, bad-JSON handling, Noop rejection. Service tests:
happy path writes cache, cache-hit skips LLM, self-comparison
rejected, classifier errors don't poison the cache.

NewService's signature grows by one param (sim enrich.SimilarityClassifier).
All 14 existing callers (routes.go + tests) updated; tests pass nil.
2026-04-24 11:04:15 +02:00
parent ce32f76731
commit e0b73acfd6
13 changed files with 703 additions and 22 deletions

View File

@@ -0,0 +1,181 @@
package enrich

import (
	"context"
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"errors"
	"fmt"
	"strings"
	"time"
)

// SimilarityRow carries the minimal identifying fields the classifier reads.
// Callers adapt their domain row into this shape — the enrich package stays
// free of a reverse import on discovery.
type SimilarityRow struct {
	NameNormalized string
	Stadt          string
	// Year is StartDatum.Year() (or 0 when unknown). Two rows differing only
	// in year are almost always the same series in different editions —
	// useful signal but not identity.
	Year int
	// Display fields — shown to the LLM verbatim so it can reason about
	// casing / diacritics that NameNormalized strips.
	Name string
	// Quellen are passed in but not scraped — the classifier works from
	// name/city/date alone. Included in the prompt only as metadata.
	Quellen []string
}

// Verdict is the classifier's answer about one pair.
type Verdict struct {
	Same         bool      `json:"same"`
	Confidence   float64   `json:"confidence"` // 0..1
	Reason       string    `json:"reason"`
	Model        string    `json:"model,omitempty"`
	ClassifiedAt time.Time `json:"classified_at,omitempty"`
}

// SimilarityClassifier decides whether two queue rows refer to the same
// underlying market. Implementations should be deterministic across calls
// on the same input — callers cache the result.
type SimilarityClassifier interface {
	Classify(ctx context.Context, a, b SimilarityRow) (Verdict, error)
}
// NoopSimilarityClassifier is the binding used when AI is disabled. It
// performs no call and always returns an error — callers should check
// ai.Enabled() and fall back to this type rather than passing nil.
type NoopSimilarityClassifier struct{}

func (NoopSimilarityClassifier) Classify(_ context.Context, _, _ SimilarityRow) (Verdict, error) {
	return Verdict{}, errors.New("similarity classifier not configured")
}
// SimilarityPairKey derives a content-hash cache key over the two rows'
// identifying tuples. Ordering-independent: Classify(a,b) and Classify(b,a)
// must hit the same cache entry.
//
// Uses NameNormalized + lowered Stadt + Year so the key survives casing
// drift, whitespace, and umlaut normalisation (NameNormalized already did
// the heavy lifting).
func SimilarityPairKey(a, b SimilarityRow) string {
	keyA := fmt.Sprintf("%s|%s|%d", a.NameNormalized, lowerASCII(a.Stadt), a.Year)
	keyB := fmt.Sprintf("%s|%s|%d", b.NameNormalized, lowerASCII(b.Stadt), b.Year)
	// Sort lexicographically for symmetry.
	var raw string
	if keyA <= keyB {
		raw = keyA + "||" + keyB
	} else {
		raw = keyB + "||" + keyA
	}
	sum := sha256.Sum256([]byte(raw))
	return hex.EncodeToString(sum[:])
}

// DefaultSimilarityCacheTTL: 30 days matches the enrichment cache TTL —
// same reasoning (amortise cost across re-crawls, source edits eventually
// propagate).
const DefaultSimilarityCacheTTL = 30 * 24 * time.Hour
// MistralSimilarityClassifier implements SimilarityClassifier by sending a
// JSON-formatted comparison prompt to Mistral's chat endpoint.
type MistralSimilarityClassifier struct {
	Client aiPass2
}

// NewMistralSimilarityClassifier binds a Mistral ai.Client. client must be
// non-nil; routes.go falls back to NoopSimilarityClassifier when AI is off.
func NewMistralSimilarityClassifier(client aiPass2) *MistralSimilarityClassifier {
	return &MistralSimilarityClassifier{Client: client}
}

// simResponse is the JSON shape we instruct Mistral to return. Confidence
// must be parseable as a float 0..1; anything outside that range is clamped.
type simResponse struct {
	SameMarket bool    `json:"same_market"`
	Confidence float64 `json:"confidence"`
	Reason     string  `json:"reason"`
}

// Classify sends the paired metadata to Mistral and parses the JSON response.
// No web scraping — the classifier works from name/city/year alone, which is
// enough for the common cases (same venue listed on two different calendars,
// editing typos, cross-year recurrence).
func (m *MistralSimilarityClassifier) Classify(ctx context.Context, a, b SimilarityRow) (Verdict, error) {
	if m.Client == nil {
		return Verdict{}, errors.New("mistral similarity classifier not configured")
	}
	systemPrompt := simSystemPrompt()
	userPrompt := simUserPrompt(a, b)
	result, err := m.Client.Pass2(ctx, systemPrompt, userPrompt)
	if err != nil {
		return Verdict{}, fmt.Errorf("pass2: %w", err)
	}
	var parsed simResponse
	if err := json.Unmarshal([]byte(result.Content), &parsed); err != nil {
		return Verdict{}, fmt.Errorf("parse response: %w (content=%q)", err, result.Content)
	}
	// Clamp confidence to [0,1]; the model occasionally returns 1.2 or -0.1.
	conf := parsed.Confidence
	if conf < 0 {
		conf = 0
	}
	if conf > 1 {
		conf = 1
	}
	return Verdict{
		Same:         parsed.SameMarket,
		Confidence:   conf,
		Reason:       strings.TrimSpace(parsed.Reason),
		Model:        result.Model,
		ClassifiedAt: time.Now().UTC(),
	}, nil
}
func simSystemPrompt() string {
	return strings.TrimSpace(`
You decide whether two candidate entries refer to the same medieval market
(Mittelaltermarkt) in the DACH region. Input: two objects each with a name,
city, and year. Output a single JSON object:
{
  "same_market": true|false,
  "confidence": 0.0-1.0,  // how sure you are
  "reason": "..."         // short German justification (<= 140 chars)
}
Rules:
- Return ONLY the JSON object. No prose, no code fences.
- "Same market" means same recurring event — same venue, same organiser,
  same audience. A market and its anniversary edition in a later year ARE
  the same market (just different editions).
- Different cities = different markets, even if the name matches.
- Rephrasings, typos, and umlaut differences (Strassburg vs Straßburg)
  are the same market if the underlying identifiers align.
- If the evidence is weak, return same_market=false with low confidence
  rather than guessing. Low confidence is more useful than a wrong guess.
`)
}

func simUserPrompt(a, b SimilarityRow) string {
	// Keep the JSON compact; the model handles inline JSON better than
	// pretty-printed when the task is "read two records".
	ja, _ := json.Marshal(map[string]any{
		"name": a.Name,
		"city": a.Stadt,
		"year": a.Year,
	})
	jb, _ := json.Marshal(map[string]any{
		"name": b.Name,
		"city": b.Stadt,
		"year": b.Year,
	})
	return fmt.Sprintf("A: %s\nB: %s", ja, jb)
}

View File

@@ -0,0 +1,118 @@
package enrich

import (
	"context"
	"errors"
	"strings"
	"testing"

	"marktvogt.de/backend/internal/pkg/ai"
)

func TestSimilarityPairKey_Symmetric(t *testing.T) {
	a := SimilarityRow{NameNormalized: "ritterfest dresden", Stadt: "Dresden", Year: 2026}
	b := SimilarityRow{NameNormalized: "ritterfest leipzig", Stadt: "Leipzig", Year: 2026}
	if SimilarityPairKey(a, b) != SimilarityPairKey(b, a) {
		t.Error("pair key must be symmetric: (a,b) and (b,a) should produce identical keys")
	}
}

func TestSimilarityPairKey_DifferentInputsDifferentKeys(t *testing.T) {
	a := SimilarityRow{NameNormalized: "ritterfest dresden", Stadt: "Dresden", Year: 2026}
	b := SimilarityRow{NameNormalized: "ritterfest leipzig", Stadt: "Leipzig", Year: 2026}
	c := SimilarityRow{NameNormalized: "ritterfest dresden", Stadt: "Dresden", Year: 2027}
	if SimilarityPairKey(a, b) == SimilarityPairKey(a, c) {
		t.Error("different pairs must produce different keys")
	}
	// Stadt casing must not change the key.
	d := SimilarityRow{NameNormalized: "ritterfest dresden", Stadt: "DRESDEN", Year: 2026}
	if SimilarityPairKey(a, b) != SimilarityPairKey(d, b) {
		t.Error("stadt casing drift must not change the key")
	}
}

func TestMistralSimilarity_HappyPath(t *testing.T) {
	client := &stubPass2{
		result: ai.PassResult{
			Content: `{"same_market":true,"confidence":0.82,"reason":"Gleicher Name, gleiche Stadt, gleiches Jahr."}`,
			Model:   "mistral-large-latest",
		},
	}
	c := NewMistralSimilarityClassifier(client)
	got, err := c.Classify(context.Background(),
		SimilarityRow{Name: "Mittelaltermarkt Dresden", Stadt: "Dresden", Year: 2026},
		SimilarityRow{Name: "Mittelaltermarkt Dresden 2026", Stadt: "Dresden", Year: 2026},
	)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if !got.Same {
		t.Errorf("same = false; want true")
	}
	if got.Confidence < 0.8 || got.Confidence > 0.85 {
		t.Errorf("confidence = %v; want ~0.82", got.Confidence)
	}
	if got.Reason == "" {
		t.Error("reason missing")
	}
	if got.Model != "mistral-large-latest" {
		t.Errorf("model = %q", got.Model)
	}
	// Prompt must carry both rows' identifying fields for the LLM to reason on.
	if !strings.Contains(client.lastUser, "Mittelaltermarkt Dresden") {
		t.Error("user prompt missing A.name")
	}
	if !strings.Contains(client.lastSystem, "same_market") {
		t.Error("system prompt should describe the JSON schema (same_market key)")
	}
}

func TestMistralSimilarity_ClampsConfidence(t *testing.T) {
	tests := []struct {
		name     string
		raw      string
		wantConf float64
	}{
		{"above 1 clamps to 1", `{"same_market":true,"confidence":1.4,"reason":"x"}`, 1.0},
		{"below 0 clamps to 0", `{"same_market":false,"confidence":-0.3,"reason":"x"}`, 0.0},
		{"in range passes through", `{"same_market":true,"confidence":0.5,"reason":"x"}`, 0.5},
	}
	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			c := NewMistralSimilarityClassifier(&stubPass2{result: ai.PassResult{Content: tc.raw}})
			got, err := c.Classify(context.Background(), SimilarityRow{}, SimilarityRow{})
			if err != nil {
				t.Fatalf("unexpected error: %v", err)
			}
			if got.Confidence != tc.wantConf {
				t.Errorf("confidence = %v; want %v", got.Confidence, tc.wantConf)
			}
		})
	}
}

func TestMistralSimilarity_PropagatesPass2Error(t *testing.T) {
	c := NewMistralSimilarityClassifier(&stubPass2{err: errors.New("mistral down")})
	_, err := c.Classify(context.Background(), SimilarityRow{}, SimilarityRow{})
	if err == nil {
		t.Fatal("expected error; got nil")
	}
}

func TestMistralSimilarity_RejectsBadJSON(t *testing.T) {
	c := NewMistralSimilarityClassifier(&stubPass2{result: ai.PassResult{Content: "not json at all"}})
	_, err := c.Classify(context.Background(), SimilarityRow{}, SimilarityRow{})
	if err == nil {
		t.Fatal("expected parse error; got nil")
	}
}

func TestNoopSimilarityClassifier_Errors(t *testing.T) {
	_, err := NoopSimilarityClassifier{}.Classify(context.Background(), SimilarityRow{}, SimilarityRow{})
	if err == nil {
		t.Error("NoopSimilarityClassifier should return error — it's the fallback when AI is disabled")
	}
}

View File

@@ -386,6 +386,36 @@ func (h *Handler) EnrichLLM(c *gin.Context) {
	c.JSON(http.StatusOK, gin.H{"data": payload})
}

// ClassifySimilarPair runs the LLM duplicate-tiebreaker on the two queue
// rows identified by URL params :aid and :bid. Synchronous, 15s deadline —
// the call is short (no scraping) so the operator can click and immediately
// see the verdict.
func (h *Handler) ClassifySimilarPair(c *gin.Context) {
	aID, err := uuid.Parse(c.Param("aid"))
	if err != nil {
		apiErr := apierror.BadRequest("invalid_id", "invalid queue id A")
		c.JSON(apiErr.Status, apierror.NewResponse(apiErr))
		return
	}
	bID, err := uuid.Parse(c.Param("bid"))
	if err != nil {
		apiErr := apierror.BadRequest("invalid_id", "invalid queue id B")
		c.JSON(apiErr.Status, apierror.NewResponse(apiErr))
		return
	}
	ctx, cancel := context.WithTimeout(c.Request.Context(), 15*time.Second)
	defer cancel()
	verdict, err := h.service.ClassifySimilarPair(ctx, aID, bID)
	if err != nil {
		slog.WarnContext(ctx, "classify similar failed", "a", aID, "b", bID, "error", err)
		apiErr := apierror.Internal("classify failed: " + err.Error())
		c.JSON(apiErr.Status, apierror.NewResponse(apiErr))
		return
	}
	c.JSON(http.StatusOK, gin.H{"data": verdict})
}

func currentUserID(c *gin.Context) (uuid.UUID, bool) {
	raw, exists := c.Get("user_id")
	if !exists {

View File

@@ -57,6 +57,7 @@ func TestCrawlHandlerReturns202AndStartsCrawl(t *testing.T) {
noopMarketCreator{},
nil,
nil,
nil,
)
h := NewHandler(svc, 0) // rate limit disabled
@@ -91,7 +92,7 @@ func TestCrawlHandlerReturns202AndStartsCrawl(t *testing.T) {
// TestCrawlStatusInitialState verifies the zero state of a freshly constructed Handler.
func TestCrawlStatusInitialState(t *testing.T) {
-svc := NewService(newMockRepo(), &stubCrawlerRunner{}, noopLinkVerifier{}, noopMarketCreator{}, nil, nil)
+svc := NewService(newMockRepo(), &stubCrawlerRunner{}, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, nil)
h := NewHandler(svc, 0)
w := httptest.NewRecorder()
@@ -141,7 +142,7 @@ func TestCrawlHandlerConcurrentReturnsTooManyRequests(t *testing.T) {
started: make(chan struct{}),
release: make(chan struct{}),
}
-svc := NewService(newMockRepo(), bc, noopLinkVerifier{}, noopMarketCreator{}, nil, nil)
+svc := NewService(newMockRepo(), bc, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, nil)
h := NewHandler(svc, 0) // rate limit disabled
// First request — returns 202 and spawns goroutine.
@@ -184,6 +185,7 @@ func TestCrawlHandlerRateLimit(t *testing.T) {
noopMarketCreator{},
nil,
nil,
nil,
)
// 1 per hour window.
h := NewHandler(svc, 1)

View File

@@ -36,6 +36,10 @@ type mockRepo struct {
getCacheFn func(key string) (enrich.Enrichment, bool, error)
setCacheFn func(key string, payload enrich.Enrichment, ttl time.Duration) error
listPendingEnrichFn func(limit int) ([]DiscoveredMarket, error)
// Similarity AI cache hooks.
getSimCacheFn func(pairKey string) (enrich.Verdict, bool, error)
setSimCacheFn func(pairKey string, v enrich.Verdict, ttl time.Duration) error
}
func (m *mockRepo) ListSeriesByCity(ctx context.Context, c string) ([]SeriesCandidate, error) {
@@ -122,6 +126,18 @@ func (m *mockRepo) ListPendingEnrichment(_ context.Context, limit int) ([]Discov
}
return nil, nil
}
func (m *mockRepo) GetSimilarityCache(_ context.Context, pairKey string) (enrich.Verdict, bool, error) {
	if m.getSimCacheFn != nil {
		return m.getSimCacheFn(pairKey)
	}
	return enrich.Verdict{}, false, nil
}

func (m *mockRepo) SetSimilarityCache(_ context.Context, pairKey string, v enrich.Verdict, ttl time.Duration) error {
	if m.setSimCacheFn != nil {
		return m.setSimCacheFn(pairKey, v, ttl)
	}
	return nil
}
// noopLinkVerifier passes every URL — used by tests to isolate from network.
type noopLinkVerifier struct{}

View File

@@ -40,6 +40,10 @@ type Repository interface {
	// Cache operations keyed on sha256(name_normalized|stadt|year).
	GetEnrichmentCache(ctx context.Context, key string) (enrich.Enrichment, bool, error)
	SetEnrichmentCache(ctx context.Context, key string, payload enrich.Enrichment, ttl time.Duration) error
	// Similarity AI cache — keyed on enrich.SimilarityPairKey.
	GetSimilarityCache(ctx context.Context, pairKey string) (enrich.Verdict, bool, error)
	SetSimilarityCache(ctx context.Context, pairKey string, v enrich.Verdict, ttl time.Duration) error
}
// SeriesCandidate is a minimal projection used for name-normalization comparison in Go.
@@ -426,3 +430,43 @@ ON CONFLICT (cache_key) DO UPDATE
}
return nil
}
// GetSimilarityCache returns a cached AI verdict for a pairKey, or
// (zero, false, nil) on miss/expiry. Expired entries are treated as misses.
func (r *pgRepository) GetSimilarityCache(ctx context.Context, pairKey string) (enrich.Verdict, bool, error) {
	var v enrich.Verdict
	err := r.pool.QueryRow(ctx, `
		SELECT same, confidence, reason, model FROM similarity_ai_cache
		WHERE pair_key = $1
		  AND (expires_at IS NULL OR expires_at > now())`, pairKey).Scan(&v.Same, &v.Confidence, &v.Reason, &v.Model)
	if errors.Is(err, pgx.ErrNoRows) {
		return enrich.Verdict{}, false, nil
	}
	if err != nil {
		return enrich.Verdict{}, false, fmt.Errorf("similarity cache get: %w", err)
	}
	return v, true, nil
}

// SetSimilarityCache upserts a verdict. ttl=0 means "no expiry" (nullable).
func (r *pgRepository) SetSimilarityCache(ctx context.Context, pairKey string, v enrich.Verdict, ttl time.Duration) error {
	var expiresAt *time.Time
	if ttl > 0 {
		t := time.Now().Add(ttl)
		expiresAt = &t
	}
	_, err := r.pool.Exec(ctx, `
		INSERT INTO similarity_ai_cache (pair_key, same, confidence, reason, model, expires_at)
		VALUES ($1, $2, $3, $4, $5, $6)
		ON CONFLICT (pair_key) DO UPDATE
		SET same = EXCLUDED.same,
		    confidence = EXCLUDED.confidence,
		    reason = EXCLUDED.reason,
		    model = EXCLUDED.model,
		    created_at = now(),
		    expires_at = EXCLUDED.expires_at`, pairKey, v.Same, v.Confidence, v.Reason, v.Model, expiresAt)
	if err != nil {
		return fmt.Errorf("similarity cache set: %w", err)
	}
	return nil
}

View File

@@ -26,6 +26,8 @@ func RegisterRoutes(
	admin.GET("/queue/:id/similar", h.Similar)
	// Per-row LLM enrichment (MR 3b). Synchronous — operator waits.
	admin.POST("/queue/:id/enrich", h.EnrichLLM)
	// Per-pair AI similarity tiebreak (MR 4). Synchronous; short call.
	admin.POST("/queue/:aid/similar/:bid/classify", h.ClassifySimilarPair)
	// Manual crawl trigger — subject to hourly rate limit.
	admin.POST("/crawl-manual", h.Crawl)
	// Async crawl status polling.

View File

@@ -57,13 +57,17 @@ type Service struct {
	// llmEnricher is the AI-backed fallback pass. Nil-safe via NoopLLMEnricher
	// in test wiring; production code passes a real MistralLLMEnricher.
	llmEnricher enrich.LLMEnricher
	// simClassifier is the AI-backed duplicate tiebreaker. Nil-safe via
	// NoopSimilarityClassifier.
	simClassifier enrich.SimilarityClassifier
}
// NewService constructs a Service wired for the crawler-driven Crawl path.
// geocoder may be nil; in that case crawl-enrich runs consolidation only and
// skips the lat/lng step. llm may be nil; per-row LLM enrichment then
-// returns an error instead of attempting a call.
-func NewService(repo Repository, cr crawlerRunner, lc linkVerifier, mc marketCreator, gc Geocoder, llm enrich.LLMEnricher) *Service {
+// returns an error instead of attempting a call. simClassifier may be nil;
+// ClassifySimilarPair returns an error rather than a zero-confidence answer.
+func NewService(repo Repository, cr crawlerRunner, lc linkVerifier, mc marketCreator, gc Geocoder, llm enrich.LLMEnricher, sim enrich.SimilarityClassifier) *Service {
	return &Service{
		repo:    repo,
		crawler: cr,
@@ -71,6 +75,7 @@ func NewService(repo Repository, cr crawlerRunner, lc linkVerifier, mc marketCre
		linkChecker:   lc,
		geocoder:      gc,
		llmEnricher:   llm,
		simClassifier: sim,
	}
}
@@ -595,6 +600,68 @@ func (s *Service) RunLLMEnrichOne(ctx context.Context, queueID uuid.UUID) (enric
return merged, nil
}
// ClassifySimilarPair runs the AI classifier on the two queue rows identified
// by aID and bID, returning a verdict about whether they're the same market.
// Cache-first: a content-keyed entry (enrich.SimilarityPairKey) shortcuts the
// LLM call when the same pair has been classified before.
//
// Intended for operator-triggered "AI tiebreak" on ambiguous similarity
// matches. The crawl-time auto-merge in MR 7 will call this on its own.
func (s *Service) ClassifySimilarPair(ctx context.Context, aID, bID uuid.UUID) (enrich.Verdict, error) {
	if s.simClassifier == nil {
		return enrich.Verdict{}, errors.New("similarity classifier not configured")
	}
	if aID == bID {
		return enrich.Verdict{}, errors.New("cannot classify a row against itself")
	}
	a, err := s.repo.GetDiscovered(ctx, aID)
	if err != nil {
		return enrich.Verdict{}, fmt.Errorf("load row A: %w", err)
	}
	b, err := s.repo.GetDiscovered(ctx, bID)
	if err != nil {
		return enrich.Verdict{}, fmt.Errorf("load row B: %w", err)
	}
	rowA := rowToSimilarity(a)
	rowB := rowToSimilarity(b)
	pairKey := enrich.SimilarityPairKey(rowA, rowB)
	if cached, hit, err := s.repo.GetSimilarityCache(ctx, pairKey); err != nil {
		slog.WarnContext(ctx, "similarity cache get failed; continuing",
			"pair_key", pairKey, "error", err)
	} else if hit {
		return cached, nil
	}
	verdict, err := s.simClassifier.Classify(ctx, rowA, rowB)
	if err != nil {
		return enrich.Verdict{}, fmt.Errorf("classify: %w", err)
	}
	if err := s.repo.SetSimilarityCache(ctx, pairKey, verdict, enrich.DefaultSimilarityCacheTTL); err != nil {
		slog.WarnContext(ctx, "similarity cache set failed; continuing",
			"pair_key", pairKey, "error", err)
	}
	return verdict, nil
}

// rowToSimilarity adapts a DiscoveredMarket to the narrow SimilarityRow the
// enrich package consumes. Year comes from StartDatum (0 when unknown).
func rowToSimilarity(r DiscoveredMarket) enrich.SimilarityRow {
	year := 0
	if r.StartDatum != nil {
		year = r.StartDatum.Year()
	}
	return enrich.SimilarityRow{
		NameNormalized: r.NameNormalized,
		Stadt:          r.Stadt,
		Year:           year,
		Name:           r.MarktName,
		Quellen:        r.Quellen,
	}
}
// CrawlEnrichSummary reports the outcome of one RunCrawlEnrichAll pass.
// Mirrors CrawlSummary's shape so the admin UI can reuse its render path.
type CrawlEnrichSummary struct {

View File

@@ -147,7 +147,7 @@ func TestAccept_NewSeries_CallsCreate(t *testing.T) {
markAcceptedFn: func(_ context.Context, _ pgx.Tx, _, _, _ uuid.UUID) error { return nil },
}
mc := &stubCreator{}
-svc := NewService(m, nil, noopLinkVerifier{}, mc, nil, nil)
+svc := NewService(m, nil, noopLinkVerifier{}, mc, nil, nil, nil)
_, _, err := svc.Accept(context.Background(), qID, uuid.New())
if err != nil {
t.Fatalf("accept err: %v", err)
@@ -169,7 +169,7 @@ func TestAccept_ExistingSeries_CallsCreateEditionForSeries(t *testing.T) {
markAcceptedFn: func(_ context.Context, _ pgx.Tx, _, _, _ uuid.UUID) error { return nil },
}
mc := &stubCreator{}
-svc := NewService(m, nil, noopLinkVerifier{}, mc, nil, nil)
+svc := NewService(m, nil, noopLinkVerifier{}, mc, nil, nil, nil)
_, _, err := svc.Accept(context.Background(), uuid.New(), uuid.New())
if err != nil {
t.Fatalf("accept err: %v", err)
@@ -200,7 +200,7 @@ func TestServiceCrawlHappyPath(t *testing.T) {
PerSourceMS: map[string]int64{"marktkalendarium": 1},
},
}
-svc := NewService(repo, sc, lc, noopMarketCreator{}, nil, nil)
+svc := NewService(repo, sc, lc, noopMarketCreator{}, nil, nil, nil)
summary, err := svc.Crawl(context.Background())
if err != nil {
@@ -241,7 +241,7 @@ func TestServiceCrawlDedupQueue(t *testing.T) {
},
},
}
-svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{}, nil, nil)
+svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, nil)
summary, err := svc.Crawl(context.Background())
if err != nil {
@@ -272,7 +272,7 @@ func TestServiceCrawlDefaultsEndDate(t *testing.T) {
},
},
}
-svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{}, nil, nil)
+svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, nil)
if _, err := svc.Crawl(context.Background()); err != nil {
t.Fatal(err)
@@ -318,7 +318,7 @@ func TestServiceCrawlDetachesInsertContextFromRequestCtx(t *testing.T) {
},
},
}
-svc := NewService(repo, sc, ctxAwareLinkVerifier{}, noopMarketCreator{}, nil, nil)
+svc := NewService(repo, sc, ctxAwareLinkVerifier{}, noopMarketCreator{}, nil, nil, nil)
// Cancel the context BEFORE Crawl runs — simulates gateway timeout
// that fires while the handler is still mid-run.
@@ -343,7 +343,7 @@ func TestListPendingQueuePaged_ReturnsBothRowsAndTotal(t *testing.T) {
m := &mockRepo{
countQueueFn: func(_ context.Context, _ string) (int, error) { return 42, nil },
}
-svc := NewService(m, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, nil)
+svc := NewService(m, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, nil)
rows, total, err := svc.ListPendingQueuePaged(context.Background(), 50, 0)
if err != nil {
t.Fatalf("unexpected error: %v", err)
@@ -375,7 +375,7 @@ func TestServiceCrawlPersistsSourcesAndContributions(t *testing.T) {
},
},
}
-svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{}, nil, nil)
+svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, nil)
if _, err := svc.Crawl(context.Background()); err != nil {
t.Fatal(err)
@@ -424,7 +424,7 @@ func TestServiceCrawlMultiSourceHighKonfidenz(t *testing.T) {
},
},
}
-svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{}, nil, nil)
+svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, nil)
summary, err := svc.Crawl(context.Background())
if err != nil {
@@ -486,7 +486,7 @@ func TestRunCrawlEnrichAll_HappyPath(t *testing.T) {
},
}
gc := stubGeocoder{lat: ptrFloat(51.05), lng: ptrFloat(13.74)}
-svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, gc, nil)
+svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, gc, nil, nil)
summary, err := svc.RunCrawlEnrichAll(context.Background())
if err != nil {
@@ -547,7 +547,7 @@ func TestRunCrawlEnrichAll_SetEnrichmentFailure(t *testing.T) {
return nil
},
}
-svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, nil)
+svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, nil)
summary, err := svc.RunCrawlEnrichAll(context.Background())
if err != nil {
@@ -639,7 +639,7 @@ func TestRunLLMEnrichOne_HappyPath(t *testing.T) {
},
}
llm := &stubLLMEnricher{result: llmResult}
-svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, llm)
+svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, llm, nil)
got, err := svc.RunLLMEnrichOne(context.Background(), rowID)
if err != nil {
@@ -709,7 +709,7 @@ func TestRunLLMEnrichOne_CacheHitSkipsLLM(t *testing.T) {
},
}
llm := &stubLLMEnricher{}
-svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, llm)
+svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, llm, nil)
got, err := svc.RunLLMEnrichOne(context.Background(), rowID)
if err != nil {
@@ -738,7 +738,7 @@ func TestRunLLMEnrichOne_LLMErrorMarksFailed(t *testing.T) {
},
}
llm := &stubLLMEnricher{err: errors.New("mistral down")}
-svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, llm)
+svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, llm, nil)
_, err := svc.RunLLMEnrichOne(context.Background(), rowID)
if err == nil {
@@ -761,7 +761,7 @@ func TestRunCrawlEnrichAll_EmptyQueueNoOp(t *testing.T) {
return nil
},
}
-svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, nil)
+svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, nil)
summary, err := svc.RunCrawlEnrichAll(context.Background())
if err != nil {
@@ -774,3 +774,129 @@ func TestRunCrawlEnrichAll_EmptyQueueNoOp(t *testing.T) {
t.Errorf("SetEnrichment called %d times on empty queue", writes)
}
}
// stubSimilarityClassifier returns canned verdicts or errors.
type stubSimilarityClassifier struct {
result enrich.Verdict
err error
calls int
}
func (s *stubSimilarityClassifier) Classify(_ context.Context, _, _ enrich.SimilarityRow) (enrich.Verdict, error) {
s.calls++
return s.result, s.err
}
// TestClassifySimilarPair_HappyPath: cache miss → LLM → cache write → return.
func TestClassifySimilarPair_HappyPath(t *testing.T) {
aID, bID := uuid.New(), uuid.New()
start := time.Date(2026, 5, 1, 0, 0, 0, 0, time.UTC)
rowA := DiscoveredMarket{ID: aID, MarktName: "Ritterfest Dresden", Stadt: "Dresden", NameNormalized: "ritterfest dresden", StartDatum: &start}
rowB := DiscoveredMarket{ID: bID, MarktName: "Mittelaltermarkt Dresden 2026", Stadt: "Dresden", NameNormalized: "mittelaltermarkt dresden", StartDatum: &start}
var cacheSet struct {
called bool
verdict enrich.Verdict
}
repo := &mockRepo{
getDiscoveredFn: func(_ context.Context, id uuid.UUID) (DiscoveredMarket, error) {
if id == aID {
return rowA, nil
}
return rowB, nil
},
setSimCacheFn: func(_ string, v enrich.Verdict, _ time.Duration) error {
cacheSet.called = true
cacheSet.verdict = v
return nil
},
}
sim := &stubSimilarityClassifier{
result: enrich.Verdict{Same: false, Confidence: 0.6, Reason: "Unterschiedliche Namen."},
}
svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, sim)
got, err := svc.ClassifySimilarPair(context.Background(), aID, bID)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if sim.calls != 1 {
t.Errorf("classifier called %d times; want 1", sim.calls)
}
if got.Same || got.Confidence != 0.6 {
t.Errorf("verdict = %+v; want same=false, confidence=0.6", got)
}
if !cacheSet.called {
t.Error("expected SetSimilarityCache to be called on a cache miss")
}
if cacheSet.verdict.Confidence != 0.6 {
t.Errorf("cached verdict lost confidence: %+v", cacheSet.verdict)
}
}
// TestClassifySimilarPair_CacheHitSkipsLLM: cache hit returns directly,
// no classifier call.
func TestClassifySimilarPair_CacheHitSkipsLLM(t *testing.T) {
aID, bID := uuid.New(), uuid.New()
cached := enrich.Verdict{Same: true, Confidence: 0.9, Reason: "same venue"}
repo := &mockRepo{
getDiscoveredFn: func(context.Context, uuid.UUID) (DiscoveredMarket, error) {
return DiscoveredMarket{}, nil
},
getSimCacheFn: func(string) (enrich.Verdict, bool, error) {
return cached, true, nil
},
}
sim := &stubSimilarityClassifier{}
svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, sim)
got, err := svc.ClassifySimilarPair(context.Background(), aID, bID)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if sim.calls != 0 {
t.Errorf("classifier called %d times on cache hit; want 0", sim.calls)
}
if !got.Same || got.Confidence != 0.9 {
t.Errorf("cached verdict not returned: %+v", got)
}
}
// TestClassifySimilarPair_RejectsSelfComparison: the pair-key scheme would
// collapse (a,a) to a single key which never tells you anything useful.
func TestClassifySimilarPair_RejectsSelfComparison(t *testing.T) {
id := uuid.New()
svc := NewService(&mockRepo{}, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, &stubSimilarityClassifier{})
_, err := svc.ClassifySimilarPair(context.Background(), id, id)
if err == nil {
t.Error("expected error on self-comparison; got nil")
}
}

// TestClassifySimilarPair_LLMErrorPropagates: classifier errors surface;
// cache is not written.
func TestClassifySimilarPair_LLMErrorPropagates(t *testing.T) {
	aID, bID := uuid.New(), uuid.New()
	cacheWritten := false
	repo := &mockRepo{
		getDiscoveredFn: func(context.Context, uuid.UUID) (DiscoveredMarket, error) {
			return DiscoveredMarket{}, nil
		},
		setSimCacheFn: func(string, enrich.Verdict, time.Duration) error {
			cacheWritten = true
			return nil
		},
	}
	sim := &stubSimilarityClassifier{err: errors.New("mistral 500")}
	svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, sim)
	_, err := svc.ClassifySimilarPair(context.Background(), aID, bID)
	if err == nil {
		t.Fatal("expected error; got nil")
	}
	if cacheWritten {
		t.Error("cache should not be written when classifier errors")
	}
}
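Taken together, these tests pin down a cache-first flow: reject self-comparison, look up the cache, call the LLM only on a miss, clamp confidence, and skip the cache write when the classifier errors. A minimal sketch of that flow (the names `Verdict`, `classifyPair`, `clamp`, and the string-keyed cache are illustrative stand-ins, not the real service API):

```go
package main

import (
	"errors"
	"fmt"
)

// Verdict mirrors the shape the tests assert on (illustrative).
type Verdict struct {
	Same       bool
	Confidence float64
	Reason     string
}

type cache map[string]Verdict

// clamp keeps model-reported confidence inside [0,1]; models
// occasionally return values like 1.2 or -0.1.
func clamp(v float64) float64 {
	if v < 0 {
		return 0
	}
	if v > 1 {
		return 1
	}
	return v
}

// classifyPair sketches the cache-first flow the tests describe.
func classifyPair(c cache, aKey, bKey string, llm func() (Verdict, error)) (Verdict, error) {
	if aKey == bKey {
		return Verdict{}, errors.New("self-comparison: pair key collapses")
	}
	pair := aKey + "|" + bKey
	if aKey > bKey { // sort for symmetry: (a,b) and (b,a) share one entry
		pair = bKey + "|" + aKey
	}
	if v, ok := c[pair]; ok {
		return v, nil // cache hit: the LLM is never called
	}
	v, err := llm()
	if err != nil {
		return Verdict{}, err // classifier errors surface; cache stays unwritten
	}
	v.Confidence = clamp(v.Confidence)
	c[pair] = v
	return v, nil
}

func main() {
	c := cache{}
	calls := 0
	llm := func() (Verdict, error) {
		calls++
		return Verdict{Same: false, Confidence: 0.6, Reason: "different cities"}, nil
	}
	v, _ := classifyPair(c, "markt-a", "markt-b", llm)
	classifyPair(c, "markt-b", "markt-a", llm) // reversed order: served from cache
	fmt.Println(v.Confidence, calls)           // prints "0.6 1"
}
```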

View File

@@ -79,11 +79,13 @@ func (s *Server) registerRoutes() {
 	// Per-row LLM enrichment (MR 3b). Operator-triggered only; disabled rows
 	// fall through via NoopLLMEnricher when AI isn't configured.
 	var llmEnricher enrich.LLMEnricher = enrich.NoopLLMEnricher{}
+	var simClassifier enrich.SimilarityClassifier = enrich.NoopSimilarityClassifier{}
 	if aiClient.Enabled() {
 		scraper := scrape.New(s.cfg.Discovery.CrawlerUserAgent)
 		llmEnricher = enrich.NewMistralLLMEnricher(aiClient, scraper)
+		simClassifier = enrich.NewMistralSimilarityClassifier(aiClient)
 	}
-	discoveryService := discovery.NewService(discoveryRepo, crawlerInstance, discovery.NewLinkChecker(), marketSvc, geocoder, llmEnricher)
+	discoveryService := discovery.NewService(discoveryRepo, crawlerInstance, discovery.NewLinkChecker(), marketSvc, geocoder, llmEnricher, simClassifier)
 	discoveryHandler := discovery.NewHandler(discoveryService, s.cfg.Discovery.CrawlerManualRateLimitPerHour)
 	requireTickToken := middleware.RequireBearerToken(s.cfg.Discovery.Token)
 	discovery.RegisterRoutes(v1, discoveryHandler, requireAuth, requireAdmin, requireTickToken)

View File

@@ -0,0 +1,2 @@
+DROP INDEX IF EXISTS idx_similarity_ai_cache_expires_at;
+DROP TABLE IF EXISTS similarity_ai_cache;

View File

@@ -0,0 +1,18 @@
+-- Cache for LLM-based similarity verdicts. Pair key is a sha256 over the
+-- content tuple (normalized_name|stadt|year for both rows, sorted) — NOT
+-- over queue UUIDs, so the cache survives queue row deletion (accept/reject)
+-- while still invalidating correctly when a row's identifying fields change.
+CREATE TABLE similarity_ai_cache (
+    pair_key   text PRIMARY KEY,
+    same       bool NOT NULL,
+    confidence real NOT NULL,
+    reason     text NOT NULL DEFAULT '',
+    model      text NOT NULL DEFAULT '',
+    created_at timestamptz NOT NULL DEFAULT now(),
+    expires_at timestamptz
+);
+-- Opportunistic pruning; workers or manual jobs run
+-- DELETE FROM similarity_ai_cache WHERE expires_at < now().
+CREATE INDEX idx_similarity_ai_cache_expires_at
+    ON similarity_ai_cache (expires_at)
+    WHERE expires_at IS NOT NULL;
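The pair-key scheme the migration comment describes can be sketched as follows (assumed to mirror what `enrich.SimilarityPairKey` does; the function name and normalization details here are illustrative):

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"strings"
)

// pairKey sketches the content-hash scheme: each side is reduced to a
// normalized_name|stadt|year tuple, the two tuples are sorted so that
// (a,b) and (b,a) map to the same key, and the result is a sha256 hex
// digest. Lowercasing stadt keeps casing drift from invalidating entries.
func pairKey(nameA, stadtA string, yearA int, nameB, stadtB string, yearB int) string {
	ta := fmt.Sprintf("%s|%s|%d", strings.ToLower(nameA), strings.ToLower(stadtA), yearA)
	tb := fmt.Sprintf("%s|%s|%d", strings.ToLower(nameB), strings.ToLower(stadtB), yearB)
	if ta > tb { // sort for symmetry
		ta, tb = tb, ta
	}
	sum := sha256.Sum256([]byte(ta + "\n" + tb))
	return hex.EncodeToString(sum[:])
}

func main() {
	k1 := pairKey("weihnachtsmarkt", "Köln", 2025, "weihnachtsmarkt", "Bonn", 2025)
	k2 := pairKey("weihnachtsmarkt", "Bonn", 2025, "weihnachtsmarkt", "KÖLN", 2025)
	fmt.Println(k1 == k2) // prints "true": symmetric and casing-insensitive
}
```

Note that the key is built purely from markt content, which is why it survives queue-row accept/reject, and why comparing a row with itself collapses to a single degenerate key.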

View File

@@ -99,6 +99,7 @@
 		similarOpenId = id;
 		similarLoading = true;
 		similarEntries = [];
+		similarVerdicts = {};
 		try {
 			const res = await fetch(`/admin/discovery/queue/${id}/similar`);
 			if (!res.ok) {
@@ -114,6 +115,53 @@
 		}
 	}
+
+	// Per-pair AI similarity verdict. Keyed on the candidate's queue id since
+	// the "anchor" row (similarOpenId) is already known from context.
+	type SimilarityVerdict = {
+		same: boolean;
+		confidence: number;
+		reason: string;
+		model?: string;
+		classified_at?: string;
+	};
+	let similarVerdicts = $state<Record<string, SimilarityVerdict>>({});
+	let similarClassifying = $state(new Set<string>());
+
+	async function classifySimilar(anchorId: string, candidateId: string) {
+		if (similarClassifying.has(candidateId)) return;
+		const next = new Set(similarClassifying);
+		next.add(candidateId);
+		similarClassifying = next;
+		try {
+			const res = await fetch(
+				`/admin/discovery/queue/${anchorId}/similar/${candidateId}/classify`,
+				{ method: 'POST' }
+			);
+			if (!res.ok) {
+				similarVerdicts = {
+					...similarVerdicts,
+					[candidateId]: { same: false, confidence: 0, reason: `HTTP ${res.status}` }
+				};
+				return;
+			}
+			const body = await res.json();
+			similarVerdicts = { ...similarVerdicts, [candidateId]: body.data };
+		} catch (err) {
+			similarVerdicts = {
+				...similarVerdicts,
+				[candidateId]: {
+					same: false,
+					confidence: 0,
+					reason: err instanceof Error ? err.message : 'Fehler'
+				}
+			};
+		} finally {
+			const afterNext = new Set(similarClassifying);
+			afterNext.delete(candidateId);
+			similarClassifying = afterNext;
+		}
+	}
+
 	// Pagination helpers.
 	const totalPages = $derived(Math.ceil((data.total ?? 0) / data.limit));
@@ -777,7 +825,8 @@
 						<th class="pr-4 pb-1 font-medium">Markt</th>
 						<th class="pr-4 pb-1 font-medium">Stadt</th>
 						<th class="pr-4 pb-1 font-medium">Datum</th>
-						<th class="pb-1 font-medium">Konfidenz</th>
+						<th class="pr-4 pb-1 font-medium">Konfidenz</th>
+						<th class="pb-1 font-medium">AI-Verdict</th>
 					</tr>
 				</thead>
 				<tbody>
@@ -791,7 +840,7 @@
 					<td class="py-1 pr-4 whitespace-nowrap"
 						>{fmtDate(m.entry.start_datum)}</td
 					>
-					<td class="py-1">
+					<td class="py-1 pr-4">
 						<span
 							class="inline-block rounded px-1.5 py-0.5 {konfidenzClass(
 								m.entry.konfidenz
@@ -800,6 +849,30 @@
 							{m.entry.konfidenz || '—'}
 						</span>
 					</td>
+					<td class="py-1">
+						{#if similarVerdicts[m.entry.id]}
+							{@const v = similarVerdicts[m.entry.id]}
+							<span
+								class="inline-block rounded px-1.5 py-0.5 text-[10px] {v.same
+									? 'bg-emerald-100 text-emerald-700 dark:bg-emerald-900/50 dark:text-emerald-300'
+									: 'bg-stone-200 text-stone-700 dark:bg-stone-700 dark:text-stone-200'}"
+								title={v.reason}
+							>
+								{v.same ? '✓ same' : '✗ diff'}
+								{(v.confidence * 100).toFixed(0)}%
+							</span>
+						{:else}
+							<button
+								type="button"
+								disabled={similarClassifying.has(m.entry.id)}
+								onclick={() => classifySimilar(row.id, m.entry.id)}
+								class="rounded bg-purple-100 px-1.5 py-0.5 text-[10px] text-purple-700 hover:bg-purple-200 disabled:cursor-not-allowed disabled:opacity-60 dark:bg-purple-900/50 dark:text-purple-300 dark:hover:bg-purple-900"
+								title="LLM-Tiebreak: sind das derselbe Markt?"
+							>
+								{similarClassifying.has(m.entry.id) ? 'AI…' : 'AI?'}
+							</button>
+						{/if}
+					</td>
 				</tr>
 			{/each}
 		</tbody>