diff --git a/backend/cmd/discovery-compare/main.go b/backend/cmd/discovery-compare/main.go deleted file mode 100644 index 1b32fdd..0000000 --- a/backend/cmd/discovery-compare/main.go +++ /dev/null @@ -1,233 +0,0 @@ -// discovery-compare: Run the crawler and Mistral Pass 0 against a sample of -// (land, region, year-month) buckets; emit markdown diff to stdout or --out. -// Purpose: verify crawler coverage before MR 2 deletes the Mistral path. -// -// Requires the usual backend env vars. JWT_SECRET must be set (any value works -// for this tool since no HTTP server is started). -package main - -import ( - "context" - "flag" - "fmt" - "os" - "sort" - "strings" - "time" - - "marktvogt.de/backend/internal/config" - "marktvogt.de/backend/internal/domain/discovery" - "marktvogt.de/backend/internal/domain/discovery/crawler" - "marktvogt.de/backend/internal/pkg/ai" -) - -type sampleBucket struct { - Land string - Region string - YearMonth string -} - -func main() { - var ( - bucketsFlag = flag.String("buckets", "", "comma-separated LAND:REGION:YYYY-MM list (e.g., Deutschland:Bayern:2026-04)") - outFlag = flag.String("out", "", "write markdown report to this file (default: stdout)") - ) - flag.Parse() - - if *bucketsFlag == "" { - fmt.Fprintln(os.Stderr, "--buckets required") - os.Exit(2) - } - buckets, err := parseBuckets(*bucketsFlag) - if err != nil { - fmt.Fprintln(os.Stderr, err) - os.Exit(2) - } - - if err := run(buckets, *outFlag); err != nil { - fmt.Fprintln(os.Stderr, err) - os.Exit(1) - } -} - -func run(buckets []sampleBucket, outPath string) error { - cfg, err := config.Load() - if err != nil { - return fmt.Errorf("load config: %w", err) - } - - ctx, cancel := context.WithTimeout(context.Background(), 15*time.Minute) - defer cancel() - - // 1. Crawler run (dry-run — no DB inserts). 
- cr := crawler.NewCrawler(cfg.Discovery.CrawlerUserAgent, crawler.DefaultSourceConfigs()) - res, err := cr.RunAll(ctx) - if err != nil { - return fmt.Errorf("crawler: %w", err) - } - crawlerEvents := make([]crawler.RawEvent, 0) - for _, evs := range res.PerSource { - crawlerEvents = append(crawlerEvents, evs...) - } - merged := crawler.Merge(crawlerEvents) - crawlerByBucket := groupCrawlerByBucket(merged, buckets) - - // 2. Mistral Pass 0 run (one per sample bucket). - aiClient := ai.New(cfg.AI.APIKey, cfg.AI.AgentSimple, cfg.AI.ModelComplex, cfg.AI.RateLimitRPS) - agentClient := discovery.NewAgentClient(aiClient, cfg.AI.AgentDiscovery) - mistralByBucket := runMistralForBuckets(ctx, agentClient, buckets) - - // 3. Emit report. - report := buildMarkdownReport(buckets, crawlerByBucket, mistralByBucket) - if outPath == "" { - fmt.Println(report) - return nil - } - if err := os.WriteFile(outPath, []byte(report), 0644); err != nil { - return fmt.Errorf("write out: %w", err) - } - return nil -} - -func parseBuckets(s string) ([]sampleBucket, error) { - parts := strings.Split(s, ",") - out := make([]sampleBucket, 0, len(parts)) - for _, item := range parts { - fields := strings.Split(strings.TrimSpace(item), ":") - if len(fields) != 3 { - return nil, fmt.Errorf("bucket %q: want LAND:REGION:YYYY-MM", item) - } - out = append(out, sampleBucket{Land: fields[0], Region: fields[1], YearMonth: fields[2]}) - } - return out, nil -} - -// groupCrawlerByBucket assigns merged crawler events to sample buckets. -// -// NOTE: this is an approximation for the diagnostic CLI only — not for -// production dedup. The Bundesland match uses `strings.Contains` so a merged -// event with Bundesland="Bayern" will join a bucket with Region="Bay" (or -// "ern"). Good enough to compare coverage between the crawler and Mistral -// Pass 0 at bucket granularity; not safe for business-logic routing. 
-func groupCrawlerByBucket(merged []crawler.MergedEvent, buckets []sampleBucket) map[string][]crawler.MergedEvent { - result := make(map[string][]crawler.MergedEvent) - for _, b := range buckets { - result[bucketKey(b)] = nil - } - for _, m := range merged { - if m.StartDate == nil { - continue - } - ym := m.StartDate.Format("2006-01") - for _, b := range buckets { - if b.YearMonth != ym { - continue - } - if m.Land != "" && m.Land != b.Land { - continue - } - if m.Bundesland != "" && !strings.Contains(m.Bundesland, b.Region) { - continue - } - key := bucketKey(b) - result[key] = append(result[key], m) - } - } - return result -} - -func runMistralForBuckets(ctx context.Context, ac *discovery.AgentClient, buckets []sampleBucket) map[string][]discovery.Pass0Market { - out := make(map[string][]discovery.Pass0Market) - for _, b := range buckets { - bk := discovery.Bucket{Land: b.Land, Region: b.Region, YearMonth: b.YearMonth, Halbmonat: "H1"} - resp, err := ac.Discover(ctx, bk) - if err != nil { - fmt.Fprintf(os.Stderr, "mistral %s: %v\n", bucketKey(b), err) - continue - } - out[bucketKey(b)] = resp.Maerkte - } - return out -} - -func bucketKey(b sampleBucket) string { - return b.Land + ":" + b.Region + ":" + b.YearMonth -} - -func buildMarkdownReport(buckets []sampleBucket, c map[string][]crawler.MergedEvent, m map[string][]discovery.Pass0Market) string { - var sb strings.Builder - sb.WriteString("# Discovery coverage comparison\n\n") - sb.WriteString(fmt.Sprintf("Generated: %s\n\n", time.Now().Format(time.RFC3339))) - for _, b := range buckets { - key := bucketKey(b) - ce := c[key] - me := m[key] - sb.WriteString(fmt.Sprintf("## %s\n\n", key)) - sb.WriteString(fmt.Sprintf("- crawler: %d events\n", len(ce))) - sb.WriteString(fmt.Sprintf("- mistral: %d events\n\n", len(me))) - - crawlerNames := nameSet(ceNames(ce)) - mistralNames := nameSet(meNames(me)) - - sb.WriteString("### Only in crawler\n") - for _, n := range diff(crawlerNames, mistralNames) { - 
sb.WriteString(fmt.Sprintf("- %s\n", n)) - } - sb.WriteString("\n### Only in mistral\n") - for _, n := range diff(mistralNames, crawlerNames) { - sb.WriteString(fmt.Sprintf("- %s\n", n)) - } - sb.WriteString("\n### In both\n") - for _, n := range intersect(crawlerNames, mistralNames) { - sb.WriteString(fmt.Sprintf("- %s\n", n)) - } - sb.WriteString("\n") - } - return sb.String() -} - -func ceNames(ce []crawler.MergedEvent) []string { - out := make([]string, len(ce)) - for i, e := range ce { - out[i] = discovery.NormalizeName(e.Name) - } - return out -} - -func meNames(me []discovery.Pass0Market) []string { - out := make([]string, len(me)) - for i, e := range me { - out[i] = discovery.NormalizeName(e.MarktName) - } - return out -} - -func nameSet(names []string) map[string]bool { - s := make(map[string]bool, len(names)) - for _, n := range names { - s[n] = true - } - return s -} - -func diff(a, b map[string]bool) []string { - out := make([]string, 0, len(a)) - for n := range a { - if !b[n] { - out = append(out, n) - } - } - sort.Strings(out) - return out -} - -func intersect(a, b map[string]bool) []string { - out := make([]string, 0) - for n := range a { - if b[n] { - out = append(out, n) - } - } - sort.Strings(out) - return out -} diff --git a/backend/deploy/helm/templates/discovery-cron.yaml b/backend/deploy/helm/templates/discovery-cron.yaml index 27f00d0..08a4747 100644 --- a/backend/deploy/helm/templates/discovery-cron.yaml +++ b/backend/deploy/helm/templates/discovery-cron.yaml @@ -37,7 +37,7 @@ spec: curl -fsS --retry 2 --retry-delay 10 \ -X POST \ -H "Authorization: Bearer $DISCOVERY_TOKEN" \ - "http://{{ include "marktvogt-backend.fullname" . }}:{{ .Values.service.port }}/api/v1/admin/discovery/tick" + "http://{{ include "marktvogt-backend.fullname" . 
}}:{{ .Values.service.port }}/api/v1/admin/discovery/crawl" env: - name: DISCOVERY_TOKEN valueFrom: diff --git a/backend/deploy/helm/values.yaml b/backend/deploy/helm/values.yaml index 09fed88..550432e 100644 --- a/backend/deploy/helm/values.yaml +++ b/backend/deploy/helm/values.yaml @@ -102,7 +102,7 @@ ai: # Discovery cron — token passed via CI secrets during deploy. discovery: enabled: true - schedule: "0 */4 * * *" + schedule: "0 4 * * *" token: "" batchSize: 4 forwardMonths: 12 diff --git a/backend/internal/config/config.go b/backend/internal/config/config.go index ed97540..3f46661 100644 --- a/backend/internal/config/config.go +++ b/backend/internal/config/config.go @@ -26,19 +26,16 @@ type Config struct { } type DiscoveryConfig struct { - Token string // bearer token for /tick endpoint - BatchSize int // buckets per tick (default 4) - ForwardMonths int // forward window in months (default 12) + Token string // bearer token for /crawl endpoint CrawlerUserAgent string // user-agent for crawler HTTP requests CrawlerManualRateLimitPerHour int // max manual crawl requests per hour (1-3600, default 1) } type AIConfig struct { - APIKey string - AgentSimple string // Pre-created Mistral agent ID for Pass 1 (extraction + web search) - AgentDiscovery string // Agent ID for discovery pipeline (Task 7) - ModelComplex string // Model for Pass 2 (description + retry, e.g. mistral-large-latest) - RateLimitRPS float64 // Max requests per second to Mistral (0 = disabled) + APIKey string + AgentSimple string // Pre-created Mistral agent ID for Pass 1 (extraction + web search) + ModelComplex string // Model for Pass 2 (description + retry, e.g. 
mistral-large-latest) + RateLimitRPS float64 // Max requests per second to Mistral (0 = disabled) } type AppConfig struct { @@ -186,19 +183,9 @@ func Load() (*Config, error) { return nil, fmt.Errorf("AI_RATE_LIMIT_RPS: %w", err) } - batchSize, err := envInt("DISCOVERY_BATCH_SIZE", 4) - if err != nil { - return nil, fmt.Errorf("DISCOVERY_BATCH_SIZE: %w", err) - } - - forwardMonths, err := envInt("DISCOVERY_FORWARD_MONTHS", 12) - if err != nil { - return nil, fmt.Errorf("DISCOVERY_FORWARD_MONTHS: %w", err) - } - discoveryToken := envStr("DISCOVERY_TOKEN", "") if discoveryToken == "" { - slog.Warn("DISCOVERY_TOKEN is empty; /api/v1/admin/discovery/tick is disabled") + slog.Warn("DISCOVERY_TOKEN is empty; /api/v1/admin/discovery/crawl is disabled") } crawlerRateLimit, err := envInt("DISCOVERY_CRAWLER_MANUAL_RATE_LIMIT_PER_HOUR", 1) @@ -285,16 +272,13 @@ func Load() (*Config, error) { FrontendURL: envStr("FRONTEND_URL", "http://localhost:5173"), }, AI: AIConfig{ - APIKey: envStr("AI_API_KEY", ""), - AgentSimple: envStr("AI_AGENT_SIMPLE", ""), - AgentDiscovery: envStr("AI_AGENT_DISCOVERY", ""), - ModelComplex: envStr("AI_MODEL_COMPLEX", "mistral-large-latest"), - RateLimitRPS: rpsAI, + APIKey: envStr("AI_API_KEY", ""), + AgentSimple: envStr("AI_AGENT_SIMPLE", ""), + ModelComplex: envStr("AI_MODEL_COMPLEX", "mistral-large-latest"), + RateLimitRPS: rpsAI, }, Discovery: DiscoveryConfig{ Token: discoveryToken, - BatchSize: batchSize, - ForwardMonths: forwardMonths, CrawlerUserAgent: envStr("DISCOVERY_CRAWLER_USER_AGENT", "Mozilla/5.0 (X11; Linux x86_64; rv:135.0) Gecko/20100101 Firefox/135.0"), CrawlerManualRateLimitPerHour: crawlerRateLimit, }, diff --git a/backend/internal/domain/discovery/agent_client.go b/backend/internal/domain/discovery/agent_client.go deleted file mode 100644 index 3bb109c..0000000 --- a/backend/internal/domain/discovery/agent_client.go +++ /dev/null @@ -1,116 +0,0 @@ -package discovery - -import ( - "context" - "encoding/json" - "fmt" - "strings" - 
"time" - - "marktvogt.de/backend/internal/pkg/ai" -) - -// AgentClient wraps the Mistral Pass 0 agent for discovery. -type AgentClient struct { - ai *ai.Client - agentID string -} - -func NewAgentClient(aiClient *ai.Client, agentID string) *AgentClient { - return &AgentClient{ai: aiClient, agentID: agentID} -} - -func (c *AgentClient) Enabled() bool { - return c.ai != nil && c.agentID != "" -} - -// Discover runs Pass 0 for the given bucket. The agent's full instructions -// are set in the Mistral console (see spec §6.2). We only inject the bucket -// parameters here. -func (c *AgentClient) Discover(ctx context.Context, b Bucket) (Pass0Response, error) { - if !c.Enabled() { - return Pass0Response{}, fmt.Errorf("discovery agent not configured") - } - prompt := fmt.Sprintf( - "bucket:\n land: %s\n region: %s\n jahr_monat: %s\n halbmonat: %s\nrecherche_datum: %s\n\nFinde alle Maerkte in diesem Bucket und antworte im vorgegebenen JSON-Schema.", - b.Land, b.Region, b.YearMonth, b.Halbmonat, time.Now().UTC().Format("2006-01-02"), - ) - result, err := c.ai.Pass0(ctx, c.agentID, prompt) - if err != nil { - return Pass0Response{}, fmt.Errorf("mistral pass0: %w", err) - } - return parsePass0Response(result.Content) -} - -func parsePass0Response(raw string) (Pass0Response, error) { - cleaned := extractJSON(raw) - cleaned = stripJSONComments(cleaned) - var out Pass0Response - if err := json.Unmarshal([]byte(cleaned), &out); err != nil { - return Pass0Response{}, fmt.Errorf("unmarshal pass0: %w (raw first 500: %q)", err, truncate(raw, 500)) - } - return out, nil -} - -// --- JSON helpers (independent copy; logic mirrors domain/market/research.go). -// Do not import from the market package — keeping packages decoupled. 
- -func extractJSON(s string) string { - start := strings.IndexByte(s, '{') - if start < 0 { - return s - } - s = s[start:] - depth := 0 - for i := 0; i < len(s); i++ { - switch s[i] { - case '{': - depth++ - case '}': - depth-- - if depth == 0 { - return s[:i+1] - } - } - } - return s -} - -func stripJSONComments(s string) string { - var result []byte - inString := false - escaped := false - for i := 0; i < len(s); i++ { - c := s[i] - if escaped { - result = append(result, c) - escaped = false - continue - } - if c == '\\' && inString { - result = append(result, c) - escaped = true - continue - } - if c == '"' { - inString = !inString - result = append(result, c) - continue - } - if !inString && c == '/' && i+1 < len(s) && s[i+1] == '/' { - for i < len(s) && s[i] != '\n' { - i++ - } - continue - } - result = append(result, c) - } - return string(result) -} - -func truncate(s string, n int) string { - if len(s) <= n { - return s - } - return s[:n] -} diff --git a/backend/internal/domain/discovery/agent_client_test.go b/backend/internal/domain/discovery/agent_client_test.go deleted file mode 100644 index 7932881..0000000 --- a/backend/internal/domain/discovery/agent_client_test.go +++ /dev/null @@ -1,74 +0,0 @@ -package discovery - -import "testing" - -func TestParsePass0_Valid(t *testing.T) { - raw := `{ - "bucket": {"land": "Deutschland", "region": "Bayern", "jahr_monat": "2026-09"}, - "recherche_datum": "2026-04-18", - "quellen_gesamt": ["https://a.example", "https://b.example"], - "maerkte": [ - { - "markt_name": "Mittelaltermarkt Trostberg", - "stadt": "Trostberg", - "bundesland": "Bayern", - "start_datum": "2026-09-12", - "end_datum": "2026-09-14", - "website": "https://trostberg.example", - "quellen": ["https://a.example"], - "extraktion": "verbatim", - "hinweis": null - } - ] -}` - got, err := parsePass0Response(raw) - if err != nil { - t.Fatalf("parse err: %v", err) - } - if got.Bucket.Region != "Bayern" { - t.Errorf("region = %q, want Bayern", 
got.Bucket.Region) - } - if len(got.Maerkte) != 1 || got.Maerkte[0].MarktName != "Mittelaltermarkt Trostberg" { - t.Errorf("unexpected markets: %+v", got.Maerkte) - } -} - -func TestParsePass0_WithCommentsAndTrailingText(t *testing.T) { - raw := `Here is the JSON: -{ - "bucket": {"land": "Deutschland", "region": "Bayern", "jahr_monat": "2026-09"}, - // a comment the agent added - "recherche_datum": "2026-04-18", - "quellen_gesamt": [], - "maerkte": [] -} -end.` - got, err := parsePass0Response(raw) - if err != nil { - t.Fatalf("parse err: %v", err) - } - if got.Bucket.Region != "Bayern" || len(got.Maerkte) != 0 { - t.Errorf("unexpected: %+v", got) - } -} - -func TestParsePass0_Malformed(t *testing.T) { - raw := `not JSON at all` - if _, err := parsePass0Response(raw); err == nil { - t.Error("expected error on non-JSON input") - } -} - -func TestParsePass0_EmptyMaerkte(t *testing.T) { - raw := `{"bucket":{"land":"Deutschland","region":"Bayern","jahr_monat":"2026-09"},"recherche_datum":"","quellen_gesamt":[],"maerkte":[]}` - got, err := parsePass0Response(raw) - if err != nil { - t.Fatalf("parse err: %v", err) - } - if got.Maerkte == nil { - got.Maerkte = []Pass0Market{} // nil vs empty is fine - } - if len(got.Maerkte) != 0 { - t.Errorf("expected empty, got %+v", got.Maerkte) - } -} diff --git a/backend/internal/domain/discovery/handler.go b/backend/internal/domain/discovery/handler.go index 60c37a3..993c2e3 100644 --- a/backend/internal/domain/discovery/handler.go +++ b/backend/internal/domain/discovery/handler.go @@ -43,17 +43,6 @@ func NewHandler(s *Service, manualRateLimitPerHour int) *Handler { return &Handler{service: s, crawlRateLimit: rl} } -func (h *Handler) Tick(c *gin.Context) { - summary, err := h.service.Tick(c.Request.Context()) - if err != nil { - slog.ErrorContext(c.Request.Context(), "discovery tick failed", "error", err) - apiErr := apierror.Internal("tick failed") - c.JSON(apiErr.Status, apierror.NewResponse(apiErr)) - return - } - 
c.JSON(http.StatusOK, gin.H{"data": summary}) -} - // Crawl runs the crawler once and returns CrawlSummary. A per-process mutex // blocks concurrent runs. The manual-admin rate limit is bypassed when // "crawl_bypass_rate_limit" is set in the gin context (set by the bearer-token diff --git a/backend/internal/domain/discovery/handler_test.go b/backend/internal/domain/discovery/handler_test.go index d9c19ea..ce135c7 100644 --- a/backend/internal/domain/discovery/handler_test.go +++ b/backend/internal/domain/discovery/handler_test.go @@ -33,7 +33,7 @@ func (b *blockingCrawlerRunner) RunAll(ctx context.Context) (crawler.CrawlResult // gets HTTP 429 while the first is still running. func TestCrawlHandlerMutexReentry(t *testing.T) { bc := &blockingCrawlerRunner{started: make(chan struct{})} - svc := NewServiceWithCrawler(newMockRepo(), bc, noopLinkVerifier{}, noopMarketCreator{}) + svc := NewService(newMockRepo(), bc, noopLinkVerifier{}, noopMarketCreator{}) h := NewHandler(svc, 1) // First request — runs in a goroutine so the handler blocks. @@ -75,7 +75,7 @@ func TestCrawlHandlerMutexReentry(t *testing.T) { // within the rate limit window returns 429 with Retry-After. func TestCrawlHandlerRateLimit(t *testing.T) { // Use an instant-returning crawler so the mutex is released quickly. - svc := NewServiceWithCrawler( + svc := NewService( newMockRepo(), &stubCrawlerRunner{result: crawler.CrawlResult{}}, noopLinkVerifier{}, @@ -120,7 +120,7 @@ func TestCrawlHandlerRateLimit(t *testing.T) { // TestCrawlHandlerRateLimitResets verifies that a manual request succeeds once // the rate limit window has elapsed. 
func TestCrawlHandlerRateLimitResets(t *testing.T) { - svc := NewServiceWithCrawler( + svc := NewService( newMockRepo(), &stubCrawlerRunner{result: crawler.CrawlResult{}}, noopLinkVerifier{}, diff --git a/backend/internal/domain/discovery/mock_repo_test.go b/backend/internal/domain/discovery/mock_repo_test.go index e83eff4..f1c4e4e 100644 --- a/backend/internal/domain/discovery/mock_repo_test.go +++ b/backend/internal/domain/discovery/mock_repo_test.go @@ -12,8 +12,6 @@ import ( var _ Repository = (*mockRepo)(nil) type mockRepo struct { - pickStaleFn func(ctx context.Context, forwardMonths, limit int) ([]Bucket, error) - updateBucketFn func(ctx context.Context, id uuid.UUID, errMsg string) error listSeriesFn func(ctx context.Context, cityNormalized string) ([]SeriesCandidate, error) editionExistsFn func(ctx context.Context, seriesID uuid.UUID, year int) (bool, error) insertDiscFn func(ctx context.Context, d DiscoveredMarket) (uuid.UUID, error) @@ -30,12 +28,6 @@ type mockRepo struct { inserted []DiscoveredMarket } -func (m *mockRepo) PickStaleBuckets(ctx context.Context, fm, lim int) ([]Bucket, error) { - return m.pickStaleFn(ctx, fm, lim) -} -func (m *mockRepo) UpdateBucketQueried(ctx context.Context, id uuid.UUID, errMsg string) error { - return m.updateBucketFn(ctx, id, errMsg) -} func (m *mockRepo) ListSeriesByCity(ctx context.Context, c string) ([]SeriesCandidate, error) { return m.listSeriesFn(ctx, c) } diff --git a/backend/internal/domain/discovery/model.go b/backend/internal/domain/discovery/model.go index 3d2a114..31b9826 100644 --- a/backend/internal/domain/discovery/model.go +++ b/backend/internal/domain/discovery/model.go @@ -59,34 +59,6 @@ type RejectedDiscovery struct { Reason string `json:"reason"` } -// Pass0Response is the parsed shape of the Mistral Pass 0 agent reply. 
-type Pass0Response struct { - Bucket Pass0Bucket `json:"bucket"` - RechercheDatum string `json:"recherche_datum"` - QuellenGesamt []string `json:"quellen_gesamt"` - Maerkte []Pass0Market `json:"maerkte"` -} - -type Pass0Bucket struct { - Land string `json:"land"` - Region string `json:"region"` - JahrMonat string `json:"jahr_monat"` - Halbmonat string `json:"halbmonat"` -} - -type Pass0Market struct { - MarktName string `json:"markt_name"` - Stadt string `json:"stadt"` - Bundesland string `json:"bundesland"` - StartDatum string `json:"start_datum"` // 'YYYY-MM-DD' or "" - EndDatum string `json:"end_datum"` - Website string `json:"website"` - Quellen []string `json:"quellen"` - Konfidenz string `json:"konfidenz"` // 'hoch' | 'mittel' | 'niedrig' - AgentStatus string `json:"status"` // agent's status field; see DiscoveredMarket.AgentStatus for values - Hinweis string `json:"hinweis"` -} - // UpdatePendingFields is a partial update for a pending discovered_market row. // Only non-nil fields are written. Name normalization is recomputed when // MarktName or Stadt is set so dedup stays honest. 
diff --git a/backend/internal/domain/discovery/repository.go b/backend/internal/domain/discovery/repository.go index 0dac8c9..3b05ffb 100644 --- a/backend/internal/domain/discovery/repository.go +++ b/backend/internal/domain/discovery/repository.go @@ -12,8 +12,6 @@ import ( ) type Repository interface { - PickStaleBuckets(ctx context.Context, forwardMonths, limit int) ([]Bucket, error) - UpdateBucketQueried(ctx context.Context, id uuid.UUID, errMsg string) error ListSeriesByCity(ctx context.Context, cityNormalized string) ([]SeriesCandidate, error) EditionExists(ctx context.Context, seriesID uuid.UUID, year int) (bool, error) InsertDiscovered(ctx context.Context, d DiscoveredMarket) (uuid.UUID, error) @@ -44,44 +42,6 @@ func NewRepository(pool *pgxpool.Pool) Repository { return &pgRepository{pool: pool} } -func (r *pgRepository) PickStaleBuckets(ctx context.Context, forwardMonths, limit int) ([]Bucket, error) { - q := ` -SELECT id, land, region, year_month, halbmonat, last_queried_at, coalesce(last_error, ''), created_at -FROM discovery_buckets -WHERE year_month >= to_char(date_trunc('month', now()), 'YYYY-MM') - AND year_month <= to_char(date_trunc('month', now()) + ($1 * interval '1 month'), 'YYYY-MM') - AND (last_queried_at IS NULL OR last_queried_at < now() - interval '7 days') -ORDER BY last_queried_at NULLS FIRST, year_month, halbmonat -LIMIT $2` - rows, err := r.pool.Query(ctx, q, forwardMonths, limit) - if err != nil { - return nil, fmt.Errorf("pick buckets: %w", err) - } - defer rows.Close() - out := make([]Bucket, 0) - for rows.Next() { - var b Bucket - if err := rows.Scan(&b.ID, &b.Land, &b.Region, &b.YearMonth, &b.Halbmonat, &b.LastQueriedAt, &b.LastError, &b.CreatedAt); err != nil { - return nil, err - } - out = append(out, b) - } - return out, rows.Err() -} - -func (r *pgRepository) UpdateBucketQueried(ctx context.Context, id uuid.UUID, errMsg string) error { - var errValue any - if errMsg == "" { - errValue = nil - } else { - errValue = errMsg - } - 
_, err := r.pool.Exec(ctx, - `UPDATE discovery_buckets SET last_queried_at = now(), last_error = $2 WHERE id = $1`, - id, errValue) - return err -} - func (r *pgRepository) ListSeriesByCity(ctx context.Context, cityNormalized string) ([]SeriesCandidate, error) { rows, err := r.pool.Query(ctx, `SELECT id, name, city FROM market_series WHERE LOWER(city) = $1`, diff --git a/backend/internal/domain/discovery/routes.go b/backend/internal/domain/discovery/routes.go index d2db32d..8a6b310 100644 --- a/backend/internal/domain/discovery/routes.go +++ b/backend/internal/domain/discovery/routes.go @@ -2,16 +2,13 @@ package discovery import "github.com/gin-gonic/gin" -// RegisterRoutes mounts both the admin-session routes (queue mgmt) and the -// bearer-token route (tick). The two middlewares are passed in separately. +// RegisterRoutes mounts admin-session routes (queue mgmt) and the +// bearer-token crawl route. func RegisterRoutes( rg *gin.RouterGroup, h *Handler, requireAuth, requireAdmin, requireTickToken gin.HandlerFunc, ) { - // Machine-driven tick (bearer token). - rg.POST("/admin/discovery/tick", requireTickToken, h.Tick) - // Machine-driven crawl (bearer token; bypasses manual rate limit). rg.POST("/admin/discovery/crawl", requireTickToken, func(c *gin.Context) { c.Set("crawl_bypass_rate_limit", true) diff --git a/backend/internal/domain/discovery/service.go b/backend/internal/domain/discovery/service.go index 15107c0..915068b 100644 --- a/backend/internal/domain/discovery/service.go +++ b/backend/internal/domain/discovery/service.go @@ -13,7 +13,6 @@ import ( "marktvogt.de/backend/internal/domain/discovery/crawler" "marktvogt.de/backend/internal/domain/market" - "marktvogt.de/backend/internal/pkg/ai" ) type marketCreator interface { @@ -34,39 +33,16 @@ type crawlerRunner interface { RunAll(ctx context.Context) (crawler.CrawlResult, error) } -// Service orchestrates bucket scheduling, agent invocation, and queue management. 
+// Service orchestrates crawler runs and queue management. type Service struct { repo Repository - agent *AgentClient crawler crawlerRunner marketCreator marketCreator linkChecker linkVerifier - batchSize int - forwardMonths int } -// NewService constructs a Service. batchSize and forwardMonths configure bucket picking. -func NewService(repo Repository, agent *AgentClient, mc marketCreator, batchSize, forwardMonths int) *Service { - return &Service{ - repo: repo, - agent: agent, - marketCreator: mc, - linkChecker: NewLinkChecker(), - batchSize: batchSize, - forwardMonths: forwardMonths, - } -} - -// SetCrawler attaches a crawler instance post-construction. For MR 1, this allows -// wiring both Tick (agent-driven) and Crawl (crawler-driven) paths on the same Service. -func (s *Service) SetCrawler(cr crawlerRunner) { - s.crawler = cr -} - -// NewServiceWithCrawler constructs a Service wired for the crawler-driven -// Crawl method. The existing Pass 0 Tick path is not wired here (no agent). -// MR 2 will consolidate this with NewService once the Mistral path is removed. -func NewServiceWithCrawler(repo Repository, cr crawlerRunner, lc linkVerifier, mc marketCreator) *Service { +// NewService constructs a Service wired for the crawler-driven Crawl path. +func NewService(repo Repository, cr crawlerRunner, lc linkVerifier, mc marketCreator) *Service { return &Service{ repo: repo, crawler: cr, @@ -75,199 +51,6 @@ func NewServiceWithCrawler(repo Repository, cr crawlerRunner, lc linkVerifier, m } } -// PickBuckets returns stale buckets eligible for the next discovery run. -func (s *Service) PickBuckets(ctx context.Context) ([]Bucket, error) { - return s.repo.PickStaleBuckets(ctx, s.forwardMonths, s.batchSize) -} - -// TickSummary reports what happened in one tick. 
-type TickSummary struct { - BucketsProcessed int `json:"buckets_processed"` - Discovered int `json:"markets_discovered"` - DedupedExisting int `json:"deduped_existing"` - DedupedRejected int `json:"deduped_rejected"` - DedupedQueue int `json:"deduped_queue"` - Errors int `json:"errors"` - RateLimited int `json:"rate_limited"` - LinkCheckFailed int `json:"link_check_failed"` - ValidationFailed int `json:"validation_failed"` -} - -// Tick picks N stale buckets and runs Pass 0 for each, writing net-new discoveries. -// On a rate-limit hit, aborts the remainder of the tick: subsequent buckets in the -// same batch would almost certainly hit the same limit, and we want to give Mistral's -// web_search budget time to refill before trying again. -func (s *Service) Tick(ctx context.Context) (TickSummary, error) { - if s.agent == nil || !s.agent.Enabled() { - return TickSummary{}, errors.New("discovery agent not configured") - } - buckets, err := s.PickBuckets(ctx) - if err != nil { - return TickSummary{}, fmt.Errorf("pick buckets: %w", err) - } - var summary TickSummary - summary.BucketsProcessed = len(buckets) - for _, b := range buckets { - if stop := s.processOneBucket(ctx, b, &summary); stop { - break - } - } - return summary, nil -} - -// processOneBucket runs Pass 0 for a single bucket. Returns stop=true when the tick -// should abort early (currently only on persistent rate limits). 
-func (s *Service) processOneBucket(ctx context.Context, b Bucket, summary *TickSummary) (stop bool) { - resp, err := s.agent.Discover(ctx, b) - if err != nil { - wait := 2 * time.Second - if ai.IsRateLimit(err) { - wait = 10 * time.Second - } - select { - case <-ctx.Done(): - return false - case <-time.After(wait): - } - resp, err = s.agent.Discover(ctx, b) - if err != nil { - if ai.IsRateLimit(err) { - // Leave last_queried_at unchanged so the bucket is re-picked on the - // next tick, and abort the rest of this tick — Mistral's web_search - // budget is shared across buckets, no point hammering it further. - slog.InfoContext(ctx, "pass0 rate-limited; deferring bucket + aborting tick", - "bucket_id", b.ID, "region", b.Region, "year_month", b.YearMonth) - summary.RateLimited++ - return true - } - slog.WarnContext(ctx, "pass0 failed twice", "bucket_id", b.ID, "error", err) - _ = s.repo.UpdateBucketQueried(ctx, b.ID, err.Error()) - summary.Errors++ - return false - } - } - sub := s.processBucketResponse(ctx, b, resp) - summary.Discovered += sub.Discovered - summary.DedupedExisting += sub.DedupedExisting - summary.DedupedRejected += sub.DedupedRejected - summary.DedupedQueue += sub.DedupedQueue - if err := s.repo.UpdateBucketQueried(ctx, b.ID, ""); err != nil { - slog.ErrorContext(ctx, "update bucket queried", "bucket_id", b.ID, "error", err) - } - return false -} - -func (s *Service) processBucketResponse(ctx context.Context, b Bucket, resp Pass0Response) TickSummary { - var summary TickSummary - seen := make(map[string]bool) // in-request dedup - for _, m := range resp.Maerkte { - key := NormalizeName(m.MarktName) + "|" + NormalizeCity(m.Stadt) + "|" + m.StartDatum - if seen[key] { - continue - } - seen[key] = true - - // Series candidates pre-filtered by city. 
- candidates, err := s.repo.ListSeriesByCity(ctx, NormalizeCity(m.Stadt)) - if err != nil { - slog.WarnContext(ctx, "list series by city", "city", m.Stadt, "error", err) - continue - } - matchedSeriesID := findSeriesMatch(m.MarktName, candidates) - - startDatum, endDatum := parseOptionalDate(m.StartDatum), parseOptionalDate(m.EndDatum) - year := 0 - if startDatum != nil { - year = startDatum.Year() - } - - // Dedup checks in order (spec §7.1.e). - if matchedSeriesID != nil && year > 0 { - exists, err := s.repo.EditionExists(ctx, *matchedSeriesID, year) - if err != nil { - slog.WarnContext(ctx, "edition exists check", "error", err) - continue - } - if exists { - summary.DedupedExisting++ - continue - } - } - nameNorm := NormalizeName(m.MarktName) - if year > 0 { - rejected, err := s.repo.IsRejected(ctx, nameNorm, m.Stadt, year) - if err != nil { - slog.WarnContext(ctx, "is rejected check", "error", err) - continue - } - if rejected { - summary.DedupedRejected++ - continue - } - } - pending, err := s.repo.QueueHasPending(ctx, nameNorm, m.Stadt, startDatum) - if err != nil { - slog.WarnContext(ctx, "queue pending check", "error", err) - continue - } - if pending { - summary.DedupedQueue++ - continue - } - - // Link verification — drop URLs the agent hallucinated or that are now - // 404/unreachable. If all quellen fail, skip the market entirely (we - // need at least one verifiable source). If the website fails, clear it - // but keep the market since quellen is the primary evidence. 
- verifiedQuellen := s.linkChecker.FilterURLs(ctx, m.Quellen) - if len(verifiedQuellen) == 0 { - slog.InfoContext(ctx, "link check dropped all quellen; skipping market", - "markt", m.MarktName, "stadt", m.Stadt) - summary.LinkCheckFailed++ - continue - } - verifiedWebsite := m.Website - if verifiedWebsite != "" && !s.linkChecker.CheckURL(ctx, verifiedWebsite) { - verifiedWebsite = "" - } - - dm := DiscoveredMarket{ - BucketID: &b.ID, - MarktName: m.MarktName, - Stadt: m.Stadt, - Bundesland: m.Bundesland, - Land: b.Land, - StartDatum: startDatum, - EndDatum: endDatum, - Website: verifiedWebsite, - Quellen: verifiedQuellen, - Konfidenz: m.Konfidenz, - AgentStatus: m.AgentStatus, - Hinweis: m.Hinweis, - NameNormalized: nameNorm, - MatchedSeriesID: matchedSeriesID, - } - - // Semantic validation — catches agent self-contradictions that the - // schema alone cannot. Errors drop the market; warnings would be - // appended to hinweis (none defined yet at Pass 0 scope). - issues := ValidateForInsert(dm, &b) - if HasErrors(issues) { - slog.InfoContext(ctx, "validation failed; skipping market", - "markt", m.MarktName, "stadt", m.Stadt, "issues", formatIssues(issues)) - summary.ValidationFailed++ - continue - } - - if _, err := s.repo.InsertDiscovered(ctx, dm); err != nil { - slog.WarnContext(ctx, "insert discovered", "error", err) - continue - } - summary.Discovered++ - } - return summary -} - // CrawlSummary reports the outcome of one Service.Crawl run. type CrawlSummary struct { StartedAt time.Time `json:"started_at"` @@ -474,17 +257,6 @@ func formatIssues(issues []Issue) string { return strings.Join(parts, ",") } -func parseOptionalDate(s string) *time.Time { - if s == "" { - return nil - } - t, err := time.Parse("2006-01-02", s) - if err != nil { - return nil - } - return &t -} - // Accept transitions a pending queue entry into a market edition. // Returns (seriesID, editionID, error). 
func (s *Service) Accept(ctx context.Context, queueID, reviewerID uuid.UUID) (uuid.UUID, uuid.UUID, error) { @@ -621,7 +393,8 @@ func (s *Service) UpdatePending(ctx context.Context, id uuid.UUID, fields Update // Stats returns a health snapshot for the admin dashboard. // Shows up to 5 most-recent error buckets inline. func (s *Service) Stats(ctx context.Context) (Stats, error) { - return s.repo.Stats(ctx, s.forwardMonths, 5) + const forwardMonths = 12 + return s.repo.Stats(ctx, forwardMonths, 5) } // findSeriesMatch returns the ID of the first candidate whose normalized name matches diff --git a/backend/internal/domain/discovery/service_test.go b/backend/internal/domain/discovery/service_test.go index 7454616..b6457df 100644 --- a/backend/internal/domain/discovery/service_test.go +++ b/backend/internal/domain/discovery/service_test.go @@ -24,8 +24,6 @@ func newMockRepo() *mockRepo { m.editionExistsFn = func(_ context.Context, _ uuid.UUID, _ int) (bool, error) { return false, nil } m.isRejectedFn = func(_ context.Context, _, _ string, _ int) (bool, error) { return false, nil } m.queuePendingFn = func(_ context.Context, _, _ string, _ *time.Time) (bool, error) { return false, nil } - m.updateBucketFn = func(_ context.Context, _ uuid.UUID, _ string) error { return nil } - m.pickStaleFn = func(_ context.Context, _, _ int) ([]Bucket, error) { return nil, nil } return m } @@ -94,107 +92,6 @@ func TestFindSeriesMatch(t *testing.T) { } } -func TestPickBucketsPassesConfigToRepo(t *testing.T) { - var gotFM, gotLim int - m := &mockRepo{ - pickStaleFn: func(_ context.Context, fm, lim int) ([]Bucket, error) { - gotFM, gotLim = fm, lim - return []Bucket{{ID: uuid.New(), Land: "Deutschland", Region: "Bayern", YearMonth: "2026-09"}}, nil - }, - } - svc := NewService(m, nil, nil, 4, 12) - svc.linkChecker = noopLinkVerifier{} - got, err := svc.PickBuckets(context.Background()) - if err != nil { - t.Fatalf("err: %v", err) - } - if len(got) != 1 { - t.Errorf("got %d buckets, want 1", 
len(got)) - } - if gotFM != 12 || gotLim != 4 { - t.Errorf("forwarded fm=%d lim=%d, want 12,4", gotFM, gotLim) - } -} - -func TestProcessBucket_DedupsExisting(t *testing.T) { - bucket := Bucket{ - ID: uuid.New(), Land: "Deutschland", Region: "Bayern", YearMonth: "2026-09", - } - seriesID := uuid.New() - var inserted []DiscoveredMarket - m := &mockRepo{ - listSeriesFn: func(_ context.Context, city string) ([]SeriesCandidate, error) { - return []SeriesCandidate{{ID: seriesID, Name: "Mittelaltermarkt Trostberg", City: city}}, nil - }, - editionExistsFn: func(_ context.Context, _ uuid.UUID, year int) (bool, error) { - return year == 2026, nil // 2026 edition exists → dedup - }, - isRejectedFn: func(_ context.Context, _, _ string, _ int) (bool, error) { return false, nil }, - queuePendingFn: func(_ context.Context, _, _ string, _ *time.Time) (bool, error) { return false, nil }, - insertDiscFn: func(_ context.Context, d DiscoveredMarket) (uuid.UUID, error) { - inserted = append(inserted, d) - return uuid.New(), nil - }, - updateBucketFn: func(_ context.Context, _ uuid.UUID, _ string) error { return nil }, - } - svc := NewService(m, nil, nil, 4, 12) - svc.linkChecker = noopLinkVerifier{} - - resp := Pass0Response{ - Bucket: Pass0Bucket{Land: "Deutschland", Region: "Bayern", JahrMonat: "2026-09"}, - Maerkte: []Pass0Market{ - {MarktName: "Mittelaltermarkt Trostberg", Stadt: "Trostberg", StartDatum: "2026-09-12", EndDatum: "2026-09-14", Quellen: []string{"https://x"}}, - }, - } - summary := svc.processBucketResponse(context.Background(), bucket, resp) - if summary.Discovered != 0 { - t.Errorf("expected 0 discovered, got %d", summary.Discovered) - } - if summary.DedupedExisting != 1 { - t.Errorf("expected 1 deduped_existing, got %d", summary.DedupedExisting) - } - if len(inserted) != 0 { - t.Errorf("expected no inserts, got %d", len(inserted)) - } -} - -func TestProcessBucket_InsertsNetNew(t *testing.T) { - bucket := Bucket{ - ID: uuid.New(), Land: "Deutschland", Region: 
"Bayern", YearMonth: "2026-09", - } - var inserted []DiscoveredMarket - m := &mockRepo{ - listSeriesFn: func(_ context.Context, _ string) ([]SeriesCandidate, error) { return nil, nil }, - editionExistsFn: func(_ context.Context, _ uuid.UUID, _ int) (bool, error) { return false, nil }, - isRejectedFn: func(_ context.Context, _, _ string, _ int) (bool, error) { return false, nil }, - queuePendingFn: func(_ context.Context, _, _ string, _ *time.Time) (bool, error) { return false, nil }, - insertDiscFn: func(_ context.Context, d DiscoveredMarket) (uuid.UUID, error) { - inserted = append(inserted, d) - return uuid.New(), nil - }, - updateBucketFn: func(_ context.Context, _ uuid.UUID, _ string) error { return nil }, - } - svc := NewService(m, nil, nil, 4, 12) - svc.linkChecker = noopLinkVerifier{} - - resp := Pass0Response{ - Bucket: Pass0Bucket{Land: "Deutschland", Region: "Bayern", JahrMonat: "2026-09"}, - Maerkte: []Pass0Market{ - {MarktName: "Neuer Markt", Stadt: "Passau", StartDatum: "2026-09-20", EndDatum: "2026-09-22", Quellen: []string{"https://x"}}, - }, - } - summary := svc.processBucketResponse(context.Background(), bucket, resp) - if summary.Discovered != 1 { - t.Errorf("expected 1 discovered, got %d", summary.Discovered) - } - if len(inserted) != 1 { - t.Errorf("expected 1 insert, got %d", len(inserted)) - } - if inserted[0].NameNormalized != "neuer" { - t.Errorf("name_normalized = %q, want 'neuer'", inserted[0].NameNormalized) - } -} - type stubCreator struct { createCalls int createEditionForSeriesCalls int @@ -234,8 +131,7 @@ func TestAccept_NewSeries_CallsCreate(t *testing.T) { markAcceptedFn: func(_ context.Context, _ pgx.Tx, _, _, _ uuid.UUID) error { return nil }, } mc := &stubCreator{} - svc := NewService(m, nil, mc, 4, 12) - svc.linkChecker = noopLinkVerifier{} + svc := NewService(m, nil, noopLinkVerifier{}, mc) _, _, err := svc.Accept(context.Background(), qID, uuid.New()) if err != nil { t.Fatalf("accept err: %v", err) @@ -257,8 +153,7 @@ func 
TestAccept_ExistingSeries_CallsCreateEditionForSeries(t *testing.T) { markAcceptedFn: func(_ context.Context, _ pgx.Tx, _, _, _ uuid.UUID) error { return nil }, } mc := &stubCreator{} - svc := NewService(m, nil, mc, 4, 12) - svc.linkChecker = noopLinkVerifier{} + svc := NewService(m, nil, noopLinkVerifier{}, mc) _, _, err := svc.Accept(context.Background(), uuid.New(), uuid.New()) if err != nil { t.Fatalf("accept err: %v", err) @@ -289,7 +184,7 @@ func TestServiceCrawlHappyPath(t *testing.T) { PerSourceMS: map[string]int64{"marktkalendarium": 1}, }, } - svc := NewServiceWithCrawler(repo, sc, lc, noopMarketCreator{}) + svc := NewService(repo, sc, lc, noopMarketCreator{}) summary, err := svc.Crawl(context.Background()) if err != nil { @@ -333,7 +228,7 @@ func TestServiceCrawlLinkCheckFailed(t *testing.T) { }, }, } - svc := NewServiceWithCrawler(repo, sc, alwaysFailLinkVerifier{}, noopMarketCreator{}) + svc := NewService(repo, sc, alwaysFailLinkVerifier{}, noopMarketCreator{}) summary, err := svc.Crawl(context.Background()) if err != nil { @@ -367,7 +262,7 @@ func TestServiceCrawlDedupQueue(t *testing.T) { }, }, } - svc := NewServiceWithCrawler(repo, sc, noopLinkVerifier{}, noopMarketCreator{}) + svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{}) summary, err := svc.Crawl(context.Background()) if err != nil { @@ -398,7 +293,7 @@ func TestServiceCrawlDefaultsEndDate(t *testing.T) { }, }, } - svc := NewServiceWithCrawler(repo, sc, noopLinkVerifier{}, noopMarketCreator{}) + svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{}) if _, err := svc.Crawl(context.Background()); err != nil { t.Fatal(err) @@ -427,7 +322,7 @@ func TestServiceCrawlMultiSourceHighKonfidenz(t *testing.T) { }, }, } - svc := NewServiceWithCrawler(repo, sc, noopLinkVerifier{}, noopMarketCreator{}) + svc := NewService(repo, sc, noopLinkVerifier{}, noopMarketCreator{}) summary, err := svc.Crawl(context.Background()) if err != nil { diff --git 
a/backend/internal/server/routes.go b/backend/internal/server/routes.go index d6f4652..a8b5279 100644 --- a/backend/internal/server/routes.go +++ b/backend/internal/server/routes.go @@ -73,17 +73,8 @@ func (s *Server) registerRoutes() { // Discovery routes discoveryRepo := discovery.NewRepository(s.db) - discoveryAgent := discovery.NewAgentClient(aiClient, s.cfg.AI.AgentDiscovery) - discoveryService := discovery.NewService( - discoveryRepo, - discoveryAgent, - marketSvc, - s.cfg.Discovery.BatchSize, - s.cfg.Discovery.ForwardMonths, - ) - // Wire the crawler for the Crawl path (MR 1 keeps both Tick and Crawl paths) crawlerInstance := crawler.NewCrawler(s.cfg.Discovery.CrawlerUserAgent, crawler.DefaultSourceConfigs()) - discoveryService.SetCrawler(crawlerInstance) + discoveryService := discovery.NewService(discoveryRepo, crawlerInstance, discovery.NewLinkChecker(), marketSvc) discoveryHandler := discovery.NewHandler(discoveryService, s.cfg.Discovery.CrawlerManualRateLimitPerHour) requireTickToken := middleware.RequireBearerToken(s.cfg.Discovery.Token) discovery.RegisterRoutes(v1, discoveryHandler, requireAuth, requireAdmin, requireTickToken) diff --git a/web/src/routes/admin/discovery/+page.server.ts b/web/src/routes/admin/discovery/+page.server.ts index 7d1b11b..f91d161 100644 --- a/web/src/routes/admin/discovery/+page.server.ts +++ b/web/src/routes/admin/discovery/+page.server.ts @@ -1,6 +1,23 @@ import type { PageServerLoad, Actions } from './$types.js'; import { redirect, fail } from '@sveltejs/kit'; import { serverFetch } from '$lib/api/client.server.js'; +import { ApiClientError } from '$lib/api/client.js'; + +type CrawlSummary = { + started_at: string; + duration_ms: number; + per_source: Record<string, number>; + merged: number; + merged_across_sites: number; + discovered: number; + deduped_existing: number; + deduped_rejected: number; + deduped_queue: number; + link_check_failed: number; + validation_failed: number; + date_conflicts: number; + source_errors: Array<{ source: 
string; error: string }>; +}; type DiscoveredMarket = { id: string; @@ -94,6 +111,24 @@ export const actions: Actions = { } }, + crawl: async ({ cookies, fetch }) => { + try { + const res = await serverFetch(`/admin/discovery/crawl-manual`, cookies, { + method: 'POST', + fetch + }); + return { crawlSummary: res.data }; + } catch (err) { + if (err instanceof ApiClientError && err.status === 429) { + return fail(429, { + crawlError: `Rate limit erreicht. Bitte kurz warten und erneut versuchen. (${err.message})` + }); + } + const message = err instanceof Error ? err.message : 'Crawl fehlgeschlagen.'; + return fail(500, { crawlError: message }); + } + }, + update: async ({ request, cookies, fetch }) => { const form = await request.formData(); const id = String(form.get('id') ?? ''); diff --git a/web/src/routes/admin/discovery/+page.svelte b/web/src/routes/admin/discovery/+page.svelte index 775cc9a..70fc31b 100644 --- a/web/src/routes/admin/discovery/+page.svelte +++ b/web/src/routes/admin/discovery/+page.svelte @@ -1,6 +1,8 @@