diff --git a/backend/internal/domain/discovery/handler.go b/backend/internal/domain/discovery/handler.go index 993c2e3..455d6b3 100644 --- a/backend/internal/domain/discovery/handler.go +++ b/backend/internal/domain/discovery/handler.go @@ -1,11 +1,13 @@ package discovery import ( + "context" "fmt" "log/slog" "net/http" "strconv" "sync" + "sync/atomic" "time" "github.com/gin-gonic/gin" @@ -15,10 +17,21 @@ import ( ) type Handler struct { - service *Service - crawlMu sync.Mutex + service *Service + + // Rate limit (admin-session only). crawlRateLimit == 0 means disabled. + rateMu sync.Mutex crawlLastManual time.Time crawlRateLimit time.Duration + + // Async crawl state. + crawlRunning atomic.Bool + resultMu sync.RWMutex + // The following are guarded by resultMu: + lastStartedAt time.Time + lastFinishedAt time.Time + lastSummary *CrawlSummary + lastError string } // NewHandler constructs a Handler. manualRateLimitPerHour controls how @@ -43,21 +56,26 @@ func NewHandler(s *Service, manualRateLimitPerHour int) *Handler { return &Handler{service: s, crawlRateLimit: rl} } -// Crawl runs the crawler once and returns CrawlSummary. A per-process mutex -// blocks concurrent runs. The manual-admin rate limit is bypassed when -// "crawl_bypass_rate_limit" is set in the gin context (set by the bearer-token -// middleware path). +// Crawl kicks off a crawler run in the background and returns 202 immediately. +// A per-process CAS gate prevents concurrent runs. Completion is observable +// via GET .../crawl-status or backend logs ("crawl completed" / "async crawl +// failed"). func (h *Handler) Crawl(c *gin.Context) { - if !h.crawlMu.TryLock() { + // Prevent concurrent runs (replaces TryLock). + if !h.crawlRunning.CompareAndSwap(false, true) { apiErr := apierror.TooManyRequests("A crawler run is already in progress") c.JSON(apiErr.Status, apierror.NewResponse(apiErr)) return } - defer h.crawlMu.Unlock() + // Rate limit (admin-session only — bypass via crawl_bypass_rate_limit). if _, bypass := c.Get("crawl_bypass_rate_limit"); !bypass && h.crawlRateLimit > 0 { - if since := time.Since(h.crawlLastManual); since < h.crawlRateLimit { + h.rateMu.Lock() + since := time.Since(h.crawlLastManual) + if since < h.crawlRateLimit { retryIn := (h.crawlRateLimit - since).Seconds() + h.rateMu.Unlock() + h.crawlRunning.Store(false) // release gate since we refused c.Header("Retry-After", fmt.Sprint(int(retryIn)+1)) apiErr := apierror.TooManyRequests( fmt.Sprintf("Manual crawl rate-limited; try again in ~%.0fs", retryIn), @@ -66,16 +84,74 @@ func (h *Handler) Crawl(c *gin.Context) { return } h.crawlLastManual = time.Now() + h.rateMu.Unlock() } - summary, err := h.service.Crawl(c.Request.Context()) + // Reset + mark started. + h.resultMu.Lock() + h.lastStartedAt = time.Now().UTC() + h.lastFinishedAt = time.Time{} + h.lastSummary = nil + h.lastError = "" + h.resultMu.Unlock() + + go h.runCrawlAsync() + + c.JSON(http.StatusAccepted, gin.H{ + "data": gin.H{ + "status": "started", + "message": "Crawl started in background. Poll /api/v1/admin/discovery/crawl-status for completion.", + }, + }) +} + +// runCrawlAsync executes the crawl and stores the outcome for crawl-status. +// Uses a detached context with a 5-minute cap so a restarted gateway/admin +// session doesn't terminate in-flight work. +func (h *Handler) runCrawlAsync() { + defer h.crawlRunning.Store(false) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + + summary, err := h.service.Crawl(ctx) + + h.resultMu.Lock() + h.lastFinishedAt = time.Now().UTC() if err != nil { - slog.ErrorContext(c.Request.Context(), "discovery crawl failed", "error", err) - apiErr := apierror.Internal("crawl failed") - c.JSON(apiErr.Status, apierror.NewResponse(apiErr)) - return + h.lastError = err.Error() + slog.ErrorContext(ctx, "async crawl failed", "error", err) + } else { + summaryCopy := summary + h.lastSummary = &summaryCopy } - c.JSON(http.StatusOK, gin.H{"data": summary}) + h.resultMu.Unlock() +} + +// CrawlStatus returns the state of the most recent async crawl. +// Shape: +// +// { +// "data": { +// "running": bool, +// "started_at": "...", // zero if no crawl yet run +// "finished_at": "...", // zero while running or if no crawl yet +// "summary": CrawlSummary?, // null if running, errored, or not yet run +// "error": string? // non-empty only if last run failed +// } +// } +func (h *Handler) CrawlStatus(c *gin.Context) { + h.resultMu.RLock() + defer h.resultMu.RUnlock() + c.JSON(http.StatusOK, gin.H{ + "data": gin.H{ + "running": h.crawlRunning.Load(), + "started_at": h.lastStartedAt, + "finished_at": h.lastFinishedAt, + "summary": h.lastSummary, + "error": h.lastError, + }, + }) } func (h *Handler) Stats(c *gin.Context) { diff --git a/backend/internal/domain/discovery/handler_test.go b/backend/internal/domain/discovery/handler_test.go index ce135c7..721acc0 100644 --- a/backend/internal/domain/discovery/handler_test.go +++ b/backend/internal/domain/discovery/handler_test.go @@ -5,7 +5,6 @@ import ( "encoding/json" "net/http" "net/http/httptest" - "sync" "testing" "time" @@ -18,70 +17,171 @@ func init() { gin.SetMode(gin.TestMode) } -// blockingCrawlerRunner blocks until its ctx is cancelled. +// blockingCrawlerRunner blocks until either its release channel is closed +// or the context is cancelled. type blockingCrawlerRunner struct { started chan struct{} + release chan struct{} } func (b *blockingCrawlerRunner) RunAll(ctx context.Context) (crawler.CrawlResult, error) { close(b.started) - <-ctx.Done() - return crawler.CrawlResult{}, ctx.Err() -} - -// TestCrawlHandlerMutexReentry verifies that a second concurrent Crawl request -// gets HTTP 429 while the first is still running. -func TestCrawlHandlerMutexReentry(t *testing.T) { - bc := &blockingCrawlerRunner{started: make(chan struct{})} - svc := NewService(newMockRepo(), bc, noopLinkVerifier{}, noopMarketCreator{}) - h := NewHandler(svc, 1) - - // First request — runs in a goroutine so the handler blocks. - w1 := httptest.NewRecorder() - c1, _ := gin.CreateTestContext(w1) - ctx1, cancel1 := context.WithCancel(context.Background()) - defer cancel1() - c1.Request = httptest.NewRequest(http.MethodPost, "/admin/discovery/crawl", nil).WithContext(ctx1) - c1.Set("crawl_bypass_rate_limit", true) // bypass rate limit; test only mutex - - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - h.Crawl(c1) - }() - - // Wait until the blocking crawler has started and holds the mutex. - <-bc.started - - // Second request — should see 429 because mutex is held. - w2 := httptest.NewRecorder() - c2, _ := gin.CreateTestContext(w2) - c2.Request = httptest.NewRequest(http.MethodPost, "/admin/discovery/crawl", nil) - c2.Set("crawl_bypass_rate_limit", true) - - h.Crawl(c2) - - if w2.Code != http.StatusTooManyRequests { - t.Errorf("expected 429 from second concurrent request, got %d", w2.Code) + select { + case <-b.release: + return crawler.CrawlResult{}, nil + case <-ctx.Done(): + return crawler.CrawlResult{}, ctx.Err() } - - // Cancel the first request so the goroutine can unblock. - cancel1() - wg.Wait() } -// TestCrawlHandlerRateLimit verifies that a second manual (non-bypass) request -// within the rate limit window returns 429 with Retry-After. -func TestCrawlHandlerRateLimit(t *testing.T) { - // Use an instant-returning crawler so the mutex is released quickly. +// waitFor polls cond every 2ms until it returns true or deadline passes. +func waitFor(t *testing.T, deadline time.Duration, cond func() bool) { + t.Helper() + end := time.Now().Add(deadline) + for time.Now().Before(end) { + if cond() { + return + } + time.Sleep(2 * time.Millisecond) + } + t.Fatal("condition not met within deadline") +} + +// TestCrawlHandlerReturns202AndStartsCrawl verifies that POST /crawl returns +// 202 immediately and that the background goroutine completes with a summary. +func TestCrawlHandlerReturns202AndStartsCrawl(t *testing.T) { svc := NewService( newMockRepo(), &stubCrawlerRunner{result: crawler.CrawlResult{}}, noopLinkVerifier{}, noopMarketCreator{}, ) - // Set a long rate limit (1 per hour = 1h window). + h := NewHandler(svc, 0) // rate limit disabled + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest(http.MethodPost, "/admin/discovery/crawl-manual", nil) + c.Set("crawl_bypass_rate_limit", true) + + start := time.Now() + h.Crawl(c) + elapsed := time.Since(start) + + if w.Code != http.StatusAccepted { + t.Fatalf("expected 202, got %d body=%s", w.Code, w.Body.String()) + } + if elapsed > 50*time.Millisecond { + t.Errorf("handler took %v; expected < 50ms (fire-and-forget)", elapsed) + } + + // Wait for goroutine to finish. + waitFor(t, 2*time.Second, func() bool { return !h.crawlRunning.Load() }) + + h.resultMu.RLock() + summary := h.lastSummary + crawlErr := h.lastError + h.resultMu.RUnlock() + + if summary == nil { + t.Errorf("lastSummary is nil after crawl completed (error: %q)", crawlErr) + } +} + +// TestCrawlStatusInitialState verifies the zero state of a freshly constructed Handler. +func TestCrawlStatusInitialState(t *testing.T) { + svc := NewService(newMockRepo(), &stubCrawlerRunner{}, noopLinkVerifier{}, noopMarketCreator{}) + h := NewHandler(svc, 0) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest(http.MethodGet, "/admin/discovery/crawl-status", nil) + + h.CrawlStatus(c) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d", w.Code) + } + + var resp struct { + Data struct { + Running bool `json:"running"` + StartedAt time.Time `json:"started_at"` + FinishedAt time.Time `json:"finished_at"` + Summary interface{} `json:"summary"` + Error string `json:"error"` + } `json:"data"` + } + if err := json.NewDecoder(w.Body).Decode(&resp); err != nil { + t.Fatalf("decode response: %v", err) + } + + if resp.Data.Running { + t.Error("expected running=false on fresh handler") + } + if resp.Data.Summary != nil { + t.Errorf("expected summary=null, got %v", resp.Data.Summary) + } + if resp.Data.Error != "" { + t.Errorf("expected error='', got %q", resp.Data.Error) + } + if !resp.Data.StartedAt.IsZero() { + t.Errorf("expected started_at zero, got %v", resp.Data.StartedAt) + } + if !resp.Data.FinishedAt.IsZero() { + t.Errorf("expected finished_at zero, got %v", resp.Data.FinishedAt) + } +} + +// TestCrawlHandlerConcurrentReturnsTooManyRequests verifies that a second +// concurrent Crawl request gets HTTP 429 while the first is still running. +func TestCrawlHandlerConcurrentReturnsTooManyRequests(t *testing.T) { + bc := &blockingCrawlerRunner{ + started: make(chan struct{}), + release: make(chan struct{}), + } + svc := NewService(newMockRepo(), bc, noopLinkVerifier{}, noopMarketCreator{}) + h := NewHandler(svc, 0) // rate limit disabled + + // First request — returns 202 and spawns goroutine. + w1 := httptest.NewRecorder() + c1, _ := gin.CreateTestContext(w1) + c1.Request = httptest.NewRequest(http.MethodPost, "/admin/discovery/crawl", nil) + c1.Set("crawl_bypass_rate_limit", true) + h.Crawl(c1) + + if w1.Code != http.StatusAccepted { + t.Fatalf("first request: expected 202, got %d", w1.Code) + } + + // Wait until the blocking crawler has actually started (gate is held). + <-bc.started + + // Second request — should see 429 because crawlRunning is true. + w2 := httptest.NewRecorder() + c2, _ := gin.CreateTestContext(w2) + c2.Request = httptest.NewRequest(http.MethodPost, "/admin/discovery/crawl", nil) + c2.Set("crawl_bypass_rate_limit", true) + h.Crawl(c2) + + if w2.Code != http.StatusTooManyRequests { + t.Errorf("expected 429 from second concurrent request, got %d", w2.Code) + } + + // Unblock the first goroutine so it finishes cleanly. + close(bc.release) + waitFor(t, 2*time.Second, func() bool { return !h.crawlRunning.Load() }) +} + +// TestCrawlHandlerRateLimit verifies that consecutive manual requests within +// the rate-limit window get HTTP 429 with Retry-After after the first finishes. +func TestCrawlHandlerRateLimit(t *testing.T) { + svc := NewService( + newMockRepo(), + &stubCrawlerRunner{result: crawler.CrawlResult{}}, + noopLinkVerifier{}, + noopMarketCreator{}, + ) + // 1 per hour window. h := NewHandler(svc, 1) doRequest := func(bypass bool) *httptest.ResponseRecorder { @@ -95,12 +195,17 @@ func TestCrawlHandlerRateLimit(t *testing.T) { return w } - // First request — not bypassed. Should succeed (200). + // First request — not bypassed. Should be accepted (202). w1 := doRequest(false) - if w1.Code != http.StatusOK { - t.Fatalf("first request: expected 200, got %d body=%s", w1.Code, w1.Body.String()) + if w1.Code != http.StatusAccepted { + t.Fatalf("first request: expected 202, got %d body=%s", w1.Code, w1.Body.String()) } + // Wait for the first goroutine to finish so crawlRunning drops to false. + // This ensures the second request is blocked by the rate limiter, not by + // the CAS gate. + waitFor(t, 2*time.Second, func() bool { return !h.crawlRunning.Load() }) + // Second request immediately after — should be rate-limited (429). w2 := doRequest(false) if w2.Code != http.StatusTooManyRequests { @@ -110,41 +215,9 @@ func TestCrawlHandlerRateLimit(t *testing.T) { t.Error("second request: expected Retry-After header to be set") } - // Bypassed request should succeed even within the window. + // Bypassed request should be accepted even within the window. w3 := doRequest(true) - if w3.Code != http.StatusOK { - t.Errorf("bypass request: expected 200, got %d body=%s", w3.Code, w3.Body.String()) - } -} - -// TestCrawlHandlerRateLimitResets verifies that a manual request succeeds once -// the rate limit window has elapsed. -func TestCrawlHandlerRateLimitResets(t *testing.T) { - svc := NewService( - newMockRepo(), - &stubCrawlerRunner{result: crawler.CrawlResult{}}, - noopLinkVerifier{}, - noopMarketCreator{}, - ) - h := NewHandler(svc, 1) - - // Force crawlLastManual to well in the past so the window has expired. - h.crawlLastManual = time.Now().Add(-2 * time.Hour) - - w := httptest.NewRecorder() - c, _ := gin.CreateTestContext(w) - c.Request = httptest.NewRequest(http.MethodPost, "/admin/discovery/crawl-manual", nil) - h.Crawl(c) - - if w.Code != http.StatusOK { - t.Errorf("expected 200 after window reset, got %d body=%s", w.Code, w.Body.String()) - } - - var resp map[string]interface{} - if err := json.NewDecoder(w.Body).Decode(&resp); err != nil { - t.Fatalf("response body not valid JSON: %v", err) - } - if _, ok := resp["data"]; !ok { - t.Error("expected 'data' key in response body") + if w3.Code != http.StatusAccepted { + t.Errorf("bypass request: expected 202, got %d body=%s", w3.Code, w3.Body.String()) } } diff --git a/backend/internal/domain/discovery/routes.go b/backend/internal/domain/discovery/routes.go index 8a6b310..158b4e1 100644 --- a/backend/internal/domain/discovery/routes.go +++ b/backend/internal/domain/discovery/routes.go @@ -25,5 +25,7 @@ func RegisterRoutes( admin.POST("/queue/:id/reject", h.Reject) // Manual crawl trigger — subject to hourly rate limit. admin.POST("/crawl-manual", h.Crawl) + // Async crawl status polling. + admin.GET("/crawl-status", h.CrawlStatus) } } diff --git a/web/src/routes/admin/discovery/+page.server.ts b/web/src/routes/admin/discovery/+page.server.ts index f91d161..c0b7464 100644 --- a/web/src/routes/admin/discovery/+page.server.ts +++ b/web/src/routes/admin/discovery/+page.server.ts @@ -3,22 +3,6 @@ import { redirect, fail } from '@sveltejs/kit'; import { serverFetch } from '$lib/api/client.server.js'; import { ApiClientError } from '$lib/api/client.js'; -type CrawlSummary = { - started_at: string; - duration_ms: number; - per_source: Record; - merged: number; - merged_across_sites: number; - discovered: number; - deduped_existing: number; - deduped_rejected: number; - deduped_queue: number; - link_check_failed: number; - validation_failed: number; - date_conflicts: number; - source_errors: Array<{ source: string; error: string }>; -}; - type DiscoveredMarket = { id: string; markt_name: string; @@ -113,11 +97,11 @@ export const actions: Actions = { crawl: async ({ cookies, fetch }) => { try { - const res = await serverFetch(`/admin/discovery/crawl-manual`, cookies, { + await serverFetch(`/admin/discovery/crawl-manual`, cookies, { method: 'POST', fetch }); - return { crawlSummary: res.data }; + return { crawlStarted: true }; } catch (err) { if (err instanceof ApiClientError && err.status === 429) { return fail(429, { diff --git a/web/src/routes/admin/discovery/+page.svelte b/web/src/routes/admin/discovery/+page.svelte index 70fc31b..5cf141a 100644 --- a/web/src/routes/admin/discovery/+page.svelte +++ b/web/src/routes/admin/discovery/+page.svelte @@ -1,8 +1,36 @@ @@ -136,8 +228,7 @@ use:enhance={() => { crawling = true; return async ({ update }) => { - crawling = false; - await update(); + await update({ reset: false }); }; }} > @@ -146,7 +237,11 @@ disabled={crawling} class="rounded bg-indigo-600 px-3 py-1.5 text-sm text-white hover:bg-indigo-700 disabled:cursor-not-allowed disabled:opacity-60" > - {crawling ? 'Crawling…' : 'Run crawl'} + {#if crawling} + Crawling…{elapsedLabel ? ` (${elapsedLabel})` : ''} + {:else} + Run crawl + {/if} @@ -159,8 +254,16 @@ {/if} - {#if form?.crawlSummary} - {@const s = form.crawlSummary} + {#if displayError} +
+ Crawl fehlgeschlagen: {displayError} +
+ {/if} + + {#if displaySummary} + {@const s = displaySummary}
diff --git a/web/src/routes/admin/discovery/crawl-status/+server.ts b/web/src/routes/admin/discovery/crawl-status/+server.ts new file mode 100644 index 0000000..6289d8d --- /dev/null +++ b/web/src/routes/admin/discovery/crawl-status/+server.ts @@ -0,0 +1,12 @@ +import { json } from '@sveltejs/kit'; +import { serverFetch } from '$lib/api/client.server.js'; +import type { RequestHandler } from './$types.js'; + +export const GET: RequestHandler = async ({ cookies }) => { + try { + const res = await serverFetch('/admin/discovery/crawl-status', cookies); + return json(res.data); + } catch { + return json({ error: 'Failed to fetch crawl status' }, { status: 502 }); + } +};