diff --git a/backend/deploy/helm/templates/httproute.yaml b/backend/deploy/helm/templates/httproute.yaml index a5d6096..67c3dd8 100644 --- a/backend/deploy/helm/templates/httproute.yaml +++ b/backend/deploy/helm/templates/httproute.yaml @@ -20,6 +20,16 @@ spec: hostnames: - {{ .Values.httpRoute.hostname | quote }} rules: + - matches: + - path: + type: PathPrefix + value: /api/v1/admin/discovery/crawl + timeouts: + request: 300s + backendRequest: 300s + backendRefs: + - name: {{ include "marktvogt-backend.fullname" . }} + port: {{ .Values.service.port }} - backendRefs: - name: {{ include "marktvogt-backend.fullname" . }} port: {{ .Values.service.port }} diff --git a/backend/internal/domain/discovery/service.go b/backend/internal/domain/discovery/service.go index 915068b..b29b731 100644 --- a/backend/internal/domain/discovery/service.go +++ b/backend/internal/domain/discovery/service.go @@ -84,8 +84,6 @@ func (s *Service) Crawl(ctx context.Context) (CrawlSummary, error) { StartedAt: time.Now().UTC(), PerSource: make(map[string]SourceSummary), } - defer func() { summary.DurationMs = time.Since(summary.StartedAt).Milliseconds() }() - res, err := s.crawler.RunAll(ctx) if err != nil { return summary, err @@ -126,18 +124,26 @@ func (s *Service) Crawl(ctx context.Context) (CrawlSummary, error) { } } + // Detach the insert pipeline from the HTTP request context. The crawl is + // conceptually a background job that happens to be triggered by an HTTP + // request; a slow client or gateway timeout should not orphan partially- + // processed events. 3 minutes is ample for a few hundred link checks + + // dedup + inserts; longer than any reasonable crawl run duration. 
+ insertCtx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + defer cancel() + for _, m := range merged { - quellen := s.linkChecker.FilterURLs(ctx, m.Quellen) + quellen := s.linkChecker.FilterURLs(insertCtx, m.Quellen) if len(quellen) == 0 { summary.LinkCheckFailed++ continue } website := m.Website - if website != "" && !s.linkChecker.CheckURL(ctx, website) { + if website != "" && !s.linkChecker.CheckURL(insertCtx, website) { website = "" } - candidates, err := s.repo.ListSeriesByCity(ctx, NormalizeCity(m.City)) + candidates, err := s.repo.ListSeriesByCity(insertCtx, NormalizeCity(m.City)) if err != nil { slog.WarnContext(ctx, "list series by city", "city", m.City, "error", err) continue @@ -150,7 +156,7 @@ func (s *Service) Crawl(ctx context.Context) (CrawlSummary, error) { } if matchedSeriesID != nil && year > 0 { - exists, err := s.repo.EditionExists(ctx, *matchedSeriesID, year) + exists, err := s.repo.EditionExists(insertCtx, *matchedSeriesID, year) if err != nil { slog.WarnContext(ctx, "edition exists check", "error", err) continue @@ -162,7 +168,7 @@ func (s *Service) Crawl(ctx context.Context) (CrawlSummary, error) { } nameNorm := NormalizeName(m.Name) if year > 0 { - rejected, err := s.repo.IsRejected(ctx, nameNorm, m.City, year) + rejected, err := s.repo.IsRejected(insertCtx, nameNorm, m.City, year) if err != nil { slog.WarnContext(ctx, "is rejected check", "error", err) continue @@ -172,7 +178,7 @@ func (s *Service) Crawl(ctx context.Context) (CrawlSummary, error) { continue } } - pending, err := s.repo.QueueHasPending(ctx, nameNorm, m.City, m.StartDate) + pending, err := s.repo.QueueHasPending(insertCtx, nameNorm, m.City, m.StartDate) if err != nil { slog.WarnContext(ctx, "queue pending check", "error", err) continue @@ -216,12 +222,26 @@ func (s *Service) Crawl(ctx context.Context) (CrawlSummary, error) { continue } - if _, err := s.repo.InsertDiscovered(ctx, dm); err != nil { + if _, err := s.repo.InsertDiscovered(insertCtx, dm); 
err != nil {
 			slog.WarnContext(ctx, "insert discovered", "error", err)
 			continue
 		}
 		summary.Discovered++
 	}
+	summary.DurationMs = time.Since(summary.StartedAt).Milliseconds()
+	slog.InfoContext(ctx, "crawl completed",
+		"duration_ms", summary.DurationMs,
+		"merged", summary.Merged,
+		"merged_across_sites", summary.MergedAcrossSites,
+		"discovered", summary.Discovered,
+		"deduped_existing", summary.DedupedExisting,
+		"deduped_rejected", summary.DedupedRejected,
+		"deduped_queue", summary.DedupedQueue,
+		"link_check_failed", summary.LinkCheckFailed,
+		"validation_failed", summary.ValidationFailed,
+		"date_conflicts", summary.DateConflicts,
+		"source_errors", len(summary.SourceErrors),
+	)
 	return summary, nil
 }
diff --git a/backend/internal/domain/discovery/service_test.go b/backend/internal/domain/discovery/service_test.go
index b6457df..4dacd66 100644
--- a/backend/internal/domain/discovery/service_test.go
+++ b/backend/internal/domain/discovery/service_test.go
@@ -310,6 +310,55 @@ func TestServiceCrawlDefaultsEndDate(t *testing.T) {
 	}
 }
 
+// ctxAwareLinkVerifier simulates a link checker that respects context
+// cancellation: if ctx is already done it returns nil / false, just as
+// the real HTTP-based implementation would.
+type ctxAwareLinkVerifier struct{}
+
+func (ctxAwareLinkVerifier) FilterURLs(ctx context.Context, urls []string) []string {
+	if ctx.Err() != nil {
+		return nil
+	}
+	return urls
+}
+func (ctxAwareLinkVerifier) CheckURL(ctx context.Context, _ string) bool {
+	return ctx.Err() == nil
+}
+
+func TestServiceCrawlDetachesInsertContextFromRequestCtx(t *testing.T) {
+	repo := newMockRepo()
+	start := mustParseDate(t, "2026-05-01")
+
+	sc := &stubCrawlerRunner{
+		result: crawler.CrawlResult{
+			PerSource: map[string][]crawler.RawEvent{
+				"marktkalendarium": {
+					{SourceName: "marktkalendarium", SourceURL: "https://a/",
+						Name: "X", City: "Y", StartDate: start},
+				},
+			},
+		},
+	}
+	svc := NewService(repo, sc, ctxAwareLinkVerifier{}, noopMarketCreator{})
+
+	// Cancel the request context BEFORE calling Crawl — a deterministic
+	// stand-in for a gateway timeout that fires while the handler runs.
+	ctx, cancel := context.WithCancel(context.Background())
+	cancel()
+
+	_, err := svc.Crawl(ctx)
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+
+	// Key assertion: the event got inserted despite ctx being canceled.
+	// If the insert pipeline used ctx, linkChecker would have returned nil,
+	// LinkCheckFailed would be 1, and inserted would be 0.
+	if len(repo.inserted) != 1 {
+		t.Errorf("inserted = %d; want 1 — insert pipeline not properly detached from canceled request ctx", len(repo.inserted))
+	}
+}
+
 func TestServiceCrawlMultiSourceHighKonfidenz(t *testing.T) {
 	repo := newMockRepo()
 	start := mustParseDate(t, "2026-05-01")