feat(discovery): persist and display per-source contributions for merged queue rows

Migration 000018 adds sources text[] + source_contributions jsonb
columns to discovered_markets. Crawler's merger now preserves the raw
per-source RawEvents through Merge() so they can be stored alongside
the merged row. Admin UI gains two surfaces: (a) compact "merged from
source1 + source2" chip + amber Datumskonflikt badge when hinweis
flags it, (b) expandable Quellen-Vergleich panel showing a per-field
comparison table with diverging fields highlighted. Forensic visibility
into what each source said vs what the merger picked.
This commit is contained in:
2026-04-19 00:27:34 +02:00
parent f22a141615
commit cc6c4f2efb
11 changed files with 393 additions and 52 deletions

View File

@@ -172,6 +172,7 @@ func mergeGroup(raws []RawEvent) MergedEvent {
m.Quellen = sortedKeys(quellenSet)
m.Sources = sortedKeys(sourceSet)
m.Contributions = raws // raws is sorted by source rank (best first)
return m
}

View File

@@ -47,6 +47,11 @@ type MergedEvent struct {
Quellen []string
Hinweis string
Sources []string
// Contributions holds the raw per-source inputs that fed this merge.
// Preserved so downstream code can surface per-field conflicts in the
// admin UI without re-running merger logic.
Contributions []RawEvent
}
// SourceError records a per-source failure without stopping the whole crawl.

View File

