Merge branch 'feat/discovery-crawler-mr2' — Ship 1 MR 2 cutover

Deletes the Mistral Pass 0 code path from discovery, flips the k8s
CronJob to the crawler endpoint on a daily schedule, and adds a
Run crawl button to the admin UI that renders CrawlSummary.

Net change: -901 lines / +175 lines (see stat line below). Mistral remains
wired for Pass 1 and Pass 2 research — only Pass 0 discovery is replaced by
the deterministic 5-source Go crawler.
This commit is contained in:
2026-04-18 17:49:08 +02:00
17 changed files with 175 additions and 901 deletions

View File

@@ -1,233 +0,0 @@
// discovery-compare: Run the crawler and Mistral Pass 0 against a sample of
// (land, region, year-month) buckets; emit markdown diff to stdout or --out.
// Purpose: verify crawler coverage before MR 2 deletes the Mistral path.
//
// Requires the usual backend env vars. JWT_SECRET must be set (any value works
// for this tool since no HTTP server is started).
package main
import (
"context"
"flag"
"fmt"
"os"
"sort"
"strings"
"time"
"marktvogt.de/backend/internal/config"
"marktvogt.de/backend/internal/domain/discovery"
"marktvogt.de/backend/internal/domain/discovery/crawler"
"marktvogt.de/backend/internal/pkg/ai"
)
// sampleBucket identifies one (land, region, year-month) comparison bucket
// parsed from the --buckets CLI flag.
type sampleBucket struct {
	Land      string // country, e.g. "Deutschland"
	Region    string // matched against crawler Bundesland, e.g. "Bayern"
	YearMonth string // "YYYY-MM"
}
// main parses CLI flags and drives one comparison run.
// Exit codes: 2 for usage errors, 1 for runtime failures.
func main() {
	bucketsFlag := flag.String("buckets", "", "comma-separated LAND:REGION:YYYY-MM list (e.g., Deutschland:Bayern:2026-04)")
	outFlag := flag.String("out", "", "write markdown report to this file (default: stdout)")
	flag.Parse()

	if *bucketsFlag == "" {
		fmt.Fprintln(os.Stderr, "--buckets required")
		os.Exit(2)
	}

	buckets, err := parseBuckets(*bucketsFlag)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(2)
	}

	if err := run(buckets, *outFlag); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}
// run executes the full comparison for the given sample buckets:
//  1. one crawler pass over all sources (dry-run — no DB inserts), events
//     flattened, merged, and grouped per bucket,
//  2. one Mistral Pass 0 call per bucket,
//  3. a markdown report written to outPath, or stdout when outPath is "".
//
// The whole run shares a single 15-minute timeout.
func run(buckets []sampleBucket, outPath string) error {
	cfg, err := config.Load()
	if err != nil {
		return fmt.Errorf("load config: %w", err)
	}
	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Minute)
	defer cancel()
	// 1. Crawler run (dry-run — no DB inserts).
	cr := crawler.NewCrawler(cfg.Discovery.CrawlerUserAgent, crawler.DefaultSourceConfigs())
	res, err := cr.RunAll(ctx)
	if err != nil {
		return fmt.Errorf("crawler: %w", err)
	}
	// Flatten per-source events before merging so cross-source duplicates
	// collapse into single merged events.
	crawlerEvents := make([]crawler.RawEvent, 0)
	for _, evs := range res.PerSource {
		crawlerEvents = append(crawlerEvents, evs...)
	}
	merged := crawler.Merge(crawlerEvents)
	crawlerByBucket := groupCrawlerByBucket(merged, buckets)
	// 2. Mistral Pass 0 run (one per sample bucket).
	aiClient := ai.New(cfg.AI.APIKey, cfg.AI.AgentSimple, cfg.AI.ModelComplex, cfg.AI.RateLimitRPS)
	agentClient := discovery.NewAgentClient(aiClient, cfg.AI.AgentDiscovery)
	mistralByBucket := runMistralForBuckets(ctx, agentClient, buckets)
	// 3. Emit report.
	report := buildMarkdownReport(buckets, crawlerByBucket, mistralByBucket)
	if outPath == "" {
		fmt.Println(report)
		return nil
	}
	if err := os.WriteFile(outPath, []byte(report), 0644); err != nil {
		return fmt.Errorf("write out: %w", err)
	}
	return nil
}
// parseBuckets splits a comma-separated list of LAND:REGION:YYYY-MM triples
// into sampleBucket values. Any item that does not have exactly three
// colon-separated fields aborts the whole parse with an error.
func parseBuckets(s string) ([]sampleBucket, error) {
	items := strings.Split(s, ",")
	buckets := make([]sampleBucket, 0, len(items))
	for _, raw := range items {
		fields := strings.Split(strings.TrimSpace(raw), ":")
		if len(fields) != 3 {
			return nil, fmt.Errorf("bucket %q: want LAND:REGION:YYYY-MM", raw)
		}
		buckets = append(buckets, sampleBucket{
			Land:      fields[0],
			Region:    fields[1],
			YearMonth: fields[2],
		})
	}
	return buckets, nil
}
// groupCrawlerByBucket assigns merged crawler events to sample buckets.
//
// NOTE: this is an approximation for the diagnostic CLI only — not for
// production dedup. The Bundesland match uses `strings.Contains` so a merged
// event with Bundesland="Bayern" will join a bucket with Region="Bay" (or
// "ern"). Good enough to compare coverage between the crawler and Mistral
// Pass 0 at bucket granularity; not safe for business-logic routing.
func groupCrawlerByBucket(merged []crawler.MergedEvent, buckets []sampleBucket) map[string][]crawler.MergedEvent {
	result := make(map[string][]crawler.MergedEvent)
	// Pre-seed every bucket key so buckets with zero matches still appear
	// in the report as empty sections instead of being silently absent.
	for _, b := range buckets {
		result[bucketKey(b)] = nil
	}
	for _, m := range merged {
		// Without a start date the event cannot be placed in a year-month.
		if m.StartDate == nil {
			continue
		}
		ym := m.StartDate.Format("2006-01")
		// One event may match several buckets (empty Land/Bundesland acts
		// as a wildcard), so every bucket is checked — no early break.
		for _, b := range buckets {
			if b.YearMonth != ym {
				continue
			}
			if m.Land != "" && m.Land != b.Land {
				continue
			}
			if m.Bundesland != "" && !strings.Contains(m.Bundesland, b.Region) {
				continue
			}
			key := bucketKey(b)
			result[key] = append(result[key], m)
		}
	}
	return result
}
// runMistralForBuckets calls the Pass 0 agent once per sample bucket and
// returns discovered markets keyed by bucketKey. A failed bucket is logged
// to stderr and omitted from the map, so its report section shows 0 events.
//
// NOTE(review): Halbmonat is hard-coded to "H1", so the second half of each
// month is never sampled here — confirm this is intended for the diagnostic.
func runMistralForBuckets(ctx context.Context, ac *discovery.AgentClient, buckets []sampleBucket) map[string][]discovery.Pass0Market {
	out := make(map[string][]discovery.Pass0Market)
	for _, b := range buckets {
		bk := discovery.Bucket{Land: b.Land, Region: b.Region, YearMonth: b.YearMonth, Halbmonat: "H1"}
		resp, err := ac.Discover(ctx, bk)
		if err != nil {
			fmt.Fprintf(os.Stderr, "mistral %s: %v\n", bucketKey(b), err)
			continue
		}
		out[bucketKey(b)] = resp.Maerkte
	}
	return out
}
// bucketKey builds the canonical "LAND:REGION:YYYY-MM" map key for a bucket.
func bucketKey(b sampleBucket) string {
	return strings.Join([]string{b.Land, b.Region, b.YearMonth}, ":")
}
// buildMarkdownReport renders the per-bucket comparison as markdown: event
// counts for both sources, then "only in crawler" / "only in mistral" /
// "in both" name lists. Names are compared as normalized sets so formatting
// differences between the two sources don't show up as coverage gaps.
func buildMarkdownReport(buckets []sampleBucket, c map[string][]crawler.MergedEvent, m map[string][]discovery.Pass0Market) string {
	var sb strings.Builder
	sb.WriteString("# Discovery coverage comparison\n\n")
	sb.WriteString(fmt.Sprintf("Generated: %s\n\n", time.Now().Format(time.RFC3339)))
	for _, b := range buckets {
		key := bucketKey(b)
		ce := c[key]
		me := m[key]
		sb.WriteString(fmt.Sprintf("## %s\n\n", key))
		sb.WriteString(fmt.Sprintf("- crawler: %d events\n", len(ce)))
		sb.WriteString(fmt.Sprintf("- mistral: %d events\n\n", len(me)))
		crawlerNames := nameSet(ceNames(ce))
		mistralNames := nameSet(meNames(me))
		sb.WriteString("### Only in crawler\n")
		for _, n := range diff(crawlerNames, mistralNames) {
			sb.WriteString(fmt.Sprintf("- %s\n", n))
		}
		sb.WriteString("\n### Only in mistral\n")
		for _, n := range diff(mistralNames, crawlerNames) {
			sb.WriteString(fmt.Sprintf("- %s\n", n))
		}
		sb.WriteString("\n### In both\n")
		for _, n := range intersect(crawlerNames, mistralNames) {
			sb.WriteString(fmt.Sprintf("- %s\n", n))
		}
		sb.WriteString("\n")
	}
	return sb.String()
}
// ceNames returns the normalized names of merged crawler events, in order.
func ceNames(ce []crawler.MergedEvent) []string {
	names := make([]string, 0, len(ce))
	for _, ev := range ce {
		names = append(names, discovery.NormalizeName(ev.Name))
	}
	return names
}
// meNames returns the normalized market names from Pass 0 results, in order.
func meNames(me []discovery.Pass0Market) []string {
	names := make([]string, 0, len(me))
	for _, mk := range me {
		names = append(names, discovery.NormalizeName(mk.MarktName))
	}
	return names
}
// nameSet converts a name list into a membership set (duplicates collapse).
func nameSet(names []string) map[string]bool {
	set := make(map[string]bool, len(names))
	for _, name := range names {
		set[name] = true
	}
	return set
}
// diff returns the members of a that are absent from b, sorted ascending.
func diff(a, b map[string]bool) []string {
	only := make([]string, 0, len(a))
	for name := range a {
		if b[name] {
			continue
		}
		only = append(only, name)
	}
	sort.Strings(only)
	return only
}
// intersect returns the members present in both a and b, sorted ascending.
// The result is always a non-nil slice, matching the original contract.
func intersect(a, b map[string]bool) []string {
	both := []string{}
	for name := range a {
		if b[name] {
			both = append(both, name)
		}
	}
	sort.Strings(both)
	return both
}

