feat(discovery): implement Accept + Reject with transactional state changes

This commit is contained in:
2026-04-18 07:37:39 +02:00
parent 4e7120e958
commit 540298fb88
3 changed files with 215 additions and 6 deletions

View File

@@ -19,6 +19,11 @@ type mockRepo struct {
insertDiscFn func(ctx context.Context, d DiscoveredMarket) (uuid.UUID, error)
isRejectedFn func(ctx context.Context, nameNormalized, stadt string, year int) (bool, error)
queuePendingFn func(ctx context.Context, nameNormalized, stadt string, startDatum *time.Time) (bool, error)
getDiscoveredFn func(ctx context.Context, id uuid.UUID) (DiscoveredMarket, error)
beginTxFn func(ctx context.Context) (pgx.Tx, error)
markAcceptedFn func(ctx context.Context, tx pgx.Tx, id, eid, r uuid.UUID) error
markRejectedFn func(ctx context.Context, tx pgx.Tx, id, r uuid.UUID) error
insertRejFn func(ctx context.Context, tx pgx.Tx, rej RejectedDiscovery) error
}
func (m *mockRepo) PickStaleBuckets(ctx context.Context, fm, lim int) ([]Bucket, error) {
@@ -46,17 +51,32 @@ func (m *mockRepo) ListQueue(ctx context.Context, status string, l, o int) ([]Di
return nil, nil
}
func (m *mockRepo) GetDiscovered(ctx context.Context, id uuid.UUID) (DiscoveredMarket, error) {
if m.getDiscoveredFn != nil {
return m.getDiscoveredFn(ctx, id)
}
return DiscoveredMarket{}, nil
}
func (m *mockRepo) BeginTx(ctx context.Context) (pgx.Tx, error) {
if m.beginTxFn != nil {
return m.beginTxFn(ctx)
}
return nil, nil
}
func (m *mockRepo) MarkAccepted(ctx context.Context, tx pgx.Tx, id, eid, r uuid.UUID) error {
if m.markAcceptedFn != nil {
return m.markAcceptedFn(ctx, tx, id, eid, r)
}
return nil
}
func (m *mockRepo) MarkRejected(ctx context.Context, tx pgx.Tx, id, r uuid.UUID) error {
if m.markRejectedFn != nil {
return m.markRejectedFn(ctx, tx, id, r)
}
return nil
}
func (m *mockRepo) InsertRejection(ctx context.Context, tx pgx.Tx, r RejectedDiscovery) error {
if m.insertRejFn != nil {
return m.insertRejFn(ctx, tx, r)
}
return nil
}
func (m *mockRepo) BeginTx(ctx context.Context) (pgx.Tx, error) {
return nil, nil
}

View File

@@ -12,21 +12,26 @@ import (
"marktvogt.de/backend/internal/domain/market"
)
type marketCreator interface {
Create(ctx context.Context, req market.CreateMarketRequest) (market.Market, error)
CreateEditionForSeries(ctx context.Context, seriesID uuid.UUID, req market.CreateEditionRequest) (market.Market, error)
}
// Service orchestrates bucket scheduling, agent invocation, and queue management.
type Service struct {
repo Repository
agent *AgentClient
marketService *market.Service
marketCreator marketCreator
batchSize int
forwardMonths int
}
// NewService constructs a Service. batchSize and forwardMonths configure bucket picking.
func NewService(repo Repository, agent *AgentClient, marketService *market.Service, batchSize, forwardMonths int) *Service {
func NewService(repo Repository, agent *AgentClient, mc marketCreator, batchSize, forwardMonths int) *Service {
return &Service{
repo: repo,
agent: agent,
marketService: marketService,
marketCreator: mc,
batchSize: batchSize,
forwardMonths: forwardMonths,
}
@@ -184,6 +189,116 @@ func parseOptionalDate(s string) *time.Time {
return &t
}
// Accept transitions a pending queue entry into a market edition.
// Returns (seriesID, editionID, error).
func (s *Service) Accept(ctx context.Context, queueID, reviewerID uuid.UUID) (uuid.UUID, uuid.UUID, error) {
d, err := s.repo.GetDiscovered(ctx, queueID)
if err != nil {
return uuid.Nil, uuid.Nil, fmt.Errorf("load queue entry: %w", err)
}
if d.Status != StatusPending {
return uuid.Nil, uuid.Nil, fmt.Errorf("queue entry is %s, expected pending", d.Status)
}
if d.StartDatum == nil || d.EndDatum == nil {
return uuid.Nil, uuid.Nil, errors.New("cannot accept entry without start/end date")
}
var created market.Market
if d.MatchedSeriesID != nil {
req := market.CreateEditionRequest{
Name: d.MarktName,
City: d.Stadt,
State: d.Bundesland,
Country: landToISO(d.Land),
StartDate: d.StartDatum.Format("2006-01-02"),
EndDate: d.EndDatum.Format("2006-01-02"),
Website: d.Website,
}
created, err = s.marketCreator.CreateEditionForSeries(ctx, *d.MatchedSeriesID, req)
} else {
req := market.CreateMarketRequest{
Name: d.MarktName,
City: d.Stadt,
State: d.Bundesland,
Country: landToISO(d.Land),
StartDate: d.StartDatum.Format("2006-01-02"),
EndDate: d.EndDatum.Format("2006-01-02"),
Website: d.Website,
}
created, err = s.marketCreator.Create(ctx, req)
}
if err != nil {
return uuid.Nil, uuid.Nil, fmt.Errorf("create market: %w", err)
}
// Discovery-side transition.
tx, err := s.repo.BeginTx(ctx)
if err != nil {
return created.SeriesID, created.ID, fmt.Errorf("begin tx: %w", err)
}
defer func() { _ = tx.Rollback(ctx) }()
if err := s.repo.MarkAccepted(ctx, tx, queueID, created.ID, reviewerID); err != nil {
return created.SeriesID, created.ID, fmt.Errorf("mark accepted: %w", err)
}
if err := tx.Commit(ctx); err != nil {
return created.SeriesID, created.ID, fmt.Errorf("commit accept tx: %w", err)
}
return created.SeriesID, created.ID, nil
}
// Reject marks a pending entry rejected and records a sticky rejection.
func (s *Service) Reject(ctx context.Context, queueID, reviewerID uuid.UUID, reason string) error {
d, err := s.repo.GetDiscovered(ctx, queueID)
if err != nil {
return fmt.Errorf("load queue entry: %w", err)
}
if d.Status != StatusPending {
return fmt.Errorf("queue entry is %s, expected pending", d.Status)
}
year := time.Now().Year()
if d.StartDatum != nil {
year = d.StartDatum.Year()
}
tx, err := s.repo.BeginTx(ctx)
if err != nil {
return fmt.Errorf("begin tx: %w", err)
}
defer func() { _ = tx.Rollback(ctx) }()
if err := s.repo.MarkRejected(ctx, tx, queueID, reviewerID); err != nil {
return fmt.Errorf("mark rejected: %w", err)
}
rejection := RejectedDiscovery{
NameNormalized: d.NameNormalized,
Stadt: d.Stadt,
Year: year,
RejectedBy: &reviewerID,
Reason: reason,
}
if err := s.repo.InsertRejection(ctx, tx, rejection); err != nil {
return fmt.Errorf("insert rejection: %w", err)
}
return tx.Commit(ctx)
}
// landToISO maps the Pass 0 land string to ISO-2 codes required by market.CreateMarketRequest.
func landToISO(land string) string {
switch land {
case "Deutschland":
return "DE"
case "Österreich", "Oesterreich":
return "AT"
case "Schweiz":
return "CH"
}
return ""
}
// ListPendingQueue exposes queue listing for the handler layer.
func (s *Service) ListPendingQueue(ctx context.Context, limit, offset int) ([]DiscoveredMarket, error) {
return s.repo.ListQueue(ctx, StatusPending, limit, offset)
}
// findSeriesMatch returns the ID of the first candidate whose normalized name matches
// incomingName after normalization. Candidates are expected to be pre-filtered by city.
func findSeriesMatch(incomingName string, candidates []SeriesCandidate) *uuid.UUID {

View File

@@ -6,6 +6,9 @@ import (
"time"
"github.com/google/uuid"
"github.com/jackc/pgx/v5"
"marktvogt.de/backend/internal/domain/market"
)
func TestFindSeriesMatch(t *testing.T) {
@@ -143,3 +146,74 @@ func TestProcessBucket_InsertsNetNew(t *testing.T) {
t.Errorf("name_normalized = %q, want 'neuer'", inserted[0].NameNormalized)
}
}
type stubCreator struct {
createCalls int
createEditionForSeriesCalls int
returnErr error
}
func (c *stubCreator) Create(_ context.Context, _ market.CreateMarketRequest) (market.Market, error) {
c.createCalls++
if c.returnErr != nil {
return market.Market{}, c.returnErr
}
return market.Market{ID: uuid.New(), SeriesID: uuid.New()}, nil
}
func (c *stubCreator) CreateEditionForSeries(_ context.Context, sid uuid.UUID, _ market.CreateEditionRequest) (market.Market, error) {
c.createEditionForSeriesCalls++
if c.returnErr != nil {
return market.Market{}, c.returnErr
}
return market.Market{ID: uuid.New(), SeriesID: sid}, nil
}
type noopTx struct{ pgx.Tx }
func (noopTx) Commit(_ context.Context) error { return nil }
func (noopTx) Rollback(_ context.Context) error { return nil }
func TestAccept_NewSeries_CallsCreate(t *testing.T) {
start := time.Date(2026, 9, 12, 0, 0, 0, 0, time.UTC)
end := time.Date(2026, 9, 14, 0, 0, 0, 0, time.UTC)
qID := uuid.New()
m := &mockRepo{
getDiscoveredFn: func(_ context.Context, _ uuid.UUID) (DiscoveredMarket, error) {
return DiscoveredMarket{ID: qID, Status: StatusPending, MarktName: "X", Stadt: "Y", Land: "Deutschland", StartDatum: &start, EndDatum: &end}, nil
},
beginTxFn: func(_ context.Context) (pgx.Tx, error) { return noopTx{}, nil },
markAcceptedFn: func(_ context.Context, _ pgx.Tx, _, _, _ uuid.UUID) error { return nil },
}
mc := &stubCreator{}
svc := NewService(m, nil, mc, 4, 12)
_, _, err := svc.Accept(context.Background(), qID, uuid.New())
if err != nil {
t.Fatalf("accept err: %v", err)
}
if mc.createCalls != 1 || mc.createEditionForSeriesCalls != 0 {
t.Errorf("expected Create=1 CreateEdition=0, got %d/%d", mc.createCalls, mc.createEditionForSeriesCalls)
}
}
func TestAccept_ExistingSeries_CallsCreateEditionForSeries(t *testing.T) {
start := time.Date(2026, 9, 12, 0, 0, 0, 0, time.UTC)
end := time.Date(2026, 9, 14, 0, 0, 0, 0, time.UTC)
sid := uuid.New()
m := &mockRepo{
getDiscoveredFn: func(_ context.Context, _ uuid.UUID) (DiscoveredMarket, error) {
return DiscoveredMarket{Status: StatusPending, MarktName: "X", Stadt: "Y", Land: "Deutschland", StartDatum: &start, EndDatum: &end, MatchedSeriesID: &sid}, nil
},
beginTxFn: func(_ context.Context) (pgx.Tx, error) { return noopTx{}, nil },
markAcceptedFn: func(_ context.Context, _ pgx.Tx, _, _, _ uuid.UUID) error { return nil },
}
mc := &stubCreator{}
svc := NewService(m, nil, mc, 4, 12)
_, _, err := svc.Accept(context.Background(), uuid.New(), uuid.New())
if err != nil {
t.Fatalf("accept err: %v", err)
}
if mc.createCalls != 0 || mc.createEditionForSeriesCalls != 1 {
t.Errorf("expected Create=0 CreateEdition=1, got %d/%d", mc.createCalls, mc.createEditionForSeriesCalls)
}
}