feat(discovery): manual crawl-enrich-all button + payload display

Replaces the originally planned async-worker design with operator-
triggered bulk runs (see memory/project_ship2_enrichment.md). Crawl-
enrichment is cheap enough to run against the whole list every time,
but it runs only when the admin clicks — the flow stays predictable
and the crawl itself stays fast.

Endpoints
- POST /admin/discovery/enrichment/crawl-all — 202 + goroutine, mirrors
  the crawl pattern. Per-process CAS gate prevents concurrent runs.
- GET  /admin/discovery/enrichment/crawl-all-status — polled shape
  identical to /crawl-status for UI reuse.
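The CAS gate mentioned above is just `atomic.Bool.CompareAndSwap`: the first request flips the flag and spawns the goroutine, a concurrent request sees the swap fail and gets a 429. A stripped-down sketch of the pattern (the `gate` type and method names are illustrative, not the handler's actual fields):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// gate is a per-process single-flight guard. CompareAndSwap flips the
// flag only when no run is active, so exactly one run proceeds.
type gate struct{ running atomic.Bool }

func (g *gate) tryStart() bool { return g.running.CompareAndSwap(false, true) }
func (g *gate) finish()        { g.running.Store(false) }

func main() {
	var g gate
	fmt.Println(g.tryStart()) // true  — first request wins, goroutine starts
	fmt.Println(g.tryStart()) // false — concurrent request would get 429
	g.finish()                // deferred Store(false) when the run ends
	fmt.Println(g.tryStart()) // true  — next manual run allowed
}
```

Note this guards a single process only; two app instances could still run concurrently, which is acceptable for a manual admin action.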

Service RunCrawlEnrichAll iterates enrichment_status='pending' rows,
builds an enrich.Input from each, runs CrawlEnrich (consolidation +
Nominatim geocoding via the shared geocoder), and persists via
SetEnrichment(status=done). Per-row errors count toward Failed and
append to a bounded Errors slice; the pass never halts.

Enrich package refactor
- Enrichment, Sources, Provenance constants moved from discovery ->
  enrich (they are the enrich package's own types; discovery previously
  held them for historical reasons).
- CrawlEnrich now takes a narrow enrich.Input / enrich.Contribution so
  the enrich package no longer imports the parent discovery package.
  This breaks the import cycle that appeared once discovery needed to
  call enrich (the MR 2 structure only worked because no caller went
  in that direction yet).
- LLMEnricher takes an LLMRequest (primitives) instead of a
  DiscoveredMarket. NoopLLMEnricher updated; real Mistral impl lands
  in MR 3b.
- CacheKey signature switched from (DiscoveredMarket) to primitive
  (nameNormalized, stadt, year).

Service geocoder wiring: discovery.NewService gains a Geocoder param
(routes.go passes the shared Nominatim client; the interface lives in
discovery to avoid another circular edge with enrich).

UI: "Run crawl-enrich" button next to "Run crawl"; identical poll +
summary card pattern. Expanding a queue row shows an enrichment status
badge plus the PLZ/Venue/Organizer/Lat-Lng fields inline, each with a
per-field provenance tag.

Tests: three new service tests (happy path, per-row SetEnrichment
failure, empty-queue no-op). Existing enrich package tests updated
for the primitive input signature. All 13 test NewService call-sites
updated for the new geocoder param.
2026-04-24 10:29:58 +02:00
parent dcbf38f6e9
commit afe9d916d6
16 changed files with 897 additions and 209 deletions

View File

@@ -5,26 +5,19 @@ import (
"encoding/hex"
"fmt"
"time"
"marktvogt.de/backend/internal/domain/discovery"
)
// CacheKey derives a stable cache key for a DiscoveredMarket. Two rows with
// the same (normalized name, city, year) share a cache entry — the LLM
// answer doesn't change when the crawler rediscovers the same market.
// CacheKey derives a stable cache key from the (normalized name, city, year)
// tuple. Two rows with the same tuple share a cache entry — the LLM answer
// doesn't change when the crawler rediscovers the same market.
//
// Year comes from StartDatum. A row without a start date uses year=0; those
// entries share a bucket, which is slightly lossy but acceptable — a
// date-less market with an LLM-derived description is rare and regenerating
// on demand is cheap relative to other failure modes.
func CacheKey(m discovery.DiscoveredMarket) string {
year := 0
if m.StartDatum != nil {
year = m.StartDatum.Year()
}
// name_normalized is already lowercased + stripped elsewhere; stadt is
// A row without a known start date should pass year=0; those entries share a
// bucket, which is slightly lossy but acceptable — date-less markets with
// LLM-derived descriptions are rare and regenerating on demand is cheap.
func CacheKey(nameNormalized, stadt string, year int) string {
// nameNormalized is already lowercased + stripped by the caller; stadt is
// lowercased here to avoid cache misses from casing drift.
raw := fmt.Sprintf("%s|%s|%d", m.NameNormalized, lowerASCII(m.Stadt), year)
raw := fmt.Sprintf("%s|%s|%d", nameNormalized, lowerASCII(stadt), year)
sum := sha256.Sum256([]byte(raw))
return hex.EncodeToString(sum[:])
}

View File

@@ -3,8 +3,6 @@ package enrich
import (
"context"
"time"
"marktvogt.de/backend/internal/domain/discovery"
)
// Geocoder is the narrow interface crawl-enrich depends on for lat/lng.
@@ -13,49 +11,66 @@ type Geocoder interface {
Geocode(ctx context.Context, street, city, zip, country string) (*float64, *float64, error)
}
// Contribution is the subset of one source's data crawl-enrich reads. Callers
// adapt their domain-specific contribution type to this before invoking
// CrawlEnrich — keeps the enrich package free of a reverse import.
type Contribution struct {
PLZ string
Venue string
Organizer string
}
// Input is the read-only context CrawlEnrich needs: the market's city and
// country (for geocoding) plus the per-source contributions already gathered
// by the crawler. All other domain-row fields are irrelevant to the crawl pass.
type Input struct {
Stadt string
Land string
Contributions []Contribution
}
// CrawlEnrich fills as many fields as possible from data already in hand —
// the DiscoveredMarket's SourceContributions and (optionally) a geocoder.
// Returns a fully-populated Sources map so downstream code can tell which
// pass contributed each field.
// the caller's Contribution list and (optionally) a geocoder. Returns a
// fully-populated Sources map so downstream code can tell which pass
// contributed each field.
//
// No LLM, no AI, no paid APIs. Nominatim counts as crawl-enrich because the
// call is deterministic and already rate-limited by pkg/geocode.
//
// geocoder may be nil; lat/lng simply stay empty in that case.
func CrawlEnrich(ctx context.Context, m discovery.DiscoveredMarket, geocoder Geocoder) discovery.Enrichment {
out := discovery.Enrichment{Sources: discovery.EnrichmentSources{}}
func CrawlEnrich(ctx context.Context, in Input, geocoder Geocoder) Enrichment {
out := Enrichment{Sources: Sources{}}
// Consolidate text fields from contributions — first non-empty wins.
// Order is deterministic because the crawler's Merge step already sorts
// contributions by source name.
for _, c := range m.SourceContributions {
// Order is deterministic because the caller's list is expected to be
// pre-sorted (crawler Merge step orders by source name).
for _, c := range in.Contributions {
if out.PLZ == "" && c.PLZ != "" {
out.PLZ = c.PLZ
out.Sources["plz"] = discovery.EnrichmentProvenanceCrawl
out.Sources["plz"] = ProvenanceCrawl
}
if out.Venue == "" && c.Venue != "" {
out.Venue = c.Venue
out.Sources["venue"] = discovery.EnrichmentProvenanceCrawl
out.Sources["venue"] = ProvenanceCrawl
}
if out.Organizer == "" && c.Organizer != "" {
out.Organizer = c.Organizer
out.Sources["organizer"] = discovery.EnrichmentProvenanceCrawl
out.Sources["organizer"] = ProvenanceCrawl
}
}
// Geocode from (city, plz) — only if we have enough input to be useful.
// The plz we just consolidated is preferred; fall back to the market's
// top-level stadt when plz is missing.
if geocoder != nil && m.Stadt != "" {
lat, lng, err := geocoder.Geocode(ctx, "", m.Stadt, out.PLZ, landToISO(m.Land))
// The plz we just consolidated is preferred.
if geocoder != nil && in.Stadt != "" {
lat, lng, err := geocoder.Geocode(ctx, "", in.Stadt, out.PLZ, landToISO(in.Land))
// Geocoder failures are non-fatal — lat/lng stay empty, other crawl-
// enriched fields still persist. An LLM pass later cannot fill
// lat/lng either, so failures just mean that row gets no coordinates.
if err == nil && lat != nil && lng != nil {
out.Lat = lat
out.Lng = lng
out.Sources["lat"] = discovery.EnrichmentProvenanceCrawl
out.Sources["lng"] = discovery.EnrichmentProvenanceCrawl
out.Sources["lat"] = ProvenanceCrawl
out.Sources["lng"] = ProvenanceCrawl
}
}
@@ -66,9 +81,9 @@ func CrawlEnrich(ctx context.Context, m discovery.DiscoveredMarket, geocoder Geo
return out
}
// landToISO maps the discovery Land value to an ISO-2 country code for
// Nominatim's countrycodes parameter. Returns empty string for unknowns so
// the geocoder runs without a country filter.
// landToISO maps the Land value to an ISO-2 country code for Nominatim's
// countrycodes parameter. Returns empty string for unknowns so the geocoder
// runs without a country filter.
func landToISO(land string) string {
switch land {
case "Deutschland":

View File

@@ -4,9 +4,6 @@ import (
"context"
"errors"
"testing"
"time"
"marktvogt.de/backend/internal/domain/discovery"
)
// stubGeocoder returns canned lat/lng. nil lat/lng simulate "not found".
@@ -28,8 +25,7 @@ func ptrFloat(v float64) *float64 { return &v }
const stallhof = "Stallhof"
func TestCrawlEnrich_EmptyInputs(t *testing.T) {
m := discovery.DiscoveredMarket{}
got := CrawlEnrich(context.Background(), m, nil)
got := CrawlEnrich(context.Background(), Input{}, nil)
if got.PLZ != "" || got.Venue != "" || got.Organizer != "" {
t.Errorf("expected all fields empty, got %+v", got)
}
@@ -42,18 +38,18 @@ func TestCrawlEnrich_EmptyInputs(t *testing.T) {
}
func TestCrawlEnrich_SingleSource(t *testing.T) {
m := discovery.DiscoveredMarket{
in := Input{
Stadt: "Dresden",
SourceContributions: []discovery.SourceContribution{
{SourceName: "marktkalendarium", PLZ: "01067", Venue: stallhof, Organizer: "Tourismusverband"},
Contributions: []Contribution{
{PLZ: "01067", Venue: stallhof, Organizer: "Tourismusverband"},
},
}
got := CrawlEnrich(context.Background(), m, nil)
got := CrawlEnrich(context.Background(), in, nil)
if got.PLZ != "01067" || got.Venue != stallhof || got.Organizer != "Tourismusverband" {
t.Errorf("unexpected payload: %+v", got)
}
if got.Sources["plz"] != discovery.EnrichmentProvenanceCrawl {
t.Errorf("plz provenance: got %q, want %q", got.Sources["plz"], discovery.EnrichmentProvenanceCrawl)
if got.Sources["plz"] != ProvenanceCrawl {
t.Errorf("plz provenance: got %q, want %q", got.Sources["plz"], ProvenanceCrawl)
}
if got.EnrichedAt == nil {
t.Error("expected EnrichedAt to be stamped when any field filled")
@@ -63,14 +59,14 @@ func TestCrawlEnrich_SingleSource(t *testing.T) {
func TestCrawlEnrich_MultiSourceFirstWins(t *testing.T) {
// Two sources disagree on PLZ: first non-empty wins. Order is the
// crawler-produced contribution order (alphabetical by source name).
m := discovery.DiscoveredMarket{
in := Input{
Stadt: "Dresden",
SourceContributions: []discovery.SourceContribution{
{SourceName: "marktkalendarium", PLZ: "01067"},
{SourceName: "mittelaltermarkt_online", PLZ: "01099"},
Contributions: []Contribution{
{PLZ: "01067"},
{PLZ: "01099"},
},
}
got := CrawlEnrich(context.Background(), m, nil)
got := CrawlEnrich(context.Background(), in, nil)
if got.PLZ != "01067" {
t.Errorf("expected first source's PLZ, got %q", got.PLZ)
}
@@ -79,14 +75,14 @@ func TestCrawlEnrich_MultiSourceFirstWins(t *testing.T) {
func TestCrawlEnrich_MultiSourceFillsGaps(t *testing.T) {
// Source A contributes PLZ only; source B contributes venue only; both
// end up in the payload.
m := discovery.DiscoveredMarket{
in := Input{
Stadt: "Dresden",
SourceContributions: []discovery.SourceContribution{
{SourceName: "marktkalendarium", PLZ: "01067"},
{SourceName: "mittelaltermarkt_online", Venue: stallhof},
Contributions: []Contribution{
{PLZ: "01067"},
{Venue: stallhof},
},
}
got := CrawlEnrich(context.Background(), m, nil)
got := CrawlEnrich(context.Background(), in, nil)
if got.PLZ != "01067" {
t.Errorf("PLZ from A missing: %q", got.PLZ)
}
@@ -96,15 +92,13 @@ func TestCrawlEnrich_MultiSourceFillsGaps(t *testing.T) {
}
func TestCrawlEnrich_GeocoderCalled(t *testing.T) {
m := discovery.DiscoveredMarket{
Stadt: "Dresden",
Land: "Deutschland",
SourceContributions: []discovery.SourceContribution{
{SourceName: "a", PLZ: "01067"},
},
in := Input{
Stadt: "Dresden",
Land: "Deutschland",
Contributions: []Contribution{{PLZ: "01067"}},
}
sg := &stubGeocoder{lat: ptrFloat(51.05), lng: ptrFloat(13.74)}
got := CrawlEnrich(context.Background(), m, sg)
got := CrawlEnrich(context.Background(), in, sg)
if got.Lat == nil || *got.Lat != 51.05 {
t.Errorf("Lat = %v; want 51.05", got.Lat)
@@ -112,8 +106,8 @@ func TestCrawlEnrich_GeocoderCalled(t *testing.T) {
if got.Lng == nil || *got.Lng != 13.74 {
t.Errorf("Lng = %v; want 13.74", got.Lng)
}
if got.Sources["lat"] != discovery.EnrichmentProvenanceCrawl {
t.Errorf("lat provenance: got %q, want %q", got.Sources["lat"], discovery.EnrichmentProvenanceCrawl)
if got.Sources["lat"] != ProvenanceCrawl {
t.Errorf("lat provenance: got %q, want %q", got.Sources["lat"], ProvenanceCrawl)
}
// Land → ISO-2 mapping sanity.
if sg.lastCountry != "de" {
@@ -125,14 +119,14 @@ func TestCrawlEnrich_GeocoderCalled(t *testing.T) {
}
func TestCrawlEnrich_GeocoderFailureNonFatal(t *testing.T) {
m := discovery.DiscoveredMarket{
in := Input{
Stadt: "Dresden",
SourceContributions: []discovery.SourceContribution{
{SourceName: "a", PLZ: "01067", Venue: stallhof},
Contributions: []Contribution{
{PLZ: "01067", Venue: stallhof},
},
}
sg := &stubGeocoder{err: errors.New("nominatim down")}
got := CrawlEnrich(context.Background(), m, sg)
got := CrawlEnrich(context.Background(), in, sg)
// Non-geocode fields must still have landed.
if got.PLZ != "01067" || got.Venue != stallhof {
@@ -147,13 +141,9 @@ func TestCrawlEnrich_GeocoderFailureNonFatal(t *testing.T) {
}
func TestCrawlEnrich_GeocoderNotCalledWithoutStadt(t *testing.T) {
m := discovery.DiscoveredMarket{
SourceContributions: []discovery.SourceContribution{
{SourceName: "a", PLZ: "01067"},
},
}
in := Input{Contributions: []Contribution{{PLZ: "01067"}}}
sg := &stubGeocoder{lat: ptrFloat(1), lng: ptrFloat(1)}
got := CrawlEnrich(context.Background(), m, sg)
got := CrawlEnrich(context.Background(), in, sg)
if sg.lastCity != "" {
t.Error("geocoder called despite empty Stadt")
}
@@ -164,12 +154,12 @@ func TestCrawlEnrich_GeocoderNotCalledWithoutStadt(t *testing.T) {
func TestMerge_BaseWinsOverOverlay(t *testing.T) {
// Invariant: a field set by crawl (base) is never overwritten by llm (overlay).
base := discovery.Enrichment{
base := Enrichment{
PLZ: "01067",
Venue: stallhof,
Sources: discovery.EnrichmentSources{"plz": "crawl", "venue": "crawl"},
Sources: Sources{"plz": ProvenanceCrawl, "venue": ProvenanceCrawl},
}
overlay := discovery.Enrichment{
overlay := Enrichment{
PLZ: "WRONG",
Venue: "WRONG",
Category: "mittelaltermarkt",
@@ -177,11 +167,11 @@ func TestMerge_BaseWinsOverOverlay(t *testing.T) {
InputTokens: 100,
OutputTokens: 50,
Model: "mistral-large-latest",
Sources: discovery.EnrichmentSources{
"plz": "llm",
"venue": "llm",
"category": "llm",
"description": "llm",
Sources: Sources{
"plz": ProvenanceLLM,
"venue": ProvenanceLLM,
"category": ProvenanceLLM,
"description": ProvenanceLLM,
},
}
got := Merge(base, overlay)
@@ -198,10 +188,10 @@ func TestMerge_BaseWinsOverOverlay(t *testing.T) {
if got.Description != "Ein großer Markt." {
t.Errorf("Description should come from overlay: got %q", got.Description)
}
if got.Sources["plz"] != "crawl" {
if got.Sources["plz"] != ProvenanceCrawl {
t.Errorf("plz provenance overwritten: got %q, want crawl", got.Sources["plz"])
}
if got.Sources["category"] != "llm" {
if got.Sources["category"] != ProvenanceLLM {
t.Errorf("category provenance lost: got %q, want llm", got.Sources["category"])
}
if got.InputTokens != 100 || got.OutputTokens != 50 {
@@ -210,11 +200,11 @@ func TestMerge_BaseWinsOverOverlay(t *testing.T) {
}
func TestMerge_CoordsOnlyFromOverlayIfBaseNil(t *testing.T) {
base := discovery.Enrichment{Sources: discovery.EnrichmentSources{}}
overlay := discovery.Enrichment{
base := Enrichment{Sources: Sources{}}
overlay := Enrichment{
Lat: ptrFloat(51.05),
Lng: ptrFloat(13.74),
Sources: discovery.EnrichmentSources{"lat": "crawl", "lng": "crawl"},
Sources: Sources{"lat": ProvenanceCrawl, "lng": ProvenanceCrawl},
}
got := Merge(base, overlay)
if got.Lat == nil || *got.Lat != 51.05 {
@@ -222,10 +212,10 @@ func TestMerge_CoordsOnlyFromOverlayIfBaseNil(t *testing.T) {
}
// Now base already has Lat; overlay must not overwrite.
baseWithLat := discovery.Enrichment{
baseWithLat := Enrichment{
Lat: ptrFloat(50.00),
Lng: ptrFloat(10.00),
Sources: discovery.EnrichmentSources{"lat": "crawl", "lng": "crawl"},
Sources: Sources{"lat": ProvenanceCrawl, "lng": ProvenanceCrawl},
}
got = Merge(baseWithLat, overlay)
if *got.Lat != 50.00 {
@@ -235,21 +225,19 @@ func TestMerge_CoordsOnlyFromOverlayIfBaseNil(t *testing.T) {
// Sanity: CacheKey must be stable across re-runs and differ across inputs.
func TestCacheKey_StableAndDistinct(t *testing.T) {
start := time.Date(2026, 5, 1, 0, 0, 0, 0, time.UTC)
a := discovery.DiscoveredMarket{NameNormalized: "ritterfest dresden", Stadt: "Dresden", StartDatum: &start}
b := discovery.DiscoveredMarket{NameNormalized: "ritterfest dresden", Stadt: "Dresden", StartDatum: &start}
c := discovery.DiscoveredMarket{NameNormalized: "ritterfest leipzig", Stadt: "Leipzig", StartDatum: &start}
aKey := CacheKey("ritterfest dresden", "Dresden", 2026)
bKey := CacheKey("ritterfest dresden", "Dresden", 2026)
cKey := CacheKey("ritterfest leipzig", "Leipzig", 2026)
if CacheKey(a) != CacheKey(b) {
if aKey != bKey {
t.Error("identical inputs must produce identical keys")
}
if CacheKey(a) == CacheKey(c) {
if aKey == cKey {
t.Error("different cities must produce different keys")
}
// Stadt casing drift doesn't change the key.
d := a
d.Stadt = "DRESDEN"
if CacheKey(a) != CacheKey(d) {
dKey := CacheKey("ritterfest dresden", "DRESDEN", 2026)
if aKey != dKey {
t.Error("stadt casing drift should not change key")
}
}

View File

@@ -1,21 +1,58 @@
// Package enrich derives additional market metadata in two passes.
//
// Pass A (crawl-enrich) is deterministic and free. It consolidates fields
// already present in the DiscoveredMarket's SourceContributions (PLZ, venue,
// organizer, bundesland) and can geocode city+PLZ via Nominatim. First
// non-empty value wins per field; provenance is recorded as "crawl".
// from per-source contributions (PLZ, venue, organizer) and geocodes city
// + PLZ via Nominatim. First non-empty value wins per field; provenance is
// recorded as "crawl".
//
// Pass B (llm-enrich, MR 3) runs only for fields Pass A could not fill and
// does cost money. Provenance is recorded as "llm".
// Pass B (llm-enrich) runs only for fields Pass A could not fill and does
// cost money. Provenance is recorded as "llm".
//
// The Merge helper in this file combines partial payloads while preserving
// crawl-over-llm preference: a crawl-written field is never overwritten by
// an llm pass. Eval harness (MR 5) uses the per-field provenance to measure
// crawl-enrich and llm-enrich quality separately.
// Merge combines partial payloads preserving crawl-over-llm preference: a
// crawl-written field is never overwritten by an llm pass. The eval harness
// uses per-field provenance to measure the two passes separately.
//
// The package is intentionally standalone — it does NOT import the parent
// discovery package. Callers adapt domain types to the narrow Contribution
// input before invoking CrawlEnrich, which keeps the dependency edge one-way
// (discovery imports enrich; enrich imports no domain code).
package enrich
import (
"marktvogt.de/backend/internal/domain/discovery"
import "time"
// Enrichment is the JSONB payload persisted on discovered_markets.enrichment.
// Every field is optional; Sources records which pass filled each field so
// the eval harness can separate crawl-enrich (free, deterministic) from
// llm-enrich (paid, synthesised) contributions.
//
// Field shape is intentionally flat — promotion to typed columns is easier
// from a flat blob than from nested structures.
type Enrichment struct {
PLZ string `json:"plz,omitempty"`
Venue string `json:"venue,omitempty"`
Organizer string `json:"organizer,omitempty"`
Lat *float64 `json:"lat,omitempty"`
Lng *float64 `json:"lng,omitempty"`
Category string `json:"category,omitempty"`
OpeningHours string `json:"opening_hours,omitempty"`
Description string `json:"description,omitempty"`
Sources Sources `json:"sources,omitempty"`
EnrichedAt *time.Time `json:"enriched_at,omitempty"`
Model string `json:"model,omitempty"`
InputTokens int `json:"input_tokens,omitempty"`
OutputTokens int `json:"output_tokens,omitempty"`
}
// Sources maps each enrichment field name to the pass that filled it.
// Values are Provenance constants.
type Sources map[string]string
// Provenance constants for Sources entries. Crawl covers both offline
// consolidation of per-source contributions and Nominatim geocoding — both
// deterministic, neither counts against the LLM budget.
const (
ProvenanceCrawl = "crawl"
ProvenanceLLM = "llm"
)
// Merge combines two Enrichment payloads. The base is taken as authoritative;
@@ -26,7 +63,7 @@ import (
// llm-enrich. This enforces the "prefer crawl over llm" invariant: the LLM
// pass cannot clobber a field the crawler already filled in, even if the
// LLM is confident.
func Merge(base, overlay discovery.Enrichment) discovery.Enrichment {
func Merge(base, overlay Enrichment) Enrichment {
out := base
if out.PLZ == "" {
@@ -56,7 +93,7 @@ func Merge(base, overlay discovery.Enrichment) discovery.Enrichment {
// Merge provenance maps with base-wins semantics.
if out.Sources == nil && len(overlay.Sources) > 0 {
out.Sources = discovery.EnrichmentSources{}
out.Sources = Sources{}
}
for k, v := range overlay.Sources {
if _, exists := out.Sources[k]; !exists {

View File

@@ -1,28 +1,35 @@
package enrich
import (
"context"
import "context"
"marktvogt.de/backend/internal/domain/discovery"
)
// LLMRequest is the narrow context the LLM enricher needs: identifying
// fields, the quellen URLs for web-search grounding, and the crawl-enrich
// payload already computed so the LLM can be instructed not to overwrite.
type LLMRequest struct {
MarktName string
Stadt string
Land string
Bundesland string
Quellen []string
// Partial is the output of crawl-enrich for this row. The LLM
// implementation MUST NOT overwrite any field set here — Merge enforces
// the invariant regardless, but the prompt should respect it to save
// tokens.
Partial Enrichment
}
// LLMEnricher fills enrichment fields that crawl-enrich could not reach.
// Implementations should only write fields missing from `partial` — never
// overwrite a field the crawler already filled. Merge() enforces this too,
// but respecting it in the implementation keeps token usage down.
//
// Returned payload must populate Sources with "llm" for every field it
// writes; Model and token counts are required so the eval harness can
// attribute cost.
type LLMEnricher interface {
EnrichMissing(ctx context.Context, partial discovery.Enrichment, row discovery.DiscoveredMarket) (discovery.Enrichment, error)
EnrichMissing(ctx context.Context, req LLMRequest) (Enrichment, error)
}
// NoopLLMEnricher returns the partial unchanged. Used by tests and as a
// default when AI integration is disabled. Actual Mistral-backed
// implementation lands in MR 3.
// default when AI integration is disabled.
type NoopLLMEnricher struct{}
func (NoopLLMEnricher) EnrichMissing(_ context.Context, partial discovery.Enrichment, _ discovery.DiscoveredMarket) (discovery.Enrichment, error) {
return partial, nil
func (NoopLLMEnricher) EnrichMissing(_ context.Context, req LLMRequest) (Enrichment, error) {
return req.Partial, nil
}

View File

@@ -32,6 +32,18 @@ type Handler struct {
lastFinishedAt time.Time
lastSummary *CrawlSummary
lastError string
// Async crawl-enrich-all state (manual bulk operator action).
// Reuses the CAS-gate + mutex pattern of the crawl flow above so the
// admin UI can poll a dedicated status endpoint without colliding with a
// running crawl.
enrichRunning atomic.Bool
enrichMu sync.RWMutex
// Guarded by enrichMu:
enrichStartedAt time.Time
enrichFinishedAt time.Time
enrichSummary *CrawlEnrichSummary
enrichError string
}
// NewHandler constructs a Handler. manualRateLimitPerHour controls how
@@ -285,6 +297,72 @@ func (h *Handler) Similar(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{"data": matches})
}
// RunCrawlEnrichAll kicks off the manual bulk crawl-enrich pass in the
// background and returns 202 immediately. Exactly one run may be in progress
// at a time (CAS gate). Completion is observable via
// GET /admin/discovery/enrichment/crawl-all-status.
func (h *Handler) RunCrawlEnrichAll(c *gin.Context) {
if !h.enrichRunning.CompareAndSwap(false, true) {
apiErr := apierror.TooManyRequests("A crawl-enrich run is already in progress")
c.JSON(apiErr.Status, apierror.NewResponse(apiErr))
return
}
h.enrichMu.Lock()
h.enrichStartedAt = time.Now().UTC()
h.enrichFinishedAt = time.Time{}
h.enrichSummary = nil
h.enrichError = ""
h.enrichMu.Unlock()
go h.runEnrichAsync()
c.JSON(http.StatusAccepted, gin.H{
"data": gin.H{
"status": "started",
"message": "Crawl-enrich started in background. Poll /api/v1/admin/discovery/enrichment/crawl-all-status for completion.",
},
})
}
// runEnrichAsync runs RunCrawlEnrichAll with a detached context. 10m cap is
// generous for Nominatim's 1rps: a 600-row queue is the worst case we expect.
func (h *Handler) runEnrichAsync() {
defer h.enrichRunning.Store(false)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancel()
summary, err := h.service.RunCrawlEnrichAll(ctx)
h.enrichMu.Lock()
h.enrichFinishedAt = time.Now().UTC()
if err != nil {
h.enrichError = err.Error()
slog.ErrorContext(ctx, "async crawl-enrich failed", "error", err)
} else {
sCopy := summary
h.enrichSummary = &sCopy
}
h.enrichMu.Unlock()
}
// CrawlEnrichStatus returns the state of the most recent async crawl-enrich.
// Shape mirrors CrawlStatus for UI reuse.
func (h *Handler) CrawlEnrichStatus(c *gin.Context) {
h.enrichMu.RLock()
defer h.enrichMu.RUnlock()
c.JSON(http.StatusOK, gin.H{
"data": gin.H{
"running": h.enrichRunning.Load(),
"started_at": h.enrichStartedAt,
"finished_at": h.enrichFinishedAt,
"summary": h.enrichSummary,
"error": h.enrichError,
},
})
}
func currentUserID(c *gin.Context) (uuid.UUID, bool) {
raw, exists := c.Get("user_id")
if !exists {

View File

@@ -55,6 +55,7 @@ func TestCrawlHandlerReturns202AndStartsCrawl(t *testing.T) {
&stubCrawlerRunner{result: crawler.CrawlResult{}},
noopLinkVerifier{},
noopMarketCreator{},
nil,
)
h := NewHandler(svc, 0) // rate limit disabled
@@ -89,7 +90,7 @@ func TestCrawlHandlerReturns202AndStartsCrawl(t *testing.T) {
// TestCrawlStatusInitialState verifies the zero state of a freshly constructed Handler.
func TestCrawlStatusInitialState(t *testing.T) {
svc := NewService(newMockRepo(), &stubCrawlerRunner{}, noopLinkVerifier{}, noopMarketCreator{})
svc := NewService(newMockRepo(), &stubCrawlerRunner{}, noopLinkVerifier{}, noopMarketCreator{}, nil)
h := NewHandler(svc, 0)
w := httptest.NewRecorder()
@@ -139,7 +140,7 @@ func TestCrawlHandlerConcurrentReturnsTooManyRequests(t *testing.T) {
started: make(chan struct{}),
release: make(chan struct{}),
}
svc := NewService(newMockRepo(), bc, noopLinkVerifier{}, noopMarketCreator{})
svc := NewService(newMockRepo(), bc, noopLinkVerifier{}, noopMarketCreator{}, nil)
h := NewHandler(svc, 0) // rate limit disabled
// First request — returns 202 and spawns goroutine.
@@ -180,6 +181,7 @@ func TestCrawlHandlerRateLimit(t *testing.T) {
&stubCrawlerRunner{result: crawler.CrawlResult{}},
noopLinkVerifier{},
noopMarketCreator{},
nil,
)
// 1 per hour window.
h := NewHandler(svc, 1)

View File

@@ -6,6 +6,8 @@ import (
"github.com/google/uuid"
"github.com/jackc/pgx/v5"
"marktvogt.de/backend/internal/domain/discovery/enrich"
)
// Compile-time guard: mockRepo must fully satisfy Repository.
@@ -28,11 +30,12 @@ type mockRepo struct {
// Populated by newMockRepo(); nil when insertDiscFn is set externally.
inserted []DiscoveredMarket
// Enrichment hooks (optional). Tests set whichever they need; nil falls
// enrich.Enrichment hooks (optional). Tests set whichever they need; nil falls
// through to a no-op / empty response.
setEnrichmentFn func(id uuid.UUID, payload Enrichment, status string) error
getCacheFn func(key string) (Enrichment, bool, error)
setCacheFn func(key string, payload Enrichment, ttl time.Duration) error
setEnrichmentFn func(id uuid.UUID, payload enrich.Enrichment, status string) error
getCacheFn func(key string) (enrich.Enrichment, bool, error)
setCacheFn func(key string, payload enrich.Enrichment, ttl time.Duration) error
listPendingEnrichFn func(limit int) ([]DiscoveredMarket, error)
}
func (m *mockRepo) ListSeriesByCity(ctx context.Context, c string) ([]SeriesCandidate, error) {
@@ -95,24 +98,30 @@ func (m *mockRepo) Stats(ctx context.Context, forwardMonths, recentErrorsLimit i
func (m *mockRepo) UpdatePending(ctx context.Context, id uuid.UUID, f UpdatePendingFields, nn *string) error {
return nil
}
func (m *mockRepo) SetEnrichment(_ context.Context, id uuid.UUID, payload Enrichment, status string) error {
func (m *mockRepo) SetEnrichment(_ context.Context, id uuid.UUID, payload enrich.Enrichment, status string) error {
if m.setEnrichmentFn != nil {
return m.setEnrichmentFn(id, payload, status)
}
return nil
}
func (m *mockRepo) GetEnrichmentCache(_ context.Context, key string) (Enrichment, bool, error) {
func (m *mockRepo) GetEnrichmentCache(_ context.Context, key string) (enrich.Enrichment, bool, error) {
if m.getCacheFn != nil {
return m.getCacheFn(key)
}
return Enrichment{}, false, nil
return enrich.Enrichment{}, false, nil
}
func (m *mockRepo) SetEnrichmentCache(_ context.Context, key string, payload Enrichment, ttl time.Duration) error {
func (m *mockRepo) SetEnrichmentCache(_ context.Context, key string, payload enrich.Enrichment, ttl time.Duration) error {
if m.setCacheFn != nil {
return m.setCacheFn(key, payload, ttl)
}
return nil
}
func (m *mockRepo) ListPendingEnrichment(_ context.Context, limit int) ([]DiscoveredMarket, error) {
if m.listPendingEnrichFn != nil {
return m.listPendingEnrichFn(limit)
}
return nil, nil
}
// noopLinkVerifier passes every URL — used by tests to isolate from network.
type noopLinkVerifier struct{}

View File

@@ -5,6 +5,8 @@ import (
"time"
"github.com/google/uuid"
"marktvogt.de/backend/internal/domain/discovery/enrich"
)
// SourceContribution captures what one source reported about a market.
@@ -68,41 +70,17 @@ type DiscoveredMarket struct {
ReviewedBy *uuid.UUID `json:"reviewed_by"`
CreatedEditionID *uuid.UUID `json:"created_edition_id"`
// Enrichment payload + worker lifecycle. See Enrichment for the contract.
Enrichment Enrichment `json:"enrichment"`
EnrichmentStatus string `json:"enrichment_status"` // pending|done|failed|skipped
EnrichmentAttempts int `json:"enrichment_attempts"`
EnrichedAt *time.Time `json:"enriched_at"`
// Enrichment payload + worker lifecycle. See enrich.Enrichment for the contract.
Enrichment enrich.Enrichment `json:"enrichment"`
EnrichmentStatus string `json:"enrichment_status"` // pending|done|failed|skipped
EnrichmentAttempts int `json:"enrichment_attempts"`
EnrichedAt *time.Time `json:"enriched_at"`
}
// Enrichment is the JSONB payload for discovered_markets.enrichment.
// Every field is optional; `Sources` records which pass filled each field so
// the eval harness can separate crawl-enrich (free, deterministic) from
// llm-enrich (paid, synthesised) contributions.
//
// Field shape is intentionally flat — promotion to typed columns is easier
// from a flat blob than from nested structures.
type Enrichment struct {
PLZ string `json:"plz,omitempty"`
Venue string `json:"venue,omitempty"`
Organizer string `json:"organizer,omitempty"`
Lat *float64 `json:"lat,omitempty"`
Lng *float64 `json:"lng,omitempty"`
Category string `json:"category,omitempty"`
OpeningHours string `json:"opening_hours,omitempty"`
Description string `json:"description,omitempty"`
Sources EnrichmentSources `json:"sources,omitempty"`
EnrichedAt *time.Time `json:"enriched_at,omitempty"`
Model string `json:"model,omitempty"`
InputTokens int `json:"input_tokens,omitempty"`
OutputTokens int `json:"output_tokens,omitempty"`
}
// EnrichmentSources maps each enrichment field name to the pass that filled it.
// Values are EnrichmentProvenance constants.
type EnrichmentSources map[string]string
// EnrichmentStatus constants for discovered_markets.enrichment_status.
// The payload type + provenance constants live in the enrich package; this
// status string is a DB column value used by repository queries, kept here
// alongside the other DB-lifecycle constants.
const (
EnrichmentStatusPending = "pending"
EnrichmentStatusDone = "done"
@@ -110,14 +88,6 @@ const (
EnrichmentStatusSkipped = "skipped"
)
// EnrichmentProvenance values for EnrichmentSources entries. The crawl pass
// covers both offline consolidation of source_contributions and Nominatim
// geocoding — both deterministic, neither counts against the LLM budget.
const (
EnrichmentProvenanceCrawl = "crawl"
EnrichmentProvenanceLLM = "llm"
)
// RejectedDiscovery stores a sticky rejection scoped to (normalized_name, city, year).
type RejectedDiscovery struct {
ID uuid.UUID `json:"id"`

View File

@@ -11,6 +11,8 @@ import (
"github.com/google/uuid"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"marktvogt.de/backend/internal/domain/discovery/enrich"
)
type Repository interface {
@@ -31,10 +33,13 @@ type Repository interface {
// Enrichment persistence. SetEnrichment writes payload + status + bumps
// attempts + stamps enriched_at in one UPDATE so the worker stays idempotent.
SetEnrichment(ctx context.Context, id uuid.UUID, payload Enrichment, status string) error
SetEnrichment(ctx context.Context, id uuid.UUID, payload enrich.Enrichment, status string) error
// ListPendingEnrichment returns queue rows awaiting their first enrichment
// pass. Ordered by discovered_at ASC so older rows get processed first.
ListPendingEnrichment(ctx context.Context, limit int) ([]DiscoveredMarket, error)
// Cache operations keyed on sha256(name_normalized|stadt|year).
GetEnrichmentCache(ctx context.Context, key string) (Enrichment, bool, error)
SetEnrichmentCache(ctx context.Context, key string, payload Enrichment, ttl time.Duration) error
GetEnrichmentCache(ctx context.Context, key string) (enrich.Enrichment, bool, error)
SetEnrichmentCache(ctx context.Context, key string, payload enrich.Enrichment, ttl time.Duration) error
}
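The sha256(name_normalized|stadt|year) key the cache operations reference can be sketched as below. Only the primitive-argument signature comes from this MR; the `'|'` joiner and the hex encoding are assumptions made for illustration.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"strings"
)

// CacheKey sketch: hash the three primitives that identify a discovered
// market across crawls. Joiner and encoding are assumed, not taken from
// the real enrich.CacheKey.
func CacheKey(nameNormalized, stadt, year string) string {
	sum := sha256.Sum256([]byte(strings.Join([]string{nameNormalized, stadt, year}, "|")))
	return hex.EncodeToString(sum[:])
}

func main() {
	k := CacheKey("striezelmarkt", "dresden", "2025")
	fmt.Println(len(k)) // sha256 hex digest: 64 chars
}
```

Keying on primitives (rather than a whole DiscoveredMarket) is what lets the enrich package compute keys without importing discovery.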
// SeriesCandidate is a minimal projection used for name-normalization comparison in Go.
@@ -160,6 +165,37 @@ func (r *pgRepository) CountQueue(ctx context.Context, status string) (int, erro
return n, err
}
// ListPendingEnrichment selects queue rows still awaiting enrichment. The
// status='pending' filter excludes accepted/rejected rows — enrichment only
// makes sense for review candidates. Ordered by discovered_at ASC so an
// operator running the bulk action sees the oldest queue entries improve first.
func (r *pgRepository) ListPendingEnrichment(ctx context.Context, limit int) ([]DiscoveredMarket, error) {
rows, err := r.pool.Query(ctx, `
SELECT id, bucket_id, markt_name, stadt, coalesce(bundesland,''), land,
start_datum, end_datum, coalesce(website,''), quellen, coalesce(konfidenz,''),
coalesce(agent_status,''), coalesce(hinweis,''), name_normalized, matched_series_id, status,
discovered_at, reviewed_at, reviewed_by, created_edition_id,
sources, source_contributions,
enrichment, enrichment_status, enrichment_attempts, enriched_at
FROM discovered_markets
WHERE status = 'pending' AND enrichment_status = $1
ORDER BY discovered_at ASC
LIMIT $2`, EnrichmentStatusPending, limit)
if err != nil {
return nil, err
}
defer rows.Close()
out := make([]DiscoveredMarket, 0)
for rows.Next() {
d, err := scanDiscoveredMarket(rows)
if err != nil {
return nil, err
}
out = append(out, d)
}
return out, rows.Err()
}
func (r *pgRepository) GetDiscovered(ctx context.Context, id uuid.UUID) (DiscoveredMarket, error) {
row := r.pool.QueryRow(ctx, `
SELECT id, bucket_id, markt_name, stadt, coalesce(bundesland,''), land,
@@ -315,7 +351,7 @@ LIMIT $1`, recentErrorsLimit)
// if the caller's read-modify-write is racy. enriched_at is only stamped for
// terminal states ('done' / 'failed'), not 'skipped', so a NULL enriched_at
// still distinguishes "an enrichment pass actually ran against this row"
// from "this row was deliberately passed over".
func (r *pgRepository) SetEnrichment(ctx context.Context, id uuid.UUID, payload Enrichment, status string) error {
func (r *pgRepository) SetEnrichment(ctx context.Context, id uuid.UUID, payload enrich.Enrichment, status string) error {
payloadJSON, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("marshal enrichment: %w", err)
@@ -347,28 +383,28 @@ WHERE id = $1`, enrichedExpr), id, payloadJSON, status)
// expired, (_, false, nil) on a miss or expired entry. Expired entries are
// treated as misses so the caller refetches rather than relying on a
// background pruner.
func (r *pgRepository) GetEnrichmentCache(ctx context.Context, key string) (Enrichment, bool, error) {
func (r *pgRepository) GetEnrichmentCache(ctx context.Context, key string) (enrich.Enrichment, bool, error) {
var payloadJSON []byte
err := r.pool.QueryRow(ctx, `
SELECT payload FROM enrichment_cache
WHERE cache_key = $1
AND (expires_at IS NULL OR expires_at > now())`, key).Scan(&payloadJSON)
if errors.Is(err, pgx.ErrNoRows) {
return Enrichment{}, false, nil
return enrich.Enrichment{}, false, nil
}
if err != nil {
return Enrichment{}, false, fmt.Errorf("cache get: %w", err)
return enrich.Enrichment{}, false, fmt.Errorf("cache get: %w", err)
}
var e Enrichment
var e enrich.Enrichment
if err := json.Unmarshal(payloadJSON, &e); err != nil {
return Enrichment{}, false, fmt.Errorf("unmarshal cache payload: %w", err)
return enrich.Enrichment{}, false, fmt.Errorf("unmarshal cache payload: %w", err)
}
return e, true, nil
}
// SetEnrichmentCache writes or replaces a cache entry. ttl==0 means "no
// expiry" (expires_at stays NULL). Callers typically use 30 days.
func (r *pgRepository) SetEnrichmentCache(ctx context.Context, key string, payload Enrichment, ttl time.Duration) error {
func (r *pgRepository) SetEnrichmentCache(ctx context.Context, key string, payload enrich.Enrichment, ttl time.Duration) error {
payloadJSON, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("marshal cache payload: %w", err)

View File

@@ -28,5 +28,11 @@ func RegisterRoutes(
admin.POST("/crawl-manual", h.Crawl)
// Async crawl status polling.
admin.GET("/crawl-status", h.CrawlStatus)
// Manual bulk crawl-enrich — deterministic consolidation + Nominatim
// geocoding applied to every enrichment_status='pending' row. Async
// (202 + polling) because the Nominatim 1rps rate means a full queue
// can take minutes.
admin.POST("/enrichment/crawl-all", h.RunCrawlEnrichAll)
admin.GET("/enrichment/crawl-all-status", h.CrawlEnrichStatus)
}
}
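The 202-then-goroutine pattern behind these endpoints relies on a per-process CAS gate so two concurrent POSTs can't start overlapping bulk runs. A minimal sketch with `sync/atomic` — the gate and function names here are hypothetical, not the handler's actual fields:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// crawlEnrichRunning is the per-process gate: exactly one bulk run at a time.
var crawlEnrichRunning atomic.Bool

// tryStart reports whether the caller won the gate and may launch the run.
func tryStart() bool { return crawlEnrichRunning.CompareAndSwap(false, true) }

// finish reopens the gate once the background run completes.
func finish() { crawlEnrichRunning.Store(false) }

func main() {
	fmt.Println(tryStart()) // first caller wins -> true
	fmt.Println(tryStart()) // concurrent second caller rejected -> false
	finish()
	fmt.Println(tryStart()) // gate reopens after the run -> true
}
```

The losing caller would typically get a 429, matching the UI's "läuft bereits" error path.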

View File

@@ -12,9 +12,21 @@ import (
"github.com/google/uuid"
"marktvogt.de/backend/internal/domain/discovery/crawler"
"marktvogt.de/backend/internal/domain/discovery/enrich"
"marktvogt.de/backend/internal/domain/market"
)
// Geocoder is the narrow interface the enrichment flow depends on.
// *pkg/geocode.Geocoder satisfies it. Declared here instead of reusing
// enrich.Geocoder so discovery's constructor signature doesn't couple the
// package's public API to the enrich sub-package — avoiding another
// circular edge with enrich.
//
// Structurally identical to enrich.Geocoder, so a concrete *geocode.Geocoder
// satisfies both and can be passed across the package boundary.
type Geocoder interface {
Geocode(ctx context.Context, street, city, zip, country string) (*float64, *float64, error)
}
type marketCreator interface {
Create(ctx context.Context, req market.CreateMarketRequest) (market.Market, error)
CreateEditionForSeries(ctx context.Context, seriesID uuid.UUID, req market.CreateEditionRequest) (market.Market, error)
@@ -39,15 +51,21 @@ type Service struct {
crawler crawlerRunner
marketCreator marketCreator
linkChecker linkVerifier
// geocoder is optional — nil means crawl-enrich skips lat/lng. Wired from
// server/routes.go using the shared Nominatim client (1 rps limited).
geocoder Geocoder
}
// NewService constructs a Service wired for the crawler-driven Crawl path.
func NewService(repo Repository, cr crawlerRunner, lc linkVerifier, mc marketCreator) *Service {
// geocoder may be nil; in that case crawl-enrich runs consolidation only and
// skips the lat/lng step.
func NewService(repo Repository, cr crawlerRunner, lc linkVerifier, mc marketCreator, gc Geocoder) *Service {
return &Service{
repo: repo,
crawler: cr,
marketCreator: mc,
linkChecker: lc,
geocoder: gc,
}
}
@@ -446,6 +464,21 @@ func (s *Service) FindSimilarToQueueEntry(ctx context.Context, id uuid.UUID) ([]
return FindSimilar(target, candidates, 0.5), nil
}
// contributionsForEnrich maps the rich discovery-domain SourceContribution
// down to the narrow enrich.Contribution shape — keeps the enrich package
// free of a reverse import on discovery.
func contributionsForEnrich(cs []SourceContribution) []enrich.Contribution {
out := make([]enrich.Contribution, 0, len(cs))
for _, c := range cs {
out = append(out, enrich.Contribution{
PLZ: c.PLZ,
Venue: c.Venue,
Organizer: c.Organizer,
})
}
return out
}
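The field-picking rule CrawlEnrich then applies to these contributions is first-non-empty-wins in contribution order (the behaviour the happy-path test asserts). A minimal sketch of just that rule — it deliberately omits the provenance stamping and geocoding the real pass also does:

```go
package main

import "fmt"

// Contribution mirrors the narrow enrich.Contribution shape.
type Contribution struct{ PLZ, Venue, Organizer string }

// consolidate fills each field from the first contribution that supplies a
// non-empty value; later contributions never overwrite an earlier one.
func consolidate(cs []Contribution) (plz, venue, organizer string) {
	for _, c := range cs {
		if plz == "" {
			plz = c.PLZ
		}
		if venue == "" {
			venue = c.Venue
		}
		if organizer == "" {
			organizer = c.Organizer
		}
	}
	return
}

func main() {
	plz, _, org := consolidate([]Contribution{
		{Organizer: "Verein e.V."},
		{PLZ: "04109"},
	})
	fmt.Println(plz, org)
}
```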
// toContributions translates the crawler's per-source RawEvents into the
// discovery-domain SourceContribution shape for JSONB persistence.
func toContributions(raws []crawler.RawEvent) []SourceContribution {
@@ -485,3 +518,88 @@ func findSeriesMatch(incomingName string, candidates []SeriesCandidate) *uuid.UU
}
return nil
}
// CrawlEnrichSummary reports the outcome of one RunCrawlEnrichAll pass.
// Mirrors CrawlSummary's shape so the admin UI can reuse its render path.
type CrawlEnrichSummary struct {
StartedAt time.Time `json:"started_at"`
DurationMs int64 `json:"duration_ms"`
Total int `json:"total"` // rows loaded for enrichment
Succeeded int `json:"succeeded"` // enrichment_status -> done
Failed int `json:"failed"` // SetEnrichment returned error
// Bounded list of (id, error) pairs for operator debugging. Empty on
// happy paths; capped so a catastrophic run doesn't balloon the summary.
Errors []CrawlEnrichError `json:"errors"`
}
// CrawlEnrichError records a single row that failed to persist enrichment.
type CrawlEnrichError struct {
QueueID uuid.UUID `json:"queue_id"`
Error string `json:"error"`
}
// runCrawlEnrichAllBatchSize caps how many rows we pull per bulk pass. Bigger
// than the usual queue depth keeps us single-pass; the Nominatim 1rps limit
// is the real throughput constraint, not batch size.
const runCrawlEnrichAllBatchSize = 2000
// runCrawlEnrichErrorCap bounds the Errors slice so a total outage doesn't
// produce a summary the admin UI can't render. Operators can check logs for
// the full set; this is just an at-a-glance list.
const runCrawlEnrichErrorCap = 50
// RunCrawlEnrichAll applies crawl-enrich (source consolidation + Nominatim
// geocoding) to every queue row with enrichment_status='pending'. Persists
// the resulting payload with status='done' on success, 'failed' on error.
//
// Already-enriched rows are skipped by the repo query — this is idempotent
// only in the sense that re-running picks up any pending rows that appeared
// since the last run. Forcing a re-enrich of already-done rows is a future
// feature; for now operators must manually reset enrichment_status.
//
// Designed to be called from a goroutine with a detached context — the
// Nominatim rate limit means a 200-row queue takes 3+ minutes and must
// outlive the originating HTTP request.
func (s *Service) RunCrawlEnrichAll(ctx context.Context) (CrawlEnrichSummary, error) {
summary := CrawlEnrichSummary{StartedAt: time.Now().UTC()}
rows, err := s.repo.ListPendingEnrichment(ctx, runCrawlEnrichAllBatchSize)
if err != nil {
return summary, fmt.Errorf("list pending enrichment: %w", err)
}
summary.Total = len(rows)
for _, row := range rows {
if err := ctx.Err(); err != nil {
// Caller cancelled — stop cleanly. Summary reflects partial
// progress; the remaining rows stay in enrichment_status='pending'
// and will be picked up by the next run.
return summary, err
}
in := enrich.Input{
Stadt: row.Stadt,
Land: row.Land,
Contributions: contributionsForEnrich(row.SourceContributions),
}
payload := enrich.CrawlEnrich(ctx, in, s.geocoder)
if err := s.repo.SetEnrichment(ctx, row.ID, payload, EnrichmentStatusDone); err != nil {
slog.WarnContext(ctx, "set enrichment failed",
"queue_id", row.ID, "error", err)
summary.Failed++
if len(summary.Errors) < runCrawlEnrichErrorCap {
summary.Errors = append(summary.Errors, CrawlEnrichError{
QueueID: row.ID,
Error: err.Error(),
})
}
continue
}
summary.Succeeded++
}
summary.DurationMs = time.Since(summary.StartedAt).Milliseconds()
slog.InfoContext(ctx, "crawl-enrich-all completed",
"total", summary.Total,
"succeeded", summary.Succeeded,
"failed", summary.Failed,
"duration_ms", summary.DurationMs)
return summary, nil
}

View File

@@ -2,6 +2,7 @@ package discovery
import (
"context"
"errors"
"testing"
"time"
@@ -9,9 +10,22 @@ import (
"github.com/jackc/pgx/v5"
"marktvogt.de/backend/internal/domain/discovery/crawler"
"marktvogt.de/backend/internal/domain/discovery/enrich"
"marktvogt.de/backend/internal/domain/market"
)
// stubGeocoder is a test-local geocoder that returns canned lat/lng.
type stubGeocoder struct {
lat, lng *float64
err error
}
func (s stubGeocoder) Geocode(_ context.Context, _, _, _, _ string) (*float64, *float64, error) {
return s.lat, s.lng, s.err
}
func ptrFloat(v float64) *float64 { return &v }
// newMockRepo returns a mockRepo with default no-op implementations and an
// inserted field that captures every InsertDiscovered call.
func newMockRepo() *mockRepo {
@@ -131,7 +145,7 @@ func TestAccept_NewSeries_CallsCreate(t *testing.T) {
markAcceptedFn: func(_ context.Context, _ pgx.Tx, _, _, _ uuid.UUID) error { return nil },
}
mc := &stubCreator{}
svc := NewService(m, nil, noopLinkVerifier{}, mc)
svc := NewService(m, nil, noopLinkVerifier{}, mc, nil)
_, _, err := svc.Accept(context.Background(), qID, uuid.New())
if err != nil {
t.Fatalf("accept err: %v", err)
@@ -153,7 +167,7 @@ func TestAccept_ExistingSeries_CallsCreateEditionForSeries(t *testing.T) {
markAcceptedFn: func(_ context.Context, _ pgx.Tx, _, _, _ uuid.UUID) error { return nil },
}
mc := &stubCreator{}
svc := NewService(m, nil, noopLinkVerifier{}, mc)
svc := NewService(m, nil, noopLinkVerifier{}, mc, nil)
_, _, err := svc.Accept(context.Background(), uuid.New(), uuid.New())
if err != nil {
t.Fatalf("accept err: %v", err)
@@ -184,7 +198,7 @@ func TestServiceCrawlHappyPath(t *testing.T) {
PerSourceMS: map[string]int64{"marktkalendarium": 1},
},
}
svc := NewService(repo, sc, lc, noopMarketCreator{})
svc := NewService(repo, sc, lc, noopMarketCreator{}, nil)
summary, err := svc.Crawl(context.Background())
if err != nil {
@@ -225,7 +239,7 @@ func TestServiceCrawlDedupQueue(t *testing.T) {
},
},
}
svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{})
svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{}, nil)
summary, err := svc.Crawl(context.Background())
if err != nil {
@@ -256,7 +270,7 @@ func TestServiceCrawlDefaultsEndDate(t *testing.T) {
},
},
}
svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{})
svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{}, nil)
if _, err := svc.Crawl(context.Background()); err != nil {
t.Fatal(err)
@@ -302,7 +316,7 @@ func TestServiceCrawlDetachesInsertContextFromRequestCtx(t *testing.T) {
},
},
}
svc := NewService(repo, sc, ctxAwareLinkVerifier{}, noopMarketCreator{})
svc := NewService(repo, sc, ctxAwareLinkVerifier{}, noopMarketCreator{}, nil)
// Cancel the context BEFORE Crawl runs — simulates gateway timeout
// that fires while the handler is still mid-run.
@@ -327,7 +341,7 @@ func TestListPendingQueuePaged_ReturnsBothRowsAndTotal(t *testing.T) {
m := &mockRepo{
countQueueFn: func(_ context.Context, _ string) (int, error) { return 42, nil },
}
svc := NewService(m, nil, noopLinkVerifier{}, noopMarketCreator{})
svc := NewService(m, nil, noopLinkVerifier{}, noopMarketCreator{}, nil)
rows, total, err := svc.ListPendingQueuePaged(context.Background(), 50, 0)
if err != nil {
t.Fatalf("unexpected error: %v", err)
@@ -359,7 +373,7 @@ func TestServiceCrawlPersistsSourcesAndContributions(t *testing.T) {
},
},
}
svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{})
svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{}, nil)
if _, err := svc.Crawl(context.Background()); err != nil {
t.Fatal(err)
@@ -408,7 +422,7 @@ func TestServiceCrawlMultiSourceHighKonfidenz(t *testing.T) {
},
},
}
svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{})
svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{}, nil)
summary, err := svc.Crawl(context.Background())
if err != nil {
@@ -424,3 +438,149 @@ func TestServiceCrawlMultiSourceHighKonfidenz(t *testing.T) {
t.Errorf("Konfidenz = %q; want %q (2+ sources)", repo.inserted[0].Konfidenz, KonfidenzHoch)
}
}
// TestRunCrawlEnrichAll_HappyPath exercises the bulk path: three pending rows,
// each with differently-shaped contributions, get consolidated and geocoded
// into the SetEnrichment calls. Nothing queries the LLM — this is the
// crawl-only pass.
func TestRunCrawlEnrichAll_HappyPath(t *testing.T) {
id1, id2, id3 := uuid.New(), uuid.New(), uuid.New()
rows := []DiscoveredMarket{
{
ID: id1, Stadt: "Dresden", Land: "Deutschland",
SourceContributions: []SourceContribution{
{SourceName: "marktkalendarium", PLZ: "01067", Venue: "Stallhof"},
},
},
{
ID: id2, Stadt: "Leipzig", Land: "Deutschland",
SourceContributions: []SourceContribution{
{SourceName: "a", Organizer: "Verein e.V."},
{SourceName: "b", PLZ: "04109"},
},
},
{ID: id3, Stadt: "", Land: "Deutschland"}, // No stadt, no contribs — skips geocode.
}
var writes []struct {
id uuid.UUID
payload enrich.Enrichment
status string
}
repo := &mockRepo{
listPendingEnrichFn: func(limit int) ([]DiscoveredMarket, error) {
if limit <= 0 {
t.Errorf("limit must be positive, got %d", limit)
}
return rows, nil
},
setEnrichmentFn: func(id uuid.UUID, payload enrich.Enrichment, status string) error {
writes = append(writes, struct {
id uuid.UUID
payload enrich.Enrichment
status string
}{id, payload, status})
return nil
},
}
gc := stubGeocoder{lat: ptrFloat(51.05), lng: ptrFloat(13.74)}
svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, gc)
summary, err := svc.RunCrawlEnrichAll(context.Background())
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if summary.Total != 3 {
t.Errorf("Total = %d; want 3", summary.Total)
}
if summary.Succeeded != 3 {
t.Errorf("Succeeded = %d; want 3", summary.Succeeded)
}
if summary.Failed != 0 {
t.Errorf("Failed = %d; want 0", summary.Failed)
}
if len(writes) != 3 {
t.Fatalf("expected 3 SetEnrichment calls, got %d", len(writes))
}
// Row 1: PLZ consolidated, geocode returned.
if writes[0].payload.PLZ != "01067" || writes[0].payload.Venue != "Stallhof" {
t.Errorf("row1 payload: %+v", writes[0].payload)
}
if writes[0].payload.Lat == nil || *writes[0].payload.Lat != 51.05 {
t.Errorf("row1 lat: %v", writes[0].payload.Lat)
}
if writes[0].payload.Sources["plz"] != enrich.ProvenanceCrawl {
t.Errorf("row1 plz provenance: %q", writes[0].payload.Sources["plz"])
}
// Row 2: first non-empty wins across contributions.
if writes[1].payload.PLZ != "04109" || writes[1].payload.Organizer != "Verein e.V." {
t.Errorf("row2 payload: %+v", writes[1].payload)
}
// Row 3: no stadt means no geocode attempt — payload has empty sources.
if writes[2].payload.Lat != nil {
t.Errorf("row3 should have no lat (no stadt), got %v", writes[2].payload.Lat)
}
// All writes mark status=done.
for i, w := range writes {
if w.status != EnrichmentStatusDone {
t.Errorf("write %d: status = %q, want %q", i, w.status, EnrichmentStatusDone)
}
}
}
// TestRunCrawlEnrichAll_SetEnrichmentFailure verifies a row with a persist
// error is counted in Failed but does not halt the pass.
func TestRunCrawlEnrichAll_SetEnrichmentFailure(t *testing.T) {
idOK, idBad := uuid.New(), uuid.New()
rows := []DiscoveredMarket{
{ID: idOK, Stadt: "Dresden", SourceContributions: []SourceContribution{{PLZ: "01067"}}},
{ID: idBad, Stadt: "Leipzig", SourceContributions: []SourceContribution{{PLZ: "04109"}}},
}
repo := &mockRepo{
listPendingEnrichFn: func(int) ([]DiscoveredMarket, error) { return rows, nil },
setEnrichmentFn: func(id uuid.UUID, _ enrich.Enrichment, _ string) error {
if id == idBad {
return errors.New("db down")
}
return nil
},
}
svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, nil)
summary, err := svc.RunCrawlEnrichAll(context.Background())
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if summary.Succeeded != 1 || summary.Failed != 1 {
t.Errorf("Succeeded=%d Failed=%d; want 1/1", summary.Succeeded, summary.Failed)
}
if len(summary.Errors) != 1 || summary.Errors[0].QueueID != idBad {
t.Errorf("expected single error pinned to idBad, got %+v", summary.Errors)
}
}
// TestRunCrawlEnrichAll_EmptyQueueNoOp: nothing pending, zero summary, no writes.
func TestRunCrawlEnrichAll_EmptyQueueNoOp(t *testing.T) {
var writes int
repo := &mockRepo{
listPendingEnrichFn: func(int) ([]DiscoveredMarket, error) {
return []DiscoveredMarket{}, nil
},
setEnrichmentFn: func(uuid.UUID, enrich.Enrichment, string) error {
writes++
return nil
},
}
svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, nil)
summary, err := svc.RunCrawlEnrichAll(context.Background())
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if summary.Total != 0 || summary.Succeeded != 0 || summary.Failed != 0 {
t.Errorf("empty-queue summary should be all zeros, got %+v", summary)
}
if writes != 0 {
t.Errorf("SetEnrichment called %d times on empty queue", writes)
}
}

View File

@@ -74,7 +74,7 @@ func (s *Server) registerRoutes() {
// Discovery routes
discoveryRepo := discovery.NewRepository(s.db)
crawlerInstance := crawler.NewCrawler(s.cfg.Discovery.CrawlerUserAgent, crawler.DefaultSourceConfigs())
discoveryService := discovery.NewService(discoveryRepo, crawlerInstance, discovery.NewLinkChecker(), marketSvc)
discoveryService := discovery.NewService(discoveryRepo, crawlerInstance, discovery.NewLinkChecker(), marketSvc, geocoder)
discoveryHandler := discovery.NewHandler(discoveryService, s.cfg.Discovery.CrawlerManualRateLimitPerHour)
requireTickToken := middleware.RequireBearerToken(s.cfg.Discovery.Token)
discovery.RegisterRoutes(v1, discoveryHandler, requireAuth, requireAdmin, requireTickToken)

View File

@@ -19,6 +19,22 @@ type SourceContribution = {
organizer?: string;
};
type Enrichment = {
plz?: string;
venue?: string;
organizer?: string;
lat?: number;
lng?: number;
category?: string;
opening_hours?: string;
description?: string;
sources?: Record<string, 'crawl' | 'llm'>;
enriched_at?: string;
model?: string;
input_tokens?: number;
output_tokens?: number;
};
type DiscoveredMarket = {
id: string;
markt_name: string;
@@ -36,6 +52,10 @@ type DiscoveredMarket = {
hinweis: string;
matched_series_id: string | null;
discovered_at: string;
enrichment: Enrichment;
enrichment_status: 'pending' | 'done' | 'failed' | 'skipped';
enrichment_attempts: number;
enriched_at: string | null;
};
type BucketError = {
@@ -161,6 +181,24 @@ export const actions: Actions = {
}
},
enrichCrawl: async ({ cookies, fetch }) => {
try {
await serverFetch<unknown>(`/admin/discovery/enrichment/crawl-all`, cookies, {
method: 'POST',
fetch
});
return { enrichStarted: true };
} catch (err) {
if (err instanceof ApiClientError && err.status === 429) {
return fail(429, {
enrichError: `Ein Crawl-Enrich-Lauf läuft bereits. (${err.message})`
});
}
const message = err instanceof Error ? err.message : 'Crawl-Enrich fehlgeschlagen.';
return fail(500, { enrichError: message });
}
},
update: async ({ request, cookies, fetch }) => {
const form = await request.formData();
const id = String(form.get('id') ?? '');

View File

@@ -30,10 +30,31 @@
error: string;
};
type EnrichSummary = {
started_at: string;
duration_ms: number;
total: number;
succeeded: number;
failed: number;
errors: Array<{ queue_id: string; error: string }> | null;
};
type EnrichStatus = {
running: boolean;
started_at: string;
finished_at: string;
summary: EnrichSummary | null;
error: string;
};
let crawling = $state(false);
let crawlStatus = $state<CrawlStatus | null>(null);
let pollInterval: ReturnType<typeof setInterval> | null = null;
let enriching = $state(false);
let enrichStatus = $state<EnrichStatus | null>(null);
let enrichPollInterval: ReturnType<typeof setInterval> | null = null;
// Coalesce nullable list fields (Go encodes nil slices as null).
const queue = $derived(data.queue ?? []);
const recentErrors = $derived(data.stats.recent_errors ?? []);
@@ -182,25 +203,69 @@
}, 3000);
}
// On mount: fetch current status so a page refresh picks up any in-progress
// or recently completed crawl.
async function fetchEnrichStatus(): Promise<EnrichStatus | null> {
try {
const res = await fetch('/admin/discovery/enrichment/crawl-all-status');
if (!res.ok) return null;
return (await res.json()) as EnrichStatus;
} catch {
return null;
}
}
function stopEnrichPolling() {
if (enrichPollInterval !== null) {
clearInterval(enrichPollInterval);
enrichPollInterval = null;
}
}
function startEnrichPolling() {
if (enrichPollInterval !== null) return; // already polling
enrichPollInterval = setInterval(async () => {
const status = await fetchEnrichStatus();
if (status === null) return;
enrichStatus = status;
if (!status.running) {
stopEnrichPolling();
enriching = false;
}
}, 3000);
}
// On mount: fetch both statuses so a page refresh picks up any in-progress
// or recently completed crawl / enrichment run.
onMount(async () => {
const status = await fetchCrawlStatus();
if (status === null) return;
crawlStatus = status;
if (status.running) {
crawling = true;
startPolling();
const [cs, es] = await Promise.all([fetchCrawlStatus(), fetchEnrichStatus()]);
if (cs !== null) {
crawlStatus = cs;
if (cs.running) {
crawling = true;
startPolling();
}
}
if (es !== null) {
enrichStatus = es;
if (es.running) {
enriching = true;
startEnrichPolling();
}
}
});
// When the crawl action returns crawlStarted, begin polling.
// When an action returns a *Started flag, begin the corresponding poller.
$effect(() => {
if (form?.crawlStarted) {
crawling = true;
startPolling();
}
});
$effect(() => {
if (form?.enrichStarted) {
enriching = true;
startEnrichPolling();
}
});
// Elapsed time display while running.
const elapsedLabel = $derived.by(() => {
@@ -215,6 +280,18 @@
const displayError = $derived(
crawlStatus && !crawlStatus.running && crawlStatus.error ? crawlStatus.error : null
);
// Enrichment-side display derived values — same shape as crawl for UI reuse.
const enrichElapsedLabel = $derived.by(() => {
if (!enrichStatus?.running || !enrichStatus.started_at) return '';
const ms = Date.now() - new Date(enrichStatus.started_at).getTime();
const s = Math.round(ms / 1000);
return s < 60 ? `${s}s` : `${Math.floor(s / 60)}m ${s % 60}s`;
});
const enrichDisplaySummary = $derived(enrichStatus?.summary ?? null);
const enrichDisplayError = $derived(
enrichStatus && !enrichStatus.running && enrichStatus.error ? enrichStatus.error : null
);
</script>
<svelte:head>
@@ -281,7 +358,7 @@
</details>
{/if}
<div class="mt-6 flex items-center gap-3">
<div class="mt-6 flex flex-wrap items-center gap-3">
<form
method="POST"
action="?/crawl"
@@ -304,6 +381,29 @@
{/if}
</button>
</form>
<form
method="POST"
action="?/enrichCrawl"
use:enhance={() => {
enriching = true;
return async ({ update }) => {
await update({ reset: false });
};
}}
>
<button
type="submit"
disabled={enriching}
class="rounded bg-teal-600 px-3 py-1.5 text-sm text-white hover:bg-teal-700 disabled:cursor-not-allowed disabled:opacity-60"
title="Konsolidiert Quellendaten + Geocoding für alle pending-Einträge. Kein LLM."
>
{#if enriching}
Enriching…{enrichElapsedLabel ? ` (${enrichElapsedLabel})` : ''}
{:else}
Run crawl-enrich
{/if}
</button>
</form>
</div>
{#if form?.crawlError}
@@ -399,6 +499,81 @@
</div>
{/if}
{#if form?.enrichError}
<div
class="mt-4 rounded border border-red-300 bg-red-50 px-3 py-2 text-sm text-red-700 dark:border-red-700 dark:bg-red-950 dark:text-red-300"
>
{form.enrichError}
</div>
{/if}
{#if enrichDisplayError}
<div
class="mt-4 rounded border border-red-300 bg-red-50 px-3 py-2 text-sm text-red-700 dark:border-red-700 dark:bg-red-950 dark:text-red-300"
>
Crawl-enrich fehlgeschlagen: {enrichDisplayError}
</div>
{/if}
{#if enrichDisplaySummary}
{@const es = enrichDisplaySummary}
<div
class="mt-4 rounded-lg border border-stone-200 bg-stone-50 p-4 text-sm dark:border-stone-700 dark:bg-stone-900"
>
<div class="flex items-baseline justify-between">
<h2 class="font-semibold">Crawl-Enrich-Ergebnis</h2>
<span class="font-mono text-xs text-stone-500"
>{(es.duration_ms / 1000).toFixed(1)} s · {es.started_at
.slice(0, 16)
.replace('T', ' ')}</span
>
</div>
<div class="mt-3 grid grid-cols-3 gap-2">
<div
class="rounded border border-stone-200 bg-white px-2 py-1.5 dark:border-stone-700 dark:bg-stone-800"
>
<div class="text-[10px] text-stone-500 dark:text-stone-400">Total</div>
<div class="font-mono text-sm font-medium">{es.total}</div>
</div>
<div
class="rounded border border-stone-200 bg-white px-2 py-1.5 dark:border-stone-700 dark:bg-stone-800"
>
<div class="text-[10px] text-stone-500 dark:text-stone-400">Succeeded</div>
<div class="font-mono text-sm font-medium text-emerald-700 dark:text-emerald-300">
{es.succeeded}
</div>
</div>
<div
class="rounded border border-stone-200 bg-white px-2 py-1.5 dark:border-stone-700 dark:bg-stone-800"
>
<div class="text-[10px] text-stone-500 dark:text-stone-400">Failed</div>
<div
class="font-mono text-sm font-medium {es.failed > 0
? 'text-red-700 dark:text-red-300'
: ''}"
>
{es.failed}
</div>
</div>
</div>
{#if es.errors && es.errors.length > 0}
<details class="mt-3 text-xs">
<summary class="cursor-pointer text-red-700 dark:text-red-300"
>{es.errors.length} Fehler beim Persistieren</summary
>
<ul class="mt-2 space-y-1 font-mono">
{#each es.errors as e}
<li class="break-all">
<span class="text-stone-500">{e.queue_id.slice(0, 8)}</span>
<span class="ml-2 text-red-700 dark:text-red-300">{e.error}</span>
</li>
{/each}
</ul>
</details>
{/if}
</div>
{/if}
<h2 class="mt-8 text-lg font-semibold">Queue</h2>
<p class="mt-1 text-sm text-stone-500">
{#if (data.total ?? 0) > 0}
@@ -660,6 +835,62 @@
</div>
<p class="mt-2 text-xs text-stone-600 dark:text-stone-300">{row.hinweis}</p>
{/if}
{#if row.enrichment_status && row.enrichment_status !== 'pending'}
<div
class="mt-4 text-xs font-medium tracking-wider text-stone-500 uppercase dark:text-stone-400"
>
Enrichment
<span
class="ml-1 inline-block rounded px-1.5 py-0.5 text-[10px] {row.enrichment_status ===
'done'
? 'bg-emerald-100 text-emerald-700 dark:bg-emerald-900/50 dark:text-emerald-300'
: row.enrichment_status === 'failed'
? 'bg-red-100 text-red-700 dark:bg-red-900/50 dark:text-red-300'
: 'bg-stone-100 text-stone-600 dark:bg-stone-800 dark:text-stone-300'}"
>
{row.enrichment_status}
</span>
</div>
<dl class="mt-2 grid grid-cols-[auto_1fr] gap-x-3 gap-y-1 text-xs">
{#if row.enrichment.plz}
<dt class="text-stone-500">PLZ</dt>
<dd>
{row.enrichment.plz}
<span class="ml-1 text-[10px] text-stone-400"
>({row.enrichment.sources?.plz ?? '—'})</span
>
</dd>
{/if}
{#if row.enrichment.venue}
<dt class="text-stone-500">Venue</dt>
<dd>
{row.enrichment.venue}
<span class="ml-1 text-[10px] text-stone-400"
>({row.enrichment.sources?.venue ?? '—'})</span
>
</dd>
{/if}
{#if row.enrichment.organizer}
<dt class="text-stone-500">Organizer</dt>
<dd>
{row.enrichment.organizer}
<span class="ml-1 text-[10px] text-stone-400"
>({row.enrichment.sources?.organizer ?? '—'})</span
>
</dd>
{/if}
{#if row.enrichment.lat != null && row.enrichment.lng != null}
<dt class="text-stone-500">Lat/Lng</dt>
<dd class="font-mono">
{row.enrichment.lat.toFixed(4)}, {row.enrichment.lng.toFixed(4)}
<span class="ml-1 text-[10px] text-stone-400"
>({row.enrichment.sources?.lat ?? '—'})</span
>
</dd>
{/if}
</dl>
{/if}
</div>
<!-- Right: editable form -->