diff --git a/backend/internal/domain/discovery/crawler/fetch.go b/backend/internal/domain/discovery/crawler/fetch.go new file mode 100644 index 0000000..dc2cf8a --- /dev/null +++ b/backend/internal/domain/discovery/crawler/fetch.go @@ -0,0 +1,170 @@ +package crawler + +import ( + "context" + "fmt" + "io" + "net/http" + "time" +) + +// Fetcher is the shared HTTP client across all crawler sources. Enforces polite +// defaults: retry on 5xx/network, back off on 429, stop immediately on 4xx. +// Per-host connection cap is on the Transport so all callers share the gate. +type Fetcher struct { + client *http.Client + userAgent string + retryWait []time.Duration + rateLimitWaits []time.Duration +} + +type FetcherOptions struct { + UserAgent string + Timeout time.Duration + RetryWait []time.Duration + RateLimitWaits []time.Duration +} + +func NewFetcher(opts FetcherOptions) *Fetcher { + if opts.Timeout == 0 { + opts.Timeout = 30 * time.Second + } + if opts.RetryWait == nil { + opts.RetryWait = []time.Duration{2 * time.Second, 5 * time.Second} + } + if opts.RateLimitWaits == nil { + opts.RateLimitWaits = []time.Duration{60 * time.Second, 180 * time.Second} + } + return &Fetcher{ + client: &http.Client{ + Timeout: opts.Timeout, + Transport: &http.Transport{ + MaxIdleConnsPerHost: 2, + MaxConnsPerHost: 1, + IdleConnTimeout: 30 * time.Second, + }, + CheckRedirect: func(req *http.Request, via []*http.Request) error { + if len(via) >= 10 { + return http.ErrUseLastResponse + } + return nil + }, + }, + userAgent: opts.UserAgent, + retryWait: opts.RetryWait, + rateLimitWaits: opts.RateLimitWaits, + } +} + +// Get fetches a URL and returns the body bytes. accept overrides the default +// Accept header ("" -> HTML default). +func (f *Fetcher) Get(ctx context.Context, url, accept string) ([]byte, error) { + if accept == "" { + accept = "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8" + } + + attempt := func() (*http.Response, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return nil, err + } + req.Header.Set("User-Agent", f.userAgent) + req.Header.Set("Accept-Language", "de-DE,de;q=0.9,en;q=0.8") + req.Header.Set("Accept", accept) + return f.client.Do(req) + } + + // Initial + 5xx retries. + var lastErr error + waits := append([]time.Duration{0}, f.retryWait...) + for i, wait := range waits { + if wait > 0 { + if err := sleepCtx(ctx, wait); err != nil { + return nil, err + } + } + resp, err := attempt() + if err != nil { + lastErr = err + if i < len(waits)-1 { + continue + } + return nil, fmt.Errorf("fetch %s: %w", url, err) + } + + // 4xx other than 429: stop immediately. + if resp.StatusCode == http.StatusTooManyRequests { + _ = resp.Body.Close() + return f.handle429(ctx, url, accept) + } + if resp.StatusCode >= 400 && resp.StatusCode < 500 { + body, _ := io.ReadAll(io.LimitReader(resp.Body, 512)) + _ = resp.Body.Close() + return nil, fmt.Errorf("fetch %s: http %d: %s", url, resp.StatusCode, string(body)) + } + + // 5xx: retry unless this was the last attempt. + if resp.StatusCode >= 500 { + body, _ := io.ReadAll(io.LimitReader(resp.Body, 512)) + _ = resp.Body.Close() + lastErr = fmt.Errorf("http %d: %s", resp.StatusCode, string(body)) + if i < len(waits)-1 { + continue + } + return nil, fmt.Errorf("fetch %s: %w", url, lastErr) + } + + body, err := io.ReadAll(resp.Body) + _ = resp.Body.Close() + if err != nil { + return nil, fmt.Errorf("read body %s: %w", url, err) + } + return body, nil + } + return nil, lastErr +} + +// handle429 applies longer 429-specific backoffs, then one more attempt per +// wait in rateLimitWaits. Gives up after the last wait still sees 429 or worse. +func (f *Fetcher) handle429(ctx context.Context, url, accept string) ([]byte, error) { + for _, wait := range f.rateLimitWaits { + if err := sleepCtx(ctx, wait); err != nil { + return nil, err + } + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return nil, err + } + req.Header.Set("User-Agent", f.userAgent) + req.Header.Set("Accept-Language", "de-DE,de;q=0.9,en;q=0.8") + req.Header.Set("Accept", accept) + resp, err := f.client.Do(req) + if err != nil { + return nil, fmt.Errorf("fetch %s after 429 wait: %w", url, err) + } + if resp.StatusCode == http.StatusOK { + body, rerr := io.ReadAll(resp.Body) + _ = resp.Body.Close() + if rerr != nil { + return nil, fmt.Errorf("read body %s: %w", url, rerr) + } + return body, nil + } + _ = resp.Body.Close() + if resp.StatusCode != http.StatusTooManyRequests { + return nil, fmt.Errorf("fetch %s after 429 wait: http %d", url, resp.StatusCode) + } + } + return nil, fmt.Errorf("fetch %s: rate-limited (429) after all retries", url) +} + +func sleepCtx(ctx context.Context, d time.Duration) error { + t := time.NewTimer(d) + defer t.Stop() + select { + case <-ctx.Done(): + return ctx.Err() + case <-t.C: + return nil + } +} diff --git a/backend/internal/domain/discovery/crawler/fetch_test.go b/backend/internal/domain/discovery/crawler/fetch_test.go new file mode 100644 index 0000000..8bc475c --- /dev/null +++ b/backend/internal/domain/discovery/crawler/fetch_test.go @@ -0,0 +1,117 @@ +package crawler + +import ( + "context" + "io" + "net/http" + "net/http/httptest" + "strings" + "sync/atomic" + "testing" + "time" +) + +func TestFetcherSuccess(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if ua := r.Header.Get("User-Agent"); !strings.Contains(ua, "Firefox") { + t.Errorf("unexpected UA: %q", ua) + } + w.WriteHeader(200) + _, _ = io.WriteString(w, "ok") + })) + defer srv.Close() + + f := NewFetcher(FetcherOptions{UserAgent: "Mozilla/5.0 (X11; Linux x86_64; rv:135.0) Gecko/20100101 Firefox/135.0"}) + body, err := f.Get(context.Background(), srv.URL, "") + if err != nil { + t.Fatal(err) + } + if string(body) != "ok" { + t.Errorf("body = %q; want %q", body, "ok") + } +} + +func TestFetcherRetriesOn5xx(t *testing.T) { + var hits int32 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + n := atomic.AddInt32(&hits, 1) + if n < 3 { + w.WriteHeader(503) + return + } + w.WriteHeader(200) + _, _ = io.WriteString(w, "ok") + })) + defer srv.Close() + + f := NewFetcher(FetcherOptions{ + UserAgent: "t", + RetryWait: []time.Duration{10 * time.Millisecond, 20 * time.Millisecond}, + }) + body, err := f.Get(context.Background(), srv.URL, "") + if err != nil { + t.Fatal(err) + } + if string(body) != "ok" { + t.Errorf("body = %q; want %q", body, "ok") + } + if got := atomic.LoadInt32(&hits); got != 3 { + t.Errorf("hits = %d; want 3", got) + } +} + +func TestFetcherGivesUpOn4xx(t *testing.T) { + var hits int32 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&hits, 1) + w.WriteHeader(404) + })) + defer srv.Close() + + f := NewFetcher(FetcherOptions{UserAgent: "t", RetryWait: []time.Duration{1 * time.Millisecond}}) + _, err := f.Get(context.Background(), srv.URL, "") + if err == nil { + t.Fatal("expected error, got nil") + } + if got := atomic.LoadInt32(&hits); got != 1 { + t.Errorf("hits = %d; want 1 (no retry on 4xx)", got) + } +} + +func TestFetcherBacksOffOn429(t *testing.T) { + var hits int32 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&hits, 1) + w.WriteHeader(429) + })) + defer srv.Close() + + f := NewFetcher(FetcherOptions{ + UserAgent: "t", + RetryWait: []time.Duration{1 * time.Millisecond}, + RateLimitWaits: []time.Duration{1 * time.Millisecond, 2 * time.Millisecond}, + }) + _, err := f.Get(context.Background(), srv.URL, "") + if err == nil { + t.Fatal("expected error after 429 retries exhausted") + } + // One initial + two rate-limit retries = 3 hits total. + if got := atomic.LoadInt32(&hits); got != 3 { + t.Errorf("hits = %d; want 3", got) + } +} + +func TestFetcherHonorsContextCancel(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + time.Sleep(500 * time.Millisecond) + })) + defer srv.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) + defer cancel() + + f := NewFetcher(FetcherOptions{UserAgent: "t"}) + if _, err := f.Get(ctx, srv.URL, ""); err == nil { + t.Fatal("expected context error, got nil") + } +}