From cc6c4f2efba197537eee0c23a5582ba0e64462b7 Mon Sep 17 00:00:00 2001 From: vikingowl Date: Sun, 19 Apr 2026 00:27:34 +0200 Subject: [PATCH] feat(discovery): persist and display per-source contributions for merged queue rows Migration 000018 adds sources text[] + source_contributions jsonb columns to discovered_markets. Crawler's merger now preserves the raw per-source RawEvents through Merge() so they can be stored alongside the merged row. Admin UI gains two surfaces: (a) compact "merged from source1 + source2" chip + amber Datumskonflikt badge when hinweis flags it, (b) expandable Quellen-Vergleich panel showing a per-field comparison table with diverging fields highlighted. Forensic visibility into what each source said vs what the merger picked. --- .../domain/discovery/crawler/merger.go | 1 + .../domain/discovery/crawler/types.go | 5 + backend/internal/domain/discovery/model.go | 61 ++++++--- .../internal/domain/discovery/repository.go | 68 +++++++--- backend/internal/domain/discovery/service.go | 54 ++++++-- .../internal/domain/discovery/service_test.go | 56 ++++++++ ...red_markets_sources_contributions.down.sql | 4 + ...vered_markets_sources_contributions.up.sql | 7 + .../routes/admin/discovery/+page.server.ts | 18 +++ web/src/routes/admin/discovery/+page.svelte | 46 +++++++ .../admin/discovery/ContributionsPanel.svelte | 125 ++++++++++++++++++ 11 files changed, 393 insertions(+), 52 deletions(-) create mode 100644 backend/migrations/000018_discovered_markets_sources_contributions.down.sql create mode 100644 backend/migrations/000018_discovered_markets_sources_contributions.up.sql create mode 100644 web/src/routes/admin/discovery/ContributionsPanel.svelte diff --git a/backend/internal/domain/discovery/crawler/merger.go b/backend/internal/domain/discovery/crawler/merger.go index 8f9422c..db62b4c 100644 --- a/backend/internal/domain/discovery/crawler/merger.go +++ b/backend/internal/domain/discovery/crawler/merger.go @@ -172,6 +172,7 @@ func mergeGroup(raws []RawEvent) MergedEvent { m.Quellen = sortedKeys(quellenSet) m.Sources = sortedKeys(sourceSet) + m.Contributions = raws // raws is sorted by source rank (best first) return m } diff --git a/backend/internal/domain/discovery/crawler/types.go b/backend/internal/domain/discovery/crawler/types.go index 4eef9f6..ff244bb 100644 --- a/backend/internal/domain/discovery/crawler/types.go +++ b/backend/internal/domain/discovery/crawler/types.go @@ -47,6 +47,11 @@ type MergedEvent struct { Quellen []string Hinweis string Sources []string + + // Contributions holds the raw per-source inputs that fed this merge. + // Preserved so downstream code can surface per-field conflicts in the + // admin UI without re-running merger logic. + Contributions []RawEvent } // SourceError records a per-source failure without stopping the whole crawl. diff --git a/backend/internal/domain/discovery/model.go b/backend/internal/domain/discovery/model.go index 31b9826..4398d60 100644 --- a/backend/internal/domain/discovery/model.go +++ b/backend/internal/domain/discovery/model.go @@ -7,6 +7,25 @@ import ( "github.com/google/uuid" ) +// SourceContribution captures what one source reported about a market. +// Persisted as a JSONB array in discovered_markets.source_contributions +// so the admin UI can render per-field conflicts between sources. +type SourceContribution struct { + SourceName string `json:"source_name"` + SourceURL string `json:"source_url,omitempty"` + DetailURL string `json:"detail_url,omitempty"` + Name string `json:"name,omitempty"` + City string `json:"city,omitempty"` + PLZ string `json:"plz,omitempty"` + Land string `json:"land,omitempty"` + Bundesland string `json:"bundesland,omitempty"` + StartDate *time.Time `json:"start_date,omitempty"` + EndDate *time.Time `json:"end_date,omitempty"` + Website string `json:"website,omitempty"` + Venue string `json:"venue,omitempty"` + Organizer string `json:"organizer,omitempty"` +} + // Bucket is a scheduler row: one (land, region, year_month, halbmonat) tuple. // Halbmonat splits the month into H1 (days 1-15) and H2 (days 16-EOM) so each // Pass 0 call covers a smaller window and fits within the 4096-token response. @@ -26,26 +45,28 @@ type Bucket struct { // the separate confirmation signal from Pass 0 (bestaetigt|unklar| // vorjahr_unbestaetigt|abgesagt). type DiscoveredMarket struct { - ID uuid.UUID `json:"id"` - BucketID *uuid.UUID `json:"bucket_id"` - MarktName string `json:"markt_name"` - Stadt string `json:"stadt"` - Bundesland string `json:"bundesland"` - Land string `json:"land"` - StartDatum *time.Time `json:"start_datum"` - EndDatum *time.Time `json:"end_datum"` - Website string `json:"website"` - Quellen []string `json:"quellen"` - Konfidenz string `json:"konfidenz"` // 'hoch' | 'mittel' | 'niedrig' - AgentStatus string `json:"agent_status"` // 'bestaetigt' | 'unklar' | 'vorjahr_unbestaetigt' | 'abgesagt' - Hinweis string `json:"hinweis"` - NameNormalized string `json:"name_normalized"` - MatchedSeriesID *uuid.UUID `json:"matched_series_id"` - Status string `json:"status"` // queue lifecycle - DiscoveredAt time.Time `json:"discovered_at"` - ReviewedAt *time.Time `json:"reviewed_at"` - ReviewedBy *uuid.UUID `json:"reviewed_by"` - CreatedEditionID *uuid.UUID `json:"created_edition_id"` + ID uuid.UUID `json:"id"` + BucketID *uuid.UUID `json:"bucket_id"` + MarktName string `json:"markt_name"` + Stadt string `json:"stadt"` + Bundesland string `json:"bundesland"` + Land string `json:"land"` + StartDatum *time.Time `json:"start_datum"` + EndDatum *time.Time `json:"end_datum"` + Website string `json:"website"` + Quellen []string `json:"quellen"` + Sources []string `json:"sources"` + SourceContributions []SourceContribution `json:"source_contributions"` + Konfidenz string `json:"konfidenz"` // 'hoch' | 'mittel' | 'niedrig' + AgentStatus string `json:"agent_status"` // 'bestaetigt' | 'unklar' | 'vorjahr_unbestaetigt' | 'abgesagt' + Hinweis string `json:"hinweis"` + NameNormalized string `json:"name_normalized"` + MatchedSeriesID *uuid.UUID `json:"matched_series_id"` + Status string `json:"status"` // queue lifecycle + DiscoveredAt time.Time `json:"discovered_at"` + ReviewedAt *time.Time `json:"reviewed_at"` + ReviewedBy *uuid.UUID `json:"reviewed_by"` + CreatedEditionID *uuid.UUID `json:"created_edition_id"` } // RejectedDiscovery stores a sticky rejection scoped to (normalized_name, city, year). diff --git a/backend/internal/domain/discovery/repository.go b/backend/internal/domain/discovery/repository.go index 30f5d4a..752dc79 100644 --- a/backend/internal/domain/discovery/repository.go +++ b/backend/internal/domain/discovery/repository.go @@ -3,6 +3,7 @@ package discovery import ( "context" + "encoding/json" "fmt" "time" @@ -71,15 +72,21 @@ func (r *pgRepository) EditionExists(ctx context.Context, seriesID uuid.UUID, ye } func (r *pgRepository) InsertDiscovered(ctx context.Context, d DiscoveredMarket) (uuid.UUID, error) { + contribJSON, err := json.Marshal(d.SourceContributions) + if err != nil { + return uuid.Nil, fmt.Errorf("marshal source_contributions: %w", err) + } var id uuid.UUID - err := r.pool.QueryRow(ctx, ` + err = r.pool.QueryRow(ctx, ` INSERT INTO discovered_markets (bucket_id, markt_name, stadt, bundesland, land, start_datum, end_datum, website, - quellen, konfidenz, agent_status, hinweis, name_normalized, matched_series_id) -VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14) + quellen, konfidenz, agent_status, hinweis, name_normalized, matched_series_id, + sources, source_contributions) +VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16) RETURNING id`, d.BucketID, d.MarktName, d.Stadt, d.Bundesland, d.Land, d.StartDatum, d.EndDatum, d.Website, - d.Quellen, nilIfEmpty(d.Konfidenz), nilIfEmpty(d.AgentStatus), d.Hinweis, d.NameNormalized, d.MatchedSeriesID).Scan(&id) + d.Quellen, nilIfEmpty(d.Konfidenz), nilIfEmpty(d.AgentStatus), d.Hinweis, d.NameNormalized, d.MatchedSeriesID, + d.Sources, contribJSON).Scan(&id) return id, err } @@ -116,7 +123,8 @@ func (r *pgRepository) ListQueue(ctx context.Context, status string, limit, offs SELECT id, bucket_id, markt_name, stadt, coalesce(bundesland,''), land, start_datum, end_datum, coalesce(website,''), quellen, coalesce(konfidenz,''), coalesce(agent_status,''), coalesce(hinweis,''), name_normalized, matched_series_id, status, - discovered_at, reviewed_at, reviewed_by, created_edition_id + discovered_at, reviewed_at, reviewed_by, created_edition_id, + sources, source_contributions FROM discovered_markets WHERE status = $1 ORDER BY discovered_at DESC @@ -127,13 +135,8 @@ LIMIT $2 OFFSET $3`, status, limit, offset) defer rows.Close() out := make([]DiscoveredMarket, 0) for rows.Next() { - var d DiscoveredMarket - if err := rows.Scan( - &d.ID, &d.BucketID, &d.MarktName, &d.Stadt, &d.Bundesland, &d.Land, - &d.StartDatum, &d.EndDatum, &d.Website, &d.Quellen, &d.Konfidenz, - &d.AgentStatus, &d.Hinweis, &d.NameNormalized, &d.MatchedSeriesID, &d.Status, - &d.DiscoveredAt, &d.ReviewedAt, &d.ReviewedBy, &d.CreatedEditionID, - ); err != nil { + d, err := scanDiscoveredMarket(rows) + if err != nil { return nil, err } out = append(out, d) @@ -149,19 +152,48 @@ func (r *pgRepository) CountQueue(ctx context.Context, status string) (int, erro } func (r *pgRepository) GetDiscovered(ctx context.Context, id uuid.UUID) (DiscoveredMarket, error) { - var d DiscoveredMarket - err := r.pool.QueryRow(ctx, ` + row := r.pool.QueryRow(ctx, ` SELECT id, bucket_id, markt_name, stadt, coalesce(bundesland,''), land, start_datum, end_datum, coalesce(website,''), quellen, coalesce(konfidenz,''), coalesce(agent_status,''), coalesce(hinweis,''), name_normalized, matched_series_id, status, - discovered_at, reviewed_at, reviewed_by, created_edition_id -FROM discovered_markets WHERE id = $1`, id).Scan( + discovered_at, reviewed_at, reviewed_by, created_edition_id, + sources, source_contributions +FROM discovered_markets WHERE id = $1`, id) + return scanDiscoveredMarket(row) +} + +// scanner is the common interface between pgx.Row and pgx.Rows, both of which +// expose a Scan method with identical signature. +type scanner interface { + Scan(dest ...any) error +} + +// scanDiscoveredMarket reads one DiscoveredMarket from a pgx row/rows scanner. +// Used by both ListQueue and GetDiscovered to stay in sync with the column list. +func scanDiscoveredMarket(s scanner) (DiscoveredMarket, error) { + var d DiscoveredMarket + var contribJSON []byte + if err := s.Scan( &d.ID, &d.BucketID, &d.MarktName, &d.Stadt, &d.Bundesland, &d.Land, &d.StartDatum, &d.EndDatum, &d.Website, &d.Quellen, &d.Konfidenz, &d.AgentStatus, &d.Hinweis, &d.NameNormalized, &d.MatchedSeriesID, &d.Status, &d.DiscoveredAt, &d.ReviewedAt, &d.ReviewedBy, &d.CreatedEditionID, - ) - return d, err + &d.Sources, &contribJSON, + ); err != nil { + return DiscoveredMarket{}, err + } + if len(contribJSON) > 0 { + if err := json.Unmarshal(contribJSON, &d.SourceContributions); err != nil { + return DiscoveredMarket{}, fmt.Errorf("unmarshal source_contributions: %w", err) + } + } + if d.Sources == nil { + d.Sources = []string{} + } + if d.SourceContributions == nil { + d.SourceContributions = []SourceContribution{} + } + return d, nil } func (r *pgRepository) MarkAccepted(ctx context.Context, tx pgx.Tx, id, editionID, reviewer uuid.UUID) error { diff --git a/backend/internal/domain/discovery/service.go b/backend/internal/domain/discovery/service.go index a543ebc..9000eb0 100644 --- a/backend/internal/domain/discovery/service.go +++ b/backend/internal/domain/discovery/service.go @@ -200,20 +200,22 @@ func (s *Service) Crawl(ctx context.Context) (CrawlSummary, error) { } dm := DiscoveredMarket{ - BucketID: nil, - MarktName: m.Name, - Stadt: m.City, - Bundesland: m.Bundesland, - Land: m.Land, - StartDatum: m.StartDate, - EndDatum: endDatum, - Website: website, - Quellen: quellen, - Konfidenz: crawlerKonfidenz(m), - AgentStatus: AgentStatusCrawler, - Hinweis: m.Hinweis, - NameNormalized: nameNorm, - MatchedSeriesID: matchedSeriesID, + BucketID: nil, + MarktName: m.Name, + Stadt: m.City, + Bundesland: m.Bundesland, + Land: m.Land, + StartDatum: m.StartDate, + EndDatum: endDatum, + Website: website, + Quellen: quellen, + Sources: m.Sources, + SourceContributions: toContributions(m.Contributions), + Konfidenz: crawlerKonfidenz(m), + AgentStatus: AgentStatusCrawler, + Hinweis: m.Hinweis, + NameNormalized: nameNorm, + MatchedSeriesID: matchedSeriesID, } issues := ValidateForInsert(dm, nil) @@ -444,6 +446,30 @@ func (s *Service) FindSimilarToQueueEntry(ctx context.Context, id uuid.UUID) ([] return FindSimilar(target, candidates, 0.5), nil } +// toContributions translates the crawler's per-source RawEvents into the +// discovery-domain SourceContribution shape for JSONB persistence. +func toContributions(raws []crawler.RawEvent) []SourceContribution { + out := make([]SourceContribution, 0, len(raws)) + for _, r := range raws { + out = append(out, SourceContribution{ + SourceName: r.SourceName, + SourceURL: r.SourceURL, + DetailURL: r.DetailURL, + Name: r.Name, + City: r.City, + PLZ: r.PLZ, + Land: r.Land, + Bundesland: r.Bundesland, + StartDate: r.StartDate, + EndDate: r.EndDate, + Website: r.Website, + Venue: r.Venue, + Organizer: r.Organizer, + }) + } + return out +} + // findSeriesMatch returns the ID of the first candidate whose normalized name matches // incomingName after normalization. Candidates are expected to be pre-filtered by city. func findSeriesMatch(incomingName string, candidates []SeriesCandidate) *uuid.UUID { diff --git a/backend/internal/domain/discovery/service_test.go b/backend/internal/domain/discovery/service_test.go index 511e4fb..d54ae44 100644 --- a/backend/internal/domain/discovery/service_test.go +++ b/backend/internal/domain/discovery/service_test.go @@ -340,6 +340,62 @@ func TestListPendingQueuePaged_ReturnsBothRowsAndTotal(t *testing.T) { } } +func TestServiceCrawlPersistsSourcesAndContributions(t *testing.T) { + repo := newMockRepo() + start := mustParseDate(t, "2026-05-01") + + sc := &stubCrawlerRunner{ + result: crawler.CrawlResult{ + PerSource: map[string][]crawler.RawEvent{ + "marktkalendarium": { + {SourceName: "marktkalendarium", SourceURL: "https://mk/", Name: "Markt X", + City: "Dresden", PLZ: "01067", StartDate: start, Website: "https://organizer.de"}, + }, + "mittelaltermarkt_online": { + {SourceName: "mittelaltermarkt_online", SourceURL: "https://mo/", + DetailURL: "https://mo/e/1", Name: "Markt X", City: "Dresden", + PLZ: "01067", StartDate: start, Venue: "Stallhof"}, + }, + }, + }, + } + svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{}) + + if _, err := svc.Crawl(context.Background()); err != nil { + t.Fatal(err) + } + if len(repo.inserted) != 1 { + t.Fatalf("inserted = %d; want 1 (merged from two sources)", len(repo.inserted)) + } + got := repo.inserted[0] + + // Sources list populated. + if len(got.Sources) != 2 { + t.Errorf("Sources = %v; want 2 entries", got.Sources) + } + + // Contributions populated with both raws. + if len(got.SourceContributions) != 2 { + t.Fatalf("SourceContributions = %d; want 2", len(got.SourceContributions)) + } + // First contribution should be the highest-rank source + // (mittelaltermarkt_online = rank 1, marktkalendarium = rank 2). + if got.SourceContributions[0].SourceName != "mittelaltermarkt_online" { + t.Errorf("first contribution source = %q; want mittelaltermarkt_online (rank 1)", + got.SourceContributions[0].SourceName) + } + // Each contribution preserves its source's original field values. + var seenVenue bool + for _, c := range got.SourceContributions { + if c.Venue == "Stallhof" { + seenVenue = true + } + } + if !seenVenue { + t.Error("expected Venue=Stallhof preserved in the mittelaltermarkt_online contribution") + } +} + func TestServiceCrawlMultiSourceHighKonfidenz(t *testing.T) { repo := newMockRepo() start := mustParseDate(t, "2026-05-01") diff --git a/backend/migrations/000018_discovered_markets_sources_contributions.down.sql b/backend/migrations/000018_discovered_markets_sources_contributions.down.sql new file mode 100644 index 0000000..1e24b53 --- /dev/null +++ b/backend/migrations/000018_discovered_markets_sources_contributions.down.sql @@ -0,0 +1,4 @@ +DROP INDEX IF EXISTS idx_discovered_markets_sources_gin; +ALTER TABLE discovered_markets + DROP COLUMN IF EXISTS source_contributions, + DROP COLUMN IF EXISTS sources; diff --git a/backend/migrations/000018_discovered_markets_sources_contributions.up.sql b/backend/migrations/000018_discovered_markets_sources_contributions.up.sql new file mode 100644 index 0000000..1952c4b --- /dev/null +++ b/backend/migrations/000018_discovered_markets_sources_contributions.up.sql @@ -0,0 +1,7 @@ +ALTER TABLE discovered_markets + ADD COLUMN sources text[] NOT NULL DEFAULT '{}', + ADD COLUMN source_contributions jsonb NOT NULL DEFAULT '[]'::jsonb; + +-- Helpful indexes for potential future filtering. +CREATE INDEX IF NOT EXISTS idx_discovered_markets_sources_gin + ON discovered_markets USING GIN (sources); diff --git a/web/src/routes/admin/discovery/+page.server.ts b/web/src/routes/admin/discovery/+page.server.ts index f0a5e09..063facb 100644 --- a/web/src/routes/admin/discovery/+page.server.ts +++ b/web/src/routes/admin/discovery/+page.server.ts @@ -3,6 +3,22 @@ import { redirect, fail } from '@sveltejs/kit'; import { serverFetch } from '$lib/api/client.server.js'; import { ApiClientError } from '$lib/api/client.js'; +type SourceContribution = { + source_name: string; + source_url?: string; + detail_url?: string; + name?: string; + city?: string; + plz?: string; + land?: string; + bundesland?: string; + start_date?: string | null; + end_date?: string | null; + website?: string; + venue?: string; + organizer?: string; +}; + type DiscoveredMarket = { id: string; markt_name: string; @@ -13,6 +29,8 @@ type DiscoveredMarket = { end_datum: string | null; website: string; quellen: string[]; + sources: string[]; + source_contributions: SourceContribution[]; konfidenz: string; // 'hoch' | 'mittel' | 'niedrig' agent_status: string; // 'bestaetigt' | 'unklar' | 'vorjahr_unbestaetigt' | 'abgesagt' hinweis: string; diff --git a/web/src/routes/admin/discovery/+page.svelte b/web/src/routes/admin/discovery/+page.svelte index e773867..3467100 100644 --- a/web/src/routes/admin/discovery/+page.svelte +++ b/web/src/routes/admin/discovery/+page.svelte @@ -2,6 +2,7 @@ import { onMount } from 'svelte'; import { enhance } from '$app/forms'; import { goto } from '$app/navigation'; + import ContributionsPanel from './ContributionsPanel.svelte'; let { data, form } = $props(); @@ -58,6 +59,12 @@ let similarLoading = $state(false); let similarEntries = $state([]); + // Quellen-Vergleich panel — one open at a time. + let quellenVergleichOpenId: string | null = $state(null); + function toggleQuellenVergleich(id: string) { + quellenVergleichOpenId = quellenVergleichOpenId === id ? null : id; + } + async function toggleSimilar(id: string) { if (similarOpenId === id) { similarOpenId = null; @@ -454,6 +461,19 @@ {row.agent_status} {/if} + {#if row.hinweis?.includes('date_conflict')} + + ⚠ Datumskonflikt + + {/if} + {#if (row.sources?.length ?? 0) >= 2} +
+ merged: {row.sources.join(' + ')} +
+ {/if} {row.stadt} @@ -515,6 +535,17 @@ > Similar + {#if (row.sources?.length ?? 0) >= 2} + + {/if} {#if similarOpenId === row.id} @@ -573,6 +604,21 @@ {/if} + {#if quellenVergleichOpenId === row.id} + + + +
+ Quellen-Vergleich +
+ + + + {/if} {#if expandedId === row.id} + type SourceContribution = { + source_name: string; + source_url?: string; + detail_url?: string; + name?: string; + city?: string; + plz?: string; + land?: string; + bundesland?: string; + start_date?: string | null; + end_date?: string | null; + website?: string; + venue?: string; + organizer?: string; + }; + + let { contributions }: { contributions: SourceContribution[] } = $props(); + + type FieldDef = { + label: string; + key: keyof SourceContribution; + format?: (v: string | null | undefined) => string; + }; + + const fields: FieldDef[] = [ + { label: 'Name', key: 'name' }, + { label: 'Stadt', key: 'city' }, + { label: 'PLZ', key: 'plz' }, + { label: 'Land', key: 'land' }, + { label: 'Bundesland', key: 'bundesland' }, + { + label: 'Start-Datum', + key: 'start_date', + format: (v) => (v ? v.slice(0, 10) : '') + }, + { + label: 'End-Datum', + key: 'end_date', + format: (v) => (v ? v.slice(0, 10) : '') + }, + { label: 'Website', key: 'website' }, + { label: 'Venue', key: 'venue' }, + { label: 'Veranstalter', key: 'organizer' } + ]; + + // Returns the display value for a contribution field. + function cellValue(c: SourceContribution, field: FieldDef): string { + const raw = c[field.key] as string | null | undefined; + if (field.format) return field.format(raw); + return raw ?? ''; + } + + // A field has a conflict if at least two sources have non-empty, different values. + function hasConflict(field: FieldDef): boolean { + const values = contributions.map((c) => cellValue(c, field)).filter((v) => v !== ''); + if (values.length < 2) return false; + return values.some((v) => v !== values[0]); + } + + +{#if contributions.length === 0} +

Keine Quelldaten vorhanden.

+{:else} +
+

+ Rang 1 = höchste Priorität · Gewinnende Quelle: + {contributions[0].source_name} + · Amber = abweichende Werte zwischen Quellen +

+ + + + + {#each contributions as c, i} + + {/each} + + + + {#each fields as field} + {@const conflict = hasConflict(field)} + + + {#each contributions as c} + {@const val = cellValue(c, field)} + + {/each} + + {/each} + +
Feld + {c.source_name} + {#if i === 0} + Rang 1 + {/if} +
+ {field.label} + {#if conflict} + + {/if} + + {#if val} + {val} + {:else} + + {/if} +
+
+{/if}