feat(discovery): implement Tick with dedup + queue write

This commit is contained in:
2026-04-18 07:34:14 +02:00
parent aa14724947
commit 4e7120e958
2 changed files with 229 additions and 0 deletions

View File

@@ -2,6 +2,10 @@ package discovery
import (
"context"
"errors"
"fmt"
"log/slog"
"time"
"github.com/google/uuid"
@@ -33,6 +37,153 @@ func (s *Service) PickBuckets(ctx context.Context) ([]Bucket, error) {
return s.repo.PickStaleBuckets(ctx, s.forwardMonths, s.batchSize)
}
// TickSummary reports what happened in one tick.
// The counters are accumulated over every bucket handled by Tick; the JSON
// tags fix the key names used when the summary is serialized.
type TickSummary struct {
// BucketsProcessed is the number of stale buckets picked for the tick
// (set from the length of the picked slice, so it includes buckets whose
// Pass 0 call later failed).
BucketsProcessed int `json:"buckets_processed"`
// Discovered counts markets that were successfully inserted as new discoveries.
Discovered int `json:"markets_discovered"`
// DedupedExisting counts markets skipped because the matched series already
// has an edition for the market's start year.
DedupedExisting int `json:"deduped_existing"`
// DedupedRejected counts markets skipped because the repository reported
// them as rejected for the start year.
DedupedRejected int `json:"deduped_rejected"`
// DedupedQueue counts markets skipped because the repository reported a
// pending queue entry for them.
DedupedQueue int `json:"deduped_queue"`
// Errors counts buckets whose Pass 0 call failed on both attempts.
Errors int `json:"errors"`
}
// Tick selects the current batch of stale buckets, runs Pass 0 on each of them
// in order, and persists whatever discoveries survive deduplication.
//
// It returns an error (and a zero TickSummary) when the discovery agent is
// missing or disabled, or when the stale buckets cannot be picked. Failures of
// individual buckets do not abort the tick; they are reflected in the returned
// summary instead.
func (s *Service) Tick(ctx context.Context) (TickSummary, error) {
	agentReady := s.agent != nil && s.agent.Enabled()
	if !agentReady {
		return TickSummary{}, errors.New("discovery agent not configured")
	}

	stale, pickErr := s.PickBuckets(ctx)
	if pickErr != nil {
		return TickSummary{}, fmt.Errorf("pick buckets: %w", pickErr)
	}

	// Every picked bucket counts as processed, whether or not its run succeeds.
	result := TickSummary{BucketsProcessed: len(stale)}
	for i := range stale {
		s.processOneBucket(ctx, stale[i], &result)
	}
	return result, nil
}
// processOneBucket runs Pass 0 for a single bucket and folds the outcome into
// summary.
//
// The agent is asked once; if that call fails the function waits two seconds
// (or until ctx is cancelled) and asks exactly one more time. When both
// attempts fail, the error text is handed to UpdateBucketQueried and
// summary.Errors is incremented. When the wait is cut short by cancellation,
// the first failure is logged and counted, and the bucket is left untouched in
// the repository.
//
// On success the response is run through processBucketResponse, its counters
// are added to summary, and the bucket is marked as queried with an empty
// error text. Repository failures while marking the bucket are logged but do
// not change the summary.
func (s *Service) processOneBucket(ctx context.Context, b Bucket, summary *TickSummary) {
	resp, err := s.agent.Discover(ctx, b)
	if err != nil {
		// Retry once after 2s per spec §9.
		backoff := time.NewTimer(2 * time.Second)
		select {
		case <-ctx.Done():
			backoff.Stop()
			// The first attempt did fail; report it rather than dropping it silently.
			slog.WarnContext(ctx, "pass0 retry aborted", "bucket_id", b.ID, "error", err, "cause", ctx.Err())
			summary.Errors++
			return
		case <-backoff.C:
		}

		resp, err = s.agent.Discover(ctx, b)
		if err != nil {
			slog.WarnContext(ctx, "pass0 failed twice", "bucket_id", b.ID, "error", err)
			// Log a failed bookkeeping write the same way the success path does.
			if updErr := s.repo.UpdateBucketQueried(ctx, b.ID, err.Error()); updErr != nil {
				slog.ErrorContext(ctx, "update bucket queried", "bucket_id", b.ID, "error", updErr)
			}
			summary.Errors++
			return
		}
	}

	sub := s.processBucketResponse(ctx, b, resp)
	summary.Discovered += sub.Discovered
	summary.DedupedExisting += sub.DedupedExisting
	summary.DedupedRejected += sub.DedupedRejected
	summary.DedupedQueue += sub.DedupedQueue

	if err := s.repo.UpdateBucketQueried(ctx, b.ID, ""); err != nil {
		slog.ErrorContext(ctx, "update bucket queried", "bucket_id", b.ID, "error", err)
	}
}
// processBucketResponse runs every market of one Pass 0 response through the
// dedup chain and inserts the survivors as discovered markets for bucket b.
//
// Per market, in this order:
//  1. in-request dedup on normalized name, normalized city and the raw start
//     date string;
//  2. series lookup by normalized city, then name matching via findSeriesMatch;
//  3. skip when the matched series already has an edition for the start year
//     (counted as DedupedExisting);
//  4. skip when the repository reports the market as rejected for that year
//     (counted as DedupedRejected);
//  5. skip when the repository reports a pending queue entry
//     (counted as DedupedQueue);
//  6. otherwise insert the market and count it as Discovered.
//
// Steps 3 and 4 only run when the start date parses to a year greater than
// zero. A repository error at any step is logged as a warning and the market is
// skipped without touching any counter. The returned summary only carries the
// Discovered and Deduped* counters; BucketsProcessed and Errors stay zero.
func (s *Service) processBucketResponse(ctx context.Context, b Bucket, resp Pass0Response) TickSummary {
	var summary TickSummary
	seen := make(map[string]bool, len(resp.Maerkte)) // in-request dedup

	for _, m := range resp.Maerkte {
		// Normalize once per market; both values are reused by the dedup key,
		// the series lookup and the stored record.
		nameNorm := NormalizeName(m.MarktName)
		cityNorm := NormalizeCity(m.Stadt)

		key := nameNorm + "|" + cityNorm + "|" + m.StartDatum
		if seen[key] {
			continue
		}
		seen[key] = true

		// Series candidates pre-filtered by city.
		candidates, err := s.repo.ListSeriesByCity(ctx, cityNorm)
		if err != nil {
			slog.WarnContext(ctx, "list series by city",
				"bucket_id", b.ID, "market", m.MarktName, "city", m.Stadt, "error", err)
			continue
		}
		matchedSeriesID := findSeriesMatch(m.MarktName, candidates)

		startDatum, endDatum := parseOptionalDate(m.StartDatum), parseOptionalDate(m.EndDatum)
		year := 0
		if startDatum != nil {
			year = startDatum.Year()
		}

		// Dedup checks in order (spec §7.1.e).
		if matchedSeriesID != nil && year > 0 {
			exists, err := s.repo.EditionExists(ctx, *matchedSeriesID, year)
			if err != nil {
				slog.WarnContext(ctx, "edition exists check",
					"bucket_id", b.ID, "market", m.MarktName, "city", m.Stadt, "error", err)
				continue
			}
			if exists {
				summary.DedupedExisting++
				continue
			}
		}

		// NOTE(review): IsRejected and QueueHasPending receive the raw city while
		// ListSeriesByCity receives the normalized one — confirm the repository
		// normalizes internally.
		if year > 0 {
			rejected, err := s.repo.IsRejected(ctx, nameNorm, m.Stadt, year)
			if err != nil {
				slog.WarnContext(ctx, "is rejected check",
					"bucket_id", b.ID, "market", m.MarktName, "city", m.Stadt, "error", err)
				continue
			}
			if rejected {
				summary.DedupedRejected++
				continue
			}
		}

		pending, err := s.repo.QueueHasPending(ctx, nameNorm, m.Stadt, startDatum)
		if err != nil {
			slog.WarnContext(ctx, "queue pending check",
				"bucket_id", b.ID, "market", m.MarktName, "city", m.Stadt, "error", err)
			continue
		}
		if pending {
			summary.DedupedQueue++
			continue
		}

		dm := DiscoveredMarket{
			BucketID:        b.ID,
			MarktName:       m.MarktName,
			Stadt:           m.Stadt,
			Bundesland:      m.Bundesland,
			Land:            b.Land,
			StartDatum:      startDatum,
			EndDatum:        endDatum,
			Website:         m.Website,
			Quellen:         m.Quellen,
			Extraktion:      m.Extraktion,
			Hinweis:         m.Hinweis,
			NameNormalized:  nameNorm,
			MatchedSeriesID: matchedSeriesID,
		}
		if _, err := s.repo.InsertDiscovered(ctx, dm); err != nil {
			slog.WarnContext(ctx, "insert discovered",
				"bucket_id", b.ID, "market", m.MarktName, "city", m.Stadt, "error", err)
			continue
		}
		summary.Discovered++
	}
	return summary
}
// parseOptionalDate converts a "YYYY-MM-DD" string into a *time.Time.
// It yields nil both for the empty string and for any value that does not
// parse with that layout, so callers cannot tell "absent" from "malformed".
func parseOptionalDate(s string) *time.Time {
	const layout = "2006-01-02"
	if s != "" {
		if parsed, err := time.Parse(layout, s); err == nil {
			return &parsed
		}
	}
	return nil
}
// findSeriesMatch returns the ID of the first candidate whose normalized name matches
// incomingName after normalization. Candidates are expected to be pre-filtered by city.
func findSeriesMatch(incomingName string, candidates []SeriesCandidate) *uuid.UUID {

View File

@@ -3,6 +3,7 @@ package discovery
import (
"context"
"testing"
"time"
"github.com/google/uuid"
)
@@ -65,3 +66,80 @@ func TestPickBucketsPassesConfigToRepo(t *testing.T) {
t.Errorf("forwarded fm=%d lim=%d, want 12,4", gotFM, gotLim)
}
}
func TestProcessBucket_DedupsExisting(t *testing.T) {
bucket := Bucket{
ID: uuid.New(), Land: "Deutschland", Region: "Bayern", YearMonth: "2026-09",
}
seriesID := uuid.New()
var inserted []DiscoveredMarket
m := &mockRepo{
listSeriesFn: func(_ context.Context, city string) ([]SeriesCandidate, error) {
return []SeriesCandidate{{ID: seriesID, Name: "Mittelaltermarkt Trostberg", City: city}}, nil
},
editionExistsFn: func(_ context.Context, _ uuid.UUID, year int) (bool, error) {
return year == 2026, nil // 2026 edition exists → dedup
},
isRejectedFn: func(_ context.Context, _, _ string, _ int) (bool, error) { return false, nil },
queuePendingFn: func(_ context.Context, _, _ string, _ *time.Time) (bool, error) { return false, nil },
insertDiscFn: func(_ context.Context, d DiscoveredMarket) (uuid.UUID, error) {
inserted = append(inserted, d)
return uuid.New(), nil
},
updateBucketFn: func(_ context.Context, _ uuid.UUID, _ string) error { return nil },
}
svc := NewService(m, nil, nil, 4, 12)
resp := Pass0Response{
Bucket: Pass0Bucket{Land: "Deutschland", Region: "Bayern", JahrMonat: "2026-09"},
Maerkte: []Pass0Market{
{MarktName: "Mittelaltermarkt Trostberg", Stadt: "Trostberg", StartDatum: "2026-09-12", EndDatum: "2026-09-14", Quellen: []string{"https://x"}},
},
}
summary := svc.processBucketResponse(context.Background(), bucket, resp)
if summary.Discovered != 0 {
t.Errorf("expected 0 discovered, got %d", summary.Discovered)
}
if summary.DedupedExisting != 1 {
t.Errorf("expected 1 deduped_existing, got %d", summary.DedupedExisting)
}
if len(inserted) != 0 {
t.Errorf("expected no inserts, got %d", len(inserted))
}
}
// TestProcessBucket_InsertsNetNew feeds processBucketResponse a market that has
// no series candidate, is not rejected and is not pending, and expects exactly
// one insert carrying the normalized market name.
func TestProcessBucket_InsertsNetNew(t *testing.T) {
	bucket := Bucket{
		ID: uuid.New(), Land: "Deutschland", Region: "Bayern", YearMonth: "2026-09",
	}
	var inserted []DiscoveredMarket
	m := &mockRepo{
		listSeriesFn:    func(_ context.Context, _ string) ([]SeriesCandidate, error) { return nil, nil },
		editionExistsFn: func(_ context.Context, _ uuid.UUID, _ int) (bool, error) { return false, nil },
		isRejectedFn:    func(_ context.Context, _, _ string, _ int) (bool, error) { return false, nil },
		queuePendingFn:  func(_ context.Context, _, _ string, _ *time.Time) (bool, error) { return false, nil },
		insertDiscFn: func(_ context.Context, d DiscoveredMarket) (uuid.UUID, error) {
			inserted = append(inserted, d)
			return uuid.New(), nil
		},
		updateBucketFn: func(_ context.Context, _ uuid.UUID, _ string) error { return nil },
	}
	svc := NewService(m, nil, nil, 4, 12)
	resp := Pass0Response{
		Bucket: Pass0Bucket{Land: "Deutschland", Region: "Bayern", JahrMonat: "2026-09"},
		Maerkte: []Pass0Market{
			{MarktName: "Neuer Markt", Stadt: "Passau", StartDatum: "2026-09-20", EndDatum: "2026-09-22", Quellen: []string{"https://x"}},
		},
	}

	summary := svc.processBucketResponse(context.Background(), bucket, resp)

	if summary.Discovered != 1 {
		t.Errorf("expected 1 discovered, got %d", summary.Discovered)
	}
	// Fatal, not Error: the assertion below indexes inserted[0] and would panic
	// on an empty slice, hiding the real failure behind a runtime error.
	if len(inserted) != 1 {
		t.Fatalf("expected 1 insert, got %d", len(inserted))
	}
	if inserted[0].NameNormalized != "neuer" {
		t.Errorf("name_normalized = %q, want 'neuer'", inserted[0].NameNormalized)
	}
}