Mittelaltermarkt Dresden
+Samstag und Sonntag von 10:00 bis 18:00 Uhr.
+diff --git a/backend/internal/domain/discovery/enrich/cache.go b/backend/internal/domain/discovery/enrich/cache.go
new file mode 100644
index 0000000..3e812b2
--- /dev/null
+++ b/backend/internal/domain/discovery/enrich/cache.go
@@ -0,0 +1,42 @@
+package enrich
+
+import (
+	"crypto/sha256"
+	"encoding/hex"
+	"fmt"
+	"time"
+)
+
+// CacheKey returns a deterministic, hex-encoded SHA-256 key for the
+// (normalized name, city, year) identity tuple. Two rows sharing the tuple
+// share one cache entry, so the LLM answer is not regenerated when the
+// crawler rediscovers the same market.
+//
+// Rows without a known start date pass year=0 and therefore share a bucket;
+// slightly lossy but acceptable — date-less markets with LLM-derived
+// descriptions are rare and regenerating on demand is cheap.
+func CacheKey(nameNormalized, stadt string, year int) string {
+	// The caller guarantees nameNormalized is already lowercased and
+	// stripped; stadt is lowered here so casing drift cannot split entries.
+	digest := sha256.Sum256([]byte(fmt.Sprintf("%s|%s|%d", nameNormalized, lowerASCII(stadt), year)))
+	return hex.EncodeToString(digest[:])
+}
+
+// DefaultCacheTTL is the default lifetime for enrichment cache entries.
+// 30 days: long enough to amortise cost across re-crawls, short enough that
+// source edits (new organiser, changed opening hours) eventually propagate.
+const DefaultCacheTTL = 30 * 24 * time.Hour
+
+// lowerASCII lower-cases bytes in the ASCII range. Avoids pulling in
+// strings.ToLower's unicode handling so the key stays deterministic across
+// locales; the German diacritics are already normalised out in
+// NameNormalized, and Stadt values here are already ASCII-ish.
+func lowerASCII(s string) string {
+	out := []byte(s)
+	for i := range out {
+		if c := out[i]; 'A' <= c && c <= 'Z' {
+			out[i] = c + ('a' - 'A')
+		}
+	}
+	return string(out)
+}
diff --git a/backend/internal/domain/discovery/enrich/crawl.go b/backend/internal/domain/discovery/enrich/crawl.go
new file mode 100644
index 0000000..a87fbac
--- /dev/null
+++ b/backend/internal/domain/discovery/enrich/crawl.go
@@ -0,0 +1,97 @@
+package enrich
+
+import (
+	"context"
+	"time"
+)
+
+// Geocoder is the narrow interface crawl-enrich depends on for lat/lng.
+// *pkg/geocode.Geocoder satisfies this; tests inject a stub.
+type Geocoder interface {
+	Geocode(ctx context.Context, street, city, zip, country string) (*float64, *float64, error)
+}
+
+// Contribution is the subset of one source's data that crawl-enrich reads.
+// Callers adapt their domain-specific contribution type into this shape
+// before invoking CrawlEnrich — keeps the enrich package free of a reverse
+// import on the discovery domain.
+type Contribution struct {
+	PLZ       string
+	Venue     string
+	Organizer string
+}
+
+// Input is the read-only context CrawlEnrich needs: the market's city and
+// country (for geocoding) plus the per-source contributions already gathered
+// by the crawler. All other domain-row fields are irrelevant to the crawl pass.
+type Input struct {
+	Stadt         string
+	Land          string
+	Contributions []Contribution
+}
+
+// CrawlEnrich fills as many fields as possible from data already in hand —
+// the caller's Contribution list and (optionally) a geocoder. Returns a
+// fully-populated Sources map so downstream code can tell which pass
+// contributed each field.
+//
+// No LLM, no AI, no paid APIs. Nominatim counts as crawl-enrich because the
+// call is deterministic and already rate-limited by pkg/geocode.
+//
+// geocoder may be nil; lat/lng simply stay empty in that case.
+func CrawlEnrich(ctx context.Context, in Input, geocoder Geocoder) Enrichment {
+	result := Enrichment{Sources: Sources{}}
+
+	// Consolidate the text fields: first non-empty value per field wins.
+	// Deterministic because the caller supplies contributions pre-sorted
+	// (the crawler's Merge step orders them by source name).
+	adopt := func(dst *string, val, field string) {
+		if *dst == "" && val != "" {
+			*dst = val
+			result.Sources[field] = ProvenanceCrawl
+		}
+	}
+	for _, c := range in.Contributions {
+		adopt(&result.PLZ, c.PLZ, "plz")
+		adopt(&result.Venue, c.Venue, "venue")
+		adopt(&result.Organizer, c.Organizer, "organizer")
+	}
+
+	// Geocode from (city, plz) — skipped entirely without a geocoder or a
+	// city. The PLZ consolidated just above is preferred as the zip input.
+	if geocoder != nil && in.Stadt != "" {
+		lat, lng, err := geocoder.Geocode(ctx, "", in.Stadt, result.PLZ, landToISO(in.Land))
+		// Geocoder failures are non-fatal: lat/lng stay empty while the
+		// other crawl-enriched fields persist. A later LLM pass cannot fill
+		// lat/lng either, so a failure just means no coordinates for the row.
+		if err == nil && lat != nil && lng != nil {
+			result.Lat, result.Lng = lat, lng
+			result.Sources["lat"] = ProvenanceCrawl
+			result.Sources["lng"] = ProvenanceCrawl
+		}
+	}
+
+	// Stamp the payload only when at least one field was actually filled.
+	if len(result.Sources) > 0 {
+		now := time.Now().UTC()
+		result.EnrichedAt = &now
+	}
+	return result
+}
+
+// landToISO maps the Land value to an ISO-2 country code for Nominatim's
+// countrycodes parameter. Returns empty string for unknowns so the geocoder
+// runs without a country filter.
+func landToISO(land string) string { + switch land { + case "Deutschland": + return "de" + case "Österreich", "Oesterreich": + return "at" + case "Schweiz": + return "ch" + } + return "" +} diff --git a/backend/internal/domain/discovery/enrich/crawl_test.go b/backend/internal/domain/discovery/enrich/crawl_test.go new file mode 100644 index 0000000..77b6ea9 --- /dev/null +++ b/backend/internal/domain/discovery/enrich/crawl_test.go @@ -0,0 +1,243 @@ +package enrich + +import ( + "context" + "errors" + "testing" +) + +// stubGeocoder returns canned lat/lng. nil lat/lng simulate "not found". +type stubGeocoder struct { + lat *float64 + lng *float64 + err error + // capture last call args + lastCity, lastPLZ, lastCountry string +} + +func (s *stubGeocoder) Geocode(_ context.Context, _, city, zip, country string) (*float64, *float64, error) { + s.lastCity, s.lastPLZ, s.lastCountry = city, zip, country + return s.lat, s.lng, s.err +} + +func ptrFloat(v float64) *float64 { return &v } + +const stallhof = "Stallhof" + +func TestCrawlEnrich_EmptyInputs(t *testing.T) { + got := CrawlEnrich(context.Background(), Input{}, nil) + if got.PLZ != "" || got.Venue != "" || got.Organizer != "" { + t.Errorf("expected all fields empty, got %+v", got) + } + if len(got.Sources) != 0 { + t.Errorf("expected empty Sources on no input, got %v", got.Sources) + } + if got.EnrichedAt != nil { + t.Errorf("expected nil EnrichedAt when nothing enriched, got %v", got.EnrichedAt) + } +} + +func TestCrawlEnrich_SingleSource(t *testing.T) { + in := Input{ + Stadt: "Dresden", + Contributions: []Contribution{ + {PLZ: "01067", Venue: stallhof, Organizer: "Tourismusverband"}, + }, + } + got := CrawlEnrich(context.Background(), in, nil) + if got.PLZ != "01067" || got.Venue != stallhof || got.Organizer != "Tourismusverband" { + t.Errorf("unexpected payload: %+v", got) + } + if got.Sources["plz"] != ProvenanceCrawl { + t.Errorf("plz provenance: got %q, want %q", got.Sources["plz"], ProvenanceCrawl) + } + if 
got.EnrichedAt == nil { + t.Error("expected EnrichedAt to be stamped when any field filled") + } +} + +func TestCrawlEnrich_MultiSourceFirstWins(t *testing.T) { + // Two sources disagree on PLZ: first non-empty wins. Order is the + // crawler-produced contribution order (alphabetical by source name). + in := Input{ + Stadt: "Dresden", + Contributions: []Contribution{ + {PLZ: "01067"}, + {PLZ: "01099"}, + }, + } + got := CrawlEnrich(context.Background(), in, nil) + if got.PLZ != "01067" { + t.Errorf("expected first source's PLZ, got %q", got.PLZ) + } +} + +func TestCrawlEnrich_MultiSourceFillsGaps(t *testing.T) { + // Source A contributes PLZ only; source B contributes venue only; both + // end up in the payload. + in := Input{ + Stadt: "Dresden", + Contributions: []Contribution{ + {PLZ: "01067"}, + {Venue: stallhof}, + }, + } + got := CrawlEnrich(context.Background(), in, nil) + if got.PLZ != "01067" { + t.Errorf("PLZ from A missing: %q", got.PLZ) + } + if got.Venue != stallhof { + t.Errorf("Venue from B missing: %q", got.Venue) + } +} + +func TestCrawlEnrich_GeocoderCalled(t *testing.T) { + in := Input{ + Stadt: "Dresden", + Land: "Deutschland", + Contributions: []Contribution{{PLZ: "01067"}}, + } + sg := &stubGeocoder{lat: ptrFloat(51.05), lng: ptrFloat(13.74)} + got := CrawlEnrich(context.Background(), in, sg) + + if got.Lat == nil || *got.Lat != 51.05 { + t.Errorf("Lat = %v; want 51.05", got.Lat) + } + if got.Lng == nil || *got.Lng != 13.74 { + t.Errorf("Lng = %v; want 13.74", got.Lng) + } + if got.Sources["lat"] != ProvenanceCrawl { + t.Errorf("lat provenance: got %q, want %q", got.Sources["lat"], ProvenanceCrawl) + } + // Land → ISO-2 mapping sanity. 
+ if sg.lastCountry != "de" { + t.Errorf("country code: got %q, want %q", sg.lastCountry, "de") + } + if sg.lastCity != "Dresden" || sg.lastPLZ != "01067" { + t.Errorf("geocoder inputs: city=%q plz=%q; want Dresden / 01067", sg.lastCity, sg.lastPLZ) + } +} + +func TestCrawlEnrich_GeocoderFailureNonFatal(t *testing.T) { + in := Input{ + Stadt: "Dresden", + Contributions: []Contribution{ + {PLZ: "01067", Venue: stallhof}, + }, + } + sg := &stubGeocoder{err: errors.New("nominatim down")} + got := CrawlEnrich(context.Background(), in, sg) + + // Non-geocode fields must still have landed. + if got.PLZ != "01067" || got.Venue != stallhof { + t.Errorf("expected crawl-consolidated fields on geocoder failure: %+v", got) + } + if got.Lat != nil || got.Lng != nil { + t.Error("expected nil lat/lng on geocoder failure") + } + if _, hasLat := got.Sources["lat"]; hasLat { + t.Error("expected no lat provenance when geocoder failed") + } +} + +func TestCrawlEnrich_GeocoderNotCalledWithoutStadt(t *testing.T) { + in := Input{Contributions: []Contribution{{PLZ: "01067"}}} + sg := &stubGeocoder{lat: ptrFloat(1), lng: ptrFloat(1)} + got := CrawlEnrich(context.Background(), in, sg) + if sg.lastCity != "" { + t.Error("geocoder called despite empty Stadt") + } + if got.Lat != nil { + t.Error("lat populated despite geocoder being skipped") + } +} + +func TestMerge_BaseWinsOverOverlay(t *testing.T) { + // Invariant: a field set by crawl (base) is never overwritten by llm (overlay). 
+ base := Enrichment{ + PLZ: "01067", + Venue: stallhof, + Sources: Sources{"plz": ProvenanceCrawl, "venue": ProvenanceCrawl}, + } + overlay := Enrichment{ + PLZ: "WRONG", + Venue: "WRONG", + Category: "mittelaltermarkt", + Description: "Ein großer Markt.", + InputTokens: 100, + OutputTokens: 50, + Model: "mistral-large-latest", + Sources: Sources{ + "plz": ProvenanceLLM, + "venue": ProvenanceLLM, + "category": ProvenanceLLM, + "description": ProvenanceLLM, + }, + } + got := Merge(base, overlay) + + if got.PLZ != "01067" { + t.Errorf("PLZ overwritten by overlay: got %q, want %q", got.PLZ, "01067") + } + if got.Venue != stallhof { + t.Errorf("Venue overwritten by overlay: got %q, want %q", got.Venue, stallhof) + } + if got.Category != "mittelaltermarkt" { + t.Errorf("Category should come from overlay: got %q", got.Category) + } + if got.Description != "Ein großer Markt." { + t.Errorf("Description should come from overlay: got %q", got.Description) + } + if got.Sources["plz"] != ProvenanceCrawl { + t.Errorf("plz provenance overwritten: got %q, want crawl", got.Sources["plz"]) + } + if got.Sources["category"] != ProvenanceLLM { + t.Errorf("category provenance lost: got %q, want llm", got.Sources["category"]) + } + if got.InputTokens != 100 || got.OutputTokens != 50 { + t.Errorf("token counts lost: in=%d out=%d", got.InputTokens, got.OutputTokens) + } +} + +func TestMerge_CoordsOnlyFromOverlayIfBaseNil(t *testing.T) { + base := Enrichment{Sources: Sources{}} + overlay := Enrichment{ + Lat: ptrFloat(51.05), + Lng: ptrFloat(13.74), + Sources: Sources{"lat": ProvenanceCrawl, "lng": ProvenanceCrawl}, + } + got := Merge(base, overlay) + if got.Lat == nil || *got.Lat != 51.05 { + t.Errorf("Lat not merged from overlay") + } + + // Now base already has Lat; overlay must not overwrite. 
+ baseWithLat := Enrichment{ + Lat: ptrFloat(50.00), + Lng: ptrFloat(10.00), + Sources: Sources{"lat": ProvenanceCrawl, "lng": ProvenanceCrawl}, + } + got = Merge(baseWithLat, overlay) + if *got.Lat != 50.00 { + t.Errorf("Lat overwritten despite base non-nil: got %v", *got.Lat) + } +} + +// Sanity: CacheKey must be stable across re-runs and differ across inputs. +func TestCacheKey_StableAndDistinct(t *testing.T) { + aKey := CacheKey("ritterfest dresden", "Dresden", 2026) + bKey := CacheKey("ritterfest dresden", "Dresden", 2026) + cKey := CacheKey("ritterfest leipzig", "Leipzig", 2026) + + if aKey != bKey { + t.Error("identical inputs must produce identical keys") + } + if aKey == cKey { + t.Error("different cities must produce different keys") + } + // Stadt casing drift doesn't change the key. + dKey := CacheKey("ritterfest dresden", "DRESDEN", 2026) + if aKey != dKey { + t.Error("stadt casing drift should not change key") + } +} diff --git a/backend/internal/domain/discovery/enrich/enrich.go b/backend/internal/domain/discovery/enrich/enrich.go new file mode 100644 index 0000000..f77d6f9 --- /dev/null +++ b/backend/internal/domain/discovery/enrich/enrich.go @@ -0,0 +1,116 @@ +// Package enrich derives additional market metadata in two passes. +// +// Pass A (crawl-enrich) is deterministic and free. It consolidates fields +// from per-source contributions (PLZ, venue, organizer) and geocodes city +// + PLZ via Nominatim. First non-empty value wins per field; provenance is +// recorded as "crawl". +// +// Pass B (llm-enrich) runs only for fields Pass A could not fill and does +// cost money. Provenance is recorded as "llm". +// +// Merge combines partial payloads preserving crawl-over-llm preference: a +// crawl-written field is never overwritten by an llm pass. The eval harness +// uses per-field provenance to measure the two passes separately. +// +// The package is intentionally standalone — it does NOT import the parent +// discovery package. 
Callers adapt domain types to the narrow Contribution +// input before invoking CrawlEnrich, which keeps the dependency edge one-way +// (discovery imports enrich; enrich imports no domain code). +package enrich + +import "time" + +// Enrichment is the JSONB payload persisted on discovered_markets.enrichment. +// Every field is optional; Sources records which pass filled each field so +// the eval harness can separate crawl-enrich (free, deterministic) from +// llm-enrich (paid, synthesised) contributions. +// +// Field shape is intentionally flat — promotion to typed columns is easier +// from a flat blob than from nested structures. +type Enrichment struct { + PLZ string `json:"plz,omitempty"` + Venue string `json:"venue,omitempty"` + Organizer string `json:"organizer,omitempty"` + Lat *float64 `json:"lat,omitempty"` + Lng *float64 `json:"lng,omitempty"` + Category string `json:"category,omitempty"` + OpeningHours string `json:"opening_hours,omitempty"` + Description string `json:"description,omitempty"` + Sources Sources `json:"sources,omitempty"` + EnrichedAt *time.Time `json:"enriched_at,omitempty"` + Model string `json:"model,omitempty"` + InputTokens int `json:"input_tokens,omitempty"` + OutputTokens int `json:"output_tokens,omitempty"` +} + +// Sources maps each enrichment field name to the pass that filled it. +// Values are Provenance constants. +type Sources map[string]string + +// Provenance constants for Sources entries. Crawl covers both offline +// consolidation of per-source contributions and Nominatim geocoding — both +// deterministic, neither counts against the LLM budget. +const ( + ProvenanceCrawl = "crawl" + ProvenanceLLM = "llm" +) + +// Merge combines two Enrichment payloads. The base is taken as authoritative; +// fields from overlay fill gaps in base but never overwrite. Provenance maps +// are merged the same way — base wins on conflict. +// +// Intended usage: base = output of crawl-enrich, overlay = output of +// llm-enrich. 
This enforces the "prefer crawl over llm" invariant: the LLM +// pass cannot clobber a field the crawler already filled in, even if the +// LLM is confident. +func Merge(base, overlay Enrichment) Enrichment { + out := base + + if out.PLZ == "" { + out.PLZ = overlay.PLZ + } + if out.Venue == "" { + out.Venue = overlay.Venue + } + if out.Organizer == "" { + out.Organizer = overlay.Organizer + } + if out.Lat == nil { + out.Lat = overlay.Lat + } + if out.Lng == nil { + out.Lng = overlay.Lng + } + if out.Category == "" { + out.Category = overlay.Category + } + if out.OpeningHours == "" { + out.OpeningHours = overlay.OpeningHours + } + if out.Description == "" { + out.Description = overlay.Description + } + + // Merge provenance maps with base-wins semantics. + if out.Sources == nil && len(overlay.Sources) > 0 { + out.Sources = Sources{} + } + for k, v := range overlay.Sources { + if _, exists := out.Sources[k]; !exists { + out.Sources[k] = v + } + } + + // Token counts accumulate — llm overlay carries its own usage. + out.InputTokens += overlay.InputTokens + out.OutputTokens += overlay.OutputTokens + // Model / EnrichedAt come from whichever side actually ran an LLM call; + // only overwrite if base didn't have them. + if out.Model == "" { + out.Model = overlay.Model + } + if out.EnrichedAt == nil { + out.EnrichedAt = overlay.EnrichedAt + } + return out +} diff --git a/backend/internal/domain/discovery/enrich/llm.go b/backend/internal/domain/discovery/enrich/llm.go new file mode 100644 index 0000000..ea5f290 --- /dev/null +++ b/backend/internal/domain/discovery/enrich/llm.go @@ -0,0 +1,35 @@ +package enrich + +import "context" + +// LLMRequest is the narrow context the LLM enricher needs: identifying +// fields, the quellen URLs for web-search grounding, and the crawl-enrich +// payload already computed so the LLM can be instructed not to overwrite. 
+type LLMRequest struct { + MarktName string + Stadt string + Land string + Bundesland string + Quellen []string + // Partial is the output of crawl-enrich for this row. The LLM + // implementation MUST NOT overwrite any field set here — Merge enforces + // the invariant regardless, but the prompt should respect it to save + // tokens. + Partial Enrichment +} + +// LLMEnricher fills enrichment fields that crawl-enrich could not reach. +// Returned payload must populate Sources with "llm" for every field it +// writes; Model and token counts are required so the eval harness can +// attribute cost. +type LLMEnricher interface { + EnrichMissing(ctx context.Context, req LLMRequest) (Enrichment, error) +} + +// NoopLLMEnricher returns the partial unchanged. Used by tests and as a +// default when AI integration is disabled. +type NoopLLMEnricher struct{} + +func (NoopLLMEnricher) EnrichMissing(_ context.Context, req LLMRequest) (Enrichment, error) { + return req.Partial, nil +} diff --git a/backend/internal/domain/discovery/enrich/mistral.go b/backend/internal/domain/discovery/enrich/mistral.go new file mode 100644 index 0000000..ec8cb9c --- /dev/null +++ b/backend/internal/domain/discovery/enrich/mistral.go @@ -0,0 +1,192 @@ +package enrich + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "log/slog" + "strings" + "time" + + "marktvogt.de/backend/internal/pkg/ai" +) + +// maxScrapeURLs limits how many quellen we pull per enrichment call. +// Real-world queue rows rarely have more than 2-3 sources; 5 is a safe cap +// that prevents a worst-case row from burning minutes on fetches. +const maxScrapeURLs = 5 + +// ErrNoScrapedContent signals that every quellen URL failed or returned +// empty text. Callers should surface this to the operator rather than send +// a no-context LLM prompt — grounding is required for useful output. 
+var ErrNoScrapedContent = errors.New("no scrapeable content from any source URL") + +// Scraper is the narrow interface MistralLLMEnricher depends on. Satisfied +// by *pkg/scrape.Client; tests inject a stub. +type Scraper interface { + Fetch(ctx context.Context, url string) (string, error) +} + +// aiPass2 is the narrow interface for the AI client's Pass2 chat-completion +// method. Lets tests inject a stub that returns canned JSON without hitting +// the real Mistral API. +type aiPass2 interface { + Pass2(ctx context.Context, systemPrompt, userPrompt string) (ai.PassResult, error) +} + +// MistralLLMEnricher implements LLMEnricher by scraping quellen URLs and +// feeding the concatenated text to Mistral's chat-completion endpoint with +// a JSON response format. +type MistralLLMEnricher struct { + Client aiPass2 + Scraper Scraper +} + +// NewMistralLLMEnricher constructs an enricher bound to a Mistral ai.Client +// and a scraper. Both are required; call sites should fall back to +// NoopLLMEnricher when AI is disabled rather than passing nil here. +func NewMistralLLMEnricher(client aiPass2, scraper Scraper) *MistralLLMEnricher { + return &MistralLLMEnricher{Client: client, Scraper: scraper} +} + +// llmResponse is the JSON shape we instruct Mistral to return. Any field may +// be absent if the content doesn't support it — the enricher only writes +// what the model actually produced. +type llmResponse struct { + Category string `json:"category"` + OpeningHours string `json:"opening_hours"` + Description string `json:"description"` +} + +// EnrichMissing scrapes up to maxScrapeURLs of req.Quellen, concatenates the +// extracted text, and asks Mistral to fill category / opening_hours / +// description. Fails with ErrNoScrapedContent if zero URLs return usable +// text — empty-context LLM calls hallucinate. 
+func (e *MistralLLMEnricher) EnrichMissing(ctx context.Context, req LLMRequest) (Enrichment, error) { + if e.Client == nil || e.Scraper == nil { + return Enrichment{}, errors.New("mistral enricher not configured") + } + + urls := req.Quellen + if len(urls) > maxScrapeURLs { + urls = urls[:maxScrapeURLs] + } + blocks := make([]string, 0, len(urls)) + for _, u := range urls { + text, err := e.Scraper.Fetch(ctx, u) + if err != nil { + slog.InfoContext(ctx, "scrape failed; continuing", "url", u, "error", err) + continue + } + text = strings.TrimSpace(text) + if text == "" { + continue + } + blocks = append(blocks, fmt.Sprintf("=== Quelle: %s ===\n%s", u, text)) + } + if len(blocks) == 0 { + return Enrichment{}, ErrNoScrapedContent + } + + systemPrompt := buildSystemPrompt() + userPrompt := buildUserPrompt(req, blocks) + + result, err := e.Client.Pass2(ctx, systemPrompt, userPrompt) + if err != nil { + return Enrichment{}, fmt.Errorf("pass2: %w", err) + } + + var parsed llmResponse + if err := json.Unmarshal([]byte(result.Content), &parsed); err != nil { + return Enrichment{}, fmt.Errorf("parse llm response: %w (content=%q)", err, result.Content) + } + + // Build the Enrichment payload with only the fields the model produced. + // Sources entries + token counts + model tag feed the eval harness. 
+ now := time.Now().UTC() + out := Enrichment{ + Sources: Sources{}, + Model: result.Model, + EnrichedAt: &now, + } + if result.Usage != nil { + out.InputTokens = result.Usage.PromptTokens + out.OutputTokens = result.Usage.CompletionTokens + } + if s := strings.TrimSpace(parsed.Category); s != "" { + out.Category = s + out.Sources["category"] = ProvenanceLLM + } + if s := strings.TrimSpace(parsed.OpeningHours); s != "" { + out.OpeningHours = s + out.Sources["opening_hours"] = ProvenanceLLM + } + if s := strings.TrimSpace(parsed.Description); s != "" { + out.Description = s + out.Sources["description"] = ProvenanceLLM + } + return out, nil +} + +// buildSystemPrompt returns the English instruction block. Mistral follows +// English instructions more reliably; only the *output* is German. +func buildSystemPrompt() string { + return strings.TrimSpace(` +You are enriching metadata for a medieval market (Mittelaltermarkt) in the +DACH region. Using the provided source-page excerpts as grounding, return a +JSON object with exactly these fields: + + { + "category": string, // short German label, e.g. "mittelaltermarkt", + // "weihnachtsmarkt", "ritterfest". Lowercase. + "opening_hours": string, // German; brief (e.g. "Sa-So 10:00-18:00"). + // Empty string if unclear from sources. + "description": string // 1-3 sentences in German summarising what the + // market offers. Neutral tone, no hype. + // Empty string if sources have no useful info. + } + +Rules: +- Return ONLY the JSON object. No prose, no code fences. +- If the sources do not support a field, return an empty string for it. + Do NOT invent details the sources don't mention. +- The description must be factual; avoid marketing language. +- All string values use straight ASCII quotes; escape internal quotes. +`) +} + +// buildUserPrompt assembles the per-row context + scraped source blocks. +// Keeps the row identifiers (name, city) short; most of the prompt budget +// goes to the grounding blocks. 
+func buildUserPrompt(req LLMRequest, blocks []string) string { + var b strings.Builder + fmt.Fprintf(&b, "Markt: %s\n", req.MarktName) + fmt.Fprintf(&b, "Stadt: %s\n", req.Stadt) + if req.Bundesland != "" { + fmt.Fprintf(&b, "Bundesland: %s\n", req.Bundesland) + } + if req.Land != "" { + fmt.Fprintf(&b, "Land: %s\n", req.Land) + } + // Tell the LLM what crawl-enrich already covered so it doesn't spend + // tokens re-deriving fields we'd discard via Merge anyway. + if req.Partial.PLZ != "" || req.Partial.Venue != "" || req.Partial.Organizer != "" { + b.WriteString("\nBereits bekannt (nicht erneut liefern):\n") + if req.Partial.PLZ != "" { + fmt.Fprintf(&b, "- PLZ: %s\n", req.Partial.PLZ) + } + if req.Partial.Venue != "" { + fmt.Fprintf(&b, "- Venue: %s\n", req.Partial.Venue) + } + if req.Partial.Organizer != "" { + fmt.Fprintf(&b, "- Organizer: %s\n", req.Partial.Organizer) + } + } + b.WriteString("\n") + for _, blk := range blocks { + b.WriteString(blk) + b.WriteString("\n\n") + } + return strings.TrimSpace(b.String()) +} diff --git a/backend/internal/domain/discovery/enrich/mistral_test.go b/backend/internal/domain/discovery/enrich/mistral_test.go new file mode 100644 index 0000000..075167f --- /dev/null +++ b/backend/internal/domain/discovery/enrich/mistral_test.go @@ -0,0 +1,206 @@ +package enrich + +import ( + "context" + "errors" + "strings" + "testing" + + "marktvogt.de/backend/internal/pkg/ai" +) + +const catMittelaltermarkt = "mittelaltermarkt" + +// stubScraper returns canned responses per URL; an empty string simulates +// a blocked / empty page. +type stubScraper struct { + responses map[string]string + errs map[string]error +} + +func (s *stubScraper) Fetch(_ context.Context, url string) (string, error) { + if err, ok := s.errs[url]; ok { + return "", err + } + return s.responses[url], nil +} + +// stubPass2 captures the prompts it received and returns a canned JSON body. 
+type stubPass2 struct { + lastSystem string + lastUser string + result ai.PassResult + err error +} + +func (s *stubPass2) Pass2(_ context.Context, systemPrompt, userPrompt string) (ai.PassResult, error) { + s.lastSystem = systemPrompt + s.lastUser = userPrompt + return s.result, s.err +} + +func TestMistralEnrich_HappyPath(t *testing.T) { + scraper := &stubScraper{responses: map[string]string{ + "https://a.example/markt": "Ein Mittelaltermarkt mit Ritterspielen und Markttreiben.", + "https://b.example/info": "Sa-So jeweils 10-18 Uhr.", + }} + client := &stubPass2{ + result: ai.PassResult{ + Content: `{"category":"mittelaltermarkt","opening_hours":"Sa-So 10:00-18:00","description":"Ein Markt mit Ritterspielen."}`, + Model: "mistral-large-latest", + Usage: &ai.UsageInfo{PromptTokens: 450, CompletionTokens: 60}, + }, + } + e := NewMistralLLMEnricher(client, scraper) + + req := LLMRequest{ + MarktName: "Mittelaltermarkt Dresden", + Stadt: "Dresden", + Land: "Deutschland", + Quellen: []string{"https://a.example/markt", "https://b.example/info"}, + Partial: Enrichment{PLZ: "01067", Sources: Sources{"plz": ProvenanceCrawl}}, + } + got, err := e.EnrichMissing(context.Background(), req) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // Result carries LLM fields, provenance llm, model + token counts. 
+ if got.Category != catMittelaltermarkt || got.Description == "" || got.OpeningHours == "" { + t.Errorf("missing fields in result: %+v", got) + } + if got.Sources["category"] != ProvenanceLLM { + t.Errorf("category provenance: got %q, want llm", got.Sources["category"]) + } + if got.Model != "mistral-large-latest" { + t.Errorf("model: got %q, want mistral-large-latest", got.Model) + } + if got.InputTokens != 450 || got.OutputTokens != 60 { + t.Errorf("token counts: in=%d out=%d", got.InputTokens, got.OutputTokens) + } + + // Prompt inspection — verify grounding blocks include the scraped content + // and the already-known fields are listed so LLM doesn't redo them. + if !strings.Contains(client.lastUser, "Mittelaltermarkt Dresden") { + t.Error("user prompt missing markt name") + } + if !strings.Contains(client.lastUser, "Ein Mittelaltermarkt mit Ritterspielen") { + t.Error("user prompt missing scraped content from URL 1") + } + if !strings.Contains(client.lastUser, "Sa-So jeweils 10-18 Uhr") { + t.Error("user prompt missing scraped content from URL 2") + } + if !strings.Contains(client.lastUser, "Bereits bekannt") { + t.Error("user prompt should announce already-known fields when Partial is populated") + } + if !strings.Contains(client.lastUser, "PLZ: 01067") { + t.Error("user prompt missing already-known PLZ") + } + // System prompt asks for JSON only. 
+ if !strings.Contains(client.lastSystem, "JSON") { + t.Error("system prompt should mention JSON") + } +} + +func TestMistralEnrich_AllScrapesFail(t *testing.T) { + scraper := &stubScraper{errs: map[string]error{ + "https://a.example": errors.New("timeout"), + "https://b.example": errors.New("404"), + }} + client := &stubPass2{} // must not be called + e := NewMistralLLMEnricher(client, scraper) + + req := LLMRequest{ + Quellen: []string{"https://a.example", "https://b.example"}, + } + _, err := e.EnrichMissing(context.Background(), req) + if !errors.Is(err, ErrNoScrapedContent) { + t.Errorf("err = %v; want ErrNoScrapedContent", err) + } + if client.lastUser != "" { + t.Error("LLM must not be called when zero URLs scrape") + } +} + +func TestMistralEnrich_SomeScrapesFailStillCallsLLM(t *testing.T) { + // One URL fails, one succeeds — the LLM should still run with partial + // context, not fail because of the one bad source. + scraper := &stubScraper{ + responses: map[string]string{"https://ok.example": "Useful content."}, + errs: map[string]error{"https://bad.example": errors.New("timeout")}, + } + client := &stubPass2{ + result: ai.PassResult{Content: `{"category":"mittelaltermarkt","opening_hours":"","description":""}`}, + } + e := NewMistralLLMEnricher(client, scraper) + + req := LLMRequest{Quellen: []string{"https://bad.example", "https://ok.example"}} + got, err := e.EnrichMissing(context.Background(), req) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got.Category != catMittelaltermarkt { + t.Errorf("category: got %q", got.Category) + } + if !strings.Contains(client.lastUser, "Useful content") { + t.Error("user prompt missing successful scrape") + } +} + +func TestMistralEnrich_EmptyFieldsNoProvenance(t *testing.T) { + // LLM returns empty strings for fields it can't support. Those fields + // must NOT appear in Sources — an empty provenance is misleading. 
+ scraper := &stubScraper{responses: map[string]string{"https://a.example": "Content."}} + client := &stubPass2{ + result: ai.PassResult{Content: `{"category":"mittelaltermarkt","opening_hours":"","description":""}`}, + } + e := NewMistralLLMEnricher(client, scraper) + + got, err := e.EnrichMissing(context.Background(), LLMRequest{Quellen: []string{"https://a.example"}}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got.Sources["category"] != ProvenanceLLM { + t.Errorf("category should have llm provenance") + } + if _, has := got.Sources["opening_hours"]; has { + t.Error("empty opening_hours must not carry provenance") + } + if _, has := got.Sources["description"]; has { + t.Error("empty description must not carry provenance") + } +} + +func TestMistralEnrich_CapsURLsAtFive(t *testing.T) { + // Supply 7 URLs; only the first 5 should be fetched. + urls := []string{"u1", "u2", "u3", "u4", "u5", "u6", "u7"} + responses := map[string]string{} + for _, u := range urls { + responses[u] = "content" + } + fetched := map[string]bool{} + scraper := &trackingScraper{fetch: func(u string) (string, error) { + fetched[u] = true + return responses[u], nil + }} + client := &stubPass2{result: ai.PassResult{Content: `{"category":"x","opening_hours":"","description":""}`}} + e := NewMistralLLMEnricher(client, scraper) + + _, _ = e.EnrichMissing(context.Background(), LLMRequest{Quellen: urls}) + if len(fetched) != 5 { + t.Errorf("fetched %d URLs; want 5", len(fetched)) + } + for _, u := range []string{"u6", "u7"} { + if fetched[u] { + t.Errorf("URL %s should have been skipped past the cap", u) + } + } +} + +type trackingScraper struct { + fetch func(string) (string, error) +} + +func (t *trackingScraper) Fetch(_ context.Context, url string) (string, error) { + return t.fetch(url) +} diff --git a/backend/internal/domain/discovery/enrich/similarity.go b/backend/internal/domain/discovery/enrich/similarity.go new file mode 100644 index 0000000..cd58e1d --- /dev/null 
+++ b/backend/internal/domain/discovery/enrich/similarity.go @@ -0,0 +1,181 @@ +package enrich + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "errors" + "fmt" + "strings" + "time" +) + +// SimilarityRow carries the minimal identifying fields the classifier reads. +// Callers adapt their domain row into this shape — the enrich package stays +// free of a reverse import on discovery. +type SimilarityRow struct { + NameNormalized string + Stadt string + // Year is StartDatum.Year() (or 0 when unknown). Two rows differing only + // in year are almost always the same series in different editions — + // useful signal but not identity. + Year int + // Display fields — shown to the LLM verbatim so it can reason about + // casing / diacritics that NameNormalized strips. + Name string + // Quellen are passed in but not scraped — the classifier works from + // name/city/date alone. Included in the prompt only as metadata. + Quellen []string +} + +// Verdict is the classifier's answer about one pair. +type Verdict struct { + Same bool `json:"same"` + Confidence float64 `json:"confidence"` // 0..1 + Reason string `json:"reason"` + Model string `json:"model,omitempty"` + ClassifiedAt time.Time `json:"classified_at,omitempty"` +} + +// SimilarityClassifier decides whether two queue rows refer to the same +// underlying market. Implementations should be deterministic across calls +// on the same input — callers cache the result. +type SimilarityClassifier interface { + Classify(ctx context.Context, a, b SimilarityRow) (Verdict, error) +} + +// NoopSimilarityClassifier returns a zero-confidence verdict without calling +// anything. Used as a fallback when AI is disabled; callers should check +// ai.Enabled() and fall back to this type rather than passing nil. 
+type NoopSimilarityClassifier struct{} + +func (NoopSimilarityClassifier) Classify(_ context.Context, _, _ SimilarityRow) (Verdict, error) { + return Verdict{}, errors.New("similarity classifier not configured") +} + +// SimilarityPairKey derives a content-hash cache key over the two rows' +// identifying tuples. Ordering-independent: Classify(a,b) and Classify(b,a) +// must hit the same cache entry. +// +// Uses NameNormalized + lowered Stadt + Year so the key survives casing +// drift, whitespace, and umlaut normalisation (NameNormalized already did +// the heavy lifting). +func SimilarityPairKey(a, b SimilarityRow) string { + keyA := fmt.Sprintf("%s|%s|%d", a.NameNormalized, lowerASCII(a.Stadt), a.Year) + keyB := fmt.Sprintf("%s|%s|%d", b.NameNormalized, lowerASCII(b.Stadt), b.Year) + // Sort lexicographically for symmetry. + var raw string + if keyA <= keyB { + raw = keyA + "||" + keyB + } else { + raw = keyB + "||" + keyA + } + sum := sha256.Sum256([]byte(raw)) + return hex.EncodeToString(sum[:]) +} + +// DefaultSimilarityCacheTTL: 30 days matches the enrichment cache TTL — +// same reasoning (amortise cost across re-crawls, source edits eventually +// propagate). +const DefaultSimilarityCacheTTL = 30 * 24 * time.Hour + +// MistralSimilarityClassifier implements SimilarityClassifier by sending a +// JSON-formatted comparison prompt to Mistral's chat endpoint. +type MistralSimilarityClassifier struct { + Client aiPass2 +} + +// NewMistralSimilarityClassifier binds a Mistral ai.Client. client must be +// non-nil; routes.go falls back to NoopSimilarityClassifier when AI is off. +func NewMistralSimilarityClassifier(client aiPass2) *MistralSimilarityClassifier { + return &MistralSimilarityClassifier{Client: client} +} + +// simResponse is the JSON shape we instruct Mistral to return. Confidence +// must be parseable as a float 0..1; anything outside that range is clamped. 
+type simResponse struct { + SameMarket bool `json:"same_market"` + Confidence float64 `json:"confidence"` + Reason string `json:"reason"` +} + +// Classify sends the paired metadata to Mistral and parses the JSON response. +// No web scraping — the classifier works from name/city/year alone, which is +// enough for the common cases (same venue listed on two different calendars, +// editing typos, cross-year recurrence). +func (m *MistralSimilarityClassifier) Classify(ctx context.Context, a, b SimilarityRow) (Verdict, error) { + if m.Client == nil { + return Verdict{}, errors.New("mistral similarity classifier not configured") + } + systemPrompt := simSystemPrompt() + userPrompt := simUserPrompt(a, b) + + result, err := m.Client.Pass2(ctx, systemPrompt, userPrompt) + if err != nil { + return Verdict{}, fmt.Errorf("pass2: %w", err) + } + + var parsed simResponse + if err := json.Unmarshal([]byte(result.Content), &parsed); err != nil { + return Verdict{}, fmt.Errorf("parse response: %w (content=%q)", err, result.Content) + } + + // Clamp confidence to [0,1]; the model occasionally returns 1.2 or -0.1. + conf := parsed.Confidence + if conf < 0 { + conf = 0 + } + if conf > 1 { + conf = 1 + } + + return Verdict{ + Same: parsed.SameMarket, + Confidence: conf, + Reason: strings.TrimSpace(parsed.Reason), + Model: result.Model, + ClassifiedAt: time.Now().UTC(), + }, nil +} + +func simSystemPrompt() string { + return strings.TrimSpace(` +You decide whether two candidate entries refer to the same medieval market +(Mittelaltermarkt) in the DACH region. Input: two objects each with a name, +city, and year. Output a single JSON object: + + { + "same_market": true|false, + "confidence": 0.0-1.0, // how sure you are + "reason": "..." // short German justification (<= 140 chars) + } + +Rules: +- Return ONLY the JSON object. No prose, no code fences. +- "Same market" means same recurring event — same venue, same organiser, + same audience. 
A market and its anniversary edition in a later year ARE + the same market (just different editions). +- Different cities = different markets, even if the name matches. +- Rephrasings, typos, and umlaut differences (Dresden vs Straßburg vs + Strassburg) are the same market if the underlying identifiers align. +- If the evidence is weak, return same_market=false with low confidence + rather than guessing. Low confidence is more useful than a wrong guess. +`) +} + +func simUserPrompt(a, b SimilarityRow) string { + // Keep the JSON compact; the model handles inline JSON better than + // pretty-printed when the task is "read two records". + ja, _ := json.Marshal(map[string]any{ + "name": a.Name, + "city": a.Stadt, + "year": a.Year, + }) + jb, _ := json.Marshal(map[string]any{ + "name": b.Name, + "city": b.Stadt, + "year": b.Year, + }) + return fmt.Sprintf("A: %s\nB: %s", ja, jb) +} diff --git a/backend/internal/domain/discovery/enrich/similarity_test.go b/backend/internal/domain/discovery/enrich/similarity_test.go new file mode 100644 index 0000000..ff374be --- /dev/null +++ b/backend/internal/domain/discovery/enrich/similarity_test.go @@ -0,0 +1,118 @@ +package enrich + +import ( + "context" + "errors" + "strings" + "testing" + + "marktvogt.de/backend/internal/pkg/ai" +) + +func TestSimilarityPairKey_Symmetric(t *testing.T) { + a := SimilarityRow{NameNormalized: "ritterfest dresden", Stadt: "Dresden", Year: 2026} + b := SimilarityRow{NameNormalized: "ritterfest leipzig", Stadt: "Leipzig", Year: 2026} + + if SimilarityPairKey(a, b) != SimilarityPairKey(b, a) { + t.Error("pair key must be symmetric: (a,b) and (b,a) should produce identical keys") + } +} + +func TestSimilarityPairKey_DifferentInputsDifferentKeys(t *testing.T) { + a := SimilarityRow{NameNormalized: "ritterfest dresden", Stadt: "Dresden", Year: 2026} + b := SimilarityRow{NameNormalized: "ritterfest leipzig", Stadt: "Leipzig", Year: 2026} + c := SimilarityRow{NameNormalized: "ritterfest dresden", Stadt: 
"Dresden", Year: 2027} + + if SimilarityPairKey(a, b) == SimilarityPairKey(a, c) { + t.Error("different pairs must produce different keys") + } + // Stadt casing must not change the key. + d := SimilarityRow{NameNormalized: "ritterfest dresden", Stadt: "DRESDEN", Year: 2026} + if SimilarityPairKey(a, b) != SimilarityPairKey(d, b) { + t.Error("stadt casing drift must not change the key") + } +} + +func TestMistralSimilarity_HappyPath(t *testing.T) { + client := &stubPass2{ + result: ai.PassResult{ + Content: `{"same_market":true,"confidence":0.82,"reason":"Gleicher Name, gleiche Stadt, gleiches Jahr."}`, + Model: "mistral-large-latest", + }, + } + c := NewMistralSimilarityClassifier(client) + + got, err := c.Classify(context.Background(), + SimilarityRow{Name: "Mittelaltermarkt Dresden", Stadt: "Dresden", Year: 2026}, + SimilarityRow{Name: "Mittelaltermarkt Dresden 2026", Stadt: "Dresden", Year: 2026}, + ) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !got.Same { + t.Errorf("same = false; want true") + } + if got.Confidence < 0.8 || got.Confidence > 0.85 { + t.Errorf("confidence = %v; want ~0.82", got.Confidence) + } + if got.Reason == "" { + t.Error("reason missing") + } + if got.Model != "mistral-large-latest" { + t.Errorf("model = %q", got.Model) + } + // Prompt must carry both rows' identifying fields for the LLM to reason on. 
+ if !strings.Contains(client.lastUser, "Mittelaltermarkt Dresden") { + t.Error("user prompt missing A.name") + } + if !strings.Contains(client.lastSystem, "same_market") { + t.Error("system prompt should describe the JSON schema (same_market key)") + } +} + +func TestMistralSimilarity_ClampsConfidence(t *testing.T) { + tests := []struct { + name string + raw string + wantConf float64 + }{ + {"above 1 clamps to 1", `{"same_market":true,"confidence":1.4,"reason":"x"}`, 1.0}, + {"below 0 clamps to 0", `{"same_market":false,"confidence":-0.3,"reason":"x"}`, 0.0}, + {"in range passes through", `{"same_market":true,"confidence":0.5,"reason":"x"}`, 0.5}, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + c := NewMistralSimilarityClassifier(&stubPass2{result: ai.PassResult{Content: tc.raw}}) + got, err := c.Classify(context.Background(), SimilarityRow{}, SimilarityRow{}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got.Confidence != tc.wantConf { + t.Errorf("confidence = %v; want %v", got.Confidence, tc.wantConf) + } + }) + } +} + +func TestMistralSimilarity_PropagatesPass2Error(t *testing.T) { + c := NewMistralSimilarityClassifier(&stubPass2{err: errors.New("mistral down")}) + _, err := c.Classify(context.Background(), SimilarityRow{}, SimilarityRow{}) + if err == nil { + t.Fatal("expected error; got nil") + } +} + +func TestMistralSimilarity_RejectsBadJSON(t *testing.T) { + c := NewMistralSimilarityClassifier(&stubPass2{result: ai.PassResult{Content: "not json at all"}}) + _, err := c.Classify(context.Background(), SimilarityRow{}, SimilarityRow{}) + if err == nil { + t.Fatal("expected parse error; got nil") + } +} + +func TestNoopSimilarityClassifier_Errors(t *testing.T) { + _, err := NoopSimilarityClassifier{}.Classify(context.Background(), SimilarityRow{}, SimilarityRow{}) + if err == nil { + t.Error("NoopSimilarityClassifier should return error — it's the fallback when AI is disabled") + } +} diff --git 
a/backend/internal/domain/discovery/handler.go b/backend/internal/domain/discovery/handler.go index 883be5c..2b14b7f 100644 --- a/backend/internal/domain/discovery/handler.go +++ b/backend/internal/domain/discovery/handler.go @@ -32,6 +32,18 @@ type Handler struct { lastFinishedAt time.Time lastSummary *CrawlSummary lastError string + + // Async crawl-enrich-all state (manual bulk operator action). + // Reuses the CAS-gate + mutex pattern of the crawl flow above so the + // admin UI can poll a dedicated status endpoint without colliding with a + // running crawl. + enrichRunning atomic.Bool + enrichMu sync.RWMutex + // Guarded by enrichMu: + enrichStartedAt time.Time + enrichFinishedAt time.Time + enrichSummary *CrawlEnrichSummary + enrichError string } // NewHandler constructs a Handler. manualRateLimitPerHour controls how @@ -314,6 +326,125 @@ func (h *Handler) Similar(c *gin.Context) { c.JSON(http.StatusOK, gin.H{"data": matches}) } +// RunCrawlEnrichAll kicks off the manual bulk crawl-enrich pass in the +// background and returns 202 immediately. Exactly one run may be in progress +// at a time (CAS gate). Completion is observable via +// GET /admin/discovery/enrichment/crawl-all-status. +func (h *Handler) RunCrawlEnrichAll(c *gin.Context) { + if !h.enrichRunning.CompareAndSwap(false, true) { + apiErr := apierror.TooManyRequests("A crawl-enrich run is already in progress") + c.JSON(apiErr.Status, apierror.NewResponse(apiErr)) + return + } + + h.enrichMu.Lock() + h.enrichStartedAt = time.Now().UTC() + h.enrichFinishedAt = time.Time{} + h.enrichSummary = nil + h.enrichError = "" + h.enrichMu.Unlock() + + go h.runEnrichAsync() + + c.JSON(http.StatusAccepted, gin.H{ + "data": gin.H{ + "status": "started", + "message": "Crawl-enrich started in background. Poll /api/v1/admin/discovery/enrichment/crawl-all-status for completion.", + }, + }) +} + +// runEnrichAsync runs RunCrawlEnrichAll with a detached context. 
10m cap is +// generous for Nominatim's 1rps: a 600-row queue is the worst case we expect. +func (h *Handler) runEnrichAsync() { + defer h.enrichRunning.Store(false) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) + defer cancel() + + summary, err := h.service.RunCrawlEnrichAll(ctx) + + h.enrichMu.Lock() + h.enrichFinishedAt = time.Now().UTC() + if err != nil { + h.enrichError = err.Error() + slog.ErrorContext(ctx, "async crawl-enrich failed", "error", err) + } else { + sCopy := summary + h.enrichSummary = &sCopy + } + h.enrichMu.Unlock() +} + +// CrawlEnrichStatus returns the state of the most recent async crawl-enrich. +// Shape mirrors CrawlStatus for UI reuse. +func (h *Handler) CrawlEnrichStatus(c *gin.Context) { + h.enrichMu.RLock() + defer h.enrichMu.RUnlock() + c.JSON(http.StatusOK, gin.H{ + "data": gin.H{ + "running": h.enrichRunning.Load(), + "started_at": h.enrichStartedAt, + "finished_at": h.enrichFinishedAt, + "summary": h.enrichSummary, + "error": h.enrichError, + }, + }) +} + +// EnrichLLM runs per-row LLM enrichment synchronously. 30s deadline is +// enough for scraping 5 URLs + one Mistral Pass2 call in typical conditions. +// Operator clicks the button, waits, sees the result — no polling. 
+func (h *Handler) EnrichLLM(c *gin.Context) { + id, err := uuid.Parse(c.Param("id")) + if err != nil { + apiErr := apierror.BadRequest("invalid_id", "invalid queue id") + c.JSON(apiErr.Status, apierror.NewResponse(apiErr)) + return + } + ctx, cancel := context.WithTimeout(c.Request.Context(), 30*time.Second) + defer cancel() + + payload, err := h.service.RunLLMEnrichOne(ctx, id) + if err != nil { + slog.WarnContext(ctx, "llm enrich failed", "queue_id", id, "error", err) + apiErr := apierror.Internal("llm enrich failed: " + err.Error()) + c.JSON(apiErr.Status, apierror.NewResponse(apiErr)) + return + } + c.JSON(http.StatusOK, gin.H{"data": payload}) +} + +// ClassifySimilarPair runs the LLM duplicate-tiebreaker on the two queue +// rows identified by URL params :aid and :bid. Synchronous, 15s deadline — +// the call is short (no scraping) so the operator can click and immediately +// see the verdict. +func (h *Handler) ClassifySimilarPair(c *gin.Context) { + aID, err := uuid.Parse(c.Param("aid")) + if err != nil { + apiErr := apierror.BadRequest("invalid_id", "invalid queue id A") + c.JSON(apiErr.Status, apierror.NewResponse(apiErr)) + return + } + bID, err := uuid.Parse(c.Param("bid")) + if err != nil { + apiErr := apierror.BadRequest("invalid_id", "invalid queue id B") + c.JSON(apiErr.Status, apierror.NewResponse(apiErr)) + return + } + ctx, cancel := context.WithTimeout(c.Request.Context(), 15*time.Second) + defer cancel() + + verdict, err := h.service.ClassifySimilarPair(ctx, aID, bID) + if err != nil { + slog.WarnContext(ctx, "classify similar failed", "a", aID, "b", bID, "error", err) + apiErr := apierror.Internal("classify failed: " + err.Error()) + c.JSON(apiErr.Status, apierror.NewResponse(apiErr)) + return + } + c.JSON(http.StatusOK, gin.H{"data": verdict}) +} + func currentUserID(c *gin.Context) (uuid.UUID, bool) { raw, exists := c.Get("user_id") if !exists { diff --git a/backend/internal/domain/discovery/handler_test.go 
b/backend/internal/domain/discovery/handler_test.go index 4a87ce2..77a797b 100644 --- a/backend/internal/domain/discovery/handler_test.go +++ b/backend/internal/domain/discovery/handler_test.go @@ -55,6 +55,9 @@ func TestCrawlHandlerReturns202AndStartsCrawl(t *testing.T) { &stubCrawlerRunner{result: crawler.CrawlResult{}}, noopLinkVerifier{}, noopMarketCreator{}, + nil, + nil, + nil, ) h := NewHandler(svc, 0) // rate limit disabled @@ -89,7 +92,7 @@ func TestCrawlHandlerReturns202AndStartsCrawl(t *testing.T) { // TestCrawlStatusInitialState verifies the zero state of a freshly constructed Handler. func TestCrawlStatusInitialState(t *testing.T) { - svc := NewService(newMockRepo(), &stubCrawlerRunner{}, noopLinkVerifier{}, noopMarketCreator{}) + svc := NewService(newMockRepo(), &stubCrawlerRunner{}, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, nil) h := NewHandler(svc, 0) w := httptest.NewRecorder() @@ -139,7 +142,7 @@ func TestCrawlHandlerConcurrentReturnsTooManyRequests(t *testing.T) { started: make(chan struct{}), release: make(chan struct{}), } - svc := NewService(newMockRepo(), bc, noopLinkVerifier{}, noopMarketCreator{}) + svc := NewService(newMockRepo(), bc, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, nil) h := NewHandler(svc, 0) // rate limit disabled // First request — returns 202 and spawns goroutine. @@ -194,7 +197,7 @@ func TestListQueueSortParamWhitelist(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { repo := newMockRepo() - svc := NewService(repo, &stubCrawlerRunner{}, noopLinkVerifier{}, noopMarketCreator{}) + svc := NewService(repo, &stubCrawlerRunner{}, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, nil) h := NewHandler(svc, 0) w := httptest.NewRecorder() @@ -252,6 +255,9 @@ func TestCrawlHandlerRateLimit(t *testing.T) { &stubCrawlerRunner{result: crawler.CrawlResult{}}, noopLinkVerifier{}, noopMarketCreator{}, + nil, + nil, + nil, ) // 1 per hour window. 
h := NewHandler(svc, 1) diff --git a/backend/internal/domain/discovery/mock_repo_test.go b/backend/internal/domain/discovery/mock_repo_test.go index ec571b5..7d72ca1 100644 --- a/backend/internal/domain/discovery/mock_repo_test.go +++ b/backend/internal/domain/discovery/mock_repo_test.go @@ -6,6 +6,8 @@ import ( "github.com/google/uuid" "github.com/jackc/pgx/v5" + + "marktvogt.de/backend/internal/domain/discovery/enrich" ) // Compile-time guard: mockRepo must fully satisfy Repository. @@ -16,7 +18,8 @@ type mockRepo struct { editionExistsFn func(ctx context.Context, seriesID uuid.UUID, year int) (bool, error) insertDiscFn func(ctx context.Context, d DiscoveredMarket) (uuid.UUID, error) isRejectedFn func(ctx context.Context, nameNormalized, stadt string, year int) (bool, error) - queuePendingFn func(ctx context.Context, nameNormalized, stadt string, startDatum *time.Time) (bool, error) + findMatchFn func(ctx context.Context, nameNormalized, stadt string, startDatum *time.Time) (*DiscoveredMarket, error) + mergePendingFn func(id uuid.UUID, addSources, addQuellen []string, addContribs []SourceContribution, newKonfidenz, newHinweis string) error countQueueFn func(ctx context.Context, status string) (int, error) getDiscoveredFn func(ctx context.Context, id uuid.UUID) (DiscoveredMarket, error) beginTxFn func(ctx context.Context) (pgx.Tx, error) @@ -32,6 +35,17 @@ type mockRepo struct { listQueueCalls []listQueueCall listQueueRows []DiscoveredMarket listQueueErr error + + // Enrichment hooks (optional). Tests set whichever they need; nil falls + // through to a no-op / empty response. + setEnrichmentFn func(id uuid.UUID, payload enrich.Enrichment, status string) error + getCacheFn func(key string) (enrich.Enrichment, bool, error) + setCacheFn func(key string, payload enrich.Enrichment, ttl time.Duration) error + listPendingEnrichFn func(limit int) ([]DiscoveredMarket, error) + + // Similarity AI cache hooks. 
+ getSimCacheFn func(pairKey string) (enrich.Verdict, bool, error) + setSimCacheFn func(pairKey string, v enrich.Verdict, ttl time.Duration) error } // listQueueCall records arguments passed to mockRepo.ListQueue so tests can @@ -53,8 +67,17 @@ func (m *mockRepo) InsertDiscovered(ctx context.Context, d DiscoveredMarket) (uu func (m *mockRepo) IsRejected(ctx context.Context, n, s string, y int) (bool, error) { return m.isRejectedFn(ctx, n, s, y) } -func (m *mockRepo) QueueHasPending(ctx context.Context, n, s string, sd *time.Time) (bool, error) { - return m.queuePendingFn(ctx, n, s, sd) +func (m *mockRepo) FindPendingMatch(ctx context.Context, n, s string, sd *time.Time) (*DiscoveredMarket, error) { + if m.findMatchFn != nil { + return m.findMatchFn(ctx, n, s, sd) + } + return nil, nil +} +func (m *mockRepo) MergePendingSources(_ context.Context, id uuid.UUID, addSources, addQuellen []string, addContribs []SourceContribution, newKonfidenz, newHinweis string) error { + if m.mergePendingFn != nil { + return m.mergePendingFn(id, addSources, addQuellen, addContribs, newKonfidenz, newHinweis) + } + return nil } func (m *mockRepo) ListQueue(ctx context.Context, status, sortBy, order string, l, o int) ([]DiscoveredMarket, error) { m.listQueueCalls = append(m.listQueueCalls, listQueueCall{ @@ -104,6 +127,42 @@ func (m *mockRepo) Stats(ctx context.Context, forwardMonths, recentErrorsLimit i func (m *mockRepo) UpdatePending(ctx context.Context, id uuid.UUID, f UpdatePendingFields, nn *string) error { return nil } +func (m *mockRepo) SetEnrichment(_ context.Context, id uuid.UUID, payload enrich.Enrichment, status string) error { + if m.setEnrichmentFn != nil { + return m.setEnrichmentFn(id, payload, status) + } + return nil +} +func (m *mockRepo) GetEnrichmentCache(_ context.Context, key string) (enrich.Enrichment, bool, error) { + if m.getCacheFn != nil { + return m.getCacheFn(key) + } + return enrich.Enrichment{}, false, nil +} +func (m *mockRepo) SetEnrichmentCache(_ 
context.Context, key string, payload enrich.Enrichment, ttl time.Duration) error { + if m.setCacheFn != nil { + return m.setCacheFn(key, payload, ttl) + } + return nil +} +func (m *mockRepo) ListPendingEnrichment(_ context.Context, limit int) ([]DiscoveredMarket, error) { + if m.listPendingEnrichFn != nil { + return m.listPendingEnrichFn(limit) + } + return nil, nil +} +func (m *mockRepo) GetSimilarityCache(_ context.Context, pairKey string) (enrich.Verdict, bool, error) { + if m.getSimCacheFn != nil { + return m.getSimCacheFn(pairKey) + } + return enrich.Verdict{}, false, nil +} +func (m *mockRepo) SetSimilarityCache(_ context.Context, pairKey string, v enrich.Verdict, ttl time.Duration) error { + if m.setSimCacheFn != nil { + return m.setSimCacheFn(pairKey, v, ttl) + } + return nil +} // noopLinkVerifier passes every URL — used by tests to isolate from network. type noopLinkVerifier struct{} diff --git a/backend/internal/domain/discovery/model.go b/backend/internal/domain/discovery/model.go index 5a61689..8957d32 100644 --- a/backend/internal/domain/discovery/model.go +++ b/backend/internal/domain/discovery/model.go @@ -5,6 +5,8 @@ import ( "time" "github.com/google/uuid" + + "marktvogt.de/backend/internal/domain/discovery/enrich" ) // SourceContribution captures what one source reported about a market. @@ -67,8 +69,25 @@ type DiscoveredMarket struct { ReviewedAt *time.Time `json:"reviewed_at"` ReviewedBy *uuid.UUID `json:"reviewed_by"` CreatedEditionID *uuid.UUID `json:"created_edition_id"` + + // Enrichment payload + worker lifecycle. See enrich.Enrichment for the contract. + Enrichment enrich.Enrichment `json:"enrichment"` + EnrichmentStatus string `json:"enrichment_status"` // pending|done|failed|skipped + EnrichmentAttempts int `json:"enrichment_attempts"` + EnrichedAt *time.Time `json:"enriched_at"` } +// EnrichmentStatus constants for discovered_markets.enrichment_status. 
+// The payload type + provenance constants live in the enrich package; this +// status string is a DB column value used by repository queries, kept here +// alongside the other DB-lifecycle constants. +const ( + EnrichmentStatusPending = "pending" + EnrichmentStatusDone = "done" + EnrichmentStatusFailed = "failed" + EnrichmentStatusSkipped = "skipped" +) + // RejectedDiscovery stores a sticky rejection scoped to (normalized_name, city, year). type RejectedDiscovery struct { ID uuid.UUID `json:"id"` diff --git a/backend/internal/domain/discovery/repository.go b/backend/internal/domain/discovery/repository.go index 29e02dd..9f47913 100644 --- a/backend/internal/domain/discovery/repository.go +++ b/backend/internal/domain/discovery/repository.go @@ -4,12 +4,15 @@ package discovery import ( "context" "encoding/json" + "errors" "fmt" "time" "github.com/google/uuid" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" + + "marktvogt.de/backend/internal/domain/discovery/enrich" ) type Repository interface { @@ -17,7 +20,16 @@ type Repository interface { EditionExists(ctx context.Context, seriesID uuid.UUID, year int) (bool, error) InsertDiscovered(ctx context.Context, d DiscoveredMarket) (uuid.UUID, error) IsRejected(ctx context.Context, nameNormalized, stadt string, year int) (bool, error) - QueueHasPending(ctx context.Context, nameNormalized, stadt string, startDatum *time.Time) (bool, error) + // FindPendingMatch looks up an existing pending queue row by the exact + // tuple (name_normalized, stadt, start_datum). Returns nil when no match + // exists. Used by the crawl auto-merge path — callers decide whether to + // merge sources into the match or skip as redundant. + FindPendingMatch(ctx context.Context, nameNormalized, stadt string, startDatum *time.Time) (*DiscoveredMarket, error) + // MergePendingSources appends the supplied per-source data onto an existing + // pending row. 
Arrays are deduped (set-union semantics); konfidenz and + // hinweis are overwritten with the caller-computed values. Idempotent — + // calling with already-present sources is a no-op. + MergePendingSources(ctx context.Context, id uuid.UUID, addSources []string, addQuellen []string, addContribs []SourceContribution, newKonfidenz, newHinweis string) error ListQueue(ctx context.Context, status, sortBy, order string, limit, offset int) ([]DiscoveredMarket, error) CountQueue(ctx context.Context, status string) (int, error) GetDiscovered(ctx context.Context, id uuid.UUID) (DiscoveredMarket, error) @@ -27,6 +39,20 @@ type Repository interface { BeginTx(ctx context.Context) (pgx.Tx, error) Stats(ctx context.Context, forwardMonths, recentErrorsLimit int) (Stats, error) UpdatePending(ctx context.Context, id uuid.UUID, fields UpdatePendingFields, nameNormalized *string) error + + // Enrichment persistence. SetEnrichment writes payload + status + bumps + // attempts + stamps enriched_at in one UPDATE so the worker stays idempotent. + SetEnrichment(ctx context.Context, id uuid.UUID, payload enrich.Enrichment, status string) error + // ListPendingEnrichment returns queue rows awaiting their first enrichment + // pass. Ordered by discovered_at ASC so older rows get processed first. + ListPendingEnrichment(ctx context.Context, limit int) ([]DiscoveredMarket, error) + // Cache operations keyed on sha256(name_normalized|stadt|year). + GetEnrichmentCache(ctx context.Context, key string) (enrich.Enrichment, bool, error) + SetEnrichmentCache(ctx context.Context, key string, payload enrich.Enrichment, ttl time.Duration) error + + // Similarity AI cache — keyed on enrich.SimilarityPairKey. + GetSimilarityCache(ctx context.Context, pairKey string) (enrich.Verdict, bool, error) + SetSimilarityCache(ctx context.Context, pairKey string, v enrich.Verdict, ttl time.Duration) error } // SeriesCandidate is a minimal projection used for name-normalization comparison in Go. 
@@ -106,16 +132,70 @@ func (r *pgRepository) IsRejected(ctx context.Context, nameNormalized, stadt str return exists, err } -func (r *pgRepository) QueueHasPending(ctx context.Context, nameNormalized, stadt string, startDatum *time.Time) (bool, error) { - var exists bool - err := r.pool.QueryRow(ctx, ` -SELECT EXISTS( - SELECT 1 FROM discovered_markets - WHERE status='pending' - AND name_normalized=$1 AND stadt=$2 - AND start_datum IS NOT DISTINCT FROM $3 -)`, nameNormalized, stadt, startDatum).Scan(&exists) - return exists, err +func (r *pgRepository) FindPendingMatch(ctx context.Context, nameNormalized, stadt string, startDatum *time.Time) (*DiscoveredMarket, error) { + row := r.pool.QueryRow(ctx, ` +SELECT id, bucket_id, markt_name, stadt, coalesce(bundesland,''), land, + start_datum, end_datum, coalesce(website,''), quellen, coalesce(konfidenz,''), + coalesce(agent_status,''), coalesce(hinweis,''), name_normalized, matched_series_id, status, + discovered_at, reviewed_at, reviewed_by, created_edition_id, + sources, source_contributions, + enrichment, enrichment_status, enrichment_attempts, enriched_at +FROM discovered_markets +WHERE status='pending' + AND name_normalized=$1 AND stadt=$2 + AND start_datum IS NOT DISTINCT FROM $3 +LIMIT 1`, nameNormalized, stadt, startDatum) + d, err := scanDiscoveredMarket(row) + if errors.Is(err, pgx.ErrNoRows) { + return nil, nil + } + if err != nil { + return nil, err + } + return &d, nil +} + +// MergePendingSources appends new per-source data onto an existing pending +// row using set-union semantics for the text[] columns and a JSONB concat +// for source_contributions. Konfidenz and hinweis are overwritten with the +// caller-computed values — the caller already knows the new source count +// and whether a date_conflict hint still applies. +// +// Idempotent: if add* arrays contain entries already present, the DISTINCT +// clauses drop duplicates. 
Contributions use a stricter dedup via source_name +// uniqueness — a source that re-crawls should overwrite its own row, not +// append a second copy. +func (r *pgRepository) MergePendingSources(ctx context.Context, id uuid.UUID, addSources []string, addQuellen []string, addContribs []SourceContribution, newKonfidenz, newHinweis string) error { + contribJSON, err := json.Marshal(addContribs) + if err != nil { + return fmt.Errorf("marshal added contributions: %w", err) + } + // Dedup text[] via ARRAY(SELECT DISTINCT unnest(...)). + // source_contributions: de-dupe by source_name — keep the existing entry + // when a source name collides, otherwise append the new one. Ordering by + // source_name keeps the array deterministic for downstream comparison. + tag, err := r.pool.Exec(ctx, ` +UPDATE discovered_markets +SET sources = ARRAY(SELECT DISTINCT unnest(sources || $2::text[])), + quellen = ARRAY(SELECT DISTINCT unnest(quellen || $3::text[])), + source_contributions = ( + SELECT jsonb_agg(c ORDER BY c->>'source_name') + FROM ( + SELECT DISTINCT ON (c->>'source_name') c + FROM jsonb_array_elements(source_contributions || $4::jsonb) AS c + ) deduped + ), + konfidenz = NULLIF($5, ''), + hinweis = $6 +WHERE id = $1 AND status = 'pending'`, + id, addSources, addQuellen, contribJSON, newKonfidenz, newHinweis) + if err != nil { + return fmt.Errorf("merge pending sources: %w", err) + } + if tag.RowsAffected() == 0 { + return fmt.Errorf("no pending row with id %s", id) + } + return nil } func (r *pgRepository) ListQueue(ctx context.Context, status, sortBy, order string, limit, offset int) ([]DiscoveredMarket, error) { @@ -125,7 +205,8 @@ SELECT id, bucket_id, markt_name, stadt, coalesce(bundesland,''), land, start_datum, end_datum, coalesce(website,''), quellen, coalesce(konfidenz,''), coalesce(agent_status,''), coalesce(hinweis,''), name_normalized, matched_series_id, status, discovered_at, reviewed_at, reviewed_by, created_edition_id, - sources, source_contributions + 
sources, source_contributions, + enrichment, enrichment_status, enrichment_attempts, enriched_at FROM discovered_markets WHERE status = $1 ` + orderBy + ` @@ -188,13 +269,45 @@ func (r *pgRepository) CountQueue(ctx context.Context, status string) (int, erro return n, err } +// ListPendingEnrichment selects queue rows still awaiting enrichment. The +// status='pending' filter excludes accepted/rejected rows — enrichment only +// makes sense for review candidates. Ordered by discovered_at ASC so an +// operator running the bulk action sees the oldest queue entries improve first. +func (r *pgRepository) ListPendingEnrichment(ctx context.Context, limit int) ([]DiscoveredMarket, error) { + rows, err := r.pool.Query(ctx, ` +SELECT id, bucket_id, markt_name, stadt, coalesce(bundesland,''), land, + start_datum, end_datum, coalesce(website,''), quellen, coalesce(konfidenz,''), + coalesce(agent_status,''), coalesce(hinweis,''), name_normalized, matched_series_id, status, + discovered_at, reviewed_at, reviewed_by, created_edition_id, + sources, source_contributions, + enrichment, enrichment_status, enrichment_attempts, enriched_at +FROM discovered_markets +WHERE status = 'pending' AND enrichment_status = $1 +ORDER BY discovered_at ASC +LIMIT $2`, EnrichmentStatusPending, limit) + if err != nil { + return nil, err + } + defer rows.Close() + out := make([]DiscoveredMarket, 0) + for rows.Next() { + d, err := scanDiscoveredMarket(rows) + if err != nil { + return nil, err + } + out = append(out, d) + } + return out, rows.Err() +} + func (r *pgRepository) GetDiscovered(ctx context.Context, id uuid.UUID) (DiscoveredMarket, error) { row := r.pool.QueryRow(ctx, ` SELECT id, bucket_id, markt_name, stadt, coalesce(bundesland,''), land, start_datum, end_datum, coalesce(website,''), quellen, coalesce(konfidenz,''), coalesce(agent_status,''), coalesce(hinweis,''), name_normalized, matched_series_id, status, discovered_at, reviewed_at, reviewed_by, created_edition_id, - sources, 
source_contributions + sources, source_contributions, + enrichment, enrichment_status, enrichment_attempts, enriched_at FROM discovered_markets WHERE id = $1`, id) return scanDiscoveredMarket(row) } @@ -210,12 +323,14 @@ type scanner interface { func scanDiscoveredMarket(s scanner) (DiscoveredMarket, error) { var d DiscoveredMarket var contribJSON []byte + var enrichmentJSON []byte if err := s.Scan( &d.ID, &d.BucketID, &d.MarktName, &d.Stadt, &d.Bundesland, &d.Land, &d.StartDatum, &d.EndDatum, &d.Website, &d.Quellen, &d.Konfidenz, &d.AgentStatus, &d.Hinweis, &d.NameNormalized, &d.MatchedSeriesID, &d.Status, &d.DiscoveredAt, &d.ReviewedAt, &d.ReviewedBy, &d.CreatedEditionID, &d.Sources, &contribJSON, + &enrichmentJSON, &d.EnrichmentStatus, &d.EnrichmentAttempts, &d.EnrichedAt, ); err != nil { return DiscoveredMarket{}, err } @@ -224,6 +339,11 @@ func scanDiscoveredMarket(s scanner) (DiscoveredMarket, error) { return DiscoveredMarket{}, fmt.Errorf("unmarshal source_contributions: %w", err) } } + if len(enrichmentJSON) > 0 { + if err := json.Unmarshal(enrichmentJSON, &d.Enrichment); err != nil { + return DiscoveredMarket{}, fmt.Errorf("unmarshal enrichment: %w", err) + } + } if d.Sources == nil { d.Sources = []string{} } @@ -329,3 +449,124 @@ LIMIT $1`, recentErrorsLimit) } return s, rows.Err() } + +// SetEnrichment persists a worker's enrichment outcome in a single UPDATE. +// Attempts is incremented server-side so concurrent retries stay honest even +// if the caller's read-modify-write is racy. enriched_at is only stamped for +// terminal states ('done' / 'failed'), not 'skipped', to distinguish "we ran +// and nothing changed" from "we ran and succeeded trivially". 
+func (r *pgRepository) SetEnrichment(ctx context.Context, id uuid.UUID, payload enrich.Enrichment, status string) error { + payloadJSON, err := json.Marshal(payload) + if err != nil { + return fmt.Errorf("marshal enrichment: %w", err) + } + var enrichedExpr string + switch status { + case EnrichmentStatusDone, EnrichmentStatusFailed: + enrichedExpr = "now()" + default: + enrichedExpr = "enriched_at" // preserve existing value + } + tag, err := r.pool.Exec(ctx, fmt.Sprintf(` +UPDATE discovered_markets +SET enrichment = $2, + enrichment_status = $3, + enrichment_attempts = enrichment_attempts + 1, + enriched_at = %s +WHERE id = $1`, enrichedExpr), id, payloadJSON, status) + if err != nil { + return fmt.Errorf("set enrichment: %w", err) + } + if tag.RowsAffected() == 0 { + return fmt.Errorf("no discovered_markets row with id %s", id) + } + return nil +} + +// GetEnrichmentCache returns (payload, true, nil) on a hit that has not yet +// expired, (_, false, nil) on a miss or expired entry. Expired entries are +// treated as misses so the caller refetches rather than relying on a +// background pruner. +func (r *pgRepository) GetEnrichmentCache(ctx context.Context, key string) (enrich.Enrichment, bool, error) { + var payloadJSON []byte + err := r.pool.QueryRow(ctx, ` +SELECT payload FROM enrichment_cache +WHERE cache_key = $1 + AND (expires_at IS NULL OR expires_at > now())`, key).Scan(&payloadJSON) + if errors.Is(err, pgx.ErrNoRows) { + return enrich.Enrichment{}, false, nil + } + if err != nil { + return enrich.Enrichment{}, false, fmt.Errorf("cache get: %w", err) + } + var e enrich.Enrichment + if err := json.Unmarshal(payloadJSON, &e); err != nil { + return enrich.Enrichment{}, false, fmt.Errorf("unmarshal cache payload: %w", err) + } + return e, true, nil +} + +// SetEnrichmentCache writes or replaces a cache entry. ttl==0 means "no +// expiry" (expires_at stays NULL). Callers typically use 30 days. 
+func (r *pgRepository) SetEnrichmentCache(ctx context.Context, key string, payload enrich.Enrichment, ttl time.Duration) error { + payloadJSON, err := json.Marshal(payload) + if err != nil { + return fmt.Errorf("marshal cache payload: %w", err) + } + var expiresAt *time.Time + if ttl > 0 { + t := time.Now().Add(ttl) + expiresAt = &t + } + _, err = r.pool.Exec(ctx, ` +INSERT INTO enrichment_cache (cache_key, payload, expires_at) +VALUES ($1, $2, $3) +ON CONFLICT (cache_key) DO UPDATE + SET payload = EXCLUDED.payload, + created_at = now(), + expires_at = EXCLUDED.expires_at`, key, payloadJSON, expiresAt) + if err != nil { + return fmt.Errorf("cache set: %w", err) + } + return nil +} + +// GetSimilarityCache returns a cached AI verdict for a (pairKey) or +// (zero, false, nil) on miss/expiry. Expired entries are treated as misses. +func (r *pgRepository) GetSimilarityCache(ctx context.Context, pairKey string) (enrich.Verdict, bool, error) { + var v enrich.Verdict + err := r.pool.QueryRow(ctx, ` +SELECT same, confidence, reason, model FROM similarity_ai_cache +WHERE pair_key = $1 + AND (expires_at IS NULL OR expires_at > now())`, pairKey).Scan(&v.Same, &v.Confidence, &v.Reason, &v.Model) + if errors.Is(err, pgx.ErrNoRows) { + return enrich.Verdict{}, false, nil + } + if err != nil { + return enrich.Verdict{}, false, fmt.Errorf("similarity cache get: %w", err) + } + return v, true, nil +} + +// SetSimilarityCache upserts a verdict. ttl=0 means "no expiry" (nullable). 
+func (r *pgRepository) SetSimilarityCache(ctx context.Context, pairKey string, v enrich.Verdict, ttl time.Duration) error { + var expiresAt *time.Time + if ttl > 0 { + t := time.Now().Add(ttl) + expiresAt = &t + } + _, err := r.pool.Exec(ctx, ` +INSERT INTO similarity_ai_cache (pair_key, same, confidence, reason, model, expires_at) +VALUES ($1, $2, $3, $4, $5, $6) +ON CONFLICT (pair_key) DO UPDATE + SET same = EXCLUDED.same, + confidence = EXCLUDED.confidence, + reason = EXCLUDED.reason, + model = EXCLUDED.model, + created_at = now(), + expires_at = EXCLUDED.expires_at`, pairKey, v.Same, v.Confidence, v.Reason, v.Model, expiresAt) + if err != nil { + return fmt.Errorf("similarity cache set: %w", err) + } + return nil +} diff --git a/backend/internal/domain/discovery/routes.go b/backend/internal/domain/discovery/routes.go index 5460382..e884b77 100644 --- a/backend/internal/domain/discovery/routes.go +++ b/backend/internal/domain/discovery/routes.go @@ -24,9 +24,19 @@ func RegisterRoutes( admin.POST("/queue/:id/accept", h.Accept) admin.POST("/queue/:id/reject", h.Reject) admin.GET("/queue/:id/similar", h.Similar) + // Per-row LLM enrichment (MR 3b). Synchronous — operator waits. + admin.POST("/queue/:id/enrich", h.EnrichLLM) + // Per-pair AI similarity tiebreak (MR 4). Synchronous; short call. + admin.POST("/queue/:aid/similar/:bid/classify", h.ClassifySimilarPair) // Manual crawl trigger — subject to hourly rate limit. admin.POST("/crawl-manual", h.Crawl) // Async crawl status polling. admin.GET("/crawl-status", h.CrawlStatus) + // Manual bulk crawl-enrich — deterministic consolidation + Nominatim + // geocoding applied to every enrichment_status='pending' row. Async + // (202 + polling) because the Nominatim 1rps rate means a full queue + // can take minutes. 
+ admin.POST("/enrichment/crawl-all", h.RunCrawlEnrichAll) + admin.GET("/enrichment/crawl-all-status", h.CrawlEnrichStatus) } } diff --git a/backend/internal/domain/discovery/service.go b/backend/internal/domain/discovery/service.go index 2a257f0..9f5fe3a 100644 --- a/backend/internal/domain/discovery/service.go +++ b/backend/internal/domain/discovery/service.go @@ -12,9 +12,21 @@ import ( "github.com/google/uuid" "marktvogt.de/backend/internal/domain/discovery/crawler" + "marktvogt.de/backend/internal/domain/discovery/enrich" "marktvogt.de/backend/internal/domain/market" ) +// Geocoder is the narrow interface the enrichment flow depends on. +// *pkg/geocode.Geocoder satisfies it. Declared here instead of in the +// sub-package enrich/ to avoid an import cycle (enrich depends on discovery +// for the DiscoveredMarket / Enrichment types). +// +// Structurally identical to enrich.Geocoder, so a concrete *geocode.Geocoder +// satisfies both and can be passed across the package boundary. +type Geocoder interface { + Geocode(ctx context.Context, street, city, zip, country string) (*float64, *float64, error) +} + type marketCreator interface { Create(ctx context.Context, req market.CreateMarketRequest) (market.Market, error) CreateEditionForSeries(ctx context.Context, seriesID uuid.UUID, req market.CreateEditionRequest) (market.Market, error) @@ -39,15 +51,31 @@ type Service struct { crawler crawlerRunner marketCreator marketCreator linkChecker linkVerifier + // geocoder is optional — nil means crawl-enrich skips lat/lng. Wired from + // server/routes.go using the shared Nominatim client (1 rps limited). + geocoder Geocoder + // llmEnricher is the AI-backed fallback pass. Nil-safe via NoopLLMEnricher + // in test wiring; production code passes a real MistralLLMEnricher. + llmEnricher enrich.LLMEnricher + // simClassifier is the AI-backed duplicate tiebreaker. Nil-safe via + // NoopSimilarityClassifier. 
+ simClassifier enrich.SimilarityClassifier } // NewService constructs a Service wired for the crawler-driven Crawl path. -func NewService(repo Repository, cr crawlerRunner, lc linkVerifier, mc marketCreator) *Service { +// geocoder may be nil; in that case crawl-enrich runs consolidation only and +// skips the lat/lng step. llm may be nil; per-row LLM enrichment then +// returns an error instead of attempting a call. simClassifier may be nil; +// ClassifySimilarPair returns an error rather than a zero-confidence answer. +func NewService(repo Repository, cr crawlerRunner, lc linkVerifier, mc marketCreator, gc Geocoder, llm enrich.LLMEnricher, sim enrich.SimilarityClassifier) *Service { return &Service{ repo: repo, crawler: cr, marketCreator: mc, linkChecker: lc, + geocoder: gc, + llmEnricher: llm, + simClassifier: sim, } } @@ -59,9 +87,13 @@ type CrawlSummary struct { Merged int `json:"merged"` MergedAcrossSites int `json:"merged_across_sites"` Discovered int `json:"discovered"` - DedupedExisting int `json:"deduped_existing"` - DedupedRejected int `json:"deduped_rejected"` - DedupedQueue int `json:"deduped_queue"` + // AutoMerged counts queue rows that received a new source this run — + // cross-crawl consolidation. DedupedQueue now only counts truly redundant + // pickups (same source, same tuple, seen again). + AutoMerged int `json:"auto_merged"` + DedupedExisting int `json:"deduped_existing"` + DedupedRejected int `json:"deduped_rejected"` + DedupedQueue int `json:"deduped_queue"` // LinkCheckFailed is retained for JSON compatibility with the admin UI; // no longer populated since the crawler pipeline skips link verification. // Consider removing in a future schema version. 
@@ -180,13 +212,42 @@ func (s *Service) Crawl(ctx context.Context) (CrawlSummary, error) { continue } } - pending, err := s.repo.QueueHasPending(insertCtx, nameNorm, m.City, m.StartDate) + match, err := s.repo.FindPendingMatch(insertCtx, nameNorm, m.City, m.StartDate) if err != nil { slog.WarnContext(ctx, "queue pending check", "error", err) continue } - if pending { - summary.DedupedQueue++ + if match != nil { + // A pending row already covers this (name, stadt, start_date). + // Two cases: + // 1. The new event brings a source already present on the match + // — nothing to merge, count as redundant (same source + // re-emitted the event this crawl, or a previous run already + // captured it). + // 2. The new event introduces a source not yet represented — + // merge the per-source data onto the existing row. This is + // the "auto-merge" path: avoids creating a duplicate queue + // entry and upgrades konfidenz as source count grows. + if containsAllSources(match.Sources, m.Sources) { + summary.DedupedQueue++ + continue + } + addSources, addQuellen, addContribs := newSourceDelta(match, m) + mergedSources := unionStrings(match.Sources, addSources) + newHinweis := match.Hinweis + if m.Hinweis != "" && !strings.Contains(newHinweis, m.Hinweis) { + if newHinweis != "" { + newHinweis += "; " + } + newHinweis += m.Hinweis + } + if err := s.repo.MergePendingSources(insertCtx, match.ID, + addSources, addQuellen, addContribs, + konfidenzForSources(mergedSources), newHinweis); err != nil { + slog.WarnContext(ctx, "auto-merge failed", "id", match.ID, "error", err) + continue + } + summary.AutoMerged++ continue } @@ -238,6 +299,7 @@ func (s *Service) Crawl(ctx context.Context) (CrawlSummary, error) { "merged", summary.Merged, "merged_across_sites", summary.MergedAcrossSites, "discovered", summary.Discovered, + "auto_merged", summary.AutoMerged, "deduped_existing", summary.DedupedExisting, "deduped_rejected", summary.DedupedRejected, "deduped_queue", summary.DedupedQueue, @@ 
-249,23 +311,126 @@ func (s *Service) Crawl(ctx context.Context) (CrawlSummary, error) { return summary, nil } +// containsAllSources reports whether every name in incoming is already in +// existing. Used to detect the "no new source info" case — same crawler run +// re-emitting a row or a subsequent run repeating a single-source pickup. +func containsAllSources(existing, incoming []string) bool { + if len(incoming) == 0 { + return true + } + set := make(map[string]struct{}, len(existing)) + for _, s := range existing { + set[s] = struct{}{} + } + for _, s := range incoming { + if _, ok := set[s]; !ok { + return false + } + } + return true +} + +// unionStrings returns a de-duplicated union of two string slices, preserving +// first-seen order (a's entries before new entries from b). +func unionStrings(a, b []string) []string { + seen := make(map[string]struct{}, len(a)+len(b)) + out := make([]string, 0, len(a)+len(b)) + for _, s := range a { + if _, ok := seen[s]; ok { + continue + } + seen[s] = struct{}{} + out = append(out, s) + } + for _, s := range b { + if _, ok := seen[s]; ok { + continue + } + seen[s] = struct{}{} + out = append(out, s) + } + return out +} + +// newSourceDelta computes the per-source data the incoming event brings that +// the existing match does not already have. Returns slices of (source names, +// quellen URLs, source contributions) to append via MergePendingSources. +// Contributions whose SourceName is already present on the match are dropped — +// the repo's merge query would otherwise keep the first copy, but sending +// them over is just wire waste. 
+func newSourceDelta(match *DiscoveredMarket, m crawler.MergedEvent) ( + addSources []string, addQuellen []string, addContribs []SourceContribution, +) { + existingSources := make(map[string]struct{}, len(match.Sources)) + for _, s := range match.Sources { + existingSources[s] = struct{}{} + } + for _, s := range m.Sources { + if _, ok := existingSources[s]; ok { + continue + } + addSources = append(addSources, s) + } + + existingQuellen := make(map[string]struct{}, len(match.Quellen)) + for _, q := range match.Quellen { + existingQuellen[q] = struct{}{} + } + for _, q := range m.Quellen { + if _, ok := existingQuellen[q]; ok { + continue + } + addQuellen = append(addQuellen, q) + } + + incomingContribs := toContributions(m.Contributions) + for _, c := range incomingContribs { + if _, ok := existingSources[c.SourceName]; ok { + continue + } + addContribs = append(addContribs, c) + } + return addSources, addQuellen, addContribs +} + // crawlerKonfidenz derives a three-level confidence label for a merged event. +// Thin wrapper over konfidenzForSources to keep the call site compact. +func crawlerKonfidenz(m crawler.MergedEvent) string { + return konfidenzForSources(m.Sources) +} + +// Source name constants (discovery-package local). Kept in sync with the +// crawler sub-package's unexported equivalents via manual copy — they're +// compared against RawEvent.SourceName / MergedEvent.Sources values that +// the crawler parsers emit. +const ( + srcMittelaltermarktOnline = "mittelaltermarkt_online" + srcMarktkalendarium = "marktkalendarium" + srcMittelalterkalender = "mittelalterkalender" + srcFestivalAlarm = "festival_alarm" + srcSuendenfrei = "suendenfrei" +) + +// konfidenzForSources is the canonical signal-to-label mapping. Shared +// between the initial crawler-produced insert and the auto-merge path so a +// row transitioning from 1 source to 2 sources gets re-labelled correctly. 
+// // Signal: cross-source agreement is the strongest indicator — two or more // independent calendars emitting the same (normalized name, city, start_date) // triple is high confidence. Single-source rows fall back to source rank: // Tribe JSON and marktkalendarium curate their data, suendenfrei's prose // regex is brittle. -func crawlerKonfidenz(m crawler.MergedEvent) string { - if len(m.Sources) >= 2 { +func konfidenzForSources(sources []string) string { + if len(sources) >= 2 { return KonfidenzHoch } - if len(m.Sources) == 1 { - switch m.Sources[0] { - case "mittelaltermarkt_online", "marktkalendarium": + if len(sources) == 1 { + switch sources[0] { + case srcMittelaltermarktOnline, srcMarktkalendarium: return KonfidenzMittel - case "mittelalterkalender", "festival_alarm": + case srcMittelalterkalender, srcFestivalAlarm: return KonfidenzMittel - case "suendenfrei": + case srcSuendenfrei: return KonfidenzNiedrig } } @@ -449,6 +614,21 @@ func (s *Service) FindSimilarToQueueEntry(ctx context.Context, id uuid.UUID) ([] return FindSimilar(target, candidates, 0.5), nil } +// contributionsForEnrich maps the rich discovery-domain SourceContribution +// down to the narrow enrich.Contribution shape — keeps the enrich package +// free of a reverse import on discovery. +func contributionsForEnrich(cs []SourceContribution) []enrich.Contribution { + out := make([]enrich.Contribution, 0, len(cs)) + for _, c := range cs { + out = append(out, enrich.Contribution{ + PLZ: c.PLZ, + Venue: c.Venue, + Organizer: c.Organizer, + }) + } + return out +} + // toContributions translates the crawler's per-source RawEvents into the // discovery-domain SourceContribution shape for JSONB persistence. 
func toContributions(raws []crawler.RawEvent) []SourceContribution { @@ -488,3 +668,221 @@ func findSeriesMatch(incomingName string, candidates []SeriesCandidate) *uuid.UU } return nil } + +// RunLLMEnrichOne fills category / opening_hours / description for a single +// queue row via the injected LLMEnricher. Cache-first: if the row's +// CacheKey hits a fresh entry, we reuse it and skip the LLM call entirely. +// +// The persisted payload is a Merge(crawl, llm) — crawl-populated fields +// survive unchanged, LLM fills only the gaps. The cache stores the raw LLM +// output (not the merged result) so the same cached answer can be layered +// onto different crawl-enrich bases if the row gets re-enriched. +func (s *Service) RunLLMEnrichOne(ctx context.Context, queueID uuid.UUID) (enrich.Enrichment, error) { + if s.llmEnricher == nil { + return enrich.Enrichment{}, errors.New("llm enricher not configured") + } + + row, err := s.repo.GetDiscovered(ctx, queueID) + if err != nil { + return enrich.Enrichment{}, fmt.Errorf("load row: %w", err) + } + + year := 0 + if row.StartDatum != nil { + year = row.StartDatum.Year() + } + cacheKey := enrich.CacheKey(row.NameNormalized, row.Stadt, year) + + // Cache lookup — if we have a fresh LLM payload for this (name, city, + // year) tuple, skip the call. The merge still runs so a newly-populated + // crawl-enrich base gets its provenance preserved. 
+ if cached, hit, err := s.repo.GetEnrichmentCache(ctx, cacheKey); err != nil { + slog.WarnContext(ctx, "enrichment cache get failed; continuing", + "cache_key", cacheKey, "error", err) + } else if hit { + merged := enrich.Merge(row.Enrichment, cached) + if err := s.repo.SetEnrichment(ctx, row.ID, merged, EnrichmentStatusDone); err != nil { + return enrich.Enrichment{}, fmt.Errorf("persist merged (cache hit): %w", err) + } + return merged, nil + } + + llmReq := enrich.LLMRequest{ + MarktName: row.MarktName, + Stadt: row.Stadt, + Land: row.Land, + Bundesland: row.Bundesland, + Quellen: row.Quellen, + Partial: row.Enrichment, + } + llmPayload, err := s.llmEnricher.EnrichMissing(ctx, llmReq) + if err != nil { + // Persist the failure so the operator sees the attempts counter + // tick; the merged payload stays as-is (crawl-enrich values + // preserved). Ignore persistence errors here — the original LLM + // error is what the caller needs to see. + _ = s.repo.SetEnrichment(ctx, row.ID, row.Enrichment, EnrichmentStatusFailed) + return enrich.Enrichment{}, fmt.Errorf("llm enrich: %w", err) + } + + // Cache the raw LLM output (not the merged result). A later re-crawl + // might change crawl-enrich fields; the cached answer should layer on + // top of whatever the current base is. + if err := s.repo.SetEnrichmentCache(ctx, cacheKey, llmPayload, enrich.DefaultCacheTTL); err != nil { + slog.WarnContext(ctx, "enrichment cache set failed; continuing", + "cache_key", cacheKey, "error", err) + } + + merged := enrich.Merge(row.Enrichment, llmPayload) + if err := s.repo.SetEnrichment(ctx, row.ID, merged, EnrichmentStatusDone); err != nil { + return enrich.Enrichment{}, fmt.Errorf("persist merged: %w", err) + } + return merged, nil +} + +// ClassifySimilarPair runs the AI classifier on the two queue rows identified +// by aID and bID, returning a verdict about whether they're the same market. 
+// Cache-first: a content-keyed entry (enrich.SimilarityPairKey) shortcuts the +// LLM call when the same pair has been classified before. +// +// Intended for operator-triggered "AI tiebreak" on ambiguous similarity +// matches. The crawl-time auto-merge in MR 7 will call this on its own. +func (s *Service) ClassifySimilarPair(ctx context.Context, aID, bID uuid.UUID) (enrich.Verdict, error) { + if s.simClassifier == nil { + return enrich.Verdict{}, errors.New("similarity classifier not configured") + } + if aID == bID { + return enrich.Verdict{}, errors.New("cannot classify a row against itself") + } + a, err := s.repo.GetDiscovered(ctx, aID) + if err != nil { + return enrich.Verdict{}, fmt.Errorf("load row A: %w", err) + } + b, err := s.repo.GetDiscovered(ctx, bID) + if err != nil { + return enrich.Verdict{}, fmt.Errorf("load row B: %w", err) + } + + rowA := rowToSimilarity(a) + rowB := rowToSimilarity(b) + pairKey := enrich.SimilarityPairKey(rowA, rowB) + + if cached, hit, err := s.repo.GetSimilarityCache(ctx, pairKey); err != nil { + slog.WarnContext(ctx, "similarity cache get failed; continuing", + "pair_key", pairKey, "error", err) + } else if hit { + return cached, nil + } + + verdict, err := s.simClassifier.Classify(ctx, rowA, rowB) + if err != nil { + return enrich.Verdict{}, fmt.Errorf("classify: %w", err) + } + + if err := s.repo.SetSimilarityCache(ctx, pairKey, verdict, enrich.DefaultSimilarityCacheTTL); err != nil { + slog.WarnContext(ctx, "similarity cache set failed; continuing", + "pair_key", pairKey, "error", err) + } + return verdict, nil +} + +// rowToSimilarity adapts a DiscoveredMarket to the narrow SimilarityRow the +// enrich package consumes. Year comes from StartDatum (0 when unknown). 
+func rowToSimilarity(r DiscoveredMarket) enrich.SimilarityRow { + year := 0 + if r.StartDatum != nil { + year = r.StartDatum.Year() + } + return enrich.SimilarityRow{ + NameNormalized: r.NameNormalized, + Stadt: r.Stadt, + Year: year, + Name: r.MarktName, + Quellen: r.Quellen, + } +} + +// CrawlEnrichSummary reports the outcome of one RunCrawlEnrichAll pass. +// Mirrors CrawlSummary's shape so the admin UI can reuse its render path. +type CrawlEnrichSummary struct { + StartedAt time.Time `json:"started_at"` + DurationMs int64 `json:"duration_ms"` + Total int `json:"total"` // rows loaded for enrichment + Succeeded int `json:"succeeded"` // enrichment_status -> done + Failed int `json:"failed"` // SetEnrichment returned error + // Bounded list of (id, error) pairs for operator debugging. Empty on + // happy paths; capped so a catastrophic run doesn't balloon the summary. + Errors []CrawlEnrichError `json:"errors"` +} + +// CrawlEnrichError records a single row that failed to persist enrichment. +type CrawlEnrichError struct { + QueueID uuid.UUID `json:"queue_id"` + Error string `json:"error"` +} + +// runCrawlEnrichAllBatchSize caps how many rows we pull per bulk pass. Bigger +// than the usual queue depth keeps us single-pass; the Nominatim 1rps limit +// is the real throughput constraint, not batch size. +const runCrawlEnrichAllBatchSize = 2000 + +// runCrawlEnrichErrorCap bounds the Errors slice so a total outage doesn't +// produce a summary the admin UI can't render. Operators can check logs for +// the full set; this is just an at-a-glance list. +const runCrawlEnrichErrorCap = 50 + +// RunCrawlEnrichAll applies crawl-enrich (source consolidation + Nominatim +// geocoding) to every queue row with enrichment_status='pending'. Persists +// the resulting payload with status='done' on success, 'failed' on error. 
+// +// Already-enriched rows are skipped by the repo query — this is idempotent +// only in the sense that re-running picks up any pending rows that appeared +// since the last run. Forcing a re-enrich of already-done rows is a future +// feature; for now operators must manually reset enrichment_status. +// +// Designed to be called from a goroutine with a detached context — the +// Nominatim rate limit means a 200-row queue takes 3+ minutes and must +// outlive the originating HTTP request. +func (s *Service) RunCrawlEnrichAll(ctx context.Context) (CrawlEnrichSummary, error) { + summary := CrawlEnrichSummary{StartedAt: time.Now().UTC()} + rows, err := s.repo.ListPendingEnrichment(ctx, runCrawlEnrichAllBatchSize) + if err != nil { + return summary, fmt.Errorf("list pending enrichment: %w", err) + } + summary.Total = len(rows) + + for _, row := range rows { + if err := ctx.Err(); err != nil { + // Caller cancelled — stop cleanly. Summary reflects partial + // progress; the remaining rows stay in enrichment_status='pending' + // and will be picked up by the next run. 
+ return summary, err + } + in := enrich.Input{ + Stadt: row.Stadt, + Land: row.Land, + Contributions: contributionsForEnrich(row.SourceContributions), + } + payload := enrich.CrawlEnrich(ctx, in, s.geocoder) + if err := s.repo.SetEnrichment(ctx, row.ID, payload, EnrichmentStatusDone); err != nil { + slog.WarnContext(ctx, "set enrichment failed", + "queue_id", row.ID, "error", err) + summary.Failed++ + if len(summary.Errors) < runCrawlEnrichErrorCap { + summary.Errors = append(summary.Errors, CrawlEnrichError{ + QueueID: row.ID, + Error: err.Error(), + }) + } + continue + } + summary.Succeeded++ + } + summary.DurationMs = time.Since(summary.StartedAt).Milliseconds() + slog.InfoContext(ctx, "crawl-enrich-all completed", + "total", summary.Total, + "succeeded", summary.Succeeded, + "failed", summary.Failed, + "duration_ms", summary.DurationMs) + return summary, nil +} diff --git a/backend/internal/domain/discovery/service_test.go b/backend/internal/domain/discovery/service_test.go index 52ddbbf..54c1fc3 100644 --- a/backend/internal/domain/discovery/service_test.go +++ b/backend/internal/domain/discovery/service_test.go @@ -2,6 +2,7 @@ package discovery import ( "context" + "errors" "testing" "time" @@ -9,9 +10,24 @@ import ( "github.com/jackc/pgx/v5" "marktvogt.de/backend/internal/domain/discovery/crawler" + "marktvogt.de/backend/internal/domain/discovery/enrich" "marktvogt.de/backend/internal/domain/market" ) +// stubGeocoder is a test-local geocoder that returns canned lat/lng. +type stubGeocoder struct { + lat, lng *float64 + err error +} + +func (s stubGeocoder) Geocode(_ context.Context, _, _, _, _ string) (*float64, *float64, error) { + return s.lat, s.lng, s.err +} + +func ptrFloat(v float64) *float64 { return &v } + +const catMittelaltermarkt = "mittelaltermarkt" + // newMockRepo returns a mockRepo with default no-op implementations and an // inserted field that captures every InsertDiscovered call. 
func newMockRepo() *mockRepo { @@ -23,7 +39,7 @@ func newMockRepo() *mockRepo { m.listSeriesFn = func(_ context.Context, _ string) ([]SeriesCandidate, error) { return nil, nil } m.editionExistsFn = func(_ context.Context, _ uuid.UUID, _ int) (bool, error) { return false, nil } m.isRejectedFn = func(_ context.Context, _, _ string, _ int) (bool, error) { return false, nil } - m.queuePendingFn = func(_ context.Context, _, _ string, _ *time.Time) (bool, error) { return false, nil } + m.findMatchFn = func(_ context.Context, _, _ string, _ *time.Time) (*DiscoveredMarket, error) { return nil, nil } return m } @@ -131,7 +147,7 @@ func TestAccept_NewSeries_CallsCreate(t *testing.T) { markAcceptedFn: func(_ context.Context, _ pgx.Tx, _, _, _ uuid.UUID) error { return nil }, } mc := &stubCreator{} - svc := NewService(m, nil, noopLinkVerifier{}, mc) + svc := NewService(m, nil, noopLinkVerifier{}, mc, nil, nil, nil) _, _, err := svc.Accept(context.Background(), qID, uuid.New()) if err != nil { t.Fatalf("accept err: %v", err) @@ -153,7 +169,7 @@ func TestAccept_ExistingSeries_CallsCreateEditionForSeries(t *testing.T) { markAcceptedFn: func(_ context.Context, _ pgx.Tx, _, _, _ uuid.UUID) error { return nil }, } mc := &stubCreator{} - svc := NewService(m, nil, noopLinkVerifier{}, mc) + svc := NewService(m, nil, noopLinkVerifier{}, mc, nil, nil, nil) _, _, err := svc.Accept(context.Background(), uuid.New(), uuid.New()) if err != nil { t.Fatalf("accept err: %v", err) @@ -184,7 +200,7 @@ func TestServiceCrawlHappyPath(t *testing.T) { PerSourceMS: map[string]int64{"marktkalendarium": 1}, }, } - svc := NewService(repo, sc, lc, noopMarketCreator{}) + svc := NewService(repo, sc, lc, noopMarketCreator{}, nil, nil, nil) summary, err := svc.Crawl(context.Background()) if err != nil { @@ -208,13 +224,25 @@ func TestServiceCrawlHappyPath(t *testing.T) { } } -func TestServiceCrawlDedupQueue(t *testing.T) { +func TestServiceCrawlDedupQueue_SameSourceRedundant(t *testing.T) { + // New semantic: 
DedupedQueue only counts when incoming source is already + // on the match (nothing new to merge). A fresh source would auto-merge + // instead — covered by TestServiceCrawlAutoMerge_NewSource. repo := newMockRepo() - // Simulate: queue already has a matching pending row. - repo.queuePendingFn = func(_ context.Context, _, _ string, _ *time.Time) (bool, error) { - return true, nil - } start := mustParseDate(t, "2026-05-01") + existing := &DiscoveredMarket{ + ID: uuid.New(), + Sources: []string{"marktkalendarium"}, + Konfidenz: KonfidenzMittel, + } + var mergeCalls int + repo.findMatchFn = func(_ context.Context, _, _ string, _ *time.Time) (*DiscoveredMarket, error) { + return existing, nil + } + repo.mergePendingFn = func(uuid.UUID, []string, []string, []SourceContribution, string, string) error { + mergeCalls++ + return nil + } sc := &stubCrawlerRunner{ result: crawler.CrawlResult{ @@ -225,23 +253,113 @@ func TestServiceCrawlDedupQueue(t *testing.T) { }, }, } - svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{}) + svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, nil) summary, err := svc.Crawl(context.Background()) if err != nil { t.Fatal(err) } if summary.DedupedQueue != 1 { - t.Errorf("DedupedQueue = %d; want 1", summary.DedupedQueue) + t.Errorf("DedupedQueue = %d; want 1 (same source already present)", summary.DedupedQueue) + } + if summary.AutoMerged != 0 { + t.Errorf("AutoMerged = %d; want 0", summary.AutoMerged) } if summary.Discovered != 0 { - t.Errorf("Discovered = %d; want 0 (dupe should block insert)", summary.Discovered) + t.Errorf("Discovered = %d; want 0", summary.Discovered) + } + if mergeCalls != 0 { + t.Errorf("MergePendingSources called %d times; want 0 (no new source info)", mergeCalls) } if len(repo.inserted) != 0 { t.Errorf("inserted = %d; want 0", len(repo.inserted)) } } +func TestServiceCrawlAutoMerge_NewSourceUpgradesKonfidenz(t *testing.T) { + // Pending row already carries source A with 
konfidenz=mittel. New crawl + // brings the same event from source B -> auto-merge: delta sources are + // computed, MergePendingSources is called, AutoMerged=1, konfidenz gets + // upgraded to hoch (2 sources). + repo := newMockRepo() + start := mustParseDate(t, "2026-05-01") + existing := &DiscoveredMarket{ + ID: uuid.New(), + Sources: []string{"marktkalendarium"}, + Quellen: []string{"https://a/"}, + Konfidenz: KonfidenzMittel, + } + + var merge struct { + called bool + id uuid.UUID + addSources []string + addQuellen []string + addContribs []SourceContribution + newKonfidenz string + } + repo.findMatchFn = func(_ context.Context, _, _ string, _ *time.Time) (*DiscoveredMarket, error) { + return existing, nil + } + repo.mergePendingFn = func(id uuid.UUID, addSources, addQuellen []string, addContribs []SourceContribution, newKonfidenz, _ string) error { + merge.called = true + merge.id = id + merge.addSources = addSources + merge.addQuellen = addQuellen + merge.addContribs = addContribs + merge.newKonfidenz = newKonfidenz + return nil + } + + sc := &stubCrawlerRunner{ + result: crawler.CrawlResult{ + PerSource: map[string][]crawler.RawEvent{ + srcMittelaltermarktOnline: { + {SourceName: srcMittelaltermarktOnline, SourceURL: "https://b/", + Name: "X", City: "Y", StartDate: start}, + }, + }, + }, + } + svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, nil) + + summary, err := svc.Crawl(context.Background()) + if err != nil { + t.Fatal(err) + } + if summary.AutoMerged != 1 { + t.Errorf("AutoMerged = %d; want 1", summary.AutoMerged) + } + if summary.DedupedQueue != 0 { + t.Errorf("DedupedQueue = %d; want 0 (this was a merge, not a dupe)", summary.DedupedQueue) + } + if summary.Discovered != 0 { + t.Errorf("Discovered = %d; want 0 (no new row inserted)", summary.Discovered) + } + if !merge.called { + t.Fatal("MergePendingSources not called on auto-merge path") + } + if merge.id != existing.ID { + t.Errorf("merge id = %v; want %v", merge.id, 
existing.ID) + } + if len(merge.addSources) != 1 || merge.addSources[0] != srcMittelaltermarktOnline { + t.Errorf("addSources = %v; want [mittelaltermarkt_online]", merge.addSources) + } + if len(merge.addQuellen) != 1 || merge.addQuellen[0] != "https://b/" { + t.Errorf("addQuellen = %v; want [https://b/]", merge.addQuellen) + } + if len(merge.addContribs) != 1 || merge.addContribs[0].SourceName != srcMittelaltermarktOnline { + t.Errorf("addContribs first SourceName = %v; want mittelaltermarkt_online", + merge.addContribs) + } + if merge.newKonfidenz != KonfidenzHoch { + t.Errorf("newKonfidenz = %q; want %q (2 sources)", merge.newKonfidenz, KonfidenzHoch) + } + if len(repo.inserted) != 0 { + t.Errorf("inserted = %d; want 0 (merge path should not insert)", len(repo.inserted)) + } +} + func TestServiceCrawlDefaultsEndDate(t *testing.T) { repo := newMockRepo() start := mustParseDate(t, "2026-05-01") @@ -256,7 +374,7 @@ func TestServiceCrawlDefaultsEndDate(t *testing.T) { }, }, } - svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{}) + svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, nil) if _, err := svc.Crawl(context.Background()); err != nil { t.Fatal(err) @@ -302,7 +420,7 @@ func TestServiceCrawlDetachesInsertContextFromRequestCtx(t *testing.T) { }, }, } - svc := NewService(repo, sc, ctxAwareLinkVerifier{}, noopMarketCreator{}) + svc := NewService(repo, sc, ctxAwareLinkVerifier{}, noopMarketCreator{}, nil, nil, nil) // Cancel the context BEFORE Crawl runs — simulates gateway timeout // that fires while the handler is still mid-run. 
@@ -327,7 +445,7 @@ func TestListPendingQueuePaged_ReturnsBothRowsAndTotal(t *testing.T) { m := &mockRepo{ countQueueFn: func(_ context.Context, _ string) (int, error) { return 42, nil }, } - svc := NewService(m, nil, noopLinkVerifier{}, noopMarketCreator{}) + svc := NewService(m, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, nil) rows, total, err := svc.ListPendingQueuePaged(context.Background(), QueueSortDefault, QueueOrderDefault, 50, 0) if err != nil { t.Fatalf("unexpected error: %v", err) @@ -351,15 +469,15 @@ func TestServiceCrawlPersistsSourcesAndContributions(t *testing.T) { {SourceName: "marktkalendarium", SourceURL: "https://mk/", Name: "Markt X", City: "Dresden", PLZ: "01067", StartDate: start, Website: "https://organizer.de"}, }, - "mittelaltermarkt_online": { - {SourceName: "mittelaltermarkt_online", SourceURL: "https://mo/", + srcMittelaltermarktOnline: { + {SourceName: srcMittelaltermarktOnline, SourceURL: "https://mo/", DetailURL: "https://mo/e/1", Name: "Markt X", City: "Dresden", PLZ: "01067", StartDate: start, Venue: "Stallhof"}, }, }, }, } - svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{}) + svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, nil) if _, err := svc.Crawl(context.Background()); err != nil { t.Fatal(err) @@ -380,7 +498,7 @@ func TestServiceCrawlPersistsSourcesAndContributions(t *testing.T) { } // First contribution should be the highest-rank source // (mittelaltermarkt_online = rank 1, marktkalendarium = rank 2). 
- if got.SourceContributions[0].SourceName != "mittelaltermarkt_online" { + if got.SourceContributions[0].SourceName != srcMittelaltermarktOnline { t.Errorf("first contribution source = %q; want mittelaltermarkt_online (rank 1)", got.SourceContributions[0].SourceName) } @@ -404,11 +522,11 @@ func TestServiceCrawlMultiSourceHighKonfidenz(t *testing.T) { result: crawler.CrawlResult{ PerSource: map[string][]crawler.RawEvent{ "marktkalendarium": {{SourceName: "marktkalendarium", SourceURL: "https://a/", Name: "X", City: "Y", StartDate: start}}, - "mittelaltermarkt_online": {{SourceName: "mittelaltermarkt_online", SourceURL: "https://b/", Name: "X", City: "Y", StartDate: start}}, + srcMittelaltermarktOnline: {{SourceName: srcMittelaltermarktOnline, SourceURL: "https://b/", Name: "X", City: "Y", StartDate: start}}, }, }, } - svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{}) + svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, nil) summary, err := svc.Crawl(context.Background()) if err != nil { @@ -424,3 +542,463 @@ func TestServiceCrawlMultiSourceHighKonfidenz(t *testing.T) { t.Errorf("Konfidenz = %q; want %q (2+ sources)", repo.inserted[0].Konfidenz, KonfidenzHoch) } } + +// TestRunCrawlEnrichAll_HappyPath exercises the bulk path: three pending rows, +// each with differently-shaped contributions, get consolidated and geocoded +// into the SetEnrichment calls. Nothing queries the LLM — this is the +// crawl-only pass. 
+func TestRunCrawlEnrichAll_HappyPath(t *testing.T) { + id1, id2, id3 := uuid.New(), uuid.New(), uuid.New() + rows := []DiscoveredMarket{ + { + ID: id1, Stadt: "Dresden", Land: "Deutschland", + SourceContributions: []SourceContribution{ + {SourceName: "marktkalendarium", PLZ: "01067", Venue: "Stallhof"}, + }, + }, + { + ID: id2, Stadt: "Leipzig", Land: "Deutschland", + SourceContributions: []SourceContribution{ + {SourceName: "a", Organizer: "Verein e.V."}, + {SourceName: "b", PLZ: "04109"}, + }, + }, + {ID: id3, Stadt: "", Land: "Deutschland"}, // No stadt, no contribs — skips geocode. + } + + var writes []struct { + id uuid.UUID + payload enrich.Enrichment + status string + } + repo := &mockRepo{ + listPendingEnrichFn: func(limit int) ([]DiscoveredMarket, error) { + if limit <= 0 { + t.Errorf("limit must be positive, got %d", limit) + } + return rows, nil + }, + setEnrichmentFn: func(id uuid.UUID, payload enrich.Enrichment, status string) error { + writes = append(writes, struct { + id uuid.UUID + payload enrich.Enrichment + status string + }{id, payload, status}) + return nil + }, + } + gc := stubGeocoder{lat: ptrFloat(51.05), lng: ptrFloat(13.74)} + svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, gc, nil, nil) + + summary, err := svc.RunCrawlEnrichAll(context.Background()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if summary.Total != 3 { + t.Errorf("Total = %d; want 3", summary.Total) + } + if summary.Succeeded != 3 { + t.Errorf("Succeeded = %d; want 3", summary.Succeeded) + } + if summary.Failed != 0 { + t.Errorf("Failed = %d; want 0", summary.Failed) + } + if len(writes) != 3 { + t.Fatalf("expected 3 SetEnrichment calls, got %d", len(writes)) + } + // Row 1: PLZ consolidated, geocode returned. 
+ if writes[0].payload.PLZ != "01067" || writes[0].payload.Venue != "Stallhof" { + t.Errorf("row1 payload: %+v", writes[0].payload) + } + if writes[0].payload.Lat == nil || *writes[0].payload.Lat != 51.05 { + t.Errorf("row1 lat: %v", writes[0].payload.Lat) + } + if writes[0].payload.Sources["plz"] != enrich.ProvenanceCrawl { + t.Errorf("row1 plz provenance: %q", writes[0].payload.Sources["plz"]) + } + // Row 2: first non-empty wins across contributions. + if writes[1].payload.PLZ != "04109" || writes[1].payload.Organizer != "Verein e.V." { + t.Errorf("row2 payload: %+v", writes[1].payload) + } + // Row 3: no stadt means no geocode attempt — payload has empty sources. + if writes[2].payload.Lat != nil { + t.Errorf("row3 should have no lat (no stadt), got %v", writes[2].payload.Lat) + } + // All writes mark status=done. + for i, w := range writes { + if w.status != EnrichmentStatusDone { + t.Errorf("write %d: status = %q, want %q", i, w.status, EnrichmentStatusDone) + } + } +} + +// TestRunCrawlEnrichAll_SetEnrichmentFailure verifies a row with a persist +// error is counted in Failed but does not halt the pass. 
+func TestRunCrawlEnrichAll_SetEnrichmentFailure(t *testing.T) { + idOK, idBad := uuid.New(), uuid.New() + rows := []DiscoveredMarket{ + {ID: idOK, Stadt: "Dresden", SourceContributions: []SourceContribution{{PLZ: "01067"}}}, + {ID: idBad, Stadt: "Leipzig", SourceContributions: []SourceContribution{{PLZ: "04109"}}}, + } + repo := &mockRepo{ + listPendingEnrichFn: func(int) ([]DiscoveredMarket, error) { return rows, nil }, + setEnrichmentFn: func(id uuid.UUID, _ enrich.Enrichment, _ string) error { + if id == idBad { + return errors.New("db down") + } + return nil + }, + } + svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, nil) + + summary, err := svc.RunCrawlEnrichAll(context.Background()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if summary.Succeeded != 1 || summary.Failed != 1 { + t.Errorf("Succeeded=%d Failed=%d; want 1/1", summary.Succeeded, summary.Failed) + } + if len(summary.Errors) != 1 || summary.Errors[0].QueueID != idBad { + t.Errorf("expected single error pinned to idBad, got %+v", summary.Errors) + } +} + +// stubLLMEnricher returns canned output or an error. Captures the request +// so tests can assert the service built the LLMRequest from the right row +// fields. +type stubLLMEnricher struct { + result enrich.Enrichment + err error + called int + lastReq enrich.LLMRequest +} + +func (s *stubLLMEnricher) EnrichMissing(_ context.Context, req enrich.LLMRequest) (enrich.Enrichment, error) { + s.called++ + s.lastReq = req + return s.result, s.err +} + +// TestRunLLMEnrichOne_HappyPath: crawl-enrich base + LLM fills missing fields; +// merged payload persists with status=done; cache gets populated with the raw +// LLM output (not the merged result). 
+func TestRunLLMEnrichOne_HappyPath(t *testing.T) { + rowID := uuid.New() + start := time.Date(2026, 5, 1, 0, 0, 0, 0, time.UTC) + crawlBase := enrich.Enrichment{ + PLZ: "01067", + Venue: "Stallhof", + Sources: enrich.Sources{"plz": enrich.ProvenanceCrawl, "venue": enrich.ProvenanceCrawl}, + } + llmResult := enrich.Enrichment{ + Category: catMittelaltermarkt, + Description: "Ein großer Markt in der Altstadt.", + Sources: enrich.Sources{"category": enrich.ProvenanceLLM, "description": enrich.ProvenanceLLM}, + Model: "mistral-large-latest", + InputTokens: 500, + OutputTokens: 80, + } + + var cacheSet struct { + called bool + key string + payload enrich.Enrichment + } + var persistSet struct { + called bool + id uuid.UUID + payload enrich.Enrichment + status string + } + repo := &mockRepo{ + getDiscoveredFn: func(_ context.Context, id uuid.UUID) (DiscoveredMarket, error) { + return DiscoveredMarket{ + ID: rowID, + MarktName: "Mittelaltermarkt Dresden", + Stadt: "Dresden", + Land: "Deutschland", + NameNormalized: "mittelaltermarkt dresden", + StartDatum: &start, + Quellen: []string{"https://example.com/1", "https://example.com/2"}, + Enrichment: crawlBase, + }, nil + }, + getCacheFn: func(_ string) (enrich.Enrichment, bool, error) { + return enrich.Enrichment{}, false, nil // miss + }, + setCacheFn: func(key string, payload enrich.Enrichment, _ time.Duration) error { + cacheSet.called = true + cacheSet.key = key + cacheSet.payload = payload + return nil + }, + setEnrichmentFn: func(id uuid.UUID, payload enrich.Enrichment, status string) error { + persistSet.called = true + persistSet.id = id + persistSet.payload = payload + persistSet.status = status + return nil + }, + } + llm := &stubLLMEnricher{result: llmResult} + svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, llm, nil) + + got, err := svc.RunLLMEnrichOne(context.Background(), rowID) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if llm.called != 1 { + t.Errorf("LLM 
called %d times; want 1", llm.called) + } + // The enricher got the row's quellen + markt_name. + if llm.lastReq.MarktName != "Mittelaltermarkt Dresden" { + t.Errorf("LLMRequest.MarktName = %q; want Mittelaltermarkt Dresden", llm.lastReq.MarktName) + } + if len(llm.lastReq.Quellen) != 2 { + t.Errorf("LLMRequest.Quellen = %v; want 2", llm.lastReq.Quellen) + } + if llm.lastReq.Partial.PLZ != "01067" { + t.Errorf("LLMRequest.Partial.PLZ lost: %+v", llm.lastReq.Partial) + } + + // Merged result: crawl fields + LLM fields, each with correct provenance. + if got.PLZ != "01067" || got.Category != catMittelaltermarkt || got.Description == "" { + t.Errorf("merged payload incomplete: %+v", got) + } + if got.Sources["plz"] != enrich.ProvenanceCrawl { + t.Errorf("plz provenance: got %q, want crawl", got.Sources["plz"]) + } + if got.Sources["category"] != enrich.ProvenanceLLM { + t.Errorf("category provenance: got %q, want llm", got.Sources["category"]) + } + + // Cache stores the raw LLM output (not merged). + if !cacheSet.called { + t.Error("expected SetEnrichmentCache to be called") + } + if cacheSet.payload.PLZ != "" { + t.Error("cache should hold raw LLM payload; PLZ came from crawl and must not be cached") + } + if cacheSet.payload.Category != catMittelaltermarkt { + t.Errorf("cache payload missing LLM category: %+v", cacheSet.payload) + } + + // Persist writes the merged payload with status=done. + if !persistSet.called || persistSet.status != EnrichmentStatusDone { + t.Errorf("persist: called=%v status=%q; want true/done", persistSet.called, persistSet.status) + } + if persistSet.payload.PLZ != "01067" || persistSet.payload.Category != catMittelaltermarkt { + t.Errorf("persisted payload not merged: %+v", persistSet.payload) + } +} + +// TestRunLLMEnrichOne_CacheHitSkipsLLM: a cache hit means no LLM call, but +// the merged payload still persists (so crawl-enrich updates surface). 
+func TestRunLLMEnrichOne_CacheHitSkipsLLM(t *testing.T) { + rowID := uuid.New() + start := time.Date(2026, 5, 1, 0, 0, 0, 0, time.UTC) + cachedLLM := enrich.Enrichment{ + Category: "weihnachtsmarkt", + Sources: enrich.Sources{"category": enrich.ProvenanceLLM}, + } + repo := &mockRepo{ + getDiscoveredFn: func(context.Context, uuid.UUID) (DiscoveredMarket, error) { + return DiscoveredMarket{ID: rowID, Stadt: "Dresden", NameNormalized: "foo", StartDatum: &start}, nil + }, + getCacheFn: func(string) (enrich.Enrichment, bool, error) { + return cachedLLM, true, nil + }, + } + llm := &stubLLMEnricher{} + svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, llm, nil) + + got, err := svc.RunLLMEnrichOne(context.Background(), rowID) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if llm.called != 0 { + t.Errorf("LLM called %d times; want 0 (cache should skip it)", llm.called) + } + if got.Category != "weihnachtsmarkt" { + t.Errorf("merged category: got %q, want weihnachtsmarkt", got.Category) + } +} + +// TestRunLLMEnrichOne_LLMErrorMarksFailed: when the enricher returns an error, +// the row gets marked EnrichmentStatusFailed and the error is surfaced. 
+func TestRunLLMEnrichOne_LLMErrorMarksFailed(t *testing.T) { + rowID := uuid.New() + var persistStatus string + repo := &mockRepo{ + getDiscoveredFn: func(context.Context, uuid.UUID) (DiscoveredMarket, error) { + return DiscoveredMarket{ID: rowID}, nil + }, + setEnrichmentFn: func(_ uuid.UUID, _ enrich.Enrichment, status string) error { + persistStatus = status + return nil + }, + } + llm := &stubLLMEnricher{err: errors.New("mistral down")} + svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, llm, nil) + + _, err := svc.RunLLMEnrichOne(context.Background(), rowID) + if err == nil { + t.Fatal("expected error; got nil") + } + if persistStatus != EnrichmentStatusFailed { + t.Errorf("persist status = %q; want %q", persistStatus, EnrichmentStatusFailed) + } +} + +// TestRunCrawlEnrichAll_EmptyQueueNoOp: nothing pending, zero summary, no writes. +func TestRunCrawlEnrichAll_EmptyQueueNoOp(t *testing.T) { + var writes int + repo := &mockRepo{ + listPendingEnrichFn: func(int) ([]DiscoveredMarket, error) { + return []DiscoveredMarket{}, nil + }, + setEnrichmentFn: func(uuid.UUID, enrich.Enrichment, string) error { + writes++ + return nil + }, + } + svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, nil) + + summary, err := svc.RunCrawlEnrichAll(context.Background()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if summary.Total != 0 || summary.Succeeded != 0 || summary.Failed != 0 { + t.Errorf("empty-queue summary should be all zeros, got %+v", summary) + } + if writes != 0 { + t.Errorf("SetEnrichment called %d times on empty queue", writes) + } +} + +// stubSimilarityClassifier returns canned verdicts or errors. 
+type stubSimilarityClassifier struct { + result enrich.Verdict + err error + calls int +} + +func (s *stubSimilarityClassifier) Classify(_ context.Context, _, _ enrich.SimilarityRow) (enrich.Verdict, error) { + s.calls++ + return s.result, s.err +} + +// TestClassifySimilarPair_HappyPath: cache miss → LLM → cache write → return. +func TestClassifySimilarPair_HappyPath(t *testing.T) { + aID, bID := uuid.New(), uuid.New() + start := time.Date(2026, 5, 1, 0, 0, 0, 0, time.UTC) + rowA := DiscoveredMarket{ID: aID, MarktName: "Ritterfest Dresden", Stadt: "Dresden", NameNormalized: "ritterfest dresden", StartDatum: &start} + rowB := DiscoveredMarket{ID: bID, MarktName: "Mittelaltermarkt Dresden 2026", Stadt: "Dresden", NameNormalized: "mittelaltermarkt dresden", StartDatum: &start} + + var cacheSet struct { + called bool + verdict enrich.Verdict + } + repo := &mockRepo{ + getDiscoveredFn: func(_ context.Context, id uuid.UUID) (DiscoveredMarket, error) { + if id == aID { + return rowA, nil + } + return rowB, nil + }, + setSimCacheFn: func(_ string, v enrich.Verdict, _ time.Duration) error { + cacheSet.called = true + cacheSet.verdict = v + return nil + }, + } + sim := &stubSimilarityClassifier{ + result: enrich.Verdict{Same: false, Confidence: 0.6, Reason: "Unterschiedliche Namen."}, + } + svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, sim) + + got, err := svc.ClassifySimilarPair(context.Background(), aID, bID) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if sim.calls != 1 { + t.Errorf("classifier called %d times; want 1", sim.calls) + } + if got.Same || got.Confidence != 0.6 { + t.Errorf("verdict = %+v; want same=false, confidence=0.6", got) + } + if !cacheSet.called { + t.Error("expected SetSimilarityCache to be called on a cache miss") + } + if cacheSet.verdict.Confidence != 0.6 { + t.Errorf("cached verdict lost confidence: %+v", cacheSet.verdict) + } +} + +// TestClassifySimilarPair_CacheHitSkipsLLM: cache hit 
returns directly, +// no classifier call. +func TestClassifySimilarPair_CacheHitSkipsLLM(t *testing.T) { + aID, bID := uuid.New(), uuid.New() + cached := enrich.Verdict{Same: true, Confidence: 0.9, Reason: "same venue"} + + repo := &mockRepo{ + getDiscoveredFn: func(context.Context, uuid.UUID) (DiscoveredMarket, error) { + return DiscoveredMarket{}, nil + }, + getSimCacheFn: func(string) (enrich.Verdict, bool, error) { + return cached, true, nil + }, + } + sim := &stubSimilarityClassifier{} + svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, sim) + + got, err := svc.ClassifySimilarPair(context.Background(), aID, bID) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if sim.calls != 0 { + t.Errorf("classifier called %d times on cache hit; want 0", sim.calls) + } + if !got.Same || got.Confidence != 0.9 { + t.Errorf("cached verdict not returned: %+v", got) + } +} + +// TestClassifySimilarPair_RejectsSelfComparison: the pair-key scheme would +// collapse (a,a) to a single key which never tells you anything useful. +func TestClassifySimilarPair_RejectsSelfComparison(t *testing.T) { + id := uuid.New() + svc := NewService(&mockRepo{}, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, &stubSimilarityClassifier{}) + + _, err := svc.ClassifySimilarPair(context.Background(), id, id) + if err == nil { + t.Error("expected error on self-comparison; got nil") + } +} + +// TestClassifySimilarPair_LLMErrorPropagates: classifier errors surface; +// cache is not written. 
+func TestClassifySimilarPair_LLMErrorPropagates(t *testing.T) { + aID, bID := uuid.New(), uuid.New() + cacheWritten := false + repo := &mockRepo{ + getDiscoveredFn: func(context.Context, uuid.UUID) (DiscoveredMarket, error) { + return DiscoveredMarket{}, nil + }, + setSimCacheFn: func(string, enrich.Verdict, time.Duration) error { + cacheWritten = true + return nil + }, + } + sim := &stubSimilarityClassifier{err: errors.New("mistral 500")} + svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, nil, sim) + + _, err := svc.ClassifySimilarPair(context.Background(), aID, bID) + if err == nil { + t.Fatal("expected error; got nil") + } + if cacheWritten { + t.Error("cache should not be written when classifier errors") + } +} diff --git a/backend/internal/pkg/scrape/scrape.go b/backend/internal/pkg/scrape/scrape.go new file mode 100644 index 0000000..b745ee2 --- /dev/null +++ b/backend/internal/pkg/scrape/scrape.go @@ -0,0 +1,150 @@ +// Package scrape fetches a web page and extracts its visible text for LLM +// context. Not intended for structured data extraction — use goquery directly +// when you want specific fields. +// +// The extraction strategy is deliberately simple: drop scripts/styles/nav/ +// footer, walk the remaining body text, collapse runs of whitespace, truncate. +// Good enough for feeding a market-event page into a prompt; bad for +// anything that depends on document structure. +package scrape + +import ( + "bytes" + "context" + "fmt" + "io" + "net/http" + "strings" + "time" + + "github.com/PuerkitoBio/goquery" +) + +// DefaultTimeout caps individual HTTP fetches. +const DefaultTimeout = 10 * time.Second + +// DefaultMaxChars bounds the extracted text length. LLM prompts have a token +// budget; cutting here keeps the prompt deterministic and prevents a single +// huge page from dominating a multi-URL context. 4000 chars ≈ 1000 tokens +// assuming German-ish content. 
+const DefaultMaxChars = 4000 + +// Client wraps an *http.Client + output bound. Zero-value is usable — it +// falls back to http.DefaultClient behavior with DefaultTimeout applied via +// the per-request context. +type Client struct { + HTTP *http.Client + MaxChars int + // UserAgent, if set, overrides the default. Leaving empty lets net/http + // use its default — which some servers block; callers that scrape a lot + // of third-party pages should set a descriptive string. + UserAgent string +} + +// New constructs a Client with sane defaults. +func New(userAgent string) *Client { + return &Client{ + HTTP: &http.Client{ + Timeout: DefaultTimeout, + CheckRedirect: func(req *http.Request, via []*http.Request) error { + if len(via) >= 5 { + return http.ErrUseLastResponse + } + return nil + }, + }, + MaxChars: DefaultMaxChars, + UserAgent: userAgent, + } +} + +// Fetch retrieves the URL and returns the visible text, truncated to MaxChars. +// Non-2xx responses and HTML parse failures return an error — caller decides +// whether to continue with other URLs or fail the whole operation. 
+func (c *Client) Fetch(ctx context.Context, url string) (string, error) { + client := c.HTTP + if client == nil { + client = &http.Client{Timeout: DefaultTimeout} + } + maxChars := c.MaxChars + if maxChars <= 0 { + maxChars = DefaultMaxChars + } + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return "", fmt.Errorf("new request %s: %w", url, err) + } + if c.UserAgent != "" { + req.Header.Set("User-Agent", c.UserAgent) + } + req.Header.Set("Accept", "text/html,application/xhtml+xml;q=0.9,*/*;q=0.8") + req.Header.Set("Accept-Language", "de-DE,de;q=0.9,en;q=0.8") + + resp, err := client.Do(req) + if err != nil { + return "", fmt.Errorf("fetch %s: %w", url, err) + } + defer func() { _ = resp.Body.Close() }() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return "", fmt.Errorf("fetch %s: status %d", url, resp.StatusCode) + } + + // Cap the body read at a generous multiple of maxChars so a misbehaving + // server streaming gigabytes can't OOM us. 10x is headroom for whitespace + // stripping to still produce maxChars of useful text. + body, err := io.ReadAll(io.LimitReader(resp.Body, int64(maxChars)*10)) + if err != nil { + return "", fmt.Errorf("read body %s: %w", url, err) + } + + return extractText(body, maxChars) +} + +// extractText walks a parsed HTML body, drops noise nodes, gathers visible +// text, collapses whitespace, and truncates. Exported as a package-level +// helper so tests can exercise the HTML→text path without a live HTTP server. +func extractText(htmlBytes []byte, maxChars int) (string, error) { + doc, err := goquery.NewDocumentFromReader(bytes.NewReader(htmlBytes)) + if err != nil { + return "", fmt.Errorf("parse html: %w", err) + } + + // Remove noise before walking — script/style carry code, nav/footer/aside + // carry boilerplate that pollutes the LLM context. 
+ doc.Find("script, style, nav, footer, aside, noscript, iframe").Remove() + + text := strings.TrimSpace(doc.Find("body").Text()) + if text == "" { + // Some sites don't use
explicitly (fragments, malformed docs); + // fall back to document-level text. + text = strings.TrimSpace(doc.Text()) + } + text = collapseWhitespace(text) + if len(text) > maxChars { + text = text[:maxChars] + } + return text, nil +} + +// collapseWhitespace replaces every run of whitespace with a single space. +// Preserves word boundaries but drops the layout that HTML text extraction +// leaves behind (tabs, long runs of newlines, non-breaking spaces). +func collapseWhitespace(s string) string { + var b strings.Builder + b.Grow(len(s)) + inSpace := false + for _, r := range s { + if r == ' ' || r == '\t' || r == '\n' || r == '\r' || r == ' ' { + if !inSpace { + b.WriteByte(' ') + inSpace = true + } + continue + } + b.WriteRune(r) + inSpace = false + } + return strings.TrimSpace(b.String()) +} diff --git a/backend/internal/pkg/scrape/scrape_test.go b/backend/internal/pkg/scrape/scrape_test.go new file mode 100644 index 0000000..da59d7e --- /dev/null +++ b/backend/internal/pkg/scrape/scrape_test.go @@ -0,0 +1,77 @@ +package scrape + +import ( + "strings" + "testing" +) + +func TestExtractText_StripsNoise(t *testing.T) { + html := []byte(` + + + +Samstag und Sonntag von 10:00 bis 18:00 Uhr.
+foo bar + + baz
`) + got, err := extractText(html, 1000) + if err != nil { + t.Fatalf("extractText: %v", err) + } + if got != "foo bar baz" { + t.Errorf("whitespace not collapsed: %q", got) + } +} + +func TestExtractText_Truncates(t *testing.T) { + // Build a long body. + body := strings.Repeat("a b c ", 2000) // ~12000 chars after collapse + html := []byte("" + body + "
") + got, err := extractText(html, 100) + if err != nil { + t.Fatalf("extractText: %v", err) + } + if len(got) != 100 { + t.Errorf("len(got) = %d; want 100", len(got)) + } +} + +func TestExtractText_FallsBackToDocumentWhenNoBody(t *testing.T) { + // Document fragment without / tags. goquery still parses this + // but .Find("body") returns nothing; we fall back to doc-level text. + html := []byte(`Direktes Fragment.
{#if (data.total ?? 0) > 0} @@ -640,6 +869,30 @@ > Similar +
{#if (row.sources?.length ?? 0) >= 2}{row.hinweis}
{/if} + + {#if row.enrichment_status && row.enrichment_status !== 'pending'} ++ {form.llmEnrichError} +
+ {/if} + {/if}