feat(market): in-backend research orchestrator with SearxNG + schema-validated LLM

Adds pkg/search (SearxNG impl), domain/market/research (orchestrator + embedded
German prompt and JSON schema), and reinstates POST /markets/:id/research on
top of the new pipeline. The orchestrator accepts seed URLs from crawler
provenance (handler wiring is a follow-up) and falls back to search when
fewer than two distinct seed domains are known.
2026-04-24 17:06:04 +02:00
parent 24e072b63d
commit 67b2eb5d74
21 changed files with 1088 additions and 10 deletions
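For reviewers, the success payload of the reinstated endpoint, as assembled in the handler below. A minimal client sketch in Go; the host, route prefix, and market ID are placeholders, and the field names mirror the handler's gin.H literal.

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// researchResponse mirrors the success payload of ResearchHandler.Research.
type researchResponse struct {
	Result    json.RawMessage `json:"result"` // schema-validated researcher JSON, passed through verbatim
	Provider  string          `json:"provider"`
	Tokens    int             `json:"tokens"`
	LatencyMS int64           `json:"latency_ms"`
}

func main() {
	// Placeholder host, prefix, and ID; the real route is registered admin-only.
	url := "http://localhost:8080/api/v1/markets/00000000-0000-0000-0000-000000000000/research"
	resp, err := http.Post(url, "application/json", nil)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var out researchResponse
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		panic(err)
	}
	fmt.Printf("provider=%s tokens=%d latency=%dms\n", out.Provider, out.Tokens, out.LatencyMS)
}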

View File

@@ -11,9 +11,6 @@ stringData:
   {{- if .Values.ai.apiKey }}
   AI_API_KEY: {{ .Values.ai.apiKey | quote }}
   {{- end }}
-  {{- if .Values.ai.agentSimple }}
-  AI_AGENT_SIMPLE: {{ .Values.ai.agentSimple | quote }}
-  {{- end }}
   {{- if .Values.turnstile.secretKey }}
   TURNSTILE_SECRET_KEY: {{ .Values.turnstile.secretKey | quote }}
   {{- end }}

View File

@@ -98,7 +98,6 @@ smtp:
 # AI research credentials — passed via Woodpecker secrets during deploy.
 ai:
   apiKey: ""
-  agentSimple: ""
   agentDiscovery: "" # set via CI secret in production
   rateLimitRps: 1

View File

@@ -416,7 +416,7 @@ func (h *Handler) CrawlEnrichStatus(c *gin.Context) {
 }

 // EnrichLLM runs per-row LLM enrichment synchronously. 30s deadline is
-// enough for scraping 5 URLs + one Mistral Pass2 call in typical conditions.
+// enough for scraping 5 URLs + one LLM enrichment call in typical conditions.
 // Operator clicks the button, waits, sees the result — no polling.
 func (h *Handler) EnrichLLM(c *gin.Context) {
 	id, err := uuid.Parse(c.Param("id"))
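The comment above describes the design: one synchronous, deadline-bounded round-trip instead of a job queue. A minimal sketch of that pattern, assuming gin; the Enricher interface and handler wiring are illustrative, and only the 30s figure comes from the comment.

package enrichsketch

import (
	"context"
	"errors"
	"net/http"
	"time"

	"github.com/gin-gonic/gin"
)

// Enricher is a stand-in for the real per-row LLM enrichment dependency.
type Enricher interface {
	Enrich(ctx context.Context, id string) (any, error)
}

// handler shows the pattern: one bounded, synchronous round-trip, so the
// operator's single request carries the result and no polling is needed.
func handler(e Enricher) gin.HandlerFunc {
	return func(c *gin.Context) {
		ctx, cancel := context.WithTimeout(c.Request.Context(), 30*time.Second)
		defer cancel()

		result, err := e.Enrich(ctx, c.Param("id"))
		switch {
		case errors.Is(err, context.DeadlineExceeded):
			c.JSON(http.StatusGatewayTimeout, gin.H{"error": "enrichment timed out"})
		case err != nil:
			c.JSON(http.StatusInternalServerError, gin.H{"error": "enrichment failed"})
		default:
			c.JSON(http.StatusOK, result)
		}
	}
}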

View File

@@ -1,19 +1,89 @@
 package market

 import (
+	"errors"
+	"log/slog"
 	"net/http"

 	"github.com/gin-gonic/gin"
+	"github.com/google/uuid"
+
+	"marktvogt.de/backend/internal/domain/market/research"
+	"marktvogt.de/backend/internal/pkg/ai"
+	"marktvogt.de/backend/internal/pkg/scrape"
+	"marktvogt.de/backend/internal/pkg/search"
 )

 type ResearchHandler struct {
 	service *Service
+	orch    *research.Orchestrator
 }

-func NewResearchHandler(service *Service, _ any) *ResearchHandler {
-	return &ResearchHandler{service: service}
+func NewResearchHandler(service *Service, provider ai.Provider, searchClient search.Client, scraper *scrape.Client) *ResearchHandler {
+	return &ResearchHandler{
+		service: service,
+		orch: &research.Orchestrator{
+			AI:          provider,
+			Search:      searchClient,
+			Scraper:     scraper,
+			MaxPages:    6,
+			Concurrency: 4,
+		},
+	}
 }

 func (h *ResearchHandler) Research(c *gin.Context) {
-	c.JSON(http.StatusNotImplemented, gin.H{"error": "research temporarily disabled during AI provider migration"})
+	ctx := c.Request.Context()
+	id, err := uuid.Parse(c.Param("id"))
+	if err != nil {
+		c.JSON(http.StatusBadRequest, gin.H{"error": "invalid market ID"})
+		return
+	}
+
+	m, err := h.service.GetByID(ctx, id)
+	if err != nil {
+		if errors.Is(err, ErrMarketNotFound) {
+			c.JSON(http.StatusNotFound, gin.H{"error": "market not found"})
+		} else {
+			slog.ErrorContext(ctx, "research: get market failed", "market_id", id, "err", err)
+			c.JSON(http.StatusInternalServerError, gin.H{"error": "internal error"})
+		}
+		return
+	}
+
+	out, err := h.orch.Run(ctx, research.Input{
+		MarktName:      m.Name,
+		Stadt:          m.City,
+		StartDatumHint: m.StartDate.Format("2006-01-02"),
+		WebsiteHint:    m.Website,
+		SeedURLs:       nil, // seed URLs from crawler provenance: future enhancement
+		RechercheDatum: m.StartDate,
+	})
+	if err != nil {
+		var pe *ai.ProviderError
+		if errors.As(err, &pe) {
+			switch pe.Code {
+			case ai.ErrRateLimited:
+				c.JSON(http.StatusServiceUnavailable, gin.H{"error": "rate limited"})
+				return
+			case ai.ErrSchemaViolation:
+				slog.ErrorContext(ctx, "research schema violation", "market_id", id, "raw", pe.RawOutput)
+				c.JSON(http.StatusBadGateway, gin.H{"error": "model returned invalid JSON"})
+				return
+			case ai.ErrInternal, ai.ErrQuotaExceeded, ai.ErrTimeout, ai.ErrInvalidRequest, ai.ErrUnavailable:
+				// fall through to generic 500
+			}
+		}
+		slog.ErrorContext(ctx, "research failed", "market_id", id, "err", err)
+		c.JSON(http.StatusInternalServerError, gin.H{"error": "research failed"})
+		return
+	}
+
+	c.JSON(http.StatusOK, gin.H{
+		"result":     out.Raw,
+		"provider":   out.Provider,
+		"tokens":     out.TokensUsed,
+		"latency_ms": out.Latency.Milliseconds(),
+	})
 }
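The error switch above pins the HTTP contract for provider failures. Distilled into a table-style helper for review purposes; statusFor is hypothetical (not in this commit) and ai.ErrorCode is an assumed name for the type of pe.Code.

package market

import (
	"net/http"

	"marktvogt.de/backend/internal/pkg/ai"
)

// statusFor condenses Research's error switch: only rate limiting and
// schema violations get distinct statuses; everything else is a plain 500.
func statusFor(code ai.ErrorCode) int {
	switch code {
	case ai.ErrRateLimited:
		return http.StatusServiceUnavailable // 503: client may retry later
	case ai.ErrSchemaViolation:
		return http.StatusBadGateway // 502: model emitted invalid JSON twice
	default:
		return http.StatusInternalServerError
	}
}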

View File

@@ -0,0 +1,86 @@
Du bist ein Daten-Extraktions-Agent fuer einen Mittelaltermarkt im DACH-Raum.
Ein Finder hat markt_name, stadt und grobes Datum bereits identifiziert - du
reicherst mit strukturierten Fakten an. Keine Beschreibungstexte, nur Fakten.
## Input (im User-Prompt)
- markt_name, stadt, start_datum_hinweis, website_hinweis (optional),
recherche_datum
## Vorgehen
1. **Primaerquelle**: website_hinweis oeffnen, sonst "{markt_name} {stadt}"
suchen und Veranstalter-Website oeffnen. Fallback: Facebook-Event oder
Kalender (mittelalterkalender.info, marktkalendarium.de, mittelalterfeste.com,
mittelalter-termine.de).
2. **Zweitquelle pflicht**: verifiziere Datum + Ort gegen mindestens eine
weitere Quelle. Schuetzt vor veralteten Daten auf schlecht gepflegten Seiten.
3. **Felder extrahieren** (siehe unten).
4. **status** auf Top-Level setzen:
- "bestaetigt": ALLE Felder fuer Recherchejahr bestaetigt
- "unklar": Quellen widerspruechlich ODER einzelne Felder aus Vorjahr
- "vorjahr_unbestaetigt": ueberwiegend Vorjahresdaten
- "abgesagt": explizite Absage/Pausierung
## Felder
- **website**: URL, die dem Veranstalter in `veranstalter` gehoert, oder
dedizierte Markt-Domain. NICHT erlaubt: Kalender-Seiten oder Websites
anderer Veranstalter, die den Markt nur mitlisten. FB-Event-URL ok falls
keine eigene Seite. Unklar -> `null` + hinweis.
- **strasse**: Strasse + Hausnummer. Bei Burgen ohne Adresse: Anlagenname ok.
- **plz**: Postleitzahl.
- **stadt**: Muss mit Input-Hinweis konsistent sein, sonst hinweis.
- **bundesland**: Bundesland (DE/AT) oder "Kanton X" (CH). Aus Quellen, nicht
aus Wissen.
- **land**: "Deutschland" | "Oesterreich" | "Schweiz".
- **veranstalter**: Verein, Firma oder Person. Impressum ist gute Quelle.
- **start_datum** / **end_datum**: YYYY-MM-DD, im Recherchejahr. Eintages-
Markt: beide gleich.
- **oeffnungszeiten**: Array von Zeitfenstern {datum_von, datum_bis, von, bis}.
Nimm NUR explizit genannte Zeiten. Keine Zeiten fuer Tage ohne Angabe
erfinden. Bei Muster ueber mehrere Wochenenden (z.B. "Fr 17-02, Sa 16-00:30
an allen Wochenenden"): Muster anwenden, keine widersprechenden Eintraege
erzeugen. Vor Abgabe: KEINE Duplikate (gleiches Datum mehrfach). Format 24h
"HH:MM", nach Mitternacht "00:30"/"02:00".
Kompakt: identische Zeiten ueber mehrere Tage -> ein Eintrag mit Datumsbereich.
- **eintrittspreise**: Array {name, betrag, waehrung}. ALLE Kategorien
extrahieren wenn mehrere gelistet (Erwachsene, Kinder, Ermaessigt,
Familie, Gewandete, Abendkasse etc.), nicht nur eine.
Ticketportale (Eventim, Ticketmaster, ticketmachine): Preise enthalten
Gebuehren und sind NICHT der Eintrittspreis. Veranstalter-Website
bevorzugen. Nur Portal verfuegbar: extrahieren + hinweis "inkl.
Servicegebuehr". Eintritt frei: ein Eintrag name="Eintritt frei", betrag=0.
- **bild_url**: Offizielles Plakat/Banner/Header, kein Stockfoto, kein
Sponsor-Logo. Social-Media-Vorschaubilder ok. Nur URLs die du tatsaechlich
als src/og:image gesehen hast. Nichts findbar -> `null`.
## Per-Feld-Metadaten
- **quellen**: URLs die du besucht hast und die genau diesen Wert belegen.
Mehrere Felder aus derselben Quelle: URL bei jedem wiederholen.
- **extraktion**:
- "direkt": Wert steht auf EINER Seite und alle weiteren Quellen bestaetigen
exakt denselben Wert. Trivial-Normalisierung (Datum, Zeit, Zahl) bleibt
direkt.
- "kombiniert": aus mehreren Stellen zusammengesetzt, aus Fliesstext
interpretiert, aus Vorjahr uebernommen, ODER Quellen liefern
unterschiedliche Werte.
- **hinweis**: Freitext bei Unsicherheit/Widerspruch/Vorjahr (mit
Ursprungsjahr). Sonst `null`.
## Harte Regeln
- Feld nicht findbar: `wert: null`, `quellen: []`, `extraktion: "direkt"`,
`hinweis` mit knapper Begruendung.
- NICHTS erfinden. Halluzinationen sind der teuerste Fehler.
- Widerspruch zwischen Quellen: Veranstalter-Website > Kalender > Social Media
> Presse. Widerspruch IMMER im hinweis dokumentieren, auch wenn die
offiziellste Quelle klar gewinnt. Format:
"Quelle A: X, Quelle B: Y, uebernommen: Z (Begruendung)".
- Vorjahresdaten: extrahieren, IMMER mit Ursprungsjahr im hinweis.
- Status-Konsistenz: auch nur EIN Feld mit Vorjahr-hinweis -> Top-Level-status
darf NICHT "bestaetigt" sein.
- Waehrung pflichtig: CH -> CHF, DE/AT -> EUR.
- Nur URLs zurueckgeben die du tatsaechlich aufgerufen hast.
- Antwort MUSS dem JSON-Schema entsprechen. Keine zusaetzlichen Felder, keine
Erklaerungen ausserhalb des JSON.
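The per-field envelope (wert, quellen, extraktion, hinweis) is easiest to read with a filled-in example. A sketch of one field whose sources disagree, following the hinweis format the hard rules mandate; the Go types and values are illustrative and not part of this commit.

package research

// stringFeld mirrors the per-field envelope the prompt demands.
// Illustrative only; the commit validates raw JSON against the schema instead.
type stringFeld struct {
	Wert       *string  `json:"wert"`
	Quellen    []string `json:"quellen"`
	Extraktion string   `json:"extraktion"` // "direkt" | "kombiniert"
	Hinweis    *string  `json:"hinweis"`
}

func beispielVeranstalter() stringFeld {
	wert := "Mittelalterverein Musterstadt e.V."
	// Contradiction documented in the prompt's mandated format, even though
	// the most official source wins.
	hinweis := "Quelle A (Impressum): Mittelalterverein Musterstadt e.V., " +
		"Quelle B (Kalender): Stadtmarketing Musterstadt, " +
		"uebernommen: Mittelalterverein Musterstadt e.V. (Impressum ist offizieller)"
	return stringFeld{
		Wert:       &wert,
		Quellen:    []string{"https://beispiel-markt.example/impressum"},
		Extraktion: "kombiniert", // sources disagreed -> kombiniert per prompt rules
		Hinweis:    &hinweis,
	}
}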

View File

@@ -0,0 +1,110 @@
{
"type": "object",
"$defs": {
"datumFeld": {
"type": "object",
"required": ["wert", "quellen", "extraktion", "hinweis"],
"properties": {
"wert": {"type": ["string", "null"], "pattern": "^[0-9]{4}-(0[1-9]|1[0-2])-(0[1-9]|[12][0-9]|3[01])$"},
"hinweis": {"type": ["string", "null"]},
"quellen": {"type": "array", "items": {"type": "string", "pattern": "^https?://"}},
"extraktion": {"enum": ["direkt", "kombiniert"], "type": "string"}
},
"additionalProperties": false
},
"stringFeld": {
"type": "object",
"required": ["wert", "quellen", "extraktion", "hinweis"],
"properties": {
"wert": {"type": ["string", "null"]},
"hinweis": {"type": ["string", "null"]},
"quellen": {"type": "array", "items": {"type": "string", "pattern": "^https?://"}},
"extraktion": {"enum": ["direkt", "kombiniert"], "type": "string"}
},
"additionalProperties": false
}
},
"required": ["markt_name", "recherche_datum", "status", "quellen_gesamt", "felder"],
"properties": {
"felder": {
"type": "object",
"required": ["website", "strasse", "plz", "stadt", "bundesland", "land", "veranstalter", "start_datum", "end_datum", "oeffnungszeiten", "eintrittspreise", "bild_url"],
"properties": {
"plz": {"$ref": "#/$defs/stringFeld"},
"land": {"$ref": "#/$defs/stringFeld"},
"stadt": {"$ref": "#/$defs/stringFeld"},
"strasse": {"$ref": "#/$defs/stringFeld"},
"website": {"$ref": "#/$defs/stringFeld"},
"bild_url": {"$ref": "#/$defs/stringFeld"},
"end_datum": {"$ref": "#/$defs/datumFeld"},
"bundesland": {"$ref": "#/$defs/stringFeld"},
"start_datum": {"$ref": "#/$defs/datumFeld"},
"veranstalter": {"$ref": "#/$defs/stringFeld"},
"eintrittspreise": {
"type": "object",
"required": ["wert", "quellen", "extraktion", "hinweis"],
"properties": {
"wert": {
"type": ["array", "null"],
"items": {
"type": "object",
"required": ["name", "betrag", "waehrung"],
"properties": {
"name": {"type": "string", "minLength": 2, "description": "Freie Kategoriebezeichnung, z.B. Erwachsene, Gewandete, Kinder unter 12, Familienticket"},
"betrag": {"type": "number", "minimum": 0},
"waehrung": {"enum": ["EUR", "CHF"], "type": "string"}
},
"additionalProperties": false
}
},
"hinweis": {"type": ["string", "null"]},
"quellen": {"type": "array", "items": {"type": "string", "pattern": "^https?://"}},
"extraktion": {"enum": ["direkt", "kombiniert"], "type": "string"}
},
"additionalProperties": false
},
"oeffnungszeiten": {
"type": "object",
"required": ["wert", "quellen", "extraktion", "hinweis"],
"properties": {
"wert": {
"type": ["array", "null"],
"items": {
"type": "object",
"required": ["datum_von", "datum_bis", "von", "bis"],
"properties": {
"bis": {"type": "string", "pattern": "^([01][0-9]|2[0-3]):[0-5][0-9]$"},
"von": {"type": "string", "pattern": "^([01][0-9]|2[0-3]):[0-5][0-9]$"},
"datum_bis": {"type": "string", "pattern": "^[0-9]{4}-(0[1-9]|1[0-2])-(0[1-9]|[12][0-9]|3[01])$"},
"datum_von": {"type": "string", "pattern": "^[0-9]{4}-(0[1-9]|1[0-2])-(0[1-9]|[12][0-9]|3[01])$"}
},
"additionalProperties": false
}
},
"hinweis": {"type": ["string", "null"]},
"quellen": {"type": "array", "items": {"type": "string", "pattern": "^https?://"}},
"extraktion": {"enum": ["direkt", "kombiniert"], "type": "string"}
},
"additionalProperties": false
}
},
"additionalProperties": false
},
"status": {
"enum": ["bestaetigt", "unklar", "vorjahr_unbestaetigt", "abgesagt"],
"type": "string",
"description": "Gesamtstatus der Veranstaltung im Recherchejahr"
},
"markt_name": {"type": "string", "minLength": 3},
"quellen_gesamt": {
"type": "array",
"items": {"type": "string", "pattern": "^https?://"},
"minItems": 1
},
"recherche_datum": {
"type": "string",
"pattern": "^[0-9]{4}-(0[1-9]|1[0-2])-(0[1-9]|[12][0-9]|3[01])$"
}
},
"additionalProperties": false
}
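A quick cross-check between prompt and schema, not part of the commit: the oeffnungszeiten time pattern accepts the prompt's after-midnight convention ("00:30", "02:00") while rejecting 24:00-style or single-digit times.

package research

import (
	"regexp"
	"testing"
)

// The schema's HH:MM pattern, copied from oeffnungszeiten above.
var hhmm = regexp.MustCompile(`^([01][0-9]|2[0-3]):[0-5][0-9]$`)

func TestTimePatternMatchesPromptConvention(t *testing.T) {
	for _, ok := range []string{"00:30", "02:00", "16:00", "23:59"} {
		if !hhmm.MatchString(ok) {
			t.Errorf("%q should match", ok)
		}
	}
	for _, bad := range []string{"24:00", "9:00", "16:0", "16-00"} {
		if hhmm.MatchString(bad) {
			t.Errorf("%q should not match", bad)
		}
	}
}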

View File

@@ -0,0 +1,40 @@
package research
import "net/url"
// BuildCandidates returns an ordered, host-deduplicated URL list: seeds
// first (in order), then websiteHint (if its host is new), then searchURLs
// (new hosts only). Dedup is by url.Host, so subdomains and ports count as
// distinct. The list is capped at maxPages.
func BuildCandidates(seeds []string, websiteHint string, searchURLs []string, maxPages int) []string {
	if maxPages <= 0 {
		return nil // a non-positive cap would otherwise still admit one entry
	}
	seen := map[string]struct{}{}
	out := make([]string, 0, maxPages)
	// add appends raw when its host is unseen; the return value reports
	// whether the list has reached maxPages and collection should stop.
	add := func(raw string) bool {
		u, err := url.Parse(raw)
		if err != nil || u.Host == "" {
			return false
		}
		if _, dup := seen[u.Host]; dup {
			return false
		}
		seen[u.Host] = struct{}{}
		out = append(out, raw)
		return len(out) >= maxPages
	}
for _, s := range seeds {
if add(s) {
return out
}
}
if websiteHint != "" {
if add(websiteHint) {
return out
}
}
for _, s := range searchURLs {
if add(s) {
return out
}
}
return out
}

View File

@@ -0,0 +1,45 @@
package research
import (
"reflect"
"testing"
)
func TestBuildCandidates_DeduplicatesByDomain(t *testing.T) {
seeds := []string{
"https://example.org/markt",
"https://example.org/markt?utm_source=x",
"https://other.example/",
}
got := BuildCandidates(seeds, "", nil, 10)
want := []string{"https://example.org/markt", "https://other.example/"}
if !reflect.DeepEqual(got, want) {
t.Fatalf("got %v want %v", got, want)
}
}
func TestBuildCandidates_IncludesWebsiteHint(t *testing.T) {
got := BuildCandidates(nil, "https://hint.example", nil, 10)
if len(got) != 1 || got[0] != "https://hint.example" {
t.Fatalf("got %v", got)
}
}
func TestBuildCandidates_AppendsSearchResults(t *testing.T) {
seeds := []string{"https://a.example"}
search := []string{"https://b.example", "https://a.example/other"}
got := BuildCandidates(seeds, "", search, 10)
// a.example already represented by seed -> search URL on same domain is dropped
want := []string{"https://a.example", "https://b.example"}
if !reflect.DeepEqual(got, want) {
t.Fatalf("got %v want %v", got, want)
}
}
func TestBuildCandidates_CapsAtMaxPages(t *testing.T) {
seeds := []string{"https://a.example", "https://b.example", "https://c.example"}
got := BuildCandidates(seeds, "", nil, 2)
if len(got) != 2 {
t.Fatalf("len=%d want 2", len(got))
}
}

View File

@@ -0,0 +1,50 @@
package research
import (
"context"
"errors"
"log/slog"
"sync"
"golang.org/x/sync/errgroup"
)
type Scraper interface {
Fetch(ctx context.Context, url string) (string, error)
}
type Page struct {
	URL   string
	Title string // currently left empty by FetchAll: the Scraper returns text only
	Text  string
}
func FetchAll(ctx context.Context, sc Scraper, urls []string, concurrency int) ([]Page, error) {
if concurrency <= 0 {
concurrency = 4
}
g, gctx := errgroup.WithContext(ctx)
g.SetLimit(concurrency)
var mu sync.Mutex
pages := make([]Page, 0, len(urls))
for _, u := range urls {
g.Go(func() error {
text, err := sc.Fetch(gctx, u)
if err != nil {
slog.Warn("research fetch failed", "url", u, "err", err)
return nil // individual failure does not abort the group
}
mu.Lock()
pages = append(pages, Page{URL: u, Text: text})
mu.Unlock()
return nil
})
}
	_ = g.Wait() // goroutines always return nil; Wait only synchronizes completion
if len(pages) == 0 {
return nil, errors.New("all candidate URLs failed to fetch")
}
return pages, nil
}

View File

@@ -0,0 +1,47 @@
package research
import (
"context"
"errors"
"testing"
)
type fakeScraper struct {
byURL map[string]string
errs map[string]error
}
func (f *fakeScraper) Fetch(ctx context.Context, url string) (string, error) {
if err, ok := f.errs[url]; ok {
return "", err
}
return f.byURL[url], nil
}
func TestFetchAll_ReturnsOnlySuccessfulPages(t *testing.T) {
s := &fakeScraper{
byURL: map[string]string{
"https://a.example": "content A",
"https://c.example": "content C",
},
errs: map[string]error{"https://b.example": errors.New("timeout")},
}
urls := []string{"https://a.example", "https://b.example", "https://c.example"}
pages, err := FetchAll(context.Background(), s, urls, 4)
if err != nil {
t.Fatalf("FetchAll: %v", err)
}
if len(pages) != 2 {
t.Fatalf("pages len=%d: %+v", len(pages), pages)
}
}
func TestFetchAll_FailsWhenZeroSucceed(t *testing.T) {
s := &fakeScraper{errs: map[string]error{
"https://a.example": errors.New("x"),
"https://b.example": errors.New("y"),
}}
if _, err := FetchAll(context.Background(), s, []string{"https://a.example", "https://b.example"}, 4); err == nil {
t.Fatal("want error when zero pages succeeded")
}
}

View File

@@ -0,0 +1,159 @@
package research_test
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"sync/atomic"
"testing"
"time"
"marktvogt.de/backend/internal/domain/market/research"
"marktvogt.de/backend/internal/pkg/ai"
"marktvogt.de/backend/internal/pkg/scrape"
"marktvogt.de/backend/internal/pkg/search"
)
// validResearchJSON is a minimal JSON document that satisfies researcher_schema.json.
// All stringFeld values use the required sub-structure; datumFeld values include valid date strings.
const validResearchJSON = `{
"markt_name": "Testmarkt Berlin",
"recherche_datum": "2026-04-24",
"status": "bestaetigt",
"quellen_gesamt": ["http://example.com/testmarkt"],
"felder": {
"website": {"wert": "http://example.com/testmarkt", "quellen": ["http://example.com/testmarkt"], "extraktion": "direkt", "hinweis": null},
"strasse": {"wert": "Hauptstraße 1", "quellen": ["http://example.com/testmarkt"], "extraktion": "direkt", "hinweis": null},
"plz": {"wert": "10115", "quellen": ["http://example.com/testmarkt"], "extraktion": "direkt", "hinweis": null},
"stadt": {"wert": "Berlin", "quellen": ["http://example.com/testmarkt"], "extraktion": "direkt", "hinweis": null},
"bundesland": {"wert": "Berlin", "quellen": ["http://example.com/testmarkt"], "extraktion": "direkt", "hinweis": null},
"land": {"wert": "Deutschland", "quellen": ["http://example.com/testmarkt"], "extraktion": "direkt", "hinweis": null},
"veranstalter": {"wert": "Musterveranstalter GmbH", "quellen": ["http://example.com/testmarkt"], "extraktion": "direkt", "hinweis": null},
"start_datum": {"wert": "2026-06-01", "quellen": ["http://example.com/testmarkt"], "extraktion": "direkt", "hinweis": null},
"end_datum": {"wert": "2026-06-03", "quellen": ["http://example.com/testmarkt"], "extraktion": "direkt", "hinweis": null},
"oeffnungszeiten": {
"wert": [{"datum_von": "2026-06-01", "datum_bis": "2026-06-01", "von": "10:00", "bis": "20:00"}],
"quellen": ["http://example.com/testmarkt"],
"extraktion": "direkt",
"hinweis": null
},
"eintrittspreise": {
"wert": [{"name": "Erwachsene", "betrag": 8.00, "waehrung": "EUR"}],
"quellen": ["http://example.com/testmarkt"],
"extraktion": "direkt",
"hinweis": null
},
"bild_url": {"wert": null, "quellen": [], "extraktion": "direkt", "hinweis": null}
}
}`
// fakeOllamaHandler returns a valid Ollama non-streaming chat response whose
// content field contains validResearchJSON.
func fakeOllamaHandler(t *testing.T) http.Handler {
t.Helper()
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost || r.URL.Path != "/api/chat" {
http.NotFound(w, r)
return
}
resp := map[string]any{
"model": "test",
"created_at": "2026-04-24T00:00:00Z",
"message": map[string]string{
"role": "assistant",
"content": validResearchJSON,
},
"done": true,
"prompt_eval_count": 10,
"eval_count": 20,
}
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(resp); err != nil {
t.Errorf("fakeOllamaHandler: encode response: %v", err)
}
})
}
// fakeSearxngHandler returns a SearxNG JSON response whose result URLs point
// at the provided page server.
func fakeSearxngHandler(t *testing.T, pageBaseURL string) http.Handler {
t.Helper()
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/search" {
http.NotFound(w, r)
return
}
resp := map[string]any{
"results": []map[string]any{
{"url": pageBaseURL + "/page1", "title": "Testmarkt Berlin Seite 1", "content": "Mittelalterfest im Juni"},
{"url": pageBaseURL + "/page2", "title": "Testmarkt Berlin Seite 2", "content": "Eintritt und Öffnungszeiten"},
},
}
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(resp); err != nil {
t.Errorf("fakeSearxngHandler: encode response: %v", err)
}
})
}
// fakePageHandler serves minimal HTML at /page1 and /page2 and counts hits.
func fakePageHandler(t *testing.T, hits *atomic.Int64) http.Handler {
t.Helper()
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/page1", "/page2":
hits.Add(1)
w.Header().Set("Content-Type", "text/html; charset=utf-8")
_, _ = w.Write([]byte(`<html><body><p>Testmarkt Berlin findet statt.</p></body></html>`))
default:
http.NotFound(w, r)
}
})
}
func TestIntegrationOrchestratorFullPipeline(t *testing.T) {
var pageHits atomic.Int64
// Start fake page server first so we know its URL for SearxNG results.
fakePage := httptest.NewServer(fakePageHandler(t, &pageHits))
defer fakePage.Close()
fakeOllama := httptest.NewServer(fakeOllamaHandler(t))
defer fakeOllama.Close()
fakeSearxng := httptest.NewServer(fakeSearxngHandler(t, fakePage.URL))
defer fakeSearxng.Close()
orch := &research.Orchestrator{
AI: ai.NewOllamaProvider(ai.OllamaConfig{BaseURL: fakeOllama.URL, Model: "test"}),
Search: search.NewSearxng(search.SearxngConfig{BaseURL: fakeSearxng.URL}),
Scraper: scrape.New("test-agent/1.0"),
MaxPages: 4,
Concurrency: 2,
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
out, err := orch.Run(ctx, research.Input{
MarktName: "Testmarkt",
Stadt: "Berlin",
RechercheDatum: time.Date(2026, 6, 1, 0, 0, 0, 0, time.UTC),
})
if err != nil {
t.Fatalf("Orchestrator.Run returned error: %v", err)
}
if !json.Valid(out.Raw) {
t.Fatalf("out.Raw is not valid JSON: %s", string(out.Raw))
}
if err := ai.ValidateSchema(research.SchemaJSON, []byte(out.Raw)); err != nil {
t.Fatalf("out.Raw does not satisfy researcher schema: %v", err)
}
if pageHits.Load() == 0 {
t.Fatal("expected at least one page server hit, got zero")
}
}

View File

@@ -0,0 +1,158 @@
package research
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/url"
"time"
"marktvogt.de/backend/internal/pkg/ai"
"marktvogt.de/backend/internal/pkg/search"
)
type Input struct {
MarktName string
Stadt string
StartDatumHint string
WebsiteHint string
SeedURLs []string
RechercheDatum time.Time
}
type Output struct {
Raw json.RawMessage
Provider string
TokensUsed int
Latency time.Duration
}
type Orchestrator struct {
AI ai.Provider
Search search.Client
Scraper Scraper
MaxPages int
Concurrency int
}
func (o *Orchestrator) Run(ctx context.Context, in Input) (Output, error) {
start := time.Now()
// 1. Candidate URLs
candidates := BuildCandidates(in.SeedURLs, in.WebsiteHint, nil, o.MaxPages)
if countDomains(candidates) < 2 {
queries := []string{
fmt.Sprintf("%s %s", in.MarktName, in.Stadt),
fmt.Sprintf("%s %s %d", in.MarktName, in.Stadt, in.RechercheDatum.Year()),
}
var extra []string
for _, q := range queries {
res, err := o.Search.Search(ctx, q, search.Options{Language: "de", Count: 10})
if err != nil {
break
}
for _, r := range res {
extra = append(extra, r.URL)
}
if len(extra) > 0 {
break
}
}
candidates = BuildCandidates(in.SeedURLs, in.WebsiteHint, extra, o.MaxPages)
}
// 2. Parallel fetch
pages, err := FetchAll(ctx, o.Scraper, candidates, o.Concurrency)
if err != nil {
return Output{}, err
}
// 3. User prompt assembly
userPrompt, err := buildUserPrompt(in, pages)
if err != nil {
return Output{}, err
}
// 4. LLM call with one retry on schema violation
resp, err := callLLM(ctx, o.AI, userPrompt, SchemaJSON)
if err == nil {
if verr := ai.ValidateSchema(SchemaJSON, []byte(resp.Content)); verr != nil {
err = &ai.ProviderError{Code: ai.ErrSchemaViolation, Retryable: true, RawOutput: resp.Content, Inner: verr}
}
}
if err != nil {
var pe *ai.ProviderError
if errors.As(err, &pe) && pe.Code == ai.ErrSchemaViolation {
resp, err = callLLM(ctx, o.AI, userPrompt+"\n\nYour previous response failed schema validation. Re-emit the JSON strictly matching the schema.", SchemaJSON)
if err == nil {
if verr := ai.ValidateSchema(SchemaJSON, []byte(resp.Content)); verr != nil {
err = &ai.ProviderError{Code: ai.ErrSchemaViolation, Retryable: false, RawOutput: resp.Content, Inner: verr}
}
}
}
if err != nil {
return Output{}, err
}
}
return Output{
Raw: json.RawMessage(resp.Content),
Provider: o.AI.Name(),
TokensUsed: resp.TotalTokens,
Latency: time.Since(start),
}, nil
}
func callLLM(ctx context.Context, p ai.Provider, userPrompt string, schema []byte) (*ai.ChatResponse, error) {
return p.Chat(ctx, &ai.ChatRequest{
SystemPrompt: SystemPrompt,
UserMessage: userPrompt,
JSONSchema: schema,
})
}
type userPromptPayload struct {
MarktName string `json:"markt_name"`
Stadt string `json:"stadt"`
StartDatumHint string `json:"start_datum_hinweis"`
WebsiteHint string `json:"website_hinweis"`
RechercheDatum string `json:"recherche_datum"`
Quellen []quellePage `json:"quellen"`
}
type quellePage struct {
URL string `json:"url"`
Title string `json:"titel"`
Text string `json:"text"`
}
func buildUserPrompt(in Input, pages []Page) (string, error) {
p := userPromptPayload{
MarktName: in.MarktName,
Stadt: in.Stadt,
StartDatumHint: in.StartDatumHint,
WebsiteHint: in.WebsiteHint,
RechercheDatum: in.RechercheDatum.Format("2006-01-02"),
}
for _, pg := range pages {
p.Quellen = append(p.Quellen, quellePage(pg))
}
buf, err := json.Marshal(p)
if err != nil {
return "", fmt.Errorf("marshal user prompt: %w", err)
}
return string(buf), nil
}
func countDomains(urls []string) int {
seen := map[string]struct{}{}
for _, raw := range urls {
u, err := url.Parse(raw)
if err != nil || u.Host == "" {
continue
}
seen[u.Host] = struct{}{}
}
return len(seen)
}
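Run's LLM step is a validate-and-retry-once loop whose shape is worth seeing in isolation. A simplified sketch (hypothetical helper, not in the commit); unlike Run, it treats a schema-violation error returned by the provider itself on the first call as fatal rather than retrying it.

package research

import (
	"context"

	"marktvogt.de/backend/internal/pkg/ai"
)

// chatValidated sketches Run's LLM flow: call, validate against the schema,
// retry exactly once with a corrective suffix, then fail with RawOutput
// preserved so callers can log what the model actually produced.
func chatValidated(ctx context.Context, p ai.Provider, prompt string, schema []byte) (*ai.ChatResponse, error) {
	const nudge = "\n\nYour previous response failed schema validation. Re-emit the JSON strictly matching the schema."
	msg := prompt
	for attempt := 0; ; attempt++ {
		resp, err := callLLM(ctx, p, msg, schema)
		if err != nil {
			return nil, err
		}
		verr := ai.ValidateSchema(schema, []byte(resp.Content))
		if verr == nil {
			return resp, nil
		}
		if attempt == 1 {
			return nil, &ai.ProviderError{Code: ai.ErrSchemaViolation, Retryable: false, RawOutput: resp.Content, Inner: verr}
		}
		msg = prompt + nudge
	}
}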

View File

@@ -0,0 +1,119 @@
package research
import (
"context"
"errors"
"testing"
"time"
"marktvogt.de/backend/internal/pkg/ai"
"marktvogt.de/backend/internal/pkg/search"
)
type fakeProvider struct {
responses []string
errs []error
calls int
}
func (f *fakeProvider) Name() string { return "fake" }
func (f *fakeProvider) SupportsJSONMode() bool { return true }
func (f *fakeProvider) SupportsJSONSchema() bool { return true }
func (f *fakeProvider) Chat(ctx context.Context, req *ai.ChatRequest) (*ai.ChatResponse, error) {
i := f.calls
f.calls++
if i < len(f.errs) && f.errs[i] != nil {
return nil, f.errs[i]
}
return &ai.ChatResponse{Content: f.responses[i]}, nil
}
type fakeSearch struct{ res []search.Result }
func (f *fakeSearch) Search(ctx context.Context, q string, o search.Options) ([]search.Result, error) {
return f.res, nil
}
func validJSON(t *testing.T) string {
t.Helper()
return `{
"markt_name":"Test",
"recherche_datum":"2026-04-24",
"status":"bestaetigt",
"quellen_gesamt":["https://a.example"],
"felder":{
"website":{"wert":null,"quellen":[],"extraktion":"direkt","hinweis":null},
"strasse":{"wert":null,"quellen":[],"extraktion":"direkt","hinweis":null},
"plz":{"wert":null,"quellen":[],"extraktion":"direkt","hinweis":null},
"stadt":{"wert":"Test","quellen":["https://a.example"],"extraktion":"direkt","hinweis":null},
"bundesland":{"wert":null,"quellen":[],"extraktion":"direkt","hinweis":null},
"land":{"wert":"Deutschland","quellen":["https://a.example"],"extraktion":"direkt","hinweis":null},
"veranstalter":{"wert":null,"quellen":[],"extraktion":"direkt","hinweis":null},
"start_datum":{"wert":"2026-06-14","quellen":["https://a.example"],"extraktion":"direkt","hinweis":null},
"end_datum":{"wert":"2026-06-14","quellen":["https://a.example"],"extraktion":"direkt","hinweis":null},
"oeffnungszeiten":{"wert":null,"quellen":[],"extraktion":"direkt","hinweis":null},
"eintrittspreise":{"wert":null,"quellen":[],"extraktion":"direkt","hinweis":null},
"bild_url":{"wert":null,"quellen":[],"extraktion":"direkt","hinweis":null}
}
}`
}
func TestOrchestrator_Run_HappyPath(t *testing.T) {
prov := &fakeProvider{responses: []string{validJSON(t)}}
o := &Orchestrator{
AI: prov,
Search: &fakeSearch{},
Scraper: &fakeScraper{byURL: map[string]string{"https://a.example": "content"}},
MaxPages: 6,
Concurrency: 2,
}
out, err := o.Run(context.Background(), Input{
MarktName: "Test",
Stadt: "Teststadt",
SeedURLs: []string{"https://a.example"},
RechercheDatum: time.Now(),
})
if err != nil {
t.Fatalf("Run: %v", err)
}
if out.Provider != "fake" || len(out.Raw) == 0 {
t.Fatalf("unexpected: %+v", out)
}
}
func TestOrchestrator_Run_SchemaViolationRetriesOnce(t *testing.T) {
prov := &fakeProvider{responses: []string{`{"garbage":1}`, validJSON(t)}}
o := &Orchestrator{
AI: prov,
Search: &fakeSearch{},
Scraper: &fakeScraper{byURL: map[string]string{"https://a.example": "content"}},
MaxPages: 6,
Concurrency: 2,
}
_, err := o.Run(context.Background(), Input{MarktName: "Test", Stadt: "X", SeedURLs: []string{"https://a.example"}, RechercheDatum: time.Now()})
if err != nil {
t.Fatalf("Run: %v", err)
}
if prov.calls != 2 {
t.Fatalf("expected 2 calls, got %d", prov.calls)
}
}
func TestOrchestrator_Run_SchemaViolationFailsAfterRetry(t *testing.T) {
prov := &fakeProvider{responses: []string{`{"garbage":1}`, `{"still":"bad"}`}}
o := &Orchestrator{
AI: prov,
Search: &fakeSearch{},
Scraper: &fakeScraper{byURL: map[string]string{"https://a.example": "content"}},
MaxPages: 6,
Concurrency: 2,
}
_, err := o.Run(context.Background(), Input{MarktName: "Test", Stadt: "X", SeedURLs: []string{"https://a.example"}, RechercheDatum: time.Now()})
if err == nil {
t.Fatal("want schema violation error after retry")
}
var pe *ai.ProviderError
if !errors.As(err, &pe) || pe.Code != ai.ErrSchemaViolation || pe.RawOutput == "" {
t.Fatalf("want SchemaViolation with RawOutput, got %v", err)
}
}

View File

@@ -0,0 +1,6 @@
package research
import _ "embed"
//go:embed assets/researcher_prompt.de.md
var SystemPrompt string

View File

@@ -0,0 +1,6 @@
package research
import _ "embed"
//go:embed assets/researcher_schema.json
var SchemaJSON []byte

View File

@@ -0,0 +1,19 @@
package research
import (
"encoding/json"
"testing"
)
func TestSchemaJSON_IsValidJSON(t *testing.T) {
var v any
if err := json.Unmarshal(SchemaJSON, &v); err != nil {
t.Fatalf("schema not valid JSON: %v", err)
}
}
func TestSystemPrompt_NonEmpty(t *testing.T) {
if len(SystemPrompt) < 500 {
t.Fatalf("system prompt seems truncated: %d bytes", len(SystemPrompt))
}
}

View File

@@ -0,0 +1,20 @@
package search
import "context"
type Client interface {
Search(ctx context.Context, query string, opts Options) ([]Result, error)
}
type Options struct {
	Language string
	Count    int    // advisory; see the note in the SearxNG implementation
	Region   string // currently unused by the SearxNG implementation
}
type Result struct {
URL string
Title string
Snippet string
Score float64
}
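The interface is deliberately small, which makes decorators cheap. For example, public SearxNG instances often throttle clients, so a rate-limited wrapper could sit in front of any Client; a sketch using golang.org/x/time/rate, not part of this commit.

package search

import (
	"context"

	"golang.org/x/time/rate"
)

// ThrottledClient wraps any Client with a request-rate budget. Hypothetical
// decorator; the commit always calls the SearxNG client directly.
type ThrottledClient struct {
	Inner   Client
	Limiter *rate.Limiter
}

func (t *ThrottledClient) Search(ctx context.Context, query string, opts Options) ([]Result, error) {
	if err := t.Limiter.Wait(ctx); err != nil {
		return nil, err // context canceled or deadline exceeded while waiting
	}
	return t.Inner.Search(ctx, query, opts)
}

// Usage sketch:
//   c := &ThrottledClient{Inner: NewSearxng(cfg), Limiter: rate.NewLimiter(rate.Limit(1), 1)}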

View File

@@ -0,0 +1,19 @@
package search
import (
"context"
"testing"
)
type stubClient struct {
results []Result
err error
}
func (s *stubClient) Search(ctx context.Context, query string, opts Options) ([]Result, error) {
return s.results, s.err
}
func TestStubSatisfiesInterface(t *testing.T) {
var _ Client = (*stubClient)(nil)
}

View File

@@ -0,0 +1,77 @@
package search
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"time"
)
type SearxngConfig struct {
BaseURL string
Timeout time.Duration
}
type Searxng struct {
cfg SearxngConfig
client *http.Client
}
func NewSearxng(cfg SearxngConfig) *Searxng {
if cfg.Timeout == 0 {
cfg.Timeout = 15 * time.Second
}
return &Searxng{cfg: cfg, client: &http.Client{Timeout: cfg.Timeout}}
}
type searxngResponse struct {
Results []struct {
URL string `json:"url"`
Title string `json:"title"`
Content string `json:"content"`
Score float64 `json:"score"`
} `json:"results"`
}
func (s *Searxng) Search(ctx context.Context, query string, opts Options) ([]Result, error) {
q := url.Values{}
q.Set("q", query)
q.Set("format", "json")
if opts.Language != "" {
q.Set("language", opts.Language)
}
	if opts.Count > 0 {
		// SearxNG's JSON API has no per-request result-count parameter
		// (page size is instance configuration), so Count > 0 only pins
		// the first page and the safesearch default.
		q.Set("pageno", "1")
		q.Set("safesearch", "0")
	}
u := s.cfg.BaseURL + "/search?" + q.Encode()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil)
if err != nil {
return nil, fmt.Errorf("searxng new request: %w", err)
}
req.Header.Set("Accept", "application/json")
resp, err := s.client.Do(req)
if err != nil {
return nil, fmt.Errorf("searxng do: %w", err)
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode >= 400 {
body, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("searxng status %d: %s", resp.StatusCode, string(body))
}
var out searxngResponse
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
return nil, fmt.Errorf("searxng decode: %w", err)
}
results := make([]Result, 0, len(out.Results))
for _, r := range out.Results {
results = append(results, Result{URL: r.URL, Title: r.Title, Snippet: r.Content, Score: r.Score})
}
return results, nil
}
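Operational note for deployers: a stock SearxNG instance only serves format=json when json is listed under search.formats in its settings; otherwise the request is rejected (typically with a 403), which surfaces through the status check above. A minimal probe, with a placeholder instance URL:

package main

import (
	"fmt"
	"net/http"
)

// Probes a SearxNG instance for JSON API availability. A 403 usually means
// `json` is missing from search.formats in the instance settings.
func main() {
	resp, err := http.Get("http://localhost:8888/search?q=test&format=json")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status) // expect 200 when the JSON format is enabled
}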

View File

@@ -0,0 +1,49 @@
package search
import (
"context"
"net/http"
"net/http/httptest"
"testing"
"time"
)
func TestSearxng_Search_ParsesResults(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/search" {
t.Errorf("path: %s", r.URL.Path)
}
if r.URL.Query().Get("q") != "mittelaltermarkt esslingen" {
t.Errorf("q: %q", r.URL.Query().Get("q"))
}
if r.URL.Query().Get("format") != "json" {
t.Errorf("format must be json: %q", r.URL.Query().Get("format"))
}
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{"results":[
{"url":"https://a.example","title":"A","content":"snippet A","score":0.9},
{"url":"https://b.example","title":"B","content":"snippet B","score":0.5}
]}`))
}))
defer srv.Close()
c := NewSearxng(SearxngConfig{BaseURL: srv.URL, Timeout: 5 * time.Second})
res, err := c.Search(context.Background(), "mittelaltermarkt esslingen", Options{Language: "de", Count: 5})
if err != nil {
t.Fatalf("Search: %v", err)
}
if len(res) != 2 || res[0].URL != "https://a.example" || res[0].Score != 0.9 {
t.Fatalf("unexpected: %+v", res)
}
}
func TestSearxng_Search_PropagatesHTTPError(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
http.Error(w, "upstream exploded", http.StatusBadGateway)
}))
defer srv.Close()
c := NewSearxng(SearxngConfig{BaseURL: srv.URL, Timeout: time.Second})
if _, err := c.Search(context.Background(), "x", Options{}); err == nil {
t.Fatal("want error")
}
}

View File

@@ -17,6 +17,7 @@ import (
"marktvogt.de/backend/internal/pkg/email"
"marktvogt.de/backend/internal/pkg/geocode"
"marktvogt.de/backend/internal/pkg/scrape"
"marktvogt.de/backend/internal/pkg/search"
"marktvogt.de/backend/internal/pkg/turnstile"
)
@@ -73,14 +74,15 @@ func (s *Server) registerRoutes() {
 	if err != nil {
 		panic(fmt.Errorf("init ai provider: %w", err))
 	}
-	researchHandler := market.NewResearchHandler(marketSvc, aiProvider)
+	scraper := scrape.New(s.cfg.Discovery.CrawlerUserAgent)
+	searchClient := search.NewSearxng(search.SearxngConfig{BaseURL: s.cfg.Search.SearxngURL})
+	researchHandler := market.NewResearchHandler(marketSvc, aiProvider, searchClient, scraper)
 	requireAdmin := middleware.RequireRole("admin")
 	market.RegisterAdminRoutes(v1, adminMarketHandler, researchHandler, requireAuth, requireAdmin)

 	// Discovery routes
 	discoveryRepo := discovery.NewRepository(s.db)
 	crawlerInstance := crawler.NewCrawler(s.cfg.Discovery.CrawlerUserAgent, crawler.DefaultSourceConfigs())
-	scraper := scrape.New(s.cfg.Discovery.CrawlerUserAgent)
 	llmEnricher := enrich.NewLLMEnricher(aiProvider, scraper)
 	simClassifier := enrich.NewSimilarityClassifier(aiProvider)
 	discoveryService := discovery.NewService(discoveryRepo, crawlerInstance, discovery.NewLinkChecker(), marketSvc, geocoder, llmEnricher, simClassifier)