feat(discovery/crawler): polite HTTP fetcher with retry and 429 backoff
This commit is contained in:
170
backend/internal/domain/discovery/crawler/fetch.go
Normal file
170
backend/internal/domain/discovery/crawler/fetch.go
Normal file
@@ -0,0 +1,170 @@
|
||||
package crawler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Fetcher is the shared HTTP client across all crawler sources. Enforces polite
// defaults: retry on 5xx/network, back off on 429, stop immediately on 4xx.
// Per-host connection cap is on the Transport so all callers share the gate.
type Fetcher struct {
	client         *http.Client    // shared client; its Transport caps connections per host
	userAgent      string          // sent verbatim as the User-Agent header on every request
	retryWait      []time.Duration // waits between retries after network errors / 5xx responses
	rateLimitWaits []time.Duration // longer waits applied after a 429 response
}
|
||||
|
||||
// FetcherOptions configures NewFetcher. Zero values are replaced with polite
// defaults by NewFetcher.
type FetcherOptions struct {
	UserAgent      string          // User-Agent header value; no default is applied
	Timeout        time.Duration   // whole-request timeout; 0 -> 30s
	RetryWait      []time.Duration // 5xx/network retry waits; nil -> [2s, 5s]
	RateLimitWaits []time.Duration // 429 backoff waits; nil -> [60s, 180s]
}
|
||||
|
||||
func NewFetcher(opts FetcherOptions) *Fetcher {
|
||||
if opts.Timeout == 0 {
|
||||
opts.Timeout = 30 * time.Second
|
||||
}
|
||||
if opts.RetryWait == nil {
|
||||
opts.RetryWait = []time.Duration{2 * time.Second, 5 * time.Second}
|
||||
}
|
||||
if opts.RateLimitWaits == nil {
|
||||
opts.RateLimitWaits = []time.Duration{60 * time.Second, 180 * time.Second}
|
||||
}
|
||||
return &Fetcher{
|
||||
client: &http.Client{
|
||||
Timeout: opts.Timeout,
|
||||
Transport: &http.Transport{
|
||||
MaxIdleConnsPerHost: 2,
|
||||
MaxConnsPerHost: 1,
|
||||
IdleConnTimeout: 30 * time.Second,
|
||||
},
|
||||
CheckRedirect: func(req *http.Request, via []*http.Request) error {
|
||||
if len(via) >= 10 {
|
||||
return http.ErrUseLastResponse
|
||||
}
|
||||
return nil
|
||||
},
|
||||
},
|
||||
userAgent: opts.UserAgent,
|
||||
retryWait: opts.RetryWait,
|
||||
rateLimitWaits: opts.RateLimitWaits,
|
||||
}
|
||||
}
|
||||
|
||||
// Get fetches a URL and returns the body bytes. accept overrides the default
|
||||
// Accept header ("" -> HTML default).
|
||||
func (f *Fetcher) Get(ctx context.Context, url, accept string) ([]byte, error) {
|
||||
if accept == "" {
|
||||
accept = "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"
|
||||
}
|
||||
|
||||
attempt := func() (*http.Response, error) {
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
req.Header.Set("User-Agent", f.userAgent)
|
||||
req.Header.Set("Accept-Language", "de-DE,de;q=0.9,en;q=0.8")
|
||||
req.Header.Set("Accept", accept)
|
||||
return f.client.Do(req)
|
||||
}
|
||||
|
||||
// Initial + 5xx retries.
|
||||
var lastErr error
|
||||
waits := append([]time.Duration{0}, f.retryWait...)
|
||||
for i, wait := range waits {
|
||||
if wait > 0 {
|
||||
if err := sleepCtx(ctx, wait); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
resp, err := attempt()
|
||||
if err != nil {
|
||||
lastErr = err
|
||||
if i < len(waits)-1 {
|
||||
continue
|
||||
}
|
||||
return nil, fmt.Errorf("fetch %s: %w", url, err)
|
||||
}
|
||||
|
||||
// 4xx other than 429: stop immediately.
|
||||
if resp.StatusCode == http.StatusTooManyRequests {
|
||||
_ = resp.Body.Close()
|
||||
return f.handle429(ctx, url, accept)
|
||||
}
|
||||
if resp.StatusCode >= 400 && resp.StatusCode < 500 {
|
||||
body, _ := io.ReadAll(io.LimitReader(resp.Body, 512))
|
||||
_ = resp.Body.Close()
|
||||
return nil, fmt.Errorf("fetch %s: http %d: %s", url, resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
// 5xx: retry unless this was the last attempt.
|
||||
if resp.StatusCode >= 500 {
|
||||
body, _ := io.ReadAll(io.LimitReader(resp.Body, 512))
|
||||
_ = resp.Body.Close()
|
||||
lastErr = fmt.Errorf("http %d: %s", resp.StatusCode, string(body))
|
||||
if i < len(waits)-1 {
|
||||
continue
|
||||
}
|
||||
return nil, fmt.Errorf("fetch %s: %w", url, lastErr)
|
||||
}
|
||||
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
_ = resp.Body.Close()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("read body %s: %w", url, err)
|
||||
}
|
||||
return body, nil
|
||||
}
|
||||
return nil, lastErr
|
||||
}
|
||||
|
||||
// handle429 applies longer 429-specific backoffs, then one more attempt per
|
||||
// wait in rateLimitWaits. Gives up after the last wait still sees 429 or worse.
|
||||
func (f *Fetcher) handle429(ctx context.Context, url, accept string) ([]byte, error) {
|
||||
for _, wait := range f.rateLimitWaits {
|
||||
if err := sleepCtx(ctx, wait); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
req.Header.Set("User-Agent", f.userAgent)
|
||||
req.Header.Set("Accept-Language", "de-DE,de;q=0.9,en;q=0.8")
|
||||
req.Header.Set("Accept", accept)
|
||||
resp, err := f.client.Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("fetch %s after 429 wait: %w", url, err)
|
||||
}
|
||||
if resp.StatusCode == http.StatusOK {
|
||||
body, rerr := io.ReadAll(resp.Body)
|
||||
_ = resp.Body.Close()
|
||||
if rerr != nil {
|
||||
return nil, fmt.Errorf("read body %s: %w", url, rerr)
|
||||
}
|
||||
return body, nil
|
||||
}
|
||||
_ = resp.Body.Close()
|
||||
if resp.StatusCode != http.StatusTooManyRequests {
|
||||
return nil, fmt.Errorf("fetch %s after 429 wait: http %d", url, resp.StatusCode)
|
||||
}
|
||||
}
|
||||
return nil, fmt.Errorf("fetch %s: rate-limited (429) after all retries", url)
|
||||
}
|
||||
|
||||
func sleepCtx(ctx context.Context, d time.Duration) error {
|
||||
t := time.NewTimer(d)
|
||||
defer t.Stop()
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-t.C:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
117
backend/internal/domain/discovery/crawler/fetch_test.go
Normal file
117
backend/internal/domain/discovery/crawler/fetch_test.go
Normal file
@@ -0,0 +1,117 @@
|
||||
package crawler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestFetcherSuccess(t *testing.T) {
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if ua := r.Header.Get("User-Agent"); !strings.Contains(ua, "Firefox") {
|
||||
t.Errorf("unexpected UA: %q", ua)
|
||||
}
|
||||
w.WriteHeader(200)
|
||||
_, _ = io.WriteString(w, "ok")
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
f := NewFetcher(FetcherOptions{UserAgent: "Mozilla/5.0 (X11; Linux x86_64; rv:135.0) Gecko/20100101 Firefox/135.0"})
|
||||
body, err := f.Get(context.Background(), srv.URL, "")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if string(body) != "ok" {
|
||||
t.Errorf("body = %q; want %q", body, "ok")
|
||||
}
|
||||
}
|
||||
|
||||
func TestFetcherRetriesOn5xx(t *testing.T) {
|
||||
var hits int32
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
n := atomic.AddInt32(&hits, 1)
|
||||
if n < 3 {
|
||||
w.WriteHeader(503)
|
||||
return
|
||||
}
|
||||
w.WriteHeader(200)
|
||||
_, _ = io.WriteString(w, "ok")
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
f := NewFetcher(FetcherOptions{
|
||||
UserAgent: "t",
|
||||
RetryWait: []time.Duration{10 * time.Millisecond, 20 * time.Millisecond},
|
||||
})
|
||||
body, err := f.Get(context.Background(), srv.URL, "")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if string(body) != "ok" {
|
||||
t.Errorf("body = %q; want %q", body, "ok")
|
||||
}
|
||||
if got := atomic.LoadInt32(&hits); got != 3 {
|
||||
t.Errorf("hits = %d; want 3", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFetcherGivesUpOn4xx(t *testing.T) {
|
||||
var hits int32
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
atomic.AddInt32(&hits, 1)
|
||||
w.WriteHeader(404)
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
f := NewFetcher(FetcherOptions{UserAgent: "t", RetryWait: []time.Duration{1 * time.Millisecond}})
|
||||
_, err := f.Get(context.Background(), srv.URL, "")
|
||||
if err == nil {
|
||||
t.Fatal("expected error, got nil")
|
||||
}
|
||||
if got := atomic.LoadInt32(&hits); got != 1 {
|
||||
t.Errorf("hits = %d; want 1 (no retry on 4xx)", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFetcherBacksOffOn429(t *testing.T) {
|
||||
var hits int32
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
atomic.AddInt32(&hits, 1)
|
||||
w.WriteHeader(429)
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
f := NewFetcher(FetcherOptions{
|
||||
UserAgent: "t",
|
||||
RetryWait: []time.Duration{1 * time.Millisecond},
|
||||
RateLimitWaits: []time.Duration{1 * time.Millisecond, 2 * time.Millisecond},
|
||||
})
|
||||
_, err := f.Get(context.Background(), srv.URL, "")
|
||||
if err == nil {
|
||||
t.Fatal("expected error after 429 retries exhausted")
|
||||
}
|
||||
// One initial + two rate-limit retries = 3 hits total.
|
||||
if got := atomic.LoadInt32(&hits); got != 3 {
|
||||
t.Errorf("hits = %d; want 3", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFetcherHonorsContextCancel(t *testing.T) {
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
|
||||
defer cancel()
|
||||
|
||||
f := NewFetcher(FetcherOptions{UserAgent: "t"})
|
||||
if _, err := f.Get(ctx, srv.URL, ""); err == nil {
|
||||
t.Fatal("expected context error, got nil")
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user