From afe9d916d6aafd95f8cd36ba16352aaafd6caf2c Mon Sep 17 00:00:00 2001 From: vikingowl Date: Fri, 24 Apr 2026 10:29:58 +0200 Subject: [PATCH] feat(discovery): manual crawl-enrich-all button + payload display MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replaces the originally-planned async-worker design with operator- triggered bulk runs (see memory/project_ship2_enrichment.md). Crawl- enrichment is cheap enough to always run against the whole list but runs only when the admin clicks — the flow stays predictable and the crawl itself stays fast. Endpoints - POST /admin/discovery/enrichment/crawl-all — 202 + goroutine, mirrors the crawl pattern. Per-process CAS gate prevents concurrent runs. - GET /admin/discovery/enrichment/crawl-all-status — polled shape identical to /crawl-status for UI reuse. Service RunCrawlEnrichAll iterates enrichment_status='pending' rows, builds an enrich.Input from each, runs CrawlEnrich (consolidation + Nominatim geocoding via the shared geocoder), and persists via SetEnrichment(status=done). Per-row errors count toward Failed and append to a bounded Errors slice; the pass never halts. Enrich package refactor - Enrichment, Sources, Provenance constants moved from discovery -> enrich (they are the enrich package's own types; discovery previously held them for historical reasons). - CrawlEnrich now takes a narrow enrich.Input / enrich.Contribution so the enrich package no longer imports the parent discovery package. This breaks the import cycle that appeared once discovery needed to call enrich (the MR 2 structure only worked because no caller went in that direction yet). - LLMEnricher takes an LLMRequest (primitives) instead of a DiscoveredMarket. NoopLLMEnricher updated; real Mistral impl lands in MR 3b. - CacheKey signature switched from (DiscoveredMarket) to primitive (nameNormalized, stadt, year). 
Service geocoder wiring: discovery.NewService gains a Geocoder param (routes.go passes the shared Nominatim client; the interface lives in discovery to avoid another circular edge with enrich). UI: "Run crawl-enrich" button next to "Run crawl"; identical poll + summary card pattern. Queue row expand shows enrichment status badge plus the PLZ/Venue/Organizer/Lat-Lng fields inline with per-field provenance tag. Tests: three new service tests (happy path, per-row SetEnrichment failure, empty-queue no-op). Existing enrich package tests updated for the primitive input signature. All 13 test NewService call-sites updated for the new geocoder param. --- .../internal/domain/discovery/enrich/cache.go | 25 +- .../internal/domain/discovery/enrich/crawl.go | 59 ++-- .../domain/discovery/enrich/crawl_test.go | 116 ++++---- .../domain/discovery/enrich/enrich.go | 63 ++++- .../internal/domain/discovery/enrich/llm.go | 33 ++- backend/internal/domain/discovery/handler.go | 78 ++++++ .../internal/domain/discovery/handler_test.go | 6 +- .../domain/discovery/mock_repo_test.go | 25 +- backend/internal/domain/discovery/model.go | 50 +--- .../internal/domain/discovery/repository.go | 56 +++- backend/internal/domain/discovery/routes.go | 6 + backend/internal/domain/discovery/service.go | 120 ++++++++- .../internal/domain/discovery/service_test.go | 178 ++++++++++++- backend/internal/server/routes.go | 2 +- .../routes/admin/discovery/+page.server.ts | 38 +++ web/src/routes/admin/discovery/+page.svelte | 251 +++++++++++++++++- 16 files changed, 897 insertions(+), 209 deletions(-) diff --git a/backend/internal/domain/discovery/enrich/cache.go b/backend/internal/domain/discovery/enrich/cache.go index 1587bf0..3e812b2 100644 --- a/backend/internal/domain/discovery/enrich/cache.go +++ b/backend/internal/domain/discovery/enrich/cache.go @@ -5,26 +5,19 @@ import ( "encoding/hex" "fmt" "time" - - "marktvogt.de/backend/internal/domain/discovery" ) -// CacheKey derives a stable cache key for a 
DiscoveredMarket. Two rows with -// the same (normalized name, city, year) share a cache entry — the LLM -// answer doesn't change when the crawler rediscovers the same market. +// CacheKey derives a stable cache key from the (normalized name, city, year) +// tuple. Two rows with the same tuple share a cache entry — the LLM answer +// doesn't change when the crawler rediscovers the same market. // -// Year comes from StartDatum. A row without a start date uses year=0; those -// entries share a bucket, which is slightly lossy but acceptable — a -// date-less market with an LLM-derived description is rare and regenerating -// on demand is cheap relative to other failure modes. -func CacheKey(m discovery.DiscoveredMarket) string { - year := 0 - if m.StartDatum != nil { - year = m.StartDatum.Year() - } - // name_normalized is already lowercased + stripped elsewhere; stadt is +// A row without a known start date should pass year=0; those entries share a +// bucket, which is slightly lossy but acceptable — date-less markets with +// LLM-derived descriptions are rare and regenerating on demand is cheap. +func CacheKey(nameNormalized, stadt string, year int) string { + // nameNormalized is already lowercased + stripped by the caller; stadt is // lowercased here to avoid cache misses from casing drift. - raw := fmt.Sprintf("%s|%s|%d", m.NameNormalized, lowerASCII(m.Stadt), year) + raw := fmt.Sprintf("%s|%s|%d", nameNormalized, lowerASCII(stadt), year) sum := sha256.Sum256([]byte(raw)) return hex.EncodeToString(sum[:]) } diff --git a/backend/internal/domain/discovery/enrich/crawl.go b/backend/internal/domain/discovery/enrich/crawl.go index 6f36ff4..a87fbac 100644 --- a/backend/internal/domain/discovery/enrich/crawl.go +++ b/backend/internal/domain/discovery/enrich/crawl.go @@ -3,8 +3,6 @@ package enrich import ( "context" "time" - - "marktvogt.de/backend/internal/domain/discovery" ) // Geocoder is the narrow interface crawl-enrich depends on for lat/lng. 
@@ -13,49 +11,66 @@ type Geocoder interface { Geocode(ctx context.Context, street, city, zip, country string) (*float64, *float64, error) } +// Contribution is the subset of one source's data crawl-enrich reads. Callers +// adapt their domain-specific contribution type to this before invoking +// CrawlEnrich — keeps the enrich package free of a reverse import. +type Contribution struct { + PLZ string + Venue string + Organizer string +} + +// Input is the read-only context CrawlEnrich needs: the market's city and +// country (for geocoding) plus the per-source contributions already gathered +// by the crawler. All other domain-row fields are irrelevant to the crawl pass. +type Input struct { + Stadt string + Land string + Contributions []Contribution +} + // CrawlEnrich fills as many fields as possible from data already in hand — -// the DiscoveredMarket's SourceContributions and (optionally) a geocoder. -// Returns a fully-populated Sources map so downstream code can tell which -// pass contributed each field. +// the caller's Contribution list and (optionally) a geocoder. Returns a +// fully-populated Sources map so downstream code can tell which pass +// contributed each field. // // No LLM, no AI, no paid APIs. Nominatim counts as crawl-enrich because the // call is deterministic and already rate-limited by pkg/geocode. // // geocoder may be nil; lat/lng simply stay empty in that case. -func CrawlEnrich(ctx context.Context, m discovery.DiscoveredMarket, geocoder Geocoder) discovery.Enrichment { - out := discovery.Enrichment{Sources: discovery.EnrichmentSources{}} +func CrawlEnrich(ctx context.Context, in Input, geocoder Geocoder) Enrichment { + out := Enrichment{Sources: Sources{}} // Consolidate text fields from contributions — first non-empty wins. - // Order is deterministic because the crawler's Merge step already sorts - // contributions by source name. 
- for _, c := range m.SourceContributions { + // Order is deterministic because the caller's list is expected to be + // pre-sorted (crawler Merge step orders by source name). + for _, c := range in.Contributions { if out.PLZ == "" && c.PLZ != "" { out.PLZ = c.PLZ - out.Sources["plz"] = discovery.EnrichmentProvenanceCrawl + out.Sources["plz"] = ProvenanceCrawl } if out.Venue == "" && c.Venue != "" { out.Venue = c.Venue - out.Sources["venue"] = discovery.EnrichmentProvenanceCrawl + out.Sources["venue"] = ProvenanceCrawl } if out.Organizer == "" && c.Organizer != "" { out.Organizer = c.Organizer - out.Sources["organizer"] = discovery.EnrichmentProvenanceCrawl + out.Sources["organizer"] = ProvenanceCrawl } } // Geocode from (city, plz) — only if we have enough input to be useful. - // The plz we just consolidated is preferred; fall back to the market's - // top-level stadt when plz is missing. - if geocoder != nil && m.Stadt != "" { - lat, lng, err := geocoder.Geocode(ctx, "", m.Stadt, out.PLZ, landToISO(m.Land)) + // The plz we just consolidated is preferred. + if geocoder != nil && in.Stadt != "" { + lat, lng, err := geocoder.Geocode(ctx, "", in.Stadt, out.PLZ, landToISO(in.Land)) // Geocoder failures are non-fatal — lat/lng stay empty, other crawl- // enriched fields still persist. An LLM pass later cannot fill // lat/lng either, so failures just mean that row gets no coordinates. if err == nil && lat != nil && lng != nil { out.Lat = lat out.Lng = lng - out.Sources["lat"] = discovery.EnrichmentProvenanceCrawl - out.Sources["lng"] = discovery.EnrichmentProvenanceCrawl + out.Sources["lat"] = ProvenanceCrawl + out.Sources["lng"] = ProvenanceCrawl } } @@ -66,9 +81,9 @@ func CrawlEnrich(ctx context.Context, m discovery.DiscoveredMarket, geocoder Geo return out } -// landToISO maps the discovery Land value to an ISO-2 country code for -// Nominatim's countrycodes parameter. Returns empty string for unknowns so -// the geocoder runs without a country filter. 
+// landToISO maps the Land value to an ISO-2 country code for Nominatim's +// countrycodes parameter. Returns empty string for unknowns so the geocoder +// runs without a country filter. func landToISO(land string) string { switch land { case "Deutschland": diff --git a/backend/internal/domain/discovery/enrich/crawl_test.go b/backend/internal/domain/discovery/enrich/crawl_test.go index 33e235a..77b6ea9 100644 --- a/backend/internal/domain/discovery/enrich/crawl_test.go +++ b/backend/internal/domain/discovery/enrich/crawl_test.go @@ -4,9 +4,6 @@ import ( "context" "errors" "testing" - "time" - - "marktvogt.de/backend/internal/domain/discovery" ) // stubGeocoder returns canned lat/lng. nil lat/lng simulate "not found". @@ -28,8 +25,7 @@ func ptrFloat(v float64) *float64 { return &v } const stallhof = "Stallhof" func TestCrawlEnrich_EmptyInputs(t *testing.T) { - m := discovery.DiscoveredMarket{} - got := CrawlEnrich(context.Background(), m, nil) + got := CrawlEnrich(context.Background(), Input{}, nil) if got.PLZ != "" || got.Venue != "" || got.Organizer != "" { t.Errorf("expected all fields empty, got %+v", got) } @@ -42,18 +38,18 @@ func TestCrawlEnrich_EmptyInputs(t *testing.T) { } func TestCrawlEnrich_SingleSource(t *testing.T) { - m := discovery.DiscoveredMarket{ + in := Input{ Stadt: "Dresden", - SourceContributions: []discovery.SourceContribution{ - {SourceName: "marktkalendarium", PLZ: "01067", Venue: stallhof, Organizer: "Tourismusverband"}, + Contributions: []Contribution{ + {PLZ: "01067", Venue: stallhof, Organizer: "Tourismusverband"}, }, } - got := CrawlEnrich(context.Background(), m, nil) + got := CrawlEnrich(context.Background(), in, nil) if got.PLZ != "01067" || got.Venue != stallhof || got.Organizer != "Tourismusverband" { t.Errorf("unexpected payload: %+v", got) } - if got.Sources["plz"] != discovery.EnrichmentProvenanceCrawl { - t.Errorf("plz provenance: got %q, want %q", got.Sources["plz"], discovery.EnrichmentProvenanceCrawl) + if 
got.Sources["plz"] != ProvenanceCrawl { + t.Errorf("plz provenance: got %q, want %q", got.Sources["plz"], ProvenanceCrawl) } if got.EnrichedAt == nil { t.Error("expected EnrichedAt to be stamped when any field filled") @@ -63,14 +59,14 @@ func TestCrawlEnrich_SingleSource(t *testing.T) { func TestCrawlEnrich_MultiSourceFirstWins(t *testing.T) { // Two sources disagree on PLZ: first non-empty wins. Order is the // crawler-produced contribution order (alphabetical by source name). - m := discovery.DiscoveredMarket{ + in := Input{ Stadt: "Dresden", - SourceContributions: []discovery.SourceContribution{ - {SourceName: "marktkalendarium", PLZ: "01067"}, - {SourceName: "mittelaltermarkt_online", PLZ: "01099"}, + Contributions: []Contribution{ + {PLZ: "01067"}, + {PLZ: "01099"}, }, } - got := CrawlEnrich(context.Background(), m, nil) + got := CrawlEnrich(context.Background(), in, nil) if got.PLZ != "01067" { t.Errorf("expected first source's PLZ, got %q", got.PLZ) } @@ -79,14 +75,14 @@ func TestCrawlEnrich_MultiSourceFirstWins(t *testing.T) { func TestCrawlEnrich_MultiSourceFillsGaps(t *testing.T) { // Source A contributes PLZ only; source B contributes venue only; both // end up in the payload. 
- m := discovery.DiscoveredMarket{ + in := Input{ Stadt: "Dresden", - SourceContributions: []discovery.SourceContribution{ - {SourceName: "marktkalendarium", PLZ: "01067"}, - {SourceName: "mittelaltermarkt_online", Venue: stallhof}, + Contributions: []Contribution{ + {PLZ: "01067"}, + {Venue: stallhof}, }, } - got := CrawlEnrich(context.Background(), m, nil) + got := CrawlEnrich(context.Background(), in, nil) if got.PLZ != "01067" { t.Errorf("PLZ from A missing: %q", got.PLZ) } @@ -96,15 +92,13 @@ func TestCrawlEnrich_MultiSourceFillsGaps(t *testing.T) { } func TestCrawlEnrich_GeocoderCalled(t *testing.T) { - m := discovery.DiscoveredMarket{ - Stadt: "Dresden", - Land: "Deutschland", - SourceContributions: []discovery.SourceContribution{ - {SourceName: "a", PLZ: "01067"}, - }, + in := Input{ + Stadt: "Dresden", + Land: "Deutschland", + Contributions: []Contribution{{PLZ: "01067"}}, } sg := &stubGeocoder{lat: ptrFloat(51.05), lng: ptrFloat(13.74)} - got := CrawlEnrich(context.Background(), m, sg) + got := CrawlEnrich(context.Background(), in, sg) if got.Lat == nil || *got.Lat != 51.05 { t.Errorf("Lat = %v; want 51.05", got.Lat) @@ -112,8 +106,8 @@ func TestCrawlEnrich_GeocoderCalled(t *testing.T) { if got.Lng == nil || *got.Lng != 13.74 { t.Errorf("Lng = %v; want 13.74", got.Lng) } - if got.Sources["lat"] != discovery.EnrichmentProvenanceCrawl { - t.Errorf("lat provenance: got %q, want %q", got.Sources["lat"], discovery.EnrichmentProvenanceCrawl) + if got.Sources["lat"] != ProvenanceCrawl { + t.Errorf("lat provenance: got %q, want %q", got.Sources["lat"], ProvenanceCrawl) } // Land → ISO-2 mapping sanity. 
if sg.lastCountry != "de" { @@ -125,14 +119,14 @@ func TestCrawlEnrich_GeocoderCalled(t *testing.T) { } func TestCrawlEnrich_GeocoderFailureNonFatal(t *testing.T) { - m := discovery.DiscoveredMarket{ + in := Input{ Stadt: "Dresden", - SourceContributions: []discovery.SourceContribution{ - {SourceName: "a", PLZ: "01067", Venue: stallhof}, + Contributions: []Contribution{ + {PLZ: "01067", Venue: stallhof}, }, } sg := &stubGeocoder{err: errors.New("nominatim down")} - got := CrawlEnrich(context.Background(), m, sg) + got := CrawlEnrich(context.Background(), in, sg) // Non-geocode fields must still have landed. if got.PLZ != "01067" || got.Venue != stallhof { @@ -147,13 +141,9 @@ func TestCrawlEnrich_GeocoderFailureNonFatal(t *testing.T) { } func TestCrawlEnrich_GeocoderNotCalledWithoutStadt(t *testing.T) { - m := discovery.DiscoveredMarket{ - SourceContributions: []discovery.SourceContribution{ - {SourceName: "a", PLZ: "01067"}, - }, - } + in := Input{Contributions: []Contribution{{PLZ: "01067"}}} sg := &stubGeocoder{lat: ptrFloat(1), lng: ptrFloat(1)} - got := CrawlEnrich(context.Background(), m, sg) + got := CrawlEnrich(context.Background(), in, sg) if sg.lastCity != "" { t.Error("geocoder called despite empty Stadt") } @@ -164,12 +154,12 @@ func TestCrawlEnrich_GeocoderNotCalledWithoutStadt(t *testing.T) { func TestMerge_BaseWinsOverOverlay(t *testing.T) { // Invariant: a field set by crawl (base) is never overwritten by llm (overlay). 
- base := discovery.Enrichment{ + base := Enrichment{ PLZ: "01067", Venue: stallhof, - Sources: discovery.EnrichmentSources{"plz": "crawl", "venue": "crawl"}, + Sources: Sources{"plz": ProvenanceCrawl, "venue": ProvenanceCrawl}, } - overlay := discovery.Enrichment{ + overlay := Enrichment{ PLZ: "WRONG", Venue: "WRONG", Category: "mittelaltermarkt", @@ -177,11 +167,11 @@ func TestMerge_BaseWinsOverOverlay(t *testing.T) { InputTokens: 100, OutputTokens: 50, Model: "mistral-large-latest", - Sources: discovery.EnrichmentSources{ - "plz": "llm", - "venue": "llm", - "category": "llm", - "description": "llm", + Sources: Sources{ + "plz": ProvenanceLLM, + "venue": ProvenanceLLM, + "category": ProvenanceLLM, + "description": ProvenanceLLM, }, } got := Merge(base, overlay) @@ -198,10 +188,10 @@ func TestMerge_BaseWinsOverOverlay(t *testing.T) { if got.Description != "Ein großer Markt." { t.Errorf("Description should come from overlay: got %q", got.Description) } - if got.Sources["plz"] != "crawl" { + if got.Sources["plz"] != ProvenanceCrawl { t.Errorf("plz provenance overwritten: got %q, want crawl", got.Sources["plz"]) } - if got.Sources["category"] != "llm" { + if got.Sources["category"] != ProvenanceLLM { t.Errorf("category provenance lost: got %q, want llm", got.Sources["category"]) } if got.InputTokens != 100 || got.OutputTokens != 50 { @@ -210,11 +200,11 @@ func TestMerge_BaseWinsOverOverlay(t *testing.T) { } func TestMerge_CoordsOnlyFromOverlayIfBaseNil(t *testing.T) { - base := discovery.Enrichment{Sources: discovery.EnrichmentSources{}} - overlay := discovery.Enrichment{ + base := Enrichment{Sources: Sources{}} + overlay := Enrichment{ Lat: ptrFloat(51.05), Lng: ptrFloat(13.74), - Sources: discovery.EnrichmentSources{"lat": "crawl", "lng": "crawl"}, + Sources: Sources{"lat": ProvenanceCrawl, "lng": ProvenanceCrawl}, } got := Merge(base, overlay) if got.Lat == nil || *got.Lat != 51.05 { @@ -222,10 +212,10 @@ func TestMerge_CoordsOnlyFromOverlayIfBaseNil(t *testing.T) 
{ } // Now base already has Lat; overlay must not overwrite. - baseWithLat := discovery.Enrichment{ + baseWithLat := Enrichment{ Lat: ptrFloat(50.00), Lng: ptrFloat(10.00), - Sources: discovery.EnrichmentSources{"lat": "crawl", "lng": "crawl"}, + Sources: Sources{"lat": ProvenanceCrawl, "lng": ProvenanceCrawl}, } got = Merge(baseWithLat, overlay) if *got.Lat != 50.00 { @@ -235,21 +225,19 @@ func TestMerge_CoordsOnlyFromOverlayIfBaseNil(t *testing.T) { // Sanity: CacheKey must be stable across re-runs and differ across inputs. func TestCacheKey_StableAndDistinct(t *testing.T) { - start := time.Date(2026, 5, 1, 0, 0, 0, 0, time.UTC) - a := discovery.DiscoveredMarket{NameNormalized: "ritterfest dresden", Stadt: "Dresden", StartDatum: &start} - b := discovery.DiscoveredMarket{NameNormalized: "ritterfest dresden", Stadt: "Dresden", StartDatum: &start} - c := discovery.DiscoveredMarket{NameNormalized: "ritterfest leipzig", Stadt: "Leipzig", StartDatum: &start} + aKey := CacheKey("ritterfest dresden", "Dresden", 2026) + bKey := CacheKey("ritterfest dresden", "Dresden", 2026) + cKey := CacheKey("ritterfest leipzig", "Leipzig", 2026) - if CacheKey(a) != CacheKey(b) { + if aKey != bKey { t.Error("identical inputs must produce identical keys") } - if CacheKey(a) == CacheKey(c) { + if aKey == cKey { t.Error("different cities must produce different keys") } // Stadt casing drift doesn't change the key. - d := a - d.Stadt = "DRESDEN" - if CacheKey(a) != CacheKey(d) { + dKey := CacheKey("ritterfest dresden", "DRESDEN", 2026) + if aKey != dKey { t.Error("stadt casing drift should not change key") } } diff --git a/backend/internal/domain/discovery/enrich/enrich.go b/backend/internal/domain/discovery/enrich/enrich.go index a461f4f..f77d6f9 100644 --- a/backend/internal/domain/discovery/enrich/enrich.go +++ b/backend/internal/domain/discovery/enrich/enrich.go @@ -1,21 +1,58 @@ // Package enrich derives additional market metadata in two passes. 
// // Pass A (crawl-enrich) is deterministic and free. It consolidates fields -// already present in the DiscoveredMarket's SourceContributions (PLZ, venue, -// organizer, bundesland) and can geocode city+PLZ via Nominatim. First -// non-empty value wins per field; provenance is recorded as "crawl". +// from per-source contributions (PLZ, venue, organizer) and geocodes city +// + PLZ via Nominatim. First non-empty value wins per field; provenance is +// recorded as "crawl". // -// Pass B (llm-enrich, MR 3) runs only for fields Pass A could not fill and -// does cost money. Provenance is recorded as "llm". +// Pass B (llm-enrich) runs only for fields Pass A could not fill and does +// cost money. Provenance is recorded as "llm". // -// The Merge helper in this file combines partial payloads while preserving -// crawl-over-llm preference: a crawl-written field is never overwritten by -// an llm pass. Eval harness (MR 5) uses the per-field provenance to measure -// crawl-enrich and llm-enrich quality separately. +// Merge combines partial payloads preserving crawl-over-llm preference: a +// crawl-written field is never overwritten by an llm pass. The eval harness +// uses per-field provenance to measure the two passes separately. +// +// The package is intentionally standalone — it does NOT import the parent +// discovery package. Callers adapt domain types to the narrow Contribution +// input before invoking CrawlEnrich, which keeps the dependency edge one-way +// (discovery imports enrich; enrich imports no domain code). package enrich -import ( - "marktvogt.de/backend/internal/domain/discovery" +import "time" + +// Enrichment is the JSONB payload persisted on discovered_markets.enrichment. +// Every field is optional; Sources records which pass filled each field so +// the eval harness can separate crawl-enrich (free, deterministic) from +// llm-enrich (paid, synthesised) contributions. 
+// +// Field shape is intentionally flat — promotion to typed columns is easier +// from a flat blob than from nested structures. +type Enrichment struct { + PLZ string `json:"plz,omitempty"` + Venue string `json:"venue,omitempty"` + Organizer string `json:"organizer,omitempty"` + Lat *float64 `json:"lat,omitempty"` + Lng *float64 `json:"lng,omitempty"` + Category string `json:"category,omitempty"` + OpeningHours string `json:"opening_hours,omitempty"` + Description string `json:"description,omitempty"` + Sources Sources `json:"sources,omitempty"` + EnrichedAt *time.Time `json:"enriched_at,omitempty"` + Model string `json:"model,omitempty"` + InputTokens int `json:"input_tokens,omitempty"` + OutputTokens int `json:"output_tokens,omitempty"` +} + +// Sources maps each enrichment field name to the pass that filled it. +// Values are Provenance constants. +type Sources map[string]string + +// Provenance constants for Sources entries. Crawl covers both offline +// consolidation of per-source contributions and Nominatim geocoding — both +// deterministic, neither counts against the LLM budget. +const ( + ProvenanceCrawl = "crawl" + ProvenanceLLM = "llm" ) // Merge combines two Enrichment payloads. The base is taken as authoritative; @@ -26,7 +63,7 @@ import ( // llm-enrich. This enforces the "prefer crawl over llm" invariant: the LLM // pass cannot clobber a field the crawler already filled in, even if the // LLM is confident. -func Merge(base, overlay discovery.Enrichment) discovery.Enrichment { +func Merge(base, overlay Enrichment) Enrichment { out := base if out.PLZ == "" { @@ -56,7 +93,7 @@ func Merge(base, overlay discovery.Enrichment) discovery.Enrichment { // Merge provenance maps with base-wins semantics. 
if out.Sources == nil && len(overlay.Sources) > 0 { - out.Sources = discovery.EnrichmentSources{} + out.Sources = Sources{} } for k, v := range overlay.Sources { if _, exists := out.Sources[k]; !exists { diff --git a/backend/internal/domain/discovery/enrich/llm.go b/backend/internal/domain/discovery/enrich/llm.go index 8f7981c..ea5f290 100644 --- a/backend/internal/domain/discovery/enrich/llm.go +++ b/backend/internal/domain/discovery/enrich/llm.go @@ -1,28 +1,35 @@ package enrich -import ( - "context" +import "context" - "marktvogt.de/backend/internal/domain/discovery" -) +// LLMRequest is the narrow context the LLM enricher needs: identifying +// fields, the quellen URLs for web-search grounding, and the crawl-enrich +// payload already computed so the LLM can be instructed not to overwrite. +type LLMRequest struct { + MarktName string + Stadt string + Land string + Bundesland string + Quellen []string + // Partial is the output of crawl-enrich for this row. The LLM + // implementation MUST NOT overwrite any field set here — Merge enforces + // the invariant regardless, but the prompt should respect it to save + // tokens. + Partial Enrichment +} // LLMEnricher fills enrichment fields that crawl-enrich could not reach. -// Implementations should only write fields missing from `partial` — never -// overwrite a field the crawler already filled. Merge() enforces this too, -// but respecting it in the implementation keeps token usage down. -// // Returned payload must populate Sources with "llm" for every field it // writes; Model and token counts are required so the eval harness can // attribute cost. type LLMEnricher interface { - EnrichMissing(ctx context.Context, partial discovery.Enrichment, row discovery.DiscoveredMarket) (discovery.Enrichment, error) + EnrichMissing(ctx context.Context, req LLMRequest) (Enrichment, error) } // NoopLLMEnricher returns the partial unchanged. Used by tests and as a -// default when AI integration is disabled. 
Actual Mistral-backed -// implementation lands in MR 3. +// default when AI integration is disabled. type NoopLLMEnricher struct{} -func (NoopLLMEnricher) EnrichMissing(_ context.Context, partial discovery.Enrichment, _ discovery.DiscoveredMarket) (discovery.Enrichment, error) { - return partial, nil +func (NoopLLMEnricher) EnrichMissing(_ context.Context, req LLMRequest) (Enrichment, error) { + return req.Partial, nil } diff --git a/backend/internal/domain/discovery/handler.go b/backend/internal/domain/discovery/handler.go index 0f7141a..d498149 100644 --- a/backend/internal/domain/discovery/handler.go +++ b/backend/internal/domain/discovery/handler.go @@ -32,6 +32,18 @@ type Handler struct { lastFinishedAt time.Time lastSummary *CrawlSummary lastError string + + // Async crawl-enrich-all state (manual bulk operator action). + // Reuses the CAS-gate + mutex pattern of the crawl flow above so the + // admin UI can poll a dedicated status endpoint without colliding with a + // running crawl. + enrichRunning atomic.Bool + enrichMu sync.RWMutex + // Guarded by enrichMu: + enrichStartedAt time.Time + enrichFinishedAt time.Time + enrichSummary *CrawlEnrichSummary + enrichError string } // NewHandler constructs a Handler. manualRateLimitPerHour controls how @@ -285,6 +297,72 @@ func (h *Handler) Similar(c *gin.Context) { c.JSON(http.StatusOK, gin.H{"data": matches}) } +// RunCrawlEnrichAll kicks off the manual bulk crawl-enrich pass in the +// background and returns 202 immediately. Exactly one run may be in progress +// at a time (CAS gate). Completion is observable via +// GET /admin/discovery/enrichment/crawl-all-status. 
+func (h *Handler) RunCrawlEnrichAll(c *gin.Context) {
+	if !h.enrichRunning.CompareAndSwap(false, true) {
+		apiErr := apierror.TooManyRequests("A crawl-enrich run is already in progress")
+		c.JSON(apiErr.Status, apierror.NewResponse(apiErr))
+		return
+	}
+
+	h.enrichMu.Lock()
+	h.enrichStartedAt = time.Now().UTC()
+	h.enrichFinishedAt = time.Time{}
+	h.enrichSummary = nil
+	h.enrichError = ""
+	h.enrichMu.Unlock()
+
+	go h.runEnrichAsync()
+
+	c.JSON(http.StatusAccepted, gin.H{
+		"data": gin.H{
+			"status":  "started",
+			"message": "Crawl-enrich started in background. Poll /api/v1/admin/discovery/enrichment/crawl-all-status for completion.",
+		},
+	})
+}
+
+// runEnrichAsync runs RunCrawlEnrichAll with a detached context. The 10m cap
+// equals our expected worst case: ~600 pending rows at Nominatim's 1 rps.
+func (h *Handler) runEnrichAsync() {
+	defer h.enrichRunning.Store(false)
+
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
+	defer cancel()
+
+	summary, err := h.service.RunCrawlEnrichAll(ctx)
+
+	h.enrichMu.Lock()
+	h.enrichFinishedAt = time.Now().UTC()
+	if err != nil {
+		h.enrichError = err.Error()
+		slog.ErrorContext(ctx, "async crawl-enrich failed", "error", err)
+	} else {
+		sCopy := summary
+		h.enrichSummary = &sCopy
+	}
+	h.enrichMu.Unlock()
+}
+
+// CrawlEnrichStatus returns the state of the most recent async crawl-enrich.
+// Shape mirrors CrawlStatus for UI reuse.
+func (h *Handler) CrawlEnrichStatus(c *gin.Context) { + h.enrichMu.RLock() + defer h.enrichMu.RUnlock() + c.JSON(http.StatusOK, gin.H{ + "data": gin.H{ + "running": h.enrichRunning.Load(), + "started_at": h.enrichStartedAt, + "finished_at": h.enrichFinishedAt, + "summary": h.enrichSummary, + "error": h.enrichError, + }, + }) +} + func currentUserID(c *gin.Context) (uuid.UUID, bool) { raw, exists := c.Get("user_id") if !exists { diff --git a/backend/internal/domain/discovery/handler_test.go b/backend/internal/domain/discovery/handler_test.go index 721acc0..5952b1a 100644 --- a/backend/internal/domain/discovery/handler_test.go +++ b/backend/internal/domain/discovery/handler_test.go @@ -55,6 +55,7 @@ func TestCrawlHandlerReturns202AndStartsCrawl(t *testing.T) { &stubCrawlerRunner{result: crawler.CrawlResult{}}, noopLinkVerifier{}, noopMarketCreator{}, + nil, ) h := NewHandler(svc, 0) // rate limit disabled @@ -89,7 +90,7 @@ func TestCrawlHandlerReturns202AndStartsCrawl(t *testing.T) { // TestCrawlStatusInitialState verifies the zero state of a freshly constructed Handler. func TestCrawlStatusInitialState(t *testing.T) { - svc := NewService(newMockRepo(), &stubCrawlerRunner{}, noopLinkVerifier{}, noopMarketCreator{}) + svc := NewService(newMockRepo(), &stubCrawlerRunner{}, noopLinkVerifier{}, noopMarketCreator{}, nil) h := NewHandler(svc, 0) w := httptest.NewRecorder() @@ -139,7 +140,7 @@ func TestCrawlHandlerConcurrentReturnsTooManyRequests(t *testing.T) { started: make(chan struct{}), release: make(chan struct{}), } - svc := NewService(newMockRepo(), bc, noopLinkVerifier{}, noopMarketCreator{}) + svc := NewService(newMockRepo(), bc, noopLinkVerifier{}, noopMarketCreator{}, nil) h := NewHandler(svc, 0) // rate limit disabled // First request — returns 202 and spawns goroutine. @@ -180,6 +181,7 @@ func TestCrawlHandlerRateLimit(t *testing.T) { &stubCrawlerRunner{result: crawler.CrawlResult{}}, noopLinkVerifier{}, noopMarketCreator{}, + nil, ) // 1 per hour window. 
h := NewHandler(svc, 1) diff --git a/backend/internal/domain/discovery/mock_repo_test.go b/backend/internal/domain/discovery/mock_repo_test.go index dcfe043..f943804 100644 --- a/backend/internal/domain/discovery/mock_repo_test.go +++ b/backend/internal/domain/discovery/mock_repo_test.go @@ -6,6 +6,8 @@ import ( "github.com/google/uuid" "github.com/jackc/pgx/v5" + + "marktvogt.de/backend/internal/domain/discovery/enrich" ) // Compile-time guard: mockRepo must fully satisfy Repository. @@ -28,11 +30,12 @@ type mockRepo struct { // Populated by newMockRepo(); nil when insertDiscFn is set externally. inserted []DiscoveredMarket - // Enrichment hooks (optional). Tests set whichever they need; nil falls + // enrich.Enrichment hooks (optional). Tests set whichever they need; nil falls // through to a no-op / empty response. - setEnrichmentFn func(id uuid.UUID, payload Enrichment, status string) error - getCacheFn func(key string) (Enrichment, bool, error) - setCacheFn func(key string, payload Enrichment, ttl time.Duration) error + setEnrichmentFn func(id uuid.UUID, payload enrich.Enrichment, status string) error + getCacheFn func(key string) (enrich.Enrichment, bool, error) + setCacheFn func(key string, payload enrich.Enrichment, ttl time.Duration) error + listPendingEnrichFn func(limit int) ([]DiscoveredMarket, error) } func (m *mockRepo) ListSeriesByCity(ctx context.Context, c string) ([]SeriesCandidate, error) { @@ -95,24 +98,30 @@ func (m *mockRepo) Stats(ctx context.Context, forwardMonths, recentErrorsLimit i func (m *mockRepo) UpdatePending(ctx context.Context, id uuid.UUID, f UpdatePendingFields, nn *string) error { return nil } -func (m *mockRepo) SetEnrichment(_ context.Context, id uuid.UUID, payload Enrichment, status string) error { +func (m *mockRepo) SetEnrichment(_ context.Context, id uuid.UUID, payload enrich.Enrichment, status string) error { if m.setEnrichmentFn != nil { return m.setEnrichmentFn(id, payload, status) } return nil } -func (m *mockRepo) 
GetEnrichmentCache(_ context.Context, key string) (Enrichment, bool, error) { +func (m *mockRepo) GetEnrichmentCache(_ context.Context, key string) (enrich.Enrichment, bool, error) { if m.getCacheFn != nil { return m.getCacheFn(key) } - return Enrichment{}, false, nil + return enrich.Enrichment{}, false, nil } -func (m *mockRepo) SetEnrichmentCache(_ context.Context, key string, payload Enrichment, ttl time.Duration) error { +func (m *mockRepo) SetEnrichmentCache(_ context.Context, key string, payload enrich.Enrichment, ttl time.Duration) error { if m.setCacheFn != nil { return m.setCacheFn(key, payload, ttl) } return nil } +func (m *mockRepo) ListPendingEnrichment(_ context.Context, limit int) ([]DiscoveredMarket, error) { + if m.listPendingEnrichFn != nil { + return m.listPendingEnrichFn(limit) + } + return nil, nil +} // noopLinkVerifier passes every URL — used by tests to isolate from network. type noopLinkVerifier struct{} diff --git a/backend/internal/domain/discovery/model.go b/backend/internal/domain/discovery/model.go index e22f1cb..befdcb1 100644 --- a/backend/internal/domain/discovery/model.go +++ b/backend/internal/domain/discovery/model.go @@ -5,6 +5,8 @@ import ( "time" "github.com/google/uuid" + + "marktvogt.de/backend/internal/domain/discovery/enrich" ) // SourceContribution captures what one source reported about a market. @@ -68,41 +70,17 @@ type DiscoveredMarket struct { ReviewedBy *uuid.UUID `json:"reviewed_by"` CreatedEditionID *uuid.UUID `json:"created_edition_id"` - // Enrichment payload + worker lifecycle. See Enrichment for the contract. - Enrichment Enrichment `json:"enrichment"` - EnrichmentStatus string `json:"enrichment_status"` // pending|done|failed|skipped - EnrichmentAttempts int `json:"enrichment_attempts"` - EnrichedAt *time.Time `json:"enriched_at"` + // Enrichment payload + worker lifecycle. See enrich.Enrichment for the contract. 
+ Enrichment enrich.Enrichment `json:"enrichment"` + EnrichmentStatus string `json:"enrichment_status"` // pending|done|failed|skipped + EnrichmentAttempts int `json:"enrichment_attempts"` + EnrichedAt *time.Time `json:"enriched_at"` } -// Enrichment is the JSONB payload for discovered_markets.enrichment. -// Every field is optional; `Sources` records which pass filled each field so -// the eval harness can separate crawl-enrich (free, deterministic) from -// llm-enrich (paid, synthesised) contributions. -// -// Field shape is intentionally flat — promotion to typed columns is easier -// from a flat blob than from nested structures. -type Enrichment struct { - PLZ string `json:"plz,omitempty"` - Venue string `json:"venue,omitempty"` - Organizer string `json:"organizer,omitempty"` - Lat *float64 `json:"lat,omitempty"` - Lng *float64 `json:"lng,omitempty"` - Category string `json:"category,omitempty"` - OpeningHours string `json:"opening_hours,omitempty"` - Description string `json:"description,omitempty"` - Sources EnrichmentSources `json:"sources,omitempty"` - EnrichedAt *time.Time `json:"enriched_at,omitempty"` - Model string `json:"model,omitempty"` - InputTokens int `json:"input_tokens,omitempty"` - OutputTokens int `json:"output_tokens,omitempty"` -} - -// EnrichmentSources maps each enrichment field name to the pass that filled it. -// Values are EnrichmentProvenance constants. -type EnrichmentSources map[string]string - // EnrichmentStatus constants for discovered_markets.enrichment_status. +// The payload type + provenance constants live in the enrich package; this +// status string is a DB column value used by repository queries, kept here +// alongside the other DB-lifecycle constants. const ( EnrichmentStatusPending = "pending" EnrichmentStatusDone = "done" @@ -110,14 +88,6 @@ const ( EnrichmentStatusSkipped = "skipped" ) -// EnrichmentProvenance values for EnrichmentSources entries. 
The crawl pass -// covers both offline consolidation of source_contributions and Nominatim -// geocoding — both deterministic, neither counts against the LLM budget. -const ( - EnrichmentProvenanceCrawl = "crawl" - EnrichmentProvenanceLLM = "llm" -) - // RejectedDiscovery stores a sticky rejection scoped to (normalized_name, city, year). type RejectedDiscovery struct { ID uuid.UUID `json:"id"` diff --git a/backend/internal/domain/discovery/repository.go b/backend/internal/domain/discovery/repository.go index 371572f..f332c23 100644 --- a/backend/internal/domain/discovery/repository.go +++ b/backend/internal/domain/discovery/repository.go @@ -11,6 +11,8 @@ import ( "github.com/google/uuid" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" + + "marktvogt.de/backend/internal/domain/discovery/enrich" ) type Repository interface { @@ -31,10 +33,13 @@ type Repository interface { // Enrichment persistence. SetEnrichment writes payload + status + bumps // attempts + stamps enriched_at in one UPDATE so the worker stays idempotent. - SetEnrichment(ctx context.Context, id uuid.UUID, payload Enrichment, status string) error + SetEnrichment(ctx context.Context, id uuid.UUID, payload enrich.Enrichment, status string) error + // ListPendingEnrichment returns queue rows awaiting their first enrichment + // pass. Ordered by discovered_at ASC so older rows get processed first. + ListPendingEnrichment(ctx context.Context, limit int) ([]DiscoveredMarket, error) // Cache operations keyed on sha256(name_normalized|stadt|year). 
- GetEnrichmentCache(ctx context.Context, key string) (Enrichment, bool, error) - SetEnrichmentCache(ctx context.Context, key string, payload Enrichment, ttl time.Duration) error + GetEnrichmentCache(ctx context.Context, key string) (enrich.Enrichment, bool, error) + SetEnrichmentCache(ctx context.Context, key string, payload enrich.Enrichment, ttl time.Duration) error } // SeriesCandidate is a minimal projection used for name-normalization comparison in Go. @@ -160,6 +165,37 @@ func (r *pgRepository) CountQueue(ctx context.Context, status string) (int, erro return n, err } +// ListPendingEnrichment selects queue rows still awaiting enrichment. The +// status='pending' filter excludes accepted/rejected rows — enrichment only +// makes sense for review candidates. Ordered by discovered_at ASC so an +// operator running the bulk action sees the oldest queue entries improve first. +func (r *pgRepository) ListPendingEnrichment(ctx context.Context, limit int) ([]DiscoveredMarket, error) { + rows, err := r.pool.Query(ctx, ` +SELECT id, bucket_id, markt_name, stadt, coalesce(bundesland,''), land, + start_datum, end_datum, coalesce(website,''), quellen, coalesce(konfidenz,''), + coalesce(agent_status,''), coalesce(hinweis,''), name_normalized, matched_series_id, status, + discovered_at, reviewed_at, reviewed_by, created_edition_id, + sources, source_contributions, + enrichment, enrichment_status, enrichment_attempts, enriched_at +FROM discovered_markets +WHERE status = 'pending' AND enrichment_status = $1 +ORDER BY discovered_at ASC +LIMIT $2`, EnrichmentStatusPending, limit) + if err != nil { + return nil, err + } + defer rows.Close() + out := make([]DiscoveredMarket, 0) + for rows.Next() { + d, err := scanDiscoveredMarket(rows) + if err != nil { + return nil, err + } + out = append(out, d) + } + return out, rows.Err() +} + func (r *pgRepository) GetDiscovered(ctx context.Context, id uuid.UUID) (DiscoveredMarket, error) { row := r.pool.QueryRow(ctx, ` SELECT id, 
bucket_id, markt_name, stadt, coalesce(bundesland,''), land, @@ -315,7 +351,7 @@ LIMIT $1`, recentErrorsLimit) // if the caller's read-modify-write is racy. enriched_at is only stamped for // terminal states ('done' / 'failed'), not 'skipped', to distinguish "we ran // and nothing changed" from "we ran and succeeded trivially". -func (r *pgRepository) SetEnrichment(ctx context.Context, id uuid.UUID, payload Enrichment, status string) error { +func (r *pgRepository) SetEnrichment(ctx context.Context, id uuid.UUID, payload enrich.Enrichment, status string) error { payloadJSON, err := json.Marshal(payload) if err != nil { return fmt.Errorf("marshal enrichment: %w", err) @@ -347,28 +383,28 @@ WHERE id = $1`, enrichedExpr), id, payloadJSON, status) // expired, (_, false, nil) on a miss or expired entry. Expired entries are // treated as misses so the caller refetches rather than relying on a // background pruner. -func (r *pgRepository) GetEnrichmentCache(ctx context.Context, key string) (Enrichment, bool, error) { +func (r *pgRepository) GetEnrichmentCache(ctx context.Context, key string) (enrich.Enrichment, bool, error) { var payloadJSON []byte err := r.pool.QueryRow(ctx, ` SELECT payload FROM enrichment_cache WHERE cache_key = $1 AND (expires_at IS NULL OR expires_at > now())`, key).Scan(&payloadJSON) if errors.Is(err, pgx.ErrNoRows) { - return Enrichment{}, false, nil + return enrich.Enrichment{}, false, nil } if err != nil { - return Enrichment{}, false, fmt.Errorf("cache get: %w", err) + return enrich.Enrichment{}, false, fmt.Errorf("cache get: %w", err) } - var e Enrichment + var e enrich.Enrichment if err := json.Unmarshal(payloadJSON, &e); err != nil { - return Enrichment{}, false, fmt.Errorf("unmarshal cache payload: %w", err) + return enrich.Enrichment{}, false, fmt.Errorf("unmarshal cache payload: %w", err) } return e, true, nil } // SetEnrichmentCache writes or replaces a cache entry. ttl==0 means "no // expiry" (expires_at stays NULL). 
Callers typically use 30 days.
-func (r *pgRepository) SetEnrichmentCache(ctx context.Context, key string, payload Enrichment, ttl time.Duration) error {
+func (r *pgRepository) SetEnrichmentCache(ctx context.Context, key string, payload enrich.Enrichment, ttl time.Duration) error {
 	payloadJSON, err := json.Marshal(payload)
 	if err != nil {
 		return fmt.Errorf("marshal cache payload: %w", err)
diff --git a/backend/internal/domain/discovery/routes.go b/backend/internal/domain/discovery/routes.go
index 5460382..c4496f8 100644
--- a/backend/internal/domain/discovery/routes.go
+++ b/backend/internal/domain/discovery/routes.go
@@ -28,5 +28,11 @@ func RegisterRoutes(
 		admin.POST("/crawl-manual", h.Crawl)
 		// Async crawl status polling.
 		admin.GET("/crawl-status", h.CrawlStatus)
+		// Manual bulk crawl-enrich — deterministic consolidation + Nominatim
+		// geocoding applied to every enrichment_status='pending' row. Async
+		// (202 + polling) because the Nominatim 1rps rate means a full queue
+		// can take minutes.
+		admin.POST("/enrichment/crawl-all", h.RunCrawlEnrichAll)
+		admin.GET("/enrichment/crawl-all-status", h.CrawlEnrichStatus)
 	}
 }
diff --git a/backend/internal/domain/discovery/service.go b/backend/internal/domain/discovery/service.go
index 9000eb0..ab927e6 100644
--- a/backend/internal/domain/discovery/service.go
+++ b/backend/internal/domain/discovery/service.go
@@ -12,9 +12,21 @@ import (
 	"github.com/google/uuid"
 
 	"marktvogt.de/backend/internal/domain/discovery/crawler"
+	"marktvogt.de/backend/internal/domain/discovery/enrich"
 	"marktvogt.de/backend/internal/domain/market"
 )
 
+// Geocoder is the narrow interface the enrichment flow depends on.
+// *pkg/geocode.Geocoder satisfies it. Declared consumer-side here rather
+// than importing enrich's equivalent type, keeping the dependency edge
+// one-way (discovery imports enrich; enrich imports no discovery code).
+// +// Structurally identical to enrich.Geocoder, so a concrete *geocode.Geocoder +// satisfies both and can be passed across the package boundary. +type Geocoder interface { + Geocode(ctx context.Context, street, city, zip, country string) (*float64, *float64, error) +} + type marketCreator interface { Create(ctx context.Context, req market.CreateMarketRequest) (market.Market, error) CreateEditionForSeries(ctx context.Context, seriesID uuid.UUID, req market.CreateEditionRequest) (market.Market, error) @@ -39,15 +51,21 @@ type Service struct { crawler crawlerRunner marketCreator marketCreator linkChecker linkVerifier + // geocoder is optional — nil means crawl-enrich skips lat/lng. Wired from + // server/routes.go using the shared Nominatim client (1 rps limited). + geocoder Geocoder } // NewService constructs a Service wired for the crawler-driven Crawl path. -func NewService(repo Repository, cr crawlerRunner, lc linkVerifier, mc marketCreator) *Service { +// geocoder may be nil; in that case crawl-enrich runs consolidation only and +// skips the lat/lng step. +func NewService(repo Repository, cr crawlerRunner, lc linkVerifier, mc marketCreator, gc Geocoder) *Service { return &Service{ repo: repo, crawler: cr, marketCreator: mc, linkChecker: lc, + geocoder: gc, } } @@ -446,6 +464,21 @@ func (s *Service) FindSimilarToQueueEntry(ctx context.Context, id uuid.UUID) ([] return FindSimilar(target, candidates, 0.5), nil } +// contributionsForEnrich maps the rich discovery-domain SourceContribution +// down to the narrow enrich.Contribution shape — keeps the enrich package +// free of a reverse import on discovery. 
+func contributionsForEnrich(cs []SourceContribution) []enrich.Contribution { + out := make([]enrich.Contribution, 0, len(cs)) + for _, c := range cs { + out = append(out, enrich.Contribution{ + PLZ: c.PLZ, + Venue: c.Venue, + Organizer: c.Organizer, + }) + } + return out +} + // toContributions translates the crawler's per-source RawEvents into the // discovery-domain SourceContribution shape for JSONB persistence. func toContributions(raws []crawler.RawEvent) []SourceContribution { @@ -485,3 +518,88 @@ func findSeriesMatch(incomingName string, candidates []SeriesCandidate) *uuid.UU } return nil } + +// CrawlEnrichSummary reports the outcome of one RunCrawlEnrichAll pass. +// Mirrors CrawlSummary's shape so the admin UI can reuse its render path. +type CrawlEnrichSummary struct { + StartedAt time.Time `json:"started_at"` + DurationMs int64 `json:"duration_ms"` + Total int `json:"total"` // rows loaded for enrichment + Succeeded int `json:"succeeded"` // enrichment_status -> done + Failed int `json:"failed"` // SetEnrichment returned error + // Bounded list of (id, error) pairs for operator debugging. Empty on + // happy paths; capped so a catastrophic run doesn't balloon the summary. + Errors []CrawlEnrichError `json:"errors"` +} + +// CrawlEnrichError records a single row that failed to persist enrichment. +type CrawlEnrichError struct { + QueueID uuid.UUID `json:"queue_id"` + Error string `json:"error"` +} + +// runCrawlEnrichAllBatchSize caps how many rows we pull per bulk pass. Bigger +// than the usual queue depth keeps us single-pass; the Nominatim 1rps limit +// is the real throughput constraint, not batch size. +const runCrawlEnrichAllBatchSize = 2000 + +// runCrawlEnrichErrorCap bounds the Errors slice so a total outage doesn't +// produce a summary the admin UI can't render. Operators can check logs for +// the full set; this is just an at-a-glance list. 
+const runCrawlEnrichErrorCap = 50 + +// RunCrawlEnrichAll applies crawl-enrich (source consolidation + Nominatim +// geocoding) to every queue row with enrichment_status='pending'. Persists +// the resulting payload with status='done' on success, 'failed' on error. +// +// Already-enriched rows are skipped by the repo query — this is idempotent +// only in the sense that re-running picks up any pending rows that appeared +// since the last run. Forcing a re-enrich of already-done rows is a future +// feature; for now operators must manually reset enrichment_status. +// +// Designed to be called from a goroutine with a detached context — the +// Nominatim rate limit means a 200-row queue takes 3+ minutes and must +// outlive the originating HTTP request. +func (s *Service) RunCrawlEnrichAll(ctx context.Context) (CrawlEnrichSummary, error) { + summary := CrawlEnrichSummary{StartedAt: time.Now().UTC()} + rows, err := s.repo.ListPendingEnrichment(ctx, runCrawlEnrichAllBatchSize) + if err != nil { + return summary, fmt.Errorf("list pending enrichment: %w", err) + } + summary.Total = len(rows) + + for _, row := range rows { + if err := ctx.Err(); err != nil { + // Caller cancelled — stop cleanly. Summary reflects partial + // progress; the remaining rows stay in enrichment_status='pending' + // and will be picked up by the next run. 
+ return summary, err + } + in := enrich.Input{ + Stadt: row.Stadt, + Land: row.Land, + Contributions: contributionsForEnrich(row.SourceContributions), + } + payload := enrich.CrawlEnrich(ctx, in, s.geocoder) + if err := s.repo.SetEnrichment(ctx, row.ID, payload, EnrichmentStatusDone); err != nil { + slog.WarnContext(ctx, "set enrichment failed", + "queue_id", row.ID, "error", err) + summary.Failed++ + if len(summary.Errors) < runCrawlEnrichErrorCap { + summary.Errors = append(summary.Errors, CrawlEnrichError{ + QueueID: row.ID, + Error: err.Error(), + }) + } + continue + } + summary.Succeeded++ + } + summary.DurationMs = time.Since(summary.StartedAt).Milliseconds() + slog.InfoContext(ctx, "crawl-enrich-all completed", + "total", summary.Total, + "succeeded", summary.Succeeded, + "failed", summary.Failed, + "duration_ms", summary.DurationMs) + return summary, nil +} diff --git a/backend/internal/domain/discovery/service_test.go b/backend/internal/domain/discovery/service_test.go index d54ae44..8356317 100644 --- a/backend/internal/domain/discovery/service_test.go +++ b/backend/internal/domain/discovery/service_test.go @@ -2,6 +2,7 @@ package discovery import ( "context" + "errors" "testing" "time" @@ -9,9 +10,22 @@ import ( "github.com/jackc/pgx/v5" "marktvogt.de/backend/internal/domain/discovery/crawler" + "marktvogt.de/backend/internal/domain/discovery/enrich" "marktvogt.de/backend/internal/domain/market" ) +// stubGeocoder is a test-local geocoder that returns canned lat/lng. +type stubGeocoder struct { + lat, lng *float64 + err error +} + +func (s stubGeocoder) Geocode(_ context.Context, _, _, _, _ string) (*float64, *float64, error) { + return s.lat, s.lng, s.err +} + +func ptrFloat(v float64) *float64 { return &v } + // newMockRepo returns a mockRepo with default no-op implementations and an // inserted field that captures every InsertDiscovered call. 
func newMockRepo() *mockRepo { @@ -131,7 +145,7 @@ func TestAccept_NewSeries_CallsCreate(t *testing.T) { markAcceptedFn: func(_ context.Context, _ pgx.Tx, _, _, _ uuid.UUID) error { return nil }, } mc := &stubCreator{} - svc := NewService(m, nil, noopLinkVerifier{}, mc) + svc := NewService(m, nil, noopLinkVerifier{}, mc, nil) _, _, err := svc.Accept(context.Background(), qID, uuid.New()) if err != nil { t.Fatalf("accept err: %v", err) @@ -153,7 +167,7 @@ func TestAccept_ExistingSeries_CallsCreateEditionForSeries(t *testing.T) { markAcceptedFn: func(_ context.Context, _ pgx.Tx, _, _, _ uuid.UUID) error { return nil }, } mc := &stubCreator{} - svc := NewService(m, nil, noopLinkVerifier{}, mc) + svc := NewService(m, nil, noopLinkVerifier{}, mc, nil) _, _, err := svc.Accept(context.Background(), uuid.New(), uuid.New()) if err != nil { t.Fatalf("accept err: %v", err) @@ -184,7 +198,7 @@ func TestServiceCrawlHappyPath(t *testing.T) { PerSourceMS: map[string]int64{"marktkalendarium": 1}, }, } - svc := NewService(repo, sc, lc, noopMarketCreator{}) + svc := NewService(repo, sc, lc, noopMarketCreator{}, nil) summary, err := svc.Crawl(context.Background()) if err != nil { @@ -225,7 +239,7 @@ func TestServiceCrawlDedupQueue(t *testing.T) { }, }, } - svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{}) + svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{}, nil) summary, err := svc.Crawl(context.Background()) if err != nil { @@ -256,7 +270,7 @@ func TestServiceCrawlDefaultsEndDate(t *testing.T) { }, }, } - svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{}) + svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{}, nil) if _, err := svc.Crawl(context.Background()); err != nil { t.Fatal(err) @@ -302,7 +316,7 @@ func TestServiceCrawlDetachesInsertContextFromRequestCtx(t *testing.T) { }, }, } - svc := NewService(repo, sc, ctxAwareLinkVerifier{}, noopMarketCreator{}) + svc := NewService(repo, sc, ctxAwareLinkVerifier{}, 
noopMarketCreator{}, nil) // Cancel the context BEFORE Crawl runs — simulates gateway timeout // that fires while the handler is still mid-run. @@ -327,7 +341,7 @@ func TestListPendingQueuePaged_ReturnsBothRowsAndTotal(t *testing.T) { m := &mockRepo{ countQueueFn: func(_ context.Context, _ string) (int, error) { return 42, nil }, } - svc := NewService(m, nil, noopLinkVerifier{}, noopMarketCreator{}) + svc := NewService(m, nil, noopLinkVerifier{}, noopMarketCreator{}, nil) rows, total, err := svc.ListPendingQueuePaged(context.Background(), 50, 0) if err != nil { t.Fatalf("unexpected error: %v", err) @@ -359,7 +373,7 @@ func TestServiceCrawlPersistsSourcesAndContributions(t *testing.T) { }, }, } - svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{}) + svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{}, nil) if _, err := svc.Crawl(context.Background()); err != nil { t.Fatal(err) @@ -408,7 +422,7 @@ func TestServiceCrawlMultiSourceHighKonfidenz(t *testing.T) { }, }, } - svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{}) + svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{}, nil) summary, err := svc.Crawl(context.Background()) if err != nil { @@ -424,3 +438,149 @@ func TestServiceCrawlMultiSourceHighKonfidenz(t *testing.T) { t.Errorf("Konfidenz = %q; want %q (2+ sources)", repo.inserted[0].Konfidenz, KonfidenzHoch) } } + +// TestRunCrawlEnrichAll_HappyPath exercises the bulk path: three pending rows, +// each with differently-shaped contributions, get consolidated and geocoded +// into the SetEnrichment calls. Nothing queries the LLM — this is the +// crawl-only pass. 
+func TestRunCrawlEnrichAll_HappyPath(t *testing.T) { + id1, id2, id3 := uuid.New(), uuid.New(), uuid.New() + rows := []DiscoveredMarket{ + { + ID: id1, Stadt: "Dresden", Land: "Deutschland", + SourceContributions: []SourceContribution{ + {SourceName: "marktkalendarium", PLZ: "01067", Venue: "Stallhof"}, + }, + }, + { + ID: id2, Stadt: "Leipzig", Land: "Deutschland", + SourceContributions: []SourceContribution{ + {SourceName: "a", Organizer: "Verein e.V."}, + {SourceName: "b", PLZ: "04109"}, + }, + }, + {ID: id3, Stadt: "", Land: "Deutschland"}, // No stadt, no contribs — skips geocode. + } + + var writes []struct { + id uuid.UUID + payload enrich.Enrichment + status string + } + repo := &mockRepo{ + listPendingEnrichFn: func(limit int) ([]DiscoveredMarket, error) { + if limit <= 0 { + t.Errorf("limit must be positive, got %d", limit) + } + return rows, nil + }, + setEnrichmentFn: func(id uuid.UUID, payload enrich.Enrichment, status string) error { + writes = append(writes, struct { + id uuid.UUID + payload enrich.Enrichment + status string + }{id, payload, status}) + return nil + }, + } + gc := stubGeocoder{lat: ptrFloat(51.05), lng: ptrFloat(13.74)} + svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, gc) + + summary, err := svc.RunCrawlEnrichAll(context.Background()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if summary.Total != 3 { + t.Errorf("Total = %d; want 3", summary.Total) + } + if summary.Succeeded != 3 { + t.Errorf("Succeeded = %d; want 3", summary.Succeeded) + } + if summary.Failed != 0 { + t.Errorf("Failed = %d; want 0", summary.Failed) + } + if len(writes) != 3 { + t.Fatalf("expected 3 SetEnrichment calls, got %d", len(writes)) + } + // Row 1: PLZ consolidated, geocode returned. 
+ if writes[0].payload.PLZ != "01067" || writes[0].payload.Venue != "Stallhof" { + t.Errorf("row1 payload: %+v", writes[0].payload) + } + if writes[0].payload.Lat == nil || *writes[0].payload.Lat != 51.05 { + t.Errorf("row1 lat: %v", writes[0].payload.Lat) + } + if writes[0].payload.Sources["plz"] != enrich.ProvenanceCrawl { + t.Errorf("row1 plz provenance: %q", writes[0].payload.Sources["plz"]) + } + // Row 2: first non-empty wins across contributions. + if writes[1].payload.PLZ != "04109" || writes[1].payload.Organizer != "Verein e.V." { + t.Errorf("row2 payload: %+v", writes[1].payload) + } + // Row 3: no stadt means no geocode attempt — payload has empty sources. + if writes[2].payload.Lat != nil { + t.Errorf("row3 should have no lat (no stadt), got %v", writes[2].payload.Lat) + } + // All writes mark status=done. + for i, w := range writes { + if w.status != EnrichmentStatusDone { + t.Errorf("write %d: status = %q, want %q", i, w.status, EnrichmentStatusDone) + } + } +} + +// TestRunCrawlEnrichAll_SetEnrichmentFailure verifies a row with a persist +// error is counted in Failed but does not halt the pass. 
+func TestRunCrawlEnrichAll_SetEnrichmentFailure(t *testing.T) { + idOK, idBad := uuid.New(), uuid.New() + rows := []DiscoveredMarket{ + {ID: idOK, Stadt: "Dresden", SourceContributions: []SourceContribution{{PLZ: "01067"}}}, + {ID: idBad, Stadt: "Leipzig", SourceContributions: []SourceContribution{{PLZ: "04109"}}}, + } + repo := &mockRepo{ + listPendingEnrichFn: func(int) ([]DiscoveredMarket, error) { return rows, nil }, + setEnrichmentFn: func(id uuid.UUID, _ enrich.Enrichment, _ string) error { + if id == idBad { + return errors.New("db down") + } + return nil + }, + } + svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, nil) + + summary, err := svc.RunCrawlEnrichAll(context.Background()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if summary.Succeeded != 1 || summary.Failed != 1 { + t.Errorf("Succeeded=%d Failed=%d; want 1/1", summary.Succeeded, summary.Failed) + } + if len(summary.Errors) != 1 || summary.Errors[0].QueueID != idBad { + t.Errorf("expected single error pinned to idBad, got %+v", summary.Errors) + } +} + +// TestRunCrawlEnrichAll_EmptyQueueNoOp: nothing pending, zero summary, no writes. 
+func TestRunCrawlEnrichAll_EmptyQueueNoOp(t *testing.T) { + var writes int + repo := &mockRepo{ + listPendingEnrichFn: func(int) ([]DiscoveredMarket, error) { + return []DiscoveredMarket{}, nil + }, + setEnrichmentFn: func(uuid.UUID, enrich.Enrichment, string) error { + writes++ + return nil + }, + } + svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, nil) + + summary, err := svc.RunCrawlEnrichAll(context.Background()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if summary.Total != 0 || summary.Succeeded != 0 || summary.Failed != 0 { + t.Errorf("empty-queue summary should be all zeros, got %+v", summary) + } + if writes != 0 { + t.Errorf("SetEnrichment called %d times on empty queue", writes) + } +} diff --git a/backend/internal/server/routes.go b/backend/internal/server/routes.go index a8b5279..f7ea4be 100644 --- a/backend/internal/server/routes.go +++ b/backend/internal/server/routes.go @@ -74,7 +74,7 @@ func (s *Server) registerRoutes() { // Discovery routes discoveryRepo := discovery.NewRepository(s.db) crawlerInstance := crawler.NewCrawler(s.cfg.Discovery.CrawlerUserAgent, crawler.DefaultSourceConfigs()) - discoveryService := discovery.NewService(discoveryRepo, crawlerInstance, discovery.NewLinkChecker(), marketSvc) + discoveryService := discovery.NewService(discoveryRepo, crawlerInstance, discovery.NewLinkChecker(), marketSvc, geocoder) discoveryHandler := discovery.NewHandler(discoveryService, s.cfg.Discovery.CrawlerManualRateLimitPerHour) requireTickToken := middleware.RequireBearerToken(s.cfg.Discovery.Token) discovery.RegisterRoutes(v1, discoveryHandler, requireAuth, requireAdmin, requireTickToken) diff --git a/web/src/routes/admin/discovery/+page.server.ts b/web/src/routes/admin/discovery/+page.server.ts index aa2c831..9f675f8 100644 --- a/web/src/routes/admin/discovery/+page.server.ts +++ b/web/src/routes/admin/discovery/+page.server.ts @@ -19,6 +19,22 @@ type SourceContribution = { organizer?: string; }; +type 
Enrichment = {
+	plz?: string;
+	venue?: string;
+	organizer?: string;
+	lat?: number;
+	lng?: number;
+	category?: string;
+	opening_hours?: string;
+	description?: string;
+	sources?: Record<string, string>;
+	enriched_at?: string;
+	model?: string;
+	input_tokens?: number;
+	output_tokens?: number;
+};
+
 type DiscoveredMarket = {
 	id: string;
 	markt_name: string;
@@ -36,6 +52,10 @@
 	hinweis: string;
 	matched_series_id: string | null;
 	discovered_at: string;
+	enrichment: Enrichment;
+	enrichment_status: 'pending' | 'done' | 'failed' | 'skipped';
+	enrichment_attempts: number;
+	enriched_at: string | null;
 };
 
 type BucketError = {
@@ -161,6 +181,24 @@ export const actions: Actions = {
 		}
 	},
 
+	enrichCrawl: async ({ cookies, fetch }) => {
+		try {
+			await serverFetch(`/admin/discovery/enrichment/crawl-all`, cookies, {
+				method: 'POST',
+				fetch
+			});
+			return { enrichStarted: true };
+		} catch (err) {
+			if (err instanceof ApiClientError && err.status === 429) {
+				return fail(429, {
+					enrichError: `Ein Crawl-Enrich-Lauf läuft bereits. (${err.message})`
+				});
+			}
+			const message = err instanceof Error ? err.message : 'Crawl-Enrich fehlgeschlagen.';
+			return fail(500, { enrichError: message });
+		}
+	},
+
 	update: async ({ request, cookies, fetch }) => {
 		const form = await request.formData();
 		const id = String(form.get('id') ??
'');
diff --git a/web/src/routes/admin/discovery/+page.svelte b/web/src/routes/admin/discovery/+page.svelte
index 3467100..bd1a446 100644
--- a/web/src/routes/admin/discovery/+page.svelte
+++ b/web/src/routes/admin/discovery/+page.svelte
@@ -30,10 +30,31 @@
 		error: string;
 	};
 
+	type EnrichSummary = {
+		started_at: string;
+		duration_ms: number;
+		total: number;
+		succeeded: number;
+		failed: number;
+		errors: Array<{ queue_id: string; error: string }> | null;
+	};
+
+	type EnrichStatus = {
+		running: boolean;
+		started_at: string;
+		finished_at: string;
+		summary: EnrichSummary | null;
+		error: string;
+	};
+
 	let crawling = $state(false);
 	let crawlStatus = $state<CrawlStatus | null>(null);
 	let pollInterval: ReturnType<typeof setInterval> | null = null;
 
+	let enriching = $state(false);
+	let enrichStatus = $state<EnrichStatus | null>(null);
+	let enrichPollInterval: ReturnType<typeof setInterval> | null = null;
+
 	// Coalesce nullable list fields (Go encodes nil slices as null).
 	const queue = $derived(data.queue ?? []);
 	const recentErrors = $derived(data.stats.recent_errors ?? []);
@@ -182,25 +203,69 @@
 		}, 3000);
 	}
 
-	// On mount: fetch current status so a page refresh picks up any in-progress
-	// or recently completed crawl.
+	async function fetchEnrichStatus(): Promise<EnrichStatus | null> {
+		try {
+			const res = await fetch('/admin/discovery/enrichment/crawl-all-status');
+			if (!res.ok) return null;
+			return (await res.json()) as EnrichStatus;
+		} catch {
+			return null;
+		}
+	}
+
+	function stopEnrichPolling() {
+		if (enrichPollInterval !== null) {
+			clearInterval(enrichPollInterval);
+			enrichPollInterval = null;
+		}
+	}
+
+	function startEnrichPolling() {
+		if (enrichPollInterval !== null) return; // already polling
+		enrichPollInterval = setInterval(async () => {
+			const status = await fetchEnrichStatus();
+			if (status === null) return;
+			enrichStatus = status;
+			if (!status.running) {
+				stopEnrichPolling();
+				enriching = false;
+			}
+		}, 3000);
+	}
+
+	// On mount: fetch both statuses so a page refresh picks up any in-progress
+	// or recently completed crawl / enrichment run.
 	onMount(async () => {
-		const status = await fetchCrawlStatus();
-		if (status === null) return;
-		crawlStatus = status;
-		if (status.running) {
-			crawling = true;
-			startPolling();
+		const [cs, es] = await Promise.all([fetchCrawlStatus(), fetchEnrichStatus()]);
+		if (cs !== null) {
+			crawlStatus = cs;
+			if (cs.running) {
+				crawling = true;
+				startPolling();
+			}
+		}
+		if (es !== null) {
+			enrichStatus = es;
+			if (es.running) {
+				enriching = true;
+				startEnrichPolling();
+			}
 		}
 	});
 
-	// When the crawl action returns crawlStarted, begin polling.
+	// When an action returns a *Started flag, begin the corresponding poller.
 	$effect(() => {
 		if (form?.crawlStarted) {
 			crawling = true;
 			startPolling();
 		}
 	});
+	$effect(() => {
+		if (form?.enrichStarted) {
+			enriching = true;
+			startEnrichPolling();
+		}
+	});
 
 	// Elapsed time display while running.
 	const elapsedLabel = $derived.by(() => {
@@ -215,6 +280,18 @@
 	const displayError = $derived(
 		crawlStatus && !crawlStatus.running && crawlStatus.error ? crawlStatus.error : null
 	);
+
+	// Enrichment-side display derived values — same shape as crawl for UI reuse.
+ const enrichElapsedLabel = $derived.by(() => { + if (!enrichStatus?.running || !enrichStatus.started_at) return ''; + const ms = Date.now() - new Date(enrichStatus.started_at).getTime(); + const s = Math.round(ms / 1000); + return s < 60 ? `${s}s` : `${Math.floor(s / 60)}m ${s % 60}s`; + }); + const enrichDisplaySummary = $derived(enrichStatus?.summary ?? null); + const enrichDisplayError = $derived( + enrichStatus && !enrichStatus.running && enrichStatus.error ? enrichStatus.error : null + ); @@ -281,7 +358,7 @@ {/if} -
+
+
{ + enriching = true; + return async ({ update }) => { + await update({ reset: false }); + }; + }} + > + +
{#if form?.crawlError} @@ -399,6 +499,81 @@
{/if} + {#if form?.enrichError} +
+ {form.enrichError} +
+ {/if} + + {#if enrichDisplayError} +
+ Crawl-enrich fehlgeschlagen: {enrichDisplayError} +
+ {/if} + + {#if enrichDisplaySummary} + {@const es = enrichDisplaySummary} +
+
+

Crawl-Enrich-Ergebnis

+ {(es.duration_ms / 1000).toFixed(1)} s · {es.started_at + .slice(0, 16) + .replace('T', ' ')} +
+
+
+
Total
+
{es.total}
+
+
+
Succeeded
+
+ {es.succeeded} +
+
+
+
Failed
+
+ {es.failed} +
+
+
+ {#if es.errors && es.errors.length > 0} +
+ {es.errors.length} Fehler beim Persistieren +
    + {#each es.errors as e} +
  • + {e.queue_id.slice(0, 8)} + {e.error} +
  • + {/each} +
+
+ {/if} +
+ {/if} +

Queue

{#if (data.total ?? 0) > 0} @@ -660,6 +835,62 @@

{row.hinweis}

{/if} + + {#if row.enrichment_status && row.enrichment_status !== 'pending'} +
+ Enrichment + + {row.enrichment_status} + +
+
+ {#if row.enrichment.plz} +
PLZ
+
+ {row.enrichment.plz} + ({row.enrichment.sources?.plz ?? '—'}) +
+ {/if} + {#if row.enrichment.venue} +
Venue
+
+ {row.enrichment.venue} + ({row.enrichment.sources?.venue ?? '—'}) +
+ {/if} + {#if row.enrichment.organizer} +
Organizer
+
+ {row.enrichment.organizer} + ({row.enrichment.sources?.organizer ?? '—'}) +
+ {/if} + {#if row.enrichment.lat != null && row.enrichment.lng != null} +
Lat/Lng
+
+ {row.enrichment.lat.toFixed(4)}, {row.enrichment.lng.toFixed(4)} + ({row.enrichment.sources?.lat ?? '—'}) +
+ {/if} +
+ {/if}