feat(dedup): E5+E2+E2b — merge advisor LLM + merge-plan/merge-into endpoints

MergeAdvisor calls Gemini with a German system prompt to propose how to merge
two duplicate market editions. It guards against confident non-duplicates via
ErrNotDuplicate (same=false AND confidence>0.5).

POST /:id/merge-plan generates a MarketMergeProposal (read-only).
POST /:id/merge-into/:target_id applies the merge: updates target fields,
marks source as status=merged with merged_into_id set, reparents discovered_markets,
and writes a market_merge_log audit row — all in one transaction.

AdminHandler gains advisor and updated constructor. VersionMergeAdvisor added
to pkg/ai versions.
This commit is contained in:
2026-04-25 19:05:52 +02:00
parent 5a643098d1
commit 77e150f122
10 changed files with 677 additions and 6 deletions

View File

@@ -1,8 +1,11 @@
package market
import (
"encoding/json"
"errors"
"log/slog"
"net/http"
"strings"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
@@ -17,10 +20,11 @@ const duplicateClassifyTopN = 5
type AdminHandler struct {
service *Service
classifier enrich.SimilarityClassifier
advisor *MergeAdvisor
}
func NewAdminHandler(service *Service, classifier enrich.SimilarityClassifier) *AdminHandler {
return &AdminHandler{service: service, classifier: classifier}
func NewAdminHandler(service *Service, classifier enrich.SimilarityClassifier, advisor *MergeAdvisor) *AdminHandler {
return &AdminHandler{service: service, classifier: classifier, advisor: advisor}
}
func (h *AdminHandler) List(c *gin.Context) { //nolint:dupl
@@ -267,6 +271,198 @@ func (h *AdminHandler) FindDuplicates(c *gin.Context) {
c.JSON(http.StatusOK, DuplicatesResponse{Data: dupes})
}
// MergePlanRequest is the body for the merge-plan endpoint.
type MergePlanRequest struct {
TargetID uuid.UUID `json:"target_id" binding:"required"`
}
// MergePlan generates a MarketMergeProposal for two editions without persisting anything.
func (h *AdminHandler) MergePlan(c *gin.Context) {
ctx := c.Request.Context()
sourceID, err := uuid.Parse(c.Param("id"))
if err != nil {
c.JSON(http.StatusBadRequest, apierror.NewResponse(apierror.BadRequest("invalid_id", "invalid market ID")))
return
}
var req MergePlanRequest
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, apierror.NewResponse(apierror.BadRequest("invalid_body", err.Error())))
return
}
if sourceID == req.TargetID {
c.JSON(http.StatusBadRequest, apierror.NewResponse(apierror.BadRequest("same_id", "source and target must differ")))
return
}
sourceM, err := h.service.GetByID(ctx, sourceID)
if err != nil {
if errors.Is(err, ErrMarketNotFound) {
c.JSON(http.StatusNotFound, apierror.NewResponse(apierror.NotFound("market")))
} else {
c.JSON(http.StatusInternalServerError, apierror.NewResponse(apierror.Internal("failed to load source market")))
}
return
}
targetM, err := h.service.GetByID(ctx, req.TargetID)
if err != nil {
if errors.Is(err, ErrMarketNotFound) {
c.JSON(http.StatusNotFound, apierror.NewResponse(apierror.NotFound("target market")))
} else {
c.JSON(http.StatusInternalServerError, apierror.NewResponse(apierror.Internal("failed to load target market")))
}
return
}
verdict := classifyPair(c, h.classifier, sourceM, targetM)
if h.advisor == nil {
c.JSON(http.StatusServiceUnavailable, apierror.NewResponse(apierror.Internal("merge advisor not configured")))
return
}
proposal, err := h.advisor.Propose(ctx, sourceM, targetM, verdict)
if err != nil {
if errors.Is(err, ErrNotDuplicate) {
c.JSON(http.StatusConflict, apierror.NewResponse(apierror.BadRequest("not_duplicate", "Markt wird nicht als Duplikat klassifiziert")))
return
}
slog.ErrorContext(ctx, "merge-plan: advisor failed", "source_id", sourceID, "target_id", req.TargetID, "err", err)
c.JSON(http.StatusInternalServerError, apierror.NewResponse(apierror.Internal("Merge-Plan konnte nicht erstellt werden")))
return
}
c.JSON(http.StatusOK, gin.H{"data": proposal})
}
// MergeIntoRequest is the body for the merge-into endpoint.
type MergeIntoRequest struct {
Proposal *MarketMergeProposal `json:"proposal"` // optional admin-edited proposal
}
// MergeInto applies a merge of `:id` (source) into `:target_id` (survivor).
// This is destructive: the source edition is marked merged.
func (h *AdminHandler) MergeInto(c *gin.Context) {
ctx := c.Request.Context()
sourceID, err := uuid.Parse(c.Param("id"))
if err != nil {
c.JSON(http.StatusBadRequest, apierror.NewResponse(apierror.BadRequest("invalid_id", "invalid source market ID")))
return
}
targetID, err := uuid.Parse(c.Param("target_id"))
if err != nil {
c.JSON(http.StatusBadRequest, apierror.NewResponse(apierror.BadRequest("invalid_target_id", "invalid target market ID")))
return
}
if sourceID == targetID {
c.JSON(http.StatusBadRequest, apierror.NewResponse(apierror.BadRequest("same_id", "source and target must differ")))
return
}
var req MergeIntoRequest
_ = c.ShouldBindJSON(&req)
sourceM, err := h.service.GetByID(ctx, sourceID)
if err != nil {
if errors.Is(err, ErrMarketNotFound) {
c.JSON(http.StatusNotFound, apierror.NewResponse(apierror.NotFound("market")))
} else {
c.JSON(http.StatusInternalServerError, apierror.NewResponse(apierror.Internal("failed to load source market")))
}
return
}
targetM, err := h.service.GetByID(ctx, targetID)
if err != nil {
if errors.Is(err, ErrMarketNotFound) {
c.JSON(http.StatusNotFound, apierror.NewResponse(apierror.NotFound("target market")))
} else {
c.JSON(http.StatusInternalServerError, apierror.NewResponse(apierror.Internal("failed to load target market")))
}
return
}
var proposal MarketMergeProposal
if req.Proposal != nil {
proposal = *req.Proposal
} else {
if h.advisor == nil {
c.JSON(http.StatusBadRequest, apierror.NewResponse(apierror.BadRequest("no_proposal", "proposal required when advisor not configured")))
return
}
verdict := classifyPair(c, h.classifier, sourceM, targetM)
proposal, err = h.advisor.Propose(ctx, sourceM, targetM, verdict)
if err != nil {
if errors.Is(err, ErrNotDuplicate) {
c.JSON(http.StatusConflict, apierror.NewResponse(apierror.BadRequest("not_duplicate", "Markt wird nicht als Duplikat klassifiziert")))
return
}
slog.ErrorContext(ctx, "merge-into: advisor failed", "source_id", sourceID, "target_id", targetID, "err", err)
c.JSON(http.StatusInternalServerError, apierror.NewResponse(apierror.Internal("Merge-Plan konnte nicht erstellt werden")))
return
}
}
// Apply field merges to target.
updateReq := UpdateMarketRequest{}
for fieldName, decision := range proposal.FieldMerges {
fm := FieldMerge{Field: fieldName, Suggested: decision.Value}
if applyErr := applyFieldMerge(fieldName, fm, &updateReq); applyErr != nil {
slog.WarnContext(ctx, "merge-into: skip field", "field", fieldName, "err", applyErr)
}
}
updated, err := h.service.Update(ctx, targetID, updateReq)
if err != nil {
if errors.Is(err, ErrMarketNotFound) {
c.JSON(http.StatusNotFound, apierror.NewResponse(apierror.NotFound("target market")))
} else {
slog.ErrorContext(ctx, "merge-into: update target failed", "target_id", targetID, "err", err)
c.JSON(http.StatusInternalServerError, apierror.NewResponse(apierror.Internal("Fehler beim Aktualisieren des Zielmarkts")))
}
return
}
userID, _ := c.Get("user_id")
mergedBy, _ := userID.(uuid.UUID)
proposalJSON, _ := json.Marshal(proposal)
if mergeErr := h.service.MarkMerged(ctx, sourceID, targetID, mergedBy, proposalJSON); mergeErr != nil {
slog.ErrorContext(ctx, "merge-into: mark merged failed", "source_id", sourceID, "err", mergeErr)
c.JSON(http.StatusInternalServerError, apierror.NewResponse(apierror.Internal("Fehler beim Markieren als zusammengeführt")))
return
}
c.JSON(http.StatusOK, gin.H{"data": ToAdminDetail(updated)})
}
// classifyPair runs the similarity classifier on two markets. Returns a low-confidence
// verdict if the classifier is nil or errors, so the caller can still proceed.
func classifyPair(ctx *gin.Context, classifier enrich.SimilarityClassifier, a, b Market) enrich.Verdict {
if classifier == nil {
return enrich.Verdict{Same: true, Confidence: 0}
}
aRow := enrich.SimilarityRow{
NameNormalized: strings.ToLower(strings.TrimSpace(a.Name)),
Stadt: a.City,
Year: a.StartDate.Year(),
Name: a.Name,
}
bRow := enrich.SimilarityRow{
NameNormalized: strings.ToLower(strings.TrimSpace(b.Name)),
Stadt: b.City,
Year: b.StartDate.Year(),
Name: b.Name,
}
verdict, err := classifier.Classify(ctx.Request.Context(), aRow, bRow)
if err != nil {
slog.WarnContext(ctx.Request.Context(), "classify pair failed", "err", err)
return enrich.Verdict{Same: true, Confidence: 0}
}
return verdict
}
func (h *AdminHandler) CreateEdition(c *gin.Context) { //nolint:dupl
seriesID, err := uuid.Parse(c.Param("series_id"))
if err != nil {

View File

@@ -0,0 +1,47 @@
Du bist ein Datenkurations-Agent fuer Marktvogt. Zwei Markteintraege wurden
als wahrscheinlich gleicher Markt klassifiziert. Schlage einen Merge vor:
welcher Eintrag bleibt erhalten und welche Felder uebernehmen wir aus
welcher Quelle.
## Harte Regeln
- Niemals Werte erfinden. Wenn keine Quelle einen Wert liefert: leerer String.
- `target_id` MUSS einer der beiden Eingabe-IDs sein.
- Wenn `kontext.ist_gleicher_markt` nicht `true` ist: Antwort mit `"confidence": 0`
und `"flags": ["uncertain_match"]`; alle `field_merges` leer.
- Antwort MUSS dem Schema entsprechen, nichts ausserhalb des JSON.
## Zielwahl (target_id)
Bevorzuge in dieser Reihenfolge:
1. Vollstaendigerer Datensatz (mehr nicht-leere Felder).
2. Aktuelleres `zuletzt_geaendert`.
3. Quelle `admin` > `user_submission` > `crawler`.
## Feldwahl je Feld
Fuer jedes Feld waehle `"source": "a"` oder `"b"`. Erlaube `"combined"` nur
wenn beide Quellen komplementaere Daten liefern, die sich nicht widersprechen
(z.B. A hat Oeffnungszeiten, B hat Eintrittspreise).
Spezifisch:
- `name`: kuerzere, etabliertere Form bevorzugen. Niemals kombinieren.
- `veranstalter`: Rechtstraeger > generischer Name. In `flags` vermerken wenn unsicher.
- `start_datum` / `end_datum`: muessen identisch sein — wenn nicht, in `flags`
und `confidence` senken; nicht raten.
- `beschreibung`: laengere, faktische Variante bevorzugen; `combined` nur
wenn die Inhalte ergaenzend sind.
- `bild_url` / `logo_url`: offizieller wirkende Quelle bevorzugen.
- `oeffnungszeiten` / `eintrittspreise`: bei Konflikt die Quelle mit aktuelleren Daten.
## flags
Liste konkrete Punkte fuer menschliche Pruefung:
- Unterschiedliche Organisationsnamen (auch wenn vermutlich derselbe Traeger).
- Datumsabweichungen.
- Adressabweichungen jenseits Tippfehler.
- Alle `combined`-Felder.
## Ausgabe
Antworte ausschliesslich mit validem JSON gemaess dem Schema.

View File

@@ -0,0 +1,52 @@
{
"type": "object",
"required": ["target_id", "target_reason", "field_merges", "flags", "confidence", "summary"],
"properties": {
"target_id": {
"type": "string",
"description": "UUID of the surviving market edition (must be one of the two input IDs)"
},
"target_reason": {
"type": "string",
"description": "One sentence explaining why this side was chosen as the target"
},
"field_merges": {
"type": "object",
"description": "Per-field merge decisions. Keys are field names, values are source/value/reason triples.",
"additionalProperties": {
"type": "object",
"required": ["source", "value", "reason"],
"properties": {
"source": {
"type": "string",
"enum": ["a", "b", "combined"],
"description": "Which candidate provides the value: a, b, or combined (synthesised from both)"
},
"value": {
"description": "The chosen field value (string, number, or object depending on field)"
},
"reason": {
"type": "string",
"description": "One sentence justifying the choice"
}
}
}
},
"flags": {
"type": "array",
"items": { "type": "string" },
"description": "Human-review concerns: conflicting organizer names, date mismatches, etc."
},
"confidence": {
"type": "number",
"minimum": 0,
"maximum": 1,
"description": "Advisor's confidence in the proposed merge (0=refuse, 1=certain)"
},
"summary": {
"type": "string",
"description": "2-3 sentence German summary of the proposed merge"
}
},
"additionalProperties": false
}

View File

@@ -0,0 +1,188 @@
package market
import (
"context"
_ "embed"
"encoding/json"
"errors"
"fmt"
"time"
"github.com/google/uuid"
"marktvogt.de/backend/internal/domain/discovery/enrich"
"marktvogt.de/backend/internal/pkg/ai"
)
//go:embed assets/merge_advisor_prompt.de.md
var mergeAdvisorPromptDE string
// ErrNotDuplicate is returned by MergeAdvisor.Propose when the similarity
// verdict confidently says the two markets are not the same.
var ErrNotDuplicate = errors.New("markets are not duplicates")
// MarketMergeProposal is the AI advisor's proposed merge for two market editions.
type MarketMergeProposal struct {
TargetID uuid.UUID `json:"target_id"`
TargetReason string `json:"target_reason"`
FieldMerges map[string]mergeFieldDecision `json:"field_merges"`
Flags []string `json:"flags"`
Confidence float64 `json:"confidence"`
Summary string `json:"summary"`
GeneratedAt time.Time `json:"generated_at"`
}
// mergeFieldDecision is one field's merge outcome inside MarketMergeProposal.
type mergeFieldDecision struct {
Source string `json:"source"` // "a" | "b" | "combined"
Value any `json:"value"`
Reason string `json:"reason"`
}
// mergeAdvisorResponse is the raw JSON shape the LLM returns.
type mergeAdvisorResponse struct {
TargetID string `json:"target_id"`
TargetReason string `json:"target_reason"`
FieldMerges map[string]mergeFieldDecision `json:"field_merges"`
Flags []string `json:"flags"`
Confidence float64 `json:"confidence"`
Summary string `json:"summary"`
}
// MergeAdvisor calls the LLM to propose how to merge two market editions.
type MergeAdvisor struct {
ai ai.Provider
}
func NewMergeAdvisor(provider ai.Provider) *MergeAdvisor {
return &MergeAdvisor{ai: provider}
}
// Propose calls the LLM to generate a merge proposal for two market editions.
// Returns ErrNotDuplicate without an LLM call when the similarity verdict says
// Same=false AND confidence > 0.5 (confidently different markets).
func (a *MergeAdvisor) Propose(ctx context.Context, mA, mB Market, verdict enrich.Verdict) (MarketMergeProposal, error) {
if !verdict.Same && verdict.Confidence > 0.5 {
return MarketMergeProposal{}, ErrNotDuplicate
}
userMsg, err := buildAdvisorUserMessage(mA, mB, verdict)
if err != nil {
return MarketMergeProposal{}, fmt.Errorf("build user message: %w", err)
}
resp, err := a.ai.Chat(ctx, &ai.ChatRequest{
SystemPrompt: mergeAdvisorPromptDE,
UserMessage: userMsg,
JSONMode: true,
Grounded: false,
Temperature: 0.1,
CallType: "merge_advisor",
PromptVersion: ai.VersionMergeAdvisor,
})
if err != nil {
return MarketMergeProposal{}, fmt.Errorf("chat: %w", err)
}
var raw mergeAdvisorResponse
if err := json.Unmarshal([]byte(resp.Content), &raw); err != nil {
return MarketMergeProposal{}, fmt.Errorf("parse response: %w (content=%q)", err, resp.Content)
}
targetID, err := uuid.Parse(raw.TargetID)
if err != nil {
return MarketMergeProposal{}, fmt.Errorf("invalid target_id %q: %w", raw.TargetID, err)
}
conf := raw.Confidence
if conf < 0 {
conf = 0
}
if conf > 1 {
conf = 1
}
flags := raw.Flags
if flags == nil {
flags = []string{}
}
return MarketMergeProposal{
TargetID: targetID,
TargetReason: raw.TargetReason,
FieldMerges: raw.FieldMerges,
Flags: flags,
Confidence: conf,
Summary: raw.Summary,
GeneratedAt: time.Now(),
}, nil
}
type advisorMarketInput struct {
ID string `json:"id"`
Name string `json:"name"`
Stadt string `json:"stadt"`
Land string `json:"land"`
Strasse string `json:"strasse"`
PLZ string `json:"plz"`
Veranstalter string `json:"veranstalter"`
StartDatum string `json:"start_datum"`
EndDatum string `json:"end_datum"`
Website string `json:"website"`
Beschreibung string `json:"beschreibung"`
BildURL string `json:"bild_url"`
LogoURL string `json:"logo_url"`
ZuletztGeaend string `json:"zuletzt_geaendert"`
}
type advisorKontext struct {
IstGleicherMarkt *bool `json:"ist_gleicher_markt"`
Confidence float64 `json:"confidence"`
Begruendung string `json:"begruendung"`
}
type advisorUserPayload struct {
KandidatA advisorMarketInput `json:"kandidat_a"`
KandidatB advisorMarketInput `json:"kandidat_b"`
Kontext advisorKontext `json:"kontext"`
}
func buildAdvisorUserMessage(mA, mB Market, verdict enrich.Verdict) (string, error) {
payload := advisorUserPayload{
KandidatA: marketToAdvisorInput(mA),
KandidatB: marketToAdvisorInput(mB),
Kontext: advisorKontext{
Confidence: verdict.Confidence,
Begruendung: verdict.Reason,
},
}
if verdict.Same {
t := true
payload.Kontext.IstGleicherMarkt = &t
}
b, err := json.Marshal(payload)
if err != nil {
return "", err
}
return string(b), nil
}
func marketToAdvisorInput(m Market) advisorMarketInput {
return advisorMarketInput{
ID: m.ID.String(),
Name: m.Name,
Stadt: m.City,
Land: m.Country,
Strasse: m.Street,
PLZ: m.Zip,
Veranstalter: m.OrganizerName,
StartDatum: m.StartDate.Format("2006-01-02"),
EndDatum: m.EndDate.Format("2006-01-02"),
Website: m.Website,
Beschreibung: m.Description,
BildURL: m.ImageURL,
LogoURL: m.LogoURL,
ZuletztGeaend: m.UpdatedAt.Format(time.RFC3339),
}
}

View File

@@ -0,0 +1,133 @@
package market
import (
"context"
"encoding/json"
"errors"
"testing"
"marktvogt.de/backend/internal/domain/discovery/enrich"
"marktvogt.de/backend/internal/pkg/ai"
)
// stubAIProvider is a minimal ai.Provider for testing the merge advisor.
type stubAIProvider struct {
response string
err error
calls int
}
func (p *stubAIProvider) Name() string { return "stub" }
func (p *stubAIProvider) SupportsJSONMode() bool { return true }
func (p *stubAIProvider) SupportsJSONSchema() bool { return false }
func (p *stubAIProvider) Chat(_ context.Context, _ *ai.ChatRequest) (*ai.ChatResponse, error) {
p.calls++
if p.err != nil {
return nil, p.err
}
return &ai.ChatResponse{Content: p.response}, nil
}
func validAdvisorResponse(targetID string) string {
resp := mergeAdvisorResponse{
TargetID: targetID,
TargetReason: "Market A has more complete data",
FieldMerges: map[string]mergeFieldDecision{
"name": {Source: "a", Value: "Testmarkt Berlin", Reason: "established name"},
},
Flags: []string{},
Confidence: 0.9,
Summary: "Beide Eintraege bezeichnen denselben Markt. Daten von A werden uebernommen.",
}
b, _ := json.Marshal(resp)
return string(b)
}
func TestMergeAdvisor_RejectsWhenNotDuplicate(t *testing.T) {
p := &stubAIProvider{}
a := Market{Name: "Markt A"}
b := Market{Name: "Markt B"}
v := enrich.Verdict{Same: false, Confidence: 0.85}
advisor := NewMergeAdvisor(p)
_, err := advisor.Propose(context.Background(), a, b, v)
if !errors.Is(err, ErrNotDuplicate) {
t.Errorf("expected ErrNotDuplicate, got %v", err)
}
if p.calls != 0 {
t.Errorf("expected no LLM call when not duplicate, got %d calls", p.calls)
}
}
func TestMergeAdvisor_AllowsUncertainVerdict(t *testing.T) {
// same=false but confidence <= 0.5 should still call LLM (uncertain case)
p := &stubAIProvider{}
a := Market{Name: "Markt A"}
b := Market{Name: "Markt B"}
v := enrich.Verdict{Same: false, Confidence: 0.4}
advisor := NewMergeAdvisor(p)
p.response = validAdvisorResponse(a.ID.String())
// Should not error with ErrNotDuplicate for low-confidence non-match
_, err := advisor.Propose(context.Background(), a, b, v)
// May error on empty target ID / parse, but not ErrNotDuplicate
if errors.Is(err, ErrNotDuplicate) {
t.Error("uncertain verdict (confidence<=0.5) should not return ErrNotDuplicate")
}
}
func TestMergeAdvisor_ParsesValidResponse(t *testing.T) {
a := Market{Name: "Markt A", City: "Berlin"}
b := Market{Name: "Markt B", City: "Berlin"}
p := &stubAIProvider{response: validAdvisorResponse(a.ID.String())}
v := enrich.Verdict{Same: true, Confidence: 0.92}
advisor := NewMergeAdvisor(p)
proposal, err := advisor.Propose(context.Background(), a, b, v)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if p.calls != 1 {
t.Errorf("expected 1 LLM call, got %d", p.calls)
}
if proposal.TargetReason == "" {
t.Error("expected non-empty TargetReason")
}
if proposal.Confidence != 0.9 {
t.Errorf("expected confidence 0.9, got %f", proposal.Confidence)
}
if len(proposal.FieldMerges) == 0 {
t.Error("expected at least one field merge")
}
}
func TestMergeAdvisor_ProviderErrorPropagates(t *testing.T) {
p := &stubAIProvider{err: errors.New("provider down")}
a := Market{Name: "Markt A"}
b := Market{Name: "Markt B"}
v := enrich.Verdict{Same: true, Confidence: 0.9}
advisor := NewMergeAdvisor(p)
_, err := advisor.Propose(context.Background(), a, b, v)
if err == nil {
t.Error("expected error from provider, got nil")
}
}
func TestMergeAdvisor_InvalidJSONErrors(t *testing.T) {
p := &stubAIProvider{response: "not json at all"}
a := Market{Name: "Markt A"}
b := Market{Name: "Markt B"}
v := enrich.Verdict{Same: true, Confidence: 0.9}
advisor := NewMergeAdvisor(p)
_, err := advisor.Propose(context.Background(), a, b, v)
if err == nil {
t.Error("expected parse error for invalid JSON")
}
}

View File

@@ -43,6 +43,11 @@ type Repository interface {
// Merge audit log
InsertMergeLog(ctx context.Context, entry MergeLogEntry) error
// MarkMerged marks sourceID as merged into targetID in a single transaction:
// sets status='merged', merged_into_id, merged_at on the source edition;
// reparents discovered_markets.created_edition_id; writes a market_merge_log row.
MarkMerged(ctx context.Context, sourceID, targetID, mergedBy uuid.UUID, proposalJSON []byte) error
}
type pgRepository struct {
@@ -1084,3 +1089,43 @@ func (r *pgRepository) InsertMergeLog(ctx context.Context, entry MergeLogEntry)
)
return err
}
// MarkMerged marks sourceID as merged into targetID in a single transaction.
func (r *pgRepository) MarkMerged(ctx context.Context, sourceID, targetID, mergedBy uuid.UUID, proposalJSON []byte) error {
tx, err := r.db.Begin(ctx)
if err != nil {
return fmt.Errorf("begin tx: %w", err)
}
defer func() { _ = tx.Rollback(ctx) }()
_, err = tx.Exec(ctx, `
UPDATE market_editions
SET status = 'merged', merged_into_id = $1, merged_at = now(), updated_at = now()
WHERE id = $2`,
targetID, sourceID,
)
if err != nil {
return fmt.Errorf("mark source merged: %w", err)
}
_, err = tx.Exec(ctx, `
UPDATE discovered_markets
SET created_edition_id = $1
WHERE created_edition_id = $2`,
targetID, sourceID,
)
if err != nil {
return fmt.Errorf("reparent discovered_markets: %w", err)
}
_, err = tx.Exec(ctx, `
INSERT INTO market_merge_log (market_id, plan, applied_fields, applied_by, research_sources)
VALUES ($1, $2, '{}', $3, '{}')`,
sourceID, proposalJSON, mergedBy,
)
if err != nil {
return fmt.Errorf("insert merge log: %w", err)
}
return tx.Commit(ctx)
}

View File

@@ -30,6 +30,8 @@ func RegisterAdminRoutes(rg *gin.RouterGroup, h *AdminHandler, rh *ResearchHandl
markets.POST("/:id/research/plan", rh.Plan)
markets.POST("/:id/research/apply", rh.Apply)
markets.GET("/:id/duplicates", h.FindDuplicates)
markets.POST("/:id/merge-plan", h.MergePlan)
markets.POST("/:id/merge-into/:target_id", h.MergeInto)
}
series := admin.Group("/series")

View File

@@ -338,6 +338,12 @@ func (s *Service) InsertMergeLog(ctx context.Context, entry MergeLogEntry) error
return s.repo.InsertMergeLog(ctx, entry)
}
// MarkMerged marks sourceID as merged into targetID (status='merged',
// merged_into_id set) and reparents any discovered_markets rows.
func (s *Service) MarkMerged(ctx context.Context, sourceID, targetID, mergedBy uuid.UUID, proposalJSON []byte) error {
return s.repo.MarkMerged(ctx, sourceID, targetID, mergedBy, proposalJSON)
}
func orDefault(val, fallback string) string {
if val != "" {
return val

View File

@@ -3,7 +3,8 @@ package ai
// Prompt version constants. Bump when the system prompt for a call type
// changes materially so ai_usage rows can be correlated with prompt iterations.
const (
VersionResearch = "research/v2"
VersionEnrichB = "enrich_b/v2"
VersionSimilarity = "similarity/v2"
VersionResearch = "research/v2"
VersionEnrichB = "enrich_b/v2"
VersionSimilarity = "similarity/v2"
VersionMergeAdvisor = "merge_advisor/v1"
)

View File

@@ -98,7 +98,8 @@ func (s *Server) registerRoutes() {
llmEnricher := enrich.NewLLMEnricher(aiProvider, scraper)
simClassifier := enrich.NewSimilarityClassifier(aiProvider)
adminMarketHandler := market.NewAdminHandler(marketSvc, simClassifier)
mergeAdvisor := market.NewMergeAdvisor(aiProvider)
adminMarketHandler := market.NewAdminHandler(marketSvc, simClassifier, mergeAdvisor)
market.RegisterAdminRoutes(v1, adminMarketHandler, researchHandler, requireAuth, requireAdmin)
discoveryService := discovery.NewService(discoveryRepo, crawlerInstance, discovery.NewLinkChecker(), marketSvc, geocoder, llmEnricher, simClassifier)