feat(discovery): async crawl — 202 Accepted, status endpoint, UI polls

Handler.Crawl now spawns a goroutine with a 5-minute detached context
and returns 202 immediately. Admin UI polls the new
GET /admin/discovery/crawl-status every 3s until running=false, then
renders CrawlSummary. Bypasses the 60s nginx-gateway proxy_read_timeout
entirely — HTTP requests are all sub-second.
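
A minimal sketch of the new client flow, for reference (base URL is
hypothetical; response shapes match the handler diff below):

    package main

    import (
        "encoding/json"
        "fmt"
        "net/http"
        "time"
    )

    func main() {
        base := "http://localhost:8080/api/v1/admin/discovery" // hypothetical base URL

        // Kick off the crawl; the handler answers 202 in well under a second.
        resp, err := http.Post(base+"/crawl-manual", "application/json", nil)
        if err != nil {
            panic(err)
        }
        resp.Body.Close()
        if resp.StatusCode != http.StatusAccepted {
            panic(fmt.Sprintf("expected 202, got %d", resp.StatusCode))
        }

        // Poll every 3s, like the admin UI, until running flips to false.
        for {
            time.Sleep(3 * time.Second)
            r, err := http.Get(base + "/crawl-status")
            if err != nil {
                continue // transient network error; retry on next tick
            }
            var st struct {
                Data struct {
                    Running bool   `json:"running"`
                    Error   string `json:"error"`
                } `json:"data"`
            }
            decodeErr := json.NewDecoder(r.Body).Decode(&st)
            r.Body.Close()
            if decodeErr == nil && !st.Data.Running {
                fmt.Println("crawl finished; error field:", st.Data.Error)
                return
            }
        }
    }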

Concurrency: atomic.Bool guard (CompareAndSwap) replaces TryLock,
resultMu RWMutex protects the summary/error state, rateMu protects
the rate-limit check. Rate limit semantics unchanged (still applies
to admin-session path, bearer-token bypass via context flag).
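
The gate/state split in isolation (a stand-alone sketch with simplified
names, not the handler code itself):

    package main

    import (
        "fmt"
        "sync"
        "sync/atomic"
    )

    type gate struct {
        running atomic.Bool  // CAS gate: at most one run at a time
        mu      sync.RWMutex // guards result; polls take RLock only
        result  string
    }

    // start claims the gate; exactly one concurrent caller wins the CAS.
    // The handler maps a false return to HTTP 429.
    func (g *gate) start(work func() string, done chan<- struct{}) bool {
        if !g.running.CompareAndSwap(false, true) {
            return false
        }
        go func() {
            out := work()
            g.mu.Lock()
            g.result = out
            g.mu.Unlock()
            g.running.Store(false)
            close(done)
        }()
        return true
    }

    // status is what a poll handler reads under RLock.
    func (g *gate) status() (running bool, result string) {
        g.mu.RLock()
        defer g.mu.RUnlock()
        return g.running.Load(), g.result
    }

    func main() {
        var g gate
        release := make(chan struct{})
        done := make(chan struct{})
        g.start(func() string { <-release; return "summary" }, done)

        // Second start while the first is in flight: CAS fails, caller gets false.
        fmt.Println(g.start(func() string { return "never runs" }, nil)) // false

        close(release)
        <-done
        _, res := g.status()
        fmt.Println(res) // summary
    }
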
2026-04-18 19:24:48 +02:00
parent 2ea8a9a6f3
commit 9f286b8029
6 changed files with 376 additions and 126 deletions

View File

@@ -1,11 +1,13 @@
package discovery
import (
"context"
"fmt"
"log/slog"
"net/http"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/gin-gonic/gin"
@@ -15,10 +17,21 @@ import (
)
type Handler struct {
service *Service
// Rate limit (admin-session only). crawlRateLimit == 0 means disabled.
rateMu sync.Mutex
crawlLastManual time.Time
crawlRateLimit time.Duration
// Async crawl state.
crawlRunning atomic.Bool
resultMu sync.RWMutex
// The following are guarded by resultMu:
lastStartedAt time.Time
lastFinishedAt time.Time
lastSummary *CrawlSummary
lastError string
}
// NewHandler constructs a Handler. manualRateLimitPerHour controls how
@@ -43,21 +56,26 @@ func NewHandler(s *Service, manualRateLimitPerHour int) *Handler {
return &Handler{service: s, crawlRateLimit: rl}
}
// Crawl kicks off a crawler run in the background and returns 202 immediately.
// A per-process CAS gate prevents concurrent runs. Completion is observable
// via GET .../crawl-status or backend logs ("crawl completed" / "async crawl
// failed").
func (h *Handler) Crawl(c *gin.Context) {
// Prevent concurrent runs (replaces TryLock).
if !h.crawlRunning.CompareAndSwap(false, true) {
apiErr := apierror.TooManyRequests("A crawler run is already in progress")
c.JSON(apiErr.Status, apierror.NewResponse(apiErr))
return
}
// Rate limit (admin-session only — bypass via crawl_bypass_rate_limit).
if _, bypass := c.Get("crawl_bypass_rate_limit"); !bypass && h.crawlRateLimit > 0 {
h.rateMu.Lock()
since := time.Since(h.crawlLastManual)
if since < h.crawlRateLimit {
retryIn := (h.crawlRateLimit - since).Seconds()
h.rateMu.Unlock()
h.crawlRunning.Store(false) // release gate since we refused
c.Header("Retry-After", fmt.Sprint(int(retryIn)+1))
apiErr := apierror.TooManyRequests(
fmt.Sprintf("Manual crawl rate-limited; try again in ~%.0fs", retryIn),
@@ -66,16 +84,74 @@ func (h *Handler) Crawl(c *gin.Context) {
return
}
h.crawlLastManual = time.Now()
h.rateMu.Unlock()
}
// Reset + mark started.
h.resultMu.Lock()
h.lastStartedAt = time.Now().UTC()
h.lastFinishedAt = time.Time{}
h.lastSummary = nil
h.lastError = ""
h.resultMu.Unlock()
go h.runCrawlAsync()
c.JSON(http.StatusAccepted, gin.H{
"data": gin.H{
"status": "started",
"message": "Crawl started in background. Poll /api/v1/admin/discovery/crawl-status for completion.",
},
})
}
// runCrawlAsync executes the crawl and stores the outcome for crawl-status.
// Uses a detached context with a 5-minute cap so a dropped request (client
// disconnect, gateway restart, or an ended admin session) doesn't cancel
// in-flight work.
func (h *Handler) runCrawlAsync() {
defer h.crawlRunning.Store(false)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
summary, err := h.service.Crawl(ctx)
h.resultMu.Lock()
h.lastFinishedAt = time.Now().UTC()
if err != nil {
h.lastError = err.Error()
slog.ErrorContext(ctx, "async crawl failed", "error", err)
} else {
summaryCopy := summary
h.lastSummary = &summaryCopy
}
h.resultMu.Unlock()
}
// CrawlStatus returns the state of the most recent async crawl.
// Shape:
//
// {
// "data": {
// "running": bool,
// "started_at": "...", // zero if no crawl yet run
// "finished_at": "...", // zero while running or if no crawl yet
// "summary": CrawlSummary?, // null if running, errored, or not yet run
// "error": string? // non-empty only if last run failed
// }
// }
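// Example response (illustrative values only) after a successful run:
//
//	{
//	  "data": {
//	    "running": false,
//	    "started_at": "2026-04-18T17:20:01Z",
//	    "finished_at": "2026-04-18T17:21:13Z",
//	    "summary": { "...": "CrawlSummary fields" },
//	    "error": ""
//	  }
//	}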
func (h *Handler) CrawlStatus(c *gin.Context) {
h.resultMu.RLock()
defer h.resultMu.RUnlock()
c.JSON(http.StatusOK, gin.H{
"data": gin.H{
"running": h.crawlRunning.Load(),
"started_at": h.lastStartedAt,
"finished_at": h.lastFinishedAt,
"summary": h.lastSummary,
"error": h.lastError,
},
})
}
func (h *Handler) Stats(c *gin.Context) {

View File

@@ -5,7 +5,6 @@ import (
"encoding/json"
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"
@@ -18,70 +17,171 @@ func init() {
gin.SetMode(gin.TestMode)
}
// blockingCrawlerRunner blocks until either its release channel is closed
// or the context is cancelled.
type blockingCrawlerRunner struct {
started chan struct{}
release chan struct{}
}
func (b *blockingCrawlerRunner) RunAll(ctx context.Context) (crawler.CrawlResult, error) {
close(b.started)
select {
case <-b.release:
return crawler.CrawlResult{}, nil
case <-ctx.Done():
return crawler.CrawlResult{}, ctx.Err()
}
}
// waitFor polls cond every 2ms until it returns true or deadline passes.
func waitFor(t *testing.T, deadline time.Duration, cond func() bool) {
t.Helper()
end := time.Now().Add(deadline)
for time.Now().Before(end) {
if cond() {
return
}
time.Sleep(2 * time.Millisecond)
}
t.Fatal("condition not met within deadline")
}
// TestCrawlHandlerReturns202AndStartsCrawl verifies that POST /crawl returns
// 202 immediately and that the background goroutine completes with a summary.
func TestCrawlHandlerReturns202AndStartsCrawl(t *testing.T) {
svc := NewService(
newMockRepo(),
&stubCrawlerRunner{result: crawler.CrawlResult{}},
noopLinkVerifier{},
noopMarketCreator{},
)
h := NewHandler(svc, 0) // rate limit disabled
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest(http.MethodPost, "/admin/discovery/crawl-manual", nil)
c.Set("crawl_bypass_rate_limit", true)
start := time.Now()
h.Crawl(c)
elapsed := time.Since(start)
if w.Code != http.StatusAccepted {
t.Fatalf("expected 202, got %d body=%s", w.Code, w.Body.String())
}
if elapsed > 50*time.Millisecond {
t.Errorf("handler took %v; expected < 50ms (fire-and-forget)", elapsed)
}
// Wait for goroutine to finish.
waitFor(t, 2*time.Second, func() bool { return !h.crawlRunning.Load() })
h.resultMu.RLock()
summary := h.lastSummary
crawlErr := h.lastError
h.resultMu.RUnlock()
if summary == nil {
t.Errorf("lastSummary is nil after crawl completed (error: %q)", crawlErr)
}
}
// TestCrawlStatusInitialState verifies the zero state of a freshly constructed Handler.
func TestCrawlStatusInitialState(t *testing.T) {
svc := NewService(newMockRepo(), &stubCrawlerRunner{}, noopLinkVerifier{}, noopMarketCreator{})
h := NewHandler(svc, 0)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest(http.MethodGet, "/admin/discovery/crawl-status", nil)
h.CrawlStatus(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d", w.Code)
}
var resp struct {
Data struct {
Running bool `json:"running"`
StartedAt time.Time `json:"started_at"`
FinishedAt time.Time `json:"finished_at"`
Summary interface{} `json:"summary"`
Error string `json:"error"`
} `json:"data"`
}
if err := json.NewDecoder(w.Body).Decode(&resp); err != nil {
t.Fatalf("decode response: %v", err)
}
if resp.Data.Running {
t.Error("expected running=false on fresh handler")
}
if resp.Data.Summary != nil {
t.Errorf("expected summary=null, got %v", resp.Data.Summary)
}
if resp.Data.Error != "" {
t.Errorf("expected error='', got %q", resp.Data.Error)
}
if !resp.Data.StartedAt.IsZero() {
t.Errorf("expected started_at zero, got %v", resp.Data.StartedAt)
}
if !resp.Data.FinishedAt.IsZero() {
t.Errorf("expected finished_at zero, got %v", resp.Data.FinishedAt)
}
}
// TestCrawlHandlerConcurrentReturnsTooManyRequests verifies that a second
// concurrent Crawl request gets HTTP 429 while the first is still running.
func TestCrawlHandlerConcurrentReturnsTooManyRequests(t *testing.T) {
bc := &blockingCrawlerRunner{
started: make(chan struct{}),
release: make(chan struct{}),
}
svc := NewService(newMockRepo(), bc, noopLinkVerifier{}, noopMarketCreator{})
h := NewHandler(svc, 0) // rate limit disabled
// First request — returns 202 and spawns goroutine.
w1 := httptest.NewRecorder()
c1, _ := gin.CreateTestContext(w1)
c1.Request = httptest.NewRequest(http.MethodPost, "/admin/discovery/crawl", nil)
c1.Set("crawl_bypass_rate_limit", true)
h.Crawl(c1)
if w1.Code != http.StatusAccepted {
t.Fatalf("first request: expected 202, got %d", w1.Code)
}
// Wait until the blocking crawler has actually started (gate is held).
<-bc.started
// Second request — should see 429 because crawlRunning is true.
w2 := httptest.NewRecorder()
c2, _ := gin.CreateTestContext(w2)
c2.Request = httptest.NewRequest(http.MethodPost, "/admin/discovery/crawl", nil)
c2.Set("crawl_bypass_rate_limit", true)
h.Crawl(c2)
if w2.Code != http.StatusTooManyRequests {
t.Errorf("expected 429 from second concurrent request, got %d", w2.Code)
}
// Unblock the first goroutine so it finishes cleanly.
close(bc.release)
waitFor(t, 2*time.Second, func() bool { return !h.crawlRunning.Load() })
}
// TestCrawlHandlerRateLimit verifies that consecutive manual requests within
// the rate-limit window get HTTP 429 with Retry-After after the first finishes.
func TestCrawlHandlerRateLimit(t *testing.T) {
svc := NewService(
newMockRepo(),
&stubCrawlerRunner{result: crawler.CrawlResult{}},
noopLinkVerifier{},
noopMarketCreator{},
)
// 1 per hour window.
h := NewHandler(svc, 1)
doRequest := func(bypass bool) *httptest.ResponseRecorder {
@@ -95,12 +195,17 @@ func TestCrawlHandlerRateLimit(t *testing.T) {
return w
}
// First request — not bypassed. Should be accepted (202).
w1 := doRequest(false)
if w1.Code != http.StatusAccepted {
t.Fatalf("first request: expected 202, got %d body=%s", w1.Code, w1.Body.String())
}
// Wait for the first goroutine to finish so crawlRunning drops to false.
// This ensures the second request is blocked by the rate limiter, not by
// the CAS gate.
waitFor(t, 2*time.Second, func() bool { return !h.crawlRunning.Load() })
// Second request immediately after — should be rate-limited (429).
w2 := doRequest(false)
if w2.Code != http.StatusTooManyRequests {
@@ -110,41 +215,9 @@ func TestCrawlHandlerRateLimit(t *testing.T) {
t.Error("second request: expected Retry-After header to be set")
}
// Bypassed request should be accepted even within the window.
w3 := doRequest(true)
if w3.Code != http.StatusAccepted {
t.Errorf("bypass request: expected 202, got %d body=%s", w3.Code, w3.Body.String())
}
}

View File

@@ -25,5 +25,7 @@ func RegisterRoutes(
admin.POST("/queue/:id/reject", h.Reject)
// Manual crawl trigger — subject to hourly rate limit.
admin.POST("/crawl-manual", h.Crawl)
// Async crawl status polling.
admin.GET("/crawl-status", h.CrawlStatus)
}
}

View File

@@ -3,22 +3,6 @@ import { redirect, fail } from '@sveltejs/kit';
import { serverFetch } from '$lib/api/client.server.js';
import { ApiClientError } from '$lib/api/client.js';
type DiscoveredMarket = {
id: string;
markt_name: string;
@@ -113,11 +97,11 @@ export const actions: Actions = {
crawl: async ({ cookies, fetch }) => {
try {
await serverFetch<unknown>(`/admin/discovery/crawl-manual`, cookies, {
method: 'POST',
fetch
});
return { crawlStarted: true };
} catch (err) {
if (err instanceof ApiClientError && err.status === 429) {
return fail(429, {

View File

@@ -1,8 +1,36 @@
<script lang="ts">
import { onMount } from 'svelte';
import { enhance } from '$app/forms';
let { data, form } = $props();
type CrawlSummary = {
started_at: string;
duration_ms: number;
per_source: Record<string, { events_fetched: number; elapsed_ms: number }>;
merged: number;
merged_across_sites: number;
discovered: number;
deduped_existing: number;
deduped_rejected: number;
deduped_queue: number;
link_check_failed: number;
validation_failed: number;
date_conflicts: number;
source_errors: Array<{ source: string; error: string }>;
};
type CrawlStatus = {
running: boolean;
started_at: string;
finished_at: string;
summary: CrawlSummary | null;
error: string;
};
let crawling = $state(false);
let crawlStatus = $state<CrawlStatus | null>(null);
let pollInterval: ReturnType<typeof setInterval> | null = null;
// Coalesce nullable list fields (Go encodes nil slices as null).
const queue = $derived(data.queue ?? []);
@@ -63,6 +91,70 @@
!data.stats.last_tick_at ||
Date.now() - new Date(data.stats.last_tick_at).getTime() > 6 * 60 * 60 * 1000
);
async function fetchCrawlStatus(): Promise<CrawlStatus | null> {
try {
const res = await fetch('/admin/discovery/crawl-status');
if (!res.ok) return null;
return (await res.json()) as CrawlStatus;
} catch {
return null;
}
}
function stopPolling() {
if (pollInterval !== null) {
clearInterval(pollInterval);
pollInterval = null;
}
}
function startPolling() {
if (pollInterval !== null) return; // already polling
pollInterval = setInterval(async () => {
const status = await fetchCrawlStatus();
if (status === null) return;
crawlStatus = status;
if (!status.running) {
stopPolling();
crawling = false;
}
}, 3000);
}
// On mount: fetch current status so a page refresh picks up any in-progress
// or recently completed crawl.
onMount(async () => {
const status = await fetchCrawlStatus();
if (status === null) return;
crawlStatus = status;
if (status.running) {
crawling = true;
startPolling();
}
});
// When the crawl action returns crawlStarted, begin polling.
$effect(() => {
if (form?.crawlStarted) {
crawling = true;
startPolling();
}
});
// Elapsed time display while running.
const elapsedLabel = $derived.by(() => {
if (!crawlStatus?.running || !crawlStatus.started_at) return '';
const ms = Date.now() - new Date(crawlStatus.started_at).getTime();
const s = Math.round(ms / 1000);
return s < 60 ? `${s}s` : `${Math.floor(s / 60)}m ${s % 60}s`;
});
// The summary to display: either from poll result or a completed status.
const displaySummary = $derived(crawlStatus?.summary ?? null);
const displayError = $derived(
crawlStatus && !crawlStatus.running && crawlStatus.error ? crawlStatus.error : null
);
</script>
<svelte:head>
@@ -136,8 +228,7 @@
use:enhance={() => {
crawling = true;
return async ({ update }) => {
await update({ reset: false });
};
}}
>
@@ -146,7 +237,11 @@
disabled={crawling}
class="rounded bg-indigo-600 px-3 py-1.5 text-sm text-white hover:bg-indigo-700 disabled:cursor-not-allowed disabled:opacity-60"
>
{#if crawling}
Crawling…{elapsedLabel ? ` (${elapsedLabel})` : ''}
{:else}
Run crawl
{/if}
</button>
</form>
</div>
@@ -159,8 +254,16 @@
</div>
{/if}
{#if displayError}
<div
class="mt-4 rounded border border-red-300 bg-red-50 px-3 py-2 text-sm text-red-700 dark:border-red-700 dark:bg-red-950 dark:text-red-300"
>
Crawl fehlgeschlagen: {displayError}
</div>
{/if}
{#if displaySummary}
{@const s = displaySummary}
<div
class="mt-4 rounded-lg border border-stone-200 bg-stone-50 p-4 text-sm dark:border-stone-700 dark:bg-stone-900"
>

View File

@@ -0,0 +1,12 @@
import { json } from '@sveltejs/kit';
import { serverFetch } from '$lib/api/client.server.js';
import type { RequestHandler } from './$types.js';
export const GET: RequestHandler = async ({ cookies }) => {
try {
const res = await serverFetch<unknown>('/admin/discovery/crawl-status', cookies);
return json(res.data);
} catch {
return json({ error: 'Failed to fetch crawl status' }, { status: 502 });
}
};