@@ -7,6 +7,25 @@ import (
"github.com/google/uuid"
)
// SourceContribution captures what one source reported about a market.
// Persisted as a JSONB array in discovered_markets.source_contributions
// so the admin UI can render per-field conflicts between sources.
//
// All fields except SourceName carry omitempty: a source only contributes
// the values it actually provided, keeping the stored JSON compact.
type SourceContribution struct {
	// SourceName identifies the crawler source (e.g. "marktkalendarium").
	SourceName string `json:"source_name"`
	// SourceURL is the source's listing URL; DetailURL points at the
	// event's own page on that source, when one exists.
	SourceURL string `json:"source_url,omitempty"`
	DetailURL string `json:"detail_url,omitempty"`
	// Raw per-source event fields, preserved verbatim so the admin UI can
	// show what each source said before the merger picked a winner.
	Name       string     `json:"name,omitempty"`
	City       string     `json:"city,omitempty"`
	PLZ        string     `json:"plz,omitempty"`
	Land       string     `json:"land,omitempty"`
	Bundesland string     `json:"bundesland,omitempty"`
	StartDate  *time.Time `json:"start_date,omitempty"`
	EndDate    *time.Time `json:"end_date,omitempty"`
	Website    string     `json:"website,omitempty"`
	Venue      string     `json:"venue,omitempty"`
	Organizer  string     `json:"organizer,omitempty"`
}
// Bucket is a scheduler row: one (land, region, year_month, halbmonat) tuple.
// Halbmonat splits the month into H1 (days 1-15) and H2 (days 16-EOM) so each
// Pass 0 call covers a smaller window and fits within the 4096-token response.
@@ -26,26 +45,28 @@ type Bucket struct {
// the separate confirmation signal from Pass 0 (bestaetigt|unklar|
// vorjahr_unbestaetigt|abgesagt).
type DiscoveredMarket struct {
ID uuid.UUID `json:"id"`
BucketID *uuid.UUID `json:"bucket_id"`
MarktName string `json:"markt_name"`
Stadt string `json:"stadt"`
Bundesland string `json:"bundesland"`
Land string `json:"land"`
StartDatum *time.Time `json:"start_datum"`
EndDatum *time.Time `json:"end_datum"`
Website string `json:"website"`
Quellen []string `json:"quellen"`
Konfidenz string `json:"konfidenz"` // 'hoch' | 'mittel' | 'niedrig'
AgentStatus string `json:"agent_status"` // 'bestaetigt' | 'unklar' | 'vorjahr_unbestaetigt' | 'abgesagt'
Hinweis string `json:"hinweis"`
NameNormalized string `json:"name_normalized"`
MatchedSeriesID *uuid.UUID `json:"matched_series_id"`
Status string `json:"status"` // queue lifecycle
DiscoveredAt time.Time `json:"discovered_at"`
ReviewedAt *time.Time `json:"reviewed_at"`
ReviewedBy *uuid.UUID `json:"reviewed_by"`
CreatedEditionID *uuid.UUID `json:"created_edition_id"`
ID uuid.UUID `json:"id"`
BucketID *uuid.UUID `json:"bucket_id"`
MarktName string `json:"markt_name"`
Stadt string `json:"stadt"`
Bundesland string `json:"bundesland"`
Land string `json:"land"`
StartDatum *time.Time `json:"start_datum"`
EndDatum *time.Time `json:"end_datum"`
Website string `json:"website"`
Quellen []string `json:"quellen"`
Sources []string `json:"sources"`
SourceContributions []SourceContribution `json:"source_contributions"`
Konfidenz string `json:"konfidenz"` // 'hoch' | 'mittel' | 'niedrig'
AgentStatus string `json:"agent_status"` // 'bestaetigt' | 'unklar' | 'vorjahr_unbestaetigt' | 'abgesagt'
Hinweis string `json:"hinweis"`
NameNormalized string `json:"name_normalized"`
MatchedSeriesID *uuid.UUID `json:"matched_series_id"`
Status string `json:"status"` // queue lifecycle
DiscoveredAt time.Time `json:"discovered_at"`
ReviewedAt *time.Time `json:"reviewed_at"`
ReviewedBy *uuid.UUID `json:"reviewed_by"`
CreatedEditionID *uuid.UUID `json:"created_edition_id"`
}
// RejectedDiscovery stores a sticky rejection scoped to (normalized_name, city, year).

View File

@@ -3,6 +3,7 @@ package discovery
import (
"context"
"encoding/json"
"fmt"
"time"
@@ -71,15 +72,21 @@ func (r *pgRepository) EditionExists(ctx context.Context, seriesID uuid.UUID, ye
}
func (r *pgRepository) InsertDiscovered(ctx context.Context, d DiscoveredMarket) (uuid.UUID, error) {
contribJSON, err := json.Marshal(d.SourceContributions)
if err != nil {
return uuid.Nil, fmt.Errorf("marshal source_contributions: %w", err)
}
var id uuid.UUID
err := r.pool.QueryRow(ctx, `
err = r.pool.QueryRow(ctx, `
INSERT INTO discovered_markets
(bucket_id, markt_name, stadt, bundesland, land, start_datum, end_datum, website,
quellen, konfidenz, agent_status, hinweis, name_normalized, matched_series_id)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14)
quellen, konfidenz, agent_status, hinweis, name_normalized, matched_series_id,
sources, source_contributions)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16)
RETURNING id`,
d.BucketID, d.MarktName, d.Stadt, d.Bundesland, d.Land, d.StartDatum, d.EndDatum, d.Website,
d.Quellen, nilIfEmpty(d.Konfidenz), nilIfEmpty(d.AgentStatus), d.Hinweis, d.NameNormalized, d.MatchedSeriesID).Scan(&id)
d.Quellen, nilIfEmpty(d.Konfidenz), nilIfEmpty(d.AgentStatus), d.Hinweis, d.NameNormalized, d.MatchedSeriesID,
d.Sources, contribJSON).Scan(&id)
return id, err
}
@@ -116,7 +123,8 @@ func (r *pgRepository) ListQueue(ctx context.Context, status string, limit, offs
SELECT id, bucket_id, markt_name, stadt, coalesce(bundesland,''), land,
start_datum, end_datum, coalesce(website,''), quellen, coalesce(konfidenz,''),
coalesce(agent_status,''), coalesce(hinweis,''), name_normalized, matched_series_id, status,
discovered_at, reviewed_at, reviewed_by, created_edition_id
discovered_at, reviewed_at, reviewed_by, created_edition_id,
sources, source_contributions
FROM discovered_markets
WHERE status = $1
ORDER BY discovered_at DESC
@@ -127,13 +135,8 @@ LIMIT $2 OFFSET $3`, status, limit, offset)
defer rows.Close()
out := make([]DiscoveredMarket, 0)
for rows.Next() {
var d DiscoveredMarket
if err := rows.Scan(
&d.ID, &d.BucketID, &d.MarktName, &d.Stadt, &d.Bundesland, &d.Land,
&d.StartDatum, &d.EndDatum, &d.Website, &d.Quellen, &d.Konfidenz,
&d.AgentStatus, &d.Hinweis, &d.NameNormalized, &d.MatchedSeriesID, &d.Status,
&d.DiscoveredAt, &d.ReviewedAt, &d.ReviewedBy, &d.CreatedEditionID,
); err != nil {
d, err := scanDiscoveredMarket(rows)
if err != nil {
return nil, err
}
out = append(out, d)
@@ -149,19 +152,48 @@ func (r *pgRepository) CountQueue(ctx context.Context, status string) (int, erro
}
func (r *pgRepository) GetDiscovered(ctx context.Context, id uuid.UUID) (DiscoveredMarket, error) {
var d DiscoveredMarket
err := r.pool.QueryRow(ctx, `
row := r.pool.QueryRow(ctx, `
SELECT id, bucket_id, markt_name, stadt, coalesce(bundesland,''), land,
start_datum, end_datum, coalesce(website,''), quellen, coalesce(konfidenz,''),
coalesce(agent_status,''), coalesce(hinweis,''), name_normalized, matched_series_id, status,
discovered_at, reviewed_at, reviewed_by, created_edition_id
FROM discovered_markets WHERE id = $1`, id).Scan(
discovered_at, reviewed_at, reviewed_by, created_edition_id,
sources, source_contributions
FROM discovered_markets WHERE id = $1`, id)
return scanDiscoveredMarket(row)
}
// scanner is the common interface between pgx.Row and pgx.Rows, both of which
// expose a Scan method with identical signature. Abstracting over it lets one
// helper (scanDiscoveredMarket) serve both the single-row and the multi-row
// query paths, so the destination list cannot drift between them.
type scanner interface {
	Scan(dest ...any) error
}
// scanDiscoveredMarket reads one DiscoveredMarket from a pgx row/rows scanner.
// Used by both ListQueue and GetDiscovered to stay in sync with the column list.
func scanDiscoveredMarket(s scanner) (DiscoveredMarket, error) {
var d DiscoveredMarket
var contribJSON []byte
if err := s.Scan(
&d.ID, &d.BucketID, &d.MarktName, &d.Stadt, &d.Bundesland, &d.Land,
&d.StartDatum, &d.EndDatum, &d.Website, &d.Quellen, &d.Konfidenz,
&d.AgentStatus, &d.Hinweis, &d.NameNormalized, &d.MatchedSeriesID, &d.Status,
&d.DiscoveredAt, &d.ReviewedAt, &d.ReviewedBy, &d.CreatedEditionID,
)
return d, err
&d.Sources, &contribJSON,
); err != nil {
return DiscoveredMarket{}, err
}
if len(contribJSON) > 0 {
if err := json.Unmarshal(contribJSON, &d.SourceContributions); err != nil {
return DiscoveredMarket{}, fmt.Errorf("unmarshal source_contributions: %w", err)
}
}
if d.Sources == nil {
d.Sources = []string{}
}
if d.SourceContributions == nil {
d.SourceContributions = []SourceContribution{}
}
return d, nil
}
func (r *pgRepository) MarkAccepted(ctx context.Context, tx pgx.Tx, id, editionID, reviewer uuid.UUID) error {

View File

@@ -200,20 +200,22 @@ func (s *Service) Crawl(ctx context.Context) (CrawlSummary, error) {
}
dm := DiscoveredMarket{
BucketID: nil,
MarktName: m.Name,
Stadt: m.City,
Bundesland: m.Bundesland,
Land: m.Land,
StartDatum: m.StartDate,
EndDatum: endDatum,
Website: website,
Quellen: quellen,
Konfidenz: crawlerKonfidenz(m),
AgentStatus: AgentStatusCrawler,
Hinweis: m.Hinweis,
NameNormalized: nameNorm,
MatchedSeriesID: matchedSeriesID,
BucketID: nil,
MarktName: m.Name,
Stadt: m.City,
Bundesland: m.Bundesland,
Land: m.Land,
StartDatum: m.StartDate,
EndDatum: endDatum,
Website: website,
Quellen: quellen,
Sources: m.Sources,
SourceContributions: toContributions(m.Contributions),
Konfidenz: crawlerKonfidenz(m),
AgentStatus: AgentStatusCrawler,
Hinweis: m.Hinweis,
NameNormalized: nameNorm,
MatchedSeriesID: matchedSeriesID,
}
issues := ValidateForInsert(dm, nil)
@@ -444,6 +446,30 @@ func (s *Service) FindSimilarToQueueEntry(ctx context.Context, id uuid.UUID) ([]
return FindSimilar(target, candidates, 0.5), nil
}
// toContributions translates the crawler's per-source RawEvents into the
// discovery-domain SourceContribution shape for JSONB persistence.
// The input order (best-rank first) is preserved one-to-one.
func toContributions(raws []crawler.RawEvent) []SourceContribution {
	contribs := make([]SourceContribution, len(raws))
	for i := range raws {
		raw := &raws[i]
		contribs[i] = SourceContribution{
			SourceName: raw.SourceName,
			SourceURL:  raw.SourceURL,
			DetailURL:  raw.DetailURL,
			Name:       raw.Name,
			City:       raw.City,
			PLZ:        raw.PLZ,
			Land:       raw.Land,
			Bundesland: raw.Bundesland,
			StartDate:  raw.StartDate,
			EndDate:    raw.EndDate,
			Website:    raw.Website,
			Venue:      raw.Venue,
			Organizer:  raw.Organizer,
		}
	}
	return contribs
}
// findSeriesMatch returns the ID of the first candidate whose normalized name matches
// incomingName after normalization. Candidates are expected to be pre-filtered by city.
func findSeriesMatch(incomingName string, candidates []SeriesCandidate) *uuid.UUID {

View File

@@ -340,6 +340,62 @@ func TestListPendingQueuePaged_ReturnsBothRowsAndTotal(t *testing.T) {
}
}
// TestServiceCrawlPersistsSourcesAndContributions verifies that a crawl
// merging the same market from two sources persists both the Sources list
// and the per-source SourceContributions, rank-ordered with the winning
// source first and each source's original field values intact.
func TestServiceCrawlPersistsSourcesAndContributions(t *testing.T) {
	repo := newMockRepo()
	start := mustParseDate(t, "2026-05-01")
	runner := &stubCrawlerRunner{
		result: crawler.CrawlResult{
			PerSource: map[string][]crawler.RawEvent{
				"marktkalendarium": {
					{SourceName: "marktkalendarium", SourceURL: "https://mk/", Name: "Markt X",
						City: "Dresden", PLZ: "01067", StartDate: start, Website: "https://organizer.de"},
				},
				"mittelaltermarkt_online": {
					{SourceName: "mittelaltermarkt_online", SourceURL: "https://mo/",
						DetailURL: "https://mo/e/1", Name: "Markt X", City: "Dresden",
						PLZ: "01067", StartDate: start, Venue: "Stallhof"},
				},
			},
		},
	}
	svc := NewService(repo, runner, noopLinkVerifier{}, noopMarketCreator{})

	if _, err := svc.Crawl(context.Background()); err != nil {
		t.Fatal(err)
	}
	if len(repo.inserted) != 1 {
		t.Fatalf("inserted = %d; want 1 (merged from two sources)", len(repo.inserted))
	}
	row := repo.inserted[0]

	// Sources list populated.
	if len(row.Sources) != 2 {
		t.Errorf("Sources = %v; want 2 entries", row.Sources)
	}
	// Contributions populated with both raws.
	if len(row.SourceContributions) != 2 {
		t.Fatalf("SourceContributions = %d; want 2", len(row.SourceContributions))
	}
	// First contribution should be the highest-rank source
	// (mittelaltermarkt_online = rank 1, marktkalendarium = rank 2).
	if row.SourceContributions[0].SourceName != "mittelaltermarkt_online" {
		t.Errorf("first contribution source = %q; want mittelaltermarkt_online (rank 1)",
			row.SourceContributions[0].SourceName)
	}
	// Each contribution preserves its source's original field values.
	seenVenue := false
	for i := range row.SourceContributions {
		if row.SourceContributions[i].Venue == "Stallhof" {
			seenVenue = true
			break
		}
	}
	if !seenVenue {
		t.Error("expected Venue=Stallhof preserved in the mittelaltermarkt_online contribution")
	}
}
func TestServiceCrawlMultiSourceHighKonfidenz(t *testing.T) {
repo := newMockRepo()
start := mustParseDate(t, "2026-05-01")

View File

@@ -0,0 +1,4 @@
-- 000018 down: remove the per-source merge metadata added by the up
-- migration. The index is dropped before its column; both DROPs use
-- IF EXISTS so a partially-applied migration can be rolled back safely.
DROP INDEX IF EXISTS idx_discovered_markets_sources_gin;
ALTER TABLE discovered_markets
DROP COLUMN IF EXISTS source_contributions,
DROP COLUMN IF EXISTS sources;

View File

@@ -0,0 +1,7 @@
-- 000018 up: store merged-source metadata on discovered_markets.
--   sources              — text[] of crawler source names that fed the row.
--   source_contributions — JSONB array of per-source raw field values, so
--                          the admin UI can render per-field conflicts
--                          without re-running the merger.
-- NOT NULL with defaults keeps existing rows valid without a backfill.
ALTER TABLE discovered_markets
ADD COLUMN sources text[] NOT NULL DEFAULT '{}',
ADD COLUMN source_contributions jsonb NOT NULL DEFAULT '[]'::jsonb;
-- GIN index on the array for potential future filtering by source name.
CREATE INDEX IF NOT EXISTS idx_discovered_markets_sources_gin
ON discovered_markets USING GIN (sources);

View File

@@ -3,6 +3,22 @@ import { redirect, fail } from '@sveltejs/kit';
import { serverFetch } from '$lib/api/client.server.js';
import { ApiClientError } from '$lib/api/client.js';
// SourceContribution mirrors the Go discovery.SourceContribution struct
// (persisted as JSONB in discovered_markets.source_contributions): what a
// single crawler source reported about a market. All fields except
// source_name may be absent when the source did not provide them.
type SourceContribution = {
	source_name: string;
	source_url?: string;
	detail_url?: string;
	name?: string;
	city?: string;
	plz?: string;
	land?: string;
	bundesland?: string;
	// Serialized timestamps from the API; null/absent when no date is known.
	start_date?: string | null;
	end_date?: string | null;
	website?: string;
	venue?: string;
	organizer?: string;
};
type DiscoveredMarket = {
id: string;
markt_name: string;
@@ -13,6 +29,8 @@ type DiscoveredMarket = {
end_datum: string | null;
website: string;
quellen: string[];
sources: string[];
source_contributions: SourceContribution[];
konfidenz: string; // 'hoch' | 'mittel' | 'niedrig'
agent_status: string; // 'bestaetigt' | 'unklar' | 'vorjahr_unbestaetigt' | 'abgesagt'
hinweis: string;

View File

@@ -2,6 +2,7 @@
import { onMount } from 'svelte';
import { enhance } from '$app/forms';
import { goto } from '$app/navigation';
import ContributionsPanel from './ContributionsPanel.svelte';
let { data, form } = $props();
@@ -58,6 +59,12 @@
let similarLoading = $state(false);
let similarEntries = $state<SimilarEntry[]>([]);
// Quellen-Vergleich panel — one open at a time.
let quellenVergleichOpenId: string | null = $state(null);

// Toggle the per-row Quellen-Vergleich panel: clicking the open row's
// button closes it; clicking any other row moves the panel there.
function toggleQuellenVergleich(id: string) {
	if (quellenVergleichOpenId === id) {
		quellenVergleichOpenId = null;
	} else {
		quellenVergleichOpenId = id;
	}
}
async function toggleSimilar(id: string) {
if (similarOpenId === id) {
similarOpenId = null;
@@ -454,6 +461,19 @@
{row.agent_status}
</span>
{/if}
{#if row.hinweis?.includes('date_conflict')}
<span
class="ml-1 inline-block rounded bg-amber-100 px-1.5 py-0.5 align-middle text-[10px] text-amber-700 dark:bg-amber-900/50 dark:text-amber-300"
title="Datumskonflikt zwischen Quellen"
>
⚠ Datumskonflikt
</span>
{/if}
{#if (row.sources?.length ?? 0) >= 2}
<div class="mt-0.5 text-[10px] font-normal text-stone-400 dark:text-stone-500">
merged: {row.sources.join(' + ')}
</div>
{/if}
</td>
<td class="py-3 pr-4">{row.stadt}</td>
<td class="py-3 pr-4 whitespace-nowrap">
@@ -515,6 +535,17 @@
>
Similar
</button>
{#if (row.sources?.length ?? 0) >= 2}
<button
type="button"
onclick={() => toggleQuellenVergleich(row.id)}
class="ml-1 rounded px-2 py-1 text-xs {quellenVergleichOpenId === row.id
? 'bg-amber-100 text-amber-700 dark:bg-amber-900/50 dark:text-amber-300'
: 'bg-stone-100 text-stone-600 hover:bg-stone-200 dark:bg-stone-800 dark:text-stone-300 dark:hover:bg-stone-700'}"
>
Quellen-Vergleich
</button>
{/if}
</td>
</tr>
{#if similarOpenId === row.id}
@@ -573,6 +604,21 @@
</td>
</tr>
{/if}
{#if quellenVergleichOpenId === row.id}
<tr
class="border-b border-stone-100 bg-amber-50/50 dark:border-stone-800 dark:bg-amber-950/20"
>
<td></td>
<td colspan="8" class="py-3 pr-4">
<div
class="text-xs font-medium tracking-wider text-amber-700 uppercase dark:text-amber-400"
>
Quellen-Vergleich
</div>
<ContributionsPanel contributions={row.source_contributions ?? []} />
</td>
</tr>
{/if}
{#if expandedId === row.id}
<tr
class="border-b border-stone-100 bg-stone-50 dark:border-stone-800 dark:bg-stone-900/50"

View File

@@ -0,0 +1,125 @@
<script lang="ts">
// SourceContribution mirrors the Go struct persisted in
// discovered_markets.source_contributions. Duplicated from the queue
// page's local type — keep the two declarations in sync.
type SourceContribution = {
	source_name: string;
	source_url?: string;
	detail_url?: string;
	name?: string;
	city?: string;
	plz?: string;
	land?: string;
	bundesland?: string;
	start_date?: string | null;
	end_date?: string | null;
	website?: string;
	venue?: string;
	organizer?: string;
};

// Per-source raw inputs for one merged queue row. The first entry is the
// winning (rank 1) source — the legend below relies on that ordering.
let { contributions }: { contributions: SourceContribution[] } = $props();
// FieldDef describes one comparison-table row: the display label, the
// contribution key to read, and an optional formatter applied to the
// raw value before display.
type FieldDef = {
	label: string;
	key: keyof SourceContribution;
	format?: (v: string | null | undefined) => string;
};

// Fields shown in the Quellen-Vergleich table, in display order.
// source_name/source_url/detail_url are excluded: they identify the
// source itself (shown in the column headers), not the market.
const fields: FieldDef[] = [
	{ label: 'Name', key: 'name' },
	{ label: 'Stadt', key: 'city' },
	{ label: 'PLZ', key: 'plz' },
	{ label: 'Land', key: 'land' },
	{ label: 'Bundesland', key: 'bundesland' },
	{
		label: 'Start-Datum',
		key: 'start_date',
		// Dates arrive as ISO strings; keep only the YYYY-MM-DD part.
		format: (v) => (v ? v.slice(0, 10) : '')
	},
	{
		label: 'End-Datum',
		key: 'end_date',
		format: (v) => (v ? v.slice(0, 10) : '')
	},
	{ label: 'Website', key: 'website' },
	{ label: 'Venue', key: 'venue' },
	{ label: 'Veranstalter', key: 'organizer' }
];
// Returns the display value for a contribution field: the formatted value
// when the field defines a formatter, otherwise the raw value or ''.
function cellValue(c: SourceContribution, field: FieldDef): string {
	const raw = c[field.key] as string | null | undefined;
	return field.format ? field.format(raw) : (raw ?? '');
}
// A field has a conflict if at least two sources have non-empty, different
// values — i.e. the set of distinct non-empty cell values has size > 1.
function hasConflict(field: FieldDef): boolean {
	const distinct = new Set(
		contributions.map((c) => cellValue(c, field)).filter((v) => v !== '')
	);
	return distinct.size > 1;
}
</script>
{#if contributions.length === 0}
<p class="mt-2 text-xs text-stone-400">Keine Quelldaten vorhanden.</p>
{:else}
<div class="mt-2 overflow-x-auto">
<p class="mb-2 text-xs text-stone-500">
Rang 1 = höchste Priorität · Gewinnende Quelle:
<span class="font-mono font-medium text-stone-700 dark:text-stone-300"
>{contributions[0].source_name}</span
>
· Amber = abweichende Werte zwischen Quellen
</p>
<table class="w-full text-left text-xs">
<thead>
<tr
class="border-b border-amber-200 text-amber-700 dark:border-amber-800 dark:text-amber-400"
>
<th class="pr-4 pb-1 font-medium">Feld</th>
{#each contributions as c, i}
<th class="pr-4 pb-1 font-medium">
<span class="font-mono">{c.source_name}</span>
{#if i === 0}
<span
class="ml-1 rounded bg-amber-100 px-1 text-[10px] text-amber-700 dark:bg-amber-900/50 dark:text-amber-300"
>Rang 1</span
>
{/if}
</th>
{/each}
</tr>
</thead>
<tbody>
{#each fields as field}
{@const conflict = hasConflict(field)}
<tr
class="border-b border-amber-100 dark:border-amber-900 {conflict
? 'bg-amber-50 dark:bg-amber-950/30'
: ''}"
>
<td
class="py-1 pr-4 font-medium text-stone-500 dark:text-stone-400 {conflict
? 'text-amber-700 dark:text-amber-400'
: ''}"
>
{field.label}
{#if conflict}
<span title="Konflikt zwischen Quellen">⚠</span>
{/if}
</td>
{#each contributions as c}
{@const val = cellValue(c, field)}
<td class="py-1 pr-4 {conflict && val ? 'font-medium' : ''}">
{#if val}
{val}
{:else}
<span class="text-stone-300 dark:text-stone-600">—</span>
{/if}
</td>
{/each}
</tr>
{/each}
</tbody>
</table>
</div>
{/if}