Merge branch 'feat/crawl-async' — async crawl handler, UI polls status
Gateway (NGF) ignored our HTTPRoute timeouts field (UnsupportedField). Flipping to fire-and-forget: handler returns 202 immediately, goroutine runs crawl with detached 5-min context, GET /admin/discovery/crawl-status returns state, admin UI polls every 3s until running=false. HTTP requests are now all sub-second; gateway timeout is no longer in the crawl critical path. Concurrent-run protection via atomic.Bool (replaces TryLock), rate limit semantics unchanged.
This commit is contained in:
@@ -1,11 +1,13 @@
|
||||
package discovery
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
@@ -15,10 +17,21 @@ import (
|
||||
)
|
||||
|
||||
type Handler struct {
|
||||
service *Service
|
||||
crawlMu sync.Mutex
|
||||
service *Service
|
||||
|
||||
// Rate limit (admin-session only). crawlRateLimit == 0 means disabled.
|
||||
rateMu sync.Mutex
|
||||
crawlLastManual time.Time
|
||||
crawlRateLimit time.Duration
|
||||
|
||||
// Async crawl state.
|
||||
crawlRunning atomic.Bool
|
||||
resultMu sync.RWMutex
|
||||
// The following are guarded by resultMu:
|
||||
lastStartedAt time.Time
|
||||
lastFinishedAt time.Time
|
||||
lastSummary *CrawlSummary
|
||||
lastError string
|
||||
}
|
||||
|
||||
// NewHandler constructs a Handler. manualRateLimitPerHour controls how
|
||||
@@ -43,21 +56,26 @@ func NewHandler(s *Service, manualRateLimitPerHour int) *Handler {
|
||||
return &Handler{service: s, crawlRateLimit: rl}
|
||||
}
|
||||
|
||||
// Crawl runs the crawler once and returns CrawlSummary. A per-process mutex
|
||||
// blocks concurrent runs. The manual-admin rate limit is bypassed when
|
||||
// "crawl_bypass_rate_limit" is set in the gin context (set by the bearer-token
|
||||
// middleware path).
|
||||
// Crawl kicks off a crawler run in the background and returns 202 immediately.
|
||||
// A per-process CAS gate prevents concurrent runs. Completion is observable
|
||||
// via GET .../crawl-status or backend logs ("crawl completed" / "async crawl
|
||||
// failed").
|
||||
func (h *Handler) Crawl(c *gin.Context) {
|
||||
if !h.crawlMu.TryLock() {
|
||||
// Prevent concurrent runs (replaces TryLock).
|
||||
if !h.crawlRunning.CompareAndSwap(false, true) {
|
||||
apiErr := apierror.TooManyRequests("A crawler run is already in progress")
|
||||
c.JSON(apiErr.Status, apierror.NewResponse(apiErr))
|
||||
return
|
||||
}
|
||||
defer h.crawlMu.Unlock()
|
||||
|
||||
// Rate limit (admin-session only — bypass via crawl_bypass_rate_limit).
|
||||
if _, bypass := c.Get("crawl_bypass_rate_limit"); !bypass && h.crawlRateLimit > 0 {
|
||||
if since := time.Since(h.crawlLastManual); since < h.crawlRateLimit {
|
||||
h.rateMu.Lock()
|
||||
since := time.Since(h.crawlLastManual)
|
||||
if since < h.crawlRateLimit {
|
||||
retryIn := (h.crawlRateLimit - since).Seconds()
|
||||
h.rateMu.Unlock()
|
||||
h.crawlRunning.Store(false) // release gate since we refused
|
||||
c.Header("Retry-After", fmt.Sprint(int(retryIn)+1))
|
||||
apiErr := apierror.TooManyRequests(
|
||||
fmt.Sprintf("Manual crawl rate-limited; try again in ~%.0fs", retryIn),
|
||||
@@ -66,16 +84,74 @@ func (h *Handler) Crawl(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
h.crawlLastManual = time.Now()
|
||||
h.rateMu.Unlock()
|
||||
}
|
||||
|
||||
summary, err := h.service.Crawl(c.Request.Context())
|
||||
// Reset + mark started.
|
||||
h.resultMu.Lock()
|
||||
h.lastStartedAt = time.Now().UTC()
|
||||
h.lastFinishedAt = time.Time{}
|
||||
h.lastSummary = nil
|
||||
h.lastError = ""
|
||||
h.resultMu.Unlock()
|
||||
|
||||
go h.runCrawlAsync()
|
||||
|
||||
c.JSON(http.StatusAccepted, gin.H{
|
||||
"data": gin.H{
|
||||
"status": "started",
|
||||
"message": "Crawl started in background. Poll /api/v1/admin/discovery/crawl-status for completion.",
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
// runCrawlAsync executes the crawl and stores the outcome for crawl-status.
|
||||
// Uses a detached context with a 5-minute cap so a restarted gateway/admin
|
||||
// session doesn't terminate in-flight work.
|
||||
func (h *Handler) runCrawlAsync() {
|
||||
defer h.crawlRunning.Store(false)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
|
||||
defer cancel()
|
||||
|
||||
summary, err := h.service.Crawl(ctx)
|
||||
|
||||
h.resultMu.Lock()
|
||||
h.lastFinishedAt = time.Now().UTC()
|
||||
if err != nil {
|
||||
slog.ErrorContext(c.Request.Context(), "discovery crawl failed", "error", err)
|
||||
apiErr := apierror.Internal("crawl failed")
|
||||
c.JSON(apiErr.Status, apierror.NewResponse(apiErr))
|
||||
return
|
||||
h.lastError = err.Error()
|
||||
slog.ErrorContext(ctx, "async crawl failed", "error", err)
|
||||
} else {
|
||||
summaryCopy := summary
|
||||
h.lastSummary = &summaryCopy
|
||||
}
|
||||
c.JSON(http.StatusOK, gin.H{"data": summary})
|
||||
h.resultMu.Unlock()
|
||||
}
|
||||
|
||||
// CrawlStatus returns the state of the most recent async crawl.
|
||||
// Shape:
|
||||
//
|
||||
// {
|
||||
// "data": {
|
||||
// "running": bool,
|
||||
// "started_at": "...", // zero if no crawl yet run
|
||||
// "finished_at": "...", // zero while running or if no crawl yet
|
||||
// "summary": CrawlSummary?, // null if running, errored, or not yet run
|
||||
// "error": string? // non-empty only if last run failed
|
||||
// }
|
||||
// }
|
||||
func (h *Handler) CrawlStatus(c *gin.Context) {
|
||||
h.resultMu.RLock()
|
||||
defer h.resultMu.RUnlock()
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
"data": gin.H{
|
||||
"running": h.crawlRunning.Load(),
|
||||
"started_at": h.lastStartedAt,
|
||||
"finished_at": h.lastFinishedAt,
|
||||
"summary": h.lastSummary,
|
||||
"error": h.lastError,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
func (h *Handler) Stats(c *gin.Context) {
|
||||
|
||||
@@ -5,7 +5,6 @@ import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -18,70 +17,171 @@ func init() {
|
||||
gin.SetMode(gin.TestMode)
|
||||
}
|
||||
|
||||
// blockingCrawlerRunner blocks until its ctx is cancelled.
|
||||
// blockingCrawlerRunner blocks until either its release channel is closed
|
||||
// or the context is cancelled.
|
||||
type blockingCrawlerRunner struct {
|
||||
started chan struct{}
|
||||
release chan struct{}
|
||||
}
|
||||
|
||||
func (b *blockingCrawlerRunner) RunAll(ctx context.Context) (crawler.CrawlResult, error) {
|
||||
close(b.started)
|
||||
<-ctx.Done()
|
||||
return crawler.CrawlResult{}, ctx.Err()
|
||||
}
|
||||
|
||||
// TestCrawlHandlerMutexReentry verifies that a second concurrent Crawl request
|
||||
// gets HTTP 429 while the first is still running.
|
||||
func TestCrawlHandlerMutexReentry(t *testing.T) {
|
||||
bc := &blockingCrawlerRunner{started: make(chan struct{})}
|
||||
svc := NewService(newMockRepo(), bc, noopLinkVerifier{}, noopMarketCreator{})
|
||||
h := NewHandler(svc, 1)
|
||||
|
||||
// First request — runs in a goroutine so the handler blocks.
|
||||
w1 := httptest.NewRecorder()
|
||||
c1, _ := gin.CreateTestContext(w1)
|
||||
ctx1, cancel1 := context.WithCancel(context.Background())
|
||||
defer cancel1()
|
||||
c1.Request = httptest.NewRequest(http.MethodPost, "/admin/discovery/crawl", nil).WithContext(ctx1)
|
||||
c1.Set("crawl_bypass_rate_limit", true) // bypass rate limit; test only mutex
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
h.Crawl(c1)
|
||||
}()
|
||||
|
||||
// Wait until the blocking crawler has started and holds the mutex.
|
||||
<-bc.started
|
||||
|
||||
// Second request — should see 429 because mutex is held.
|
||||
w2 := httptest.NewRecorder()
|
||||
c2, _ := gin.CreateTestContext(w2)
|
||||
c2.Request = httptest.NewRequest(http.MethodPost, "/admin/discovery/crawl", nil)
|
||||
c2.Set("crawl_bypass_rate_limit", true)
|
||||
|
||||
h.Crawl(c2)
|
||||
|
||||
if w2.Code != http.StatusTooManyRequests {
|
||||
t.Errorf("expected 429 from second concurrent request, got %d", w2.Code)
|
||||
select {
|
||||
case <-b.release:
|
||||
return crawler.CrawlResult{}, nil
|
||||
case <-ctx.Done():
|
||||
return crawler.CrawlResult{}, ctx.Err()
|
||||
}
|
||||
|
||||
// Cancel the first request so the goroutine can unblock.
|
||||
cancel1()
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// TestCrawlHandlerRateLimit verifies that a second manual (non-bypass) request
|
||||
// within the rate limit window returns 429 with Retry-After.
|
||||
func TestCrawlHandlerRateLimit(t *testing.T) {
|
||||
// Use an instant-returning crawler so the mutex is released quickly.
|
||||
// waitFor polls cond every 2ms until it returns true or deadline passes.
|
||||
func waitFor(t *testing.T, deadline time.Duration, cond func() bool) {
|
||||
t.Helper()
|
||||
end := time.Now().Add(deadline)
|
||||
for time.Now().Before(end) {
|
||||
if cond() {
|
||||
return
|
||||
}
|
||||
time.Sleep(2 * time.Millisecond)
|
||||
}
|
||||
t.Fatal("condition not met within deadline")
|
||||
}
|
||||
|
||||
// TestCrawlHandlerReturns202AndStartsCrawl verifies that POST /crawl returns
|
||||
// 202 immediately and that the background goroutine completes with a summary.
|
||||
func TestCrawlHandlerReturns202AndStartsCrawl(t *testing.T) {
|
||||
svc := NewService(
|
||||
newMockRepo(),
|
||||
&stubCrawlerRunner{result: crawler.CrawlResult{}},
|
||||
noopLinkVerifier{},
|
||||
noopMarketCreator{},
|
||||
)
|
||||
// Set a long rate limit (1 per hour = 1h window).
|
||||
h := NewHandler(svc, 0) // rate limit disabled
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Request = httptest.NewRequest(http.MethodPost, "/admin/discovery/crawl-manual", nil)
|
||||
c.Set("crawl_bypass_rate_limit", true)
|
||||
|
||||
start := time.Now()
|
||||
h.Crawl(c)
|
||||
elapsed := time.Since(start)
|
||||
|
||||
if w.Code != http.StatusAccepted {
|
||||
t.Fatalf("expected 202, got %d body=%s", w.Code, w.Body.String())
|
||||
}
|
||||
if elapsed > 50*time.Millisecond {
|
||||
t.Errorf("handler took %v; expected < 50ms (fire-and-forget)", elapsed)
|
||||
}
|
||||
|
||||
// Wait for goroutine to finish.
|
||||
waitFor(t, 2*time.Second, func() bool { return !h.crawlRunning.Load() })
|
||||
|
||||
h.resultMu.RLock()
|
||||
summary := h.lastSummary
|
||||
crawlErr := h.lastError
|
||||
h.resultMu.RUnlock()
|
||||
|
||||
if summary == nil {
|
||||
t.Errorf("lastSummary is nil after crawl completed (error: %q)", crawlErr)
|
||||
}
|
||||
}
|
||||
|
||||
// TestCrawlStatusInitialState verifies the zero state of a freshly constructed Handler.
|
||||
func TestCrawlStatusInitialState(t *testing.T) {
|
||||
svc := NewService(newMockRepo(), &stubCrawlerRunner{}, noopLinkVerifier{}, noopMarketCreator{})
|
||||
h := NewHandler(svc, 0)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Request = httptest.NewRequest(http.MethodGet, "/admin/discovery/crawl-status", nil)
|
||||
|
||||
h.CrawlStatus(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("expected 200, got %d", w.Code)
|
||||
}
|
||||
|
||||
var resp struct {
|
||||
Data struct {
|
||||
Running bool `json:"running"`
|
||||
StartedAt time.Time `json:"started_at"`
|
||||
FinishedAt time.Time `json:"finished_at"`
|
||||
Summary interface{} `json:"summary"`
|
||||
Error string `json:"error"`
|
||||
} `json:"data"`
|
||||
}
|
||||
if err := json.NewDecoder(w.Body).Decode(&resp); err != nil {
|
||||
t.Fatalf("decode response: %v", err)
|
||||
}
|
||||
|
||||
if resp.Data.Running {
|
||||
t.Error("expected running=false on fresh handler")
|
||||
}
|
||||
if resp.Data.Summary != nil {
|
||||
t.Errorf("expected summary=null, got %v", resp.Data.Summary)
|
||||
}
|
||||
if resp.Data.Error != "" {
|
||||
t.Errorf("expected error='', got %q", resp.Data.Error)
|
||||
}
|
||||
if !resp.Data.StartedAt.IsZero() {
|
||||
t.Errorf("expected started_at zero, got %v", resp.Data.StartedAt)
|
||||
}
|
||||
if !resp.Data.FinishedAt.IsZero() {
|
||||
t.Errorf("expected finished_at zero, got %v", resp.Data.FinishedAt)
|
||||
}
|
||||
}
|
||||
|
||||
// TestCrawlHandlerConcurrentReturnsTooManyRequests verifies that a second
|
||||
// concurrent Crawl request gets HTTP 429 while the first is still running.
|
||||
func TestCrawlHandlerConcurrentReturnsTooManyRequests(t *testing.T) {
|
||||
bc := &blockingCrawlerRunner{
|
||||
started: make(chan struct{}),
|
||||
release: make(chan struct{}),
|
||||
}
|
||||
svc := NewService(newMockRepo(), bc, noopLinkVerifier{}, noopMarketCreator{})
|
||||
h := NewHandler(svc, 0) // rate limit disabled
|
||||
|
||||
// First request — returns 202 and spawns goroutine.
|
||||
w1 := httptest.NewRecorder()
|
||||
c1, _ := gin.CreateTestContext(w1)
|
||||
c1.Request = httptest.NewRequest(http.MethodPost, "/admin/discovery/crawl", nil)
|
||||
c1.Set("crawl_bypass_rate_limit", true)
|
||||
h.Crawl(c1)
|
||||
|
||||
if w1.Code != http.StatusAccepted {
|
||||
t.Fatalf("first request: expected 202, got %d", w1.Code)
|
||||
}
|
||||
|
||||
// Wait until the blocking crawler has actually started (gate is held).
|
||||
<-bc.started
|
||||
|
||||
// Second request — should see 429 because crawlRunning is true.
|
||||
w2 := httptest.NewRecorder()
|
||||
c2, _ := gin.CreateTestContext(w2)
|
||||
c2.Request = httptest.NewRequest(http.MethodPost, "/admin/discovery/crawl", nil)
|
||||
c2.Set("crawl_bypass_rate_limit", true)
|
||||
h.Crawl(c2)
|
||||
|
||||
if w2.Code != http.StatusTooManyRequests {
|
||||
t.Errorf("expected 429 from second concurrent request, got %d", w2.Code)
|
||||
}
|
||||
|
||||
// Unblock the first goroutine so it finishes cleanly.
|
||||
close(bc.release)
|
||||
waitFor(t, 2*time.Second, func() bool { return !h.crawlRunning.Load() })
|
||||
}
|
||||
|
||||
// TestCrawlHandlerRateLimit verifies that consecutive manual requests within
|
||||
// the rate-limit window get HTTP 429 with Retry-After after the first finishes.
|
||||
func TestCrawlHandlerRateLimit(t *testing.T) {
|
||||
svc := NewService(
|
||||
newMockRepo(),
|
||||
&stubCrawlerRunner{result: crawler.CrawlResult{}},
|
||||
noopLinkVerifier{},
|
||||
noopMarketCreator{},
|
||||
)
|
||||
// 1 per hour window.
|
||||
h := NewHandler(svc, 1)
|
||||
|
||||
doRequest := func(bypass bool) *httptest.ResponseRecorder {
|
||||
@@ -95,12 +195,17 @@ func TestCrawlHandlerRateLimit(t *testing.T) {
|
||||
return w
|
||||
}
|
||||
|
||||
// First request — not bypassed. Should succeed (200).
|
||||
// First request — not bypassed. Should be accepted (202).
|
||||
w1 := doRequest(false)
|
||||
if w1.Code != http.StatusOK {
|
||||
t.Fatalf("first request: expected 200, got %d body=%s", w1.Code, w1.Body.String())
|
||||
if w1.Code != http.StatusAccepted {
|
||||
t.Fatalf("first request: expected 202, got %d body=%s", w1.Code, w1.Body.String())
|
||||
}
|
||||
|
||||
// Wait for the first goroutine to finish so crawlRunning drops to false.
|
||||
// This ensures the second request is blocked by the rate limiter, not by
|
||||
// the CAS gate.
|
||||
waitFor(t, 2*time.Second, func() bool { return !h.crawlRunning.Load() })
|
||||
|
||||
// Second request immediately after — should be rate-limited (429).
|
||||
w2 := doRequest(false)
|
||||
if w2.Code != http.StatusTooManyRequests {
|
||||
@@ -110,41 +215,9 @@ func TestCrawlHandlerRateLimit(t *testing.T) {
|
||||
t.Error("second request: expected Retry-After header to be set")
|
||||
}
|
||||
|
||||
// Bypassed request should succeed even within the window.
|
||||
// Bypassed request should be accepted even within the window.
|
||||
w3 := doRequest(true)
|
||||
if w3.Code != http.StatusOK {
|
||||
t.Errorf("bypass request: expected 200, got %d body=%s", w3.Code, w3.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
// TestCrawlHandlerRateLimitResets verifies that a manual request succeeds once
|
||||
// the rate limit window has elapsed.
|
||||
func TestCrawlHandlerRateLimitResets(t *testing.T) {
|
||||
svc := NewService(
|
||||
newMockRepo(),
|
||||
&stubCrawlerRunner{result: crawler.CrawlResult{}},
|
||||
noopLinkVerifier{},
|
||||
noopMarketCreator{},
|
||||
)
|
||||
h := NewHandler(svc, 1)
|
||||
|
||||
// Force crawlLastManual to well in the past so the window has expired.
|
||||
h.crawlLastManual = time.Now().Add(-2 * time.Hour)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Request = httptest.NewRequest(http.MethodPost, "/admin/discovery/crawl-manual", nil)
|
||||
h.Crawl(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("expected 200 after window reset, got %d body=%s", w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
var resp map[string]interface{}
|
||||
if err := json.NewDecoder(w.Body).Decode(&resp); err != nil {
|
||||
t.Fatalf("response body not valid JSON: %v", err)
|
||||
}
|
||||
if _, ok := resp["data"]; !ok {
|
||||
t.Error("expected 'data' key in response body")
|
||||
if w3.Code != http.StatusAccepted {
|
||||
t.Errorf("bypass request: expected 202, got %d body=%s", w3.Code, w3.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -25,5 +25,7 @@ func RegisterRoutes(
|
||||
admin.POST("/queue/:id/reject", h.Reject)
|
||||
// Manual crawl trigger — subject to hourly rate limit.
|
||||
admin.POST("/crawl-manual", h.Crawl)
|
||||
// Async crawl status polling.
|
||||
admin.GET("/crawl-status", h.CrawlStatus)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,22 +3,6 @@ import { redirect, fail } from '@sveltejs/kit';
|
||||
import { serverFetch } from '$lib/api/client.server.js';
|
||||
import { ApiClientError } from '$lib/api/client.js';
|
||||
|
||||
type CrawlSummary = {
|
||||
started_at: string;
|
||||
duration_ms: number;
|
||||
per_source: Record<string, { events_fetched: number; elapsed_ms: number }>;
|
||||
merged: number;
|
||||
merged_across_sites: number;
|
||||
discovered: number;
|
||||
deduped_existing: number;
|
||||
deduped_rejected: number;
|
||||
deduped_queue: number;
|
||||
link_check_failed: number;
|
||||
validation_failed: number;
|
||||
date_conflicts: number;
|
||||
source_errors: Array<{ source: string; error: string }>;
|
||||
};
|
||||
|
||||
type DiscoveredMarket = {
|
||||
id: string;
|
||||
markt_name: string;
|
||||
@@ -113,11 +97,11 @@ export const actions: Actions = {
|
||||
|
||||
crawl: async ({ cookies, fetch }) => {
|
||||
try {
|
||||
const res = await serverFetch<CrawlSummary>(`/admin/discovery/crawl-manual`, cookies, {
|
||||
await serverFetch<unknown>(`/admin/discovery/crawl-manual`, cookies, {
|
||||
method: 'POST',
|
||||
fetch
|
||||
});
|
||||
return { crawlSummary: res.data };
|
||||
return { crawlStarted: true };
|
||||
} catch (err) {
|
||||
if (err instanceof ApiClientError && err.status === 429) {
|
||||
return fail(429, {
|
||||
|
||||
@@ -1,8 +1,36 @@
|
||||
<script lang="ts">
|
||||
import { onMount } from 'svelte';
|
||||
import { enhance } from '$app/forms';
|
||||
|
||||
let { data, form } = $props();
|
||||
|
||||
type CrawlSummary = {
|
||||
started_at: string;
|
||||
duration_ms: number;
|
||||
per_source: Record<string, { events_fetched: number; elapsed_ms: number }>;
|
||||
merged: number;
|
||||
merged_across_sites: number;
|
||||
discovered: number;
|
||||
deduped_existing: number;
|
||||
deduped_rejected: number;
|
||||
deduped_queue: number;
|
||||
link_check_failed: number;
|
||||
validation_failed: number;
|
||||
date_conflicts: number;
|
||||
source_errors: Array<{ source: string; error: string }>;
|
||||
};
|
||||
|
||||
type CrawlStatus = {
|
||||
running: boolean;
|
||||
started_at: string;
|
||||
finished_at: string;
|
||||
summary: CrawlSummary | null;
|
||||
error: string;
|
||||
};
|
||||
|
||||
let crawling = $state(false);
|
||||
let crawlStatus = $state<CrawlStatus | null>(null);
|
||||
let pollInterval: ReturnType<typeof setInterval> | null = null;
|
||||
|
||||
// Coalesce nullable list fields (Go encodes nil slices as null).
|
||||
const queue = $derived(data.queue ?? []);
|
||||
@@ -63,6 +91,70 @@
|
||||
!data.stats.last_tick_at ||
|
||||
Date.now() - new Date(data.stats.last_tick_at).getTime() > 6 * 60 * 60 * 1000
|
||||
);
|
||||
|
||||
async function fetchCrawlStatus(): Promise<CrawlStatus | null> {
|
||||
try {
|
||||
const res = await fetch('/admin/discovery/crawl-status');
|
||||
if (!res.ok) return null;
|
||||
return (await res.json()) as CrawlStatus;
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
function stopPolling() {
|
||||
if (pollInterval !== null) {
|
||||
clearInterval(pollInterval);
|
||||
pollInterval = null;
|
||||
}
|
||||
}
|
||||
|
||||
function startPolling() {
|
||||
if (pollInterval !== null) return; // already polling
|
||||
pollInterval = setInterval(async () => {
|
||||
const status = await fetchCrawlStatus();
|
||||
if (status === null) return;
|
||||
crawlStatus = status;
|
||||
if (!status.running) {
|
||||
stopPolling();
|
||||
crawling = false;
|
||||
}
|
||||
}, 3000);
|
||||
}
|
||||
|
||||
// On mount: fetch current status so a page refresh picks up any in-progress
|
||||
// or recently completed crawl.
|
||||
onMount(async () => {
|
||||
const status = await fetchCrawlStatus();
|
||||
if (status === null) return;
|
||||
crawlStatus = status;
|
||||
if (status.running) {
|
||||
crawling = true;
|
||||
startPolling();
|
||||
}
|
||||
});
|
||||
|
||||
// When the crawl action returns crawlStarted, begin polling.
|
||||
$effect(() => {
|
||||
if (form?.crawlStarted) {
|
||||
crawling = true;
|
||||
startPolling();
|
||||
}
|
||||
});
|
||||
|
||||
// Elapsed time display while running.
|
||||
const elapsedLabel = $derived.by(() => {
|
||||
if (!crawlStatus?.running || !crawlStatus.started_at) return '';
|
||||
const ms = Date.now() - new Date(crawlStatus.started_at).getTime();
|
||||
const s = Math.round(ms / 1000);
|
||||
return s < 60 ? `${s}s` : `${Math.floor(s / 60)}m ${s % 60}s`;
|
||||
});
|
||||
|
||||
// The summary to display: either from poll result or a completed status.
|
||||
const displaySummary = $derived(crawlStatus?.summary ?? null);
|
||||
const displayError = $derived(
|
||||
crawlStatus && !crawlStatus.running && crawlStatus.error ? crawlStatus.error : null
|
||||
);
|
||||
</script>
|
||||
|
||||
<svelte:head>
|
||||
@@ -136,8 +228,7 @@
|
||||
use:enhance={() => {
|
||||
crawling = true;
|
||||
return async ({ update }) => {
|
||||
crawling = false;
|
||||
await update();
|
||||
await update({ reset: false });
|
||||
};
|
||||
}}
|
||||
>
|
||||
@@ -146,7 +237,11 @@
|
||||
disabled={crawling}
|
||||
class="rounded bg-indigo-600 px-3 py-1.5 text-sm text-white hover:bg-indigo-700 disabled:cursor-not-allowed disabled:opacity-60"
|
||||
>
|
||||
{crawling ? 'Crawling…' : 'Run crawl'}
|
||||
{#if crawling}
|
||||
Crawling…{elapsedLabel ? ` (${elapsedLabel})` : ''}
|
||||
{:else}
|
||||
Run crawl
|
||||
{/if}
|
||||
</button>
|
||||
</form>
|
||||
</div>
|
||||
@@ -159,8 +254,16 @@
|
||||
</div>
|
||||
{/if}
|
||||
|
||||
{#if form?.crawlSummary}
|
||||
{@const s = form.crawlSummary}
|
||||
{#if displayError}
|
||||
<div
|
||||
class="mt-4 rounded border border-red-300 bg-red-50 px-3 py-2 text-sm text-red-700 dark:border-red-700 dark:bg-red-950 dark:text-red-300"
|
||||
>
|
||||
Crawl fehlgeschlagen: {displayError}
|
||||
</div>
|
||||
{/if}
|
||||
|
||||
{#if displaySummary}
|
||||
{@const s = displaySummary}
|
||||
<div
|
||||
class="mt-4 rounded-lg border border-stone-200 bg-stone-50 p-4 text-sm dark:border-stone-700 dark:bg-stone-900"
|
||||
>
|
||||
|
||||
12
web/src/routes/admin/discovery/crawl-status/+server.ts
Normal file
12
web/src/routes/admin/discovery/crawl-status/+server.ts
Normal file
@@ -0,0 +1,12 @@
|
||||
import { json } from '@sveltejs/kit';
|
||||
import { serverFetch } from '$lib/api/client.server.js';
|
||||
import type { RequestHandler } from './$types.js';
|
||||
|
||||
export const GET: RequestHandler = async ({ cookies }) => {
|
||||
try {
|
||||
const res = await serverFetch<unknown>('/admin/discovery/crawl-status', cookies);
|
||||
return json(res.data);
|
||||
} catch {
|
||||
return json({ error: 'Failed to fetch crawl status' }, { status: 502 });
|
||||
}
|
||||
};
|
||||
Reference in New Issue
Block a user