View File

@@ -37,7 +37,7 @@ spec:
curl -fsS --retry 2 --retry-delay 10 \
-X POST \
-H "Authorization: Bearer $DISCOVERY_TOKEN" \
"http://{{ include "marktvogt-backend.fullname" . }}:{{ .Values.service.port }}/api/v1/admin/discovery/tick"
"http://{{ include "marktvogt-backend.fullname" . }}:{{ .Values.service.port }}/api/v1/admin/discovery/crawl"
env:
- name: DISCOVERY_TOKEN
valueFrom:

View File

@@ -102,7 +102,7 @@ ai:
# Discovery cron — token passed via CI secrets during deploy.
discovery:
enabled: true
schedule: "0 */4 * * *"
schedule: "0 4 * * *"
token: ""
batchSize: 4
forwardMonths: 12

View File

@@ -26,19 +26,16 @@ type Config struct {
}
type DiscoveryConfig struct {
Token string // bearer token for /tick endpoint
BatchSize int // buckets per tick (default 4)
ForwardMonths int // forward window in months (default 12)
Token string // bearer token for /crawl endpoint
CrawlerUserAgent string // user-agent for crawler HTTP requests
CrawlerManualRateLimitPerHour int // max manual crawl requests per hour (1-3600, default 1)
}
type AIConfig struct {
APIKey string
AgentSimple string // Pre-created Mistral agent ID for Pass 1 (extraction + web search)
AgentDiscovery string // Agent ID for discovery pipeline (Task 7)
ModelComplex string // Model for Pass 2 (description + retry, e.g. mistral-large-latest)
RateLimitRPS float64 // Max requests per second to Mistral (0 = disabled)
APIKey string
AgentSimple string // Pre-created Mistral agent ID for Pass 1 (extraction + web search)
ModelComplex string // Model for Pass 2 (description + retry, e.g. mistral-large-latest)
RateLimitRPS float64 // Max requests per second to Mistral (0 = disabled)
}
type AppConfig struct {
@@ -186,19 +183,9 @@ func Load() (*Config, error) {
return nil, fmt.Errorf("AI_RATE_LIMIT_RPS: %w", err)
}
batchSize, err := envInt("DISCOVERY_BATCH_SIZE", 4)
if err != nil {
return nil, fmt.Errorf("DISCOVERY_BATCH_SIZE: %w", err)
}
forwardMonths, err := envInt("DISCOVERY_FORWARD_MONTHS", 12)
if err != nil {
return nil, fmt.Errorf("DISCOVERY_FORWARD_MONTHS: %w", err)
}
discoveryToken := envStr("DISCOVERY_TOKEN", "")
if discoveryToken == "" {
slog.Warn("DISCOVERY_TOKEN is empty; /api/v1/admin/discovery/tick is disabled")
slog.Warn("DISCOVERY_TOKEN is empty; /api/v1/admin/discovery/crawl is disabled")
}
crawlerRateLimit, err := envInt("DISCOVERY_CRAWLER_MANUAL_RATE_LIMIT_PER_HOUR", 1)
@@ -285,16 +272,13 @@ func Load() (*Config, error) {
FrontendURL: envStr("FRONTEND_URL", "http://localhost:5173"),
},
AI: AIConfig{
APIKey: envStr("AI_API_KEY", ""),
AgentSimple: envStr("AI_AGENT_SIMPLE", ""),
AgentDiscovery: envStr("AI_AGENT_DISCOVERY", ""),
ModelComplex: envStr("AI_MODEL_COMPLEX", "mistral-large-latest"),
RateLimitRPS: rpsAI,
APIKey: envStr("AI_API_KEY", ""),
AgentSimple: envStr("AI_AGENT_SIMPLE", ""),
ModelComplex: envStr("AI_MODEL_COMPLEX", "mistral-large-latest"),
RateLimitRPS: rpsAI,
},
Discovery: DiscoveryConfig{
Token: discoveryToken,
BatchSize: batchSize,
ForwardMonths: forwardMonths,
CrawlerUserAgent: envStr("DISCOVERY_CRAWLER_USER_AGENT", "Mozilla/5.0 (X11; Linux x86_64; rv:135.0) Gecko/20100101 Firefox/135.0"),
CrawlerManualRateLimitPerHour: crawlerRateLimit,
},

View File

@@ -1,116 +0,0 @@
package discovery
import (
"context"
"encoding/json"
"fmt"
"strings"
"time"
"marktvogt.de/backend/internal/pkg/ai"
)
// AgentClient wraps the Mistral Pass 0 agent for discovery.
// The zero value is disabled; construct via NewAgentClient. A nil ai client
// or empty agentID also leaves the client disabled (see Enabled).
type AgentClient struct {
	ai      *ai.Client // underlying Mistral API client; nil disables the agent
	agentID string     // console-created agent ID; empty disables the agent
}
// NewAgentClient builds an AgentClient for the given Mistral client and
// console agent ID. Either argument may be zero; Enabled then reports false.
func NewAgentClient(aiClient *ai.Client, agentID string) *AgentClient {
	return &AgentClient{ai: aiClient, agentID: agentID}
}
// Enabled reports whether the client is fully configured: a non-nil AI
// client and a non-empty agent ID.
func (c *AgentClient) Enabled() bool {
	return c.ai != nil && c.agentID != ""
}
// Discover runs Pass 0 for the given bucket. The agent's full instructions
// are set in the Mistral console (see spec §6.2). We only inject the bucket
// parameters plus today's UTC date (recherche_datum) into the prompt.
// Returns an error when the client is not configured, the agent call fails,
// or the reply cannot be parsed as Pass0Response.
func (c *AgentClient) Discover(ctx context.Context, b Bucket) (Pass0Response, error) {
	if !c.Enabled() {
		return Pass0Response{}, fmt.Errorf("discovery agent not configured")
	}
	prompt := fmt.Sprintf(
		"bucket:\n land: %s\n region: %s\n jahr_monat: %s\n halbmonat: %s\nrecherche_datum: %s\n\nFinde alle Maerkte in diesem Bucket und antworte im vorgegebenen JSON-Schema.",
		b.Land, b.Region, b.YearMonth, b.Halbmonat, time.Now().UTC().Format("2006-01-02"),
	)
	result, err := c.ai.Pass0(ctx, c.agentID, prompt)
	if err != nil {
		return Pass0Response{}, fmt.Errorf("mistral pass0: %w", err)
	}
	return parsePass0Response(result.Content)
}
// parsePass0Response extracts, cleans, and unmarshals the agent's raw reply
// into a Pass0Response. The reply may contain prose around the JSON object
// and // comments inside it; both are stripped before unmarshalling. On
// failure, the error embeds the first 500 bytes of the raw reply to aid
// debugging malformed agent output.
func parsePass0Response(raw string) (Pass0Response, error) {
	cleaned := extractJSON(raw)
	cleaned = stripJSONComments(cleaned)
	var out Pass0Response
	if err := json.Unmarshal([]byte(cleaned), &out); err != nil {
		return Pass0Response{}, fmt.Errorf("unmarshal pass0: %w (raw first 500: %q)", err, truncate(raw, 500))
	}
	return out, nil
}
// --- JSON helpers (independent copy; logic mirrors domain/market/research.go).
// Do not import from the market package — keeping packages decoupled.
// extractJSON returns the first balanced top-level JSON object found in s.
// Brace bytes that occur inside string literals (including after escaped
// quotes) are ignored, so a value like {"note": "see {spec}"} is extracted
// whole instead of being truncated at the first raw '}' byte — the original
// version counted every brace and cut such objects short.
// If s contains no '{', it is returned unchanged; if the object never
// closes, everything from the first '{' onward is returned as-is.
func extractJSON(s string) string {
	start := strings.IndexByte(s, '{')
	if start < 0 {
		return s
	}
	s = s[start:]
	depth := 0
	inString := false
	escaped := false
	for i := 0; i < len(s); i++ {
		c := s[i]
		switch {
		case escaped:
			escaped = false
		case c == '\\' && inString:
			escaped = true
		case c == '"':
			inString = !inString
		case inString:
			// braces inside string literals do not affect nesting depth
		case c == '{':
			depth++
		case c == '}':
			depth--
			if depth == 0 {
				return s[:i+1]
			}
		}
	}
	return s
}
// stripJSONComments removes // line comments that appear outside string
// literals. Everything from the "//" up to and including the terminating
// newline is dropped (harmless for JSON, where that newline is whitespace);
// string contents — including escaped quotes and backslashes — pass through
// untouched.
func stripJSONComments(s string) string {
	var out []byte
	inString := false
	escaped := false
	for i := 0; i < len(s); i++ {
		c := s[i]
		switch {
		case escaped:
			// char after a backslash inside a string: copy verbatim
			out = append(out, c)
			escaped = false
		case inString && c == '\\':
			out = append(out, c)
			escaped = true
		case c == '"':
			inString = !inString
			out = append(out, c)
		case !inString && c == '/' && i+1 < len(s) && s[i+1] == '/':
			// comment: advance to end of line; the newline is consumed too
			for i < len(s) && s[i] != '\n' {
				i++
			}
		default:
			out = append(out, c)
		}
	}
	return string(out)
}
// truncate caps s at n bytes (no ellipsis; may split a multi-byte rune).
func truncate(s string, n int) string {
	if len(s) > n {
		return s[:n]
	}
	return s
}

View File

@@ -1,74 +0,0 @@
package discovery
import "testing"
// TestParsePass0_Valid checks that a well-formed, fully-populated agent
// reply round-trips into Pass0Response with bucket and market fields intact.
func TestParsePass0_Valid(t *testing.T) {
	raw := `{
"bucket": {"land": "Deutschland", "region": "Bayern", "jahr_monat": "2026-09"},
"recherche_datum": "2026-04-18",
"quellen_gesamt": ["https://a.example", "https://b.example"],
"maerkte": [
{
"markt_name": "Mittelaltermarkt Trostberg",
"stadt": "Trostberg",
"bundesland": "Bayern",
"start_datum": "2026-09-12",
"end_datum": "2026-09-14",
"website": "https://trostberg.example",
"quellen": ["https://a.example"],
"extraktion": "verbatim",
"hinweis": null
}
]
}`
	got, err := parsePass0Response(raw)
	if err != nil {
		t.Fatalf("parse err: %v", err)
	}
	if got.Bucket.Region != "Bayern" {
		t.Errorf("region = %q, want Bayern", got.Bucket.Region)
	}
	if len(got.Maerkte) != 1 || got.Maerkte[0].MarktName != "Mittelaltermarkt Trostberg" {
		t.Errorf("unexpected markets: %+v", got.Maerkte)
	}
}
// TestParsePass0_WithCommentsAndTrailingText checks that surrounding prose
// and agent-inserted // comments are stripped before unmarshalling.
func TestParsePass0_WithCommentsAndTrailingText(t *testing.T) {
	raw := `Here is the JSON:
{
"bucket": {"land": "Deutschland", "region": "Bayern", "jahr_monat": "2026-09"},
// a comment the agent added
"recherche_datum": "2026-04-18",
"quellen_gesamt": [],
"maerkte": []
}
end.`
	got, err := parsePass0Response(raw)
	if err != nil {
		t.Fatalf("parse err: %v", err)
	}
	if got.Bucket.Region != "Bayern" || len(got.Maerkte) != 0 {
		t.Errorf("unexpected: %+v", got)
	}
}
// TestParsePass0_Malformed checks that non-JSON input yields an error rather
// than a zero-value response.
func TestParsePass0_Malformed(t *testing.T) {
	raw := `not JSON at all`
	if _, err := parsePass0Response(raw); err == nil {
		t.Error("expected error on non-JSON input")
	}
}
// TestParsePass0_EmptyMaerkte checks that an empty maerkte array parses
// cleanly; nil vs. empty slice is tolerated.
func TestParsePass0_EmptyMaerkte(t *testing.T) {
	raw := `{"bucket":{"land":"Deutschland","region":"Bayern","jahr_monat":"2026-09"},"recherche_datum":"","quellen_gesamt":[],"maerkte":[]}`
	got, err := parsePass0Response(raw)
	if err != nil {
		t.Fatalf("parse err: %v", err)
	}
	if got.Maerkte == nil {
		got.Maerkte = []Pass0Market{} // nil vs empty is fine
	}
	if len(got.Maerkte) != 0 {
		t.Errorf("expected empty, got %+v", got.Maerkte)
	}
}

View File

@@ -43,17 +43,6 @@ func NewHandler(s *Service, manualRateLimitPerHour int) *Handler {
return &Handler{service: s, crawlRateLimit: rl}
}
func (h *Handler) Tick(c *gin.Context) {
summary, err := h.service.Tick(c.Request.Context())
if err != nil {
slog.ErrorContext(c.Request.Context(), "discovery tick failed", "error", err)
apiErr := apierror.Internal("tick failed")
c.JSON(apiErr.Status, apierror.NewResponse(apiErr))
return
}
c.JSON(http.StatusOK, gin.H{"data": summary})
}
// Crawl runs the crawler once and returns CrawlSummary. A per-process mutex
// blocks concurrent runs. The manual-admin rate limit is bypassed when
// "crawl_bypass_rate_limit" is set in the gin context (set by the bearer-token

View File

@@ -33,7 +33,7 @@ func (b *blockingCrawlerRunner) RunAll(ctx context.Context) (crawler.CrawlResult
// gets HTTP 429 while the first is still running.
func TestCrawlHandlerMutexReentry(t *testing.T) {
bc := &blockingCrawlerRunner{started: make(chan struct{})}
svc := NewServiceWithCrawler(newMockRepo(), bc, noopLinkVerifier{}, noopMarketCreator{})
svc := NewService(newMockRepo(), bc, noopLinkVerifier{}, noopMarketCreator{})
h := NewHandler(svc, 1)
// First request — runs in a goroutine so the handler blocks.
@@ -75,7 +75,7 @@ func TestCrawlHandlerMutexReentry(t *testing.T) {
// within the rate limit window returns 429 with Retry-After.
func TestCrawlHandlerRateLimit(t *testing.T) {
// Use an instant-returning crawler so the mutex is released quickly.
svc := NewServiceWithCrawler(
svc := NewService(
newMockRepo(),
&stubCrawlerRunner{result: crawler.CrawlResult{}},
noopLinkVerifier{},
@@ -120,7 +120,7 @@ func TestCrawlHandlerRateLimit(t *testing.T) {
// TestCrawlHandlerRateLimitResets verifies that a manual request succeeds once
// the rate limit window has elapsed.
func TestCrawlHandlerRateLimitResets(t *testing.T) {
svc := NewServiceWithCrawler(
svc := NewService(
newMockRepo(),
&stubCrawlerRunner{result: crawler.CrawlResult{}},
noopLinkVerifier{},

View File

@@ -12,8 +12,6 @@ import (
var _ Repository = (*mockRepo)(nil)
type mockRepo struct {
pickStaleFn func(ctx context.Context, forwardMonths, limit int) ([]Bucket, error)
updateBucketFn func(ctx context.Context, id uuid.UUID, errMsg string) error
listSeriesFn func(ctx context.Context, cityNormalized string) ([]SeriesCandidate, error)
editionExistsFn func(ctx context.Context, seriesID uuid.UUID, year int) (bool, error)
insertDiscFn func(ctx context.Context, d DiscoveredMarket) (uuid.UUID, error)
@@ -30,12 +28,6 @@ type mockRepo struct {
inserted []DiscoveredMarket
}
func (m *mockRepo) PickStaleBuckets(ctx context.Context, fm, lim int) ([]Bucket, error) {
return m.pickStaleFn(ctx, fm, lim)
}
func (m *mockRepo) UpdateBucketQueried(ctx context.Context, id uuid.UUID, errMsg string) error {
return m.updateBucketFn(ctx, id, errMsg)
}
func (m *mockRepo) ListSeriesByCity(ctx context.Context, c string) ([]SeriesCandidate, error) {
return m.listSeriesFn(ctx, c)
}

View File

@@ -59,34 +59,6 @@ type RejectedDiscovery struct {
Reason string `json:"reason"`
}
// Pass0Response is the parsed shape of the Mistral Pass 0 agent reply.
type Pass0Response struct {
Bucket Pass0Bucket `json:"bucket"`
RechercheDatum string `json:"recherche_datum"`
QuellenGesamt []string `json:"quellen_gesamt"`
Maerkte []Pass0Market `json:"maerkte"`
}
type Pass0Bucket struct {
Land string `json:"land"`
Region string `json:"region"`
JahrMonat string `json:"jahr_monat"`
Halbmonat string `json:"halbmonat"`
}
type Pass0Market struct {
MarktName string `json:"markt_name"`
Stadt string `json:"stadt"`
Bundesland string `json:"bundesland"`
StartDatum string `json:"start_datum"` // 'YYYY-MM-DD' or ""
EndDatum string `json:"end_datum"`
Website string `json:"website"`
Quellen []string `json:"quellen"`
Konfidenz string `json:"konfidenz"` // 'hoch' | 'mittel' | 'niedrig'
AgentStatus string `json:"status"` // agent's status field; see DiscoveredMarket.AgentStatus for values
Hinweis string `json:"hinweis"`
}
// UpdatePendingFields is a partial update for a pending discovered_market row.
// Only non-nil fields are written. Name normalization is recomputed when
// MarktName or Stadt is set so dedup stays honest.

View File

@@ -12,8 +12,6 @@ import (
)
type Repository interface {
PickStaleBuckets(ctx context.Context, forwardMonths, limit int) ([]Bucket, error)
UpdateBucketQueried(ctx context.Context, id uuid.UUID, errMsg string) error
ListSeriesByCity(ctx context.Context, cityNormalized string) ([]SeriesCandidate, error)
EditionExists(ctx context.Context, seriesID uuid.UUID, year int) (bool, error)
InsertDiscovered(ctx context.Context, d DiscoveredMarket) (uuid.UUID, error)
@@ -44,44 +42,6 @@ func NewRepository(pool *pgxpool.Pool) Repository {
return &pgRepository{pool: pool}
}
func (r *pgRepository) PickStaleBuckets(ctx context.Context, forwardMonths, limit int) ([]Bucket, error) {
q := `
SELECT id, land, region, year_month, halbmonat, last_queried_at, coalesce(last_error, ''), created_at
FROM discovery_buckets
WHERE year_month >= to_char(date_trunc('month', now()), 'YYYY-MM')
AND year_month <= to_char(date_trunc('month', now()) + ($1 * interval '1 month'), 'YYYY-MM')
AND (last_queried_at IS NULL OR last_queried_at < now() - interval '7 days')
ORDER BY last_queried_at NULLS FIRST, year_month, halbmonat
LIMIT $2`
rows, err := r.pool.Query(ctx, q, forwardMonths, limit)
if err != nil {
return nil, fmt.Errorf("pick buckets: %w", err)
}
defer rows.Close()
out := make([]Bucket, 0)
for rows.Next() {
var b Bucket
if err := rows.Scan(&b.ID, &b.Land, &b.Region, &b.YearMonth, &b.Halbmonat, &b.LastQueriedAt, &b.LastError, &b.CreatedAt); err != nil {
return nil, err
}
out = append(out, b)
}
return out, rows.Err()
}
func (r *pgRepository) UpdateBucketQueried(ctx context.Context, id uuid.UUID, errMsg string) error {
var errValue any
if errMsg == "" {
errValue = nil
} else {
errValue = errMsg
}
_, err := r.pool.Exec(ctx,
`UPDATE discovery_buckets SET last_queried_at = now(), last_error = $2 WHERE id = $1`,
id, errValue)
return err
}
func (r *pgRepository) ListSeriesByCity(ctx context.Context, cityNormalized string) ([]SeriesCandidate, error) {
rows, err := r.pool.Query(ctx,
`SELECT id, name, city FROM market_series WHERE LOWER(city) = $1`,

View File

@@ -2,16 +2,13 @@ package discovery
import "github.com/gin-gonic/gin"
// RegisterRoutes mounts both the admin-session routes (queue mgmt) and the
// bearer-token route (tick). The two middlewares are passed in separately.
// RegisterRoutes mounts admin-session routes (queue mgmt) and the
// bearer-token crawl route.
func RegisterRoutes(
rg *gin.RouterGroup,
h *Handler,
requireAuth, requireAdmin, requireTickToken gin.HandlerFunc,
) {
// Machine-driven tick (bearer token).
rg.POST("/admin/discovery/tick", requireTickToken, h.Tick)
// Machine-driven crawl (bearer token; bypasses manual rate limit).
rg.POST("/admin/discovery/crawl", requireTickToken, func(c *gin.Context) {
c.Set("crawl_bypass_rate_limit", true)

View File

@@ -13,7 +13,6 @@ import (
"marktvogt.de/backend/internal/domain/discovery/crawler"
"marktvogt.de/backend/internal/domain/market"
"marktvogt.de/backend/internal/pkg/ai"
)
type marketCreator interface {
@@ -34,39 +33,16 @@ type crawlerRunner interface {
RunAll(ctx context.Context) (crawler.CrawlResult, error)
}
// Service orchestrates bucket scheduling, agent invocation, and queue management.
// Service orchestrates crawler runs and queue management.
type Service struct {
repo Repository
agent *AgentClient
crawler crawlerRunner
marketCreator marketCreator
linkChecker linkVerifier
batchSize int
forwardMonths int
}
// NewService constructs a Service. batchSize and forwardMonths configure bucket picking.
func NewService(repo Repository, agent *AgentClient, mc marketCreator, batchSize, forwardMonths int) *Service {
return &Service{
repo: repo,
agent: agent,
marketCreator: mc,
linkChecker: NewLinkChecker(),
batchSize: batchSize,
forwardMonths: forwardMonths,
}
}
// SetCrawler attaches a crawler instance post-construction. For MR 1, this allows
// wiring both Tick (agent-driven) and Crawl (crawler-driven) paths on the same Service.
func (s *Service) SetCrawler(cr crawlerRunner) {
s.crawler = cr
}
// NewServiceWithCrawler constructs a Service wired for the crawler-driven
// Crawl method. The existing Pass 0 Tick path is not wired here (no agent).
// MR 2 will consolidate this with NewService once the Mistral path is removed.
func NewServiceWithCrawler(repo Repository, cr crawlerRunner, lc linkVerifier, mc marketCreator) *Service {
// NewService constructs a Service wired for the crawler-driven Crawl path.
func NewService(repo Repository, cr crawlerRunner, lc linkVerifier, mc marketCreator) *Service {
return &Service{
repo: repo,
crawler: cr,
@@ -75,199 +51,6 @@ func NewServiceWithCrawler(repo Repository, cr crawlerRunner, lc linkVerifier, m
}
}
// PickBuckets returns stale buckets eligible for the next discovery run.
func (s *Service) PickBuckets(ctx context.Context) ([]Bucket, error) {
return s.repo.PickStaleBuckets(ctx, s.forwardMonths, s.batchSize)
}
// TickSummary reports what happened in one tick.
type TickSummary struct {
BucketsProcessed int `json:"buckets_processed"`
Discovered int `json:"markets_discovered"`
DedupedExisting int `json:"deduped_existing"`
DedupedRejected int `json:"deduped_rejected"`
DedupedQueue int `json:"deduped_queue"`
Errors int `json:"errors"`
RateLimited int `json:"rate_limited"`
LinkCheckFailed int `json:"link_check_failed"`
ValidationFailed int `json:"validation_failed"`
}
// Tick picks N stale buckets and runs Pass 0 for each, writing net-new discoveries.
// On a rate-limit hit, aborts the remainder of the tick: subsequent buckets in the
// same batch would almost certainly hit the same limit, and we want to give Mistral's
// web_search budget time to refill before trying again.
func (s *Service) Tick(ctx context.Context) (TickSummary, error) {
if s.agent == nil || !s.agent.Enabled() {
return TickSummary{}, errors.New("discovery agent not configured")
}
buckets, err := s.PickBuckets(ctx)
if err != nil {
return TickSummary{}, fmt.Errorf("pick buckets: %w", err)
}
var summary TickSummary
summary.BucketsProcessed = len(buckets)
for _, b := range buckets {
if stop := s.processOneBucket(ctx, b, &summary); stop {
break
}
}
return summary, nil
}
// processOneBucket runs Pass 0 for a single bucket. Returns stop=true when the tick
// should abort early (currently only on persistent rate limits).
func (s *Service) processOneBucket(ctx context.Context, b Bucket, summary *TickSummary) (stop bool) {
resp, err := s.agent.Discover(ctx, b)
if err != nil {
wait := 2 * time.Second
if ai.IsRateLimit(err) {
wait = 10 * time.Second
}
select {
case <-ctx.Done():
return false
case <-time.After(wait):
}
resp, err = s.agent.Discover(ctx, b)
if err != nil {
if ai.IsRateLimit(err) {
// Leave last_queried_at unchanged so the bucket is re-picked on the
// next tick, and abort the rest of this tick — Mistral's web_search
// budget is shared across buckets, no point hammering it further.
slog.InfoContext(ctx, "pass0 rate-limited; deferring bucket + aborting tick",
"bucket_id", b.ID, "region", b.Region, "year_month", b.YearMonth)
summary.RateLimited++
return true
}
slog.WarnContext(ctx, "pass0 failed twice", "bucket_id", b.ID, "error", err)
_ = s.repo.UpdateBucketQueried(ctx, b.ID, err.Error())
summary.Errors++
return false
}
}
sub := s.processBucketResponse(ctx, b, resp)
summary.Discovered += sub.Discovered
summary.DedupedExisting += sub.DedupedExisting
summary.DedupedRejected += sub.DedupedRejected
summary.DedupedQueue += sub.DedupedQueue
if err := s.repo.UpdateBucketQueried(ctx, b.ID, ""); err != nil {
slog.ErrorContext(ctx, "update bucket queried", "bucket_id", b.ID, "error", err)
}
return false
}
func (s *Service) processBucketResponse(ctx context.Context, b Bucket, resp Pass0Response) TickSummary {
var summary TickSummary
seen := make(map[string]bool) // in-request dedup
for _, m := range resp.Maerkte {
key := NormalizeName(m.MarktName) + "|" + NormalizeCity(m.Stadt) + "|" + m.StartDatum
if seen[key] {
continue
}
seen[key] = true
// Series candidates pre-filtered by city.
candidates, err := s.repo.ListSeriesByCity(ctx, NormalizeCity(m.Stadt))
if err != nil {
slog.WarnContext(ctx, "list series by city", "city", m.Stadt, "error", err)
continue
}
matchedSeriesID := findSeriesMatch(m.MarktName, candidates)
startDatum, endDatum := parseOptionalDate(m.StartDatum), parseOptionalDate(m.EndDatum)
year := 0
if startDatum != nil {
year = startDatum.Year()
}
// Dedup checks in order (spec §7.1.e).
if matchedSeriesID != nil && year > 0 {
exists, err := s.repo.EditionExists(ctx, *matchedSeriesID, year)
if err != nil {
slog.WarnContext(ctx, "edition exists check", "error", err)
continue
}
if exists {
summary.DedupedExisting++
continue
}
}
nameNorm := NormalizeName(m.MarktName)
if year > 0 {
rejected, err := s.repo.IsRejected(ctx, nameNorm, m.Stadt, year)
if err != nil {
slog.WarnContext(ctx, "is rejected check", "error", err)
continue
}
if rejected {
summary.DedupedRejected++
continue
}
}
pending, err := s.repo.QueueHasPending(ctx, nameNorm, m.Stadt, startDatum)
if err != nil {
slog.WarnContext(ctx, "queue pending check", "error", err)
continue
}
if pending {
summary.DedupedQueue++
continue
}
// Link verification — drop URLs the agent hallucinated or that are now
// 404/unreachable. If all quellen fail, skip the market entirely (we
// need at least one verifiable source). If the website fails, clear it
// but keep the market since quellen is the primary evidence.
verifiedQuellen := s.linkChecker.FilterURLs(ctx, m.Quellen)
if len(verifiedQuellen) == 0 {
slog.InfoContext(ctx, "link check dropped all quellen; skipping market",
"markt", m.MarktName, "stadt", m.Stadt)
summary.LinkCheckFailed++
continue
}
verifiedWebsite := m.Website
if verifiedWebsite != "" && !s.linkChecker.CheckURL(ctx, verifiedWebsite) {
verifiedWebsite = ""
}
dm := DiscoveredMarket{
BucketID: &b.ID,
MarktName: m.MarktName,
Stadt: m.Stadt,
Bundesland: m.Bundesland,
Land: b.Land,
StartDatum: startDatum,
EndDatum: endDatum,
Website: verifiedWebsite,
Quellen: verifiedQuellen,
Konfidenz: m.Konfidenz,
AgentStatus: m.AgentStatus,
Hinweis: m.Hinweis,
NameNormalized: nameNorm,
MatchedSeriesID: matchedSeriesID,
}
// Semantic validation — catches agent self-contradictions that the
// schema alone cannot. Errors drop the market; warnings would be
// appended to hinweis (none defined yet at Pass 0 scope).
issues := ValidateForInsert(dm, &b)
if HasErrors(issues) {
slog.InfoContext(ctx, "validation failed; skipping market",
"markt", m.MarktName, "stadt", m.Stadt, "issues", formatIssues(issues))
summary.ValidationFailed++
continue
}
if _, err := s.repo.InsertDiscovered(ctx, dm); err != nil {
slog.WarnContext(ctx, "insert discovered", "error", err)
continue
}
summary.Discovered++
}
return summary
}
// CrawlSummary reports the outcome of one Service.Crawl run.
type CrawlSummary struct {
StartedAt time.Time `json:"started_at"`
@@ -474,17 +257,6 @@ func formatIssues(issues []Issue) string {
return strings.Join(parts, ",")
}
// parseOptionalDate parses a 'YYYY-MM-DD' string into a *time.Time.
// Empty or malformed input yields nil rather than an error — absent dates
// are a normal, non-exceptional case for discovered markets.
func parseOptionalDate(s string) *time.Time {
	if t, err := time.Parse("2006-01-02", s); err == nil {
		return &t
	}
	return nil
}
// Accept transitions a pending queue entry into a market edition.
// Returns (seriesID, editionID, error).
func (s *Service) Accept(ctx context.Context, queueID, reviewerID uuid.UUID) (uuid.UUID, uuid.UUID, error) {
@@ -621,7 +393,8 @@ func (s *Service) UpdatePending(ctx context.Context, id uuid.UUID, fields Update
// Stats returns a health snapshot for the admin dashboard.
// Shows up to 5 most-recent error buckets inline.
func (s *Service) Stats(ctx context.Context) (Stats, error) {
return s.repo.Stats(ctx, s.forwardMonths, 5)
const forwardMonths = 12
return s.repo.Stats(ctx, forwardMonths, 5)
}
// findSeriesMatch returns the ID of the first candidate whose normalized name matches

View File

@@ -24,8 +24,6 @@ func newMockRepo() *mockRepo {
m.editionExistsFn = func(_ context.Context, _ uuid.UUID, _ int) (bool, error) { return false, nil }
m.isRejectedFn = func(_ context.Context, _, _ string, _ int) (bool, error) { return false, nil }
m.queuePendingFn = func(_ context.Context, _, _ string, _ *time.Time) (bool, error) { return false, nil }
m.updateBucketFn = func(_ context.Context, _ uuid.UUID, _ string) error { return nil }
m.pickStaleFn = func(_ context.Context, _, _ int) ([]Bucket, error) { return nil, nil }
return m
}
@@ -94,107 +92,6 @@ func TestFindSeriesMatch(t *testing.T) {
}
}
func TestPickBucketsPassesConfigToRepo(t *testing.T) {
var gotFM, gotLim int
m := &mockRepo{
pickStaleFn: func(_ context.Context, fm, lim int) ([]Bucket, error) {
gotFM, gotLim = fm, lim
return []Bucket{{ID: uuid.New(), Land: "Deutschland", Region: "Bayern", YearMonth: "2026-09"}}, nil
},
}
svc := NewService(m, nil, nil, 4, 12)
svc.linkChecker = noopLinkVerifier{}
got, err := svc.PickBuckets(context.Background())
if err != nil {
t.Fatalf("err: %v", err)
}
if len(got) != 1 {
t.Errorf("got %d buckets, want 1", len(got))
}
if gotFM != 12 || gotLim != 4 {
t.Errorf("forwarded fm=%d lim=%d, want 12,4", gotFM, gotLim)
}
}
// TestProcessBucket_DedupsExisting verifies that a Pass 0 market whose
// matched series already has an edition for the discovered year is counted
// as deduped_existing and is never inserted.
func TestProcessBucket_DedupsExisting(t *testing.T) {
	b := Bucket{ID: uuid.New(), Land: "Deutschland", Region: "Bayern", YearMonth: "2026-09"}
	existingSeries := uuid.New()

	var got []DiscoveredMarket
	repo := &mockRepo{
		listSeriesFn: func(_ context.Context, city string) ([]SeriesCandidate, error) {
			return []SeriesCandidate{{ID: existingSeries, Name: "Mittelaltermarkt Trostberg", City: city}}, nil
		},
		// A 2026 edition already exists → the market must be deduped.
		editionExistsFn: func(_ context.Context, _ uuid.UUID, year int) (bool, error) {
			return year == 2026, nil
		},
		isRejectedFn:   func(_ context.Context, _, _ string, _ int) (bool, error) { return false, nil },
		queuePendingFn: func(_ context.Context, _, _ string, _ *time.Time) (bool, error) { return false, nil },
		insertDiscFn: func(_ context.Context, d DiscoveredMarket) (uuid.UUID, error) {
			got = append(got, d)
			return uuid.New(), nil
		},
		updateBucketFn: func(_ context.Context, _ uuid.UUID, _ string) error { return nil },
	}

	svc := NewService(repo, nil, nil, 4, 12)
	svc.linkChecker = noopLinkVerifier{}

	resp := Pass0Response{
		Bucket: Pass0Bucket{Land: "Deutschland", Region: "Bayern", JahrMonat: "2026-09"},
		Maerkte: []Pass0Market{
			{MarktName: "Mittelaltermarkt Trostberg", Stadt: "Trostberg", StartDatum: "2026-09-12", EndDatum: "2026-09-14", Quellen: []string{"https://x"}},
		},
	}

	summary := svc.processBucketResponse(context.Background(), b, resp)
	if summary.Discovered != 0 {
		t.Errorf("expected 0 discovered, got %d", summary.Discovered)
	}
	if summary.DedupedExisting != 1 {
		t.Errorf("expected 1 deduped_existing, got %d", summary.DedupedExisting)
	}
	if len(got) != 0 {
		t.Errorf("expected no inserts, got %d", len(got))
	}
}
// TestProcessBucket_InsertsNetNew verifies that a Pass 0 market with no
// matching series, no rejection, and no pending queue entry is inserted
// as a new discovery with a normalized name.
func TestProcessBucket_InsertsNetNew(t *testing.T) {
	b := Bucket{ID: uuid.New(), Land: "Deutschland", Region: "Bayern", YearMonth: "2026-09"}

	var rows []DiscoveredMarket
	repo := &mockRepo{
		listSeriesFn:    func(_ context.Context, _ string) ([]SeriesCandidate, error) { return nil, nil },
		editionExistsFn: func(_ context.Context, _ uuid.UUID, _ int) (bool, error) { return false, nil },
		isRejectedFn:    func(_ context.Context, _, _ string, _ int) (bool, error) { return false, nil },
		queuePendingFn:  func(_ context.Context, _, _ string, _ *time.Time) (bool, error) { return false, nil },
		insertDiscFn: func(_ context.Context, d DiscoveredMarket) (uuid.UUID, error) {
			rows = append(rows, d)
			return uuid.New(), nil
		},
		updateBucketFn: func(_ context.Context, _ uuid.UUID, _ string) error { return nil },
	}

	svc := NewService(repo, nil, nil, 4, 12)
	svc.linkChecker = noopLinkVerifier{}

	resp := Pass0Response{
		Bucket: Pass0Bucket{Land: "Deutschland", Region: "Bayern", JahrMonat: "2026-09"},
		Maerkte: []Pass0Market{
			{MarktName: "Neuer Markt", Stadt: "Passau", StartDatum: "2026-09-20", EndDatum: "2026-09-22", Quellen: []string{"https://x"}},
		},
	}

	summary := svc.processBucketResponse(context.Background(), b, resp)
	if summary.Discovered != 1 {
		t.Errorf("expected 1 discovered, got %d", summary.Discovered)
	}
	if len(rows) != 1 {
		t.Errorf("expected 1 insert, got %d", len(rows))
	}
	// NOTE(review): "Neuer Markt" normalizes to "neuer" — the normalizer
	// presumably strips the generic "Markt" token; confirm if this changes.
	if rows[0].NameNormalized != "neuer" {
		t.Errorf("name_normalized = %q, want 'neuer'", rows[0].NameNormalized)
	}
}
type stubCreator struct {
createCalls int
createEditionForSeriesCalls int
@@ -234,8 +131,7 @@ func TestAccept_NewSeries_CallsCreate(t *testing.T) {
markAcceptedFn: func(_ context.Context, _ pgx.Tx, _, _, _ uuid.UUID) error { return nil },
}
mc := &stubCreator{}
svc := NewService(m, nil, mc, 4, 12)
svc.linkChecker = noopLinkVerifier{}
svc := NewService(m, nil, noopLinkVerifier{}, mc)
_, _, err := svc.Accept(context.Background(), qID, uuid.New())
if err != nil {
t.Fatalf("accept err: %v", err)
@@ -257,8 +153,7 @@ func TestAccept_ExistingSeries_CallsCreateEditionForSeries(t *testing.T) {
markAcceptedFn: func(_ context.Context, _ pgx.Tx, _, _, _ uuid.UUID) error { return nil },
}
mc := &stubCreator{}
svc := NewService(m, nil, mc, 4, 12)
svc.linkChecker = noopLinkVerifier{}
svc := NewService(m, nil, noopLinkVerifier{}, mc)
_, _, err := svc.Accept(context.Background(), uuid.New(), uuid.New())
if err != nil {
t.Fatalf("accept err: %v", err)
@@ -289,7 +184,7 @@ func TestServiceCrawlHappyPath(t *testing.T) {
PerSourceMS: map[string]int64{"marktkalendarium": 1},
},
}
svc := NewServiceWithCrawler(repo, sc, lc, noopMarketCreator{})
svc := NewService(repo, sc, lc, noopMarketCreator{})
summary, err := svc.Crawl(context.Background())
if err != nil {
@@ -333,7 +228,7 @@ func TestServiceCrawlLinkCheckFailed(t *testing.T) {
},
},
}
svc := NewServiceWithCrawler(repo, sc, alwaysFailLinkVerifier{}, noopMarketCreator{})
svc := NewService(repo, sc, alwaysFailLinkVerifier{}, noopMarketCreator{})
summary, err := svc.Crawl(context.Background())
if err != nil {
@@ -367,7 +262,7 @@ func TestServiceCrawlDedupQueue(t *testing.T) {
},
},
}
svc := NewServiceWithCrawler(repo, sc, noopLinkVerifier{}, noopMarketCreator{})
svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{})
summary, err := svc.Crawl(context.Background())
if err != nil {
@@ -398,7 +293,7 @@ func TestServiceCrawlDefaultsEndDate(t *testing.T) {
},
},
}
svc := NewServiceWithCrawler(repo, sc, noopLinkVerifier{}, noopMarketCreator{})
svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{})
if _, err := svc.Crawl(context.Background()); err != nil {
t.Fatal(err)
@@ -427,7 +322,7 @@ func TestServiceCrawlMultiSourceHighKonfidenz(t *testing.T) {
},
},
}
svc := NewServiceWithCrawler(repo, sc, noopLinkVerifier{}, noopMarketCreator{})
svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{})
summary, err := svc.Crawl(context.Background())
if err != nil {

View File

@@ -73,17 +73,8 @@ func (s *Server) registerRoutes() {
// Discovery routes
discoveryRepo := discovery.NewRepository(s.db)
discoveryAgent := discovery.NewAgentClient(aiClient, s.cfg.AI.AgentDiscovery)
discoveryService := discovery.NewService(
discoveryRepo,
discoveryAgent,
marketSvc,
s.cfg.Discovery.BatchSize,
s.cfg.Discovery.ForwardMonths,
)
// Wire the crawler for the Crawl path (MR 1 keeps both Tick and Crawl paths)
crawlerInstance := crawler.NewCrawler(s.cfg.Discovery.CrawlerUserAgent, crawler.DefaultSourceConfigs())
discoveryService.SetCrawler(crawlerInstance)
discoveryService := discovery.NewService(discoveryRepo, crawlerInstance, discovery.NewLinkChecker(), marketSvc)
discoveryHandler := discovery.NewHandler(discoveryService, s.cfg.Discovery.CrawlerManualRateLimitPerHour)
requireTickToken := middleware.RequireBearerToken(s.cfg.Discovery.Token)
discovery.RegisterRoutes(v1, discoveryHandler, requireAuth, requireAdmin, requireTickToken)

View File

@@ -1,6 +1,23 @@
import type { PageServerLoad, Actions } from './$types.js';
import { redirect, fail } from '@sveltejs/kit';
import { serverFetch } from '$lib/api/client.server.js';
import { ApiClientError } from '$lib/api/client.js';
// Response shape of POST /admin/discovery/crawl-manual (see the `crawl`
// action below). Field names are snake_case as emitted by the Go backend.
type CrawlSummary = {
	started_at: string; // timestamp of crawl start — presumably RFC 3339; confirm against backend encoder
	duration_ms: number; // total wall-clock time of the crawl run
	// Per-source fetch statistics, keyed by source name.
	per_source: Record<string, { events_fetched: number; elapsed_ms: number }>;
	merged: number; // events merged within a single source
	merged_across_sites: number; // events merged across different sources
	discovered: number; // net-new markets inserted
	deduped_existing: number; // skipped: edition already exists
	deduped_rejected: number; // skipped: previously rejected
	deduped_queue: number; // skipped: already pending in the review queue
	link_check_failed: number; // dropped: source link did not verify
	validation_failed: number; // dropped: failed field validation
	date_conflicts: number; // sources disagreed on dates
	// Per-source fetch/parse errors; empty on a fully clean run.
	source_errors: Array<{ source: string; error: string }>;
};
type DiscoveredMarket = {
id: string;
@@ -94,6 +111,24 @@ export const actions: Actions = {
}
},
// Triggers a manual crawler run via the admin API and returns the resulting
// CrawlSummary for the page to render. The endpoint is rate limited
// (handler is constructed with CrawlerManualRateLimitPerHour), so a 429 is
// an expected outcome and gets a user-facing message.
crawl: async ({ cookies, fetch }) => {
	try {
		const res = await serverFetch<CrawlSummary>(`/admin/discovery/crawl-manual`, cookies, {
			method: 'POST',
			fetch
		});
		return { crawlSummary: res.data };
	} catch (err) {
		// Rate limit hit: surface a friendly (German) retry hint instead of a 500.
		if (err instanceof ApiClientError && err.status === 429) {
			return fail(429, {
				crawlError: `Rate limit erreicht. Bitte kurz warten und erneut versuchen. (${err.message})`
			});
		}
		// Anything else: generic failure with whatever message is available.
		const message = err instanceof Error ? err.message : 'Crawl fehlgeschlagen.';
		return fail(500, { crawlError: message });
	}
},
update: async ({ request, cookies, fetch }) => {
const form = await request.formData();
const id = String(form.get('id') ?? '');

View File

@@ -1,6 +1,8 @@
<script lang="ts">
import { enhance } from '$app/forms';
let { data } = $props();
let { data, form } = $props();
let crawling = $state(false);
// Coalesce nullable list fields (Go encodes nil slices as null).
const queue = $derived(data.queue ?? []);
@@ -127,6 +129,113 @@
</details>
{/if}
<div class="mt-6 flex items-center gap-3">
<form
method="POST"
action="?/crawl"
use:enhance={() => {
crawling = true;
return async ({ update }) => {
crawling = false;
await update();
};
}}
>
<button
type="submit"
disabled={crawling}
class="rounded bg-indigo-600 px-3 py-1.5 text-sm text-white hover:bg-indigo-700 disabled:cursor-not-allowed disabled:opacity-60"
>
{crawling ? 'Crawling…' : 'Run crawl'}
</button>
</form>
</div>
{#if form?.crawlError}
<div
class="mt-4 rounded border border-red-300 bg-red-50 px-3 py-2 text-sm text-red-700 dark:border-red-700 dark:bg-red-950 dark:text-red-300"
>
{form.crawlError}
</div>
{/if}
{#if form?.crawlSummary}
{@const s = form.crawlSummary}
<div
class="mt-4 rounded-lg border border-stone-200 bg-stone-50 p-4 text-sm dark:border-stone-700 dark:bg-stone-900"
>
<div class="flex items-baseline justify-between">
<h2 class="font-semibold">Crawl-Ergebnis</h2>
<span class="font-mono text-xs text-stone-500"
>{(s.duration_ms / 1000).toFixed(1)} s · {s.started_at
.slice(0, 16)
.replace('T', ' ')}</span
>
</div>
<div class="mt-3 grid grid-cols-3 gap-2 sm:grid-cols-5">
{#each [['Entdeckt', s.discovered], ['Merged', s.merged], ['Merged cross-site', s.merged_across_sites], ['Dedup existing', s.deduped_existing], ['Dedup rejected', s.deduped_rejected], ['Dedup queue', s.deduped_queue], ['Link check fail', s.link_check_failed], ['Validation fail', s.validation_failed], ['Date conflicts', s.date_conflicts]] as [label, value]}
<div
class="rounded border border-stone-200 bg-white px-2 py-1.5 dark:border-stone-700 dark:bg-stone-800"
>
<div class="text-[10px] text-stone-500 dark:text-stone-400">{label}</div>
<div class="font-mono text-sm font-medium">{value}</div>
</div>
{/each}
</div>
{#if Object.keys(s.per_source).length > 0}
<div class="mt-4">
<div
class="text-xs font-medium tracking-wider text-stone-500 uppercase dark:text-stone-400"
>
Per source
</div>
<table class="mt-2 w-full text-left text-xs">
<thead>
<tr class="border-b border-stone-200 dark:border-stone-700">
<th class="pr-4 pb-1 font-medium text-stone-500">Source</th>
<th class="pr-4 pb-1 text-right font-medium text-stone-500">Events fetched</th>
<th class="pb-1 text-right font-medium text-stone-500">Elapsed (ms)</th>
</tr>
</thead>
<tbody>
{#each Object.entries(s.per_source) as [src, stat]}
<tr class="border-b border-stone-100 dark:border-stone-800">
<td class="py-1 pr-4 font-mono">{src}</td>
<td class="py-1 pr-4 text-right tabular-nums">{stat.events_fetched}</td>
<td class="py-1 text-right tabular-nums">{stat.elapsed_ms}</td>
</tr>
{/each}
</tbody>
</table>
</div>
{/if}
{#if s.source_errors && s.source_errors.length > 0}
<div class="mt-4">
<div
class="text-xs font-medium tracking-wider text-amber-700 uppercase dark:text-amber-400"
>
Source errors ({s.source_errors.length})
</div>
<ul class="mt-2 space-y-1">
{#each s.source_errors as e}
<li
class="rounded border border-amber-200 bg-amber-50 px-2 py-1 text-xs dark:border-amber-800 dark:bg-amber-950"
>
<span class="font-mono font-medium text-amber-800 dark:text-amber-300"
>{e.source}</span
>
<span class="ml-2 text-amber-700 dark:text-amber-400">{e.error}</span>
</li>
{/each}
</ul>
</div>
{/if}
</div>
{/if}
<h2 class="mt-8 text-lg font-semibold">Queue</h2>
<p class="mt-1 text-sm text-stone-500">
{queue.length} pending · showing from offset {data.offset}