diff --git a/backend/internal/domain/market/admin_handler.go b/backend/internal/domain/market/admin_handler.go index 893202f..e88019c 100644 --- a/backend/internal/domain/market/admin_handler.go +++ b/backend/internal/domain/market/admin_handler.go @@ -1,8 +1,11 @@ package market import ( + "encoding/json" "errors" + "log/slog" "net/http" + "strings" "github.com/gin-gonic/gin" "github.com/google/uuid" @@ -17,10 +20,11 @@ const duplicateClassifyTopN = 5 type AdminHandler struct { service *Service classifier enrich.SimilarityClassifier + advisor *MergeAdvisor } -func NewAdminHandler(service *Service, classifier enrich.SimilarityClassifier) *AdminHandler { - return &AdminHandler{service: service, classifier: classifier} +func NewAdminHandler(service *Service, classifier enrich.SimilarityClassifier, advisor *MergeAdvisor) *AdminHandler { + return &AdminHandler{service: service, classifier: classifier, advisor: advisor} } func (h *AdminHandler) List(c *gin.Context) { //nolint:dupl @@ -267,6 +271,198 @@ func (h *AdminHandler) FindDuplicates(c *gin.Context) { c.JSON(http.StatusOK, DuplicatesResponse{Data: dupes}) } +// MergePlanRequest is the body for the merge-plan endpoint. +type MergePlanRequest struct { + TargetID uuid.UUID `json:"target_id" binding:"required"` +} + +// MergePlan generates a MarketMergeProposal for two editions without persisting anything. +func (h *AdminHandler) MergePlan(c *gin.Context) { + ctx := c.Request.Context() + + sourceID, err := uuid.Parse(c.Param("id")) + if err != nil { + c.JSON(http.StatusBadRequest, apierror.NewResponse(apierror.BadRequest("invalid_id", "invalid market ID"))) + return + } + + var req MergePlanRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, apierror.NewResponse(apierror.BadRequest("invalid_body", err.Error()))) + return + } + if sourceID == req.TargetID { + c.JSON(http.StatusBadRequest, apierror.NewResponse(apierror.BadRequest("same_id", "source and target must differ"))) + return + } + + sourceM, err := h.service.GetByID(ctx, sourceID) + if err != nil { + if errors.Is(err, ErrMarketNotFound) { + c.JSON(http.StatusNotFound, apierror.NewResponse(apierror.NotFound("market"))) + } else { + c.JSON(http.StatusInternalServerError, apierror.NewResponse(apierror.Internal("failed to load source market"))) + } + return + } + targetM, err := h.service.GetByID(ctx, req.TargetID) + if err != nil { + if errors.Is(err, ErrMarketNotFound) { + c.JSON(http.StatusNotFound, apierror.NewResponse(apierror.NotFound("target market"))) + } else { + c.JSON(http.StatusInternalServerError, apierror.NewResponse(apierror.Internal("failed to load target market"))) + } + return + } + + verdict := classifyPair(c, h.classifier, sourceM, targetM) + + if h.advisor == nil { + c.JSON(http.StatusServiceUnavailable, apierror.NewResponse(apierror.Internal("merge advisor not configured"))) + return + } + + proposal, err := h.advisor.Propose(ctx, sourceM, targetM, verdict) + if err != nil { + if errors.Is(err, ErrNotDuplicate) { + c.JSON(http.StatusConflict, apierror.NewResponse(apierror.BadRequest("not_duplicate", "Markt wird nicht als Duplikat klassifiziert"))) + return + } + slog.ErrorContext(ctx, "merge-plan: advisor failed", "source_id", sourceID, "target_id", req.TargetID, "err", err) + c.JSON(http.StatusInternalServerError, apierror.NewResponse(apierror.Internal("Merge-Plan konnte nicht erstellt werden"))) + return + } + + c.JSON(http.StatusOK, gin.H{"data": proposal}) +} + +// MergeIntoRequest is the body for the merge-into endpoint. +type MergeIntoRequest struct { + Proposal *MarketMergeProposal `json:"proposal"` // optional admin-edited proposal +} + +// MergeInto applies a merge of `:id` (source) into `:target_id` (survivor). +// This is destructive: the source edition is marked merged. +func (h *AdminHandler) MergeInto(c *gin.Context) { + ctx := c.Request.Context() + + sourceID, err := uuid.Parse(c.Param("id")) + if err != nil { + c.JSON(http.StatusBadRequest, apierror.NewResponse(apierror.BadRequest("invalid_id", "invalid source market ID"))) + return + } + targetID, err := uuid.Parse(c.Param("target_id")) + if err != nil { + c.JSON(http.StatusBadRequest, apierror.NewResponse(apierror.BadRequest("invalid_target_id", "invalid target market ID"))) + return + } + if sourceID == targetID { + c.JSON(http.StatusBadRequest, apierror.NewResponse(apierror.BadRequest("same_id", "source and target must differ"))) + return + } + + var req MergeIntoRequest + _ = c.ShouldBindJSON(&req) + + sourceM, err := h.service.GetByID(ctx, sourceID) + if err != nil { + if errors.Is(err, ErrMarketNotFound) { + c.JSON(http.StatusNotFound, apierror.NewResponse(apierror.NotFound("market"))) + } else { + c.JSON(http.StatusInternalServerError, apierror.NewResponse(apierror.Internal("failed to load source market"))) + } + return + } + targetM, err := h.service.GetByID(ctx, targetID) + if err != nil { + if errors.Is(err, ErrMarketNotFound) { + c.JSON(http.StatusNotFound, apierror.NewResponse(apierror.NotFound("target market"))) + } else { + c.JSON(http.StatusInternalServerError, apierror.NewResponse(apierror.Internal("failed to load target market"))) + } + return + } + + var proposal MarketMergeProposal + if req.Proposal != nil { + proposal = *req.Proposal + } else { + if h.advisor == nil { + c.JSON(http.StatusBadRequest, apierror.NewResponse(apierror.BadRequest("no_proposal", "proposal required when advisor not configured"))) + return + } + verdict := classifyPair(c, h.classifier, sourceM, targetM) + proposal, err = h.advisor.Propose(ctx, sourceM, targetM, verdict) + if err != nil { + if errors.Is(err, ErrNotDuplicate) { + c.JSON(http.StatusConflict, apierror.NewResponse(apierror.BadRequest("not_duplicate", "Markt wird nicht als Duplikat klassifiziert"))) + return + } + slog.ErrorContext(ctx, "merge-into: advisor failed", "source_id", sourceID, "target_id", targetID, "err", err) + c.JSON(http.StatusInternalServerError, apierror.NewResponse(apierror.Internal("Merge-Plan konnte nicht erstellt werden"))) + return + } + } + + // Apply field merges to target. + updateReq := UpdateMarketRequest{} + for fieldName, decision := range proposal.FieldMerges { + fm := FieldMerge{Field: fieldName, Suggested: decision.Value} + if applyErr := applyFieldMerge(fieldName, fm, &updateReq); applyErr != nil { + slog.WarnContext(ctx, "merge-into: skip field", "field", fieldName, "err", applyErr) + } + } + + updated, err := h.service.Update(ctx, targetID, updateReq) + if err != nil { + if errors.Is(err, ErrMarketNotFound) { + c.JSON(http.StatusNotFound, apierror.NewResponse(apierror.NotFound("target market"))) + } else { + slog.ErrorContext(ctx, "merge-into: update target failed", "target_id", targetID, "err", err) + c.JSON(http.StatusInternalServerError, apierror.NewResponse(apierror.Internal("Fehler beim Aktualisieren des Zielmarkts"))) + } + return + } + + userID, _ := c.Get("user_id") + mergedBy, _ := userID.(uuid.UUID) + + proposalJSON, _ := json.Marshal(proposal) + if mergeErr := h.service.MarkMerged(ctx, sourceID, targetID, mergedBy, proposalJSON); mergeErr != nil { + slog.ErrorContext(ctx, "merge-into: mark merged failed", "source_id", sourceID, "err", mergeErr) + c.JSON(http.StatusInternalServerError, apierror.NewResponse(apierror.Internal("Fehler beim Markieren als zusammengeführt"))) + return + } + + c.JSON(http.StatusOK, gin.H{"data": ToAdminDetail(updated)}) +} + +// classifyPair runs the similarity classifier on two markets. Returns a low-confidence +// verdict if the classifier is nil or errors, so the caller can still proceed. +func classifyPair(ctx *gin.Context, classifier enrich.SimilarityClassifier, a, b Market) enrich.Verdict { + if classifier == nil { + return enrich.Verdict{Same: true, Confidence: 0} + } + aRow := enrich.SimilarityRow{ + NameNormalized: strings.ToLower(strings.TrimSpace(a.Name)), + Stadt: a.City, + Year: a.StartDate.Year(), + Name: a.Name, + } + bRow := enrich.SimilarityRow{ + NameNormalized: strings.ToLower(strings.TrimSpace(b.Name)), + Stadt: b.City, + Year: b.StartDate.Year(), + Name: b.Name, + } + verdict, err := classifier.Classify(ctx.Request.Context(), aRow, bRow) + if err != nil { + slog.WarnContext(ctx.Request.Context(), "classify pair failed", "err", err) + return enrich.Verdict{Same: true, Confidence: 0} + } + return verdict +} + func (h *AdminHandler) CreateEdition(c *gin.Context) { //nolint:dupl seriesID, err := uuid.Parse(c.Param("series_id")) if err != nil { diff --git a/backend/internal/domain/market/assets/merge_advisor_prompt.de.md b/backend/internal/domain/market/assets/merge_advisor_prompt.de.md new file mode 100644 index 0000000..add3277 --- /dev/null +++ b/backend/internal/domain/market/assets/merge_advisor_prompt.de.md @@ -0,0 +1,47 @@ +Du bist ein Datenkurations-Agent fuer Marktvogt. Zwei Markteintraege wurden +als wahrscheinlich gleicher Markt klassifiziert. Schlage einen Merge vor: +welcher Eintrag bleibt erhalten und welche Felder uebernehmen wir aus +welcher Quelle. + +## Harte Regeln + +- Niemals Werte erfinden. Wenn keine Quelle einen Wert liefert: leerer String. +- `target_id` MUSS einer der beiden Eingabe-IDs sein. +- Wenn `kontext.ist_gleicher_markt` nicht `true` ist: Antwort mit `"confidence": 0` + und `"flags": ["uncertain_match"]`; alle `field_merges` leer. +- Antwort MUSS dem Schema entsprechen, nichts ausserhalb des JSON. + +## Zielwahl (target_id) + +Bevorzuge in dieser Reihenfolge: +1. Vollstaendigerer Datensatz (mehr nicht-leere Felder). +2. Aktuelleres `zuletzt_geaendert`. +3. Quelle `admin` > `user_submission` > `crawler`. + +## Feldwahl je Feld + +Fuer jedes Feld waehle `"source": "a"` oder `"b"`. Erlaube `"combined"` nur +wenn beide Quellen komplementaere Daten liefern, die sich nicht widersprechen +(z.B. A hat Oeffnungszeiten, B hat Eintrittspreise). + +Spezifisch: +- `name`: kuerzere, etabliertere Form bevorzugen. Niemals kombinieren. +- `veranstalter`: Rechtstraeger > generischer Name. In `flags` vermerken wenn unsicher. +- `start_datum` / `end_datum`: muessen identisch sein — wenn nicht, in `flags` + und `confidence` senken; nicht raten. +- `beschreibung`: laengere, faktische Variante bevorzugen; `combined` nur + wenn die Inhalte ergaenzend sind. +- `bild_url` / `logo_url`: offizieller wirkende Quelle bevorzugen. +- `oeffnungszeiten` / `eintrittspreise`: bei Konflikt die Quelle mit aktuelleren Daten. + +## flags + +Liste konkrete Punkte fuer menschliche Pruefung: +- Unterschiedliche Organisationsnamen (auch wenn vermutlich derselbe Traeger). +- Datumsabweichungen. +- Adressabweichungen jenseits Tippfehler. +- Alle `combined`-Felder. + +## Ausgabe + +Antworte ausschliesslich mit validem JSON gemaess dem Schema. diff --git a/backend/internal/domain/market/assets/merge_advisor_schema.json b/backend/internal/domain/market/assets/merge_advisor_schema.json new file mode 100644 index 0000000..e194ca8 --- /dev/null +++ b/backend/internal/domain/market/assets/merge_advisor_schema.json @@ -0,0 +1,52 @@ +{ + "type": "object", + "required": ["target_id", "target_reason", "field_merges", "flags", "confidence", "summary"], + "properties": { + "target_id": { + "type": "string", + "description": "UUID of the surviving market edition (must be one of the two input IDs)" + }, + "target_reason": { + "type": "string", + "description": "One sentence explaining why this side was chosen as the target" + }, + "field_merges": { + "type": "object", + "description": "Per-field merge decisions. Keys are field names, values are source/value/reason triples.", + "additionalProperties": { + "type": "object", + "required": ["source", "value", "reason"], + "properties": { + "source": { + "type": "string", + "enum": ["a", "b", "combined"], + "description": "Which candidate provides the value: a, b, or combined (synthesised from both)" + }, + "value": { + "description": "The chosen field value (string, number, or object depending on field)" + }, + "reason": { + "type": "string", + "description": "One sentence justifying the choice" + } + } + } + }, + "flags": { + "type": "array", + "items": { "type": "string" }, + "description": "Human-review concerns: conflicting organizer names, date mismatches, etc." + }, + "confidence": { + "type": "number", + "minimum": 0, + "maximum": 1, + "description": "Advisor's confidence in the proposed merge (0=refuse, 1=certain)" + }, + "summary": { + "type": "string", + "description": "2-3 sentence German summary of the proposed merge" + } + }, + "additionalProperties": false +} diff --git a/backend/internal/domain/market/merge_advisor.go b/backend/internal/domain/market/merge_advisor.go new file mode 100644 index 0000000..2a60661 --- /dev/null +++ b/backend/internal/domain/market/merge_advisor.go @@ -0,0 +1,188 @@ +package market + +import ( + "context" + _ "embed" + "encoding/json" + "errors" + "fmt" + "time" + + "github.com/google/uuid" + + "marktvogt.de/backend/internal/domain/discovery/enrich" + "marktvogt.de/backend/internal/pkg/ai" +) + +//go:embed assets/merge_advisor_prompt.de.md +var mergeAdvisorPromptDE string + +// ErrNotDuplicate is returned by MergeAdvisor.Propose when the similarity +// verdict confidently says the two markets are not the same. +var ErrNotDuplicate = errors.New("markets are not duplicates") + +// MarketMergeProposal is the AI advisor's proposed merge for two market editions. +type MarketMergeProposal struct { + TargetID uuid.UUID `json:"target_id"` + TargetReason string `json:"target_reason"` + FieldMerges map[string]mergeFieldDecision `json:"field_merges"` + Flags []string `json:"flags"` + Confidence float64 `json:"confidence"` + Summary string `json:"summary"` + GeneratedAt time.Time `json:"generated_at"` +} + +// mergeFieldDecision is one field's merge outcome inside MarketMergeProposal. +type mergeFieldDecision struct { + Source string `json:"source"` // "a" | "b" | "combined" + Value any `json:"value"` + Reason string `json:"reason"` +} + +// mergeAdvisorResponse is the raw JSON shape the LLM returns. +type mergeAdvisorResponse struct { + TargetID string `json:"target_id"` + TargetReason string `json:"target_reason"` + FieldMerges map[string]mergeFieldDecision `json:"field_merges"` + Flags []string `json:"flags"` + Confidence float64 `json:"confidence"` + Summary string `json:"summary"` +} + +// MergeAdvisor calls the LLM to propose how to merge two market editions. +type MergeAdvisor struct { + ai ai.Provider +} + +func NewMergeAdvisor(provider ai.Provider) *MergeAdvisor { + return &MergeAdvisor{ai: provider} +} + +// Propose calls the LLM to generate a merge proposal for two market editions. +// Returns ErrNotDuplicate without an LLM call when the similarity verdict says +// Same=false AND confidence > 0.5 (confidently different markets). +func (a *MergeAdvisor) Propose(ctx context.Context, mA, mB Market, verdict enrich.Verdict) (MarketMergeProposal, error) { + if !verdict.Same && verdict.Confidence > 0.5 { + return MarketMergeProposal{}, ErrNotDuplicate + } + + userMsg, err := buildAdvisorUserMessage(mA, mB, verdict) + if err != nil { + return MarketMergeProposal{}, fmt.Errorf("build user message: %w", err) + } + + resp, err := a.ai.Chat(ctx, &ai.ChatRequest{ + SystemPrompt: mergeAdvisorPromptDE, + UserMessage: userMsg, + JSONMode: true, + Grounded: false, + Temperature: 0.1, + CallType: "merge_advisor", + PromptVersion: ai.VersionMergeAdvisor, + }) + if err != nil { + return MarketMergeProposal{}, fmt.Errorf("chat: %w", err) + } + + var raw mergeAdvisorResponse + if err := json.Unmarshal([]byte(resp.Content), &raw); err != nil { + return MarketMergeProposal{}, fmt.Errorf("parse response: %w (content=%q)", err, resp.Content) + } + + targetID, err := uuid.Parse(raw.TargetID) + if err != nil { + return MarketMergeProposal{}, fmt.Errorf("invalid target_id %q: %w", raw.TargetID, err) + } + + conf := raw.Confidence + if conf < 0 { + conf = 0 + } + if conf > 1 { + conf = 1 + } + + flags := raw.Flags + if flags == nil { + flags = []string{} + } + + return MarketMergeProposal{ + TargetID: targetID, + TargetReason: raw.TargetReason, + FieldMerges: raw.FieldMerges, + Flags: flags, + Confidence: conf, + Summary: raw.Summary, + GeneratedAt: time.Now(), + }, nil +} + +type advisorMarketInput struct { + ID string `json:"id"` + Name string `json:"name"` + Stadt string `json:"stadt"` + Land string `json:"land"` + Strasse string `json:"strasse"` + PLZ string `json:"plz"` + Veranstalter string `json:"veranstalter"` + StartDatum string `json:"start_datum"` + EndDatum string `json:"end_datum"` + Website string `json:"website"` + Beschreibung string `json:"beschreibung"` + BildURL string `json:"bild_url"` + LogoURL string `json:"logo_url"` + ZuletztGeaend string `json:"zuletzt_geaendert"` +} + +type advisorKontext struct { + IstGleicherMarkt *bool `json:"ist_gleicher_markt"` + Confidence float64 `json:"confidence"` + Begruendung string `json:"begruendung"` +} + +type advisorUserPayload struct { + KandidatA advisorMarketInput `json:"kandidat_a"` + KandidatB advisorMarketInput `json:"kandidat_b"` + Kontext advisorKontext `json:"kontext"` +} + +func buildAdvisorUserMessage(mA, mB Market, verdict enrich.Verdict) (string, error) { + payload := advisorUserPayload{ + KandidatA: marketToAdvisorInput(mA), + KandidatB: marketToAdvisorInput(mB), + Kontext: advisorKontext{ + Confidence: verdict.Confidence, + Begruendung: verdict.Reason, + }, + } + if verdict.Same { + t := true + payload.Kontext.IstGleicherMarkt = &t + } + + b, err := json.Marshal(payload) + if err != nil { + return "", err + } + return string(b), nil +} + +func marketToAdvisorInput(m Market) advisorMarketInput { + return advisorMarketInput{ + ID: m.ID.String(), + Name: m.Name, + Stadt: m.City, + Land: m.Country, + Strasse: m.Street, + PLZ: m.Zip, + Veranstalter: m.OrganizerName, + StartDatum: m.StartDate.Format("2006-01-02"), + EndDatum: m.EndDate.Format("2006-01-02"), + Website: m.Website, + Beschreibung: m.Description, + BildURL: m.ImageURL, + LogoURL: m.LogoURL, + ZuletztGeaend: m.UpdatedAt.Format(time.RFC3339), + } +} diff --git a/backend/internal/domain/market/merge_advisor_test.go b/backend/internal/domain/market/merge_advisor_test.go new file mode 100644 index 0000000..a0dcba2 --- /dev/null +++ b/backend/internal/domain/market/merge_advisor_test.go @@ -0,0 +1,133 @@ +package market + +import ( + "context" + "encoding/json" + "errors" + "testing" + + "marktvogt.de/backend/internal/domain/discovery/enrich" + "marktvogt.de/backend/internal/pkg/ai" +) + +// stubAIProvider is a minimal ai.Provider for testing the merge advisor. +type stubAIProvider struct { + response string + err error + calls int +} + +func (p *stubAIProvider) Name() string { return "stub" } +func (p *stubAIProvider) SupportsJSONMode() bool { return true } +func (p *stubAIProvider) SupportsJSONSchema() bool { return false } +func (p *stubAIProvider) Chat(_ context.Context, _ *ai.ChatRequest) (*ai.ChatResponse, error) { + p.calls++ + if p.err != nil { + return nil, p.err + } + return &ai.ChatResponse{Content: p.response}, nil +} + +func validAdvisorResponse(targetID string) string { + resp := mergeAdvisorResponse{ + TargetID: targetID, + TargetReason: "Market A has more complete data", + FieldMerges: map[string]mergeFieldDecision{ + "name": {Source: "a", Value: "Testmarkt Berlin", Reason: "established name"}, + }, + Flags: []string{}, + Confidence: 0.9, + Summary: "Beide Eintraege bezeichnen denselben Markt. Daten von A werden uebernommen.", + } + b, _ := json.Marshal(resp) + return string(b) +} + +func TestMergeAdvisor_RejectsWhenNotDuplicate(t *testing.T) { + p := &stubAIProvider{} + a := Market{Name: "Markt A"} + b := Market{Name: "Markt B"} + v := enrich.Verdict{Same: false, Confidence: 0.85} + + advisor := NewMergeAdvisor(p) + _, err := advisor.Propose(context.Background(), a, b, v) + + if !errors.Is(err, ErrNotDuplicate) { + t.Errorf("expected ErrNotDuplicate, got %v", err) + } + if p.calls != 0 { + t.Errorf("expected no LLM call when not duplicate, got %d calls", p.calls) + } +} + +func TestMergeAdvisor_AllowsUncertainVerdict(t *testing.T) { + // same=false but confidence <= 0.5 should still call LLM (uncertain case) + p := &stubAIProvider{} + a := Market{Name: "Markt A"} + b := Market{Name: "Markt B"} + v := enrich.Verdict{Same: false, Confidence: 0.4} + + advisor := NewMergeAdvisor(p) + p.response = validAdvisorResponse(a.ID.String()) + + // Should not error with ErrNotDuplicate for low-confidence non-match + _, err := advisor.Propose(context.Background(), a, b, v) + // May error on empty target ID / parse, but not ErrNotDuplicate + if errors.Is(err, ErrNotDuplicate) { + t.Error("uncertain verdict (confidence<=0.5) should not return ErrNotDuplicate") + } +} + +func TestMergeAdvisor_ParsesValidResponse(t *testing.T) { + a := Market{Name: "Markt A", City: "Berlin"} + b := Market{Name: "Markt B", City: "Berlin"} + p := &stubAIProvider{response: validAdvisorResponse(a.ID.String())} + v := enrich.Verdict{Same: true, Confidence: 0.92} + + advisor := NewMergeAdvisor(p) + proposal, err := advisor.Propose(context.Background(), a, b, v) + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if p.calls != 1 { + t.Errorf("expected 1 LLM call, got %d", p.calls) + } + if proposal.TargetReason == "" { + t.Error("expected non-empty TargetReason") + } + if proposal.Confidence != 0.9 { + t.Errorf("expected confidence 0.9, got %f", proposal.Confidence) + } + if len(proposal.FieldMerges) == 0 { + t.Error("expected at least one field merge") + } +} + +func TestMergeAdvisor_ProviderErrorPropagates(t *testing.T) { + p := &stubAIProvider{err: errors.New("provider down")} + a := Market{Name: "Markt A"} + b := Market{Name: "Markt B"} + v := enrich.Verdict{Same: true, Confidence: 0.9} + + advisor := NewMergeAdvisor(p) + _, err := advisor.Propose(context.Background(), a, b, v) + + if err == nil { + t.Error("expected error from provider, got nil") + } +} + +func TestMergeAdvisor_InvalidJSONErrors(t *testing.T) { + p := &stubAIProvider{response: "not json at all"} + a := Market{Name: "Markt A"} + b := Market{Name: "Markt B"} + v := enrich.Verdict{Same: true, Confidence: 0.9} + + advisor := NewMergeAdvisor(p) + _, err := advisor.Propose(context.Background(), a, b, v) + + if err == nil { + t.Error("expected parse error for invalid JSON") + } +} diff --git a/backend/internal/domain/market/repository.go b/backend/internal/domain/market/repository.go index 3064725..a6e5fde 100644 --- a/backend/internal/domain/market/repository.go +++ b/backend/internal/domain/market/repository.go @@ -43,6 +43,11 @@ type Repository interface { // Merge audit log InsertMergeLog(ctx context.Context, entry MergeLogEntry) error + + // MarkMerged marks sourceID as merged into targetID in a single transaction: + // sets status='merged', merged_into_id, merged_at on the source edition; + // reparents discovered_markets.created_edition_id; writes a market_merge_log row. + MarkMerged(ctx context.Context, sourceID, targetID, mergedBy uuid.UUID, proposalJSON []byte) error } type pgRepository struct { @@ -1084,3 +1089,43 @@ func (r *pgRepository) InsertMergeLog(ctx context.Context, entry MergeLogEntry) ) return err } + +// MarkMerged marks sourceID as merged into targetID in a single transaction. +func (r *pgRepository) MarkMerged(ctx context.Context, sourceID, targetID, mergedBy uuid.UUID, proposalJSON []byte) error { + tx, err := r.db.Begin(ctx) + if err != nil { + return fmt.Errorf("begin tx: %w", err) + } + defer func() { _ = tx.Rollback(ctx) }() + + _, err = tx.Exec(ctx, ` + UPDATE market_editions + SET status = 'merged', merged_into_id = $1, merged_at = now(), updated_at = now() + WHERE id = $2`, + targetID, sourceID, + ) + if err != nil { + return fmt.Errorf("mark source merged: %w", err) + } + + _, err = tx.Exec(ctx, ` + UPDATE discovered_markets + SET created_edition_id = $1 + WHERE created_edition_id = $2`, + targetID, sourceID, + ) + if err != nil { + return fmt.Errorf("reparent discovered_markets: %w", err) + } + + _, err = tx.Exec(ctx, ` + INSERT INTO market_merge_log (market_id, plan, applied_fields, applied_by, research_sources) + VALUES ($1, $2, '{}', $3, '{}')`, + sourceID, proposalJSON, mergedBy, + ) + if err != nil { + return fmt.Errorf("insert merge log: %w", err) + } + + return tx.Commit(ctx) +} diff --git a/backend/internal/domain/market/routes.go b/backend/internal/domain/market/routes.go index 99d0ab8..578b28d 100644 --- a/backend/internal/domain/market/routes.go +++ b/backend/internal/domain/market/routes.go @@ -30,6 +30,8 @@ func RegisterAdminRoutes(rg *gin.RouterGroup, h *AdminHandler, rh *ResearchHandl markets.POST("/:id/research/plan", rh.Plan) markets.POST("/:id/research/apply", rh.Apply) markets.GET("/:id/duplicates", h.FindDuplicates) + markets.POST("/:id/merge-plan", h.MergePlan) + markets.POST("/:id/merge-into/:target_id", h.MergeInto) } series := admin.Group("/series") diff --git a/backend/internal/domain/market/service.go b/backend/internal/domain/market/service.go index 558c338..be7d9cb 100644 --- a/backend/internal/domain/market/service.go +++ b/backend/internal/domain/market/service.go @@ -338,6 +338,12 @@ func (s *Service) InsertMergeLog(ctx context.Context, entry MergeLogEntry) error return s.repo.InsertMergeLog(ctx, entry) } +// MarkMerged marks sourceID as merged into targetID (status='merged', +// merged_into_id set) and reparents any discovered_markets rows. +func (s *Service) MarkMerged(ctx context.Context, sourceID, targetID, mergedBy uuid.UUID, proposalJSON []byte) error { + return s.repo.MarkMerged(ctx, sourceID, targetID, mergedBy, proposalJSON) +} + func orDefault(val, fallback string) string { if val != "" { return val diff --git a/backend/internal/pkg/ai/versions.go b/backend/internal/pkg/ai/versions.go index 4cc2fe6..cd329f5 100644 --- a/backend/internal/pkg/ai/versions.go +++ b/backend/internal/pkg/ai/versions.go @@ -3,7 +3,8 @@ package ai // Prompt version constants. Bump when the system prompt for a call type // changes materially so ai_usage rows can be correlated with prompt iterations. const ( - VersionResearch = "research/v2" - VersionEnrichB = "enrich_b/v2" - VersionSimilarity = "similarity/v2" + VersionResearch = "research/v2" + VersionEnrichB = "enrich_b/v2" + VersionSimilarity = "similarity/v2" + VersionMergeAdvisor = "merge_advisor/v1" ) diff --git a/backend/internal/server/routes.go b/backend/internal/server/routes.go index 4c27fe4..8985324 100644 --- a/backend/internal/server/routes.go +++ b/backend/internal/server/routes.go @@ -98,7 +98,8 @@ func (s *Server) registerRoutes() { llmEnricher := enrich.NewLLMEnricher(aiProvider, scraper) simClassifier := enrich.NewSimilarityClassifier(aiProvider) - adminMarketHandler := market.NewAdminHandler(marketSvc, simClassifier) + mergeAdvisor := market.NewMergeAdvisor(aiProvider) + adminMarketHandler := market.NewAdminHandler(marketSvc, simClassifier, mergeAdvisor) market.RegisterAdminRoutes(v1, adminMarketHandler, researchHandler, requireAuth, requireAdmin) discoveryService := discovery.NewService(discoveryRepo, crawlerInstance, discovery.NewLinkChecker(), marketSvc, geocoder, llmEnricher, simClassifier)