From 540298fb8870c3587ccee11fdb0c2fd74b514a87 Mon Sep 17 00:00:00 2001 From: vikingowl Date: Sat, 18 Apr 2026 07:37:39 +0200 Subject: [PATCH] feat(discovery): implement Accept + Reject with transactional state changes --- .../domain/discovery/mock_repo_test.go | 26 +++- backend/internal/domain/discovery/service.go | 121 +++++++++++++++++- .../internal/domain/discovery/service_test.go | 74 +++++++++++ 3 files changed, 215 insertions(+), 6 deletions(-) diff --git a/backend/internal/domain/discovery/mock_repo_test.go b/backend/internal/domain/discovery/mock_repo_test.go index 1f655c1..e90a7e2 100644 --- a/backend/internal/domain/discovery/mock_repo_test.go +++ b/backend/internal/domain/discovery/mock_repo_test.go @@ -19,6 +19,11 @@ type mockRepo struct { insertDiscFn func(ctx context.Context, d DiscoveredMarket) (uuid.UUID, error) isRejectedFn func(ctx context.Context, nameNormalized, stadt string, year int) (bool, error) queuePendingFn func(ctx context.Context, nameNormalized, stadt string, startDatum *time.Time) (bool, error) + getDiscoveredFn func(ctx context.Context, id uuid.UUID) (DiscoveredMarket, error) + beginTxFn func(ctx context.Context) (pgx.Tx, error) + markAcceptedFn func(ctx context.Context, tx pgx.Tx, id, eid, r uuid.UUID) error + markRejectedFn func(ctx context.Context, tx pgx.Tx, id, r uuid.UUID) error + insertRejFn func(ctx context.Context, tx pgx.Tx, rej RejectedDiscovery) error } func (m *mockRepo) PickStaleBuckets(ctx context.Context, fm, lim int) ([]Bucket, error) { @@ -46,17 +51,32 @@ func (m *mockRepo) ListQueue(ctx context.Context, status string, l, o int) ([]Di return nil, nil } func (m *mockRepo) GetDiscovered(ctx context.Context, id uuid.UUID) (DiscoveredMarket, error) { + if m.getDiscoveredFn != nil { + return m.getDiscoveredFn(ctx, id) + } return DiscoveredMarket{}, nil } +func (m *mockRepo) BeginTx(ctx context.Context) (pgx.Tx, error) { + if m.beginTxFn != nil { + return m.beginTxFn(ctx) + } + return nil, nil +} func (m *mockRepo) MarkAccepted(ctx context.Context, tx pgx.Tx, id, eid, r uuid.UUID) error { + if m.markAcceptedFn != nil { + return m.markAcceptedFn(ctx, tx, id, eid, r) + } return nil } func (m *mockRepo) MarkRejected(ctx context.Context, tx pgx.Tx, id, r uuid.UUID) error { + if m.markRejectedFn != nil { + return m.markRejectedFn(ctx, tx, id, r) + } return nil } func (m *mockRepo) InsertRejection(ctx context.Context, tx pgx.Tx, r RejectedDiscovery) error { + if m.insertRejFn != nil { + return m.insertRejFn(ctx, tx, r) + } return nil } -func (m *mockRepo) BeginTx(ctx context.Context) (pgx.Tx, error) { - return nil, nil -} diff --git a/backend/internal/domain/discovery/service.go b/backend/internal/domain/discovery/service.go index 92c78c0..c42f09c 100644 --- a/backend/internal/domain/discovery/service.go +++ b/backend/internal/domain/discovery/service.go @@ -12,21 +12,26 @@ import ( "marktvogt.de/backend/internal/domain/market" ) +type marketCreator interface { + Create(ctx context.Context, req market.CreateMarketRequest) (market.Market, error) + CreateEditionForSeries(ctx context.Context, seriesID uuid.UUID, req market.CreateEditionRequest) (market.Market, error) +} + // Service orchestrates bucket scheduling, agent invocation, and queue management. type Service struct { repo Repository agent *AgentClient - marketService *market.Service + marketCreator marketCreator batchSize int forwardMonths int } // NewService constructs a Service. batchSize and forwardMonths configure bucket picking. -func NewService(repo Repository, agent *AgentClient, marketService *market.Service, batchSize, forwardMonths int) *Service { +func NewService(repo Repository, agent *AgentClient, mc marketCreator, batchSize, forwardMonths int) *Service { return &Service{ repo: repo, agent: agent, - marketService: marketService, + marketCreator: mc, batchSize: batchSize, forwardMonths: forwardMonths, } @@ -184,6 +189,116 @@ func parseOptionalDate(s string) *time.Time { return &t } +// Accept transitions a pending queue entry into a market edition. +// Returns (seriesID, editionID, error). +func (s *Service) Accept(ctx context.Context, queueID, reviewerID uuid.UUID) (uuid.UUID, uuid.UUID, error) { + d, err := s.repo.GetDiscovered(ctx, queueID) + if err != nil { + return uuid.Nil, uuid.Nil, fmt.Errorf("load queue entry: %w", err) + } + if d.Status != StatusPending { + return uuid.Nil, uuid.Nil, fmt.Errorf("queue entry is %s, expected pending", d.Status) + } + if d.StartDatum == nil || d.EndDatum == nil { + return uuid.Nil, uuid.Nil, errors.New("cannot accept entry without start/end date") + } + + var created market.Market + if d.MatchedSeriesID != nil { + req := market.CreateEditionRequest{ + Name: d.MarktName, + City: d.Stadt, + State: d.Bundesland, + Country: landToISO(d.Land), + StartDate: d.StartDatum.Format("2006-01-02"), + EndDate: d.EndDatum.Format("2006-01-02"), + Website: d.Website, + } + created, err = s.marketCreator.CreateEditionForSeries(ctx, *d.MatchedSeriesID, req) + } else { + req := market.CreateMarketRequest{ + Name: d.MarktName, + City: d.Stadt, + State: d.Bundesland, + Country: landToISO(d.Land), + StartDate: d.StartDatum.Format("2006-01-02"), + EndDate: d.EndDatum.Format("2006-01-02"), + Website: d.Website, + } + created, err = s.marketCreator.Create(ctx, req) + } + if err != nil { + return uuid.Nil, uuid.Nil, fmt.Errorf("create market: %w", err) + } + + // Discovery-side transition. + tx, err := s.repo.BeginTx(ctx) + if err != nil { + return created.SeriesID, created.ID, fmt.Errorf("begin tx: %w", err) + } + defer func() { _ = tx.Rollback(ctx) }() + if err := s.repo.MarkAccepted(ctx, tx, queueID, created.ID, reviewerID); err != nil { + return created.SeriesID, created.ID, fmt.Errorf("mark accepted: %w", err) + } + if err := tx.Commit(ctx); err != nil { + return created.SeriesID, created.ID, fmt.Errorf("commit accept tx: %w", err) + } + return created.SeriesID, created.ID, nil +} + +// Reject marks a pending entry rejected and records a sticky rejection. +func (s *Service) Reject(ctx context.Context, queueID, reviewerID uuid.UUID, reason string) error { + d, err := s.repo.GetDiscovered(ctx, queueID) + if err != nil { + return fmt.Errorf("load queue entry: %w", err) + } + if d.Status != StatusPending { + return fmt.Errorf("queue entry is %s, expected pending", d.Status) + } + year := time.Now().Year() + if d.StartDatum != nil { + year = d.StartDatum.Year() + } + + tx, err := s.repo.BeginTx(ctx) + if err != nil { + return fmt.Errorf("begin tx: %w", err) + } + defer func() { _ = tx.Rollback(ctx) }() + if err := s.repo.MarkRejected(ctx, tx, queueID, reviewerID); err != nil { + return fmt.Errorf("mark rejected: %w", err) + } + rejection := RejectedDiscovery{ + NameNormalized: d.NameNormalized, + Stadt: d.Stadt, + Year: year, + RejectedBy: &reviewerID, + Reason: reason, + } + if err := s.repo.InsertRejection(ctx, tx, rejection); err != nil { + return fmt.Errorf("insert rejection: %w", err) + } + return tx.Commit(ctx) +} + +// landToISO maps the Pass 0 land string to ISO-2 codes required by market.CreateMarketRequest. +func landToISO(land string) string { + switch land { + case "Deutschland": + return "DE" + case "Österreich", "Oesterreich": + return "AT" + case "Schweiz": + return "CH" + } + return "" +} + +// ListPendingQueue exposes queue listing for the handler layer. +func (s *Service) ListPendingQueue(ctx context.Context, limit, offset int) ([]DiscoveredMarket, error) { + return s.repo.ListQueue(ctx, StatusPending, limit, offset) +} + // findSeriesMatch returns the ID of the first candidate whose normalized name matches // incomingName after normalization. Candidates are expected to be pre-filtered by city. func findSeriesMatch(incomingName string, candidates []SeriesCandidate) *uuid.UUID { diff --git a/backend/internal/domain/discovery/service_test.go b/backend/internal/domain/discovery/service_test.go index f2114c8..6177274 100644 --- a/backend/internal/domain/discovery/service_test.go +++ b/backend/internal/domain/discovery/service_test.go @@ -6,6 +6,9 @@ import ( "time" "github.com/google/uuid" + "github.com/jackc/pgx/v5" + + "marktvogt.de/backend/internal/domain/market" ) func TestFindSeriesMatch(t *testing.T) { @@ -143,3 +146,74 @@ func TestProcessBucket_InsertsNetNew(t *testing.T) { t.Errorf("name_normalized = %q, want 'neuer'", inserted[0].NameNormalized) } } + +type stubCreator struct { + createCalls int + createEditionForSeriesCalls int + returnErr error +} + +func (c *stubCreator) Create(_ context.Context, _ market.CreateMarketRequest) (market.Market, error) { + c.createCalls++ + if c.returnErr != nil { + return market.Market{}, c.returnErr + } + return market.Market{ID: uuid.New(), SeriesID: uuid.New()}, nil +} + +func (c *stubCreator) CreateEditionForSeries(_ context.Context, sid uuid.UUID, _ market.CreateEditionRequest) (market.Market, error) { + c.createEditionForSeriesCalls++ + if c.returnErr != nil { + return market.Market{}, c.returnErr + } + return market.Market{ID: uuid.New(), SeriesID: sid}, nil +} + +type noopTx struct{ pgx.Tx } + +func (noopTx) Commit(_ context.Context) error { return nil } +func (noopTx) Rollback(_ context.Context) error { return nil } + +func TestAccept_NewSeries_CallsCreate(t *testing.T) { + start := time.Date(2026, 9, 12, 0, 0, 0, 0, time.UTC) + end := time.Date(2026, 9, 14, 0, 0, 0, 0, time.UTC) + qID := uuid.New() + m := &mockRepo{ + getDiscoveredFn: func(_ context.Context, _ uuid.UUID) (DiscoveredMarket, error) { + return DiscoveredMarket{ID: qID, Status: StatusPending, MarktName: "X", Stadt: "Y", Land: "Deutschland", StartDatum: &start, EndDatum: &end}, nil + }, + beginTxFn: func(_ context.Context) (pgx.Tx, error) { return noopTx{}, nil }, + markAcceptedFn: func(_ context.Context, _ pgx.Tx, _, _, _ uuid.UUID) error { return nil }, + } + mc := &stubCreator{} + svc := NewService(m, nil, mc, 4, 12) + _, _, err := svc.Accept(context.Background(), qID, uuid.New()) + if err != nil { + t.Fatalf("accept err: %v", err) + } + if mc.createCalls != 1 || mc.createEditionForSeriesCalls != 0 { + t.Errorf("expected Create=1 CreateEdition=0, got %d/%d", mc.createCalls, mc.createEditionForSeriesCalls) + } +} + +func TestAccept_ExistingSeries_CallsCreateEditionForSeries(t *testing.T) { + start := time.Date(2026, 9, 12, 0, 0, 0, 0, time.UTC) + end := time.Date(2026, 9, 14, 0, 0, 0, 0, time.UTC) + sid := uuid.New() + m := &mockRepo{ + getDiscoveredFn: func(_ context.Context, _ uuid.UUID) (DiscoveredMarket, error) { + return DiscoveredMarket{Status: StatusPending, MarktName: "X", Stadt: "Y", Land: "Deutschland", StartDatum: &start, EndDatum: &end, MatchedSeriesID: &sid}, nil + }, + beginTxFn: func(_ context.Context) (pgx.Tx, error) { return noopTx{}, nil }, + markAcceptedFn: func(_ context.Context, _ pgx.Tx, _, _, _ uuid.UUID) error { return nil }, + } + mc := &stubCreator{} + svc := NewService(m, nil, mc, 4, 12) + _, _, err := svc.Accept(context.Background(), uuid.New(), uuid.New()) + if err != nil { + t.Fatalf("accept err: %v", err) + } + if mc.createCalls != 0 || mc.createEditionForSeriesCalls != 1 { + t.Errorf("expected Create=0 CreateEdition=1, got %d/%d", mc.createCalls, mc.createEditionForSeriesCalls) + } +}