diff --git a/backend/internal/domain/discovery/service.go b/backend/internal/domain/discovery/service.go
index 51c22cc..da3eed3 100644
--- a/backend/internal/domain/discovery/service.go
+++ b/backend/internal/domain/discovery/service.go
@@ -5,6 +5,7 @@ import (
 	"errors"
 	"fmt"
 	"log/slog"
+	"strings"
 	"time"
 
 	"github.com/google/uuid"
@@ -50,9 +51,13 @@ type TickSummary struct {
 	DedupedRejected int `json:"deduped_rejected"`
 	DedupedQueue int `json:"deduped_queue"`
 	Errors int `json:"errors"`
+	RateLimited int `json:"rate_limited"`
 }
 
 // Tick picks N stale buckets and runs Pass 0 for each, writing net-new discoveries.
+// On a rate-limit hit, aborts the remainder of the tick: subsequent buckets in the
+// same batch would almost certainly hit the same limit, and we want to give Mistral's
+// web_search budget time to refill before trying again.
 func (s *Service) Tick(ctx context.Context) (TickSummary, error) {
 	if s.agent == nil || !s.agent.Enabled() {
 		return TickSummary{}, errors.New("discovery agent not configured")
@@ -64,26 +69,44 @@ func (s *Service) Tick(ctx context.Context) (TickSummary, error) {
 	var summary TickSummary
 	summary.BucketsProcessed = len(buckets)
 	for _, b := range buckets {
-		s.processOneBucket(ctx, b, &summary)
+		if stop := s.processOneBucket(ctx, b, &summary); stop {
+			break
+		}
 	}
 	return summary, nil
 }
 
-func (s *Service) processOneBucket(ctx context.Context, b Bucket, summary *TickSummary) {
+// processOneBucket runs Pass 0 for a single bucket. Returns stop=true when the tick
+// should abort early: on a persistent rate limit, or when ctx is cancelled mid-retry.
+func (s *Service) processOneBucket(ctx context.Context, b Bucket, summary *TickSummary) (stop bool) {
 	resp, err := s.agent.Discover(ctx, b)
 	if err != nil {
-		// Retry once after 2s per spec §9.
+		wait := 2 * time.Second
+		if isRateLimit(err) {
+			wait = 10 * time.Second
+		}
 		select {
 		case <-ctx.Done():
-			return
-		case <-time.After(2 * time.Second):
+			// Cancelled while waiting to retry: stop the tick instead of starting
+			// the remaining buckets on a context that is already done.
+			return true
+		case <-time.After(wait):
 		}
 		resp, err = s.agent.Discover(ctx, b)
 		if err != nil {
+			if isRateLimit(err) {
+				// Leave last_queried_at unchanged so the bucket is re-picked on the
+				// next tick, and abort the rest of this tick — Mistral's web_search
+				// budget is shared across buckets, no point hammering it further.
+				slog.InfoContext(ctx, "pass0 rate-limited; deferring bucket + aborting tick",
+					"bucket_id", b.ID, "region", b.Region, "year_month", b.YearMonth)
+				summary.RateLimited++
+				return true
+			}
 			slog.WarnContext(ctx, "pass0 failed twice", "bucket_id", b.ID, "error", err)
 			_ = s.repo.UpdateBucketQueried(ctx, b.ID, err.Error())
 			summary.Errors++
-			return
+			return false
 		}
 	}
 	sub := s.processBucketResponse(ctx, b, resp)
@@ -94,6 +117,34 @@ func (s *Service) processOneBucket(ctx context.Context, b Bucket, summary *TickS
 	if err := s.repo.UpdateBucketQueried(ctx, b.ID, ""); err != nil {
 		slog.ErrorContext(ctx, "update bucket queried", "bucket_id", b.ID, "error", err)
 	}
+	return false
 }
+
+// isRateLimit reports whether err looks like a Mistral 429 / web_search rate-limit
+// error. The SDK surfaces these as plain strings shaped like
+// '... rate limit reached. (status 429)', so detection is textual: the phrase
+// "rate limit" in any letter case, or 429 as a standalone number. A bare substring
+// test for "429" would also fire on IDs, ports or counts that merely contain those
+// digits, and a false positive aborts the tick without recording the real error.
+func isRateLimit(err error) bool {
+	if err == nil {
+		return false
+	}
+	msg := strings.ToLower(err.Error())
+	if strings.Contains(msg, "rate limit") {
+		return true
+	}
+	// Tokenise on everything except ASCII letters and digits so that 429 only
+	// counts when it stands alone ("status 429", "HTTP 429:", "(429)").
+	notWord := func(r rune) bool {
+		return (r < '0' || r > '9') && (r < 'a' || r > 'z')
+	}
+	for _, tok := range strings.FieldsFunc(msg, notWord) {
+		if tok == "429" {
+			return true
+		}
+	}
+	return false
+}
 
 func (s *Service) processBucketResponse(ctx context.Context, b Bucket, resp Pass0Response) TickSummary {
diff --git a/web/src/routes/admin/discovery/+page.svelte b/web/src/routes/admin/discovery/+page.svelte
index 4e69075..2eb9ace 100644
--- a/web/src/routes/admin/discovery/+page.svelte
+++ b/web/src/routes/admin/discovery/+page.svelte
@@ -6,6 +6,21 @@ const queue = $derived(data.queue ?? 
[]); const recentErrors = $derived(data.stats.recent_errors ?? []); + // 'YYYY-MM-DDTHH:mm:ssZ' → 'DD.MM.YYYY' (German) for display. + function fmtDate(iso: string | null): string { + if (!iso) return ''; + const d = iso.slice(0, 10); // 'YYYY-MM-DD' + const [y, m, day] = d.split('-'); + return y && m && day ? `${day}.${m}.${y}` : d; + } + + function fmtDateRange(start: string | null, end: string | null): string { + const s = fmtDate(start); + const e = fmtDate(end); + if (s && e && s !== e) return `${s} – ${e}`; + return s || e || ''; + } + const lastTickLabel = $derived.by(() => { if (!data.stats.last_tick_at) return 'nie'; const ts = new Date(data.stats.last_tick_at).getTime(); @@ -100,79 +115,87 @@ Keine Einträge in der Warteschlange.

{:else} - - - - - - - - - - - - - - - - {#each queue as row (row.id)} - - - - - - - - - - +
+
LandRegionMarktStadtDatumWebsiteQuellenExtraktionAktion
{row.land}{row.bundesland}{row.markt_name}{row.stadt} - {#if row.start_datum} - {row.start_datum}{row.end_datum ? ` – ${row.end_datum}` : ''} - {:else} - - {/if} - - {#if row.website} - link - {:else} - - {/if} - {row.quellen?.length ?? 0} - - {row.extraktion || '—'} - - -
- - -
-
- - -
-
+ + + + + + + + + + - {/each} - -
RegionMarktStadtDatumWebsiteQuellenExtraktionAktion
+ + + {#each queue as row (row.id)} + + + {row.bundesland || row.land} + + {row.markt_name} + {row.stadt} + + {#if row.start_datum} + {fmtDateRange(row.start_datum, row.end_datum)} + {:else} + + {/if} + + + {#if row.website} + + link + + {:else} + + {/if} + + + {row.quellen?.length ?? 0} + + + + {row.extraktion || '—'} + + + +
+ + +
+
+ + +
+ + + {/each} + + + {/if}