Merge branch 'feat/discovery-auto-merge-crawl' — Ship 2 MRs 2–7 (enrichment foundation, crawl-enrich, LLM enrich, AI similarity, auto-merge)

Brings the full Ship 2 feature stack (except the eval harness and detail
drawer) into main. Conflicts resolved:

- repository.go: kept MR 1's sort params + queueOrderByClause builder on
  ListQueue, AND MR 7's FindPendingMatch + MergePendingSources (MR 7
  removed the old QueueHasPending). ListQueue SELECT keeps the enrichment
  columns MR 2 added.
- mock_repo_test.go: kept both MR 1's listQueueCalls capture and the
  MR 2-4 enrichment/similarity hooks.
- service_test.go: ListPendingQueuePaged uses MR 1's sort-param signature;
  NewService uses the MR 2-7 seven-arg form.
- handler_test.go: TestListQueueSortParamWhitelist's NewService call
  bumped from 4 args to 7 (nil geocoder, nil llm enricher, nil sim
  classifier).

Features landing on main:
- MR 2: enrichment schema (migration 000019), jsonb payload, enrich
  package with Merge/CacheKey/NoopLLMEnricher.
- MR 3: manual crawl-enrich-all button + async 202 status endpoint.
- MR 3b: per-row LLM enrich via scrape-then-prompt (pkg/scrape +
  MistralLLMEnricher).
- MR 4: AI similarity tiebreak (migration 000020), MistralSimilarityClassifier,
  per-candidate AI? button in the Similar panel.
- MR 7: cross-crawl auto-merge for new sources on pending queue rows
  (FindPendingMatch + MergePendingSources, AutoMerged counter).
This commit is contained in:
2026-04-24 12:13:30 +02:00
26 changed files with 3460 additions and 68 deletions

View File

@@ -0,0 +1,42 @@
package enrich
import (
"crypto/sha256"
"encoding/hex"
"fmt"
"time"
)
// CacheKey derives a stable cache key from the (normalized name, city, year)
// tuple. Rows sharing the tuple share one cache entry, so an LLM answer is
// reused when the crawler rediscovers the same market.
//
// A row without a known start date should pass year=0; those entries share a
// bucket, which is slightly lossy but acceptable — date-less markets with
// LLM-derived descriptions are rare and regenerating on demand is cheap.
func CacheKey(nameNormalized, stadt string, year int) string {
    // nameNormalized arrives pre-lowercased/stripped from the caller; the
    // city is lowered here (ASCII range only, byte-wise — same semantics as
    // lowerASCII) so casing drift cannot cause cache misses.
    city := []byte(stadt)
    for i := range city {
        if city[i] >= 'A' && city[i] <= 'Z' {
            city[i] += 'a' - 'A'
        }
    }
    digest := sha256.Sum256([]byte(fmt.Sprintf("%s|%s|%d", nameNormalized, city, year)))
    return hex.EncodeToString(digest[:])
}
// DefaultCacheTTL is the default lifetime for enrichment cache entries.
// 30 days: long enough to amortise cost across re-crawls, short enough that
// source edits (new organiser, changed opening hours) eventually propagate.
// NOTE(review): nothing in this file consumes the constant — presumably the
// cache layer compares it against each entry's stored timestamp; verify there.
const DefaultCacheTTL = 30 * 24 * time.Hour
// lowerASCII lower-cases only bytes in the ASCII range, leaving every other
// byte (including UTF-8 continuation bytes) untouched. Used instead of
// strings.ToLower so keys stay deterministic across locales; German
// diacritics are already normalised away upstream in NameNormalized, and
// Stadt values here are already ASCII-ish.
func lowerASCII(s string) string {
    out := make([]byte, len(s))
    for i := 0; i < len(s); i++ {
        c := s[i]
        if 'A' <= c && c <= 'Z' {
            c += 'a' - 'A'
        }
        out[i] = c
    }
    return string(out)
}

View File

@@ -0,0 +1,97 @@
package enrich
import (
"context"
"time"
)
// Geocoder is the narrow interface crawl-enrich depends on for lat/lng.
// *pkg/geocode.Geocoder satisfies this; tests inject a stub.
type Geocoder interface {
// Geocode resolves an address to coordinates. CrawlEnrich passes an empty
// street and treats nil lat/lng as "not found" — TODO confirm that contract
// against pkg/geocode.
Geocode(ctx context.Context, street, city, zip, country string) (*float64, *float64, error)
}
// Contribution is the subset of one source's data crawl-enrich reads. Callers
// adapt their domain-specific contribution type to this before invoking
// CrawlEnrich — keeps the enrich package free of a reverse import.
// Fields are consolidated first-non-empty-wins across the contribution list.
type Contribution struct {
// PLZ is the postal code this source reported (may be empty).
PLZ string
// Venue is the venue name this source reported (may be empty).
Venue string
// Organizer is the organiser name this source reported (may be empty).
Organizer string
}
// Input is the read-only context CrawlEnrich needs: the market's city and
// country (for geocoding) plus the per-source contributions already gathered
// by the crawler. All other domain-row fields are irrelevant to the crawl pass.
type Input struct {
// Stadt is the market's city; geocoding is skipped entirely when empty.
Stadt string
// Land is the human-readable country name, mapped to ISO-2 via landToISO.
Land string
// Contributions are expected pre-sorted by the caller so consolidation
// stays deterministic.
Contributions []Contribution
}
// CrawlEnrich fills as many fields as possible from data already in hand —
// the caller's Contribution list and (optionally) a geocoder — and returns a
// fully-populated Sources map so downstream code can tell which pass
// contributed each field.
//
// No LLM, no AI, no paid APIs. Nominatim counts as crawl-enrich because the
// call is deterministic and already rate-limited by pkg/geocode.
//
// geocoder may be nil; lat/lng simply stay empty in that case.
func CrawlEnrich(ctx context.Context, in Input, geocoder Geocoder) Enrichment {
    result := Enrichment{Sources: Sources{}}

    // Consolidate text fields: first non-empty value per field wins.
    // Deterministic because the caller pre-sorts the contribution list
    // (crawler Merge step orders by source name).
    take := func(dst *string, val, field string) {
        if *dst == "" && val != "" {
            *dst = val
            result.Sources[field] = ProvenanceCrawl
        }
    }
    for _, contrib := range in.Contributions {
        take(&result.PLZ, contrib.PLZ, "plz")
        take(&result.Venue, contrib.Venue, "venue")
        take(&result.Organizer, contrib.Organizer, "organizer")
    }

    // Geocode from (city, plz) — only when there is enough input to be
    // useful; the PLZ consolidated above is preferred. Geocoder failures are
    // non-fatal: lat/lng stay empty while the other crawl-enriched fields
    // still persist. An LLM pass later cannot fill lat/lng either, so a
    // failure just means that row gets no coordinates.
    if geocoder != nil && in.Stadt != "" {
        lat, lng, err := geocoder.Geocode(ctx, "", in.Stadt, result.PLZ, landToISO(in.Land))
        if err == nil && lat != nil && lng != nil {
            result.Lat = lat
            result.Lng = lng
            result.Sources["lat"] = ProvenanceCrawl
            result.Sources["lng"] = ProvenanceCrawl
        }
    }

    // Stamp EnrichedAt only when at least one field actually landed.
    if len(result.Sources) > 0 {
        now := time.Now().UTC()
        result.EnrichedAt = &now
    }
    return result
}
// landToISO maps the human-readable Land value to an ISO-2 country code for
// Nominatim's countrycodes parameter. Unknown values yield the empty string
// so the geocoder runs without a country filter.
func landToISO(land string) string {
    codes := map[string]string{
        "Deutschland": "de",
        "Österreich":  "at",
        "Oesterreich": "at",
        "Schweiz":     "ch",
    }
    // Map lookup returns "" (the zero value) for unrecognised countries.
    return codes[land]
}

View File

@@ -0,0 +1,243 @@
package enrich
import (
"context"
"errors"
"testing"
)
// stubGeocoder returns canned lat/lng. nil lat/lng simulate "not found".
type stubGeocoder struct {
lat *float64
lng *float64
err error
// capture last call args
lastCity, lastPLZ, lastCountry string
}
// Geocode records the arguments it was called with (so tests can assert what
// CrawlEnrich passed) and returns the canned result.
func (s *stubGeocoder) Geocode(_ context.Context, _, city, zip, country string) (*float64, *float64, error) {
s.lastCity, s.lastPLZ, s.lastCountry = city, zip, country
return s.lat, s.lng, s.err
}
// ptrFloat is a test helper producing a *float64 from a literal.
func ptrFloat(v float64) *float64 { return &v }
// stallhof is the venue name reused across tests.
const stallhof = "Stallhof"
// Empty input: no fields filled, no provenance entries, no timestamp.
func TestCrawlEnrich_EmptyInputs(t *testing.T) {
got := CrawlEnrich(context.Background(), Input{}, nil)
if got.PLZ != "" || got.Venue != "" || got.Organizer != "" {
t.Errorf("expected all fields empty, got %+v", got)
}
if len(got.Sources) != 0 {
t.Errorf("expected empty Sources on no input, got %v", got.Sources)
}
if got.EnrichedAt != nil {
t.Errorf("expected nil EnrichedAt when nothing enriched, got %v", got.EnrichedAt)
}
}
// A single contribution fills all three text fields with crawl provenance
// and stamps EnrichedAt.
func TestCrawlEnrich_SingleSource(t *testing.T) {
in := Input{
Stadt: "Dresden",
Contributions: []Contribution{
{PLZ: "01067", Venue: stallhof, Organizer: "Tourismusverband"},
},
}
got := CrawlEnrich(context.Background(), in, nil)
if got.PLZ != "01067" || got.Venue != stallhof || got.Organizer != "Tourismusverband" {
t.Errorf("unexpected payload: %+v", got)
}
if got.Sources["plz"] != ProvenanceCrawl {
t.Errorf("plz provenance: got %q, want %q", got.Sources["plz"], ProvenanceCrawl)
}
if got.EnrichedAt == nil {
t.Error("expected EnrichedAt to be stamped when any field filled")
}
}
func TestCrawlEnrich_MultiSourceFirstWins(t *testing.T) {
// Two sources disagree on PLZ: first non-empty wins. Order is the
// crawler-produced contribution order (alphabetical by source name).
in := Input{
Stadt: "Dresden",
Contributions: []Contribution{
{PLZ: "01067"},
{PLZ: "01099"},
},
}
got := CrawlEnrich(context.Background(), in, nil)
if got.PLZ != "01067" {
t.Errorf("expected first source's PLZ, got %q", got.PLZ)
}
}
func TestCrawlEnrich_MultiSourceFillsGaps(t *testing.T) {
// Source A contributes PLZ only; source B contributes venue only; both
// end up in the payload.
in := Input{
Stadt: "Dresden",
Contributions: []Contribution{
{PLZ: "01067"},
{Venue: stallhof},
},
}
got := CrawlEnrich(context.Background(), in, nil)
if got.PLZ != "01067" {
t.Errorf("PLZ from A missing: %q", got.PLZ)
}
if got.Venue != stallhof {
t.Errorf("Venue from B missing: %q", got.Venue)
}
}
// Happy-path geocoding: lat/lng land with crawl provenance, and the geocoder
// receives the consolidated PLZ plus the ISO-2 country code.
func TestCrawlEnrich_GeocoderCalled(t *testing.T) {
in := Input{
Stadt: "Dresden",
Land: "Deutschland",
Contributions: []Contribution{{PLZ: "01067"}},
}
sg := &stubGeocoder{lat: ptrFloat(51.05), lng: ptrFloat(13.74)}
got := CrawlEnrich(context.Background(), in, sg)
if got.Lat == nil || *got.Lat != 51.05 {
t.Errorf("Lat = %v; want 51.05", got.Lat)
}
if got.Lng == nil || *got.Lng != 13.74 {
t.Errorf("Lng = %v; want 13.74", got.Lng)
}
if got.Sources["lat"] != ProvenanceCrawl {
t.Errorf("lat provenance: got %q, want %q", got.Sources["lat"], ProvenanceCrawl)
}
// Land → ISO-2 mapping sanity.
if sg.lastCountry != "de" {
t.Errorf("country code: got %q, want %q", sg.lastCountry, "de")
}
if sg.lastCity != "Dresden" || sg.lastPLZ != "01067" {
t.Errorf("geocoder inputs: city=%q plz=%q; want Dresden / 01067", sg.lastCity, sg.lastPLZ)
}
}
// A geocoder error must not discard the crawl-consolidated fields, and must
// leave lat/lng (and their provenance) unset.
func TestCrawlEnrich_GeocoderFailureNonFatal(t *testing.T) {
in := Input{
Stadt: "Dresden",
Contributions: []Contribution{
{PLZ: "01067", Venue: stallhof},
},
}
sg := &stubGeocoder{err: errors.New("nominatim down")}
got := CrawlEnrich(context.Background(), in, sg)
// Non-geocode fields must still have landed.
if got.PLZ != "01067" || got.Venue != stallhof {
t.Errorf("expected crawl-consolidated fields on geocoder failure: %+v", got)
}
if got.Lat != nil || got.Lng != nil {
t.Error("expected nil lat/lng on geocoder failure")
}
if _, hasLat := got.Sources["lat"]; hasLat {
t.Error("expected no lat provenance when geocoder failed")
}
}
// Without a Stadt the geocoder must be skipped entirely.
func TestCrawlEnrich_GeocoderNotCalledWithoutStadt(t *testing.T) {
in := Input{Contributions: []Contribution{{PLZ: "01067"}}}
sg := &stubGeocoder{lat: ptrFloat(1), lng: ptrFloat(1)}
got := CrawlEnrich(context.Background(), in, sg)
if sg.lastCity != "" {
t.Error("geocoder called despite empty Stadt")
}
if got.Lat != nil {
t.Error("lat populated despite geocoder being skipped")
}
}
func TestMerge_BaseWinsOverOverlay(t *testing.T) {
// Invariant: a field set by crawl (base) is never overwritten by llm (overlay).
base := Enrichment{
PLZ: "01067",
Venue: stallhof,
Sources: Sources{"plz": ProvenanceCrawl, "venue": ProvenanceCrawl},
}
// Overlay deliberately tries to clobber base fields ("WRONG") while also
// supplying genuinely new fields + usage metadata.
overlay := Enrichment{
PLZ: "WRONG",
Venue: "WRONG",
Category: "mittelaltermarkt",
Description: "Ein großer Markt.",
InputTokens: 100,
OutputTokens: 50,
Model: "mistral-large-latest",
Sources: Sources{
"plz": ProvenanceLLM,
"venue": ProvenanceLLM,
"category": ProvenanceLLM,
"description": ProvenanceLLM,
},
}
got := Merge(base, overlay)
if got.PLZ != "01067" {
t.Errorf("PLZ overwritten by overlay: got %q, want %q", got.PLZ, "01067")
}
if got.Venue != stallhof {
t.Errorf("Venue overwritten by overlay: got %q, want %q", got.Venue, stallhof)
}
if got.Category != "mittelaltermarkt" {
t.Errorf("Category should come from overlay: got %q", got.Category)
}
if got.Description != "Ein großer Markt." {
t.Errorf("Description should come from overlay: got %q", got.Description)
}
if got.Sources["plz"] != ProvenanceCrawl {
t.Errorf("plz provenance overwritten: got %q, want crawl", got.Sources["plz"])
}
if got.Sources["category"] != ProvenanceLLM {
t.Errorf("category provenance lost: got %q, want llm", got.Sources["category"])
}
if got.InputTokens != 100 || got.OutputTokens != 50 {
t.Errorf("token counts lost: in=%d out=%d", got.InputTokens, got.OutputTokens)
}
}
// Coordinates merge pointer-wise: overlay fills a nil base Lat/Lng, but a
// non-nil base value is never replaced.
func TestMerge_CoordsOnlyFromOverlayIfBaseNil(t *testing.T) {
base := Enrichment{Sources: Sources{}}
overlay := Enrichment{
Lat: ptrFloat(51.05),
Lng: ptrFloat(13.74),
Sources: Sources{"lat": ProvenanceCrawl, "lng": ProvenanceCrawl},
}
got := Merge(base, overlay)
if got.Lat == nil || *got.Lat != 51.05 {
t.Errorf("Lat not merged from overlay")
}
// Now base already has Lat; overlay must not overwrite.
baseWithLat := Enrichment{
Lat: ptrFloat(50.00),
Lng: ptrFloat(10.00),
Sources: Sources{"lat": ProvenanceCrawl, "lng": ProvenanceCrawl},
}
got = Merge(baseWithLat, overlay)
if *got.Lat != 50.00 {
t.Errorf("Lat overwritten despite base non-nil: got %v", *got.Lat)
}
}
// Sanity: CacheKey must be stable across re-runs and differ across inputs.
func TestCacheKey_StableAndDistinct(t *testing.T) {
aKey := CacheKey("ritterfest dresden", "Dresden", 2026)
bKey := CacheKey("ritterfest dresden", "Dresden", 2026)
cKey := CacheKey("ritterfest leipzig", "Leipzig", 2026)
if aKey != bKey {
t.Error("identical inputs must produce identical keys")
}
if aKey == cKey {
t.Error("different cities must produce different keys")
}
// Stadt casing drift doesn't change the key.
dKey := CacheKey("ritterfest dresden", "DRESDEN", 2026)
if aKey != dKey {
t.Error("stadt casing drift should not change key")
}
}

View File

@@ -0,0 +1,116 @@
// Package enrich derives additional market metadata in two passes.
//
// Pass A (crawl-enrich) is deterministic and free. It consolidates fields
// from per-source contributions (PLZ, venue, organizer) and geocodes city
// + PLZ via Nominatim. First non-empty value wins per field; provenance is
// recorded as "crawl".
//
// Pass B (llm-enrich) runs only for fields Pass A could not fill and does
// cost money. Provenance is recorded as "llm".
//
// Merge combines partial payloads preserving crawl-over-llm preference: a
// crawl-written field is never overwritten by an llm pass. The eval harness
// uses per-field provenance to measure the two passes separately.
//
// The package is intentionally standalone — it does NOT import the parent
// discovery package. Callers adapt domain types to the narrow Contribution
// input before invoking CrawlEnrich, which keeps the dependency edge one-way
// (discovery imports enrich; enrich imports no domain code).
package enrich
import "time"
// Enrichment is the JSONB payload persisted on discovered_markets.enrichment.
// Every field is optional; Sources records which pass filled each field so
// the eval harness can separate crawl-enrich (free, deterministic) from
// llm-enrich (paid, synthesised) contributions.
//
// Field shape is intentionally flat — promotion to typed columns is easier
// from a flat blob than from nested structures.
type Enrichment struct {
    PLZ          string     `json:"plz,omitempty"`
    Venue        string     `json:"venue,omitempty"`
    Organizer    string     `json:"organizer,omitempty"`
    Lat          *float64   `json:"lat,omitempty"`
    Lng          *float64   `json:"lng,omitempty"`
    Category     string     `json:"category,omitempty"`
    OpeningHours string     `json:"opening_hours,omitempty"`
    Description  string     `json:"description,omitempty"`
    Sources      Sources    `json:"sources,omitempty"`
    EnrichedAt   *time.Time `json:"enriched_at,omitempty"`
    Model        string     `json:"model,omitempty"`
    InputTokens  int        `json:"input_tokens,omitempty"`
    OutputTokens int        `json:"output_tokens,omitempty"`
}

// Sources maps each enrichment field name to the pass that filled it.
// Values are Provenance constants.
type Sources map[string]string

// Provenance constants for Sources entries. Crawl covers both offline
// consolidation of per-source contributions and Nominatim geocoding — both
// deterministic, neither counts against the LLM budget.
const (
    ProvenanceCrawl = "crawl"
    ProvenanceLLM   = "llm"
)

// Merge combines two Enrichment payloads. The base is taken as authoritative;
// fields from overlay fill gaps in base but never overwrite. Provenance maps
// are merged the same way — base wins on conflict.
//
// Merge does not mutate either argument: the result carries a freshly built
// Sources map. (A previous implementation wrote overlay entries directly into
// base.Sources — Go maps are references, so the caller's base was silently
// mutated.)
//
// Intended usage: base = output of crawl-enrich, overlay = output of
// llm-enrich. This enforces the "prefer crawl over llm" invariant: the LLM
// pass cannot clobber a field the crawler already filled in, even if the
// LLM is confident.
func Merge(base, overlay Enrichment) Enrichment {
    out := base
    // Scalar text fields: overlay fills only what base left empty.
    if out.PLZ == "" {
        out.PLZ = overlay.PLZ
    }
    if out.Venue == "" {
        out.Venue = overlay.Venue
    }
    if out.Organizer == "" {
        out.Organizer = overlay.Organizer
    }
    // Coordinates merge pointer-wise: nil means "unknown".
    if out.Lat == nil {
        out.Lat = overlay.Lat
    }
    if out.Lng == nil {
        out.Lng = overlay.Lng
    }
    if out.Category == "" {
        out.Category = overlay.Category
    }
    if out.OpeningHours == "" {
        out.OpeningHours = overlay.OpeningHours
    }
    if out.Description == "" {
        out.Description = overlay.Description
    }
    // Merge provenance maps with base-wins semantics into a NEW map so
    // neither argument's Sources is aliased or mutated. Result stays nil
    // only when both sides contributed nothing (preserves JSON omitempty).
    if base.Sources != nil || len(overlay.Sources) > 0 {
        merged := make(Sources, len(base.Sources)+len(overlay.Sources))
        for k, v := range base.Sources {
            merged[k] = v
        }
        for k, v := range overlay.Sources {
            if _, exists := merged[k]; !exists {
                merged[k] = v
            }
        }
        out.Sources = merged
    }
    // Token counts accumulate — llm overlay carries its own usage.
    out.InputTokens += overlay.InputTokens
    out.OutputTokens += overlay.OutputTokens
    // Model / EnrichedAt come from whichever side actually ran an LLM call;
    // only overwrite if base didn't have them.
    if out.Model == "" {
        out.Model = overlay.Model
    }
    if out.EnrichedAt == nil {
        out.EnrichedAt = overlay.EnrichedAt
    }
    return out
}

View File

