feat(discovery): POST /admin/discovery/crawl with mutex and rate limit
Exposes Service.Crawl via two HTTP routes: a bearer-token path that bypasses the manual rate limit, and an admin-session path subject to a configurable per-hour cap. A sync.Mutex (acquired via TryLock) rejects concurrent runs with HTTP 429. Includes handler tests for mutex reentry and rate limit enforcement.
This commit is contained in:
@@ -1,9 +1,12 @@
|
||||
package discovery
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/google/uuid"
|
||||
@@ -12,11 +15,18 @@ import (
|
||||
)
|
||||
|
||||
type Handler struct {
|
||||
service *Service
|
||||
service *Service
|
||||
crawlMu sync.Mutex
|
||||
crawlLastManual time.Time
|
||||
crawlRateLimit time.Duration
|
||||
}
|
||||
|
||||
func NewHandler(s *Service) *Handler {
|
||||
return &Handler{service: s}
|
||||
func NewHandler(s *Service, manualRateLimitPerHour int) *Handler {
|
||||
rl := time.Hour
|
||||
if manualRateLimitPerHour > 1 {
|
||||
rl = time.Hour / time.Duration(manualRateLimitPerHour)
|
||||
}
|
||||
return &Handler{service: s, crawlRateLimit: rl}
|
||||
}
|
||||
|
||||
func (h *Handler) Tick(c *gin.Context) {
|
||||
@@ -30,6 +40,41 @@ func (h *Handler) Tick(c *gin.Context) {
|
||||
c.JSON(http.StatusOK, gin.H{"data": summary})
|
||||
}
|
||||
|
||||
// Crawl runs the crawler once and returns CrawlSummary. A per-process mutex
|
||||
// blocks concurrent runs. The manual-admin rate limit is bypassed when
|
||||
// "crawl_bypass_rate_limit" is set in the gin context (set by the bearer-token
|
||||
// middleware path).
|
||||
func (h *Handler) Crawl(c *gin.Context) {
|
||||
if !h.crawlMu.TryLock() {
|
||||
apiErr := apierror.TooManyRequests("A crawler run is already in progress")
|
||||
c.JSON(apiErr.Status, apierror.NewResponse(apiErr))
|
||||
return
|
||||
}
|
||||
defer h.crawlMu.Unlock()
|
||||
|
||||
if _, bypass := c.Get("crawl_bypass_rate_limit"); !bypass {
|
||||
if since := time.Since(h.crawlLastManual); since < h.crawlRateLimit {
|
||||
retryIn := (h.crawlRateLimit - since).Seconds()
|
||||
c.Header("Retry-After", fmt.Sprint(int(retryIn)+1))
|
||||
apiErr := apierror.TooManyRequests(
|
||||
fmt.Sprintf("Manual crawl rate-limited; try again in ~%.0fs", retryIn),
|
||||
)
|
||||
c.JSON(apiErr.Status, apierror.NewResponse(apiErr))
|
||||
return
|
||||
}
|
||||
h.crawlLastManual = time.Now()
|
||||
}
|
||||
|
||||
summary, err := h.service.Crawl(c.Request.Context())
|
||||
if err != nil {
|
||||
slog.ErrorContext(c.Request.Context(), "discovery crawl failed", "error", err)
|
||||
apiErr := apierror.Internal("crawl failed")
|
||||
c.JSON(apiErr.Status, apierror.NewResponse(apiErr))
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusOK, gin.H{"data": summary})
|
||||
}
|
||||
|
||||
func (h *Handler) Stats(c *gin.Context) {
|
||||
s, err := h.service.Stats(c.Request.Context())
|
||||
if err != nil {
|
||||
|
||||
150
backend/internal/domain/discovery/handler_test.go
Normal file
@@ -0,0 +1,150 @@
|
||||
package discovery
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
|
||||
"marktvogt.de/backend/internal/domain/discovery/crawler"
|
||||
)
|
||||
|
||||
// init puts gin into test mode so handlers under test produce no debug
// output and need no configured engine.
func init() {
	gin.SetMode(gin.TestMode)
}
|
||||
|
||||
// blockingCrawlerRunner is a test double whose RunAll parks until its
// context is cancelled, letting a test hold the handler's crawl mutex
// for a controlled period.
type blockingCrawlerRunner struct {
	// started is closed when RunAll begins; receivers use it to learn
	// that the handler has acquired the mutex. Single-use only — a
	// second RunAll would panic on the double close.
	started chan struct{}
}

// RunAll signals start, blocks until ctx is cancelled, then returns an
// empty result with ctx.Err().
func (b *blockingCrawlerRunner) RunAll(ctx context.Context) (crawler.CrawlResult, error) {
	close(b.started)
	<-ctx.Done()
	return crawler.CrawlResult{}, ctx.Err()
}
|
||||
|
||||
// TestCrawlHandlerMutexReentry verifies that a second concurrent Crawl request
|
||||
// gets HTTP 429 while the first is still running.
|
||||
func TestCrawlHandlerMutexReentry(t *testing.T) {
|
||||
bc := &blockingCrawlerRunner{started: make(chan struct{})}
|
||||
svc := NewServiceWithCrawler(newMockRepo(), bc, noopLinkVerifier{}, noopMarketCreator{})
|
||||
h := NewHandler(svc, 1)
|
||||
|
||||
// First request — runs in a goroutine so the handler blocks.
|
||||
w1 := httptest.NewRecorder()
|
||||
c1, _ := gin.CreateTestContext(w1)
|
||||
ctx1, cancel1 := context.WithCancel(context.Background())
|
||||
defer cancel1()
|
||||
c1.Request = httptest.NewRequest(http.MethodPost, "/admin/discovery/crawl", nil).WithContext(ctx1)
|
||||
c1.Set("crawl_bypass_rate_limit", true) // bypass rate limit; test only mutex
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
h.Crawl(c1)
|
||||
}()
|
||||
|
||||
// Wait until the blocking crawler has started and holds the mutex.
|
||||
<-bc.started
|
||||
|
||||
// Second request — should see 429 because mutex is held.
|
||||
w2 := httptest.NewRecorder()
|
||||
c2, _ := gin.CreateTestContext(w2)
|
||||
c2.Request = httptest.NewRequest(http.MethodPost, "/admin/discovery/crawl", nil)
|
||||
c2.Set("crawl_bypass_rate_limit", true)
|
||||
|
||||
h.Crawl(c2)
|
||||
|
||||
if w2.Code != http.StatusTooManyRequests {
|
||||
t.Errorf("expected 429 from second concurrent request, got %d", w2.Code)
|
||||
}
|
||||
|
||||
// Cancel the first request so the goroutine can unblock.
|
||||
cancel1()
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// TestCrawlHandlerRateLimit verifies that a second manual (non-bypass) request
|
||||
// within the rate limit window returns 429 with Retry-After.
|
||||
func TestCrawlHandlerRateLimit(t *testing.T) {
|
||||
// Use an instant-returning crawler so the mutex is released quickly.
|
||||
svc := NewServiceWithCrawler(
|
||||
newMockRepo(),
|
||||
&stubCrawlerRunner{result: crawler.CrawlResult{}},
|
||||
noopLinkVerifier{},
|
||||
noopMarketCreator{},
|
||||
)
|
||||
// Set a long rate limit (1 per hour = 1h window).
|
||||
h := NewHandler(svc, 1)
|
||||
|
||||
doRequest := func(bypass bool) *httptest.ResponseRecorder {
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Request = httptest.NewRequest(http.MethodPost, "/admin/discovery/crawl-manual", nil)
|
||||
if bypass {
|
||||
c.Set("crawl_bypass_rate_limit", true)
|
||||
}
|
||||
h.Crawl(c)
|
||||
return w
|
||||
}
|
||||
|
||||
// First request — not bypassed. Should succeed (200).
|
||||
w1 := doRequest(false)
|
||||
if w1.Code != http.StatusOK {
|
||||
t.Fatalf("first request: expected 200, got %d body=%s", w1.Code, w1.Body.String())
|
||||
}
|
||||
|
||||
// Second request immediately after — should be rate-limited (429).
|
||||
w2 := doRequest(false)
|
||||
if w2.Code != http.StatusTooManyRequests {
|
||||
t.Errorf("second request: expected 429, got %d body=%s", w2.Code, w2.Body.String())
|
||||
}
|
||||
if w2.Header().Get("Retry-After") == "" {
|
||||
t.Error("second request: expected Retry-After header to be set")
|
||||
}
|
||||
|
||||
// Bypassed request should succeed even within the window.
|
||||
w3 := doRequest(true)
|
||||
if w3.Code != http.StatusOK {
|
||||
t.Errorf("bypass request: expected 200, got %d body=%s", w3.Code, w3.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
// TestCrawlHandlerRateLimitResets verifies that a manual request succeeds once
|
||||
// the rate limit window has elapsed.
|
||||
func TestCrawlHandlerRateLimitResets(t *testing.T) {
|
||||
svc := NewServiceWithCrawler(
|
||||
newMockRepo(),
|
||||
&stubCrawlerRunner{result: crawler.CrawlResult{}},
|
||||
noopLinkVerifier{},
|
||||
noopMarketCreator{},
|
||||
)
|
||||
h := NewHandler(svc, 1)
|
||||
|
||||
// Force crawlLastManual to well in the past so the window has expired.
|
||||
h.crawlLastManual = time.Now().Add(-2 * time.Hour)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Request = httptest.NewRequest(http.MethodPost, "/admin/discovery/crawl-manual", nil)
|
||||
h.Crawl(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("expected 200 after window reset, got %d body=%s", w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
var resp map[string]interface{}
|
||||
if err := json.NewDecoder(w.Body).Decode(&resp); err != nil {
|
||||
t.Fatalf("response body not valid JSON: %v", err)
|
||||
}
|
||||
if _, ok := resp["data"]; !ok {
|
||||
t.Error("expected 'data' key in response body")
|
||||
}
|
||||
}
|
||||
@@ -12,7 +12,13 @@ func RegisterRoutes(
|
||||
// Machine-driven tick (bearer token).
|
||||
rg.POST("/admin/discovery/tick", requireTickToken, h.Tick)
|
||||
|
||||
// Admin-session queue routes.
|
||||
// Machine-driven crawl (bearer token; bypasses manual rate limit).
|
||||
rg.POST("/admin/discovery/crawl", requireTickToken, func(c *gin.Context) {
|
||||
c.Set("crawl_bypass_rate_limit", true)
|
||||
h.Crawl(c)
|
||||
})
|
||||
|
||||
// Admin-session routes (queue mgmt + manual crawl trigger).
|
||||
admin := rg.Group("/admin/discovery", requireAuth, requireAdmin)
|
||||
{
|
||||
admin.GET("/stats", h.Stats)
|
||||
@@ -20,5 +26,7 @@ func RegisterRoutes(
|
||||
admin.PATCH("/queue/:id", h.Update)
|
||||
admin.POST("/queue/:id/accept", h.Accept)
|
||||
admin.POST("/queue/:id/reject", h.Reject)
|
||||
// Manual crawl trigger — subject to hourly rate limit.
|
||||
admin.POST("/crawl-manual", h.Crawl)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -80,7 +80,7 @@ func (s *Server) registerRoutes() {
|
||||
s.cfg.Discovery.BatchSize,
|
||||
s.cfg.Discovery.ForwardMonths,
|
||||
)
|
||||
discoveryHandler := discovery.NewHandler(discoveryService)
|
||||
discoveryHandler := discovery.NewHandler(discoveryService, 1)
|
||||
requireTickToken := middleware.RequireBearerToken(s.cfg.Discovery.Token)
|
||||
discovery.RegisterRoutes(v1, discoveryHandler, requireAuth, requireAdmin, requireTickToken)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user