From e245f3ec22749c4c441decade6fc51758301e115 Mon Sep 17 00:00:00 2001
From: vikingowl
Date: Sat, 18 Apr 2026 07:25:54 +0200
Subject: [PATCH] feat(discovery): add repository with bucket/queue/rejection queries

---
 .../internal/domain/discovery/repository.go   | 323 ++++++++++++++++++
 1 file changed, 323 insertions(+)
 create mode 100644 backend/internal/domain/discovery/repository.go

diff --git a/backend/internal/domain/discovery/repository.go b/backend/internal/domain/discovery/repository.go
new file mode 100644
index 0000000..bad1ee7
--- /dev/null
+++ b/backend/internal/domain/discovery/repository.go
@@ -0,0 +1,323 @@
+// backend/internal/domain/discovery/repository.go
+package discovery
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/google/uuid"
+	"github.com/jackc/pgx/v5"
+	"github.com/jackc/pgx/v5/pgxpool"
+)
+
+// Repository is the persistence boundary of the discovery domain: crawl
+// buckets, the queue of discovered markets and the rejection list.
+//
+// MarkAccepted, MarkRejected and InsertRejection execute on a transaction
+// supplied by the caller (obtain one with BeginTx); every other method
+// executes directly on the connection pool.
+type Repository interface {
+	// PickStaleBuckets returns at most limit buckets, from the current month
+	// up to forwardMonths months ahead, that are due for a new query.
+	PickStaleBuckets(ctx context.Context, forwardMonths, limit int) ([]Bucket, error)
+	// UpdateBucketQueried stamps the bucket as queried now and stores errMsg
+	// as its last error; an empty errMsg clears the stored error.
+	UpdateBucketQueried(ctx context.Context, id uuid.UUID, errMsg string) error
+	// ListSeriesByCity returns the series whose lower-cased city equals
+	// cityNormalized.
+	ListSeriesByCity(ctx context.Context, cityNormalized string) ([]SeriesCandidate, error)
+	// EditionExists reports whether the series has an edition for year.
+	EditionExists(ctx context.Context, seriesID uuid.UUID, year int) (bool, error)
+	// InsertDiscovered inserts d into discovered_markets and returns its ID.
+	InsertDiscovered(ctx context.Context, d DiscoveredMarket) (uuid.UUID, error)
+	// IsRejected reports whether a rejection exists for this name, city and
+	// year.
+	IsRejected(ctx context.Context, nameNormalized, stadt string, year int) (bool, error)
+	// QueueHasPending reports whether a pending entry with the same name, city
+	// and start date exists; a nil startDatum matches a NULL start date.
+	QueueHasPending(ctx context.Context, nameNormalized, stadt string, startDatum *time.Time) (bool, error)
+	// ListQueue returns one page of entries with the given status, most
+	// recently discovered first.
+	ListQueue(ctx context.Context, status string, limit, offset int) ([]DiscoveredMarket, error)
+	// GetDiscovered returns the entry with the given ID.
+	GetDiscovered(ctx context.Context, id uuid.UUID) (DiscoveredMarket, error)
+	// MarkAccepted sets a pending entry to accepted and records the reviewer
+	// and the edition created from it.
+	MarkAccepted(ctx context.Context, tx pgx.Tx, id, editionID, reviewer uuid.UUID) error
+	// MarkRejected sets a pending entry to rejected and records the reviewer.
+	MarkRejected(ctx context.Context, tx pgx.Tx, id uuid.UUID, reviewer uuid.UUID) error
+	// InsertRejection records a rejection; an existing record for the same
+	// name, city and year is left untouched.
+	InsertRejection(ctx context.Context, tx pgx.Tx, r RejectedDiscovery) error
+	// BeginTx starts a transaction with default options.
+	BeginTx(ctx context.Context) (pgx.Tx, error)
+}
+
+// SeriesCandidate is a minimal projection used for name-normalization comparison in Go.
+type SeriesCandidate struct {
+	ID   uuid.UUID
+	Name string
+	City string
+}
+
+// discoveredColumns is the SELECT list shared by ListQueue and GetDiscovered.
+// Its column order must match the destinations in scanDiscovered.
+const discoveredColumns = `
+id, bucket_id, markt_name, stadt, coalesce(bundesland,''), land,
+start_datum, end_datum, coalesce(website,''), quellen, coalesce(extraktion,''),
+coalesce(hinweis,''), name_normalized, matched_series_id, status,
+discovered_at, reviewed_at, reviewed_by, created_edition_id`
+
+// pgRepository implements Repository on a pgx connection pool.
+type pgRepository struct {
+	pool *pgxpool.Pool
+}
+
+// NewRepository returns a Repository backed by pool.
+func NewRepository(pool *pgxpool.Pool) Repository {
+	return &pgRepository{pool: pool}
+}
+
+// PickStaleBuckets selects buckets whose year_month lies between the current
+// month and forwardMonths months ahead (inclusive) and that were never queried
+// or were last queried more than 7 days ago. Never-queried buckets come first,
+// then the least recently queried ones; at most limit rows are returned.
+func (r *pgRepository) PickStaleBuckets(ctx context.Context, forwardMonths, limit int) ([]Bucket, error) {
+	// The month offset is bound as an integer through make_interval. Building
+	// the interval with ($1 || ' month') makes PostgreSQL infer $1 as text, and
+	// pgx v5 cannot encode a Go int into a text parameter.
+	const q = `
+SELECT id, land, region, year_month, last_queried_at, coalesce(last_error, ''), created_at
+FROM discovery_buckets
+WHERE year_month >= to_char(date_trunc('month', now()), 'YYYY-MM')
+  AND year_month <= to_char(date_trunc('month', now()) + make_interval(months => $1::int), 'YYYY-MM')
+  AND (last_queried_at IS NULL OR last_queried_at < now() - interval '7 days')
+ORDER BY last_queried_at NULLS FIRST, year_month
+LIMIT $2`
+	rows, err := r.pool.Query(ctx, q, forwardMonths, limit)
+	if err != nil {
+		return nil, fmt.Errorf("pick buckets: %w", err)
+	}
+	defer rows.Close()
+
+	var out []Bucket
+	for rows.Next() {
+		var b Bucket
+		if err := rows.Scan(&b.ID, &b.Land, &b.Region, &b.YearMonth, &b.LastQueriedAt, &b.LastError, &b.CreatedAt); err != nil {
+			return nil, fmt.Errorf("pick buckets: scan: %w", err)
+		}
+		out = append(out, b)
+	}
+	if err := rows.Err(); err != nil {
+		return nil, fmt.Errorf("pick buckets: %w", err)
+	}
+	return out, nil
+}
+
+// UpdateBucketQueried sets last_queried_at to now() and last_error to errMsg
+// for the bucket with the given id. An empty errMsg is stored as NULL.
+func (r *pgRepository) UpdateBucketQueried(ctx context.Context, id uuid.UUID, errMsg string) error {
+	var lastError any // stays nil (SQL NULL) when there is no error to record
+	if errMsg != "" {
+		lastError = errMsg
+	}
+	_, err := r.pool.Exec(ctx,
+		`UPDATE discovery_buckets SET last_queried_at = now(), last_error = $2 WHERE id = $1`,
+		id, lastError)
+	if err != nil {
+		return fmt.Errorf("update bucket %s: %w", id, err)
+	}
+	return nil
+}
+
+// ListSeriesByCity returns id, name and city of every market_series row whose
+// lower-cased city equals cityNormalized. The argument is compared as given,
+// so it has to be lower-case already to match.
+func (r *pgRepository) ListSeriesByCity(ctx context.Context, cityNormalized string) ([]SeriesCandidate, error) {
+	rows, err := r.pool.Query(ctx,
+		`SELECT id, name, city FROM market_series WHERE LOWER(city) = $1`,
+		cityNormalized)
+	if err != nil {
+		return nil, fmt.Errorf("list series by city: %w", err)
+	}
+	defer rows.Close()
+
+	var out []SeriesCandidate
+	for rows.Next() {
+		var c SeriesCandidate
+		if err := rows.Scan(&c.ID, &c.Name, &c.City); err != nil {
+			return nil, fmt.Errorf("list series by city: scan: %w", err)
+		}
+		out = append(out, c)
+	}
+	if err := rows.Err(); err != nil {
+		return nil, fmt.Errorf("list series by city: %w", err)
+	}
+	return out, nil
+}
+
+// EditionExists reports whether market_editions holds a row for seriesID and
+// year.
+func (r *pgRepository) EditionExists(ctx context.Context, seriesID uuid.UUID, year int) (bool, error) {
+	var exists bool
+	err := r.pool.QueryRow(ctx,
+		`SELECT EXISTS(SELECT 1 FROM market_editions WHERE series_id = $1 AND year = $2)`,
+		seriesID, year).Scan(&exists)
+	if err != nil {
+		return false, fmt.Errorf("edition exists: %w", err)
+	}
+	return exists, nil
+}
+
+// InsertDiscovered inserts d into discovered_markets and returns the id
+// generated by the database (RETURNING id).
+func (r *pgRepository) InsertDiscovered(ctx context.Context, d DiscoveredMarket) (uuid.UUID, error) {
+	var id uuid.UUID
+	err := r.pool.QueryRow(ctx, `
+INSERT INTO discovered_markets
+  (bucket_id, markt_name, stadt, bundesland, land, start_datum, end_datum, website,
+   quellen, extraktion, hinweis, name_normalized, matched_series_id)
+VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13)
+RETURNING id`,
+		d.BucketID, d.MarktName, d.Stadt, d.Bundesland, d.Land, d.StartDatum, d.EndDatum, d.Website,
+		d.Quellen, d.Extraktion, d.Hinweis, d.NameNormalized, d.MatchedSeriesID).Scan(&id)
+	if err != nil {
+		return uuid.Nil, fmt.Errorf("insert discovered: %w", err)
+	}
+	return id, nil
+}
+
+// IsRejected reports whether rejected_discoveries holds a row for the given
+// normalized name, city and year.
+func (r *pgRepository) IsRejected(ctx context.Context, nameNormalized, stadt string, year int) (bool, error) {
+	var exists bool
+	err := r.pool.QueryRow(ctx,
+		`SELECT EXISTS(SELECT 1 FROM rejected_discoveries WHERE name_normalized=$1 AND stadt=$2 AND year=$3)`,
+		nameNormalized, stadt, year).Scan(&exists)
+	if err != nil {
+		return false, fmt.Errorf("is rejected: %w", err)
+	}
+	return exists, nil
+}
+
+// QueueHasPending reports whether discovered_markets holds a pending row with
+// the same normalized name, city and start date. The date is compared with
+// IS NOT DISTINCT FROM, so a nil startDatum matches rows whose start_datum is
+// NULL.
+func (r *pgRepository) QueueHasPending(ctx context.Context, nameNormalized, stadt string, startDatum *time.Time) (bool, error) {
+	var exists bool
+	err := r.pool.QueryRow(ctx, `
+SELECT EXISTS(
+  SELECT 1 FROM discovered_markets
+  WHERE status='pending'
+    AND name_normalized=$1 AND stadt=$2
+    AND start_datum IS NOT DISTINCT FROM $3
+)`, nameNormalized, stadt, startDatum).Scan(&exists)
+	if err != nil {
+		return false, fmt.Errorf("queue has pending: %w", err)
+	}
+	return exists, nil
+}
+
+// ListQueue returns up to limit discovered_markets rows with the given status,
+// skipping the first offset rows, ordered by discovered_at descending.
+func (r *pgRepository) ListQueue(ctx context.Context, status string, limit, offset int) ([]DiscoveredMarket, error) {
+	const q = `SELECT` + discoveredColumns + `
+FROM discovered_markets
+WHERE status = $1
+ORDER BY discovered_at DESC
+LIMIT $2 OFFSET $3`
+	rows, err := r.pool.Query(ctx, q, status, limit, offset)
+	if err != nil {
+		return nil, fmt.Errorf("list queue: %w", err)
+	}
+	defer rows.Close()
+
+	var out []DiscoveredMarket
+	for rows.Next() {
+		var d DiscoveredMarket
+		if err := scanDiscovered(rows, &d); err != nil {
+			return nil, fmt.Errorf("list queue: scan: %w", err)
+		}
+		out = append(out, d)
+	}
+	if err := rows.Err(); err != nil {
+		return nil, fmt.Errorf("list queue: %w", err)
+	}
+	return out, nil
+}
+
+// GetDiscovered returns the discovered_markets row with the given id. The error
+// from pgx is returned unwrapped so that callers comparing it directly against
+// pgx.ErrNoRows keep working.
+func (r *pgRepository) GetDiscovered(ctx context.Context, id uuid.UUID) (DiscoveredMarket, error) {
+	const q = `SELECT` + discoveredColumns + `
+FROM discovered_markets WHERE id = $1`
+	var d DiscoveredMarket
+	if err := scanDiscovered(r.pool.QueryRow(ctx, q, id), &d); err != nil {
+		return DiscoveredMarket{}, err
+	}
+	return d, nil
+}
+
+// scanDiscovered reads one row selected with discoveredColumns into d.
+func scanDiscovered(row pgx.Row, d *DiscoveredMarket) error {
+	return row.Scan(
+		&d.ID, &d.BucketID, &d.MarktName, &d.Stadt, &d.Bundesland, &d.Land,
+		&d.StartDatum, &d.EndDatum, &d.Website, &d.Quellen, &d.Extraktion,
+		&d.Hinweis, &d.NameNormalized, &d.MatchedSeriesID, &d.Status,
+		&d.DiscoveredAt, &d.ReviewedAt, &d.ReviewedBy, &d.CreatedEditionID,
+	)
+}
+
+// MarkAccepted sets the row to accepted, stamps reviewed_at with now() and
+// stores reviewer and editionID. Only rows that are still pending are updated.
+//
+// NOTE(review): the number of affected rows is not checked, so calling this for
+// an unknown or already reviewed id returns nil without changing anything.
+func (r *pgRepository) MarkAccepted(ctx context.Context, tx pgx.Tx, id, editionID, reviewer uuid.UUID) error {
+	_, err := tx.Exec(ctx, `
+UPDATE discovered_markets
+SET status='accepted', reviewed_at=now(), reviewed_by=$2, created_edition_id=$3
+WHERE id = $1 AND status='pending'`, id, reviewer, editionID)
+	if err != nil {
+		return fmt.Errorf("mark accepted %s: %w", id, err)
+	}
+	return nil
+}
+
+// MarkRejected sets the row to rejected, stamps reviewed_at with now() and
+// stores reviewer. Only rows that are still pending are updated.
+//
+// NOTE(review): the number of affected rows is not checked, so calling this for
+// an unknown or already reviewed id returns nil without changing anything.
+func (r *pgRepository) MarkRejected(ctx context.Context, tx pgx.Tx, id uuid.UUID, reviewer uuid.UUID) error {
+	_, err := tx.Exec(ctx, `
+UPDATE discovered_markets
+SET status='rejected', reviewed_at=now(), reviewed_by=$2
+WHERE id = $1 AND status='pending'`, id, reviewer)
+	if err != nil {
+		return fmt.Errorf("mark rejected %s: %w", id, err)
+	}
+	return nil
+}
+
+// InsertRejection inserts rej into rejected_discoveries. A row that already
+// exists for the same (name_normalized, stadt, year) is kept as is.
+func (r *pgRepository) InsertRejection(ctx context.Context, tx pgx.Tx, rej RejectedDiscovery) error {
+	_, err := tx.Exec(ctx, `
+INSERT INTO rejected_discoveries (name_normalized, stadt, year, rejected_by, reason)
+VALUES ($1, $2, $3, $4, $5)
+ON CONFLICT (name_normalized, stadt, year) DO NOTHING`,
+		rej.NameNormalized, rej.Stadt, rej.Year, rej.RejectedBy, rej.Reason)
+	if err != nil {
+		return fmt.Errorf("insert rejection: %w", err)
+	}
+	return nil
+}
+
+// BeginTx starts a transaction on the pool with default options.
+func (r *pgRepository) BeginTx(ctx context.Context) (pgx.Tx, error) {
+	return r.pool.BeginTx(ctx, pgx.TxOptions{})
+}