fix(discovery): auto-trigger Pass B (LLM enrich) after post-crawl Pass A
Adds ListEnrichedNeedingLLM to the Repository interface and RunLLMEnrichBacklog to Service, then wires RunLLMEnrichBacklog into the post-crawl goroutine so LLM enrichment runs automatically after every crawl without manual triggers.
This commit is contained in:
@@ -46,6 +46,9 @@ type mockRepo struct {
|
||||
// Similarity AI cache hooks.
|
||||
getSimCacheFn func(pairKey string) (enrich.Verdict, bool, error)
|
||||
setSimCacheFn func(pairKey string, v enrich.Verdict, ttl time.Duration) error
|
||||
|
||||
// LLM backlog hook.
|
||||
listEnrichedNeedingLLMFn func(limit int) ([]uuid.UUID, error)
|
||||
}
|
||||
|
||||
// listQueueCall records arguments passed to mockRepo.ListQueue so tests can
|
||||
@@ -163,6 +166,12 @@ func (m *mockRepo) SetSimilarityCache(_ context.Context, pairKey string, v enric
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func (m *mockRepo) ListEnrichedNeedingLLM(_ context.Context, limit int) ([]uuid.UUID, error) {
|
||||
if m.listEnrichedNeedingLLMFn != nil {
|
||||
return m.listEnrichedNeedingLLMFn(limit)
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// noopLinkVerifier passes every URL — used by tests to isolate from network.
|
||||
type noopLinkVerifier struct{}
|
||||
|
||||
@@ -55,6 +55,10 @@ type Repository interface {
|
||||
// Similarity AI cache — keyed on enrich.SimilarityPairKey.
|
||||
GetSimilarityCache(ctx context.Context, pairKey string) (enrich.Verdict, bool, error)
|
||||
SetSimilarityCache(ctx context.Context, pairKey string, v enrich.Verdict, ttl time.Duration) error
|
||||
|
||||
// ListEnrichedNeedingLLM returns up to max row IDs that have
|
||||
// enrichment_status='done' but no LLM-derived category yet.
|
||||
ListEnrichedNeedingLLM(ctx context.Context, limit int) ([]uuid.UUID, error)
|
||||
}
|
||||
|
||||
// SeriesCandidate is a minimal projection used for name-normalization comparison in Go.
|
||||
@@ -558,6 +562,34 @@ WHERE pair_key = $1
|
||||
return v, true, nil
|
||||
}
|
||||
|
||||
// ListEnrichedNeedingLLM returns up to max row IDs with enrichment_status='done'
|
||||
// but no LLM-derived category. Ordered newest-first so recently-crawled rows
|
||||
// get their category filled in first.
|
||||
func (r *pgRepository) ListEnrichedNeedingLLM(ctx context.Context, limit int) ([]uuid.UUID, error) {
|
||||
query := `
|
||||
SELECT id FROM discovered_markets
|
||||
WHERE enrichment_status = 'done'
|
||||
AND (enrichment->>'category' IS NULL OR enrichment->>'category' = '')
|
||||
AND array_length(quellen, 1) > 0
|
||||
ORDER BY created_at DESC
|
||||
LIMIT $1
|
||||
`
|
||||
rows, err := r.pool.Query(ctx, query, limit)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("list enriched needing llm: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
var ids []uuid.UUID
|
||||
for rows.Next() {
|
||||
var id uuid.UUID
|
||||
if err := rows.Scan(&id); err != nil {
|
||||
return nil, fmt.Errorf("scanning id: %w", err)
|
||||
}
|
||||
ids = append(ids, id)
|
||||
}
|
||||
return ids, rows.Err()
|
||||
}
|
||||
|
||||
// SetSimilarityCache upserts a verdict. ttl=0 means "no expiry" (nullable).
|
||||
func (r *pgRepository) SetSimilarityCache(ctx context.Context, pairKey string, v enrich.Verdict, ttl time.Duration) error {
|
||||
var expiresAt *time.Time
|
||||
|
||||
@@ -315,6 +315,10 @@ func (s *Service) Crawl(ctx context.Context) (CrawlSummary, error) {
|
||||
if _, err := s.RunCrawlEnrichAll(enrichCtx); err != nil && !errors.Is(err, context.DeadlineExceeded) {
|
||||
slog.WarnContext(context.Background(), "post-crawl enrichment failed", "error", err)
|
||||
}
|
||||
if err := s.RunLLMEnrichBacklog(enrichCtx, llmEnrichBacklogMax); err != nil &&
|
||||
!errors.Is(err, context.DeadlineExceeded) {
|
||||
slog.WarnContext(context.Background(), "post-crawl llm backlog failed", "error", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
@@ -774,6 +778,37 @@ func (s *Service) RunLLMEnrichOne(ctx context.Context, queueID uuid.UUID) (enric
|
||||
return merged, nil
|
||||
}
|
||||
|
||||
const llmEnrichBacklogMax = 200
|
||||
|
||||
// llmEnrichBacklogDelay is the inter-row sleep in RunLLMEnrichBacklog. Declared
|
||||
// as a var (not const) so tests can set it to 0 without a 500ms-per-row penalty.
|
||||
var llmEnrichBacklogDelay = 500 * time.Millisecond
|
||||
|
||||
// RunLLMEnrichBacklog runs LLM enrich (Pass B) on rows that Pass A enriched
|
||||
// but left without a category. Capped at max rows per call; uses a 500ms
|
||||
// inter-row delay to stay within Gemini free-tier throughput.
|
||||
func (s *Service) RunLLMEnrichBacklog(ctx context.Context, limit int) error {
|
||||
if s.llmEnricher == nil {
|
||||
return nil // not configured; not an error
|
||||
}
|
||||
ids, err := s.repo.ListEnrichedNeedingLLM(ctx, limit)
|
||||
if err != nil {
|
||||
return fmt.Errorf("list backlog: %w", err)
|
||||
}
|
||||
for _, id := range ids {
|
||||
if _, err := s.RunLLMEnrichOne(ctx, id); err != nil {
|
||||
slog.WarnContext(ctx, "backlog llm enrich failed", "id", id, "error", err)
|
||||
// single-row failure must not abort the batch
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-time.After(llmEnrichBacklogDelay):
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ClassifySimilarPair runs the AI classifier on the two queue rows identified
|
||||
// by aID and bID, returning a verdict about whether they're the same market.
|
||||
// Cache-first: a content-keyed entry (enrich.SimilarityPairKey) shortcuts the
|
||||
|
||||
@@ -887,6 +887,39 @@ func TestRunLLMEnrichOne_SkipsCacheWhenYearZero(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestRunLLMEnrichBacklog_CallsLLMForEachPendingRow: ListEnrichedNeedingLLM returns
|
||||
// two IDs; RunLLMEnrichBacklog must call the LLM enricher once per row.
|
||||
func TestRunLLMEnrichBacklog_CallsLLMForEachPendingRow(t *testing.T) {
|
||||
id1, id2 := uuid.New(), uuid.New()
|
||||
start := time.Date(2026, 5, 1, 0, 0, 0, 0, time.UTC)
|
||||
|
||||
rowsByID := map[uuid.UUID]DiscoveredMarket{
|
||||
id1: {ID: id1, MarktName: "Markt A", Stadt: "München", NameNormalized: "markt a", StartDatum: &start, Quellen: []string{"https://a.example"}},
|
||||
id2: {ID: id2, MarktName: "Markt B", Stadt: "Berlin", NameNormalized: "markt b", StartDatum: &start, Quellen: []string{"https://b.example"}},
|
||||
}
|
||||
repo := &mockRepo{
|
||||
listEnrichedNeedingLLMFn: func(_ int) ([]uuid.UUID, error) { //nolint:unparam
|
||||
return []uuid.UUID{id1, id2}, nil
|
||||
},
|
||||
getDiscoveredFn: func(_ context.Context, id uuid.UUID) (DiscoveredMarket, error) {
|
||||
return rowsByID[id], nil
|
||||
},
|
||||
}
|
||||
llm := &stubLLMEnricher{result: enrich.Enrichment{Category: catMittelaltermarkt}}
|
||||
svc := NewService(repo, nil, noopLinkVerifier{}, noopMarketCreator{}, nil, llm, nil)
|
||||
|
||||
// Zero the inter-row delay so this test completes in milliseconds.
|
||||
t.Cleanup(func() { llmEnrichBacklogDelay = 500 * time.Millisecond })
|
||||
llmEnrichBacklogDelay = 0
|
||||
|
||||
if err := svc.RunLLMEnrichBacklog(context.Background(), 10); err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if llm.called != 2 {
|
||||
t.Errorf("expected 2 LLM calls, got %d", llm.called)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRunCrawlEnrichAll_EmptyQueueNoOp: nothing pending, zero summary, no writes.
|
||||
func TestRunCrawlEnrichAll_EmptyQueueNoOp(t *testing.T) {
|
||||
var writes int
|
||||
|
||||
Reference in New Issue
Block a user