diff --git a/backend/internal/domain/discovery/service.go b/backend/internal/domain/discovery/service.go
index 82f3b2e..92c78c0 100644
--- a/backend/internal/domain/discovery/service.go
+++ b/backend/internal/domain/discovery/service.go
@@ -2,6 +2,10 @@ package discovery
 
 import (
 	"context"
+	"errors"
+	"fmt"
+	"log/slog"
+	"time"
 
 	"github.com/google/uuid"
 
@@ -33,6 +37,198 @@ func (s *Service) PickBuckets(ctx context.Context) ([]Bucket, error) {
 	return s.repo.PickStaleBuckets(ctx, s.forwardMonths, s.batchSize)
 }
 
+// TickSummary reports what happened in one tick.
+//
+// BucketsProcessed counts the buckets for which Pass 0 was attempted.
+// Discovered counts net-new rows written, the Deduped* counters record why a
+// candidate was skipped, and Errors counts agent failures as well as
+// per-market repository failures.
+type TickSummary struct {
+	BucketsProcessed int `json:"buckets_processed"`
+	Discovered       int `json:"markets_discovered"`
+	DedupedExisting  int `json:"deduped_existing"`
+	DedupedRejected  int `json:"deduped_rejected"`
+	DedupedQueue     int `json:"deduped_queue"`
+	Errors           int `json:"errors"`
+}
+
+// Tick picks N stale buckets and runs Pass 0 for each, writing net-new discoveries.
+//
+// It returns an error only when the agent is not configured or bucket
+// selection fails; per-bucket failures are logged and counted in
+// TickSummary.Errors. Once ctx is cancelled the remaining buckets are not
+// attempted.
+func (s *Service) Tick(ctx context.Context) (TickSummary, error) {
+	if s.agent == nil || !s.agent.Enabled() {
+		return TickSummary{}, errors.New("discovery agent not configured")
+	}
+	buckets, err := s.PickBuckets(ctx)
+	if err != nil {
+		return TickSummary{}, fmt.Errorf("pick buckets: %w", err)
+	}
+	var summary TickSummary
+	for _, b := range buckets {
+		// Stop on cancellation instead of attempting the remaining buckets.
+		if ctx.Err() != nil {
+			break
+		}
+		summary.BucketsProcessed++
+		s.processOneBucket(ctx, b, &summary)
+	}
+	return summary, nil
+}
+
+// processOneBucket runs Pass 0 for b, retrying the agent call once, and adds
+// the outcome to summary. After two agent failures UpdateBucketQueried is
+// called with the error text; after a successful call it is called with "".
+func (s *Service) processOneBucket(ctx context.Context, b Bucket, summary *TickSummary) {
+	resp, err := s.agent.Discover(ctx, b)
+	if err != nil {
+		// Retry once after 2s per spec §9.
+		select {
+		case <-ctx.Done():
+			// Cancelled during the back-off: count the dropped bucket rather
+			// than returning silently. The bucket row is not updated.
+			slog.WarnContext(ctx, "pass0 retry aborted", "bucket_id", b.ID, "error", ctx.Err())
+			summary.Errors++
+			return
+		case <-time.After(2 * time.Second):
+		}
+		resp, err = s.agent.Discover(ctx, b)
+		if err != nil {
+			slog.WarnContext(ctx, "pass0 failed twice", "bucket_id", b.ID, "error", err)
+			if uerr := s.repo.UpdateBucketQueried(ctx, b.ID, err.Error()); uerr != nil {
+				slog.ErrorContext(ctx, "update bucket queried", "bucket_id", b.ID, "error", uerr)
+			}
+			summary.Errors++
+			return
+		}
+	}
+	sub := s.processBucketResponse(ctx, b, resp)
+	summary.Discovered += sub.Discovered
+	summary.DedupedExisting += sub.DedupedExisting
+	summary.DedupedRejected += sub.DedupedRejected
+	summary.DedupedQueue += sub.DedupedQueue
+	summary.Errors += sub.Errors
+	if err := s.repo.UpdateBucketQueried(ctx, b.ID, ""); err != nil {
+		slog.ErrorContext(ctx, "update bucket queried", "bucket_id", b.ID, "error", err)
+	}
+}
+
+// processBucketResponse filters the markets in resp and inserts the net-new
+// ones for bucket b. Only Discovered, the Deduped* counters and Errors are set
+// in the returned summary.
+//
+// A market is skipped when it repeats within resp (not counted), when
+// EditionExists reports an edition of the matched series for the start year,
+// when IsRejected reports true for its name, city and start year, or when
+// QueueHasPending reports true. A repository failure skips that market and
+// increments Errors.
+func (s *Service) processBucketResponse(ctx context.Context, b Bucket, resp Pass0Response) TickSummary {
+	var summary TickSummary
+	seen := make(map[string]struct{}, len(resp.Maerkte)) // in-request dedup
+	for _, m := range resp.Maerkte {
+		nameNorm := NormalizeName(m.MarktName)
+		cityNorm := NormalizeCity(m.Stadt)
+		key := nameNorm + "|" + cityNorm + "|" + m.StartDatum
+		if _, dup := seen[key]; dup {
+			continue
+		}
+		seen[key] = struct{}{}
+
+		// Series candidates pre-filtered by city.
+		candidates, err := s.repo.ListSeriesByCity(ctx, cityNorm)
+		if err != nil {
+			slog.WarnContext(ctx, "list series by city", "city", m.Stadt, "error", err)
+			summary.Errors++
+			continue
+		}
+		matchedSeriesID := findSeriesMatch(m.MarktName, candidates)
+
+		// An empty or malformed start date leaves year at 0, which disables the
+		// year-based checks below.
+		startDatum, endDatum := parseOptionalDate(m.StartDatum), parseOptionalDate(m.EndDatum)
+		year := 0
+		if startDatum != nil {
+			year = startDatum.Year()
+		}
+
+		// Dedup checks in order (spec §7.1.e).
+		if matchedSeriesID != nil && year > 0 {
+			exists, err := s.repo.EditionExists(ctx, *matchedSeriesID, year)
+			if err != nil {
+				slog.WarnContext(ctx, "edition exists check", "error", err)
+				summary.Errors++
+				continue
+			}
+			if exists {
+				summary.DedupedExisting++
+				continue
+			}
+		}
+		// NOTE(review): the checks below receive the raw m.Stadt while the series
+		// lookup uses the normalized city; confirm which form the repository expects.
+		if year > 0 {
+			rejected, err := s.repo.IsRejected(ctx, nameNorm, m.Stadt, year)
+			if err != nil {
+				slog.WarnContext(ctx, "is rejected check", "error", err)
+				summary.Errors++
+				continue
+			}
+			if rejected {
+				summary.DedupedRejected++
+				continue
+			}
+		}
+		pending, err := s.repo.QueueHasPending(ctx, nameNorm, m.Stadt, startDatum)
+		if err != nil {
+			slog.WarnContext(ctx, "queue pending check", "error", err)
+			summary.Errors++
+			continue
+		}
+		if pending {
+			summary.DedupedQueue++
+			continue
+		}
+
+		dm := DiscoveredMarket{
+			BucketID:        b.ID,
+			MarktName:       m.MarktName,
+			Stadt:           m.Stadt,
+			Bundesland:      m.Bundesland,
+			Land:            b.Land,
+			StartDatum:      startDatum,
+			EndDatum:        endDatum,
+			Website:         m.Website,
+			Quellen:         m.Quellen,
+			Extraktion:      m.Extraktion,
+			Hinweis:         m.Hinweis,
+			NameNormalized:  nameNorm,
+			MatchedSeriesID: matchedSeriesID,
+		}
+		if _, err := s.repo.InsertDiscovered(ctx, dm); err != nil {
+			slog.WarnContext(ctx, "insert discovered", "error", err)
+			summary.Errors++
+			continue
+		}
+		summary.Discovered++
+	}
+	return summary
+}
+
+// parseOptionalDate parses s as a YYYY-MM-DD date. It returns nil when s is
+// empty or malformed, so callers cannot tell a missing date from an invalid one.
+func parseOptionalDate(s string) *time.Time {
+	if s == "" {
+		return nil
+	}
+	t, err := time.Parse("2006-01-02", s)
+	if err != nil {
+		return nil
+	}
+	return &t
+}
+
 // findSeriesMatch returns the ID of the first candidate whose normalized name matches
 // incomingName after normalization. Candidates are expected to be pre-filtered by city.
func findSeriesMatch(incomingName string, candidates []SeriesCandidate) *uuid.UUID { diff --git a/backend/internal/domain/discovery/service_test.go b/backend/internal/domain/discovery/service_test.go index 1127f0c..f2114c8 100644 --- a/backend/internal/domain/discovery/service_test.go +++ b/backend/internal/domain/discovery/service_test.go @@ -3,6 +3,7 @@ package discovery import ( "context" "testing" + "time" "github.com/google/uuid" ) @@ -65,3 +66,80 @@ func TestPickBucketsPassesConfigToRepo(t *testing.T) { t.Errorf("forwarded fm=%d lim=%d, want 12,4", gotFM, gotLim) } } + +func TestProcessBucket_DedupsExisting(t *testing.T) { + bucket := Bucket{ + ID: uuid.New(), Land: "Deutschland", Region: "Bayern", YearMonth: "2026-09", + } + seriesID := uuid.New() + var inserted []DiscoveredMarket + m := &mockRepo{ + listSeriesFn: func(_ context.Context, city string) ([]SeriesCandidate, error) { + return []SeriesCandidate{{ID: seriesID, Name: "Mittelaltermarkt Trostberg", City: city}}, nil + }, + editionExistsFn: func(_ context.Context, _ uuid.UUID, year int) (bool, error) { + return year == 2026, nil // 2026 edition exists → dedup + }, + isRejectedFn: func(_ context.Context, _, _ string, _ int) (bool, error) { return false, nil }, + queuePendingFn: func(_ context.Context, _, _ string, _ *time.Time) (bool, error) { return false, nil }, + insertDiscFn: func(_ context.Context, d DiscoveredMarket) (uuid.UUID, error) { + inserted = append(inserted, d) + return uuid.New(), nil + }, + updateBucketFn: func(_ context.Context, _ uuid.UUID, _ string) error { return nil }, + } + svc := NewService(m, nil, nil, 4, 12) + + resp := Pass0Response{ + Bucket: Pass0Bucket{Land: "Deutschland", Region: "Bayern", JahrMonat: "2026-09"}, + Maerkte: []Pass0Market{ + {MarktName: "Mittelaltermarkt Trostberg", Stadt: "Trostberg", StartDatum: "2026-09-12", EndDatum: "2026-09-14", Quellen: []string{"https://x"}}, + }, + } + summary := svc.processBucketResponse(context.Background(), bucket, resp) + if 
summary.Discovered != 0 { + t.Errorf("expected 0 discovered, got %d", summary.Discovered) + } + if summary.DedupedExisting != 1 { + t.Errorf("expected 1 deduped_existing, got %d", summary.DedupedExisting) + } + if len(inserted) != 0 { + t.Errorf("expected no inserts, got %d", len(inserted)) + } +} + +func TestProcessBucket_InsertsNetNew(t *testing.T) { + bucket := Bucket{ + ID: uuid.New(), Land: "Deutschland", Region: "Bayern", YearMonth: "2026-09", + } + var inserted []DiscoveredMarket + m := &mockRepo{ + listSeriesFn: func(_ context.Context, _ string) ([]SeriesCandidate, error) { return nil, nil }, + editionExistsFn: func(_ context.Context, _ uuid.UUID, _ int) (bool, error) { return false, nil }, + isRejectedFn: func(_ context.Context, _, _ string, _ int) (bool, error) { return false, nil }, + queuePendingFn: func(_ context.Context, _, _ string, _ *time.Time) (bool, error) { return false, nil }, + insertDiscFn: func(_ context.Context, d DiscoveredMarket) (uuid.UUID, error) { + inserted = append(inserted, d) + return uuid.New(), nil + }, + updateBucketFn: func(_ context.Context, _ uuid.UUID, _ string) error { return nil }, + } + svc := NewService(m, nil, nil, 4, 12) + + resp := Pass0Response{ + Bucket: Pass0Bucket{Land: "Deutschland", Region: "Bayern", JahrMonat: "2026-09"}, + Maerkte: []Pass0Market{ + {MarktName: "Neuer Markt", Stadt: "Passau", StartDatum: "2026-09-20", EndDatum: "2026-09-22", Quellen: []string{"https://x"}}, + }, + } + summary := svc.processBucketResponse(context.Background(), bucket, resp) + if summary.Discovered != 1 { + t.Errorf("expected 1 discovered, got %d", summary.Discovered) + } + if len(inserted) != 1 { + t.Errorf("expected 1 insert, got %d", len(inserted)) + } + if inserted[0].NameNormalized != "neuer" { + t.Errorf("name_normalized = %q, want 'neuer'", inserted[0].NameNormalized) + } +}