@@ -0,0 +1,35 @@
package enrich
import "context"
// LLMRequest is the narrow context the LLM enricher needs: identifying
// fields, the quellen URLs for web-search grounding, and the crawl-enrich
// payload already computed so the LLM can be instructed not to overwrite.
type LLMRequest struct {
// MarktName / Stadt / Land / Bundesland identify the row in the prompt;
// Bundesland and Land are omitted from the prompt when empty.
MarktName string
Stadt string
Land string
Bundesland string
// Quellen are the source URLs scraped for grounding text.
Quellen []string
// Partial is the output of crawl-enrich for this row. The LLM
// implementation MUST NOT overwrite any field set here — Merge enforces
// the invariant regardless, but the prompt should respect it to save
// tokens.
Partial Enrichment
}
// LLMEnricher fills enrichment fields that crawl-enrich could not reach.
// Returned payload must populate Sources with "llm" for every field it
// writes; Model and token counts are required so the eval harness can
// attribute cost.
type LLMEnricher interface {
EnrichMissing(ctx context.Context, req LLMRequest) (Enrichment, error)
}
// NoopLLMEnricher returns the partial unchanged. Used by tests and as a
// default when AI integration is disabled.
type NoopLLMEnricher struct{}
// EnrichMissing echoes req.Partial back without any error or side effects.
func (NoopLLMEnricher) EnrichMissing(_ context.Context, req LLMRequest) (Enrichment, error) {
return req.Partial, nil
}

View File

@@ -0,0 +1,192 @@
package enrich
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"strings"
"time"
"marktvogt.de/backend/internal/pkg/ai"
)
// maxScrapeURLs limits how many quellen we pull per enrichment call.
// Real-world queue rows rarely have more than 2-3 sources; 5 is a safe cap
// that prevents a worst-case row from burning minutes on fetches.
const maxScrapeURLs = 5
// ErrNoScrapedContent signals that every quellen URL failed or returned
// empty text. Callers should surface this to the operator rather than send
// a no-context LLM prompt — grounding is required for useful output.
// Compare with errors.Is.
var ErrNoScrapedContent = errors.New("no scrapeable content from any source URL")
// Scraper is the narrow interface MistralLLMEnricher depends on. Satisfied
// by *pkg/scrape.Client; tests inject a stub.
type Scraper interface {
// Fetch returns the extracted text of one URL; an error or empty string
// causes EnrichMissing to skip that source.
Fetch(ctx context.Context, url string) (string, error)
}
// aiPass2 is the narrow interface for the AI client's Pass2 chat-completion
// method. Lets tests inject a stub that returns canned JSON without hitting
// the real Mistral API.
type aiPass2 interface {
Pass2(ctx context.Context, systemPrompt, userPrompt string) (ai.PassResult, error)
}
// MistralLLMEnricher implements LLMEnricher by scraping quellen URLs and
// feeding the concatenated text to Mistral's chat-completion endpoint with
// a JSON response format.
type MistralLLMEnricher struct {
// Client runs the chat completion; Scraper fetches grounding text.
// Both must be non-nil or EnrichMissing errors out immediately.
Client aiPass2
Scraper Scraper
}
// NewMistralLLMEnricher constructs an enricher bound to a Mistral ai.Client
// and a scraper. Both are required; call sites should fall back to
// NoopLLMEnricher when AI is disabled rather than passing nil here.
func NewMistralLLMEnricher(client aiPass2, scraper Scraper) *MistralLLMEnricher {
return &MistralLLMEnricher{Client: client, Scraper: scraper}
}
// llmResponse is the JSON shape we instruct Mistral to return. Any field may
// be absent if the content doesn't support it — the enricher only writes
// what the model actually produced (empty strings are dropped).
type llmResponse struct {
Category string `json:"category"`
OpeningHours string `json:"opening_hours"`
Description string `json:"description"`
}
// EnrichMissing scrapes up to maxScrapeURLs of req.Quellen, concatenates the
// extracted text, and asks Mistral to fill category / opening_hours /
// description. Fails with ErrNoScrapedContent if zero URLs return usable
// text — an LLM call without grounding hallucinates.
func (e *MistralLLMEnricher) EnrichMissing(ctx context.Context, req LLMRequest) (Enrichment, error) {
    if e.Client == nil || e.Scraper == nil {
        return Enrichment{}, errors.New("mistral enricher not configured")
    }

    // Collect grounding blocks, skipping failed or empty scrapes; a single
    // bad source must not sink the whole call.
    sources := req.Quellen
    if len(sources) > maxScrapeURLs {
        sources = sources[:maxScrapeURLs]
    }
    var grounding []string
    for _, src := range sources {
        body, fetchErr := e.Scraper.Fetch(ctx, src)
        if fetchErr != nil {
            slog.InfoContext(ctx, "scrape failed; continuing", "url", src, "error", fetchErr)
            continue
        }
        if body = strings.TrimSpace(body); body == "" {
            continue
        }
        grounding = append(grounding, fmt.Sprintf("=== Quelle: %s ===\n%s", src, body))
    }
    if len(grounding) == 0 {
        return Enrichment{}, ErrNoScrapedContent
    }

    // One chat completion with JSON output, then strict parsing.
    result, err := e.Client.Pass2(ctx, buildSystemPrompt(), buildUserPrompt(req, grounding))
    if err != nil {
        return Enrichment{}, fmt.Errorf("pass2: %w", err)
    }
    var parsed llmResponse
    if err := json.Unmarshal([]byte(result.Content), &parsed); err != nil {
        return Enrichment{}, fmt.Errorf("parse llm response: %w (content=%q)", err, result.Content)
    }

    // Build the Enrichment payload with only the fields the model produced.
    // Sources entries + token counts + model tag feed the eval harness.
    now := time.Now().UTC()
    out := Enrichment{
        Sources:    Sources{},
        Model:      result.Model,
        EnrichedAt: &now,
    }
    if result.Usage != nil {
        out.InputTokens = result.Usage.PromptTokens
        out.OutputTokens = result.Usage.CompletionTokens
    }
    // setIf writes a trimmed non-empty value and tags its llm provenance;
    // empty fields get neither value nor Sources entry.
    setIf := func(field, value string, dst *string) {
        if v := strings.TrimSpace(value); v != "" {
            *dst = v
            out.Sources[field] = ProvenanceLLM
        }
    }
    setIf("category", parsed.Category, &out.Category)
    setIf("opening_hours", parsed.OpeningHours, &out.OpeningHours)
    setIf("description", parsed.Description, &out.Description)
    return out, nil
}
// buildSystemPrompt returns the English instruction block. Mistral follows
// English instructions more reliably; only the *output* is German.
// The literal below is runtime behavior — edits to it change what the model
// is asked to produce, so keep it in sync with llmResponse's fields.
func buildSystemPrompt() string {
return strings.TrimSpace(`
You are enriching metadata for a medieval market (Mittelaltermarkt) in the
DACH region. Using the provided source-page excerpts as grounding, return a
JSON object with exactly these fields:
{
"category": string, // short German label, e.g. "mittelaltermarkt",
// "weihnachtsmarkt", "ritterfest". Lowercase.
"opening_hours": string, // German; brief (e.g. "Sa-So 10:00-18:00").
// Empty string if unclear from sources.
"description": string // 1-3 sentences in German summarising what the
// market offers. Neutral tone, no hype.
// Empty string if sources have no useful info.
}
Rules:
- Return ONLY the JSON object. No prose, no code fences.
- If the sources do not support a field, return an empty string for it.
Do NOT invent details the sources don't mention.
- The description must be factual; avoid marketing language.
- All string values use straight ASCII quotes; escape internal quotes.
`)
}
// buildUserPrompt assembles the per-row context + scraped source blocks.
// Keeps the row identifiers (name, city) short; most of the prompt budget
// goes to the grounding blocks.
func buildUserPrompt(req LLMRequest, blocks []string) string {
    var sb strings.Builder
    fmt.Fprintf(&sb, "Markt: %s\n", req.MarktName)
    fmt.Fprintf(&sb, "Stadt: %s\n", req.Stadt)
    if req.Bundesland != "" {
        fmt.Fprintf(&sb, "Bundesland: %s\n", req.Bundesland)
    }
    if req.Land != "" {
        fmt.Fprintf(&sb, "Land: %s\n", req.Land)
    }
    // Tell the LLM what crawl-enrich already covered so it doesn't spend
    // tokens re-deriving fields we'd discard via Merge anyway.
    known := []struct{ label, value string }{
        {"PLZ", req.Partial.PLZ},
        {"Venue", req.Partial.Venue},
        {"Organizer", req.Partial.Organizer},
    }
    anyKnown := false
    for _, kv := range known {
        if kv.value != "" {
            anyKnown = true
            break
        }
    }
    if anyKnown {
        sb.WriteString("\nBereits bekannt (nicht erneut liefern):\n")
        for _, kv := range known {
            if kv.value != "" {
                fmt.Fprintf(&sb, "- %s: %s\n", kv.label, kv.value)
            }
        }
    }
    sb.WriteString("\n")
    for _, block := range blocks {
        sb.WriteString(block)
        sb.WriteString("\n\n")
    }
    return strings.TrimSpace(sb.String())
}

View File

@@ -0,0 +1,206 @@
package enrich
import (
"context"
"errors"
"strings"
"testing"
"marktvogt.de/backend/internal/pkg/ai"
)
// catMittelaltermarkt is the category literal asserted across these tests.
const catMittelaltermarkt = "mittelaltermarkt"
// stubScraper returns canned responses per URL; an empty string simulates
// a blocked / empty page.
type stubScraper struct {
responses map[string]string
errs map[string]error
}
// Fetch returns the configured error for a URL if present, otherwise the
// canned response (empty string for unknown URLs).
func (s *stubScraper) Fetch(_ context.Context, url string) (string, error) {
if err, ok := s.errs[url]; ok {
return "", err
}
return s.responses[url], nil
}
// stubPass2 captures the prompts it received and returns a canned JSON body.
type stubPass2 struct {
lastSystem string
lastUser string
result ai.PassResult
err error
}
// Pass2 records both prompts (so tests can inspect prompt assembly) and
// returns the canned result/error.
func (s *stubPass2) Pass2(_ context.Context, systemPrompt, userPrompt string) (ai.PassResult, error) {
s.lastSystem = systemPrompt
s.lastUser = userPrompt
return s.result, s.err
}
// Happy path: two scrapes succeed, the model returns full JSON, and the
// payload, provenance, model tag, token counts, and prompt contents all
// line up.
func TestMistralEnrich_HappyPath(t *testing.T) {
scraper := &stubScraper{responses: map[string]string{
"https://a.example/markt": "Ein Mittelaltermarkt mit Ritterspielen und Markttreiben.",
"https://b.example/info": "Sa-So jeweils 10-18 Uhr.",
}}
client := &stubPass2{
result: ai.PassResult{
Content: `{"category":"mittelaltermarkt","opening_hours":"Sa-So 10:00-18:00","description":"Ein Markt mit Ritterspielen."}`,
Model: "mistral-large-latest",
Usage: &ai.UsageInfo{PromptTokens: 450, CompletionTokens: 60},
},
}
e := NewMistralLLMEnricher(client, scraper)
req := LLMRequest{
MarktName: "Mittelaltermarkt Dresden",
Stadt: "Dresden",
Land: "Deutschland",
Quellen: []string{"https://a.example/markt", "https://b.example/info"},
Partial: Enrichment{PLZ: "01067", Sources: Sources{"plz": ProvenanceCrawl}},
}
got, err := e.EnrichMissing(context.Background(), req)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
// Result carries LLM fields, provenance llm, model + token counts.
if got.Category != catMittelaltermarkt || got.Description == "" || got.OpeningHours == "" {
t.Errorf("missing fields in result: %+v", got)
}
if got.Sources["category"] != ProvenanceLLM {
t.Errorf("category provenance: got %q, want llm", got.Sources["category"])
}
if got.Model != "mistral-large-latest" {
t.Errorf("model: got %q, want mistral-large-latest", got.Model)
}
if got.InputTokens != 450 || got.OutputTokens != 60 {
t.Errorf("token counts: in=%d out=%d", got.InputTokens, got.OutputTokens)
}
// Prompt inspection — verify grounding blocks include the scraped content
// and the already-known fields are listed so LLM doesn't redo them.
if !strings.Contains(client.lastUser, "Mittelaltermarkt Dresden") {
t.Error("user prompt missing markt name")
}
if !strings.Contains(client.lastUser, "Ein Mittelaltermarkt mit Ritterspielen") {
t.Error("user prompt missing scraped content from URL 1")
}
if !strings.Contains(client.lastUser, "Sa-So jeweils 10-18 Uhr") {
t.Error("user prompt missing scraped content from URL 2")
}
if !strings.Contains(client.lastUser, "Bereits bekannt") {
t.Error("user prompt should announce already-known fields when Partial is populated")
}
if !strings.Contains(client.lastUser, "PLZ: 01067") {
t.Error("user prompt missing already-known PLZ")
}
// System prompt asks for JSON only.
if !strings.Contains(client.lastSystem, "JSON") {
t.Error("system prompt should mention JSON")
}
}
// When every scrape fails, EnrichMissing must return ErrNoScrapedContent
// and must not invoke the LLM at all.
func TestMistralEnrich_AllScrapesFail(t *testing.T) {
scraper := &stubScraper{errs: map[string]error{
"https://a.example": errors.New("timeout"),
"https://b.example": errors.New("404"),
}}
client := &stubPass2{} // must not be called
e := NewMistralLLMEnricher(client, scraper)
req := LLMRequest{
Quellen: []string{"https://a.example", "https://b.example"},
}
_, err := e.EnrichMissing(context.Background(), req)
if !errors.Is(err, ErrNoScrapedContent) {
t.Errorf("err = %v; want ErrNoScrapedContent", err)
}
if client.lastUser != "" {
t.Error("LLM must not be called when zero URLs scrape")
}
}
func TestMistralEnrich_SomeScrapesFailStillCallsLLM(t *testing.T) {
// One URL fails, one succeeds — the LLM should still run with partial
// context, not fail because of the one bad source.
scraper := &stubScraper{
responses: map[string]string{"https://ok.example": "Useful content."},
errs: map[string]error{"https://bad.example": errors.New("timeout")},
}
client := &stubPass2{
result: ai.PassResult{Content: `{"category":"mittelaltermarkt","opening_hours":"","description":""}`},
}
e := NewMistralLLMEnricher(client, scraper)
req := LLMRequest{Quellen: []string{"https://bad.example", "https://ok.example"}}
got, err := e.EnrichMissing(context.Background(), req)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if got.Category != catMittelaltermarkt {
t.Errorf("category: got %q", got.Category)
}
if !strings.Contains(client.lastUser, "Useful content") {
t.Error("user prompt missing successful scrape")
}
}
func TestMistralEnrich_EmptyFieldsNoProvenance(t *testing.T) {
// LLM returns empty strings for fields it can't support. Those fields
// must NOT appear in Sources — an empty provenance is misleading.
scraper := &stubScraper{responses: map[string]string{"https://a.example": "Content."}}
client := &stubPass2{
result: ai.PassResult{Content: `{"category":"mittelaltermarkt","opening_hours":"","description":""}`},
}
e := NewMistralLLMEnricher(client, scraper)
got, err := e.EnrichMissing(context.Background(), LLMRequest{Quellen: []string{"https://a.example"}})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if got.Sources["category"] != ProvenanceLLM {
t.Errorf("category should have llm provenance")
}
if _, has := got.Sources["opening_hours"]; has {
t.Error("empty opening_hours must not carry provenance")
}
if _, has := got.Sources["description"]; has {
t.Error("empty description must not carry provenance")
}
}
func TestMistralEnrich_CapsURLsAtFive(t *testing.T) {
// Supply 7 URLs; only the first 5 should be fetched.
urls := []string{"u1", "u2", "u3", "u4", "u5", "u6", "u7"}
responses := map[string]string{}
for _, u := range urls {
responses[u] = "content"
}
fetched := map[string]bool{}
scraper := &trackingScraper{fetch: func(u string) (string, error) {
fetched[u] = true
return responses[u], nil
}}
client := &stubPass2{result: ai.PassResult{Content: `{"category":"x","opening_hours":"","description":""}`}}
e := NewMistralLLMEnricher(client, scraper)
_, _ = e.EnrichMissing(context.Background(), LLMRequest{Quellen: urls})
if len(fetched) != 5 {
t.Errorf("fetched %d URLs; want 5", len(fetched))
}
for _, u := range []string{"u6", "u7"} {
if fetched[u] {
t.Errorf("URL %s should have been skipped past the cap", u)
}
}
}
// trackingScraper delegates Fetch to an injected func so tests can observe
// exactly which URLs were requested.
type trackingScraper struct {
fetch func(string) (string, error)
}
func (t *trackingScraper) Fetch(_ context.Context, url string) (string, error) {
return t.fetch(url)
}

View File

@@ -0,0 +1,181 @@
package enrich
import (
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"strings"
"time"
)
// SimilarityRow carries the minimal identifying fields the classifier reads.
// Callers adapt their domain row into this shape — the enrich package stays
// free of a reverse import on discovery.
type SimilarityRow struct {
	// NameNormalized is the canonicalized market name used for cache keying.
	NameNormalized string
	// Stadt is the city; compared case-insensitively in SimilarityPairKey.
	Stadt string
	// Year is StartDatum.Year() (or 0 when unknown). Two rows differing only
	// in year are almost always the same series in different editions —
	// useful signal but not identity.
	Year int
	// Name is the display name — shown to the LLM verbatim so it can reason
	// about casing / diacritics that NameNormalized strips.
	Name string
	// Quellen are carried along but never scraped. NOTE(review): the current
	// Mistral prompt builder (simUserPrompt) sends only name/city/year, so
	// Quellen are not included in the prompt at all.
	Quellen []string
}
// Verdict is the classifier's answer about one pair.
type Verdict struct {
	Same       bool    `json:"same"`       // true when both rows are judged the same market
	Confidence float64 `json:"confidence"` // 0..1
	Reason     string  `json:"reason"`     // short justification as returned by the model
	// Model and ClassifiedAt record which backend produced the verdict and
	// when — useful when reading cached entries later.
	Model        string    `json:"model,omitempty"`
	ClassifiedAt time.Time `json:"classified_at,omitempty"`
}
// SimilarityClassifier decides whether two queue rows refer to the same
// underlying market. Implementations should be deterministic across calls
// on the same input — callers cache the result (keyed by SimilarityPairKey).
type SimilarityClassifier interface {
	// Classify compares a and b; it returns an error when the backend is
	// unavailable or its response cannot be parsed.
	Classify(ctx context.Context, a, b SimilarityRow) (Verdict, error)
}
// NoopSimilarityClassifier is the fallback used when AI is disabled. It never
// calls anything: Classify returns a zero Verdict together with a "not
// configured" error, so callers fail loudly instead of acting on a fake
// verdict. Callers should check ai.Enabled() and fall back to this type
// rather than passing nil.
type NoopSimilarityClassifier struct{}

// Classify always errors — this type exists to make "AI disabled" an
// explicit, testable failure path.
func (NoopSimilarityClassifier) Classify(_ context.Context, _, _ SimilarityRow) (Verdict, error) {
	return Verdict{}, errors.New("similarity classifier not configured")
}
// SimilarityPairKey derives a content-hash cache key over the two rows'
// identifying tuples. Ordering-independent: Classify(a,b) and Classify(b,a)
// must hit the same cache entry.
//
// Uses NameNormalized + lowered Stadt + Year so the key survives casing
// drift, whitespace, and umlaut normalisation (NameNormalized already did
// the heavy lifting).
func SimilarityPairKey(a, b SimilarityRow) string {
	tuple := func(r SimilarityRow) string {
		return fmt.Sprintf("%s|%s|%d", r.NameNormalized, lowerASCII(r.Stadt), r.Year)
	}
	first, second := tuple(a), tuple(b)
	// Put the tuples in lexicographic order so the key is symmetric.
	if second < first {
		first, second = second, first
	}
	digest := sha256.Sum256([]byte(first + "||" + second))
	return hex.EncodeToString(digest[:])
}
// DefaultSimilarityCacheTTL is how long a cached Verdict stays valid.
// 30 days matches the enrichment cache TTL — same reasoning (amortise cost
// across re-crawls; source edits eventually propagate once entries expire).
const DefaultSimilarityCacheTTL = 30 * 24 * time.Hour
// MistralSimilarityClassifier implements SimilarityClassifier by sending a
// JSON-formatted comparison prompt to Mistral's chat endpoint.
type MistralSimilarityClassifier struct {
	// Client is the Pass2-capable AI client; when nil, Classify fails fast
	// with a "not configured" error.
	Client aiPass2
}
// NewMistralSimilarityClassifier binds a Mistral ai.Client. client must be
// non-nil; routes.go falls back to NoopSimilarityClassifier when AI is off.
// (A nil client is not rejected here — Classify reports it as an error.)
func NewMistralSimilarityClassifier(client aiPass2) *MistralSimilarityClassifier {
	return &MistralSimilarityClassifier{Client: client}
}
// simResponse is the JSON shape we instruct Mistral to return. Confidence
// must be parseable as a float 0..1; anything outside that range is clamped
// by Classify before it reaches callers.
type simResponse struct {
	SameMarket bool    `json:"same_market"`
	Confidence float64 `json:"confidence"`
	Reason     string  `json:"reason"`
}
// Classify sends the paired metadata to Mistral and parses the JSON response.
// No web scraping — the classifier works from name/city/year alone, which is
// enough for the common cases (same venue listed on two different calendars,
// editing typos, cross-year recurrence).
func (m *MistralSimilarityClassifier) Classify(ctx context.Context, a, b SimilarityRow) (Verdict, error) {
	if m.Client == nil {
		return Verdict{}, errors.New("mistral similarity classifier not configured")
	}
	result, err := m.Client.Pass2(ctx, simSystemPrompt(), simUserPrompt(a, b))
	if err != nil {
		return Verdict{}, fmt.Errorf("pass2: %w", err)
	}
	var decoded simResponse
	if err := json.Unmarshal([]byte(result.Content), &decoded); err != nil {
		return Verdict{}, fmt.Errorf("parse response: %w (content=%q)", err, result.Content)
	}
	// The model occasionally returns 1.2 or -0.1 — clamp into [0,1].
	confidence := decoded.Confidence
	switch {
	case confidence < 0:
		confidence = 0
	case confidence > 1:
		confidence = 1
	}
	return Verdict{
		Same:         decoded.SameMarket,
		Confidence:   confidence,
		Reason:       strings.TrimSpace(decoded.Reason),
		Model:        result.Model,
		ClassifiedAt: time.Now().UTC(),
	}, nil
}
func simSystemPrompt() string {
return strings.TrimSpace(`
You decide whether two candidate entries refer to the same medieval market
(Mittelaltermarkt) in the DACH region. Input: two objects each with a name,
city, and year. Output a single JSON object:
{
"same_market": true|false,
"confidence": 0.0-1.0, // how sure you are
"reason": "..." // short German justification (<= 140 chars)
}
Rules:
- Return ONLY the JSON object. No prose, no code fences.
- "Same market" means same recurring event — same venue, same organiser,
same audience. A market and its anniversary edition in a later year ARE
the same market (just different editions).
- Different cities = different markets, even if the name matches.
- Rephrasings, typos, and umlaut differences (Dresden vs Straßburg vs
Strassburg) are the same market if the underlying identifiers align.
- If the evidence is weak, return same_market=false with low confidence
rather than guessing. Low confidence is more useful than a wrong guess.
`)
}
// simUserPrompt renders the two rows as compact one-line JSON records
// labelled A and B. Compact on purpose — the model handles inline JSON
// better than pretty-printed when the task is "read two records".
func simUserPrompt(a, b SimilarityRow) string {
	encode := func(r SimilarityRow) string {
		j, _ := json.Marshal(map[string]any{
			"name": r.Name,
			"city": r.Stadt,
			"year": r.Year,
		})
		return string(j)
	}
	return fmt.Sprintf("A: %s\nB: %s", encode(a), encode(b))
}

