feat(merge-plan): convert to async polling to bypass nginx 60s timeout

POST /admin/markets/:id/merge-plan now returns 202 + job_id immediately
and runs the Gemini advisor in a detached goroutine. Frontend polls
GET .../merge-plan/:job_id until done, with backoff up to 3 minutes.

Adds in-memory job registry (keyed map + RWMutex, 5-min TTL sweep) and
handler tests covering the full pending→done and error paths.
This commit is contained in:
2026-04-25 23:37:03 +02:00
parent caaad8adf4
commit 643ee77600
6 changed files with 484 additions and 33 deletions

View File

@@ -7,6 +7,7 @@ import (
"log/slog"
"net/http"
"strings"
"sync"
"time"
"github.com/gin-gonic/gin"
@@ -19,14 +20,61 @@ import (
const duplicateClassifyTopN = 5
const mergeJobTTL = 5 * time.Minute
type mergeJobStatus string
const (
mergeJobPending mergeJobStatus = "pending"
mergeJobDone mergeJobStatus = "done"
mergeJobError mergeJobStatus = "error"
)
type mergeJob struct {
sourceID uuid.UUID
targetID uuid.UUID
status mergeJobStatus
proposal *MarketMergeProposal
err string
startedAt time.Time
finishedAt time.Time
}
type AdminHandler struct {
service *Service
classifier enrich.SimilarityClassifier
advisor *MergeAdvisor
// proposeFn and getMarketFn are overridable in tests.
proposeFn func(ctx context.Context, a, b Market, v enrich.Verdict) (MarketMergeProposal, error)
getMarketFn func(ctx context.Context, id uuid.UUID) (Market, error)
mergeJobsMu sync.RWMutex
mergeJobs map[uuid.UUID]*mergeJob
}
func NewAdminHandler(service *Service, classifier enrich.SimilarityClassifier, advisor *MergeAdvisor) *AdminHandler {
return &AdminHandler{service: service, classifier: classifier, advisor: advisor}
h := &AdminHandler{
service: service,
classifier: classifier,
advisor: advisor,
mergeJobs: make(map[uuid.UUID]*mergeJob),
getMarketFn: service.GetByID,
}
if advisor != nil {
h.proposeFn = advisor.Propose
}
return h
}
// sweepMergeJobs removes finished jobs older than mergeJobTTL.
// Caller must hold mergeJobsMu (write lock).
func (h *AdminHandler) sweepMergeJobs() {
now := time.Now()
for id, job := range h.mergeJobs {
if !job.finishedAt.IsZero() && now.Sub(job.finishedAt) > mergeJobTTL {
delete(h.mergeJobs, id)
}
}
}
func (h *AdminHandler) List(c *gin.Context) { //nolint:dupl
@@ -278,10 +326,10 @@ type MergePlanRequest struct {
TargetID uuid.UUID `json:"target_id" binding:"required"`
}
// MergePlan generates a MarketMergeProposal for two editions without persisting anything.
// MergePlan validates both markets, registers an async job, and returns 202
// immediately. The client polls GET .../merge-plan/:job_id for the result.
func (h *AdminHandler) MergePlan(c *gin.Context) {
ctx, cancel := context.WithTimeout(c.Request.Context(), 110*time.Second)
defer cancel()
ctx := c.Request.Context()
sourceID, err := uuid.Parse(c.Param("id"))
if err != nil {
@@ -299,7 +347,12 @@ func (h *AdminHandler) MergePlan(c *gin.Context) {
return
}
sourceM, err := h.service.GetByID(ctx, sourceID)
if h.proposeFn == nil {
c.JSON(http.StatusServiceUnavailable, apierror.NewResponse(apierror.Internal("merge advisor not configured")))
return
}
sourceM, err := h.getMarketFn(ctx, sourceID)
if err != nil {
if errors.Is(err, ErrMarketNotFound) {
c.JSON(http.StatusNotFound, apierror.NewResponse(apierror.NotFound("market")))
@@ -308,7 +361,7 @@ func (h *AdminHandler) MergePlan(c *gin.Context) {
}
return
}
targetM, err := h.service.GetByID(ctx, req.TargetID)
targetM, err := h.getMarketFn(ctx, req.TargetID)
if err != nil {
if errors.Is(err, ErrMarketNotFound) {
c.JSON(http.StatusNotFound, apierror.NewResponse(apierror.NotFound("target market")))
@@ -320,23 +373,80 @@ func (h *AdminHandler) MergePlan(c *gin.Context) {
verdict := classifyPair(c, h.classifier, sourceM, targetM)
if h.advisor == nil {
c.JSON(http.StatusServiceUnavailable, apierror.NewResponse(apierror.Internal("merge advisor not configured")))
return
jobID := uuid.New()
job := &mergeJob{
sourceID: sourceID,
targetID: req.TargetID,
status: mergeJobPending,
startedAt: time.Now().UTC(),
}
proposal, err := h.advisor.Propose(ctx, sourceM, targetM, verdict)
if err != nil {
if errors.Is(err, ErrNotDuplicate) {
c.JSON(http.StatusConflict, apierror.NewResponse(apierror.BadRequest("not_duplicate", "Markt wird nicht als Duplikat klassifiziert")))
return
h.mergeJobsMu.Lock()
h.sweepMergeJobs()
h.mergeJobs[jobID] = job
h.mergeJobsMu.Unlock()
go h.runMergePlanAsync(jobID, sourceM, targetM, verdict)
c.JSON(http.StatusAccepted, gin.H{"data": gin.H{"job_id": jobID, "status": "pending"}})
}
// runMergePlanAsync calls the advisor in the background and stores the result.
// Uses a detached context so a cancelled HTTP connection does not abort the job.
func (h *AdminHandler) runMergePlanAsync(jobID uuid.UUID, sourceM, targetM Market, verdict enrich.Verdict) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
defer cancel()
proposal, err := h.proposeFn(ctx, sourceM, targetM, verdict)
h.mergeJobsMu.Lock()
job, ok := h.mergeJobs[jobID]
if ok {
job.finishedAt = time.Now().UTC()
if err != nil {
job.status = mergeJobError
job.err = err.Error()
slog.ErrorContext(ctx, "merge-plan async: advisor failed",
"job_id", jobID, "source_id", sourceM.ID, "target_id", targetM.ID, "err", err)
} else {
job.status = mergeJobDone
job.proposal = &proposal
}
slog.ErrorContext(ctx, "merge-plan: advisor failed", "source_id", sourceID, "target_id", req.TargetID, "err", err)
c.JSON(http.StatusInternalServerError, apierror.NewResponse(apierror.Internal("Merge-Plan konnte nicht erstellt werden")))
}
h.mergeJobsMu.Unlock()
}
// MergePlanStatus polls the outcome of an async merge-plan job.
// Returns 202 while pending, 200 when done or errored.
func (h *AdminHandler) MergePlanStatus(c *gin.Context) {
sourceID, err := uuid.Parse(c.Param("id"))
if err != nil {
c.JSON(http.StatusBadRequest, apierror.NewResponse(apierror.BadRequest("invalid_id", "invalid market ID")))
return
}
jobID, err := uuid.Parse(c.Param("job_id"))
if err != nil {
c.JSON(http.StatusBadRequest, apierror.NewResponse(apierror.BadRequest("invalid_job_id", "invalid job ID")))
return
}
c.JSON(http.StatusOK, gin.H{"data": proposal})
h.mergeJobsMu.RLock()
job, ok := h.mergeJobs[jobID]
h.mergeJobsMu.RUnlock()
if !ok || job.sourceID != sourceID {
c.JSON(http.StatusNotFound, apierror.NewResponse(apierror.NotFound("merge job")))
return
}
switch job.status {
case mergeJobPending:
c.JSON(http.StatusAccepted, gin.H{"data": gin.H{"status": "pending"}})
case mergeJobDone:
c.JSON(http.StatusOK, gin.H{"data": gin.H{"status": "done", "proposal": job.proposal}})
case mergeJobError:
c.JSON(http.StatusOK, gin.H{"data": gin.H{"status": "error", "error": job.err}})
}
}
// MergeIntoRequest is the body for the merge-into endpoint.
@@ -390,12 +500,12 @@ func (h *AdminHandler) MergeInto(c *gin.Context) {
if req.Proposal != nil {
proposal = *req.Proposal
} else {
if h.advisor == nil {
if h.proposeFn == nil {
c.JSON(http.StatusBadRequest, apierror.NewResponse(apierror.BadRequest("no_proposal", "proposal required when advisor not configured")))
return
}
verdict := classifyPair(c, h.classifier, sourceM, targetM)
proposal, err = h.advisor.Propose(ctx, sourceM, targetM, verdict)
proposal, err = h.proposeFn(ctx, sourceM, targetM, verdict)
if err != nil {
if errors.Is(err, ErrNotDuplicate) {
c.JSON(http.StatusConflict, apierror.NewResponse(apierror.BadRequest("not_duplicate", "Markt wird nicht als Duplikat klassifiziert")))

View File

@@ -0,0 +1,285 @@
package market
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
"marktvogt.de/backend/internal/domain/discovery/enrich"
)
func init() {
gin.SetMode(gin.TestMode)
}
// blockingProposer gates the propose call until its release channel is closed.
type blockingProposer struct {
started chan struct{}
release chan struct{}
result MarketMergeProposal
err error
}
func (b *blockingProposer) propose(_ context.Context, _, _ Market, _ enrich.Verdict) (MarketMergeProposal, error) {
close(b.started)
<-b.release
return b.result, b.err
}
// waitFor polls cond every 2 ms until it returns true or the deadline passes.
func waitFor(t *testing.T, deadline time.Duration, cond func() bool) {
t.Helper()
end := time.Now().Add(deadline)
for time.Now().Before(end) {
if cond() {
return
}
time.Sleep(2 * time.Millisecond)
}
t.Fatal("condition not met within deadline")
}
// newTestHandler builds a minimal AdminHandler with injected function stubs.
func newTestHandler(
markets map[uuid.UUID]Market,
proposeFn func(context.Context, Market, Market, enrich.Verdict) (MarketMergeProposal, error),
) *AdminHandler {
return &AdminHandler{
mergeJobs: make(map[uuid.UUID]*mergeJob),
proposeFn: proposeFn,
getMarketFn: func(_ context.Context, id uuid.UUID) (Market, error) {
m, ok := markets[id]
if !ok {
return Market{}, ErrMarketNotFound
}
return m, nil
},
}
}
func doKickoff(t *testing.T, h *AdminHandler, sourceID, targetID uuid.UUID) *httptest.ResponseRecorder {
t.Helper()
body := `{"target_id":"` + targetID.String() + `"}`
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest(http.MethodPost,
"/admin/markets/"+sourceID.String()+"/merge-plan",
strings.NewReader(body))
c.Request.Header.Set("Content-Type", "application/json")
c.Params = gin.Params{{Key: "id", Value: sourceID.String()}}
h.MergePlan(c)
return w
}
func doPoll(t *testing.T, h *AdminHandler, sourceID, jobID uuid.UUID) *httptest.ResponseRecorder {
t.Helper()
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest(http.MethodGet,
"/admin/markets/"+sourceID.String()+"/merge-plan/"+jobID.String(), nil)
c.Params = gin.Params{
{Key: "id", Value: sourceID.String()},
{Key: "job_id", Value: jobID.String()},
}
h.MergePlanStatus(c)
return w
}
// TestMergePlanKickoffAndPolling verifies:
// - POST returns 202 + job_id immediately
// - GET returns 202 pending while goroutine is blocked
// - GET returns 200 done with proposal once goroutine unblocks
func TestMergePlanKickoffAndPolling(t *testing.T) {
sourceID := uuid.New()
targetID := uuid.New()
want := MarketMergeProposal{TargetID: targetID, Summary: "test proposal"}
bp := &blockingProposer{
started: make(chan struct{}),
release: make(chan struct{}),
result: want,
}
h := newTestHandler(
map[uuid.UUID]Market{sourceID: {ID: sourceID}, targetID: {ID: targetID}},
bp.propose,
)
w := doKickoff(t, h, sourceID, targetID)
if w.Code != http.StatusAccepted {
t.Fatalf("kickoff: expected 202, got %d body=%s", w.Code, w.Body.String())
}
var kickoffResp struct {
Data struct {
JobID string `json:"job_id"`
Status string `json:"status"`
} `json:"data"`
}
if err := json.Unmarshal(w.Body.Bytes(), &kickoffResp); err != nil {
t.Fatalf("kickoff: decode body: %v", err)
}
if kickoffResp.Data.Status != "pending" {
t.Errorf("kickoff status: want pending, got %q", kickoffResp.Data.Status)
}
jobID, err := uuid.Parse(kickoffResp.Data.JobID)
if err != nil {
t.Fatalf("kickoff: invalid job_id %q: %v", kickoffResp.Data.JobID, err)
}
// Wait until goroutine has started (release gate is held).
<-bp.started
// Status while goroutine is still blocked → 202 pending.
wp := doPoll(t, h, sourceID, jobID)
if wp.Code != http.StatusAccepted {
t.Errorf("poll pending: expected 202, got %d body=%s", wp.Code, wp.Body.String())
}
// Unblock the goroutine.
close(bp.release)
// Wait for job to be marked done.
waitFor(t, 2*time.Second, func() bool {
h.mergeJobsMu.RLock()
defer h.mergeJobsMu.RUnlock()
j, ok := h.mergeJobs[jobID]
return ok && j.status == mergeJobDone
})
// Status after completion → 200 done with proposal.
wd := doPoll(t, h, sourceID, jobID)
if wd.Code != http.StatusOK {
t.Fatalf("poll done: expected 200, got %d body=%s", wd.Code, wd.Body.String())
}
var doneResp struct {
Data struct {
Status string `json:"status"`
Proposal MarketMergeProposal `json:"proposal"`
} `json:"data"`
}
if err := json.Unmarshal(wd.Body.Bytes(), &doneResp); err != nil {
t.Fatalf("poll done: decode body: %v", err)
}
if doneResp.Data.Status != "done" {
t.Errorf("poll done: status want done, got %q", doneResp.Data.Status)
}
if doneResp.Data.Proposal.Summary != want.Summary {
t.Errorf("poll done: proposal.summary want %q, got %q", want.Summary, doneResp.Data.Proposal.Summary)
}
}
// TestMergePlanAdvisorErrorSurfacedAsStatusError verifies that advisor failures
// reach the polling client as status=error (not a 5xx from the status endpoint).
func TestMergePlanAdvisorErrorSurfacedAsStatusError(t *testing.T) {
sourceID := uuid.New()
targetID := uuid.New()
called := make(chan struct{})
errFn := func(_ context.Context, _, _ Market, _ enrich.Verdict) (MarketMergeProposal, error) {
close(called)
return MarketMergeProposal{}, ErrNotDuplicate
}
h := newTestHandler(
map[uuid.UUID]Market{sourceID: {ID: sourceID}, targetID: {ID: targetID}},
errFn,
)
w := doKickoff(t, h, sourceID, targetID)
if w.Code != http.StatusAccepted {
t.Fatalf("kickoff: expected 202, got %d", w.Code)
}
var kickoffResp struct {
Data struct {
JobID string `json:"job_id"`
} `json:"data"`
}
json.Unmarshal(w.Body.Bytes(), &kickoffResp) //nolint:errcheck
jobID, _ := uuid.Parse(kickoffResp.Data.JobID)
<-called
waitFor(t, 2*time.Second, func() bool {
h.mergeJobsMu.RLock()
defer h.mergeJobsMu.RUnlock()
j, ok := h.mergeJobs[jobID]
return ok && j.status == mergeJobError
})
we := doPoll(t, h, sourceID, jobID)
if we.Code != http.StatusOK {
t.Fatalf("poll error: expected 200, got %d body=%s", we.Code, we.Body.String())
}
var errResp struct {
Data struct {
Status string `json:"status"`
Error string `json:"error"`
} `json:"data"`
}
json.Unmarshal(we.Body.Bytes(), &errResp) //nolint:errcheck
if errResp.Data.Status != "error" {
t.Errorf("want status=error, got %q", errResp.Data.Status)
}
if errResp.Data.Error == "" {
t.Error("want non-empty error message")
}
}
// TestMergePlanStatusUnknownJobReturns404 verifies that a stale or fabricated
// job_id yields 404.
func TestMergePlanStatusUnknownJobReturns404(t *testing.T) {
h := newTestHandler(nil, nil)
sourceID := uuid.New()
jobID := uuid.New() // never registered
w := doPoll(t, h, sourceID, jobID)
if w.Code != http.StatusNotFound {
t.Errorf("expected 404 for unknown job, got %d", w.Code)
}
}
// TestMergePlanStatusWrongMarketReturns404 verifies that a valid job_id with a
// mismatched source market ID yields 404 (prevents cross-market leakage).
func TestMergePlanStatusWrongMarketReturns404(t *testing.T) {
h := newTestHandler(nil, nil)
realSourceID := uuid.New()
wrongSourceID := uuid.New()
jobID := uuid.New()
h.mergeJobsMu.Lock()
h.mergeJobs[jobID] = &mergeJob{
sourceID: realSourceID,
status: mergeJobDone,
finishedAt: time.Now(),
}
h.mergeJobsMu.Unlock()
w := doPoll(t, h, wrongSourceID, jobID)
if w.Code != http.StatusNotFound {
t.Errorf("expected 404 for mismatched source, got %d", w.Code)
}
}
// TestMergePlanTTLSweepRemovesOldJobs verifies that finished jobs older than
// mergeJobTTL are removed from the map on the next sweep.
func TestMergePlanTTLSweepRemovesOldJobs(t *testing.T) {
h := newTestHandler(nil, nil)
oldJobID := uuid.New()
h.mergeJobsMu.Lock()
h.mergeJobs[oldJobID] = &mergeJob{
status: mergeJobDone,
finishedAt: time.Now().Add(-(mergeJobTTL + time.Second)),
}
h.sweepMergeJobs()
_, stillPresent := h.mergeJobs[oldJobID]
h.mergeJobsMu.Unlock()
if stillPresent {
t.Error("expected expired job to be removed by TTL sweep")
}
}

View File

@@ -31,6 +31,7 @@ func RegisterAdminRoutes(rg *gin.RouterGroup, h *AdminHandler, rh *ResearchHandl
markets.POST("/:id/research/apply", rh.Apply)
markets.GET("/:id/duplicates", h.FindDuplicates)
markets.POST("/:id/merge-plan", h.MergePlan)
markets.GET("/:id/merge-plan/:job_id", h.MergePlanStatus)
markets.POST("/:id/merge-into/:target_id", h.MergeInto)
}

View File

@@ -19,27 +19,60 @@
let mergeError: string | null = $state(null);
let applying = $state(false);
async function readJSON(res: Response): Promise<{ error?: string; [k: string]: unknown }> {
const text = await res.text();
try {
return text ? JSON.parse(text) : {};
} catch {
return {};
}
}
async function pollMergePlan(
marketId: string,
jobId: string,
timeoutMs: number
): Promise<{ status: string; proposal?: MarketMergeProposal; error?: string }> {
const deadline = Date.now() + timeoutMs;
let intervalMs = 1500;
while (Date.now() < deadline) {
await new Promise((r) => setTimeout(r, intervalMs));
const res = await fetch(`/admin/maerkte/${marketId}/merge-plan/${jobId}`);
const body = await readJSON(res);
if (!res.ok) {
return { status: 'error', error: (body.error as string) ?? `HTTP ${res.status}` };
}
const status = body.status as string;
if (status === 'done' || status === 'error') {
return body as { status: string; proposal?: MarketMergeProposal; error?: string };
}
intervalMs = Math.min(intervalMs * 1.3, 4000);
}
return { status: 'error', error: 'Merge-Plan: Zeitüberschreitung beim Polling.' };
}
async function handlePlan(candidate: DuplicateMarket) {
mergeError = null;
planningId = candidate.id;
try {
const res = await fetch(`/admin/maerkte/${data.market.id}/merge-plan`, {
const startRes = await fetch(`/admin/maerkte/${data.market.id}/merge-plan`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ target_id: candidate.id })
});
const text = await res.text();
let body: { error?: string; [k: string]: unknown } = {};
try {
body = text ? JSON.parse(text) : {};
} catch {
// non-JSON body
}
if (!res.ok) {
mergeError = body.error ?? `Merge-Plan fehlgeschlagen (HTTP ${res.status}).`;
const startBody = await readJSON(startRes);
if (!startRes.ok || !startBody.job_id) {
mergeError =
(startBody.error as string) ?? `Merge-Plan fehlgeschlagen (HTTP ${startRes.status}).`;
return;
}
proposal = body as unknown as MarketMergeProposal;
const result = await pollMergePlan(data.market.id, startBody.job_id as string, 180_000);
if (result.status === 'error') {
mergeError = result.error ?? 'Merge-Plan fehlgeschlagen.';
return;
}
proposal = result.proposal as MarketMergeProposal;
proposalCandidate = candidate;
} catch (err) {
mergeError = err instanceof Error ? err.message : 'Merge-Plan fehlgeschlagen.';

View File

@@ -1,17 +1,16 @@
import { json } from '@sveltejs/kit';
import { serverFetch } from '$lib/api/client.server.js';
import type { MarketMergeProposal } from '$lib/api/types.js';
import type { RequestHandler } from './$types.js';
export const POST: RequestHandler = async ({ cookies, params, request }) => {
try {
const body = (await request.json()) as { target_id: string };
const res = await serverFetch<MarketMergeProposal>(
const res = await serverFetch<{ job_id: string; status: string }>(
`/admin/markets/${params.id}/merge-plan`,
cookies,
{ method: 'POST', body: JSON.stringify(body) }
);
return json(res.data);
return json(res.data, { status: 202 });
} catch (err) {
const message = err instanceof Error ? err.message : 'Merge-Plan fehlgeschlagen.';
return json({ error: message }, { status: 502 });

View File

@@ -0,0 +1,23 @@
import { json } from '@sveltejs/kit';
import { serverFetch } from '$lib/api/client.server.js';
import type { MarketMergeProposal } from '$lib/api/types.js';
import type { RequestHandler } from './$types.js';
type MergePlanStatusData =
| { status: 'pending' }
| { status: 'done'; proposal: MarketMergeProposal }
| { status: 'error'; error: string };
export const GET: RequestHandler = async ({ cookies, params }) => {
try {
const res = await serverFetch<MergePlanStatusData>(
`/admin/markets/${params.id}/merge-plan/${params.jobId}`,
cookies
);
const httpStatus = res.data.status === 'pending' ? 202 : 200;
return json(res.data, { status: httpStatus });
} catch (err) {
const message = err instanceof Error ? err.message : 'Merge-Plan-Status fehlgeschlagen.';
return json({ error: message }, { status: 502 });
}
};