From f98ecf8790c5cfe514f95ed5a7cb17e6fc31691c Mon Sep 17 00:00:00 2001 From: vikingowl Date: Sat, 25 Apr 2026 17:53:56 +0200 Subject: [PATCH] fix(discovery): auto-trigger Pass B (LLM enrich) after post-crawl Pass A Adds ListEnrichedNeedingLLM to the Repository interface and RunLLMEnrichBacklog to Service, then wires RunLLMEnrichBacklog into the post-crawl goroutine so LLM enrichment runs automatically after every crawl without manual triggers. --- .../domain/discovery/mock_repo_test.go | 9 +++++ .../internal/domain/discovery/repository.go | 32 +++++++++++++++++ backend/internal/domain/discovery/service.go | 35 +++++++++++++++++++ .../internal/domain/discovery/service_test.go | 33 +++++++++++++++++ 4 files changed, 109 insertions(+) diff --git a/backend/internal/domain/discovery/mock_repo_test.go b/backend/internal/domain/discovery/mock_repo_test.go index 0ae191f..0b142f6 100644 --- a/backend/internal/domain/discovery/mock_repo_test.go +++ b/backend/internal/domain/discovery/mock_repo_test.go @@ -46,6 +46,9 @@ type mockRepo struct { // Similarity AI cache hooks. getSimCacheFn func(pairKey string) (enrich.Verdict, bool, error) setSimCacheFn func(pairKey string, v enrich.Verdict, ttl time.Duration) error + + // LLM backlog hook. + listEnrichedNeedingLLMFn func(limit int) ([]uuid.UUID, error) } // listQueueCall records arguments passed to mockRepo.ListQueue so tests can @@ -163,6 +166,12 @@ func (m *mockRepo) SetSimilarityCache(_ context.Context, pairKey string, v enric } return nil } +func (m *mockRepo) ListEnrichedNeedingLLM(_ context.Context, limit int) ([]uuid.UUID, error) { + if m.listEnrichedNeedingLLMFn != nil { + return m.listEnrichedNeedingLLMFn(limit) + } + return nil, nil +} // noopLinkVerifier passes every URL — used by tests to isolate from network. type noopLinkVerifier struct{} diff --git a/backend/internal/domain/discovery/repository.go b/backend/internal/domain/discovery/repository.go index 028a05b..91d22b4 100644 --- a/backend/internal/domain/discovery/repository.go +++ b/backend/internal/domain/discovery/repository.go @@ -55,6 +55,10 @@ type Repository interface { // Similarity AI cache — keyed on enrich.SimilarityPairKey. GetSimilarityCache(ctx context.Context, pairKey string) (enrich.Verdict, bool, error) SetSimilarityCache(ctx context.Context, pairKey string, v enrich.Verdict, ttl time.Duration) error + + // ListEnrichedNeedingLLM returns up to max row IDs that have + // enrichment_status='done' but no LLM-derived category yet. + ListEnrichedNeedingLLM(ctx context.Context, limit int) ([]uuid.UUID, error) } // SeriesCandidate is a minimal projection used for name-normalization comparison in Go. @@ -558,6 +562,34 @@ WHERE pair_key = $1 return v, true, nil } +// ListEnrichedNeedingLLM returns up to max row IDs with enrichment_status='done' +// but no LLM-derived category. Ordered newest-first so recently-crawled rows +// get their category filled in first. +func (r *pgRepository) ListEnrichedNeedingLLM(ctx context.Context, limit int) ([]uuid.UUID, error) { + query := ` + SELECT id FROM discovered_markets + WHERE enrichment_status = 'done' + AND (enrichment->>'category' IS NULL OR enrichment->>'category' = '') + AND array_length(quellen, 1) > 0 + ORDER BY created_at DESC + LIMIT $1 + ` + rows, err := r.pool.Query(ctx, query, limit) + if err != nil { + return nil, fmt.Errorf("list enriched needing llm: %w", err) + } + defer rows.Close() + var ids []uuid.UUID + for rows.Next() { + var id uuid.UUID + if err := rows.Scan(&id); err != nil { + return nil, fmt.Errorf("scanning id: %w", err) + } + ids = append(ids, id) + } + return ids, rows.Err() +} + // SetSimilarityCache upserts a verdict. ttl=0 means "no expiry" (nullable). func (r *pgRepository) SetSimilarityCache(ctx context.Context, pairKey string, v enrich.Verdict, ttl time.Duration) error { var expiresAt *time.Time diff --git a/backend/internal/domain/discovery/service.go b/backend/internal/domain/discovery/service.go index dcf29fe..14cd721 100644 --- a/backend/internal/domain/discovery/service.go +++ b/backend/internal/domain/discovery/service.go @@ -315,6 +315,10 @@ func (s *Service) Crawl(ctx context.Context) (CrawlSummary, error) { if _, err := s.RunCrawlEnrichAll(enrichCtx); err != nil && !errors.Is(err, context.DeadlineExceeded) { slog.WarnContext(context.Background(), "post-crawl enrichment failed", "error", err) } + if err := s.RunLLMEnrichBacklog(enrichCtx, llmEnrichBacklogMax); err != nil && + !errors.Is(err, context.DeadlineExceeded) { + slog.WarnContext(context.Background(), "post-crawl llm backlog failed", "error", err) + } }() } @@ -774,6 +778,37 @@ func (s *Service) RunLLMEnrichOne(ctx context.Context, queueID uuid.UUID) (enric return merged, nil } +const llmEnrichBacklogMax = 200 + +// llmEnrichBacklogDelay is the inter-row sleep in RunLLMEnrichBacklog. Declared +// as a var (not const) so tests can set it to 0 without a 500ms-per-row penalty. +var llmEnrichBacklogDelay = 500 * time.Millisecond + +// RunLLMEnrichBacklog runs LLM enrich (Pass B) on rows that Pass A enriched +// but left without a category. Capped at max rows per call; uses a 500ms +// inter-row delay to stay within Gemini free-tier throughput. +func (s *Service) RunLLMEnrichBacklog(ctx context.Context, limit int) error { + if s.llmEnricher == nil { + return nil // not configured; not an error + } + ids, err := s.repo.ListEnrichedNeedingLLM(ctx, limit) + if err != nil { + return fmt.Errorf("list backlog: %w", err) + } + for _, id := range ids { + if _, err := s.RunLLMEnrichOne(ctx, id); err != nil { + slog.WarnContext(ctx, "backlog llm enrich failed", "id", id, "error", err) + // single-row failure must not abort the batch + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(llmEnrichBacklogDelay): + } + } + return nil +} + // ClassifySimilarPair runs the AI classifier on the two queue rows identified // by aID and bID, returning a verdict about whether they're the same market. // Cache-first: a content-keyed entry (enrich.SimilarityPairKey) shortcuts the diff --git a/backend/internal/domain/discovery/service_test.go b/backend/internal/domain/discovery/service_test.go index f23a125..b55544e 100644 --- a/backend/internal/domain/discovery/service_test.go +++ b/backend/internal/domain/discovery/service_test.go @@ -887,6 +887,39 @@ func TestRunLLMEnrichOne_SkipsCacheWhenYearZero(t *testing.T) { } } +// TestRunLLMEnrichBacklog_CallsLLMForEachPendingRow: ListEnrichedNeedingLLM returns +// two IDs; RunLLMEnrichBacklog must call the LLM enricher once per row. +func TestRunLLMEnrichBacklog_CallsLLMForEachPendingRow(t *testing.T) { + id1, id2 := uuid.New(), uuid.New() + start := time.Date(2026, 5, 1, 0, 0, 0, 0, time.UTC) + + rowsByID := map[uuid.UUID]DiscoveredMarket{ + id1: {ID: id1, MarktName: "Markt A", Stadt: "München", NameNormalized: "markt a", StartDatum: &start, Quellen: []string{"https://a.example"}}, + id2: {ID: id2, MarktName: "Markt B", Stadt: "Berlin", NameNormalized: "markt b", StartDatum: &start, Quellen: []string{"https://b.example"}}, + } + repo := &mockRepo{ + listEnrichedNeedingLLMFn: func(_ int) ([]uuid.UUID, error) { //nolint:unparam + return []uuid.UUID{id1, id2}, nil + }, + getDiscoveredFn: func(_ context.Context, id uuid.UUID) (DiscoveredMarket, error) { + return rowsByID[id], nil + }, + } + llm := &stubLLMEnricher{result: enrich.Enrichment{Category: catMittelaltermarkt}} + svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, llm, nil) + + // Zero the inter-row delay so this test completes in milliseconds. + t.Cleanup(func() { llmEnrichBacklogDelay = 500 * time.Millisecond }) + llmEnrichBacklogDelay = 0 + + if err := svc.RunLLMEnrichBacklog(context.Background(), 10); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if llm.called != 2 { + t.Errorf("expected 2 LLM calls, got %d", llm.called) + } +} + // TestRunCrawlEnrichAll_EmptyQueueNoOp: nothing pending, zero summary, no writes. func TestRunCrawlEnrichAll_EmptyQueueNoOp(t *testing.T) { var writes int