View File

@@ -0,0 +1,118 @@
package enrich
import (
"context"
"errors"
"strings"
"testing"
"marktvogt.de/backend/internal/pkg/ai"
)
// TestSimilarityPairKey_Symmetric pins the ordering-independence contract:
// the cache key for (a,b) must equal the key for (b,a).
func TestSimilarityPairKey_Symmetric(t *testing.T) {
	left := SimilarityRow{NameNormalized: "ritterfest dresden", Stadt: "Dresden", Year: 2026}
	right := SimilarityRow{NameNormalized: "ritterfest leipzig", Stadt: "Leipzig", Year: 2026}
	forward, backward := SimilarityPairKey(left, right), SimilarityPairKey(right, left)
	if forward != backward {
		t.Error("pair key must be symmetric: (a,b) and (b,a) should produce identical keys")
	}
}
// TestSimilarityPairKey_DifferentInputsDifferentKeys checks the key is
// sensitive to the identifying tuple (year changes it) yet insensitive to
// Stadt casing.
func TestSimilarityPairKey_DifferentInputsDifferentKeys(t *testing.T) {
	base := SimilarityRow{NameNormalized: "ritterfest dresden", Stadt: "Dresden", Year: 2026}
	other := SimilarityRow{NameNormalized: "ritterfest leipzig", Stadt: "Leipzig", Year: 2026}
	nextYear := SimilarityRow{NameNormalized: "ritterfest dresden", Stadt: "Dresden", Year: 2027}
	if SimilarityPairKey(base, other) == SimilarityPairKey(base, nextYear) {
		t.Error("different pairs must produce different keys")
	}
	// Stadt casing must not change the key.
	upper := SimilarityRow{NameNormalized: "ritterfest dresden", Stadt: "DRESDEN", Year: 2026}
	if SimilarityPairKey(base, other) != SimilarityPairKey(upper, other) {
		t.Error("stadt casing drift must not change the key")
	}
}
// TestMistralSimilarity_HappyPath drives Classify through a canned Mistral
// reply and checks the verdict fields plus the prompts handed to the client.
func TestMistralSimilarity_HappyPath(t *testing.T) {
	stub := &stubPass2{
		result: ai.PassResult{
			Content: `{"same_market":true,"confidence":0.82,"reason":"Gleicher Name, gleiche Stadt, gleiches Jahr."}`,
			Model:   "mistral-large-latest",
		},
	}
	classifier := NewMistralSimilarityClassifier(stub)
	verdict, err := classifier.Classify(context.Background(),
		SimilarityRow{Name: "Mittelaltermarkt Dresden", Stadt: "Dresden", Year: 2026},
		SimilarityRow{Name: "Mittelaltermarkt Dresden 2026", Stadt: "Dresden", Year: 2026},
	)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if !verdict.Same {
		t.Errorf("same = false; want true")
	}
	if verdict.Confidence < 0.8 || verdict.Confidence > 0.85 {
		t.Errorf("confidence = %v; want ~0.82", verdict.Confidence)
	}
	if verdict.Reason == "" {
		t.Error("reason missing")
	}
	if verdict.Model != "mistral-large-latest" {
		t.Errorf("model = %q", verdict.Model)
	}
	// Prompt must carry both rows' identifying fields for the LLM to reason on.
	if !strings.Contains(stub.lastUser, "Mittelaltermarkt Dresden") {
		t.Error("user prompt missing A.name")
	}
	if !strings.Contains(stub.lastSystem, "same_market") {
		t.Error("system prompt should describe the JSON schema (same_market key)")
	}
}
// TestMistralSimilarity_ClampsConfidence checks out-of-range confidence
// values are clamped into [0,1] while in-range values pass through unchanged.
func TestMistralSimilarity_ClampsConfidence(t *testing.T) {
	cases := []struct {
		name     string
		raw      string
		wantConf float64
	}{
		{"above 1 clamps to 1", `{"same_market":true,"confidence":1.4,"reason":"x"}`, 1.0},
		{"below 0 clamps to 0", `{"same_market":false,"confidence":-0.3,"reason":"x"}`, 0.0},
		{"in range passes through", `{"same_market":true,"confidence":0.5,"reason":"x"}`, 0.5},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			classifier := NewMistralSimilarityClassifier(&stubPass2{result: ai.PassResult{Content: tc.raw}})
			verdict, err := classifier.Classify(context.Background(), SimilarityRow{}, SimilarityRow{})
			if err != nil {
				t.Fatalf("unexpected error: %v", err)
			}
			if verdict.Confidence != tc.wantConf {
				t.Errorf("confidence = %v; want %v", verdict.Confidence, tc.wantConf)
			}
		})
	}
}
// TestMistralSimilarity_PropagatesPass2Error checks that a transport/API
// failure from the client surfaces as a Classify error.
func TestMistralSimilarity_PropagatesPass2Error(t *testing.T) {
	classifier := NewMistralSimilarityClassifier(&stubPass2{err: errors.New("mistral down")})
	if _, err := classifier.Classify(context.Background(), SimilarityRow{}, SimilarityRow{}); err == nil {
		t.Fatal("expected error; got nil")
	}
}
// TestMistralSimilarity_RejectsBadJSON checks that a non-JSON model reply
// produces a parse error rather than a zero verdict.
func TestMistralSimilarity_RejectsBadJSON(t *testing.T) {
	classifier := NewMistralSimilarityClassifier(&stubPass2{result: ai.PassResult{Content: "not json at all"}})
	if _, err := classifier.Classify(context.Background(), SimilarityRow{}, SimilarityRow{}); err == nil {
		t.Fatal("expected parse error; got nil")
	}
}
// TestNoopSimilarityClassifier_Errors pins the fallback contract: when AI is
// disabled the noop classifier must fail loudly, not return a fake verdict.
func TestNoopSimilarityClassifier_Errors(t *testing.T) {
	if _, err := (NoopSimilarityClassifier{}).Classify(context.Background(), SimilarityRow{}, SimilarityRow{}); err == nil {
		t.Error("NoopSimilarityClassifier should return error — it's the fallback when AI is disabled")
	}
}

View File

