fix(discovery): crawl survives gateway timeouts on long-running runs
- HTTPRoute: add a 300s request + backendRequest timeout rule for /api/v1/admin/discovery/crawl; the default rule is unchanged. nginx-gateway's 60s default was cutting the connection mid-crawl.
- Service.Crawl: detach the insert pipeline from the HTTP request context and give it a 3-minute internal timeout. Previously a canceled request ctx cascaded into the link verifier, failing every URL check and counting every merged event as LinkCheckFailed. Inserts now complete even if the gateway cut the connection.
- Log CrawlSummary at INFO on completion so outcomes are visible in backend logs without needing the HTTP response body. The deferred DurationMs assignment becomes an explicit one before the log call; a defer would only run after the log statement and leave duration_ms at zero in the log.
- New test: TestServiceCrawlDetachesInsertContextFromRequestCtx.
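A quick way to exercise the fix end to end (hypothetical invocation: the HTTP method, host, and any auth requirements are assumptions, not part of this diff):

    # Assumes the crawl endpoint is a POST behind the gateway host from the chart values.
    # --max-time sits slightly above the new 300s gateway budget.
    curl -sS -X POST --max-time 330 https://markets.example.com/api/v1/admin/discovery/crawl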
@@ -20,6 +20,16 @@ spec:
   hostnames:
     - {{ .Values.httpRoute.hostname | quote }}
   rules:
+    - matches:
+        - path:
+            type: PathPrefix
+            value: /api/v1/admin/discovery/crawl
+      timeouts:
+        request: 300s
+        backendRequest: 300s
+      backendRefs:
+        - name: {{ include "marktvogt-backend.fullname" . }}
+          port: {{ .Values.service.port }}
     - backendRefs:
         - name: {{ include "marktvogt-backend.fullname" . }}
           port: {{ .Values.service.port }}
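For reference, with hypothetical chart values (hostname markets.example.com, fullname marktvogt-backend, port 8080) the new rule renders to roughly the following. Per the Gateway API spec, timeouts.request bounds the whole gateway-to-client exchange, while timeouts.backendRequest bounds a single gateway-to-backend request and may not exceed request:

    # Rendered sketch only; values are placeholders, not the real chart defaults.
    - matches:
        - path:
            type: PathPrefix
            value: /api/v1/admin/discovery/crawl
      timeouts:
        request: 300s
        backendRequest: 300s
      backendRefs:
        - name: marktvogt-backend
          port: 8080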
@@ -84,8 +84,6 @@ func (s *Service) Crawl(ctx context.Context) (CrawlSummary, error) {
 		StartedAt: time.Now().UTC(),
 		PerSource: make(map[string]SourceSummary),
 	}
-	defer func() { summary.DurationMs = time.Since(summary.StartedAt).Milliseconds() }()
-
 	res, err := s.crawler.RunAll(ctx)
 	if err != nil {
 		return summary, err
@@ -126,18 +124,26 @@ func (s *Service) Crawl(ctx context.Context) (CrawlSummary, error) {
 		}
 	}
 
+	// Detach the insert pipeline from the HTTP request context. The crawl is
+	// conceptually a background job that happens to be triggered by an HTTP
+	// request; a slow client or gateway timeout should not orphan partially-
+	// processed events. 3 minutes is ample for a few hundred link checks +
+	// dedup + inserts; longer than any reasonable crawl run duration.
+	insertCtx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
+	defer cancel()
+
 	for _, m := range merged {
-		quellen := s.linkChecker.FilterURLs(ctx, m.Quellen)
+		quellen := s.linkChecker.FilterURLs(insertCtx, m.Quellen)
 		if len(quellen) == 0 {
 			summary.LinkCheckFailed++
 			continue
 		}
 		website := m.Website
-		if website != "" && !s.linkChecker.CheckURL(ctx, website) {
+		if website != "" && !s.linkChecker.CheckURL(insertCtx, website) {
 			website = ""
 		}
 
-		candidates, err := s.repo.ListSeriesByCity(ctx, NormalizeCity(m.City))
+		candidates, err := s.repo.ListSeriesByCity(insertCtx, NormalizeCity(m.City))
 		if err != nil {
 			slog.WarnContext(ctx, "list series by city", "city", m.City, "error", err)
 			continue
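Design note: context.Background() drops not only cancellation but also any request-scoped values (trace IDs, slog attributes). Since Go 1.21 the standard library offers context.WithoutCancel for exactly this detach-but-keep-values case; a minimal sketch of that variant, not what this commit uses:

    // Sketch only (requires Go >= 1.21): inherit ctx's values, ignore its
    // cancellation, and bound the detached work with its own deadline.
    insertCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 3*time.Minute)
    defer cancel()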
@@ -150,7 +156,7 @@ func (s *Service) Crawl(ctx context.Context) (CrawlSummary, error) {
 		}
 
 		if matchedSeriesID != nil && year > 0 {
-			exists, err := s.repo.EditionExists(ctx, *matchedSeriesID, year)
+			exists, err := s.repo.EditionExists(insertCtx, *matchedSeriesID, year)
 			if err != nil {
 				slog.WarnContext(ctx, "edition exists check", "error", err)
 				continue
@@ -162,7 +168,7 @@ func (s *Service) Crawl(ctx context.Context) (CrawlSummary, error) {
 		}
 		nameNorm := NormalizeName(m.Name)
 		if year > 0 {
-			rejected, err := s.repo.IsRejected(ctx, nameNorm, m.City, year)
+			rejected, err := s.repo.IsRejected(insertCtx, nameNorm, m.City, year)
 			if err != nil {
 				slog.WarnContext(ctx, "is rejected check", "error", err)
 				continue
@@ -172,7 +178,7 @@ func (s *Service) Crawl(ctx context.Context) (CrawlSummary, error) {
 				continue
 			}
 		}
-		pending, err := s.repo.QueueHasPending(ctx, nameNorm, m.City, m.StartDate)
+		pending, err := s.repo.QueueHasPending(insertCtx, nameNorm, m.City, m.StartDate)
 		if err != nil {
 			slog.WarnContext(ctx, "queue pending check", "error", err)
 			continue
@@ -216,12 +222,26 @@ func (s *Service) Crawl(ctx context.Context) (CrawlSummary, error) {
 			continue
 		}
 
-		if _, err := s.repo.InsertDiscovered(ctx, dm); err != nil {
+		if _, err := s.repo.InsertDiscovered(insertCtx, dm); err != nil {
 			slog.WarnContext(ctx, "insert discovered", "error", err)
 			continue
 		}
 		summary.Discovered++
 	}
 
+	summary.DurationMs = time.Since(summary.StartedAt).Milliseconds()
+	slog.InfoContext(ctx, "crawl completed",
+		"duration_ms", summary.DurationMs,
+		"merged", summary.Merged,
+		"merged_across_sites", summary.MergedAcrossSites,
+		"discovered", summary.Discovered,
+		"deduped_existing", summary.DedupedExisting,
+		"deduped_rejected", summary.DedupedRejected,
+		"deduped_queue", summary.DedupedQueue,
+		"link_check_failed", summary.LinkCheckFailed,
+		"validation_failed", summary.ValidationFailed,
+		"date_conflicts", summary.DateConflicts,
+		"source_errors", len(summary.SourceErrors),
+	)
 	return summary, nil
 }
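With slog's default text handler the completion log should come out along these lines (all values hypothetical):

    time=2026-05-01T10:15:00.000Z level=INFO msg="crawl completed" duration_ms=48211 merged=412 merged_across_sites=37 discovered=291 deduped_existing=64 deduped_rejected=12 deduped_queue=31 link_check_failed=9 validation_failed=5 date_conflicts=3 source_errors=1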
@@ -310,6 +310,55 @@ func TestServiceCrawlDefaultsEndDate(t *testing.T) {
 	}
 }
 
+// ctxAwareLinkVerifier simulates a link checker that respects context
+// cancellation: if ctx is already done it returns nil / false, just as
+// the real HTTP-based implementation would.
+type ctxAwareLinkVerifier struct{}
+
+func (ctxAwareLinkVerifier) FilterURLs(ctx context.Context, urls []string) []string {
+	if ctx.Err() != nil {
+		return nil
+	}
+	return urls
+}
+func (ctxAwareLinkVerifier) CheckURL(ctx context.Context, _ string) bool {
+	return ctx.Err() == nil
+}
+
+func TestServiceCrawlDetachesInsertContextFromRequestCtx(t *testing.T) {
+	repo := newMockRepo()
+	start := mustParseDate(t, "2026-05-01")
+
+	sc := &stubCrawlerRunner{
+		result: crawler.CrawlResult{
+			PerSource: map[string][]crawler.RawEvent{
+				"marktkalendarium": {
+					{SourceName: "marktkalendarium", SourceURL: "https://a/",
+						Name: "X", City: "Y", StartDate: start},
+				},
+			},
+		},
+	}
+	svc := NewService(repo, sc, ctxAwareLinkVerifier{}, noopMarketCreator{})
+
+	// Cancel the context BEFORE Crawl runs — simulates gateway timeout
+	// that fires while the handler is still mid-run.
+	ctx, cancel := context.WithCancel(context.Background())
+	cancel()
+
+	_, err := svc.Crawl(ctx)
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+
+	// Key assertion: the event got inserted despite ctx being canceled.
+	// If the insert pipeline used ctx, linkChecker would have returned nil,
+	// LinkCheckFailed would be 1, and inserted would be 0.
+	if len(repo.inserted) != 1 {
+		t.Errorf("inserted = %d; want 1 — insert pipeline not properly detached from canceled request ctx", len(repo.inserted))
+	}
+}
+
 func TestServiceCrawlMultiSourceHighKonfidenz(t *testing.T) {
 	repo := newMockRepo()
 	start := mustParseDate(t, "2026-05-01")
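The new test runs in isolation with the usual tooling:

    go test -run TestServiceCrawlDetachesInsertContextFromRequestCtx ./...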