feat(discovery): add repository with bucket/queue/rejection queries

This commit is contained in:
2026-04-18 07:25:54 +02:00
parent f7e4bab2c3
commit e245f3ec22

View File

@@ -0,0 +1,216 @@
// backend/internal/domain/discovery/repository.go
package discovery
import (
"context"
"fmt"
"time"
"github.com/google/uuid"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)
type Repository interface {
PickStaleBuckets(ctx context.Context, forwardMonths, limit int) ([]Bucket, error)
UpdateBucketQueried(ctx context.Context, id uuid.UUID, errMsg string) error
ListSeriesByCity(ctx context.Context, cityNormalized string) ([]SeriesCandidate, error)
EditionExists(ctx context.Context, seriesID uuid.UUID, year int) (bool, error)
InsertDiscovered(ctx context.Context, d DiscoveredMarket) (uuid.UUID, error)
IsRejected(ctx context.Context, nameNormalized, stadt string, year int) (bool, error)
QueueHasPending(ctx context.Context, nameNormalized, stadt string, startDatum *time.Time) (bool, error)
ListQueue(ctx context.Context, status string, limit, offset int) ([]DiscoveredMarket, error)
GetDiscovered(ctx context.Context, id uuid.UUID) (DiscoveredMarket, error)
MarkAccepted(ctx context.Context, tx pgx.Tx, id, editionID, reviewer uuid.UUID) error
MarkRejected(ctx context.Context, tx pgx.Tx, id uuid.UUID, reviewer uuid.UUID) error
InsertRejection(ctx context.Context, tx pgx.Tx, r RejectedDiscovery) error
BeginTx(ctx context.Context) (pgx.Tx, error)
}
// SeriesCandidate is a minimal projection used for name-normalization comparison in Go.
type SeriesCandidate struct {
ID uuid.UUID
Name string
City string
}
type pgRepository struct {
pool *pgxpool.Pool
}
func NewRepository(pool *pgxpool.Pool) Repository {
return &pgRepository{pool: pool}
}
func (r *pgRepository) PickStaleBuckets(ctx context.Context, forwardMonths, limit int) ([]Bucket, error) {
q := `
SELECT id, land, region, year_month, last_queried_at, coalesce(last_error, ''), created_at
FROM discovery_buckets
WHERE year_month >= to_char(date_trunc('month', now()), 'YYYY-MM')
AND year_month <= to_char(date_trunc('month', now()) + ($1 || ' month')::interval, 'YYYY-MM')
AND (last_queried_at IS NULL OR last_queried_at < now() - interval '7 days')
ORDER BY last_queried_at NULLS FIRST, year_month
LIMIT $2`
rows, err := r.pool.Query(ctx, q, forwardMonths, limit)
if err != nil {
return nil, fmt.Errorf("pick buckets: %w", err)
}
defer rows.Close()
var out []Bucket
for rows.Next() {
var b Bucket
if err := rows.Scan(&b.ID, &b.Land, &b.Region, &b.YearMonth, &b.LastQueriedAt, &b.LastError, &b.CreatedAt); err != nil {
return nil, err
}
out = append(out, b)
}
return out, rows.Err()
}
func (r *pgRepository) UpdateBucketQueried(ctx context.Context, id uuid.UUID, errMsg string) error {
var errValue any
if errMsg == "" {
errValue = nil
} else {
errValue = errMsg
}
_, err := r.pool.Exec(ctx,
`UPDATE discovery_buckets SET last_queried_at = now(), last_error = $2 WHERE id = $1`,
id, errValue)
return err
}
func (r *pgRepository) ListSeriesByCity(ctx context.Context, cityNormalized string) ([]SeriesCandidate, error) {
rows, err := r.pool.Query(ctx,
`SELECT id, name, city FROM market_series WHERE LOWER(city) = $1`,
cityNormalized)
if err != nil {
return nil, err
}
defer rows.Close()
var out []SeriesCandidate
for rows.Next() {
var c SeriesCandidate
if err := rows.Scan(&c.ID, &c.Name, &c.City); err != nil {
return nil, err
}
out = append(out, c)
}
return out, rows.Err()
}
func (r *pgRepository) EditionExists(ctx context.Context, seriesID uuid.UUID, year int) (bool, error) {
var exists bool
err := r.pool.QueryRow(ctx,
`SELECT EXISTS(SELECT 1 FROM market_editions WHERE series_id = $1 AND year = $2)`,
seriesID, year).Scan(&exists)
return exists, err
}
func (r *pgRepository) InsertDiscovered(ctx context.Context, d DiscoveredMarket) (uuid.UUID, error) {
var id uuid.UUID
err := r.pool.QueryRow(ctx, `
INSERT INTO discovered_markets
(bucket_id, markt_name, stadt, bundesland, land, start_datum, end_datum, website,
quellen, extraktion, hinweis, name_normalized, matched_series_id)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13)
RETURNING id`,
d.BucketID, d.MarktName, d.Stadt, d.Bundesland, d.Land, d.StartDatum, d.EndDatum, d.Website,
d.Quellen, d.Extraktion, d.Hinweis, d.NameNormalized, d.MatchedSeriesID).Scan(&id)
return id, err
}
func (r *pgRepository) IsRejected(ctx context.Context, nameNormalized, stadt string, year int) (bool, error) {
var exists bool
err := r.pool.QueryRow(ctx,
`SELECT EXISTS(SELECT 1 FROM rejected_discoveries WHERE name_normalized=$1 AND stadt=$2 AND year=$3)`,
nameNormalized, stadt, year).Scan(&exists)
return exists, err
}
func (r *pgRepository) QueueHasPending(ctx context.Context, nameNormalized, stadt string, startDatum *time.Time) (bool, error) {
var exists bool
err := r.pool.QueryRow(ctx, `
SELECT EXISTS(
SELECT 1 FROM discovered_markets
WHERE status='pending'
AND name_normalized=$1 AND stadt=$2
AND start_datum IS NOT DISTINCT FROM $3
)`, nameNormalized, stadt, startDatum).Scan(&exists)
return exists, err
}
func (r *pgRepository) ListQueue(ctx context.Context, status string, limit, offset int) ([]DiscoveredMarket, error) {
rows, err := r.pool.Query(ctx, `
SELECT id, bucket_id, markt_name, stadt, coalesce(bundesland,''), land,
start_datum, end_datum, coalesce(website,''), quellen, coalesce(extraktion,''),
coalesce(hinweis,''), name_normalized, matched_series_id, status,
discovered_at, reviewed_at, reviewed_by, created_edition_id
FROM discovered_markets
WHERE status = $1
ORDER BY discovered_at DESC
LIMIT $2 OFFSET $3`, status, limit, offset)
if err != nil {
return nil, err
}
defer rows.Close()
var out []DiscoveredMarket
for rows.Next() {
var d DiscoveredMarket
if err := rows.Scan(
&d.ID, &d.BucketID, &d.MarktName, &d.Stadt, &d.Bundesland, &d.Land,
&d.StartDatum, &d.EndDatum, &d.Website, &d.Quellen, &d.Extraktion,
&d.Hinweis, &d.NameNormalized, &d.MatchedSeriesID, &d.Status,
&d.DiscoveredAt, &d.ReviewedAt, &d.ReviewedBy, &d.CreatedEditionID,
); err != nil {
return nil, err
}
out = append(out, d)
}
return out, rows.Err()
}
func (r *pgRepository) GetDiscovered(ctx context.Context, id uuid.UUID) (DiscoveredMarket, error) {
var d DiscoveredMarket
err := r.pool.QueryRow(ctx, `
SELECT id, bucket_id, markt_name, stadt, coalesce(bundesland,''), land,
start_datum, end_datum, coalesce(website,''), quellen, coalesce(extraktion,''),
coalesce(hinweis,''), name_normalized, matched_series_id, status,
discovered_at, reviewed_at, reviewed_by, created_edition_id
FROM discovered_markets WHERE id = $1`, id).Scan(
&d.ID, &d.BucketID, &d.MarktName, &d.Stadt, &d.Bundesland, &d.Land,
&d.StartDatum, &d.EndDatum, &d.Website, &d.Quellen, &d.Extraktion,
&d.Hinweis, &d.NameNormalized, &d.MatchedSeriesID, &d.Status,
&d.DiscoveredAt, &d.ReviewedAt, &d.ReviewedBy, &d.CreatedEditionID,
)
return d, err
}
func (r *pgRepository) MarkAccepted(ctx context.Context, tx pgx.Tx, id, editionID, reviewer uuid.UUID) error {
_, err := tx.Exec(ctx, `
UPDATE discovered_markets
SET status='accepted', reviewed_at=now(), reviewed_by=$2, created_edition_id=$3
WHERE id = $1 AND status='pending'`, id, reviewer, editionID)
return err
}
func (r *pgRepository) MarkRejected(ctx context.Context, tx pgx.Tx, id uuid.UUID, reviewer uuid.UUID) error {
_, err := tx.Exec(ctx, `
UPDATE discovered_markets
SET status='rejected', reviewed_at=now(), reviewed_by=$2
WHERE id = $1 AND status='pending'`, id, reviewer)
return err
}
func (r *pgRepository) InsertRejection(ctx context.Context, tx pgx.Tx, rej RejectedDiscovery) error {
_, err := tx.Exec(ctx, `
INSERT INTO rejected_discoveries (name_normalized, stadt, year, rejected_by, reason)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (name_normalized, stadt, year) DO NOTHING`,
rej.NameNormalized, rej.Stadt, rej.Year, rej.RejectedBy, rej.Reason)
return err
}
func (r *pgRepository) BeginTx(ctx context.Context) (pgx.Tx, error) {
return r.pool.BeginTx(ctx, pgx.TxOptions{})
}