@@ -32,6 +32,18 @@ type Handler struct {
lastFinishedAt time.Time
lastSummary *CrawlSummary
lastError string
// Async crawl-enrich-all state (manual bulk operator action).
// Reuses the CAS-gate + mutex pattern of the crawl flow above so the
// admin UI can poll a dedicated status endpoint without colliding with a
// running crawl.
enrichRunning atomic.Bool
enrichMu sync.RWMutex
// Guarded by enrichMu:
enrichStartedAt time.Time
enrichFinishedAt time.Time
enrichSummary *CrawlEnrichSummary
enrichError string
}
// NewHandler constructs a Handler. manualRateLimitPerHour controls how
@@ -314,6 +326,125 @@ func (h *Handler) Similar(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{"data": matches})
}
// RunCrawlEnrichAll kicks off the manual bulk crawl-enrich pass in the
// background and returns 202 immediately. Exactly one run may be in progress
// at a time (CAS gate). Completion is observable via
// GET /admin/discovery/enrichment/crawl-all-status.
func (h *Handler) RunCrawlEnrichAll(c *gin.Context) {
	// CAS gate: only the request that flips false→true may start a run.
	if !h.enrichRunning.CompareAndSwap(false, true) {
		e := apierror.TooManyRequests("A crawl-enrich run is already in progress")
		c.JSON(e.Status, apierror.NewResponse(e))
		return
	}

	// Reset the status snapshot before the worker starts, so a poll during
	// startup never sees stale results from the previous run.
	h.enrichMu.Lock()
	h.enrichStartedAt = time.Now().UTC()
	h.enrichFinishedAt = time.Time{}
	h.enrichSummary = nil
	h.enrichError = ""
	h.enrichMu.Unlock()

	go h.runEnrichAsync()

	c.JSON(http.StatusAccepted, gin.H{
		"data": gin.H{
			"status":  "started",
			"message": "Crawl-enrich started in background. Poll /api/v1/admin/discovery/enrichment/crawl-all-status for completion.",
		},
	})
}
// runEnrichAsync executes the bulk crawl-enrich with a detached context and
// records the outcome under enrichMu. The 10m cap is generous for Nominatim's
// 1rps: a 600-row queue is the worst case we expect.
func (h *Handler) runEnrichAsync() {
	defer h.enrichRunning.Store(false)

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
	defer cancel()

	summary, err := h.service.RunCrawlEnrichAll(ctx)

	h.enrichMu.Lock()
	defer h.enrichMu.Unlock()
	h.enrichFinishedAt = time.Now().UTC()
	if err != nil {
		h.enrichError = err.Error()
		slog.ErrorContext(ctx, "async crawl-enrich failed", "error", err)
		return
	}
	// Copy the summary so the stored pointer doesn't alias the local.
	result := summary
	h.enrichSummary = &result
}
// CrawlEnrichStatus returns the state of the most recent async crawl-enrich.
// Shape mirrors CrawlStatus for UI reuse.
func (h *Handler) CrawlEnrichStatus(c *gin.Context) {
	// Snapshot under the read lock, then serialize outside it.
	h.enrichMu.RLock()
	body := gin.H{
		"running":     h.enrichRunning.Load(),
		"started_at":  h.enrichStartedAt,
		"finished_at": h.enrichFinishedAt,
		"summary":     h.enrichSummary,
		"error":       h.enrichError,
	}
	h.enrichMu.RUnlock()
	c.JSON(http.StatusOK, gin.H{"data": body})
}
// EnrichLLM runs per-row LLM enrichment synchronously. 30s deadline is
// enough for scraping 5 URLs + one Mistral Pass2 call in typical conditions.
// Operator clicks the button, waits, sees the result — no polling.
func (h *Handler) EnrichLLM(c *gin.Context) {
	queueID, parseErr := uuid.Parse(c.Param("id"))
	if parseErr != nil {
		e := apierror.BadRequest("invalid_id", "invalid queue id")
		c.JSON(e.Status, apierror.NewResponse(e))
		return
	}
	ctx, cancel := context.WithTimeout(c.Request.Context(), 30*time.Second)
	defer cancel()
	payload, err := h.service.RunLLMEnrichOne(ctx, queueID)
	if err != nil {
		slog.WarnContext(ctx, "llm enrich failed", "queue_id", queueID, "error", err)
		e := apierror.Internal("llm enrich failed: " + err.Error())
		c.JSON(e.Status, apierror.NewResponse(e))
		return
	}
	c.JSON(http.StatusOK, gin.H{"data": payload})
}
// ClassifySimilarPair runs the LLM duplicate-tiebreaker on the two queue
// rows identified by URL params :aid and :bid. Synchronous, 15s deadline —
// the call is short (no scraping) so the operator can click and immediately
// see the verdict.
func (h *Handler) ClassifySimilarPair(c *gin.Context) {
	firstID, errA := uuid.Parse(c.Param("aid"))
	if errA != nil {
		e := apierror.BadRequest("invalid_id", "invalid queue id A")
		c.JSON(e.Status, apierror.NewResponse(e))
		return
	}
	secondID, errB := uuid.Parse(c.Param("bid"))
	if errB != nil {
		e := apierror.BadRequest("invalid_id", "invalid queue id B")
		c.JSON(e.Status, apierror.NewResponse(e))
		return
	}
	ctx, cancel := context.WithTimeout(c.Request.Context(), 15*time.Second)
	defer cancel()
	verdict, err := h.service.ClassifySimilarPair(ctx, firstID, secondID)
	if err != nil {
		slog.WarnContext(ctx, "classify similar failed", "a", firstID, "b", secondID, "error", err)
		e := apierror.Internal("classify failed: " + err.Error())
		c.JSON(e.Status, apierror.NewResponse(e))
		return
	}
	c.JSON(http.StatusOK, gin.H{"data": verdict})
}
func currentUserID(c *gin.Context) (uuid.UUID, bool) {
raw, exists := c.Get("user_id")
if !exists {

View File

@@ -55,6 +55,9 @@ func TestCrawlHandlerReturns202AndStartsCrawl(t *testing.T) {
&stubCrawlerRunner{result: crawler.CrawlResult{}},
noopLinkVerifier{},
noopMarketCreator{},
nil,
nil,
nil,
)
h := NewHandler(svc, 0) // rate limit disabled
@@ -89,7 +92,7 @@ func TestCrawlHandlerReturns202AndStartsCrawl(t *testing.T) {
// TestCrawlStatusInitialState verifies the zero state of a freshly constructed Handler.
func TestCrawlStatusInitialState(t *testing.T) {
svc := NewService(newMockRepo(), &stubCrawlerRunner{}, noopLinkVerifier{}, noopMarketCreator{})
svc := NewService(newMockRepo(), &stubCrawlerRunner{}, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, nil)
h := NewHandler(svc, 0)
w := httptest.NewRecorder()
@@ -139,7 +142,7 @@ func TestCrawlHandlerConcurrentReturnsTooManyRequests(t *testing.T) {
started: make(chan struct{}),
release: make(chan struct{}),
}
svc := NewService(newMockRepo(), bc, noopLinkVerifier{}, noopMarketCreator{})
svc := NewService(newMockRepo(), bc, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, nil)
h := NewHandler(svc, 0) // rate limit disabled
// First request — returns 202 and spawns goroutine.
@@ -194,7 +197,7 @@ func TestListQueueSortParamWhitelist(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
repo := newMockRepo()
svc := NewService(repo, &stubCrawlerRunner{}, noopLinkVerifier{}, noopMarketCreator{})
svc := NewService(repo, &stubCrawlerRunner{}, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, nil)
h := NewHandler(svc, 0)
w := httptest.NewRecorder()
@@ -252,6 +255,9 @@ func TestCrawlHandlerRateLimit(t *testing.T) {
&stubCrawlerRunner{result: crawler.CrawlResult{}},
noopLinkVerifier{},
noopMarketCreator{},
nil,
nil,
nil,
)
// 1 per hour window.
h := NewHandler(svc, 1)

View File

@@ -6,6 +6,8 @@ import (
"github.com/google/uuid"
"github.com/jackc/pgx/v5"
"marktvogt.de/backend/internal/domain/discovery/enrich"
)
// Compile-time guard: mockRepo must fully satisfy Repository.
@@ -16,7 +18,8 @@ type mockRepo struct {
editionExistsFn func(ctx context.Context, seriesID uuid.UUID, year int) (bool, error)
insertDiscFn func(ctx context.Context, d DiscoveredMarket) (uuid.UUID, error)
isRejectedFn func(ctx context.Context, nameNormalized, stadt string, year int) (bool, error)
queuePendingFn func(ctx context.Context, nameNormalized, stadt string, startDatum *time.Time) (bool, error)
findMatchFn func(ctx context.Context, nameNormalized, stadt string, startDatum *time.Time) (*DiscoveredMarket, error)
mergePendingFn func(id uuid.UUID, addSources, addQuellen []string, addContribs []SourceContribution, newKonfidenz, newHinweis string) error
countQueueFn func(ctx context.Context, status string) (int, error)
getDiscoveredFn func(ctx context.Context, id uuid.UUID) (DiscoveredMarket, error)
beginTxFn func(ctx context.Context) (pgx.Tx, error)
@@ -32,6 +35,17 @@ type mockRepo struct {
listQueueCalls []listQueueCall
listQueueRows []DiscoveredMarket
listQueueErr error
// Enrichment hooks (optional). Tests set whichever they need; nil falls
// through to a no-op / empty response.
setEnrichmentFn func(id uuid.UUID, payload enrich.Enrichment, status string) error
getCacheFn func(key string) (enrich.Enrichment, bool, error)
setCacheFn func(key string, payload enrich.Enrichment, ttl time.Duration) error
listPendingEnrichFn func(limit int) ([]DiscoveredMarket, error)
// Similarity AI cache hooks.
getSimCacheFn func(pairKey string) (enrich.Verdict, bool, error)
setSimCacheFn func(pairKey string, v enrich.Verdict, ttl time.Duration) error
}
// listQueueCall records arguments passed to mockRepo.ListQueue so tests can
@@ -53,8 +67,17 @@ func (m *mockRepo) InsertDiscovered(ctx context.Context, d DiscoveredMarket) (uu
func (m *mockRepo) IsRejected(ctx context.Context, n, s string, y int) (bool, error) {
return m.isRejectedFn(ctx, n, s, y)
}
func (m *mockRepo) QueueHasPending(ctx context.Context, n, s string, sd *time.Time) (bool, error) {
return m.queuePendingFn(ctx, n, s, sd)
// FindPendingMatch delegates to findMatchFn when set; a nil hook reports
// "no pending match" (nil, nil) so most tests need no setup.
func (m *mockRepo) FindPendingMatch(ctx context.Context, n, s string, sd *time.Time) (*DiscoveredMarket, error) {
	if m.findMatchFn != nil {
		return m.findMatchFn(ctx, n, s, sd)
	}
	return nil, nil
}
// MergePendingSources delegates to mergePendingFn when set (context dropped —
// the hook signature omits it); a nil hook is a successful no-op.
func (m *mockRepo) MergePendingSources(_ context.Context, id uuid.UUID, addSources, addQuellen []string, addContribs []SourceContribution, newKonfidenz, newHinweis string) error {
	if m.mergePendingFn != nil {
		return m.mergePendingFn(id, addSources, addQuellen, addContribs, newKonfidenz, newHinweis)
	}
	return nil
}
func (m *mockRepo) ListQueue(ctx context.Context, status, sortBy, order string, l, o int) ([]DiscoveredMarket, error) {
m.listQueueCalls = append(m.listQueueCalls, listQueueCall{
@@ -104,6 +127,42 @@ func (m *mockRepo) Stats(ctx context.Context, forwardMonths, recentErrorsLimit i
func (m *mockRepo) UpdatePending(ctx context.Context, id uuid.UUID, f UpdatePendingFields, nn *string) error {
return nil
}
// SetEnrichment delegates to setEnrichmentFn when set; nil hook succeeds.
func (m *mockRepo) SetEnrichment(_ context.Context, id uuid.UUID, payload enrich.Enrichment, status string) error {
	if m.setEnrichmentFn != nil {
		return m.setEnrichmentFn(id, payload, status)
	}
	return nil
}
// GetEnrichmentCache delegates to getCacheFn when set; nil hook is a miss.
func (m *mockRepo) GetEnrichmentCache(_ context.Context, key string) (enrich.Enrichment, bool, error) {
	if m.getCacheFn != nil {
		return m.getCacheFn(key)
	}
	return enrich.Enrichment{}, false, nil
}
// SetEnrichmentCache delegates to setCacheFn when set; nil hook succeeds.
func (m *mockRepo) SetEnrichmentCache(_ context.Context, key string, payload enrich.Enrichment, ttl time.Duration) error {
	if m.setCacheFn != nil {
		return m.setCacheFn(key, payload, ttl)
	}
	return nil
}
// ListPendingEnrichment delegates to listPendingEnrichFn when set; nil hook
// returns an empty result.
func (m *mockRepo) ListPendingEnrichment(_ context.Context, limit int) ([]DiscoveredMarket, error) {
	if m.listPendingEnrichFn != nil {
		return m.listPendingEnrichFn(limit)
	}
	return nil, nil
}
// GetSimilarityCache delegates to getSimCacheFn when set; nil hook is a miss.
func (m *mockRepo) GetSimilarityCache(_ context.Context, pairKey string) (enrich.Verdict, bool, error) {
	if m.getSimCacheFn != nil {
		return m.getSimCacheFn(pairKey)
	}
	return enrich.Verdict{}, false, nil
}
// SetSimilarityCache delegates to setSimCacheFn when set; nil hook succeeds.
func (m *mockRepo) SetSimilarityCache(_ context.Context, pairKey string, v enrich.Verdict, ttl time.Duration) error {
	if m.setSimCacheFn != nil {
		return m.setSimCacheFn(pairKey, v, ttl)
	}
	return nil
}
// noopLinkVerifier passes every URL — used by tests to isolate from network.
type noopLinkVerifier struct{}

View File

@@ -5,6 +5,8 @@ import (
"time"
"github.com/google/uuid"
"marktvogt.de/backend/internal/domain/discovery/enrich"
)
// SourceContribution captures what one source reported about a market.
@@ -67,8 +69,25 @@ type DiscoveredMarket struct {
ReviewedAt *time.Time `json:"reviewed_at"`
ReviewedBy *uuid.UUID `json:"reviewed_by"`
CreatedEditionID *uuid.UUID `json:"created_edition_id"`
// Enrichment payload + worker lifecycle. See enrich.Enrichment for the contract.
Enrichment enrich.Enrichment `json:"enrichment"`
EnrichmentStatus string `json:"enrichment_status"` // pending|done|failed|skipped
EnrichmentAttempts int `json:"enrichment_attempts"`
EnrichedAt *time.Time `json:"enriched_at"`
}
// EnrichmentStatus constants for discovered_markets.enrichment_status.
// The payload type + provenance constants live in the enrich package; this
// status string is a DB column value used by repository queries, kept here
// alongside the other DB-lifecycle constants.
const (
	EnrichmentStatusPending = "pending" // awaiting the first enrichment pass
	EnrichmentStatusDone    = "done"    // enrichment succeeded (enriched_at stamped)
	EnrichmentStatusFailed  = "failed"  // last attempt errored (enriched_at stamped)
	EnrichmentStatusSkipped = "skipped" // attempt made but nothing persisted; enriched_at preserved
)
// RejectedDiscovery stores a sticky rejection scoped to (normalized_name, city, year).
type RejectedDiscovery struct {
ID uuid.UUID `json:"id"`

View File

@@ -4,12 +4,15 @@ package discovery
import (
"context"
"encoding/json"
"errors"
"fmt"
"time"
"github.com/google/uuid"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"marktvogt.de/backend/internal/domain/discovery/enrich"
)
type Repository interface {
@@ -17,7 +20,16 @@ type Repository interface {
EditionExists(ctx context.Context, seriesID uuid.UUID, year int) (bool, error)
InsertDiscovered(ctx context.Context, d DiscoveredMarket) (uuid.UUID, error)
IsRejected(ctx context.Context, nameNormalized, stadt string, year int) (bool, error)
QueueHasPending(ctx context.Context, nameNormalized, stadt string, startDatum *time.Time) (bool, error)
// FindPendingMatch looks up an existing pending queue row by the exact
// tuple (name_normalized, stadt, start_datum). Returns nil when no match
// exists. Used by the crawl auto-merge path — callers decide whether to
// merge sources into the match or skip as redundant.
FindPendingMatch(ctx context.Context, nameNormalized, stadt string, startDatum *time.Time) (*DiscoveredMarket, error)
// MergePendingSources appends the supplied per-source data onto an existing
// pending row. Arrays are deduped (set-union semantics); konfidenz and
// hinweis are overwritten with the caller-computed values. Idempotent —
// calling with already-present sources is a no-op.
MergePendingSources(ctx context.Context, id uuid.UUID, addSources []string, addQuellen []string, addContribs []SourceContribution, newKonfidenz, newHinweis string) error
ListQueue(ctx context.Context, status, sortBy, order string, limit, offset int) ([]DiscoveredMarket, error)
CountQueue(ctx context.Context, status string) (int, error)
GetDiscovered(ctx context.Context, id uuid.UUID) (DiscoveredMarket, error)
@@ -27,6 +39,20 @@ type Repository interface {
BeginTx(ctx context.Context) (pgx.Tx, error)
Stats(ctx context.Context, forwardMonths, recentErrorsLimit int) (Stats, error)
UpdatePending(ctx context.Context, id uuid.UUID, fields UpdatePendingFields, nameNormalized *string) error
// Enrichment persistence. SetEnrichment writes payload + status + bumps
// attempts + stamps enriched_at in one UPDATE so the worker stays idempotent.
SetEnrichment(ctx context.Context, id uuid.UUID, payload enrich.Enrichment, status string) error
// ListPendingEnrichment returns queue rows awaiting their first enrichment
// pass. Ordered by discovered_at ASC so older rows get processed first.
ListPendingEnrichment(ctx context.Context, limit int) ([]DiscoveredMarket, error)
// Cache operations keyed on sha256(name_normalized|stadt|year).
GetEnrichmentCache(ctx context.Context, key string) (enrich.Enrichment, bool, error)
SetEnrichmentCache(ctx context.Context, key string, payload enrich.Enrichment, ttl time.Duration) error
// Similarity AI cache — keyed on enrich.SimilarityPairKey.
GetSimilarityCache(ctx context.Context, pairKey string) (enrich.Verdict, bool, error)
SetSimilarityCache(ctx context.Context, pairKey string, v enrich.Verdict, ttl time.Duration) error
}
// SeriesCandidate is a minimal projection used for name-normalization comparison in Go.
@@ -106,16 +132,70 @@ func (r *pgRepository) IsRejected(ctx context.Context, nameNormalized, stadt str
return exists, err
}
func (r *pgRepository) QueueHasPending(ctx context.Context, nameNormalized, stadt string, startDatum *time.Time) (bool, error) {
var exists bool
err := r.pool.QueryRow(ctx, `
SELECT EXISTS(
SELECT 1 FROM discovered_markets
WHERE status='pending'
AND name_normalized=$1 AND stadt=$2
AND start_datum IS NOT DISTINCT FROM $3
)`, nameNormalized, stadt, startDatum).Scan(&exists)
return exists, err
// FindPendingMatch looks up the pending queue row matching the exact tuple
// (name_normalized, stadt, start_datum). Returns (nil, nil) when no such row
// exists, so callers can distinguish "no match" from a query failure.
// IS NOT DISTINCT FROM makes the start_datum comparison NULL-safe (a NULL
// date matches only another NULL). LIMIT 1 — an arbitrary row wins if the
// tuple is somehow duplicated.
func (r *pgRepository) FindPendingMatch(ctx context.Context, nameNormalized, stadt string, startDatum *time.Time) (*DiscoveredMarket, error) {
	row := r.pool.QueryRow(ctx, `
SELECT id, bucket_id, markt_name, stadt, coalesce(bundesland,''), land,
start_datum, end_datum, coalesce(website,''), quellen, coalesce(konfidenz,''),
coalesce(agent_status,''), coalesce(hinweis,''), name_normalized, matched_series_id, status,
discovered_at, reviewed_at, reviewed_by, created_edition_id,
sources, source_contributions,
enrichment, enrichment_status, enrichment_attempts, enriched_at
FROM discovered_markets
WHERE status='pending'
AND name_normalized=$1 AND stadt=$2
AND start_datum IS NOT DISTINCT FROM $3
LIMIT 1`, nameNormalized, stadt, startDatum)
	d, err := scanDiscoveredMarket(row)
	if errors.Is(err, pgx.ErrNoRows) {
		// No pending row with this tuple — not an error.
		return nil, nil
	}
	if err != nil {
		return nil, err
	}
	return &d, nil
}
// MergePendingSources appends new per-source data onto an existing pending
// row using set-union semantics for the text[] columns and a JSONB concat
// for source_contributions. Konfidenz and hinweis are overwritten with the
// caller-computed values — the caller already knows the new source count
// and whether a date_conflict hint still applies.
//
// Idempotent: if add* arrays contain entries already present, the DISTINCT
// clauses drop duplicates. Contributions de-dupe by source_name, keeping the
// FIRST occurrence — i.e. the already-stored entry wins over a re-crawled
// one, consistent with the interface's "already-present sources are a no-op"
// contract.
//
// Fix: the previous query used DISTINCT ON without an ORDER BY, so which
// duplicate survived was unpredictable per PostgreSQL semantics. WITH
// ORDINALITY + ORDER BY (source_name, ord) now pins the first (existing)
// occurrence deterministically; the outer jsonb_agg ORDER BY keeps the array
// deterministic for downstream comparison.
func (r *pgRepository) MergePendingSources(ctx context.Context, id uuid.UUID, addSources []string, addQuellen []string, addContribs []SourceContribution, newKonfidenz, newHinweis string) error {
	contribJSON, err := json.Marshal(addContribs)
	if err != nil {
		return fmt.Errorf("marshal added contributions: %w", err)
	}
	// Dedup text[] via ARRAY(SELECT DISTINCT unnest(...)).
	// konfidenz uses NULLIF so an empty caller value clears the column.
	tag, err := r.pool.Exec(ctx, `
UPDATE discovered_markets
SET sources = ARRAY(SELECT DISTINCT unnest(sources || $2::text[])),
    quellen = ARRAY(SELECT DISTINCT unnest(quellen || $3::text[])),
    source_contributions = (
        SELECT jsonb_agg(c ORDER BY c->>'source_name')
        FROM (
            SELECT DISTINCT ON (c->>'source_name') c
            FROM jsonb_array_elements(source_contributions || $4::jsonb)
                 WITH ORDINALITY AS t(c, ord)
            ORDER BY c->>'source_name', ord
        ) deduped
    ),
    konfidenz = NULLIF($5, ''),
    hinweis = $6
WHERE id = $1 AND status = 'pending'`,
		id, addSources, addQuellen, contribJSON, newKonfidenz, newHinweis)
	if err != nil {
		return fmt.Errorf("merge pending sources: %w", err)
	}
	if tag.RowsAffected() == 0 {
		// Row vanished or is no longer pending — surface as an error so the
		// caller doesn't silently drop the merged data.
		return fmt.Errorf("no pending row with id %s", id)
	}
	return nil
}
func (r *pgRepository) ListQueue(ctx context.Context, status, sortBy, order string, limit, offset int) ([]DiscoveredMarket, error) {
@@ -125,7 +205,8 @@ SELECT id, bucket_id, markt_name, stadt, coalesce(bundesland,''), land,
start_datum, end_datum, coalesce(website,''), quellen, coalesce(konfidenz,''),
coalesce(agent_status,''), coalesce(hinweis,''), name_normalized, matched_series_id, status,
discovered_at, reviewed_at, reviewed_by, created_edition_id,
sources, source_contributions
sources, source_contributions,
enrichment, enrichment_status, enrichment_attempts, enriched_at
FROM discovered_markets
WHERE status = $1
` + orderBy + `
@@ -188,13 +269,45 @@ func (r *pgRepository) CountQueue(ctx context.Context, status string) (int, erro
return n, err
}
// ListPendingEnrichment selects queue rows still awaiting enrichment. The
// status='pending' filter excludes accepted/rejected rows — enrichment only
// makes sense for review candidates. Ordered by discovered_at ASC so an
// operator running the bulk action sees the oldest queue entries improve first.
func (r *pgRepository) ListPendingEnrichment(ctx context.Context, limit int) ([]DiscoveredMarket, error) {
	rows, err := r.pool.Query(ctx, `
SELECT id, bucket_id, markt_name, stadt, coalesce(bundesland,''), land,
start_datum, end_datum, coalesce(website,''), quellen, coalesce(konfidenz,''),
coalesce(agent_status,''), coalesce(hinweis,''), name_normalized, matched_series_id, status,
discovered_at, reviewed_at, reviewed_by, created_edition_id,
sources, source_contributions,
enrichment, enrichment_status, enrichment_attempts, enriched_at
FROM discovered_markets
WHERE status = 'pending' AND enrichment_status = $1
ORDER BY discovered_at ASC
LIMIT $2`, EnrichmentStatusPending, limit)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	markets := make([]DiscoveredMarket, 0)
	for rows.Next() {
		m, scanErr := scanDiscoveredMarket(rows)
		if scanErr != nil {
			return nil, scanErr
		}
		markets = append(markets, m)
	}
	return markets, rows.Err()
}
func (r *pgRepository) GetDiscovered(ctx context.Context, id uuid.UUID) (DiscoveredMarket, error) {
row := r.pool.QueryRow(ctx, `
SELECT id, bucket_id, markt_name, stadt, coalesce(bundesland,''), land,
start_datum, end_datum, coalesce(website,''), quellen, coalesce(konfidenz,''),
coalesce(agent_status,''), coalesce(hinweis,''), name_normalized, matched_series_id, status,
discovered_at, reviewed_at, reviewed_by, created_edition_id,
sources, source_contributions
sources, source_contributions,
enrichment, enrichment_status, enrichment_attempts, enriched_at
FROM discovered_markets WHERE id = $1`, id)
return scanDiscoveredMarket(row)
}
@@ -210,12 +323,14 @@ type scanner interface {
func scanDiscoveredMarket(s scanner) (DiscoveredMarket, error) {
var d DiscoveredMarket
var contribJSON []byte
var enrichmentJSON []byte
if err := s.Scan(
&d.ID, &d.BucketID, &d.MarktName, &d.Stadt, &d.Bundesland, &d.Land,
&d.StartDatum, &d.EndDatum, &d.Website, &d.Quellen, &d.Konfidenz,
&d.AgentStatus, &d.Hinweis, &d.NameNormalized, &d.MatchedSeriesID, &d.Status,
&d.DiscoveredAt, &d.ReviewedAt, &d.ReviewedBy, &d.CreatedEditionID,
&d.Sources, &contribJSON,
&enrichmentJSON, &d.EnrichmentStatus, &d.EnrichmentAttempts, &d.EnrichedAt,
); err != nil {
return DiscoveredMarket{}, err
}
@@ -224,6 +339,11 @@ func scanDiscoveredMarket(s scanner) (DiscoveredMarket, error) {
return DiscoveredMarket{}, fmt.Errorf("unmarshal source_contributions: %w", err)
}
}
if len(enrichmentJSON) > 0 {
if err := json.Unmarshal(enrichmentJSON, &d.Enrichment); err != nil {
return DiscoveredMarket{}, fmt.Errorf("unmarshal enrichment: %w", err)
}
}
if d.Sources == nil {
d.Sources = []string{}
}
@@ -329,3 +449,124 @@ LIMIT $1`, recentErrorsLimit)
}
return s, rows.Err()
}
// SetEnrichment persists one worker's enrichment outcome with a single
// UPDATE. The attempts counter is bumped inside the statement itself, so
// concurrent retries cannot under-count even when the caller's
// read-modify-write races. enriched_at is refreshed only for the terminal
// statuses ('done' / 'failed'); any other status (e.g. 'skipped') keeps the
// previous timestamp, separating "we ran and nothing changed" from "we ran
// and succeeded trivially".
func (r *pgRepository) SetEnrichment(ctx context.Context, id uuid.UUID, payload enrich.Enrichment, status string) error {
	body, err := json.Marshal(payload)
	if err != nil {
		return fmt.Errorf("marshal enrichment: %w", err)
	}
	// The enriched_at expression is one of two fixed strings chosen here, so
	// splicing it via Sprintf is not an injection surface.
	enrichedAtSQL := "enriched_at" // non-terminal: preserve existing value
	if status == EnrichmentStatusDone || status == EnrichmentStatusFailed {
		enrichedAtSQL = "now()"
	}
	query := fmt.Sprintf(`
UPDATE discovered_markets
SET enrichment = $2,
enrichment_status = $3,
enrichment_attempts = enrichment_attempts + 1,
enriched_at = %s
WHERE id = $1`, enrichedAtSQL)
	tag, err := r.pool.Exec(ctx, query, id, body, status)
	if err != nil {
		return fmt.Errorf("set enrichment: %w", err)
	}
	if tag.RowsAffected() == 0 {
		return fmt.Errorf("no discovered_markets row with id %s", id)
	}
	return nil
}
// GetEnrichmentCache returns (payload, true, nil) on a hit that has not yet
// expired, (_, false, nil) on a miss or expired entry. Expired entries are
// treated as misses so the caller refetches rather than relying on a
// background pruner.
// GetEnrichmentCache looks up a cached enrichment payload by key. It returns
// (payload, true, nil) for a fresh hit and (zero, false, nil) when the key is
// absent or its entry has expired — expiry is enforced in the query, so stale
// rows behave like misses and no background pruner is required.
func (r *pgRepository) GetEnrichmentCache(ctx context.Context, key string) (enrich.Enrichment, bool, error) {
	var raw []byte
	scanErr := r.pool.QueryRow(ctx, `
SELECT payload FROM enrichment_cache
WHERE cache_key = $1
AND (expires_at IS NULL OR expires_at > now())`, key).Scan(&raw)
	switch {
	case errors.Is(scanErr, pgx.ErrNoRows):
		// Miss (or expired entry filtered out by the WHERE clause).
		return enrich.Enrichment{}, false, nil
	case scanErr != nil:
		return enrich.Enrichment{}, false, fmt.Errorf("cache get: %w", scanErr)
	}
	var payload enrich.Enrichment
	if err := json.Unmarshal(raw, &payload); err != nil {
		return enrich.Enrichment{}, false, fmt.Errorf("unmarshal cache payload: %w", err)
	}
	return payload, true, nil
}
// SetEnrichmentCache writes or replaces a cache entry. ttl==0 means "no
// expiry" (expires_at stays NULL). Callers typically use 30 days.
// SetEnrichmentCache inserts or replaces the cache entry for key. A ttl of 0
// means the entry never expires (expires_at stays NULL); callers typically
// pass ~30 days. On conflict the payload, created_at, and expires_at are all
// refreshed.
func (r *pgRepository) SetEnrichmentCache(ctx context.Context, key string, payload enrich.Enrichment, ttl time.Duration) error {
	body, err := json.Marshal(payload)
	if err != nil {
		return fmt.Errorf("marshal cache payload: %w", err)
	}
	var expiry *time.Time
	if ttl > 0 {
		deadline := time.Now().Add(ttl)
		expiry = &deadline
	}
	if _, err := r.pool.Exec(ctx, `
INSERT INTO enrichment_cache (cache_key, payload, expires_at)
VALUES ($1, $2, $3)
ON CONFLICT (cache_key) DO UPDATE
SET payload = EXCLUDED.payload,
created_at = now(),
expires_at = EXCLUDED.expires_at`, key, body, expiry); err != nil {
		return fmt.Errorf("cache set: %w", err)
	}
	return nil
}
// GetSimilarityCache returns a cached AI verdict for a (pairKey) or
// (zero, false, nil) on miss/expiry. Expired entries are treated as misses.
func (r *pgRepository) GetSimilarityCache(ctx context.Context, pairKey string) (enrich.Verdict, bool, error) {
var v enrich.Verdict
err := r.pool.QueryRow(ctx, `
SELECT same, confidence, reason, model FROM similarity_ai_cache
WHERE pair_key = $1
AND (expires_at IS NULL OR expires_at > now())`, pairKey).Scan(&v.Same, &v.Confidence, &v.Reason, &v.Model)
if errors.Is(err, pgx.ErrNoRows) {
return enrich.Verdict{}, false, nil
}
if err != nil {
return enrich.Verdict{}, false, fmt.Errorf("similarity cache get: %w", err)
}
return v, true, nil
}
// SetSimilarityCache upserts a verdict. ttl=0 means "no expiry" (nullable).
func (r *pgRepository) SetSimilarityCache(ctx context.Context, pairKey string, v enrich.Verdict, ttl time.Duration) error {
var expiresAt *time.Time
if ttl > 0 {
t := time.Now().Add(ttl)
expiresAt = &t
}
_, err := r.pool.Exec(ctx, `
INSERT INTO similarity_ai_cache (pair_key, same, confidence, reason, model, expires_at)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (pair_key) DO UPDATE
SET same = EXCLUDED.same,
confidence = EXCLUDED.confidence,
reason = EXCLUDED.reason,
model = EXCLUDED.model,
created_at = now(),
expires_at = EXCLUDED.expires_at`, pairKey, v.Same, v.Confidence, v.Reason, v.Model, expiresAt)
if err != nil {
return fmt.Errorf("similarity cache set: %w", err)
}
return nil
}

View File

@@ -24,9 +24,19 @@ func RegisterRoutes(
admin.POST("/queue/:id/accept", h.Accept)
admin.POST("/queue/:id/reject", h.Reject)
admin.GET("/queue/:id/similar", h.Similar)
// Per-row LLM enrichment (MR 3b). Synchronous — operator waits.
admin.POST("/queue/:id/enrich", h.EnrichLLM)
// Per-pair AI similarity tiebreak (MR 4). Synchronous; short call.
admin.POST("/queue/:aid/similar/:bid/classify", h.ClassifySimilarPair)
// Manual crawl trigger — subject to hourly rate limit.
admin.POST("/crawl-manual", h.Crawl)
// Async crawl status polling.
admin.GET("/crawl-status", h.CrawlStatus)
// Manual bulk crawl-enrich — deterministic consolidation + Nominatim
// geocoding applied to every enrichment_status='pending' row. Async
// (202 + polling) because the Nominatim 1rps rate means a full queue
// can take minutes.
admin.POST("/enrichment/crawl-all", h.RunCrawlEnrichAll)
admin.GET("/enrichment/crawl-all-status", h.CrawlEnrichStatus)
}
}

View File

@@ -12,9 +12,21 @@ import (
"github.com/google/uuid"
"marktvogt.de/backend/internal/domain/discovery/crawler"
"marktvogt.de/backend/internal/domain/discovery/enrich"
"marktvogt.de/backend/internal/domain/market"
)
// Geocoder is the narrow interface the enrichment flow depends on.
// *pkg/geocode.Geocoder satisfies it. Declared here instead of in the
// sub-package enrich/ to avoid an import cycle (enrich depends on discovery
// for the DiscoveredMarket / Enrichment types).
//
// Structurally identical to enrich.Geocoder, so a concrete *geocode.Geocoder
// satisfies both and can be passed across the package boundary.
type Geocoder interface {
	// Geocode resolves an address to coordinates. The two *float64 returns
	// presumably carry (lat, lng), nil when unresolved — confirm against
	// pkg/geocode before relying on nil-with-nil-error semantics.
	Geocode(ctx context.Context, street, city, zip, country string) (*float64, *float64, error)
}
type marketCreator interface {
Create(ctx context.Context, req market.CreateMarketRequest) (market.Market, error)
CreateEditionForSeries(ctx context.Context, seriesID uuid.UUID, req market.CreateEditionRequest) (market.Market, error)
@@ -39,15 +51,31 @@ type Service struct {
crawler crawlerRunner
marketCreator marketCreator
linkChecker linkVerifier
// geocoder is optional — nil means crawl-enrich skips lat/lng. Wired from
// server/routes.go using the shared Nominatim client (1 rps limited).
geocoder Geocoder
// llmEnricher is the AI-backed fallback pass. Nil-safe via NoopLLMEnricher
// in test wiring; production code passes a real MistralLLMEnricher.
llmEnricher enrich.LLMEnricher
// simClassifier is the AI-backed duplicate tiebreaker. Nil-safe via
// NoopSimilarityClassifier.
simClassifier enrich.SimilarityClassifier
}
// NewService constructs a Service wired for the crawler-driven Crawl path.
func NewService(repo Repository, cr crawlerRunner, lc linkVerifier, mc marketCreator) *Service {
// geocoder may be nil; in that case crawl-enrich runs consolidation only and
// skips the lat/lng step. llm may be nil; per-row LLM enrichment then
// returns an error instead of attempting a call. simClassifier may be nil;
// ClassifySimilarPair returns an error rather than a zero-confidence answer.
func NewService(repo Repository, cr crawlerRunner, lc linkVerifier, mc marketCreator, gc Geocoder, llm enrich.LLMEnricher, sim enrich.SimilarityClassifier) *Service {
return &Service{
repo: repo,
crawler: cr,
marketCreator: mc,
linkChecker: lc,
geocoder: gc,
llmEnricher: llm,
simClassifier: sim,
}
}
@@ -59,9 +87,13 @@ type CrawlSummary struct {
Merged int `json:"merged"`
MergedAcrossSites int `json:"merged_across_sites"`
Discovered int `json:"discovered"`
DedupedExisting int `json:"deduped_existing"`
DedupedRejected int `json:"deduped_rejected"`
DedupedQueue int `json:"deduped_queue"`
// AutoMerged counts queue rows that received a new source this run —
// cross-crawl consolidation. DedupedQueue now only counts truly redundant
// pickups (same source, same tuple, seen again).
AutoMerged int `json:"auto_merged"`
DedupedExisting int `json:"deduped_existing"`
DedupedRejected int `json:"deduped_rejected"`
DedupedQueue int `json:"deduped_queue"`
// LinkCheckFailed is retained for JSON compatibility with the admin UI;
// no longer populated since the crawler pipeline skips link verification.
// Consider removing in a future schema version.
@@ -180,13 +212,42 @@ func (s *Service) Crawl(ctx context.Context) (CrawlSummary, error) {
continue
}
}
pending, err := s.repo.QueueHasPending(insertCtx, nameNorm, m.City, m.StartDate)
match, err := s.repo.FindPendingMatch(insertCtx, nameNorm, m.City, m.StartDate)
if err != nil {
slog.WarnContext(ctx, "queue pending check", "error", err)
continue
}
if pending {
summary.DedupedQueue++
if match != nil {
// A pending row already covers this (name, stadt, start_date).
// Two cases:
// 1. The new event brings a source already present on the match
// — nothing to merge, count as redundant (same source
// re-emitted the event this crawl, or a previous run already
// captured it).
// 2. The new event introduces a source not yet represented —
// merge the per-source data onto the existing row. This is
// the "auto-merge" path: avoids creating a duplicate queue
// entry and upgrades konfidenz as source count grows.
if containsAllSources(match.Sources, m.Sources) {
summary.DedupedQueue++
continue
}
addSources, addQuellen, addContribs := newSourceDelta(match, m)
mergedSources := unionStrings(match.Sources, addSources)
newHinweis := match.Hinweis
if m.Hinweis != "" && !strings.Contains(newHinweis, m.Hinweis) {
if newHinweis != "" {
newHinweis += "; "
}
newHinweis += m.Hinweis
}
if err := s.repo.MergePendingSources(insertCtx, match.ID,
addSources, addQuellen, addContribs,
konfidenzForSources(mergedSources), newHinweis); err != nil {
slog.WarnContext(ctx, "auto-merge failed", "id", match.ID, "error", err)
continue
}
summary.AutoMerged++
continue
}
@@ -238,6 +299,7 @@ func (s *Service) Crawl(ctx context.Context) (CrawlSummary, error) {
"merged", summary.Merged,
"merged_across_sites", summary.MergedAcrossSites,
"discovered", summary.Discovered,
"auto_merged", summary.AutoMerged,
"deduped_existing", summary.DedupedExisting,
"deduped_rejected", summary.DedupedRejected,
"deduped_queue", summary.DedupedQueue,
@@ -249,23 +311,126 @@ func (s *Service) Crawl(ctx context.Context) (CrawlSummary, error) {
return summary, nil
}
// containsAllSources reports whether every name in incoming already appears
// in existing. It detects the "no new source info" case — the same crawler
// run re-emitting a row, or a later run repeating a single-source pickup.
// An empty incoming slice trivially satisfies the condition.
func containsAllSources(existing, incoming []string) bool {
	known := make(map[string]struct{}, len(existing))
	for _, name := range existing {
		known[name] = struct{}{}
	}
	for _, name := range incoming {
		if _, present := known[name]; !present {
			return false
		}
	}
	return true
}
// unionStrings merges a and b into a de-duplicated slice, keeping first-seen
// order: all of a's (unique) entries first, then any entries of b not already
// present. Always returns a non-nil slice.
func unionStrings(a, b []string) []string {
	merged := make([]string, 0, len(a)+len(b))
	seen := make(map[string]struct{}, len(a)+len(b))
	appendNew := func(items []string) {
		for _, item := range items {
			if _, dup := seen[item]; dup {
				continue
			}
			seen[item] = struct{}{}
			merged = append(merged, item)
		}
	}
	appendNew(a)
	appendNew(b)
	return merged
}
// newSourceDelta works out which per-source data the incoming merged event
// carries that the existing pending match does not: source names, quellen
// URLs, and source contributions to append via MergePendingSources.
// Contributions whose SourceName already exists on the match are dropped up
// front — the repo's merge query would keep the first copy anyway, so
// sending them would only waste wire.
func newSourceDelta(match *DiscoveredMarket, m crawler.MergedEvent) (
	addSources []string, addQuellen []string, addContribs []SourceContribution,
) {
	knownSources := make(map[string]struct{}, len(match.Sources))
	for _, name := range match.Sources {
		knownSources[name] = struct{}{}
	}
	knownQuellen := make(map[string]struct{}, len(match.Quellen))
	for _, url := range match.Quellen {
		knownQuellen[url] = struct{}{}
	}
	for _, name := range m.Sources {
		if _, dup := knownSources[name]; !dup {
			addSources = append(addSources, name)
		}
	}
	for _, url := range m.Quellen {
		if _, dup := knownQuellen[url]; !dup {
			addQuellen = append(addQuellen, url)
		}
	}
	for _, contrib := range toContributions(m.Contributions) {
		if _, dup := knownSources[contrib.SourceName]; !dup {
			addContribs = append(addContribs, contrib)
		}
	}
	return addSources, addQuellen, addContribs
}
// crawlerKonfidenz derives a three-level confidence label for a merged event.
// Thin wrapper over konfidenzForSources to keep the call site compact; the
// actual signal-to-label mapping lives in konfidenzForSources so the initial
// insert and the auto-merge path share one implementation.
func crawlerKonfidenz(m crawler.MergedEvent) string {
	return konfidenzForSources(m.Sources)
}
// Source name constants (discovery-package local). Kept in sync with the
// crawler sub-package's unexported equivalents via manual copy — they're
// compared against RawEvent.SourceName / MergedEvent.Sources values that
// the crawler parsers emit. If a crawler parser renames its source, these
// must be updated by hand or konfidenz mapping silently degrades.
const (
	srcMittelaltermarktOnline = "mittelaltermarkt_online"
	srcMarktkalendarium       = "marktkalendarium"
	srcMittelalterkalender    = "mittelalterkalender"
	srcFestivalAlarm          = "festival_alarm"
	srcSuendenfrei            = "suendenfrei"
)
// konfidenzForSources is the canonical signal-to-label mapping. Shared
// between the initial crawler-produced insert and the auto-merge path so a
// row transitioning from 1 source to 2 sources gets re-labelled correctly.
//
// Signal: cross-source agreement is the strongest indicator — two or more
// independent calendars emitting the same (normalized name, city, start_date)
// triple is high confidence. Single-source rows fall back to source rank:
// Tribe JSON and marktkalendarium curate their data, suendenfrei's prose
// regex is brittle.
func crawlerKonfidenz(m crawler.MergedEvent) string {
if len(m.Sources) >= 2 {
func konfidenzForSources(sources []string) string {
if len(sources) >= 2 {
return KonfidenzHoch
}
if len(m.Sources) == 1 {
switch m.Sources[0] {
case "mittelaltermarkt_online", "marktkalendarium":
if len(sources) == 1 {
switch sources[0] {
case srcMittelaltermarktOnline, srcMarktkalendarium:
return KonfidenzMittel
case "mittelalterkalender", "festival_alarm":
case srcMittelalterkalender, srcFestivalAlarm:
return KonfidenzMittel
case "suendenfrei":
case srcSuendenfrei:
return KonfidenzNiedrig
}
}
@@ -449,6 +614,21 @@ func (s *Service) FindSimilarToQueueEntry(ctx context.Context, id uuid.UUID) ([]
return FindSimilar(target, candidates, 0.5), nil
}
// contributionsForEnrich projects the rich discovery-domain
// SourceContribution onto the narrow enrich.Contribution shape, copying only
// the PLZ / Venue / Organizer fields. This keeps the enrich package free of
// a reverse import on discovery. Always returns a non-nil slice.
func contributionsForEnrich(cs []SourceContribution) []enrich.Contribution {
	mapped := make([]enrich.Contribution, 0, len(cs))
	for i := range cs {
		mapped = append(mapped, enrich.Contribution{
			PLZ:       cs[i].PLZ,
			Venue:     cs[i].Venue,
			Organizer: cs[i].Organizer,
		})
	}
	return mapped
}
// toContributions translates the crawler's per-source RawEvents into the
// discovery-domain SourceContribution shape for JSONB persistence.
func toContributions(raws []crawler.RawEvent) []SourceContribution {
@@ -488,3 +668,221 @@ func findSeriesMatch(incomingName string, candidates []SeriesCandidate) *uuid.UU
}
return nil
}
// RunLLMEnrichOne fills category / opening_hours / description for a single
// queue row via the injected LLMEnricher. It is cache-first: a fresh entry
// under the row's CacheKey is reused and the LLM call is skipped entirely.
//
// What gets persisted is always Merge(crawl, llm): crawl-populated fields
// survive untouched and the LLM only fills gaps. The cache stores the raw
// LLM output rather than the merged result, so one cached answer can be
// layered onto different crawl-enrich bases on re-enrichment.
//
// Returns the merged payload, or an error when the enricher is unconfigured,
// the row can't be loaded, the LLM call fails, or persistence fails. Cache
// read/write failures are logged and tolerated.
func (s *Service) RunLLMEnrichOne(ctx context.Context, queueID uuid.UUID) (enrich.Enrichment, error) {
	if s.llmEnricher == nil {
		return enrich.Enrichment{}, errors.New("llm enricher not configured")
	}
	row, err := s.repo.GetDiscovered(ctx, queueID)
	if err != nil {
		return enrich.Enrichment{}, fmt.Errorf("load row: %w", err)
	}
	var year int // 0 when the start date is unknown — shared cache bucket
	if row.StartDatum != nil {
		year = row.StartDatum.Year()
	}
	key := enrich.CacheKey(row.NameNormalized, row.Stadt, year)

	// Cache lookup — a fresh LLM payload for this (name, city, year) tuple
	// short-circuits the call. The merge still runs so a newly-populated
	// crawl-enrich base keeps its provenance.
	cached, hit, cacheErr := s.repo.GetEnrichmentCache(ctx, key)
	switch {
	case cacheErr != nil:
		// Cache trouble is non-fatal: fall through to the real call.
		slog.WarnContext(ctx, "enrichment cache get failed; continuing",
			"cache_key", key, "error", cacheErr)
	case hit:
		merged := enrich.Merge(row.Enrichment, cached)
		if err := s.repo.SetEnrichment(ctx, row.ID, merged, EnrichmentStatusDone); err != nil {
			return enrich.Enrichment{}, fmt.Errorf("persist merged (cache hit): %w", err)
		}
		return merged, nil
	}

	answer, err := s.llmEnricher.EnrichMissing(ctx, enrich.LLMRequest{
		MarktName:  row.MarktName,
		Stadt:      row.Stadt,
		Land:       row.Land,
		Bundesland: row.Bundesland,
		Quellen:    row.Quellen,
		Partial:    row.Enrichment,
	})
	if err != nil {
		// Record the failure so the attempts counter ticks for the operator;
		// the stored payload stays as-is (crawl-enrich values preserved).
		// The persistence error is deliberately dropped — the LLM error is
		// what the caller needs to see.
		_ = s.repo.SetEnrichment(ctx, row.ID, row.Enrichment, EnrichmentStatusFailed)
		return enrich.Enrichment{}, fmt.Errorf("llm enrich: %w", err)
	}
	// Cache the raw LLM output (not the merged result): a later re-crawl may
	// change the crawl-enrich base, and the cached answer should layer on top
	// of whatever the base is then.
	if err := s.repo.SetEnrichmentCache(ctx, key, answer, enrich.DefaultCacheTTL); err != nil {
		slog.WarnContext(ctx, "enrichment cache set failed; continuing",
			"cache_key", key, "error", err)
	}
	merged := enrich.Merge(row.Enrichment, answer)
	if err := s.repo.SetEnrichment(ctx, row.ID, merged, EnrichmentStatusDone); err != nil {
		return enrich.Enrichment{}, fmt.Errorf("persist merged: %w", err)
	}
	return merged, nil
}
// ClassifySimilarPair asks the AI classifier whether the two queue rows aID
// and bID describe the same market. Cache-first: a content-keyed entry
// (enrich.SimilarityPairKey) short-circuits the LLM call when this pair has
// been judged before.
//
// Intended for the operator-triggered "AI tiebreak" on ambiguous similarity
// matches; MR 7's crawl-time auto-merge will invoke it on its own. Errors:
// unconfigured classifier, identical IDs, row-load failure, or classifier
// failure. Cache read/write problems are logged and tolerated.
func (s *Service) ClassifySimilarPair(ctx context.Context, aID, bID uuid.UUID) (enrich.Verdict, error) {
	if s.simClassifier == nil {
		return enrich.Verdict{}, errors.New("similarity classifier not configured")
	}
	if aID == bID {
		return enrich.Verdict{}, errors.New("cannot classify a row against itself")
	}
	left, err := s.repo.GetDiscovered(ctx, aID)
	if err != nil {
		return enrich.Verdict{}, fmt.Errorf("load row A: %w", err)
	}
	right, err := s.repo.GetDiscovered(ctx, bID)
	if err != nil {
		return enrich.Verdict{}, fmt.Errorf("load row B: %w", err)
	}
	simA, simB := rowToSimilarity(left), rowToSimilarity(right)
	key := enrich.SimilarityPairKey(simA, simB)

	cached, hit, cacheErr := s.repo.GetSimilarityCache(ctx, key)
	switch {
	case cacheErr != nil:
		// Non-fatal — fall through to a live classification.
		slog.WarnContext(ctx, "similarity cache get failed; continuing",
			"pair_key", key, "error", cacheErr)
	case hit:
		return cached, nil
	}

	verdict, err := s.simClassifier.Classify(ctx, simA, simB)
	if err != nil {
		return enrich.Verdict{}, fmt.Errorf("classify: %w", err)
	}
	if err := s.repo.SetSimilarityCache(ctx, key, verdict, enrich.DefaultSimilarityCacheTTL); err != nil {
		slog.WarnContext(ctx, "similarity cache set failed; continuing",
			"pair_key", key, "error", err)
	}
	return verdict, nil
}
// rowToSimilarity projects a DiscoveredMarket onto the narrow SimilarityRow
// the enrich package consumes. Year is taken from StartDatum and stays 0
// when the start date is unknown.
func rowToSimilarity(r DiscoveredMarket) enrich.SimilarityRow {
	var year int
	if r.StartDatum != nil {
		year = r.StartDatum.Year()
	}
	return enrich.SimilarityRow{
		NameNormalized: r.NameNormalized,
		Stadt:          r.Stadt,
		Year:           year,
		Name:           r.MarktName,
		Quellen:        r.Quellen,
	}
}
// CrawlEnrichSummary reports the outcome of one RunCrawlEnrichAll pass.
// Mirrors CrawlSummary's shape so the admin UI can reuse its render path.
type CrawlEnrichSummary struct {
	StartedAt  time.Time `json:"started_at"`  // UTC timestamp when the pass began
	DurationMs int64     `json:"duration_ms"` // wall-clock time of the whole pass
	Total      int       `json:"total"`       // rows loaded for enrichment
	Succeeded  int       `json:"succeeded"`   // enrichment_status -> done
	Failed     int       `json:"failed"`      // SetEnrichment returned error
	// Bounded list of (id, error) pairs for operator debugging. Empty on
	// happy paths; capped (runCrawlEnrichErrorCap) so a catastrophic run
	// doesn't balloon the summary.
	Errors []CrawlEnrichError `json:"errors"`
}
// CrawlEnrichError records a single row that failed to persist enrichment
// during a bulk crawl-enrich pass; surfaced to the admin UI via
// CrawlEnrichSummary.Errors.
type CrawlEnrichError struct {
	QueueID uuid.UUID `json:"queue_id"` // discovered_markets row that failed
	Error   string    `json:"error"`    // stringified persistence error
}
// runCrawlEnrichAllBatchSize caps how many rows we pull per bulk pass. Bigger
// than the usual queue depth keeps us single-pass; the Nominatim 1rps limit
// is the real throughput constraint, not batch size.
const runCrawlEnrichAllBatchSize = 2000

// runCrawlEnrichErrorCap bounds the Errors slice so a total outage doesn't
// produce a summary the admin UI can't render. Operators can check logs for
// the full set; this is just an at-a-glance list.
const runCrawlEnrichErrorCap = 50
// RunCrawlEnrichAll applies crawl-enrich (source consolidation + Nominatim
// geocoding) to every queue row with enrichment_status='pending', persisting
// each payload with status='done' on success and counting failures.
//
// Rows already enriched are excluded by the repo query, so re-running simply
// picks up pending rows that appeared since the last pass. Forcing a
// re-enrich of 'done' rows is a future feature; today an operator must reset
// enrichment_status by hand.
//
// Meant to run in a goroutine on a detached context: with Nominatim limited
// to 1 rps, a 200-row queue takes minutes and must outlive the originating
// HTTP request. Cancellation is honoured between rows — the summary then
// reflects partial progress and untouched rows remain 'pending'.
func (s *Service) RunCrawlEnrichAll(ctx context.Context) (CrawlEnrichSummary, error) {
	summary := CrawlEnrichSummary{StartedAt: time.Now().UTC()}

	pending, err := s.repo.ListPendingEnrichment(ctx, runCrawlEnrichAllBatchSize)
	if err != nil {
		return summary, fmt.Errorf("list pending enrichment: %w", err)
	}
	summary.Total = len(pending)

	for i := range pending {
		if ctxErr := ctx.Err(); ctxErr != nil {
			// Caller cancelled — stop cleanly with a partial summary; the
			// next run picks up whatever is still 'pending'.
			return summary, ctxErr
		}
		row := pending[i]
		payload := enrich.CrawlEnrich(ctx, enrich.Input{
			Stadt:         row.Stadt,
			Land:          row.Land,
			Contributions: contributionsForEnrich(row.SourceContributions),
		}, s.geocoder)

		persistErr := s.repo.SetEnrichment(ctx, row.ID, payload, EnrichmentStatusDone)
		if persistErr == nil {
			summary.Succeeded++
			continue
		}
		slog.WarnContext(ctx, "set enrichment failed",
			"queue_id", row.ID, "error", persistErr)
		summary.Failed++
		if len(summary.Errors) < runCrawlEnrichErrorCap {
			summary.Errors = append(summary.Errors, CrawlEnrichError{
				QueueID: row.ID,
				Error:   persistErr.Error(),
			})
		}
	}

	summary.DurationMs = time.Since(summary.StartedAt).Milliseconds()
	slog.InfoContext(ctx, "crawl-enrich-all completed",
		"total", summary.Total,
		"succeeded", summary.Succeeded,
		"failed", summary.Failed,
		"duration_ms", summary.DurationMs)
	return summary, nil
}

View File

@@ -2,6 +2,7 @@ package discovery
import (
"context"
"errors"
"testing"
"time"
@@ -9,9 +10,24 @@ import (
"github.com/jackc/pgx/v5"
"marktvogt.de/backend/internal/domain/discovery/crawler"
"marktvogt.de/backend/internal/domain/discovery/enrich"
"marktvogt.de/backend/internal/domain/market"
)
// stubGeocoder is a test-local geocoder that returns canned lat/lng.
// Satisfies the Geocoder interface without any network access.
type stubGeocoder struct {
	lat, lng *float64 // canned coordinates returned verbatim (nil allowed)
	err      error    // canned error returned verbatim
}
// Geocode ignores every address component and returns the stub's canned
// (lat, lng, err) triple unchanged.
func (s stubGeocoder) Geocode(_ context.Context, _, _, _, _ string) (*float64, *float64, error) {
	return s.lat, s.lng, s.err
}
// ptrFloat returns a pointer to a fresh float64 holding v — a convenience
// for wiring the stubGeocoder's optional coordinate fields in tests.
func ptrFloat(v float64) *float64 {
	value := v
	return &value
}
// catMittelaltermarkt is the category literal used by enrichment tests —
// presumably mirrors the enrich package's canonical category value;
// NOTE(review): confirm against the enrich package before reuse.
const catMittelaltermarkt = "mittelaltermarkt"
// newMockRepo returns a mockRepo with default no-op implementations and an
// inserted field that captures every InsertDiscovered call.
func newMockRepo() *mockRepo {
@@ -23,7 +39,7 @@ func newMockRepo() *mockRepo {
m.listSeriesFn = func(_ context.Context, _ string) ([]SeriesCandidate, error) { return nil, nil }
m.editionExistsFn = func(_ context.Context, _ uuid.UUID, _ int) (bool, error) { return false, nil }
m.isRejectedFn = func(_ context.Context, _, _ string, _ int) (bool, error) { return false, nil }
m.queuePendingFn = func(_ context.Context, _, _ string, _ *time.Time) (bool, error) { return false, nil }
m.findMatchFn = func(_ context.Context, _, _ string, _ *time.Time) (*DiscoveredMarket, error) { return nil, nil }
return m
}
@@ -131,7 +147,7 @@ func TestAccept_NewSeries_CallsCreate(t *testing.T) {
markAcceptedFn: func(_ context.Context, _ pgx.Tx, _, _, _ uuid.UUID) error { return nil },
}
mc := &stubCreator{}
svc := NewService(m, nil, noopLinkVerifier{}, mc)
svc := NewService(m, nil, noopLinkVerifier{}, mc, nil, nil, nil)
_, _, err := svc.Accept(context.Background(), qID, uuid.New())
if err != nil {
t.Fatalf("accept err: %v", err)
@@ -153,7 +169,7 @@ func TestAccept_ExistingSeries_CallsCreateEditionForSeries(t *testing.T) {
markAcceptedFn: func(_ context.Context, _ pgx.Tx, _, _, _ uuid.UUID) error { return nil },
}
mc := &stubCreator{}
svc := NewService(m, nil, noopLinkVerifier{}, mc)
svc := NewService(m, nil, noopLinkVerifier{}, mc, nil, nil, nil)
_, _, err := svc.Accept(context.Background(), uuid.New(), uuid.New())
if err != nil {
t.Fatalf("accept err: %v", err)
@@ -184,7 +200,7 @@ func TestServiceCrawlHappyPath(t *testing.T) {
PerSourceMS: map[string]int64{"marktkalendarium": 1},
},
}
svc := NewService(repo, sc, lc, noopMarketCreator{})
svc := NewService(repo, sc, lc, noopMarketCreator{}, nil, nil, nil)
summary, err := svc.Crawl(context.Background())
if err != nil {
@@ -208,13 +224,25 @@ func TestServiceCrawlHappyPath(t *testing.T) {
}
}
func TestServiceCrawlDedupQueue(t *testing.T) {
func TestServiceCrawlDedupQueue_SameSourceRedundant(t *testing.T) {
// New semantic: DedupedQueue only counts when incoming source is already
// on the match (nothing new to merge). A fresh source would auto-merge
// instead — covered by TestServiceCrawlAutoMerge_NewSource.
repo := newMockRepo()
// Simulate: queue already has a matching pending row.
repo.queuePendingFn = func(_ context.Context, _, _ string, _ *time.Time) (bool, error) {
return true, nil
}
start := mustParseDate(t, "2026-05-01")
existing := &DiscoveredMarket{
ID: uuid.New(),
Sources: []string{"marktkalendarium"},
Konfidenz: KonfidenzMittel,
}
var mergeCalls int
repo.findMatchFn = func(_ context.Context, _, _ string, _ *time.Time) (*DiscoveredMarket, error) {
return existing, nil
}
repo.mergePendingFn = func(uuid.UUID, []string, []string, []SourceContribution, string, string) error {
mergeCalls++
return nil
}
sc := &stubCrawlerRunner{
result: crawler.CrawlResult{
@@ -225,23 +253,113 @@ func TestServiceCrawlDedupQueue(t *testing.T) {
},
},
}
svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{})
svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, nil)
summary, err := svc.Crawl(context.Background())
if err != nil {
t.Fatal(err)
}
if summary.DedupedQueue != 1 {
t.Errorf("DedupedQueue = %d; want 1", summary.DedupedQueue)
t.Errorf("DedupedQueue = %d; want 1 (same source already present)", summary.DedupedQueue)
}
if summary.AutoMerged != 0 {
t.Errorf("AutoMerged = %d; want 0", summary.AutoMerged)
}
if summary.Discovered != 0 {
t.Errorf("Discovered = %d; want 0 (dupe should block insert)", summary.Discovered)
t.Errorf("Discovered = %d; want 0", summary.Discovered)
}
if mergeCalls != 0 {
t.Errorf("MergePendingSources called %d times; want 0 (no new source info)", mergeCalls)
}
if len(repo.inserted) != 0 {
t.Errorf("inserted = %d; want 0", len(repo.inserted))
}
}
// TestServiceCrawlAutoMerge_NewSourceUpgradesKonfidenz exercises the MR 7
// auto-merge path end to end through Service.Crawl with stubbed repo and
// crawler.
func TestServiceCrawlAutoMerge_NewSourceUpgradesKonfidenz(t *testing.T) {
	// Pending row already carries source A with konfidenz=mittel. New crawl
	// brings the same event from source B -> auto-merge: delta sources are
	// computed, MergePendingSources is called, AutoMerged=1, konfidenz gets
	// upgraded to hoch (2 sources).
	repo := newMockRepo()
	start := mustParseDate(t, "2026-05-01")
	// The pre-existing pending queue row: one source, medium confidence.
	existing := &DiscoveredMarket{
		ID:        uuid.New(),
		Sources:   []string{"marktkalendarium"},
		Quellen:   []string{"https://a/"},
		Konfidenz: KonfidenzMittel,
	}
	// Capture struct filled by the mergePendingFn stub below so the merge
	// arguments can be asserted after Crawl returns.
	var merge struct {
		called       bool
		id           uuid.UUID
		addSources   []string
		addQuellen   []string
		addContribs  []SourceContribution
		newKonfidenz string
	}
	// Every pending-match lookup finds the existing row.
	repo.findMatchFn = func(_ context.Context, _, _ string, _ *time.Time) (*DiscoveredMarket, error) {
		return existing, nil
	}
	// Record what the service asks MergePendingSources to persist.
	repo.mergePendingFn = func(id uuid.UUID, addSources, addQuellen []string, addContribs []SourceContribution, newKonfidenz, _ string) error {
		merge.called = true
		merge.id = id
		merge.addSources = addSources
		merge.addQuellen = addQuellen
		merge.addContribs = addContribs
		merge.newKonfidenz = newKonfidenz
		return nil
	}
	// The crawler emits the same event from a source NOT yet on the match.
	sc := &stubCrawlerRunner{
		result: crawler.CrawlResult{
			PerSource: map[string][]crawler.RawEvent{
				srcMittelaltermarktOnline: {
					{SourceName: srcMittelaltermarktOnline, SourceURL: "https://b/",
						Name: "X", City: "Y", StartDate: start},
				},
			},
		},
	}
	svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, nil)

	summary, err := svc.Crawl(context.Background())
	if err != nil {
		t.Fatal(err)
	}
	if summary.AutoMerged != 1 {
		t.Errorf("AutoMerged = %d; want 1", summary.AutoMerged)
	}
	if summary.DedupedQueue != 0 {
		t.Errorf("DedupedQueue = %d; want 0 (this was a merge, not a dupe)", summary.DedupedQueue)
	}
	if summary.Discovered != 0 {
		t.Errorf("Discovered = %d; want 0 (no new row inserted)", summary.Discovered)
	}
	if !merge.called {
		t.Fatal("MergePendingSources not called on auto-merge path")
	}
	if merge.id != existing.ID {
		t.Errorf("merge id = %v; want %v", merge.id, existing.ID)
	}
	if len(merge.addSources) != 1 || merge.addSources[0] != srcMittelaltermarktOnline {
		t.Errorf("addSources = %v; want [mittelaltermarkt_online]", merge.addSources)
	}
	if len(merge.addQuellen) != 1 || merge.addQuellen[0] != "https://b/" {
		t.Errorf("addQuellen = %v; want [https://b/]", merge.addQuellen)
	}
	if len(merge.addContribs) != 1 || merge.addContribs[0].SourceName != srcMittelaltermarktOnline {
		t.Errorf("addContribs first SourceName = %v; want mittelaltermarkt_online",
			merge.addContribs)
	}
	if merge.newKonfidenz != KonfidenzHoch {
		t.Errorf("newKonfidenz = %q; want %q (2 sources)", merge.newKonfidenz, KonfidenzHoch)
	}
	if len(repo.inserted) != 0 {
		t.Errorf("inserted = %d; want 0 (merge path should not insert)", len(repo.inserted))
	}
}
func TestServiceCrawlDefaultsEndDate(t *testing.T) {
repo := newMockRepo()
start := mustParseDate(t, "2026-05-01")
@@ -256,7 +374,7 @@ func TestServiceCrawlDefaultsEndDate(t *testing.T) {
},
},
}
svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{})
svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, nil)
if _, err := svc.Crawl(context.Background()); err != nil {
t.Fatal(err)
@@ -302,7 +420,7 @@ func TestServiceCrawlDetachesInsertContextFromRequestCtx(t *testing.T) {
},
},
}
svc := NewService(repo, sc, ctxAwareLinkVerifier{}, noopMarketCreator{})
svc := NewService(repo, sc, ctxAwareLinkVerifier{}, noopMarketCreator{}, nil, nil, nil)
// Cancel the context BEFORE Crawl runs — simulates gateway timeout
// that fires while the handler is still mid-run.
@@ -327,7 +445,7 @@ func TestListPendingQueuePaged_ReturnsBothRowsAndTotal(t *testing.T) {
m := &mockRepo{
countQueueFn: func(_ context.Context, _ string) (int, error) { return 42, nil },
}
svc := NewService(m, nil, noopLinkVerifier{}, noopMarketCreator{})
svc := NewService(m, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, nil)
rows, total, err := svc.ListPendingQueuePaged(context.Background(), QueueSortDefault, QueueOrderDefault, 50, 0)
if err != nil {
t.Fatalf("unexpected error: %v", err)
@@ -351,15 +469,15 @@ func TestServiceCrawlPersistsSourcesAndContributions(t *testing.T) {
{SourceName: "marktkalendarium", SourceURL: "https://mk/", Name: "Markt X",
City: "Dresden", PLZ: "01067", StartDate: start, Website: "https://organizer.de"},
},
"mittelaltermarkt_online": {
{SourceName: "mittelaltermarkt_online", SourceURL: "https://mo/",
srcMittelaltermarktOnline: {
{SourceName: srcMittelaltermarktOnline, SourceURL: "https://mo/",
DetailURL: "https://mo/e/1", Name: "Markt X", City: "Dresden",
PLZ: "01067", StartDate: start, Venue: "Stallhof"},
},
},
},
}
svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{})
svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, nil)
if _, err := svc.Crawl(context.Background()); err != nil {
t.Fatal(err)
@@ -380,7 +498,7 @@ func TestServiceCrawlPersistsSourcesAndContributions(t *testing.T) {
}
// First contribution should be the highest-rank source
// (mittelaltermarkt_online = rank 1, marktkalendarium = rank 2).
if got.SourceContributions[0].SourceName != "mittelaltermarkt_online" {
if got.SourceContributions[0].SourceName != srcMittelaltermarktOnline {
t.Errorf("first contribution source = %q; want mittelaltermarkt_online (rank 1)",
got.SourceContributions[0].SourceName)
}
@@ -404,11 +522,11 @@ func TestServiceCrawlMultiSourceHighKonfidenz(t *testing.T) {
result: crawler.CrawlResult{
PerSource: map[string][]crawler.RawEvent{
"marktkalendarium": {{SourceName: "marktkalendarium", SourceURL: "https://a/", Name: "X", City: "Y", StartDate: start}},
"mittelaltermarkt_online": {{SourceName: "mittelaltermarkt_online", SourceURL: "https://b/", Name: "X", City: "Y", StartDate: start}},
srcMittelaltermarktOnline: {{SourceName: srcMittelaltermarktOnline, SourceURL: "https://b/", Name: "X", City: "Y", StartDate: start}},
},
},
}
svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{})
svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, nil)
summary, err := svc.Crawl(context.Background())
if err != nil {
@@ -424,3 +542,463 @@ func TestServiceCrawlMultiSourceHighKonfidenz(t *testing.T) {
t.Errorf("Konfidenz = %q; want %q (2+ sources)", repo.inserted[0].Konfidenz, KonfidenzHoch)
}
}
// TestRunCrawlEnrichAll_HappyPath exercises the bulk path: three pending rows,
// each with differently-shaped contributions, get consolidated and geocoded
// into the SetEnrichment calls. Nothing queries the LLM — this is the
// crawl-only pass.
func TestRunCrawlEnrichAll_HappyPath(t *testing.T) {
	id1, id2, id3 := uuid.New(), uuid.New(), uuid.New()
	rows := []DiscoveredMarket{
		{
			ID: id1, Stadt: "Dresden", Land: "Deutschland",
			SourceContributions: []SourceContribution{
				{SourceName: "marktkalendarium", PLZ: "01067", Venue: "Stallhof"},
			},
		},
		{
			ID: id2, Stadt: "Leipzig", Land: "Deutschland",
			SourceContributions: []SourceContribution{
				{SourceName: "a", Organizer: "Verein e.V."},
				{SourceName: "b", PLZ: "04109"},
			},
		},
		{ID: id3, Stadt: "", Land: "Deutschland"}, // No stadt, no contribs — skips geocode.
	}
	// Named once instead of repeating an anonymous struct literal at the
	// declaration and every capture site.
	type write struct {
		id      uuid.UUID
		payload enrich.Enrichment
		status  string
	}
	var writes []write
	repo := &mockRepo{
		listPendingEnrichFn: func(limit int) ([]DiscoveredMarket, error) {
			if limit <= 0 {
				t.Errorf("limit must be positive, got %d", limit)
			}
			return rows, nil
		},
		setEnrichmentFn: func(id uuid.UUID, payload enrich.Enrichment, status string) error {
			writes = append(writes, write{id, payload, status})
			return nil
		},
	}
	gc := stubGeocoder{lat: ptrFloat(51.05), lng: ptrFloat(13.74)}
	svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, gc, nil, nil)
	summary, err := svc.RunCrawlEnrichAll(context.Background())
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if summary.Total != 3 {
		t.Errorf("Total = %d; want 3", summary.Total)
	}
	if summary.Succeeded != 3 {
		t.Errorf("Succeeded = %d; want 3", summary.Succeeded)
	}
	if summary.Failed != 0 {
		t.Errorf("Failed = %d; want 0", summary.Failed)
	}
	if len(writes) != 3 {
		t.Fatalf("expected 3 SetEnrichment calls, got %d", len(writes))
	}
	// The per-row assertions below index writes positionally; pin the target
	// ids first so an ordering bug fails loudly instead of surfacing as a
	// confusing payload mismatch.
	for i, wantID := range []uuid.UUID{id1, id2, id3} {
		if writes[i].id != wantID {
			t.Errorf("write %d: id = %v; want %v", i, writes[i].id, wantID)
		}
	}
	// Row 1: PLZ consolidated, geocode returned.
	if writes[0].payload.PLZ != "01067" || writes[0].payload.Venue != "Stallhof" {
		t.Errorf("row1 payload: %+v", writes[0].payload)
	}
	if writes[0].payload.Lat == nil || *writes[0].payload.Lat != 51.05 {
		t.Errorf("row1 lat: %v", writes[0].payload.Lat)
	}
	if writes[0].payload.Sources["plz"] != enrich.ProvenanceCrawl {
		t.Errorf("row1 plz provenance: %q", writes[0].payload.Sources["plz"])
	}
	// Row 2: first non-empty wins across contributions.
	if writes[1].payload.PLZ != "04109" || writes[1].payload.Organizer != "Verein e.V." {
		t.Errorf("row2 payload: %+v", writes[1].payload)
	}
	// Row 3: no stadt means no geocode attempt — payload has empty sources.
	if writes[2].payload.Lat != nil {
		t.Errorf("row3 should have no lat (no stadt), got %v", writes[2].payload.Lat)
	}
	// All writes mark status=done.
	for i, w := range writes {
		if w.status != EnrichmentStatusDone {
			t.Errorf("write %d: status = %q, want %q", i, w.status, EnrichmentStatusDone)
		}
	}
}
// TestRunCrawlEnrichAll_SetEnrichmentFailure verifies a row with a persist
// error is counted in Failed but does not halt the pass: the healthy row
// still succeeds and the failure is reported against the broken row's id.
func TestRunCrawlEnrichAll_SetEnrichmentFailure(t *testing.T) {
	goodID, badID := uuid.New(), uuid.New()
	pending := []DiscoveredMarket{
		{ID: goodID, Stadt: "Dresden", SourceContributions: []SourceContribution{{PLZ: "01067"}}},
		{ID: badID, Stadt: "Leipzig", SourceContributions: []SourceContribution{{PLZ: "04109"}}},
	}
	repo := &mockRepo{
		listPendingEnrichFn: func(int) ([]DiscoveredMarket, error) { return pending, nil },
		setEnrichmentFn: func(id uuid.UUID, _ enrich.Enrichment, _ string) error {
			if id != badID {
				return nil
			}
			return errors.New("db down")
		},
	}
	svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, nil)
	summary, err := svc.RunCrawlEnrichAll(context.Background())
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if summary.Succeeded != 1 || summary.Failed != 1 {
		t.Errorf("Succeeded=%d Failed=%d; want 1/1", summary.Succeeded, summary.Failed)
	}
	if len(summary.Errors) != 1 || summary.Errors[0].QueueID != badID {
		t.Errorf("expected single error pinned to idBad, got %+v", summary.Errors)
	}
}
// stubLLMEnricher returns canned output or an error. Captures the request
// so tests can assert the service built the LLMRequest from the right row
// fields. It stands in for the service's LLM-enricher dependency in tests.
type stubLLMEnricher struct {
	result  enrich.Enrichment // canned payload returned on every call
	err     error             // returned verbatim; non-nil simulates an LLM outage
	called  int               // number of EnrichMissing invocations observed
	lastReq enrich.LLMRequest // most recent request, for field-level assertions
}

// EnrichMissing records the call and its request, then echoes the configured
// result/error pair without doing any work.
func (s *stubLLMEnricher) EnrichMissing(_ context.Context, req enrich.LLMRequest) (enrich.Enrichment, error) {
	s.called++
	s.lastReq = req
	return s.result, s.err
}
// TestRunLLMEnrichOne_HappyPath: crawl-enrich base + LLM fills missing fields;
// merged payload persists with status=done; cache gets populated with the raw
// LLM output (not the merged result).
func TestRunLLMEnrichOne_HappyPath(t *testing.T) {
	rowID := uuid.New()
	start := time.Date(2026, 5, 1, 0, 0, 0, 0, time.UTC)
	// Fixture: what the crawl-only pass already knows about the row.
	crawlBase := enrich.Enrichment{
		PLZ:     "01067",
		Venue:   "Stallhof",
		Sources: enrich.Sources{"plz": enrich.ProvenanceCrawl, "venue": enrich.ProvenanceCrawl},
	}
	// Fixture: what the LLM contributes on top — fields disjoint from the
	// crawl base, plus model/usage metadata.
	llmResult := enrich.Enrichment{
		Category:     catMittelaltermarkt,
		Description:  "Ein großer Markt in der Altstadt.",
		Sources:      enrich.Sources{"category": enrich.ProvenanceLLM, "description": enrich.ProvenanceLLM},
		Model:        "mistral-large-latest",
		InputTokens:  500,
		OutputTokens: 80,
	}
	// Capture slots for the two persistence side effects under test:
	// the LLM-response cache write and the merged-payload row update.
	var cacheSet struct {
		called  bool
		key     string
		payload enrich.Enrichment
	}
	var persistSet struct {
		called  bool
		id      uuid.UUID
		payload enrich.Enrichment
		status  string
	}
	repo := &mockRepo{
		getDiscoveredFn: func(_ context.Context, id uuid.UUID) (DiscoveredMarket, error) {
			return DiscoveredMarket{
				ID:             rowID,
				MarktName:      "Mittelaltermarkt Dresden",
				Stadt:          "Dresden",
				Land:           "Deutschland",
				NameNormalized: "mittelaltermarkt dresden",
				StartDatum:     &start,
				Quellen:        []string{"https://example.com/1", "https://example.com/2"},
				Enrichment:     crawlBase,
			}, nil
		},
		getCacheFn: func(_ string) (enrich.Enrichment, bool, error) {
			return enrich.Enrichment{}, false, nil // miss
		},
		setCacheFn: func(key string, payload enrich.Enrichment, _ time.Duration) error {
			cacheSet.called = true
			cacheSet.key = key
			cacheSet.payload = payload
			return nil
		},
		setEnrichmentFn: func(id uuid.UUID, payload enrich.Enrichment, status string) error {
			persistSet.called = true
			persistSet.id = id
			persistSet.payload = payload
			persistSet.status = status
			return nil
		},
	}
	llm := &stubLLMEnricher{result: llmResult}
	svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, llm, nil)
	got, err := svc.RunLLMEnrichOne(context.Background(), rowID)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if llm.called != 1 {
		t.Errorf("LLM called %d times; want 1", llm.called)
	}
	// The enricher got the row's quellen + markt_name.
	if llm.lastReq.MarktName != "Mittelaltermarkt Dresden" {
		t.Errorf("LLMRequest.MarktName = %q; want Mittelaltermarkt Dresden", llm.lastReq.MarktName)
	}
	if len(llm.lastReq.Quellen) != 2 {
		t.Errorf("LLMRequest.Quellen = %v; want 2", llm.lastReq.Quellen)
	}
	if llm.lastReq.Partial.PLZ != "01067" {
		t.Errorf("LLMRequest.Partial.PLZ lost: %+v", llm.lastReq.Partial)
	}
	// Merged result: crawl fields + LLM fields, each with correct provenance.
	if got.PLZ != "01067" || got.Category != catMittelaltermarkt || got.Description == "" {
		t.Errorf("merged payload incomplete: %+v", got)
	}
	if got.Sources["plz"] != enrich.ProvenanceCrawl {
		t.Errorf("plz provenance: got %q, want crawl", got.Sources["plz"])
	}
	if got.Sources["category"] != enrich.ProvenanceLLM {
		t.Errorf("category provenance: got %q, want llm", got.Sources["category"])
	}
	// Cache stores the raw LLM output (not merged).
	if !cacheSet.called {
		t.Error("expected SetEnrichmentCache to be called")
	}
	if cacheSet.payload.PLZ != "" {
		t.Error("cache should hold raw LLM payload; PLZ came from crawl and must not be cached")
	}
	if cacheSet.payload.Category != catMittelaltermarkt {
		t.Errorf("cache payload missing LLM category: %+v", cacheSet.payload)
	}
	// Persist writes the merged payload with status=done.
	if !persistSet.called || persistSet.status != EnrichmentStatusDone {
		t.Errorf("persist: called=%v status=%q; want true/done", persistSet.called, persistSet.status)
	}
	if persistSet.payload.PLZ != "01067" || persistSet.payload.Category != catMittelaltermarkt {
		t.Errorf("persisted payload not merged: %+v", persistSet.payload)
	}
}
// TestRunLLMEnrichOne_CacheHitSkipsLLM: a cache hit means no LLM call, but
// the merged payload still persists (so crawl-enrich updates surface).
func TestRunLLMEnrichOne_CacheHitSkipsLLM(t *testing.T) {
	rowID := uuid.New()
	start := time.Date(2026, 5, 1, 0, 0, 0, 0, time.UTC)
	fromCache := enrich.Enrichment{
		Category: "weihnachtsmarkt",
		Sources:  enrich.Sources{"category": enrich.ProvenanceLLM},
	}
	repo := &mockRepo{
		getDiscoveredFn: func(context.Context, uuid.UUID) (DiscoveredMarket, error) {
			return DiscoveredMarket{ID: rowID, Stadt: "Dresden", NameNormalized: "foo", StartDatum: &start}, nil
		},
		getCacheFn: func(string) (enrich.Enrichment, bool, error) { return fromCache, true, nil },
	}
	enricher := &stubLLMEnricher{}
	svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, enricher, nil)
	got, err := svc.RunLLMEnrichOne(context.Background(), rowID)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if n := enricher.called; n != 0 {
		t.Errorf("LLM called %d times; want 0 (cache should skip it)", n)
	}
	if got.Category != "weihnachtsmarkt" {
		t.Errorf("merged category: got %q, want weihnachtsmarkt", got.Category)
	}
}
// TestRunLLMEnrichOne_LLMErrorMarksFailed: when the enricher returns an error,
// the row gets marked EnrichmentStatusFailed and the error is surfaced.
func TestRunLLMEnrichOne_LLMErrorMarksFailed(t *testing.T) {
	rowID := uuid.New()
	statusWritten := ""
	repo := &mockRepo{
		getDiscoveredFn: func(context.Context, uuid.UUID) (DiscoveredMarket, error) {
			return DiscoveredMarket{ID: rowID}, nil
		},
		setEnrichmentFn: func(_ uuid.UUID, _ enrich.Enrichment, status string) error {
			statusWritten = status
			return nil
		},
	}
	broken := &stubLLMEnricher{err: errors.New("mistral down")}
	svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, broken, nil)
	if _, err := svc.RunLLMEnrichOne(context.Background(), rowID); err == nil {
		t.Fatal("expected error; got nil")
	}
	if statusWritten != EnrichmentStatusFailed {
		t.Errorf("persist status = %q; want %q", statusWritten, EnrichmentStatusFailed)
	}
}
// TestRunCrawlEnrichAll_EmptyQueueNoOp: nothing pending, zero summary, no writes.
func TestRunCrawlEnrichAll_EmptyQueueNoOp(t *testing.T) {
	writeCount := 0
	repo := &mockRepo{
		listPendingEnrichFn: func(int) ([]DiscoveredMarket, error) { return []DiscoveredMarket{}, nil },
		setEnrichmentFn: func(uuid.UUID, enrich.Enrichment, string) error {
			writeCount++
			return nil
		},
	}
	svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, nil)
	summary, err := svc.RunCrawlEnrichAll(context.Background())
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if summary.Total != 0 || summary.Succeeded != 0 || summary.Failed != 0 {
		t.Errorf("empty-queue summary should be all zeros, got %+v", summary)
	}
	if writeCount != 0 {
		t.Errorf("SetEnrichment called %d times on empty queue", writeCount)
	}
}
// stubSimilarityClassifier returns canned verdicts or errors. It stands in
// for the service's similarity-classifier dependency in tests.
type stubSimilarityClassifier struct {
	result enrich.Verdict // canned verdict returned on every call
	err    error          // returned verbatim; non-nil simulates a classifier failure
	calls  int            // number of Classify invocations observed
}

// Classify counts the call and echoes the configured verdict/error pair;
// the two rows passed in are ignored.
func (s *stubSimilarityClassifier) Classify(_ context.Context, _, _ enrich.SimilarityRow) (enrich.Verdict, error) {
	s.calls++
	return s.result, s.err
}
// TestClassifySimilarPair_HappyPath: cache miss → LLM → cache write → return.
func TestClassifySimilarPair_HappyPath(t *testing.T) {
	start := time.Date(2026, 5, 1, 0, 0, 0, 0, time.UTC)
	aID, bID := uuid.New(), uuid.New()
	byID := map[uuid.UUID]DiscoveredMarket{
		aID: {ID: aID, MarktName: "Ritterfest Dresden", Stadt: "Dresden", NameNormalized: "ritterfest dresden", StartDatum: &start},
		bID: {ID: bID, MarktName: "Mittelaltermarkt Dresden 2026", Stadt: "Dresden", NameNormalized: "mittelaltermarkt dresden", StartDatum: &start},
	}
	cacheCalled := false
	var cachedVerdict enrich.Verdict
	repo := &mockRepo{
		getDiscoveredFn: func(_ context.Context, id uuid.UUID) (DiscoveredMarket, error) {
			return byID[id], nil
		},
		setSimCacheFn: func(_ string, v enrich.Verdict, _ time.Duration) error {
			cacheCalled = true
			cachedVerdict = v
			return nil
		},
	}
	classifier := &stubSimilarityClassifier{
		result: enrich.Verdict{Same: false, Confidence: 0.6, Reason: "Unterschiedliche Namen."},
	}
	svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, classifier)
	got, err := svc.ClassifySimilarPair(context.Background(), aID, bID)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if classifier.calls != 1 {
		t.Errorf("classifier called %d times; want 1", classifier.calls)
	}
	if got.Same || got.Confidence != 0.6 {
		t.Errorf("verdict = %+v; want same=false, confidence=0.6", got)
	}
	if !cacheCalled {
		t.Error("expected SetSimilarityCache to be called on a cache miss")
	}
	if cachedVerdict.Confidence != 0.6 {
		t.Errorf("cached verdict lost confidence: %+v", cachedVerdict)
	}
}
// TestClassifySimilarPair_CacheHitSkipsLLM: cache hit returns directly,
// no classifier call.
func TestClassifySimilarPair_CacheHitSkipsLLM(t *testing.T) {
	aID, bID := uuid.New(), uuid.New()
	hit := enrich.Verdict{Same: true, Confidence: 0.9, Reason: "same venue"}
	repo := &mockRepo{
		getDiscoveredFn: func(context.Context, uuid.UUID) (DiscoveredMarket, error) {
			return DiscoveredMarket{}, nil
		},
		getSimCacheFn: func(string) (enrich.Verdict, bool, error) { return hit, true, nil },
	}
	classifier := &stubSimilarityClassifier{}
	svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, classifier)
	got, err := svc.ClassifySimilarPair(context.Background(), aID, bID)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if classifier.calls != 0 {
		t.Errorf("classifier called %d times on cache hit; want 0", classifier.calls)
	}
	if !got.Same || got.Confidence != 0.9 {
		t.Errorf("cached verdict not returned: %+v", got)
	}
}
// TestClassifySimilarPair_RejectsSelfComparison: the pair-key scheme would
// collapse (a,a) to a single key which never tells you anything useful.
func TestClassifySimilarPair_RejectsSelfComparison(t *testing.T) {
id := uuid.New()
svc := NewService(&mockRepo{}, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, &stubSimilarityClassifier{})
_, err := svc.ClassifySimilarPair(context.Background(), id, id)
if err == nil {
t.Error("expected error on self-comparison; got nil")
}
}
// TestClassifySimilarPair_LLMErrorPropagates: classifier errors surface;
// cache is not written.
func TestClassifySimilarPair_LLMErrorPropagates(t *testing.T) {
	aID, bID := uuid.New(), uuid.New()
	wroteCache := false
	repo := &mockRepo{
		getDiscoveredFn: func(context.Context, uuid.UUID) (DiscoveredMarket, error) {
			return DiscoveredMarket{}, nil
		},
		setSimCacheFn: func(string, enrich.Verdict, time.Duration) error {
			wroteCache = true
			return nil
		},
	}
	failing := &stubSimilarityClassifier{err: errors.New("mistral 500")}
	svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, failing)
	if _, err := svc.ClassifySimilarPair(context.Background(), aID, bID); err == nil {
		t.Fatal("expected error; got nil")
	}
	if wroteCache {
		t.Error("cache should not be written when classifier errors")
	}
}

View File

@@ -0,0 +1,150 @@
// Package scrape fetches a web page and extracts its visible text for LLM
// context. Not intended for structured data extraction — use goquery directly
// when you want specific fields.
//
// The extraction strategy is deliberately simple: drop scripts/styles/nav/
// footer, walk the remaining body text, collapse runs of whitespace, truncate.
// Good enough for feeding a market-event page into a prompt; bad for
// anything that depends on document structure.
package scrape
import (
	"bytes"
	"context"
	"fmt"
	"io"
	"net/http"
	"strings"
	"time"
	"unicode"
	"unicode/utf8"

	"github.com/PuerkitoBio/goquery"
)
// DefaultTimeout caps individual HTTP fetches.
const DefaultTimeout = 10 * time.Second
// DefaultMaxChars bounds the extracted text length. LLM prompts have a token
// budget; cutting here keeps the prompt deterministic and prevents a single
// huge page from dominating a multi-URL context. 4000 chars ≈ 1000 tokens
// assuming German-ish content.
const DefaultMaxChars = 4000
// Client wraps an *http.Client + output bound. Zero-value is usable —
// Fetch falls back to a one-off client with DefaultTimeout when HTTP is
// nil, and to DefaultMaxChars when MaxChars is unset.
type Client struct {
	// HTTP performs the requests; nil means Fetch builds a throwaway
	// client with DefaultTimeout on each call.
	HTTP *http.Client
	// MaxChars bounds the extracted text; values <= 0 fall back to
	// DefaultMaxChars inside Fetch.
	MaxChars int
	// UserAgent, if set, overrides the default. Leaving empty lets net/http
	// use its default — which some servers block; callers that scrape a lot
	// of third-party pages should set a descriptive string.
	UserAgent string
}
// New constructs a Client with sane defaults: DefaultTimeout on the HTTP
// client, DefaultMaxChars as the output bound, and a cap of five redirect
// hops — past that, the last redirect response is returned as-is, which
// Fetch then rejects via its non-2xx status check.
func New(userAgent string) *Client {
	httpClient := &http.Client{Timeout: DefaultTimeout}
	httpClient.CheckRedirect = func(_ *http.Request, via []*http.Request) error {
		if len(via) < 5 {
			return nil
		}
		return http.ErrUseLastResponse
	}
	return &Client{
		HTTP:      httpClient,
		MaxChars:  DefaultMaxChars,
		UserAgent: userAgent,
	}
}
// Fetch retrieves the URL and returns the visible text, truncated to MaxChars.
// Non-2xx responses and HTML parse failures return an error — caller decides
// whether to continue with other URLs or fail the whole operation.
//
// The fetch is bounded by ctx; when c.HTTP is nil a throwaway client with
// DefaultTimeout is used instead.
func (c *Client) Fetch(ctx context.Context, url string) (string, error) {
	client := c.HTTP
	if client == nil {
		client = &http.Client{Timeout: DefaultTimeout}
	}
	maxChars := c.MaxChars
	if maxChars <= 0 {
		maxChars = DefaultMaxChars
	}
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return "", fmt.Errorf("new request %s: %w", url, err)
	}
	if c.UserAgent != "" {
		req.Header.Set("User-Agent", c.UserAgent)
	}
	// Advertise HTML and German-first content — the pages fed through here
	// are German market sites.
	req.Header.Set("Accept", "text/html,application/xhtml+xml;q=0.9,*/*;q=0.8")
	req.Header.Set("Accept-Language", "de-DE,de;q=0.9,en;q=0.8")
	resp, err := client.Do(req)
	if err != nil {
		return "", fmt.Errorf("fetch %s: %w", url, err)
	}
	defer func() { _ = resp.Body.Close() }()
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return "", fmt.Errorf("fetch %s: status %d", url, resp.StatusCode)
	}
	// Cap the body read at a generous multiple of maxChars so a misbehaving
	// server streaming gigabytes can't OOM us. 10x is headroom for whitespace
	// stripping to still produce maxChars of useful text.
	body, err := io.ReadAll(io.LimitReader(resp.Body, int64(maxChars)*10))
	if err != nil {
		return "", fmt.Errorf("read body %s: %w", url, err)
	}
	return extractText(body, maxChars)
}
// extractText walks a parsed HTML body, drops noise nodes, gathers visible
// text, collapses whitespace, and truncates. Kept as a package-level helper
// so tests can exercise the HTML→text path without a live HTTP server.
//
// maxChars is a byte budget, not a rune count, but the cut never lands in
// the middle of a multi-byte UTF-8 sequence — a German umlaut at the
// boundary would otherwise turn into a replacement character in the prompt.
func extractText(htmlBytes []byte, maxChars int) (string, error) {
	doc, err := goquery.NewDocumentFromReader(bytes.NewReader(htmlBytes))
	if err != nil {
		return "", fmt.Errorf("parse html: %w", err)
	}
	// Remove noise before walking — script/style carry code, nav/footer/aside
	// carry boilerplate that pollutes the LLM context.
	doc.Find("script, style, nav, footer, aside, noscript, iframe").Remove()
	text := strings.TrimSpace(doc.Find("body").Text())
	if text == "" {
		// Some sites don't use <body> explicitly (fragments, malformed docs);
		// fall back to document-level text.
		text = strings.TrimSpace(doc.Text())
	}
	text = collapseWhitespace(text)
	if len(text) > maxChars {
		// Back the cut point up to a rune start so the truncated string
		// stays valid UTF-8.
		cut := maxChars
		for cut > 0 && !utf8.RuneStart(text[cut]) {
			cut--
		}
		text = text[:cut]
	}
	return text, nil
}
// collapseWhitespace replaces every run of whitespace with a single ASCII
// space. Preserves word boundaries but drops the layout that HTML text
// extraction leaves behind (tabs, long runs of newlines, non-breaking
// spaces).
//
// unicode.IsSpace is used instead of an explicit character list: it covers
// the same set (space, \t, \n, \r, NBSP) plus \v, \f, NEL and the Unicode
// space family, and avoids an invisible non-breaking-space literal in the
// source that is easy to break during editing.
func collapseWhitespace(s string) string {
	var b strings.Builder
	b.Grow(len(s))
	inSpace := false
	for _, r := range s {
		if unicode.IsSpace(r) {
			if !inSpace {
				b.WriteByte(' ')
				inSpace = true
			}
			continue
		}
		b.WriteRune(r)
		inSpace = false
	}
	return strings.TrimSpace(b.String())
}

View File

@@ -0,0 +1,77 @@
package scrape
import (
"strings"
"testing"
)
// TestExtractText_StripsNoise: content survives; script/style/nav/footer do not.
func TestExtractText_StripsNoise(t *testing.T) {
	page := []byte(`<html>
<head><style>.foo{color:red}</style><script>var x = 1;</script></head>
<body>
<nav>HOME | ABOUT | CONTACT</nav>
<main>
<h1>Mittelaltermarkt Dresden</h1>
<p>Samstag und Sonntag von 10:00 bis 18:00 Uhr.</p>
</main>
<footer>Copyright 2026</footer>
</body>
</html>`)
	text, err := extractText(page, 1000)
	if err != nil {
		t.Fatalf("extractText: %v", err)
	}
	// The content we care about is present.
	if !strings.Contains(text, "Mittelaltermarkt Dresden") {
		t.Errorf("missing h1: %q", text)
	}
	if !strings.Contains(text, "10:00 bis 18:00") {
		t.Errorf("missing opening hours: %q", text)
	}
	// Noise is gone.
	if strings.Contains(text, "color:red") || strings.Contains(text, "var x = 1") {
		t.Errorf("style/script leaked: %q", text)
	}
	if strings.Contains(text, "Copyright") || strings.Contains(text, "HOME | ABOUT") {
		t.Errorf("nav/footer leaked: %q", text)
	}
}
// TestExtractText_CollapsesWhitespace: intra-text newlines become single spaces.
func TestExtractText_CollapsesWhitespace(t *testing.T) {
	page := []byte(`<html><body><p>foo bar
baz</p></body></html>`)
	text, err := extractText(page, 1000)
	if err != nil {
		t.Fatalf("extractText: %v", err)
	}
	if text != "foo bar baz" {
		t.Errorf("whitespace not collapsed: %q", text)
	}
}
// TestExtractText_Truncates: output is cut to exactly the byte budget for
// ASCII content.
func TestExtractText_Truncates(t *testing.T) {
	// Build a long body.
	filler := strings.Repeat("a b c ", 2000) // ~12000 chars after collapse
	page := []byte("<html><body><p>" + filler + "</p></body></html>")
	text, err := extractText(page, 100)
	if err != nil {
		t.Fatalf("extractText: %v", err)
	}
	if len(text) != 100 {
		t.Errorf("len(got) = %d; want 100", len(text))
	}
}
// TestExtractText_FallsBackToDocumentWhenNoBody: a bare fragment without
// explicit <html>/<body> tags still yields its text. (If the parser
// synthesizes a body, the primary path handles it; otherwise the
// doc-level fallback does.)
func TestExtractText_FallsBackToDocumentWhenNoBody(t *testing.T) {
	fragment := []byte(`<div><p>Direktes Fragment.</p></div>`)
	text, err := extractText(fragment, 1000)
	if err != nil {
		t.Fatalf("extractText: %v", err)
	}
	if !strings.Contains(text, "Direktes Fragment") {
		t.Errorf("fallback failed: %q", text)
	}
}

View File

@@ -8,12 +8,14 @@ import (
"marktvogt.de/backend/internal/domain/auth"
"marktvogt.de/backend/internal/domain/discovery"
"marktvogt.de/backend/internal/domain/discovery/crawler"
"marktvogt.de/backend/internal/domain/discovery/enrich"
"marktvogt.de/backend/internal/domain/market"
"marktvogt.de/backend/internal/domain/user"
"marktvogt.de/backend/internal/middleware"
"marktvogt.de/backend/internal/pkg/ai"
"marktvogt.de/backend/internal/pkg/email"
"marktvogt.de/backend/internal/pkg/geocode"
"marktvogt.de/backend/internal/pkg/scrape"
"marktvogt.de/backend/internal/pkg/turnstile"
)
@@ -74,7 +76,16 @@ func (s *Server) registerRoutes() {
// Discovery routes
discoveryRepo := discovery.NewRepository(s.db)
crawlerInstance := crawler.NewCrawler(s.cfg.Discovery.CrawlerUserAgent, crawler.DefaultSourceConfigs())
discoveryService := discovery.NewService(discoveryRepo, crawlerInstance, discovery.NewLinkChecker(), marketSvc)
// Per-row LLM enrichment (MR 3b). Operator-triggered only; disabled rows
// fall through via NoopLLMEnricher when AI isn't configured.
var llmEnricher enrich.LLMEnricher = enrich.NoopLLMEnricher{}
var simClassifier enrich.SimilarityClassifier = enrich.NoopSimilarityClassifier{}
if aiClient.Enabled() {
scraper := scrape.New(s.cfg.Discovery.CrawlerUserAgent)
llmEnricher = enrich.NewMistralLLMEnricher(aiClient, scraper)
simClassifier = enrich.NewMistralSimilarityClassifier(aiClient)
}
discoveryService := discovery.NewService(discoveryRepo, crawlerInstance, discovery.NewLinkChecker(), marketSvc, geocoder, llmEnricher, simClassifier)
discoveryHandler := discovery.NewHandler(discoveryService, s.cfg.Discovery.CrawlerManualRateLimitPerHour)
requireTickToken := middleware.RequireBearerToken(s.cfg.Discovery.Token)
discovery.RegisterRoutes(v1, discoveryHandler, requireAuth, requireAdmin, requireTickToken)

View File

@@ -0,0 +1,9 @@
-- Down migration for the enrichment schema (reverses 000019): drops the
-- LLM cache table and the enrichment columns on discovered_markets.
-- IF EXISTS keeps the script safe to re-run on a partially rolled-back DB.
DROP INDEX IF EXISTS idx_enrichment_cache_expires_at;
DROP TABLE IF EXISTS enrichment_cache;
DROP INDEX IF EXISTS idx_discovered_markets_enrichment_pending;
-- Columns are dropped in reverse order of their addition in the up migration.
ALTER TABLE discovered_markets
DROP COLUMN IF EXISTS enriched_at,
DROP COLUMN IF EXISTS enrichment_attempts,
DROP COLUMN IF EXISTS enrichment_status,
DROP COLUMN IF EXISTS enrichment;

View File

@@ -0,0 +1,28 @@
-- Enrichment payload lives in a single jsonb column. Field set is still in
-- flux; typed columns can land in a later migration once MR 5 (eval harness)
-- has guided the final shape.
ALTER TABLE discovered_markets
ADD COLUMN enrichment jsonb NOT NULL DEFAULT '{}'::jsonb,
-- Lifecycle marker; starts at 'pending' until an enrichment pass writes it.
ADD COLUMN enrichment_status text NOT NULL DEFAULT 'pending',
-- Bookkeeping for retries of failed rows; smallint is ample.
ADD COLUMN enrichment_attempts smallint NOT NULL DEFAULT 0,
-- Timestamp of the last enrichment write; NULL = never enriched.
ADD COLUMN enriched_at timestamptz;
-- Worker claim path: only pending-queue rows still awaiting enrichment.
-- Partial predicate keeps the index small as accepted/rejected rows pile up.
CREATE INDEX idx_discovered_markets_enrichment_pending
ON discovered_markets (enrichment_status)
WHERE status = 'pending';
-- Keyed cache for LLM responses. cache_key = sha256(name_normalized|stadt|year)
-- so two crawl runs for the same market hit the same entry.
CREATE TABLE enrichment_cache (
    cache_key text PRIMARY KEY,
    payload jsonb NOT NULL,
    created_at timestamptz NOT NULL DEFAULT now(),
    -- NULL = no expiry; otherwise the entry is eligible for pruning.
    expires_at timestamptz
);
-- Opportunistic pruning support; workers SELECT WHERE expires_at < now().
CREATE INDEX idx_enrichment_cache_expires_at
ON enrichment_cache (expires_at)
WHERE expires_at IS NOT NULL;

View File

@@ -0,0 +1,2 @@
-- Down migration for the AI similarity cache (reverses 000020).
-- IF EXISTS keeps the script safe to re-run.
DROP INDEX IF EXISTS idx_similarity_ai_cache_expires_at;
DROP TABLE IF EXISTS similarity_ai_cache;

View File

@@ -0,0 +1,18 @@
-- Cache for LLM-based similarity verdicts. Pair key is a sha256 over the
-- content tuple (normalized_name|stadt|year for both rows, sorted) — NOT
-- over queue UUIDs, so the cache survives queue row deletion (accept/reject)
-- while still invalidating correctly when a row's identifying fields change.
CREATE TABLE similarity_ai_cache (
    pair_key text PRIMARY KEY,
    -- The verdict proper: did the model judge the two rows the same market?
    same bool NOT NULL,
    confidence real NOT NULL,
    -- Free-text model rationale; empty string when none was given.
    reason text NOT NULL DEFAULT '',
    -- Model identifier that produced the verdict; empty when unknown.
    model text NOT NULL DEFAULT '',
    created_at timestamptz NOT NULL DEFAULT now(),
    -- NULL = never expires; otherwise eligible for pruning past this instant.
    expires_at timestamptz
);
-- Opportunistic pruning; workers or manual jobs SELECT WHERE expires_at < now().
CREATE INDEX idx_similarity_ai_cache_expires_at
ON similarity_ai_cache (expires_at)
WHERE expires_at IS NOT NULL;

View File

@@ -19,6 +19,22 @@ type SourceContribution = {
organizer?: string;
};
// Frontend mirror of the backend's jsonb enrichment payload. Every field is
// optional — a freshly crawled row carries an empty object.
type Enrichment = {
	plz?: string;
	venue?: string;
	organizer?: string;
	lat?: number;
	lng?: number;
	category?: string;
	opening_hours?: string;
	description?: string;
	// Per-field provenance: which pass ('crawl' or 'llm') produced the value.
	sources?: Record<string, 'crawl' | 'llm'>;
	enriched_at?: string;
	// LLM usage metadata, present when the llm pass contributed fields.
	model?: string;
	input_tokens?: number;
	output_tokens?: number;
};
type DiscoveredMarket = {
id: string;
markt_name: string;
@@ -36,6 +52,10 @@ type DiscoveredMarket = {
hinweis: string;
matched_series_id: string | null;
discovered_at: string;
enrichment: Enrichment;
enrichment_status: 'pending' | 'done' | 'failed' | 'skipped';
enrichment_attempts: number;
enriched_at: string | null;
};
type BucketError = {
@@ -192,6 +212,43 @@ export const actions: Actions = {
}
},
// Kicks off the bulk crawl-enrich pass on the backend. The endpoint answers
// asynchronously; progress is observed via the status endpoint elsewhere.
enrichCrawl: async ({ cookies, fetch }) => {
	try {
		await serverFetch<unknown>(`/admin/discovery/enrichment/crawl-all`, cookies, {
			method: 'POST',
			fetch
		});
		return { enrichStarted: true };
	} catch (err) {
		// 429 = a run is already in flight; surface it as a specific notice
		// rather than a generic failure.
		if (err instanceof ApiClientError && err.status === 429) {
			return fail(429, {
				enrichError: `Ein Crawl-Enrich-Lauf läuft bereits. (${err.message})`
			});
		}
		const message = err instanceof Error ? err.message : 'Crawl-Enrich fehlgeschlagen.';
		return fail(500, { enrichError: message });
	}
},
// Per-row LLM enrichment, triggered from a form carrying the queue row id.
// The request is synchronous against the backend; the merged payload is
// persisted server-side.
llmEnrich: async ({ request, cookies, fetch }) => {
	const form = await request.formData();
	const id = String(form.get('id') ?? '');
	if (!id) return fail(400, { error: 'missing id' });
	try {
		await serverFetch<Enrichment>(`/admin/discovery/queue/${id}/enrich`, cookies, {
			method: 'POST',
			fetch
		});
		// Backend persisted the merged payload; page reload (default form
		// behaviour) pulls the fresh row from the queue endpoint.
		return { llmEnrichedId: id };
	} catch (err) {
		// Error is keyed by row id so the UI can render it inline on the row.
		const message = err instanceof Error ? err.message : 'AI-Enrich fehlgeschlagen.';
		return fail(500, { llmEnrichError: message, llmEnrichErrorId: id });
	}
},
update: async ({ request, cookies, fetch }) => {
const form = await request.formData();
const id = String(form.get('id') ?? '');

View File

@@ -13,6 +13,7 @@
merged: number;
merged_across_sites: number;
discovered: number;
auto_merged: number;
deduped_existing: number;
deduped_rejected: number;
deduped_queue: number;
@@ -30,10 +31,36 @@
error: string;
};
// Summary of one finished crawl-enrich run, mirrored from the backend.
type EnrichSummary = {
	started_at: string;
	duration_ms: number;
	total: number;
	succeeded: number;
	failed: number;
	// Go encodes a nil slice as null — coalesce before iterating.
	errors: Array<{ queue_id: string; error: string }> | null;
};
// Polled status of the background crawl-enrich job.
type EnrichStatus = {
	running: boolean;
	started_at: string;
	finished_at: string;
	// Null when no completed run's summary is available.
	summary: EnrichSummary | null;
	error: string;
};
// Crawl run state: flag, last known status, and the active poll timer.
let crawling = $state(false);
let crawlStatus = $state<CrawlStatus | null>(null);
let pollInterval: ReturnType<typeof setInterval> | null = null;
// Crawl-enrich run state — mirrors the crawl trio above.
let enriching = $state(false);
let enrichStatus = $state<EnrichStatus | null>(null);
let enrichPollInterval: ReturnType<typeof setInterval> | null = null;
// Per-row LLM enrichment — set of row IDs currently waiting on the backend.
// Used to disable the AI button + show a spinner label while the sync
// request is in flight (typical duration 10-30s).
let llmEnrichingIds = $state(new Set<string>());
// Coalesce nullable list fields (Go encodes nil slices as null).
const queue = $derived(data.queue ?? []);
const recentErrors = $derived(data.stats.recent_errors ?? []);
@@ -73,6 +100,7 @@
similarOpenId = id;
similarLoading = true;
similarEntries = [];
similarVerdicts = {};
try {
const res = await fetch(`/admin/discovery/queue/${id}/similar`);
if (!res.ok) {
@@ -88,6 +116,53 @@
}
}
// Per-pair AI similarity verdict. Keyed on the candidate's queue id since
// the "anchor" row (similarOpenId) is already known from context.
type SimilarityVerdict = {
	same: boolean; // LLM judged both rows to be the same market
	confidence: number; // 0..1 — rendered as a percentage in the Similar panel
	reason: string; // shown as the badge's tooltip
	model?: string;
	classified_at?: string;
};
let similarVerdicts = $state<Record<string, SimilarityVerdict>>({});
// Candidate ids with a classify request in flight (disables the AI? button).
let similarClassifying = $state(new Set<string>());
async function classifySimilar(anchorId: string, candidateId: string) {
if (similarClassifying.has(candidateId)) return;
const next = new Set(similarClassifying);
next.add(candidateId);
similarClassifying = next;
try {
const res = await fetch(
`/admin/discovery/queue/${anchorId}/similar/${candidateId}/classify`,
{ method: 'POST' }
);
if (!res.ok) {
similarVerdicts = {
...similarVerdicts,
[candidateId]: { same: false, confidence: 0, reason: `HTTP ${res.status}` }
};
return;
}
const body = await res.json();
similarVerdicts = { ...similarVerdicts, [candidateId]: body.data };
} catch (err) {
similarVerdicts = {
...similarVerdicts,
[candidateId]: {
same: false,
confidence: 0,
reason: err instanceof Error ? err.message : 'Fehler'
}
};
} finally {
const afterNext = new Set(similarClassifying);
afterNext.delete(candidateId);
similarClassifying = afterNext;
}
}
// Pagination helpers.
// NOTE(review): assumes data.limit > 0 — a zero limit yields Infinity; confirm the loader guarantees this.
const totalPages = $derived(Math.ceil((data.total ?? 0) / data.limit));
@@ -227,25 +302,69 @@
}, 3000);
}
// Status fetch for the crawl-enrich run — used on mount and by the poller
// below; returns null on any failure so callers can skip that cycle.
// Read the current crawl-enrich run status from the backend. Returns null
// on any failure (network error, non-2xx, or malformed JSON) so callers
// can simply skip that update cycle.
async function fetchEnrichStatus(): Promise<EnrichStatus | null> {
	try {
		const res = await fetch('/admin/discovery/enrichment/crawl-all-status');
		if (res.ok) return (await res.json()) as EnrichStatus;
	} catch {
		// fall through to the null return
	}
	return null;
}
// Cancel the enrichment status poller, if one is active. Safe to call twice.
function stopEnrichPolling() {
	if (enrichPollInterval === null) return;
	clearInterval(enrichPollInterval);
	enrichPollInterval = null;
}
// Poll the enrichment status every 3s and stop once the run reports done.
function startEnrichPolling() {
	if (enrichPollInterval !== null) return; // a poller is already active
	enrichPollInterval = setInterval(async () => {
		const latest = await fetchEnrichStatus();
		if (latest === null) return; // transient fetch failure — keep polling
		enrichStatus = latest;
		if (latest.running) return;
		stopEnrichPolling();
		enriching = false;
	}, 3000);
}
// On mount: fetch both statuses so a page refresh picks up any in-progress
// or recently completed crawl / enrichment run.
onMount(async () => {
const status = await fetchCrawlStatus();
if (status === null) return;
crawlStatus = status;
if (status.running) {
crawling = true;
startPolling();
const [cs, es] = await Promise.all([fetchCrawlStatus(), fetchEnrichStatus()]);
if (cs !== null) {
crawlStatus = cs;
if (cs.running) {
crawling = true;
startPolling();
}
}
if (es !== null) {
enrichStatus = es;
if (es.running) {
enriching = true;
startEnrichPolling();
}
}
});
// When an action returns a *Started flag, begin the corresponding poller.
$effect(() => {
	// Crawl action acknowledged (crawlStarted): flip the flag, start polling.
	if (form?.crawlStarted) {
		crawling = true;
		startPolling();
	}
});
$effect(() => {
	// enrichCrawl action acknowledged: same pattern for the enrichment run.
	if (form?.enrichStarted) {
		enriching = true;
		startEnrichPolling();
	}
});
// Elapsed time display while running.
const elapsedLabel = $derived.by(() => {
@@ -260,6 +379,18 @@
// Crawl error is shown only once the run has stopped, so a stale error is
// not displayed while a new run is still in progress.
const displayError = $derived(
	crawlStatus && !crawlStatus.running && crawlStatus.error ? crawlStatus.error : null
);
// Enrichment-side display derived values — same shape as crawl for UI reuse.
const enrichElapsedLabel = $derived.by(() => {
	// Only meaningful while a run is active and has a start timestamp.
	if (!enrichStatus?.running || !enrichStatus.started_at) return '';
	const ms = Date.now() - new Date(enrichStatus.started_at).getTime();
	const s = Math.round(ms / 1000);
	// "42s" below one minute, "3m 12s" beyond.
	return s < 60 ? `${s}s` : `${Math.floor(s / 60)}m ${s % 60}s`;
});
const enrichDisplaySummary = $derived(enrichStatus?.summary ?? null);
const enrichDisplayError = $derived(
	enrichStatus && !enrichStatus.running && enrichStatus.error ? enrichStatus.error : null
);
</script>
<svelte:head>
@@ -326,7 +457,7 @@
</details>
{/if}
<div class="mt-6 flex flex-wrap items-center gap-3">
<form
method="POST"
action="?/crawl"
@@ -349,6 +480,29 @@
{/if}
</button>
</form>
<form
method="POST"
action="?/enrichCrawl"
use:enhance={() => {
enriching = true;
return async ({ update }) => {
await update({ reset: false });
};
}}
>
<button
type="submit"
disabled={enriching}
class="rounded bg-teal-600 px-3 py-1.5 text-sm text-white hover:bg-teal-700 disabled:cursor-not-allowed disabled:opacity-60"
title="Konsolidiert Quellendaten + Geocoding für alle pending-Einträge. Kein LLM."
>
{#if enriching}
Enriching…{enrichElapsedLabel ? ` (${enrichElapsedLabel})` : ''}
{:else}
Run crawl-enrich
{/if}
</button>
</form>
</div>
{#if form?.crawlError}
@@ -382,7 +536,7 @@
</div>
<div class="mt-3 grid grid-cols-3 gap-2 sm:grid-cols-5">
{#each [['Entdeckt', s.discovered], ['Merged', s.merged], ['Merged cross-site', s.merged_across_sites], ['Auto-merged', s.auto_merged], ['Dedup existing', s.deduped_existing], ['Dedup rejected', s.deduped_rejected], ['Dedup queue', s.deduped_queue], ['Link check fail', s.link_check_failed], ['Validation fail', s.validation_failed], ['Date conflicts', s.date_conflicts]] as [label, value]}
<div
class="rounded border border-stone-200 bg-white px-2 py-1.5 dark:border-stone-700 dark:bg-stone-800"
>
@@ -444,6 +598,81 @@
</div>
{/if}
{#if form?.enrichError}
<div
class="mt-4 rounded border border-red-300 bg-red-50 px-3 py-2 text-sm text-red-700 dark:border-red-700 dark:bg-red-950 dark:text-red-300"
>
{form.enrichError}
</div>
{/if}
{#if enrichDisplayError}
<div
class="mt-4 rounded border border-red-300 bg-red-50 px-3 py-2 text-sm text-red-700 dark:border-red-700 dark:bg-red-950 dark:text-red-300"
>
Crawl-enrich fehlgeschlagen: {enrichDisplayError}
</div>
{/if}
{#if enrichDisplaySummary}
{@const es = enrichDisplaySummary}
<div
class="mt-4 rounded-lg border border-stone-200 bg-stone-50 p-4 text-sm dark:border-stone-700 dark:bg-stone-900"
>
<div class="flex items-baseline justify-between">
<h2 class="font-semibold">Crawl-Enrich-Ergebnis</h2>
<span class="font-mono text-xs text-stone-500"
>{(es.duration_ms / 1000).toFixed(1)} s · {es.started_at
.slice(0, 16)
.replace('T', ' ')}</span
>
</div>
<div class="mt-3 grid grid-cols-3 gap-2">
<div
class="rounded border border-stone-200 bg-white px-2 py-1.5 dark:border-stone-700 dark:bg-stone-800"
>
<div class="text-[10px] text-stone-500 dark:text-stone-400">Total</div>
<div class="font-mono text-sm font-medium">{es.total}</div>
</div>
<div
class="rounded border border-stone-200 bg-white px-2 py-1.5 dark:border-stone-700 dark:bg-stone-800"
>
<div class="text-[10px] text-stone-500 dark:text-stone-400">Succeeded</div>
<div class="font-mono text-sm font-medium text-emerald-700 dark:text-emerald-300">
{es.succeeded}
</div>
</div>
<div
class="rounded border border-stone-200 bg-white px-2 py-1.5 dark:border-stone-700 dark:bg-stone-800"
>
<div class="text-[10px] text-stone-500 dark:text-stone-400">Failed</div>
<div
class="font-mono text-sm font-medium {es.failed > 0
? 'text-red-700 dark:text-red-300'
: ''}"
>
{es.failed}
</div>
</div>
</div>
{#if es.errors && es.errors.length > 0}
<details class="mt-3 text-xs">
<summary class="cursor-pointer text-red-700 dark:text-red-300"
>{es.errors.length} Fehler beim Persistieren</summary
>
<ul class="mt-2 space-y-1 font-mono">
{#each es.errors as e}
<li class="break-all">
<span class="text-stone-500">{e.queue_id.slice(0, 8)}</span>
<span class="ml-2 text-red-700 dark:text-red-300">{e.error}</span>
</li>
{/each}
</ul>
</details>
{/if}
</div>
{/if}
<h2 class="mt-8 text-lg font-semibold">Queue</h2>
<p class="mt-1 text-sm text-stone-500">
{#if (data.total ?? 0) > 0}
@@ -640,6 +869,30 @@
>
Similar
</button>
<form
method="POST"
action="?/llmEnrich"
use:enhance={() => {
llmEnrichingIds = new Set(llmEnrichingIds).add(row.id);
return async ({ update }) => {
await update({ reset: false });
const next = new Set(llmEnrichingIds);
next.delete(row.id);
llmEnrichingIds = next;
};
}}
class="inline"
>
<input type="hidden" name="id" value={row.id} />
<button
type="submit"
disabled={llmEnrichingIds.has(row.id)}
class="ml-1 rounded bg-purple-100 px-2 py-1 text-xs text-purple-700 hover:bg-purple-200 disabled:cursor-not-allowed disabled:opacity-60 dark:bg-purple-900/50 dark:text-purple-300 dark:hover:bg-purple-900"
title="Per-row LLM enrichment (scrapes Quellen, fills category/opening hours/description)"
>
{llmEnrichingIds.has(row.id) ? 'AI…' : 'AI'}
</button>
</form>
{#if (row.sources?.length ?? 0) >= 2}
<button
type="button"
@@ -678,7 +931,8 @@
<th class="pr-4 pb-1 font-medium">Markt</th>
<th class="pr-4 pb-1 font-medium">Stadt</th>
<th class="pr-4 pb-1 font-medium">Datum</th>
<th class="pr-4 pb-1 font-medium">Konfidenz</th>
<th class="pb-1 font-medium">AI-Verdict</th>
</tr>
</thead>
<tbody>
@@ -692,7 +946,7 @@
<td class="py-1 pr-4 whitespace-nowrap"
>{fmtDate(m.entry.start_datum)}</td
>
<td class="py-1 pr-4">
<span
class="inline-block rounded px-1.5 py-0.5 {konfidenzClass(
m.entry.konfidenz
@@ -701,6 +955,30 @@
{m.entry.konfidenz || '—'}
</span>
</td>
<td class="py-1">
{#if similarVerdicts[m.entry.id]}
{@const v = similarVerdicts[m.entry.id]}
<span
class="inline-block rounded px-1.5 py-0.5 text-[10px] {v.same
? 'bg-emerald-100 text-emerald-700 dark:bg-emerald-900/50 dark:text-emerald-300'
: 'bg-stone-200 text-stone-700 dark:bg-stone-700 dark:text-stone-200'}"
title={v.reason}
>
{v.same ? '✓ same' : '✗ diff'}
{(v.confidence * 100).toFixed(0)}%
</span>
{:else}
<button
type="button"
disabled={similarClassifying.has(m.entry.id)}
onclick={() => classifySimilar(row.id, m.entry.id)}
class="rounded bg-purple-100 px-1.5 py-0.5 text-[10px] text-purple-700 hover:bg-purple-200 disabled:cursor-not-allowed disabled:opacity-60 dark:bg-purple-900/50 dark:text-purple-300 dark:hover:bg-purple-900"
title="LLM-Tiebreak: sind das derselbe Markt?"
>
{similarClassifying.has(m.entry.id) ? 'AI…' : 'AI?'}
</button>
{/if}
</td>
</tr>
{/each}
</tbody>
@@ -765,6 +1043,96 @@
</div>
<p class="mt-2 text-xs text-stone-600 dark:text-stone-300">{row.hinweis}</p>
{/if}
{#if row.enrichment_status && row.enrichment_status !== 'pending'}
<div
class="mt-4 text-xs font-medium tracking-wider text-stone-500 uppercase dark:text-stone-400"
>
Enrichment
<span
class="ml-1 inline-block rounded px-1.5 py-0.5 text-[10px] {row.enrichment_status ===
'done'
? 'bg-emerald-100 text-emerald-700 dark:bg-emerald-900/50 dark:text-emerald-300'
: row.enrichment_status === 'failed'
? 'bg-red-100 text-red-700 dark:bg-red-900/50 dark:text-red-300'
: 'bg-stone-100 text-stone-600 dark:bg-stone-800 dark:text-stone-300'}"
>
{row.enrichment_status}
</span>
</div>
<dl class="mt-2 grid grid-cols-[auto_1fr] gap-x-3 gap-y-1 text-xs">
{#if row.enrichment.plz}
<dt class="text-stone-500">PLZ</dt>
<dd>
{row.enrichment.plz}
<span class="ml-1 text-[10px] text-stone-400"
>({row.enrichment.sources?.plz ?? '—'})</span
>
</dd>
{/if}
{#if row.enrichment.venue}
<dt class="text-stone-500">Venue</dt>
<dd>
{row.enrichment.venue}
<span class="ml-1 text-[10px] text-stone-400"
>({row.enrichment.sources?.venue ?? '—'})</span
>
</dd>
{/if}
{#if row.enrichment.organizer}
<dt class="text-stone-500">Organizer</dt>
<dd>
{row.enrichment.organizer}
<span class="ml-1 text-[10px] text-stone-400"
>({row.enrichment.sources?.organizer ?? '—'})</span
>
</dd>
{/if}
{#if row.enrichment.lat != null && row.enrichment.lng != null}
<dt class="text-stone-500">Lat/Lng</dt>
<dd class="font-mono">
{row.enrichment.lat.toFixed(4)}, {row.enrichment.lng.toFixed(4)}
<span class="ml-1 text-[10px] text-stone-400"
>({row.enrichment.sources?.lat ?? '—'})</span
>
</dd>
{/if}
{#if row.enrichment.category}
<dt class="text-stone-500">Category</dt>
<dd>
{row.enrichment.category}
<span class="ml-1 text-[10px] text-stone-400"
>({row.enrichment.sources?.category ?? '—'})</span
>
</dd>
{/if}
{#if row.enrichment.opening_hours}
<dt class="text-stone-500">Öffnungszeiten</dt>
<dd>
{row.enrichment.opening_hours}
<span class="ml-1 text-[10px] text-stone-400"
>({row.enrichment.sources?.opening_hours ?? '—'})</span
>
</dd>
{/if}
{#if row.enrichment.description}
<dt class="text-stone-500">Beschreibung</dt>
<dd class="text-stone-600 dark:text-stone-300">
{row.enrichment.description}
<span class="ml-1 text-[10px] text-stone-400"
>({row.enrichment.sources?.description ?? '—'})</span
>
</dd>
{/if}
</dl>
{#if form?.llmEnrichErrorId === row.id && form?.llmEnrichError}
<p
class="mt-2 rounded border border-red-300 bg-red-50 px-2 py-1 text-xs text-red-700 dark:border-red-700 dark:bg-red-950 dark:text-red-300"
>
{form.llmEnrichError}
</p>
{/if}
{/if}
</div>
<!-- Right: editable form -->