Merge branch 'feat/discovery-crawler' — DACH discovery crawler MR 1

Replaces Mistral Pass 0 with a deterministic 5-source Go crawler
(marktkalendarium.de, mittelalterkalender.info, festival-alarm.com,
mittelaltermarkt.online Tribe REST, suendenfrei.tv). Pass 1/2 enrichment
paths unchanged. Existing Mistral Tick path preserved alongside; cutover
gated on coverage verification via cmd/discovery-compare.

Spec: docs/superpowers/specs/2026-04-18-dach-discovery-crawler-design.md
Plan: docs/superpowers/plans/2026-04-18-dach-discovery-crawler.md
2026-04-18 17:03:27 +02:00
52 changed files with 21904 additions and 185 deletions

@@ -10,6 +10,7 @@ repos:
exclude: ^web/tsconfig\.json$
- id: check-merge-conflict
- id: check-added-large-files
exclude: ^backend/internal/domain/discovery/crawler/testdata/
- id: no-commit-to-branch
args: ['--branch', 'main']

@@ -0,0 +1,233 @@
// discovery-compare: Run the crawler and Mistral Pass 0 against a sample of
// (land, region, year-month) buckets; emit markdown diff to stdout or --out.
// Purpose: verify crawler coverage before MR 2 deletes the Mistral path.
//
// Requires the usual backend env vars. JWT_SECRET must be set (any value works
// for this tool since no HTTP server is started).
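//
// Example invocation (bucket values illustrative):
//
//	go run ./cmd/discovery-compare \
//	  --buckets "Deutschland:Bayern:2026-04,Deutschland:Sachsen:2026-05" \
//	  --out coverage.md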
package main
import (
"context"
"flag"
"fmt"
"os"
"sort"
"strings"
"time"
"marktvogt.de/backend/internal/config"
"marktvogt.de/backend/internal/domain/discovery"
"marktvogt.de/backend/internal/domain/discovery/crawler"
"marktvogt.de/backend/internal/pkg/ai"
)
type sampleBucket struct {
Land string
Region string
YearMonth string
}
func main() {
var (
bucketsFlag = flag.String("buckets", "", "comma-separated LAND:REGION:YYYY-MM list (e.g., Deutschland:Bayern:2026-04)")
outFlag = flag.String("out", "", "write markdown report to this file (default: stdout)")
)
flag.Parse()
if *bucketsFlag == "" {
fmt.Fprintln(os.Stderr, "--buckets required")
os.Exit(2)
}
buckets, err := parseBuckets(*bucketsFlag)
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(2)
}
if err := run(buckets, *outFlag); err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
}
func run(buckets []sampleBucket, outPath string) error {
cfg, err := config.Load()
if err != nil {
return fmt.Errorf("load config: %w", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Minute)
defer cancel()
// 1. Crawler run (dry-run — no DB inserts).
cr := crawler.NewCrawler(cfg.Discovery.CrawlerUserAgent, crawler.DefaultSourceConfigs())
res, err := cr.RunAll(ctx)
if err != nil {
return fmt.Errorf("crawler: %w", err)
}
crawlerEvents := make([]crawler.RawEvent, 0)
for _, evs := range res.PerSource {
crawlerEvents = append(crawlerEvents, evs...)
}
merged := crawler.Merge(crawlerEvents)
crawlerByBucket := groupCrawlerByBucket(merged, buckets)
// 2. Mistral Pass 0 run (one per sample bucket).
aiClient := ai.New(cfg.AI.APIKey, cfg.AI.AgentSimple, cfg.AI.ModelComplex, cfg.AI.RateLimitRPS)
agentClient := discovery.NewAgentClient(aiClient, cfg.AI.AgentDiscovery)
mistralByBucket := runMistralForBuckets(ctx, agentClient, buckets)
// 3. Emit report.
report := buildMarkdownReport(buckets, crawlerByBucket, mistralByBucket)
if outPath == "" {
fmt.Println(report)
return nil
}
if err := os.WriteFile(outPath, []byte(report), 0644); err != nil {
return fmt.Errorf("write out: %w", err)
}
return nil
}
func parseBuckets(s string) ([]sampleBucket, error) {
parts := strings.Split(s, ",")
out := make([]sampleBucket, 0, len(parts))
for _, item := range parts {
fields := strings.Split(strings.TrimSpace(item), ":")
if len(fields) != 3 {
return nil, fmt.Errorf("bucket %q: want LAND:REGION:YYYY-MM", item)
}
out = append(out, sampleBucket{Land: fields[0], Region: fields[1], YearMonth: fields[2]})
}
return out, nil
}
// groupCrawlerByBucket assigns merged crawler events to sample buckets.
//
// NOTE: this is an approximation for the diagnostic CLI only — not for
// production dedup. The Bundesland match uses `strings.Contains` so a merged
// event with Bundesland="Bayern" will join a bucket with Region="Bay" (or
// "ern"). Good enough to compare coverage between the crawler and Mistral
// Pass 0 at bucket granularity; not safe for business-logic routing.
func groupCrawlerByBucket(merged []crawler.MergedEvent, buckets []sampleBucket) map[string][]crawler.MergedEvent {
result := make(map[string][]crawler.MergedEvent)
for _, b := range buckets {
result[bucketKey(b)] = nil
}
for _, m := range merged {
if m.StartDate == nil {
continue
}
ym := m.StartDate.Format("2006-01")
for _, b := range buckets {
if b.YearMonth != ym {
continue
}
if m.Land != "" && m.Land != b.Land {
continue
}
if m.Bundesland != "" && !strings.Contains(m.Bundesland, b.Region) {
continue
}
key := bucketKey(b)
result[key] = append(result[key], m)
}
}
return result
}
func runMistralForBuckets(ctx context.Context, ac *discovery.AgentClient, buckets []sampleBucket) map[string][]discovery.Pass0Market {
out := make(map[string][]discovery.Pass0Market)
for _, b := range buckets {
bk := discovery.Bucket{Land: b.Land, Region: b.Region, YearMonth: b.YearMonth, Halbmonat: "H1"}
resp, err := ac.Discover(ctx, bk)
if err != nil {
fmt.Fprintf(os.Stderr, "mistral %s: %v\n", bucketKey(b), err)
continue
}
out[bucketKey(b)] = resp.Maerkte
}
return out
}
func bucketKey(b sampleBucket) string {
return b.Land + ":" + b.Region + ":" + b.YearMonth
}
func buildMarkdownReport(buckets []sampleBucket, c map[string][]crawler.MergedEvent, m map[string][]discovery.Pass0Market) string {
var sb strings.Builder
sb.WriteString("# Discovery coverage comparison\n\n")
sb.WriteString(fmt.Sprintf("Generated: %s\n\n", time.Now().Format(time.RFC3339)))
for _, b := range buckets {
key := bucketKey(b)
ce := c[key]
me := m[key]
sb.WriteString(fmt.Sprintf("## %s\n\n", key))
sb.WriteString(fmt.Sprintf("- crawler: %d events\n", len(ce)))
sb.WriteString(fmt.Sprintf("- mistral: %d events\n\n", len(me)))
crawlerNames := nameSet(ceNames(ce))
mistralNames := nameSet(meNames(me))
sb.WriteString("### Only in crawler\n")
for _, n := range diff(crawlerNames, mistralNames) {
sb.WriteString(fmt.Sprintf("- %s\n", n))
}
sb.WriteString("\n### Only in mistral\n")
for _, n := range diff(mistralNames, crawlerNames) {
sb.WriteString(fmt.Sprintf("- %s\n", n))
}
sb.WriteString("\n### In both\n")
for _, n := range intersect(crawlerNames, mistralNames) {
sb.WriteString(fmt.Sprintf("- %s\n", n))
}
sb.WriteString("\n")
}
return sb.String()
}
func ceNames(ce []crawler.MergedEvent) []string {
out := make([]string, len(ce))
for i, e := range ce {
out[i] = discovery.NormalizeName(e.Name)
}
return out
}
func meNames(me []discovery.Pass0Market) []string {
out := make([]string, len(me))
for i, e := range me {
out[i] = discovery.NormalizeName(e.MarktName)
}
return out
}
func nameSet(names []string) map[string]bool {
s := make(map[string]bool, len(names))
for _, n := range names {
s[n] = true
}
return s
}
func diff(a, b map[string]bool) []string {
out := make([]string, 0, len(a))
for n := range a {
if !b[n] {
out = append(out, n)
}
}
sort.Strings(out)
return out
}
func intersect(a, b map[string]bool) []string {
out := make([]string, 0)
for n := range a {
if b[n] {
out = append(out, n)
}
}
sort.Strings(out)
return out
}

@@ -3,6 +3,7 @@ module marktvogt.de/backend
go 1.26
require (
github.com/PuerkitoBio/goquery v1.12.0
github.com/VikingOwl91/mistral-go-sdk v1.3.0
github.com/gin-gonic/gin v1.11.0
github.com/go-playground/validator/v10 v10.30.1
@@ -11,12 +12,13 @@ require (
github.com/jackc/pgx/v5 v5.8.0
github.com/pquerna/otp v1.5.0
github.com/valkey-io/valkey-go v1.0.72
golang.org/x/crypto v0.48.0
golang.org/x/crypto v0.49.0
golang.org/x/oauth2 v0.35.0
golang.org/x/time v0.14.0
)
require (
github.com/andybalholm/cascadia v1.3.3 // indirect
github.com/boombuler/barcode v1.0.1-0.20190219062509-6c824513bacc // indirect
github.com/bytedance/sonic v1.14.0 // indirect
github.com/bytedance/sonic/loader v0.3.0 // indirect
@@ -42,9 +44,9 @@ require (
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.3.0 // indirect
golang.org/x/arch v0.20.0 // indirect
golang.org/x/net v0.49.0 // indirect
golang.org/x/sync v0.19.0 // indirect
golang.org/x/sys v0.41.0 // indirect
golang.org/x/text v0.34.0 // indirect
golang.org/x/net v0.52.0 // indirect
golang.org/x/sync v0.20.0 // indirect
golang.org/x/sys v0.42.0 // indirect
golang.org/x/text v0.35.0 // indirect
google.golang.org/protobuf v1.36.9 // indirect
)

@@ -1,5 +1,9 @@
github.com/PuerkitoBio/goquery v1.12.0 h1:pAcL4g3WRXekcB9AU/y1mbKez2dbY2AajVhtkO8RIBo=
github.com/PuerkitoBio/goquery v1.12.0/go.mod h1:802ej+gV2y7bbIhOIoPY5sT183ZW0YFofScC4q/hIpQ=
github.com/VikingOwl91/mistral-go-sdk v1.3.0 h1:OkTsodDE5lmdf7p2cwScqD2vIk8sScQ2IGk65dUjuz0=
github.com/VikingOwl91/mistral-go-sdk v1.3.0/go.mod h1:f4emNtHUx2zSqY3V0LBz6lNI1jE6q/zh+SEU+/hJ0i4=
github.com/andybalholm/cascadia v1.3.3 h1:AG2YHrzJIm4BZ19iwJ/DAua6Btl3IwJX+VI4kktS1LM=
github.com/andybalholm/cascadia v1.3.3/go.mod h1:xNd9bqTn98Ln4DwST8/nG+H0yuB8Hmgu1YHNnWw0GeA=
github.com/boombuler/barcode v1.0.1-0.20190219062509-6c824513bacc h1:biVzkmvwrH8WK8raXaxBx6fRVTlJILwEwQGL1I/ByEI=
github.com/boombuler/barcode v1.0.1-0.20190219062509-6c824513bacc/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8=
github.com/bytedance/sonic v1.14.0 h1:/OfKt8HFw0kh2rj8N0F6C/qPGRESq0BbaNZgcNXXzQQ=
@@ -31,6 +35,7 @@ github.com/goccy/go-yaml v1.18.0 h1:8W7wMFS12Pcas7KU+VVkaiCng+kG8QiFeFwzFb+rwuw=
github.com/goccy/go-yaml v1.18.0/go.mod h1:XBurs7gK8ATbW4ZPGKgcbrY1Br56PdM69F7LkFRi1kA=
github.com/golang-jwt/jwt/v5 v5.3.1 h1:kYf81DTWFe7t+1VvL7eS+jKFVWaUnK9cB1qbwn63YCY=
github.com/golang-jwt/jwt/v5 v5.3.1/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
@@ -84,27 +89,91 @@ github.com/ugorji/go/codec v1.3.0 h1:Qd2W2sQawAfG8XSvzwhBeoGq71zXOC/Q1E9y/wUcsUA
github.com/ugorji/go/codec v1.3.0/go.mod h1:pRBVtBSKl77K30Bv8R2P+cLSGaTtex6fsA2Wjqmfxj4=
github.com/valkey-io/valkey-go v1.0.72 h1:iRWt1hJyOchcEgbHSkRY3aKkcBudxvMaVMsmxuYxuxE=
github.com/valkey-io/valkey-go v1.0.72/go.mod h1:VGhZ6fs68Qrn2+OhH+6waZH27bjpgQOiLyUQyXuYK5k=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.uber.org/mock v0.5.2 h1:LbtPTcP8A5k9WPXj54PPPbjcI4Y6lhyOZXn+VS7wNko=
go.uber.org/mock v0.5.2/go.mod h1:wLlUxC2vVTPTaE3UD51E0BGOAElKrILxhVSDYQLld5o=
go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc=
go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg=
golang.org/x/arch v0.20.0 h1:dx1zTU0MAE98U+TQ8BLl7XsJbgze2WnNKF/8tGp/Q6c=
golang.org/x/arch v0.20.0/go.mod h1:bdwinDaKcfZUGpH09BB7ZmOfhalA8lQdzl62l8gGWsk=
golang.org/x/crypto v0.48.0 h1:/VRzVqiRSggnhY7gNRxPauEQ5Drw9haKdM0jqfcCFts=
golang.org/x/crypto v0.48.0/go.mod h1:r0kV5h3qnFPlQnBSrULhlsRfryS2pmewsg+XfMgkVos=
golang.org/x/net v0.49.0 h1:eeHFmOGUTtaaPSGNmjBKpbng9MulQsJURQUAfUwY++o=
golang.org/x/net v0.49.0/go.mod h1:/ysNB2EvaqvesRkuLAyjI1ycPZlQHM3q01F02UY/MV8=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc=
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
golang.org/x/crypto v0.49.0 h1:+Ng2ULVvLHnJ/ZFEq4KdcDd/cfjrrjjNSXNzxg0Y4U4=
golang.org/x/crypto v0.49.0/go.mod h1:ErX4dUh2UM+CFYiXZRTcMpEcN8b/1gxEuv3nODoYtCA=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.15.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM=
golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4=
golang.org/x/net v0.52.0 h1:He/TN1l0e4mmR3QqHMT2Xab3Aj3L9qjbhRm78/6jrW0=
golang.org/x/net v0.52.0/go.mod h1:R1MAz7uMZxVMualyPXb+VaqGSa3LIaUqk0eEt3w36Sw=
golang.org/x/oauth2 v0.35.0 h1:Mv2mzuHuZuY2+bkyWXIHMfhNdJAdwW3FuWeCPYN5GVQ=
golang.org/x/oauth2 v0.35.0/go.mod h1:lzm5WQJQwKZ3nwavOZ3IS5Aulzxi68dUSgRHujetwEA=
golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4=
golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4=
golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k=
golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/text v0.34.0 h1:oL/Qq0Kdaqxa1KbNeMKwQq0reLCCaFtqu2eNuSeNHbk=
golang.org/x/text v0.34.0/go.mod h1:homfLqTYRFyVYemLBFl5GgL/DWEiH5wcsQ5gSh1yziA=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo=
golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/telemetry v0.0.0-20240228155512-f48c80bd79b2/go.mod h1:TeRTkGYfJXctD9OcfyVLyj2J3IxLnKwHJR8f4D8a3YE=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
golang.org/x/term v0.12.0/go.mod h1:owVbMEjm3cBLCHdkQu9b1opXd4ETQWc3BhuQGKgXgvU=
golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk=
golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY=
golang.org/x/term v0.27.0/go.mod h1:iMsnZpn0cago0GOrHO2+Y7u7JPn5AylBrcoWkElMTSM=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
golang.org/x/text v0.35.0 h1:JOVx6vVDFokkpaq1AEptVzLTpDe9KGpj5tR4/X+ybL8=
golang.org/x/text v0.35.0/go.mod h1:khi/HExzZJ2pGnjenulevKNX1W67CUy0AsXcNubPGCA=
golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI=
golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58=
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.36.9 h1:w2gp2mA27hUeUzj9Ex9FBjsBm40zfaDtEWow293U7Iw=
google.golang.org/protobuf v1.36.9/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

@@ -26,9 +26,11 @@ type Config struct {
}
type DiscoveryConfig struct {
Token string // bearer token for /tick endpoint
BatchSize int // buckets per tick (default 4)
ForwardMonths int // forward window in months (default 12)
Token string // bearer token for /tick endpoint
BatchSize int // buckets per tick (default 4)
ForwardMonths int // forward window in months (default 12)
CrawlerUserAgent string // user-agent for crawler HTTP requests
CrawlerManualRateLimitPerHour int // max manual crawl requests per hour (1-3600, default 1)
}
type AIConfig struct {
@@ -199,6 +201,11 @@ func Load() (*Config, error) {
slog.Warn("DISCOVERY_TOKEN is empty; /api/v1/admin/discovery/tick is disabled")
}
crawlerRateLimit, err := envInt("DISCOVERY_CRAWLER_MANUAL_RATE_LIMIT_PER_HOUR", 1)
if err != nil {
return nil, fmt.Errorf("DISCOVERY_CRAWLER_MANUAL_RATE_LIMIT_PER_HOUR: %w", err)
}
jwtSecret := envStr("JWT_SECRET", "")
if jwtSecret == "" {
return nil, fmt.Errorf("JWT_SECRET is required")
@@ -285,9 +292,11 @@ func Load() (*Config, error) {
RateLimitRPS: rpsAI,
},
Discovery: DiscoveryConfig{
Token: discoveryToken,
BatchSize: batchSize,
ForwardMonths: forwardMonths,
Token: discoveryToken,
BatchSize: batchSize,
ForwardMonths: forwardMonths,
CrawlerUserAgent: envStr("DISCOVERY_CRAWLER_USER_AGENT", "Mozilla/5.0 (X11; Linux x86_64; rv:135.0) Gecko/20100101 Firefox/135.0"),
CrawlerManualRateLimitPerHour: crawlerRateLimit,
},
}, nil
}

@@ -0,0 +1,20 @@
package crawler
// Land names used in RawEvent/MergedEvent.Land and throughout discovery. ASCII
// forms (Oesterreich, not Österreich) to keep string comparisons and log output
// stable across locales. Defined in one place so every source parser references
// the same constants.
const (
landDeutschland = "Deutschland"
landOesterreich = "Oesterreich"
landSchweiz = "Schweiz"
)
// Source name constants — used in SourceConfig.Name, switch cases, and tests.
const (
sourceMarktkalendarium = "marktkalendarium"
sourceMittelalterkalender = "mittelalterkalender"
sourceFestivalAlarm = "festival_alarm"
sourceMittelaltermarktOnline = "mittelaltermarkt_online"
sourceSuendenfrei = "suendenfrei"
)

@@ -0,0 +1,81 @@
package crawler
import (
"context"
"fmt"
"log/slog"
"time"
)
// Crawler orchestrates per-source fetch, collects results + errors.
type Crawler struct {
sources []Source
}
// NewCrawler builds source instances from the supplied configs. The UA is
// injected into the shared Fetcher.
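//
// Minimal wiring sketch (mirrors the usage in cmd/discovery-compare):
//
//	c := crawler.NewCrawler(cfg.Discovery.CrawlerUserAgent, crawler.DefaultSourceConfigs())
//	res, err := c.RunAll(ctx)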
func NewCrawler(userAgent string, configs []SourceConfig) *Crawler {
f := NewFetcher(FetcherOptions{UserAgent: userAgent})
c := &Crawler{}
for _, cfg := range configs {
switch cfg.Name {
case sourceMarktkalendarium:
c.sources = append(c.sources, NewMarktkalendarium(f, cfg.URLs))
case sourceMittelalterkalender:
c.sources = append(c.sources, NewMittelalterkalender(f, cfg.URLs))
case sourceFestivalAlarm:
y2u := make(map[int]string, len(cfg.URLs))
for _, u := range cfg.URLs {
// Year is the last 4 digits of the URL.
if len(u) < 4 {
continue
}
var year int
if _, err := fmt.Sscanf(u[len(u)-4:], "%d", &year); err == nil {
y2u[year] = u
}
}
c.sources = append(c.sources, NewFestivalAlarm(f, y2u))
case sourceMittelaltermarktOnline:
c.sources = append(c.sources, NewMittelaltermarktOnline(f, cfg.URLs))
case sourceSuendenfrei:
if len(cfg.URLs) > 0 {
c.sources = append(c.sources, NewSuendenfrei(f, cfg.URLs[0]))
}
default:
slog.Warn("unknown crawler source, skipping", "name", cfg.Name)
}
}
return c
}
// RunAll fetches each source sequentially, isolating errors so one bad source
// doesn't kill the whole run. Per-source elapsed time is recorded.
func (c *Crawler) RunAll(ctx context.Context) (CrawlResult, error) {
res := CrawlResult{
PerSource: make(map[string][]RawEvent),
PerSourceMS: make(map[string]int64),
}
for i, src := range c.sources {
if err := ctx.Err(); err != nil {
return res, err
}
if i > 0 {
if err := sleepCtx(ctx, 1*time.Second); err != nil {
return res, err
}
}
start := time.Now()
events, err := src.Fetch(ctx)
elapsed := time.Since(start).Milliseconds()
res.PerSourceMS[src.Name()] = elapsed
if err != nil {
res.SourceErrors = append(res.SourceErrors, SourceError{Name: src.Name(), Err: err})
slog.ErrorContext(ctx, "crawler source failed", "source", src.Name(), "error", err)
continue
}
res.PerSource[src.Name()] = events
slog.InfoContext(ctx, "crawler source ok", "source", src.Name(), "events", len(events), "elapsed_ms", elapsed)
}
return res, nil
}

@@ -0,0 +1,52 @@
package crawler
import (
"context"
"errors"
"testing"
"time"
)
type stubSource struct {
name string
events []RawEvent
err error
}
func (s *stubSource) Name() string { return s.name }
func (s *stubSource) Fetch(ctx context.Context) ([]RawEvent, error) {
return s.events, s.err
}
func TestCrawlerRunAllCollectsEventsAndErrors(t *testing.T) {
c := &Crawler{
sources: []Source{
&stubSource{name: "a", events: []RawEvent{{SourceName: "a", Name: "X"}}},
&stubSource{name: "b", err: errors.New("boom")},
&stubSource{name: "c", events: []RawEvent{{SourceName: "c", Name: "Y"}}},
},
}
res, err := c.RunAll(context.Background())
if err != nil {
t.Fatal(err)
}
if len(res.PerSource["a"]) != 1 || len(res.PerSource["c"]) != 1 {
t.Errorf("per-source events missing: %+v", res.PerSource)
}
if len(res.SourceErrors) != 1 || res.SourceErrors[0].Name != "b" {
t.Errorf("expected one SourceError for b; got %+v", res.SourceErrors)
}
}
func TestCrawlerRunAllCancellation(t *testing.T) {
c := &Crawler{sources: []Source{
&stubSource{name: "slow", events: []RawEvent{{Name: "X"}}},
}}
ctx, cancel := context.WithCancel(context.Background())
cancel()
_, err := c.RunAll(ctx)
if err == nil {
t.Fatal("expected ctx error, got nil")
}
}

@@ -0,0 +1,139 @@
package crawler
import (
"bytes"
"context"
"fmt"
"strings"
"time"
"github.com/PuerkitoBio/goquery"
)
// FestivalAlarmSource scrapes festival-alarm.com. The Mittelalter category
// page renders a flat <table> with tr.event-wrapper rows. Genre is in
// td.event-genre; only rows containing "Mittelalter" are emitted. ISO dates
// are available as content attributes on span[itemprop=startDate/endDate].
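//
// Abridged row sketch (markup simplified, values invented):
//
//	<tr class="event-wrapper">
//	  <td class="event-genre">Mittelalter</td>
//	  <td class="event-title"><a href="/Festival/123">Spectaculum X</a></td>
//	  <td class="event-venue">
//	    <span itemprop="startDate" content="2026-05-01"></span>
//	    <span itemprop="name">Schlosspark</span>
//	    <span itemprop="addressLocality">Dresden</span>
//	    <span itemprop="postalCode">01067</span>
//	  </td>
//	</tr>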
type FestivalAlarmSource struct {
fetcher *Fetcher
urls []festivalAlarmURL
}
type festivalAlarmURL struct {
URL string
Year int
}
func NewFestivalAlarm(f *Fetcher, yearToURL map[int]string) *FestivalAlarmSource {
urls := make([]festivalAlarmURL, 0, len(yearToURL))
for y, u := range yearToURL {
urls = append(urls, festivalAlarmURL{URL: u, Year: y})
}
return &FestivalAlarmSource{fetcher: f, urls: urls}
}
func (s *FestivalAlarmSource) Name() string { return sourceFestivalAlarm }
func (s *FestivalAlarmSource) Fetch(ctx context.Context) ([]RawEvent, error) {
var all []RawEvent
for i, u := range s.urls {
if i > 0 {
if err := sleepCtx(ctx, 2*time.Second); err != nil {
return all, err
}
}
body, err := s.fetcher.Get(ctx, u.URL, "")
if err != nil {
return all, fmt.Errorf("festival_alarm %s: %w", u.URL, err)
}
events, err := parseFestivalAlarm(body, u.URL, u.Year)
if err != nil {
return all, fmt.Errorf("festival_alarm parse %s: %w", u.URL, err)
}
all = append(all, events...)
}
return all, nil
}
func parseFestivalAlarm(data []byte, sourceURL string, year int) ([]RawEvent, error) {
doc, err := goquery.NewDocumentFromReader(bytes.NewReader(data))
if err != nil {
return nil, err
}
seen := make(map[string]struct{})
var events []RawEvent
doc.Find("tr.event-wrapper").Each(func(_ int, tr *goquery.Selection) {
// Genre filter — must contain "Mittelalter".
genre := strings.TrimSpace(tr.Find("td.event-genre").Text())
if !strings.Contains(strings.ToLower(genre), "mittelalter") {
return
}
// Name + detail URL from the event-title cell.
titleAnchor := tr.Find("td.event-title a").First()
name := strings.TrimSpace(titleAnchor.Text())
if name == "" {
return
}
href, _ := titleAnchor.Attr("href")
detailURL := resolveURL(sourceURL, strings.TrimSpace(href))
// Dedup by detail URL (same event can be listed multiple times for
// multi-day spans on some pages).
dedupKey := detailURL
if dedupKey == "" {
dedupKey = name
}
if _, exists := seen[dedupKey]; exists {
return
}
seen[dedupKey] = struct{}{}
// Dates: read the ISO content attributes. The year parameter is intentionally unused here (see parseDateAttr).
startDate := parseDateAttr(tr.Find("span[itemprop='startDate']").First(), year)
endDate := parseDateAttr(tr.Find("span[itemprop='endDate']").First(), year)
if startDate == nil {
return
}
// Venue fields from the hidden event-venue cell.
venue := strings.TrimSpace(tr.Find("span[itemprop='name']").First().Text())
city := strings.TrimSpace(tr.Find("span[itemprop='addressLocality']").Text())
plz := strings.TrimSpace(tr.Find("span[itemprop='postalCode']").Text())
events = append(events, RawEvent{
SourceName: "festival_alarm",
SourceURL: sourceURL,
DetailURL: detailURL,
Name: name,
City: city,
PLZ: plz,
Land: InferLand(plz),
Venue: venue,
StartDate: startDate,
EndDate: endDate,
})
})
return events, nil
}
// parseDateAttr reads the content="YYYY-MM-DD" attribute from an itemprop
// span and returns a *time.Time in UTC. Returns nil if absent or malformed.
// The year parameter is accepted for signature symmetry with future sources
// that may not include full dates in their markup; it is intentionally unused
// here because festival-alarm always emits complete ISO dates.
func parseDateAttr(s *goquery.Selection, year int) *time.Time {
_ = year
content, exists := s.Attr("content")
if !exists || content == "" {
return nil
}
t, err := time.Parse("2006-01-02", content)
if err != nil {
return nil
}
return &t
}

@@ -0,0 +1,31 @@
package crawler
import (
"os"
"testing"
)
func TestFestivalAlarmParse(t *testing.T) {
data, err := os.ReadFile("testdata/festival_alarm.html")
if err != nil {
t.Fatal(err)
}
events, err := parseFestivalAlarm(data, "https://www.festival-alarm.com/Kategorien/Mittelalter-Festivals/(year)/2026", 2026)
if err != nil {
t.Fatalf("parse: %v", err)
}
if len(events) == 0 {
t.Fatal("expected at least one Mittelalter event")
}
for _, e := range events {
if e.SourceName != "festival_alarm" {
t.Errorf("SourceName = %q", e.SourceName)
}
if e.Name == "" {
t.Errorf("Name empty for %+v", e)
}
if e.StartDate == nil || e.StartDate.Year() != 2026 {
t.Errorf("StartDate = %v (want year 2026)", e.StartDate)
}
}
}

@@ -0,0 +1,170 @@
package crawler
import (
"context"
"fmt"
"io"
"net/http"
"time"
)
// Fetcher is the shared HTTP client across all crawler sources. Enforces polite
// defaults: retry on 5xx/network, back off on 429, stop immediately on 4xx.
// Per-host connection cap is on the Transport so all callers share the gate.
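//
// Default schedule (all values overridable via FetcherOptions):
//
//	network error / 5xx: attempt, wait 2s, retry, wait 5s, final retry
//	429:                 wait 60s, retry; wait 180s, final retry
//	other 4xx:           fail immediately, no retry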
type Fetcher struct {
client *http.Client
userAgent string
retryWait []time.Duration
rateLimitWaits []time.Duration
}
type FetcherOptions struct {
UserAgent string
Timeout time.Duration
RetryWait []time.Duration
RateLimitWaits []time.Duration
}
func NewFetcher(opts FetcherOptions) *Fetcher {
if opts.Timeout == 0 {
opts.Timeout = 30 * time.Second
}
if opts.RetryWait == nil {
opts.RetryWait = []time.Duration{2 * time.Second, 5 * time.Second}
}
if opts.RateLimitWaits == nil {
opts.RateLimitWaits = []time.Duration{60 * time.Second, 180 * time.Second}
}
return &Fetcher{
client: &http.Client{
Timeout: opts.Timeout,
Transport: &http.Transport{
MaxIdleConnsPerHost: 2,
MaxConnsPerHost: 1,
IdleConnTimeout: 30 * time.Second,
},
CheckRedirect: func(req *http.Request, via []*http.Request) error {
if len(via) >= 10 {
return http.ErrUseLastResponse
}
return nil
},
},
userAgent: opts.UserAgent,
retryWait: opts.RetryWait,
rateLimitWaits: opts.RateLimitWaits,
}
}
// Get fetches a URL and returns the body bytes. accept overrides the default
// Accept header ("" -> HTML default).
func (f *Fetcher) Get(ctx context.Context, url, accept string) ([]byte, error) {
if accept == "" {
accept = "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"
}
attempt := func() (*http.Response, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, err
}
req.Header.Set("User-Agent", f.userAgent)
req.Header.Set("Accept-Language", "de-DE,de;q=0.9,en;q=0.8")
req.Header.Set("Accept", accept)
return f.client.Do(req)
}
// Initial + 5xx retries.
var lastErr error
waits := append([]time.Duration{0}, f.retryWait...)
for i, wait := range waits {
if wait > 0 {
if err := sleepCtx(ctx, wait); err != nil {
return nil, err
}
}
resp, err := attempt()
if err != nil {
lastErr = err
if i < len(waits)-1 {
continue
}
return nil, fmt.Errorf("fetch %s: %w", url, err)
}
// 4xx other than 429: stop immediately.
if resp.StatusCode == http.StatusTooManyRequests {
_ = resp.Body.Close()
return f.handle429(ctx, url, accept)
}
if resp.StatusCode >= 400 && resp.StatusCode < 500 {
body, _ := io.ReadAll(io.LimitReader(resp.Body, 512))
_ = resp.Body.Close()
return nil, fmt.Errorf("fetch %s: http %d: %s", url, resp.StatusCode, string(body))
}
// 5xx: retry unless this was the last attempt.
if resp.StatusCode >= 500 {
body, _ := io.ReadAll(io.LimitReader(resp.Body, 512))
_ = resp.Body.Close()
lastErr = fmt.Errorf("http %d: %s", resp.StatusCode, string(body))
if i < len(waits)-1 {
continue
}
return nil, fmt.Errorf("fetch %s: %w", url, lastErr)
}
body, err := io.ReadAll(resp.Body)
_ = resp.Body.Close()
if err != nil {
return nil, fmt.Errorf("read body %s: %w", url, err)
}
return body, nil
}
return nil, lastErr
}
// handle429 applies the longer 429-specific backoffs: one additional attempt
// after each wait in rateLimitWaits. Gives up if the final attempt still sees
// a 429, and fails fast on any other non-200 status.
func (f *Fetcher) handle429(ctx context.Context, url, accept string) ([]byte, error) {
for _, wait := range f.rateLimitWaits {
if err := sleepCtx(ctx, wait); err != nil {
return nil, err
}
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, err
}
req.Header.Set("User-Agent", f.userAgent)
req.Header.Set("Accept-Language", "de-DE,de;q=0.9,en;q=0.8")
req.Header.Set("Accept", accept)
resp, err := f.client.Do(req)
if err != nil {
return nil, fmt.Errorf("fetch %s after 429 wait: %w", url, err)
}
if resp.StatusCode == http.StatusOK {
body, rerr := io.ReadAll(resp.Body)
_ = resp.Body.Close()
if rerr != nil {
return nil, fmt.Errorf("read body %s: %w", url, rerr)
}
return body, nil
}
_ = resp.Body.Close()
if resp.StatusCode != http.StatusTooManyRequests {
return nil, fmt.Errorf("fetch %s after 429 wait: http %d", url, resp.StatusCode)
}
}
return nil, fmt.Errorf("fetch %s: rate-limited (429) after all retries", url)
}
func sleepCtx(ctx context.Context, d time.Duration) error {
t := time.NewTimer(d)
defer t.Stop()
select {
case <-ctx.Done():
return ctx.Err()
case <-t.C:
return nil
}
}

@@ -0,0 +1,117 @@
package crawler
import (
"context"
"io"
"net/http"
"net/http/httptest"
"strings"
"sync/atomic"
"testing"
"time"
)
func TestFetcherSuccess(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if ua := r.Header.Get("User-Agent"); !strings.Contains(ua, "Firefox") {
t.Errorf("unexpected UA: %q", ua)
}
w.WriteHeader(200)
_, _ = io.WriteString(w, "ok")
}))
defer srv.Close()
f := NewFetcher(FetcherOptions{UserAgent: "Mozilla/5.0 (X11; Linux x86_64; rv:135.0) Gecko/20100101 Firefox/135.0"})
body, err := f.Get(context.Background(), srv.URL, "")
if err != nil {
t.Fatal(err)
}
if string(body) != "ok" {
t.Errorf("body = %q; want %q", body, "ok")
}
}
func TestFetcherRetriesOn5xx(t *testing.T) {
var hits int32
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
n := atomic.AddInt32(&hits, 1)
if n < 3 {
w.WriteHeader(503)
return
}
w.WriteHeader(200)
_, _ = io.WriteString(w, "ok")
}))
defer srv.Close()
f := NewFetcher(FetcherOptions{
UserAgent: "t",
RetryWait: []time.Duration{10 * time.Millisecond, 20 * time.Millisecond},
})
body, err := f.Get(context.Background(), srv.URL, "")
if err != nil {
t.Fatal(err)
}
if string(body) != "ok" {
t.Errorf("body = %q; want %q", body, "ok")
}
if got := atomic.LoadInt32(&hits); got != 3 {
t.Errorf("hits = %d; want 3", got)
}
}
func TestFetcherGivesUpOn4xx(t *testing.T) {
var hits int32
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
atomic.AddInt32(&hits, 1)
w.WriteHeader(404)
}))
defer srv.Close()
f := NewFetcher(FetcherOptions{UserAgent: "t", RetryWait: []time.Duration{1 * time.Millisecond}})
_, err := f.Get(context.Background(), srv.URL, "")
if err == nil {
t.Fatal("expected error, got nil")
}
if got := atomic.LoadInt32(&hits); got != 1 {
t.Errorf("hits = %d; want 1 (no retry on 4xx)", got)
}
}
func TestFetcherBacksOffOn429(t *testing.T) {
var hits int32
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
atomic.AddInt32(&hits, 1)
w.WriteHeader(429)
}))
defer srv.Close()
f := NewFetcher(FetcherOptions{
UserAgent: "t",
RetryWait: []time.Duration{1 * time.Millisecond},
RateLimitWaits: []time.Duration{1 * time.Millisecond, 2 * time.Millisecond},
})
_, err := f.Get(context.Background(), srv.URL, "")
if err == nil {
t.Fatal("expected error after 429 retries exhausted")
}
// One initial + two rate-limit retries = 3 hits total.
if got := atomic.LoadInt32(&hits); got != 3 {
t.Errorf("hits = %d; want 3", got)
}
}
func TestFetcherHonorsContextCancel(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(500 * time.Millisecond)
}))
defer srv.Close()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()
f := NewFetcher(FetcherOptions{UserAgent: "t"})
if _, err := f.Get(ctx, srv.URL, ""); err == nil {
t.Fatal("expected context error, got nil")
}
}

@@ -0,0 +1,151 @@
package crawler
import (
"bytes"
"context"
"fmt"
"strings"
"time"
"github.com/PuerkitoBio/goquery"
)
// MarktkalendariumSource scrapes www.marktkalendarium.de. Table rows:
// Von | Bis | Veranstaltung | Ort | Platz | Webseite | Veranstalter
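//
// Illustrative row (values invented):
//
//	3.4.2026 | 5.4.2026 | Ostermarkt | D-49186 Bad Iburg | Schlosspark | https://example.de | Verein X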
type MarktkalendariumSource struct {
fetcher *Fetcher
urls []string
}
func NewMarktkalendarium(f *Fetcher, urls []string) *MarktkalendariumSource {
return &MarktkalendariumSource{fetcher: f, urls: urls}
}
func (s *MarktkalendariumSource) Name() string { return sourceMarktkalendarium }
func (s *MarktkalendariumSource) Fetch(ctx context.Context) ([]RawEvent, error) {
var all []RawEvent
for i, url := range s.urls {
if i > 0 {
if err := sleepCtx(ctx, 2*time.Second); err != nil {
return all, err
}
}
body, err := s.fetcher.Get(ctx, url, "")
if err != nil {
return all, fmt.Errorf("marktkalendarium %s: %w", url, err)
}
events, err := parseMarktkalendarium(body, url)
if err != nil {
return all, fmt.Errorf("marktkalendarium parse %s: %w", url, err)
}
all = append(all, events...)
}
return all, nil
}
func parseMarktkalendarium(data []byte, sourceURL string) ([]RawEvent, error) {
doc, err := goquery.NewDocumentFromReader(bytes.NewReader(data))
if err != nil {
return nil, err
}
var events []RawEvent
doc.Find("table tr").Each(func(_ int, tr *goquery.Selection) {
cells := tr.Find("td")
if cells.Length() < 4 {
return // header row or layout
}
von := strings.TrimSpace(cells.Eq(0).Text())
bis := strings.TrimSpace(cells.Eq(1).Text())
name := strings.TrimSpace(cells.Eq(2).Text())
ort := strings.TrimSpace(cells.Eq(3).Text())
platz := ""
website := ""
organizer := ""
if cells.Length() >= 5 {
platz = strings.TrimSpace(cells.Eq(4).Text())
}
if cells.Length() >= 6 {
// First anchor href, not the text — text may be "[Facebook link]".
href, _ := cells.Eq(5).Find("a").First().Attr("href")
website = strings.TrimSpace(href)
if website == "" {
website = strings.TrimSpace(cells.Eq(5).Text())
}
}
if cells.Length() >= 7 {
organizer = strings.TrimSpace(cells.Eq(6).Text())
}
if name == "" || von == "" {
return
}
start := parseDEDate(von)
end := parseDEDate(bis)
if start == nil {
return
}
land, plz, city := splitMarktkalendariumOrt(ort)
events = append(events, RawEvent{
SourceName: "marktkalendarium",
SourceURL: sourceURL,
Name: name,
City: city,
PLZ: plz,
Land: land,
StartDate: start,
EndDate: end,
Website: website,
Venue: platz,
Organizer: organizer,
})
})
return events, nil
}
// splitMarktkalendariumOrt parses "D-49186 Bad Iburg" into (land, PLZ, city).
// Returns ("", "", raw) when the prefix or PLZ doesn't match the expected shape.
func splitMarktkalendariumOrt(ort string) (land, plz, city string) {
idx := strings.Index(ort, "-")
if idx < 1 || idx > 3 {
return "", "", ort
}
prefix := ort[:idx]
rest := ort[idx+1:]
switch prefix {
case "D":
land = landDeutschland
case "A":
land = landOesterreich
case "CH":
land = landSchweiz
default:
return "", "", ort
}
sp := strings.IndexByte(rest, ' ')
if sp < 0 {
return land, "", strings.TrimSpace(rest)
}
plz = strings.TrimSpace(rest[:sp])
city = strings.TrimSpace(rest[sp+1:])
return land, plz, city
}
// parseDEDate parses German dates such as "3.4.2026" or "03.04.2026" and returns nil on any failure.
func parseDEDate(s string) *time.Time {
s = strings.TrimSpace(s)
if s == "" {
return nil
}
for _, layout := range []string{"2.1.2006", "02.01.2006"} {
if t, err := time.Parse(layout, s); err == nil {
return &t
}
}
return nil
}

@@ -0,0 +1,64 @@
package crawler
import (
"os"
"testing"
)
func TestMarktkalendariumParse(t *testing.T) {
data, err := os.ReadFile("testdata/marktkalendarium.html")
if err != nil {
t.Fatal(err)
}
events, err := parseMarktkalendarium(data, "https://www.marktkalendarium.de/maerkte2026.php")
if err != nil {
t.Fatalf("parse: %v", err)
}
if len(events) < 10 {
t.Fatalf("got %d events; expected at least 10", len(events))
}
// Basic shape checks on the first event — tighten once fixture is stable.
e := events[0]
if e.SourceName != sourceMarktkalendarium {
t.Errorf("SourceName = %q; want marktkalendarium", e.SourceName)
}
if e.Name == "" {
t.Error("Name empty")
}
if e.City == "" {
t.Error("City empty")
}
if e.Land != "Deutschland" && e.Land != "Oesterreich" && e.Land != "Schweiz" {
t.Errorf("Land = %q; want DACH country", e.Land)
}
if e.StartDate == nil {
t.Error("StartDate nil")
}
if e.SourceURL != "https://www.marktkalendarium.de/maerkte2026.php" {
t.Errorf("SourceURL = %q", e.SourceURL)
}
}
func TestParseMarktkalendariumOrtField(t *testing.T) {
tests := []struct {
in string
wantLand, wantPLZ, wantCity string
}{
{"D-49186 Bad Iburg", "Deutschland", "49186", "Bad Iburg"},
{"A-1010 Wien", "Oesterreich", "1010", "Wien"},
{"CH-8001 Zuerich", "Schweiz", "8001", "Zuerich"},
{"D-94152 Neuhaus am Inn", "Deutschland", "94152", "Neuhaus am Inn"},
{"garbage", "", "", "garbage"},
}
for _, tc := range tests {
t.Run(tc.in, func(t *testing.T) {
land, plz, city := splitMarktkalendariumOrt(tc.in)
if land != tc.wantLand || plz != tc.wantPLZ || city != tc.wantCity {
t.Errorf("splitMarktkalendariumOrt(%q) = (%q, %q, %q); want (%q, %q, %q)",
tc.in, land, plz, city, tc.wantLand, tc.wantPLZ, tc.wantCity)
}
})
}
}

@@ -0,0 +1,216 @@
package crawler
import (
"sort"
"strings"
"time"
"marktvogt.de/backend/internal/domain/discovery/normalize"
)
// sourceRank: lower = better. Used for field-by-field tie-breaking.
var sourceRank = map[string]int{
"mittelaltermarkt_online": 1,
"marktkalendarium": 2,
"mittelalterkalender": 3,
"festival_alarm": 4,
"suendenfrei": 5,
}
// Merge groups RawEvents by normalized (name, city, start_date) and picks the
// best value for each field using the source rank. Pure function.
//
// Second-pass fold: when NormalizeName collapses an event name entirely to ""
// (e.g. "Mittelaltermarkt" — a pure strip-word), the resulting empty-name bucket
// is folded into any same-(city, date) bucket that has a non-empty normalized
// name. This handles sources that use only generic names for an event that other
// sources describe more specifically.
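//
// Fold example (names illustrative): a raw event named just "Mittelaltermarkt"
// in Dresden on 2026-05-01 normalizes to an empty name, so it is folded into
// the bucket of "Mittelaltermarkt zu Dresden" sharing that city and date
// rather than surviving as its own merged event.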
func Merge(raws []RawEvent) []MergedEvent {
groups := make(map[string][]RawEvent)
order := []string{}
for _, r := range raws {
key := mergeKey(r)
if _, seen := groups[key]; !seen {
order = append(order, key)
}
groups[key] = append(groups[key], r)
}
// Second pass: fold empty-name keys into matching (city, date) keys.
cityDateToKey := make(map[string]string) // cityDate → first non-empty-name key
for _, key := range order {
name, cityDate := splitMergeKey(key)
if name != "" {
if _, exists := cityDateToKey[cityDate]; !exists {
cityDateToKey[cityDate] = key
}
}
}
// Remap empty-name keys and rebuild order without orphan keys.
remapped := make(map[string]string) // emptyKey → targetKey
for _, key := range order {
name, cityDate := splitMergeKey(key)
if name == "" {
if target, ok := cityDateToKey[cityDate]; ok {
remapped[key] = target
}
}
}
for emptyKey, targetKey := range remapped {
groups[targetKey] = append(groups[targetKey], groups[emptyKey]...)
delete(groups, emptyKey)
}
var filteredOrder []string
for _, key := range order {
if _, removed := remapped[key]; !removed {
filteredOrder = append(filteredOrder, key)
}
}
out := make([]MergedEvent, 0, len(filteredOrder))
for _, key := range filteredOrder {
out = append(out, mergeGroup(groups[key]))
}
return out
}
// splitMergeKey returns the normalized-name part and the "city|date" part of a
// merge key (format: "name|city|date").
func splitMergeKey(key string) (name, cityDate string) {
idx := strings.Index(key, "|")
if idx < 0 {
return key, ""
}
return key[:idx], key[idx+1:]
}
func mergeKey(r RawEvent) string {
date := ""
if r.StartDate != nil {
date = r.StartDate.Format("2006-01-02")
}
return normalize.Name(r.Name) + "|" + normalize.City(r.City) + "|" + date
}
func mergeGroup(raws []RawEvent) MergedEvent {
// Sort by source rank so "first non-empty" == "best source's value".
sort.SliceStable(raws, func(i, j int) bool {
return rankOf(raws[i].SourceName) < rankOf(raws[j].SourceName)
})
m := MergedEvent{}
sourceSet := map[string]bool{}
quellenSet := map[string]bool{}
for _, r := range raws {
sourceSet[r.SourceName] = true
if r.SourceURL != "" {
quellenSet[r.SourceURL] = true
}
if r.DetailURL != "" {
quellenSet[r.DetailURL] = true
}
// Longest name wins (regardless of rank).
if len(r.Name) > len(m.Name) {
m.Name = r.Name
}
if m.City == "" {
m.City = r.City
}
if m.PLZ == "" {
m.PLZ = r.PLZ
}
if m.Land == "" {
m.Land = r.Land
}
if m.Bundesland == "" {
m.Bundesland = r.Bundesland
}
if m.StartDate == nil && r.StartDate != nil {
m.StartDate = r.StartDate
}
if m.Venue == "" {
m.Venue = r.Venue
}
if m.Organizer == "" {
m.Organizer = r.Organizer
}
// Website: prefer non-empty from best rank, but only if not social-media.
if m.Website == "" && r.Website != "" && !isSocialURL(r.Website) {
m.Website = r.Website
}
// EndDate: best-rank non-empty, then detect conflict <= 2 days.
if m.EndDate == nil && r.EndDate != nil {
m.EndDate = r.EndDate
continue
}
if m.EndDate != nil && r.EndDate != nil && !sameDay(m.EndDate, r.EndDate) {
diff := r.EndDate.Sub(*m.EndDate)
if diff < 0 {
diff = -diff
}
if diff <= 48*time.Hour {
m.Hinweis = appendHinweis(m.Hinweis, "date_conflict")
}
// Differences > 2 days likely mean two distinct events that the merge key
// happened to collapse (same name, city, start date); don't tag that as a conflict.
}
}
// Fallback: if no non-social website, take best-rank social URL.
if m.Website == "" {
for _, r := range raws {
if r.Website != "" {
m.Website = r.Website
break
}
}
}
m.Quellen = sortedKeys(quellenSet)
m.Sources = sortedKeys(sourceSet)
return m
}
func rankOf(name string) int {
if r, ok := sourceRank[name]; ok {
return r
}
return 999
}
func isSocialURL(u string) bool {
lu := strings.ToLower(u)
for _, domain := range []string{"facebook.com", "instagram.com", "twitter.com", "x.com", "tiktok.com"} {
if strings.Contains(lu, domain) {
return true
}
}
return false
}
func sameDay(a, b *time.Time) bool {
return a.Year() == b.Year() && a.Month() == b.Month() && a.Day() == b.Day()
}
func sortedKeys(m map[string]bool) []string {
out := make([]string, 0, len(m))
for k := range m {
out = append(out, k)
}
sort.Strings(out)
return out
}
func appendHinweis(cur, add string) string {
if cur == "" {
return add
}
if strings.Contains(cur, add) {
return cur
}
return cur + "; " + add
}

@@ -0,0 +1,106 @@
package crawler
import (
"testing"
"time"
)
func mkTime(t *testing.T, s string) *time.Time {
t.Helper()
tm, err := time.Parse("2006-01-02", s)
if err != nil {
t.Fatal(err)
}
return &tm
}
func TestMergeSingleSourcePassthrough(t *testing.T) {
raws := []RawEvent{
{SourceName: "marktkalendarium", SourceURL: "https://a/", Name: "X", City: "Y", StartDate: mkTime(t, "2026-05-01"), EndDate: mkTime(t, "2026-05-03")},
}
merged := Merge(raws)
if len(merged) != 1 {
t.Fatalf("len = %d; want 1", len(merged))
}
if merged[0].Name != "X" || merged[0].City != "Y" {
t.Errorf("unexpected shape: %+v", merged[0])
}
if len(merged[0].Quellen) != 1 || merged[0].Quellen[0] != "https://a/" {
t.Errorf("Quellen = %v", merged[0].Quellen)
}
}
func TestMergeTwoSourcesByRank(t *testing.T) {
raws := []RawEvent{
// marktkalendarium: rich organizer + website
{SourceName: "marktkalendarium", SourceURL: "https://mk/", Name: "Mittelaltermarkt X", City: "Dresden", PLZ: "01067", StartDate: mkTime(t, "2026-05-01"), EndDate: mkTime(t, "2026-05-03"), Website: "https://organizer.de", Organizer: "Verein Y"},
// mittelaltermarkt_online (rank 1): adds detail URL and venue
{SourceName: "mittelaltermarkt_online", SourceURL: "https://mo/", DetailURL: "https://mo/e/1", Name: "Mittelaltermarkt X", City: "Dresden", PLZ: "01067", StartDate: mkTime(t, "2026-05-01"), EndDate: mkTime(t, "2026-05-03"), Venue: "Stallhof", Land: "Deutschland"},
}
merged := Merge(raws)
if len(merged) != 1 {
t.Fatalf("len = %d; want 1", len(merged))
}
m := merged[0]
if m.Organizer != "Verein Y" {
t.Errorf("Organizer = %q; want 'Verein Y'", m.Organizer)
}
if m.Venue != "Stallhof" {
t.Errorf("Venue = %q; want 'Stallhof'", m.Venue)
}
if m.Website != "https://organizer.de" {
t.Errorf("Website = %q; want 'https://organizer.de'", m.Website)
}
if len(m.Quellen) != 3 { // SourceURL + DetailURL from rank-1 + SourceURL from rank-2
t.Errorf("Quellen = %v", m.Quellen)
}
}
func TestMergeDateConflictHinweis(t *testing.T) {
raws := []RawEvent{
{SourceName: "mittelaltermarkt_online", SourceURL: "https://a/", Name: "X", City: "Y", StartDate: mkTime(t, "2026-05-01"), EndDate: mkTime(t, "2026-05-03")},
{SourceName: "marktkalendarium", SourceURL: "https://b/", Name: "X", City: "Y", StartDate: mkTime(t, "2026-05-01"), EndDate: mkTime(t, "2026-05-05")},
}
merged := Merge(raws)
if len(merged) != 1 {
t.Fatalf("len = %d", len(merged))
}
if !strings.Contains(merged[0].Hinweis, "date_conflict") {
t.Errorf("Hinweis = %q; want date_conflict note", merged[0].Hinweis)
}
// Winning EndDate comes from rank-1 source.
if merged[0].EndDate.Day() != 3 {
t.Errorf("EndDate day = %d; want 3 (rank-1 wins)", merged[0].EndDate.Day())
}
}
func TestMergeSocialURLFilter(t *testing.T) {
raws := []RawEvent{
{SourceName: "marktkalendarium", SourceURL: "https://a/", Name: "X", City: "Y", StartDate: mkTime(t, "2026-05-01"), Website: "https://facebook.com/event/1"},
{SourceName: "mittelalterkalender", SourceURL: "https://b/", Name: "X", City: "Y", StartDate: mkTime(t, "2026-05-01"), Website: "https://realsite.de"},
}
merged := Merge(raws)
if merged[0].Website != "https://realsite.de" {
t.Errorf("Website = %q; want realsite (facebook filtered)", merged[0].Website)
}
}
func TestMergeLongestNameWins(t *testing.T) {
raws := []RawEvent{
{SourceName: "mittelalterkalender", SourceURL: "https://a/", Name: "Mittelaltermarkt", City: "Dresden", StartDate: mkTime(t, "2026-05-01")},
{SourceName: "marktkalendarium", SourceURL: "https://b/", Name: "Mittelaltermarkt zu Dresden im Stallhof", City: "Dresden", StartDate: mkTime(t, "2026-05-01")},
}
merged := Merge(raws)
if merged[0].Name != "Mittelaltermarkt zu Dresden im Stallhof" {
t.Errorf("Name = %q; want longest", merged[0].Name)
}
}

@@ -0,0 +1,127 @@
package crawler
import (
"bytes"
"context"
"fmt"
"strings"
"time"
"github.com/PuerkitoBio/goquery"
)
// MittelalterkalenderSource scrapes www.mittelalterkalender.info. Page has
// twelve monthly <table>s; each has columns: Beginn | Ende | Titel | PLZ | Ort |
// [Details link].
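//
// Illustrative row (values invented; note the "bis" suffix in the first cell):
//
//	01.05.2026 bis | 03.05.2026 | Ritterfest X | 01067 | Dresden | Details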
type MittelalterkalenderSource struct {
fetcher *Fetcher
urls []string
}
func NewMittelalterkalender(f *Fetcher, urls []string) *MittelalterkalenderSource {
return &MittelalterkalenderSource{fetcher: f, urls: urls}
}
func (s *MittelalterkalenderSource) Name() string { return sourceMittelalterkalender }
func (s *MittelalterkalenderSource) Fetch(ctx context.Context) ([]RawEvent, error) {
var all []RawEvent
for i, url := range s.urls {
if i > 0 {
if err := sleepCtx(ctx, 2*time.Second); err != nil {
return all, err
}
}
body, err := s.fetcher.Get(ctx, url, "")
if err != nil {
return all, fmt.Errorf("mittelalterkalender %s: %w", url, err)
}
events, err := parseMittelalterkalender(body, url)
if err != nil {
return all, fmt.Errorf("mittelalterkalender parse %s: %w", url, err)
}
all = append(all, events...)
}
return all, nil
}
func parseMittelalterkalender(data []byte, sourceURL string) ([]RawEvent, error) {
doc, err := goquery.NewDocumentFromReader(bytes.NewReader(data))
if err != nil {
return nil, err
}
var events []RawEvent
doc.Find("table tr").Each(func(_ int, tr *goquery.Selection) {
cells := tr.Find("td")
if cells.Length() < 5 {
return
}
// First cell contains start date followed by " bis " span; extract just the date.
beginnText := strings.TrimSpace(cells.Eq(0).Text())
// Remove the "bis" suffix (cell text is "DD.MM.YYYY bis")
if idx := strings.Index(beginnText, " bis"); idx > 0 {
beginnText = beginnText[:idx]
}
beginn := strings.TrimSpace(beginnText)
ende := strings.TrimSpace(cells.Eq(1).Text())
titel := strings.TrimSpace(cells.Eq(2).Text())
plz := strings.TrimSpace(cells.Eq(3).Text())
ort := strings.TrimSpace(cells.Eq(4).Text())
if titel == "" || beginn == "" {
return
}
start := parseDEDate(beginn)
if start == nil {
return
}
end := parseDEDate(ende)
detailURL := ""
if cells.Length() >= 6 {
href, ok := cells.Eq(5).Find("a").First().Attr("href")
if ok {
detailURL = resolveURL(sourceURL, strings.TrimSpace(href))
}
}
events = append(events, RawEvent{
SourceName: "mittelalterkalender",
SourceURL: sourceURL,
DetailURL: detailURL,
Name: titel,
City: ort,
PLZ: plz,
Land: InferLand(plz),
StartDate: start,
EndDate: end,
})
})
return events, nil
}
// resolveURL joins a relative href against the source URL using net/url.
// Absolute URLs pass through unchanged; empty input returns empty; inputs
// that fail to parse fall back to the raw href.
func resolveURL(source, href string) string {
if href == "" {
return ""
}
base, err := url.Parse(source)
if err != nil {
return href
}
ref, err := url.Parse(href)
if err != nil {
return href
}
return base.ResolveReference(ref).String()
}

@@ -0,0 +1,38 @@
package crawler
import (
"os"
"testing"
)
func TestMittelalterkalenderParse(t *testing.T) {
data, err := os.ReadFile("testdata/mittelalterkalender.html")
if err != nil {
t.Fatal(err)
}
events, err := parseMittelalterkalender(data, "https://www.mittelalterkalender.info/mittelaltermarkt/mittelalterfeste-2026-nach-datum.php")
if err != nil {
t.Fatalf("parse: %v", err)
}
t.Logf("Parsed %d events", len(events))
if len(events) < 10 {
t.Fatalf("got %d events; expected at least 10", len(events))
}
e := events[0]
if e.SourceName != sourceMittelalterkalender {
t.Errorf("SourceName = %q", e.SourceName)
}
if e.Name == "" {
t.Error("Name empty")
}
if e.City == "" {
t.Error("City empty")
}
if e.StartDate == nil {
t.Error("StartDate nil")
}
// Land inferred from PLZ via InferLand.
if e.Land == "" && e.PLZ != "" {
t.Errorf("Land empty but PLZ=%q", e.PLZ)
}
}

@@ -0,0 +1,212 @@
package crawler
import (
"context"
"encoding/json"
"fmt"
"strings"
"time"
)
// MittelaltermarktOnlineSource pulls events from the Tribe Events REST API on
// mittelaltermarkt.online. Pagination is driven by the next_rest_url field.
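//
// Abridged response sketch (only the fields this parser reads; URL shape
// assumed from the Tribe Events REST plugin):
//
//	{
//	  "events": [{
//	    "title": "Mittelaltermarkt X",
//	    "url": "https://mittelaltermarkt.online/event/x/",
//	    "start_date": "2026-05-01 10:00:00",
//	    "end_date": "2026-05-03 18:00:00",
//	    "venue": {"city": "Dresden", "zip": "01067", "country": "DE"},
//	    "categories": [{"slug": "mittelalterfeste"}]
//	  }],
//	  "next_rest_url": "https://mittelaltermarkt.online/wp-json/tribe/events/v1/events?page=2"
//	}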
type MittelaltermarktOnlineSource struct {
fetcher *Fetcher
startURLs []string
}
func NewMittelaltermarktOnline(f *Fetcher, startURLs []string) *MittelaltermarktOnlineSource {
return &MittelaltermarktOnlineSource{fetcher: f, startURLs: startURLs}
}
func (s *MittelaltermarktOnlineSource) Name() string { return sourceMittelaltermarktOnline }
func (s *MittelaltermarktOnlineSource) Fetch(ctx context.Context) ([]RawEvent, error) {
var all []RawEvent
const perHostCap = 100
requests := 0
for _, startURL := range s.startURLs {
url := startURL
for url != "" {
if requests >= perHostCap {
return all, fmt.Errorf("mittelaltermarkt_online: per-host request cap (%d) reached", perHostCap)
}
if requests > 0 {
if err := sleepCtx(ctx, 2*time.Second); err != nil {
return all, err
}
}
body, err := s.fetcher.Get(ctx, url, "application/json")
if err != nil {
return all, fmt.Errorf("mittelaltermarkt_online %s: %w", url, err)
}
events, next, err := parseTribeEvents(body, url)
if err != nil {
return all, fmt.Errorf("mittelaltermarkt_online parse %s: %w", url, err)
}
all = append(all, events...)
url = next
requests++
}
}
return all, nil
}
// tribeCategorySlugAllowlist is the set of category slugs we accept. Empty
// category lists are accepted (the site is Mittelalter-focused overall).
// Slugs are taken from the actual API responses on mittelaltermarkt.online.
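// Example: an event tagged only with {"slug": "konzert"} is dropped, while an
// event with an empty categories list passes (slug value illustrative).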
var tribeCategorySlugAllowlist = map[string]bool{
"mittelaltermaerkten": true,
"mittelalterfeste": true,
"mittelalterspektakel": true,
"wikingerspektakel": true,
"ritterspektakel": true,
"historisches-fest": true,
}
type tribeEventsResponse struct {
Events []tribeEvent `json:"events"`
NextRestURL string `json:"next_rest_url"`
}
type tribeEvent struct {
ID int `json:"id"`
Title string `json:"title"`
URL string `json:"url"`
Description string `json:"description"`
StartDate string `json:"start_date"`
EndDate string `json:"end_date"`
Website string `json:"website"`
Venue *tribeVenue `json:"venue"`
Organizer json.RawMessage `json:"organizer"` // sometimes object, sometimes []
Categories []tribeTerm `json:"categories"`
}
type tribeVenue struct {
Venue string `json:"venue"`
City string `json:"city"`
Zip string `json:"zip"`
State string `json:"state"`
Country string `json:"country"`
}
type tribeTerm struct {
Slug string `json:"slug"`
Name string `json:"name"`
}
func parseTribeEvents(data []byte, sourceURL string) ([]RawEvent, string, error) {
var resp tribeEventsResponse
if err := json.Unmarshal(data, &resp); err != nil {
return nil, "", fmt.Errorf("unmarshal: %w", err)
}
events := make([]RawEvent, 0, len(resp.Events))
for _, te := range resp.Events {
if te.Title == "" || te.StartDate == "" {
continue
}
if !tribePassesCategoryFilter(te.Categories) {
continue
}
start, err := parseTribeDate(te.StartDate)
if err != nil {
continue
}
var end *time.Time
if e, err := parseTribeDate(te.EndDate); err == nil {
end = e
}
ev := RawEvent{
SourceName: "mittelaltermarkt_online",
SourceURL: sourceURL,
DetailURL: te.URL,
Name: strings.TrimSpace(te.Title),
StartDate: start,
EndDate: end,
Website: te.Website,
}
if te.Venue != nil {
ev.City = te.Venue.City
ev.PLZ = te.Venue.Zip
ev.Venue = te.Venue.Venue
ev.Bundesland = te.Venue.State
ev.Land = tribeCountryToLand(te.Venue.Country)
if ev.Land == "" && ev.PLZ != "" {
ev.Land = InferLand(ev.PLZ)
}
}
ev.Organizer = extractTribeOrganizerName(te.Organizer)
events = append(events, ev)
}
return events, resp.NextRestURL, nil
}
func tribePassesCategoryFilter(cats []tribeTerm) bool {
if len(cats) == 0 {
return true
}
for _, c := range cats {
if tribeCategorySlugAllowlist[c.Slug] {
return true
}
}
return false
}
func tribeCountryToLand(country string) string {
switch strings.ToLower(strings.TrimSpace(country)) {
case "de", "germany", "deutschland":
return landDeutschland
case "at", "austria", "oesterreich", "österreich":
return landOesterreich
case "ch", "switzerland", "schweiz":
return landSchweiz
}
return ""
}
// parseTribeDate parses the Tribe Events JSON date format
// "YYYY-MM-DD HH:MM:SS" in the site's local time. If the site ever switches
// to ISO-8601 with a T separator or an explicit timezone, this function will
// start returning errors and parseTribeEvents will silently drop those
// events. The symptom is events_fetched = 0 for this source in CrawlSummary
// even though the API still returns data; at that point switch to the iCal
// endpoint at https://mittelaltermarkt.online/events/?ical=1, which uses the
// standard VEVENT DTSTART format and is more durable across plugin upgrades.
func parseTribeDate(s string) (*time.Time, error) {
s = strings.TrimSpace(s)
if s == "" {
return nil, fmt.Errorf("empty")
}
t, err := time.Parse("2006-01-02 15:04:05", s)
if err != nil {
return nil, err
}
return &t, nil
}
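// For example (worked against the layout string above):
//
//   parseTribeDate("2026-05-01 14:30:00")       // 2026-05-01 14:30:00 UTC, nil error
//   parseTribeDate("2026-05-01T14:30:00+02:00") // nil, error (ISO-8601 not accepted)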
// extractTribeOrganizerName handles the Tribe quirk where organizer is either
// an object, a list of objects, or an empty list.
func extractTribeOrganizerName(raw json.RawMessage) string {
if len(raw) == 0 || string(raw) == "null" || string(raw) == "[]" {
return ""
}
// Try single object first.
var obj struct {
Organizer string `json:"organizer"`
}
if err := json.Unmarshal(raw, &obj); err == nil && obj.Organizer != "" {
return obj.Organizer
}
// Try list.
var list []struct {
Organizer string `json:"organizer"`
}
if err := json.Unmarshal(raw, &list); err == nil && len(list) > 0 {
return list[0].Organizer
}
return ""
}
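// Shapes the parser accepts (illustrative JSON, trimmed to the one field read):
//
//   {"organizer": "Verein X"}                        -> "Verein X"
//   [{"organizer": "Verein X"}, {"organizer": "Y"}]  -> "Verein X" (first entry wins)
//   [], null, or absent                              -> ""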

View File

@@ -0,0 +1,58 @@
package crawler
import (
"os"
"testing"
)
func TestMittelaltermarktOnlineParse(t *testing.T) {
data, err := os.ReadFile("testdata/mittelaltermarkt_online_page1.json")
if err != nil {
t.Fatal(err)
}
events, nextURL, err := parseTribeEvents(data, "https://mittelaltermarkt.online/wp-json/tribe/events/v1/events?per_page=20&start_date=2026-01-01&end_date=2026-12-31")
if err != nil {
t.Fatalf("parse: %v", err)
}
if len(events) == 0 {
t.Fatal("got zero events from page 1")
}
if nextURL == "" {
t.Error("expected next_rest_url on page 1 fixture")
}
e := events[0]
if e.SourceName != sourceMittelaltermarktOnline {
t.Errorf("SourceName = %q", e.SourceName)
}
if e.Name == "" {
t.Error("Name empty")
}
if e.StartDate == nil {
t.Error("StartDate nil")
}
if e.DetailURL == "" {
t.Error("DetailURL empty")
}
}
func TestMittelaltermarktOnlineCategoryFilter(t *testing.T) {
// Event whose categories don't include any of our allowlist should be skipped.
jsonBody := []byte(`{
"events": [
{"id":1, "title":"Skip Me", "url":"https://x/1", "start_date":"2026-05-01 00:00:00",
"end_date":"2026-05-02 00:00:00", "categories":[{"slug":"sommerfest"}]},
{"id":2, "title":"Keep Me", "url":"https://x/2", "start_date":"2026-05-01 00:00:00",
"end_date":"2026-05-02 00:00:00", "categories":[{"slug":"mittelaltermaerkten"}]}
]
}`)
events, _, err := parseTribeEvents(jsonBody, "")
if err != nil {
t.Fatal(err)
}
if len(events) != 1 {
t.Fatalf("got %d events; want 1 (category filter)", len(events))
}
if events[0].Name != "Keep Me" {
t.Errorf("kept = %q; want 'Keep Me'", events[0].Name)
}
}

View File

@@ -0,0 +1,47 @@
package crawler
import "strings"
// InferLand maps a PLZ to a DACH country. 5-digit -> DE. 4-digit is ambiguous
// between AT and CH because Vienna (1000-1239) overlaps Swiss Geneva/Vaud
// (1000-1299). We pick the common case: Vienna wins 1000-1199, and only
// 1200-1299 is classified as CH. Other CH ranges: 3000-3999 Bern,
// 4000-4999 Basel/Aarau, 6000-6999 Luzern/Ticino/Wallis, 8000-8999 Zurich,
// 9000-9999 St. Gallen. Everything else 4-digit falls back to AT.
// Unknown -> "". The Land string is the form used throughout discovery
// (not ISO-2).
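//
// For example:
//
//   InferLand("49186")   // "Deutschland" (5-digit)
//   InferLand("1010")    // "Oesterreich" (Vienna inner city)
//   InferLand("1200")    // "Schweiz" (start of the CH carve-out)
//   InferLand("CH-8001") // "" (neither 4 nor 5 plain digits)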
func InferLand(plz string) string {
plz = strings.TrimSpace(plz)
switch len(plz) {
case 5:
if !isAllDigits(plz) {
return ""
}
return landDeutschland
case 4:
if !isAllDigits(plz) {
return ""
}
switch plz[0] {
case '1':
if plz >= "1200" && plz <= "1299" {
return landSchweiz
}
return landOesterreich
case '3', '4', '6', '8', '9':
return landSchweiz
default:
return landOesterreich
}
}
return ""
}
func isAllDigits(s string) bool {
for _, r := range s {
if r < '0' || r > '9' {
return false
}
}
return true
}

View File

@@ -0,0 +1,35 @@
package crawler
import "testing"
func TestInferLand(t *testing.T) {
tests := []struct {
name string
plz string
want string
}{
{"empty", "", ""},
{"de 5-digit", "49186", "Deutschland"},
{"de 5-digit low", "01067", "Deutschland"},
{"at 4-digit typical", "1010", "Oesterreich"},
{"at 4-digit boundary 1199", "1199", "Oesterreich"},
{"ch 4-digit boundary 1200", "1200", "Schweiz"},
{"ch 4-digit boundary 1299", "1299", "Schweiz"},
{"ch 4-digit typical", "8001", "Schweiz"},
{"ch 4-digit zurich range", "8000", "Schweiz"},
{"ch 4-digit bern range", "3000", "Schweiz"},
{"ch 4-digit lucerne", "6000", "Schweiz"},
{"ch 4-digit st gallen", "9000", "Schweiz"},
{"at 4-digit outside ch ranges", "2500", "Oesterreich"},
{"short garbage", "12", ""},
{"4-digit non-numeric", "1a1a", ""},
{"non-numeric", "abcde", ""},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
if got := InferLand(tc.plz); got != tc.want {
t.Errorf("InferLand(%q) = %q; want %q", tc.plz, got, tc.want)
}
})
}
}

View File

@@ -0,0 +1,37 @@
package crawler
// SourceConfig defines the per-source URL list (years, start pages, etc.).
type SourceConfig struct {
Name string
URLs []string
}
// DefaultSourceConfigs returns the hardcoded Ship 1 source list. Update the
// per-source URL arrays each January (or sooner if a site adds a new year).
func DefaultSourceConfigs() []SourceConfig {
return []SourceConfig{
{Name: sourceMarktkalendarium, URLs: []string{
"https://www.marktkalendarium.de/maerkte2026.php",
"https://www.marktkalendarium.de/maerkte2027.php",
"https://www.marktkalendarium.de/maerkte2028.php",
}},
{Name: sourceMittelalterkalender, URLs: []string{
"https://www.mittelalterkalender.info/mittelaltermarkt/mittelalterfeste-2026-nach-datum.php",
"https://www.mittelalterkalender.info/mittelaltermarkt/historische-feste-mittelaltermaerkte-und-fantasy-festivals-2027-nach-datum.php",
// 2028 slug unknown until the site publishes it — add here when available.
}},
{Name: sourceFestivalAlarm, URLs: []string{
"https://www.festival-alarm.com/Kategorien/Mittelalter-Festivals/(year)/2026",
"https://www.festival-alarm.com/Kategorien/Mittelalter-Festivals/(year)/2027",
"https://www.festival-alarm.com/Kategorien/Mittelalter-Festivals/(year)/2028",
}},
{Name: sourceMittelaltermarktOnline, URLs: []string{
"https://mittelaltermarkt.online/wp-json/tribe/events/v1/events?per_page=100&start_date=2026-01-01&end_date=2026-12-31",
"https://mittelaltermarkt.online/wp-json/tribe/events/v1/events?per_page=100&start_date=2027-01-01&end_date=2027-12-31",
"https://mittelaltermarkt.online/wp-json/tribe/events/v1/events?per_page=100&start_date=2028-01-01&end_date=2028-12-31",
}},
{Name: sourceSuendenfrei, URLs: []string{
"https://www.suendenfrei.tv/veranstaltungen",
}},
}
}

View File

@@ -0,0 +1,172 @@
package crawler
import (
"bytes"
"context"
"fmt"
"log/slog"
"regexp"
"strconv"
"strings"
"time"
"github.com/PuerkitoBio/goquery"
)
// SuendenfreiSource scrapes www.suendenfrei.tv/veranstaltungen. Events are
// <h3><a> with free-form text. A regex parses "<date-range> <name> [in <PLZ> <city>]".
// Unparseable entries are logged at INFO and skipped; Ship 2's local LLM is
// the long-term rescue for prose that doesn't fit the regex.
type SuendenfreiSource struct {
fetcher *Fetcher
baseURL string
}
// truncateForLog caps s at n runes for log lines; rune-based so a cut never
// lands mid-umlaut in German event names.
func truncateForLog(s string, n int) string {
runes := []rune(s)
if len(runes) <= n {
return s
}
return string(runes[:n]) + "..."
}
func NewSuendenfrei(f *Fetcher, baseURL string) *SuendenfreiSource {
return &SuendenfreiSource{fetcher: f, baseURL: baseURL}
}
func (s *SuendenfreiSource) Name() string { return "suendenfrei" }
func (s *SuendenfreiSource) Fetch(ctx context.Context) ([]RawEvent, error) {
var all []RawEvent
const perHostCap = 100
requests := 0
page := 1
for {
if requests >= perHostCap {
return all, fmt.Errorf("suendenfrei: per-host request cap (%d) reached", perHostCap)
}
var url string
if page == 1 {
url = s.baseURL
} else {
url = fmt.Sprintf("%s/page-%d", s.baseURL, page)
}
if requests > 0 {
if err := sleepCtx(ctx, 2*time.Second); err != nil {
return all, err
}
}
body, err := s.fetcher.Get(ctx, url, "")
if err != nil {
// 404 on missing page ends pagination cleanly.
if strings.Contains(err.Error(), "http 404") {
break
}
return all, fmt.Errorf("suendenfrei %s: %w", url, err)
}
events, more := parseSuendenfreiPage(body, url)
all = append(all, events...)
if !more {
break
}
page++
requests++
}
return all, nil
}
// parseSuendenfreiPage extracts events from one listing page. Returns the
// events and whether there were any <h3><a> elements (= continue paginating).
func parseSuendenfreiPage(data []byte, sourceURL string) ([]RawEvent, bool) {
doc, err := goquery.NewDocumentFromReader(bytes.NewReader(data))
if err != nil {
return nil, false
}
var events []RawEvent
anchors := doc.Find("h3 a")
anchors.Each(func(_ int, a *goquery.Selection) {
text := strings.TrimSpace(a.Text())
if text == "" {
return
}
parsed, ok := parseSuendenfreiHeader(text)
if !ok {
// Unparseable entries are expected: footer links, layout anchors,
// or event headers in a prose shape our regex doesn't cover.
// Log at INFO so operators can grep the count post-run without
// introducing a counter we'd have to thread through CrawlSummary.
slog.Info("suendenfrei: unparseable h3 anchor; skipping",
"source_url", sourceURL,
"text", truncateForLog(text, 120),
)
return
}
href, _ := a.Attr("href")
parsed.SourceName = "suendenfrei"
parsed.SourceURL = sourceURL
parsed.DetailURL = resolveURL(sourceURL, strings.TrimSpace(href))
parsed.Land = InferLand(parsed.PLZ)
events = append(events, parsed)
})
return events, anchors.Length() > 0
}
// Month names we accept (lowercase, with and without umlauts), mapped to time.Month.
var suendenfreiMonths = map[string]time.Month{
"januar": time.January,
"februar": time.February,
"maerz": time.March,
"märz": time.March,
"april": time.April,
"mai": time.May,
"juni": time.June,
"juli": time.July,
"august": time.August,
"september": time.September,
"oktober": time.October,
"november": time.November,
"dezember": time.December,
}
// Header like "18. bis 19. April 2026 Mittelaltermarkt in 41849 Tueschenbroich OT von Wegberg".
// Also handles missing space "19.April" and absent " in <PLZ> <city>" suffix.
var suendenfreiHeaderRE = regexp.MustCompile(
`^(\d{1,2})\.?\s*(?:bis|-|)\s*(\d{1,2})\.?\s*([A-Za-zäöüÄÖÜ]+)\s+(\d{4})\s+(.+?)$`,
)
var suendenfreiLocationRE = regexp.MustCompile(`\s+in\s+(\d{5})\s+(.+)$`)
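// Capture groups, worked against the first header example above:
//
//   m[1] = "18" (start day)    m[2] = "19" (end day)
//   m[3] = "April" (month)     m[4] = "2026" (year)
//   m[5] = "Mittelaltermarkt in 41849 Tueschenbroich OT von Wegberg" (rest;
//          suendenfreiLocationRE then splits PLZ and city off the tail)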
func parseSuendenfreiHeader(text string) (RawEvent, bool) {
m := suendenfreiHeaderRE.FindStringSubmatch(text)
if m == nil {
return RawEvent{}, false
}
startDay, _ := strconv.Atoi(m[1])
endDay, _ := strconv.Atoi(m[2])
monthName := strings.ToLower(m[3])
year, _ := strconv.Atoi(m[4])
rest := strings.TrimSpace(m[5])
month, ok := suendenfreiMonths[monthName]
if !ok {
return RawEvent{}, false
}
name := rest
plz := ""
city := ""
if locM := suendenfreiLocationRE.FindStringSubmatchIndex(rest); locM != nil {
name = strings.TrimSpace(rest[:locM[0]])
plz = rest[locM[2]:locM[3]]
city = strings.TrimSpace(rest[locM[4]:locM[5]])
}
start := time.Date(year, month, startDay, 0, 0, 0, 0, time.UTC)
end := time.Date(year, month, endDay, 0, 0, 0, 0, time.UTC)
return RawEvent{
Name: name,
PLZ: plz,
City: city,
StartDate: &start,
EndDate: &end,
}, true
}

View File

@@ -0,0 +1,67 @@
package crawler
import (
"testing"
"time"
)
func TestSuendenfreiParseHeader(t *testing.T) {
tests := []struct {
in string
wantName string
wantPLZ string
wantCity string
wantStartY int
wantStartM time.Month
wantStartD int
wantEndD int
}{
{
in: "18. bis 19. April 2026 Mittelaltermarkt in 41849 Tueschenbroich OT von Wegberg",
wantName: "Mittelaltermarkt",
wantPLZ: "41849",
wantCity: "Tueschenbroich OT von Wegberg",
wantStartY: 2026, wantStartM: time.April, wantStartD: 18, wantEndD: 19,
},
{
in: "18. bis 19.April 2026 Mittelalterspektakel in 02829 Koenigshain bei Goerlitz",
wantName: "Mittelalterspektakel",
wantPLZ: "02829",
wantCity: "Koenigshain bei Goerlitz",
wantStartY: 2026, wantStartM: time.April, wantStartD: 18, wantEndD: 19,
},
{
in: "25. bis 26. April 2026 Turnierspiele Weissensee",
wantName: "Turnierspiele Weissensee",
wantPLZ: "",
wantCity: "",
wantStartY: 2026, wantStartM: time.April, wantStartD: 25, wantEndD: 26,
},
}
for _, tc := range tests {
t.Run(tc.in, func(t *testing.T) {
got, ok := parseSuendenfreiHeader(tc.in)
if !ok {
t.Fatalf("parse failed for %q", tc.in)
}
if got.Name != tc.wantName {
t.Errorf("Name = %q; want %q", got.Name, tc.wantName)
}
if got.PLZ != tc.wantPLZ {
t.Errorf("PLZ = %q; want %q", got.PLZ, tc.wantPLZ)
}
if got.City != tc.wantCity {
t.Errorf("City = %q; want %q", got.City, tc.wantCity)
}
if got.StartDate == nil {
t.Fatal("StartDate nil")
}
if got.StartDate.Year() != tc.wantStartY || got.StartDate.Month() != tc.wantStartM || got.StartDate.Day() != tc.wantStartD {
t.Errorf("StartDate = %v; want %d-%02d-%02d", got.StartDate, tc.wantStartY, tc.wantStartM, tc.wantStartD)
}
if got.EndDate == nil || got.EndDate.Day() != tc.wantEndD {
t.Errorf("EndDate = %v; want day %d", got.EndDate, tc.wantEndD)
}
})
}
}

View File

@@ -0,0 +1,16 @@
# Crawler test fixtures
Captured with curl on 2026-04-18, sending a stock Firefox User-Agent header.
These are the exact bytes each source served at capture time. If a parser starts
failing after a site redesign, re-capture the corresponding file with the curl
commands documented in `docs/superpowers/plans/2026-04-18-dach-discovery-crawler.md`
(Task 2) and update the parser's expected assertions.
- `marktkalendarium.html` — https://www.marktkalendarium.de/maerkte2026.php
- `mittelalterkalender.html` — https://www.mittelalterkalender.info/mittelaltermarkt/mittelalterfeste-2026-nach-datum.php
- `festival_alarm.html` — https://www.festival-alarm.com/Kategorien/Mittelalter-Festivals/(year)/2026
- `mittelaltermarkt_online_page1.json` — Tribe REST API page 1
- `mittelaltermarkt_online_page2.json` — Tribe REST API page 2
- `suendenfrei_page1.html` — https://www.suendenfrei.tv/veranstaltungen
- `suendenfrei_page2.html` — https://www.suendenfrei.tv/veranstaltungen/page-2
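
A typical re-capture invocation looks like this (illustrative sketch; the plan
doc's commands are authoritative, and the UA string here is an example rather
than the pinned one):

    curl -A 'Mozilla/5.0 (X11; Linux x86_64; rv:115.0) Gecko/20100101 Firefox/115.0' \
        -o marktkalendarium.html 'https://www.marktkalendarium.de/maerkte2026.php'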

File diff suppressed because it is too large

File diff suppressed because it is too large

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,63 @@
// Package crawler scrapes five DACH Mittelalter event calendars and emits
// RawEvent rows. The merger deduplicates across sources before the events
// flow into the existing discovery pipeline (link verify, validate, dedup).
package crawler
import (
"context"
"time"
)
// Source is implemented by each calendar adapter.
type Source interface {
Name() string
Fetch(ctx context.Context) ([]RawEvent, error)
}
// RawEvent is one event as reported by a single source before merging.
type RawEvent struct {
SourceName string
SourceURL string
DetailURL string
Name string
City string
PLZ string
Land string
Bundesland string
StartDate *time.Time
EndDate *time.Time
Website string
Venue string
Organizer string
}
// MergedEvent is one event after cross-source merging. Quellen holds the union
// of source URLs, Hinweis carries merger-generated notes (date_conflict, etc.).
type MergedEvent struct {
Name string
City string
PLZ string
Land string
Bundesland string
StartDate *time.Time
EndDate *time.Time
Website string
Venue string
Organizer string
Quellen []string
Hinweis string
Sources []string
}
// SourceError records a per-source failure without stopping the whole crawl.
type SourceError struct {
Name string
Err error
}
// CrawlResult is what the orchestrator returns to Service.Crawl.
type CrawlResult struct {
PerSource map[string][]RawEvent
SourceErrors []SourceError
PerSourceMS map[string]int64
}

View File

@@ -1,9 +1,12 @@
package discovery
import (
"fmt"
"log/slog"
"net/http"
"strconv"
"sync"
"time"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
@@ -12,11 +15,32 @@ import (
)
type Handler struct {
service *Service
service *Service
crawlMu sync.Mutex
crawlLastManual time.Time
crawlRateLimit time.Duration
}
func NewHandler(s *Service) *Handler {
return &Handler{service: s}
// NewHandler constructs a Handler. manualRateLimitPerHour controls how
// frequently the admin-session /crawl-manual endpoint may be invoked:
//
// <= 0 : disabled (no rate limit — every request is allowed)
// 1 : 1 request per hour (default)
// > 1 : N requests per hour, evenly spaced
//
// The bearer-token /crawl endpoint always bypasses this limit via the
// `crawl_bypass_rate_limit` gin-context flag set by its route handler.
func NewHandler(s *Service, manualRateLimitPerHour int) *Handler {
var rl time.Duration
switch {
case manualRateLimitPerHour <= 0:
rl = 0 // sentinel: rate limiting disabled
case manualRateLimitPerHour == 1:
rl = time.Hour
default:
rl = time.Hour / time.Duration(manualRateLimitPerHour)
}
return &Handler{service: s, crawlRateLimit: rl}
}
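// For example, manualRateLimitPerHour = 4 yields rl = 15 * time.Minute,
// i.e. one manual crawl per 15 minutes; 0 disables the limit entirely.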
func (h *Handler) Tick(c *gin.Context) {
@@ -30,6 +54,41 @@ func (h *Handler) Tick(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{"data": summary})
}
// Crawl runs the crawler once and returns CrawlSummary. A per-process mutex
// blocks concurrent runs. The manual-admin rate limit is bypassed when
// "crawl_bypass_rate_limit" is set in the gin context (set by the bearer-token
// middleware path).
func (h *Handler) Crawl(c *gin.Context) {
if !h.crawlMu.TryLock() {
apiErr := apierror.TooManyRequests("A crawler run is already in progress")
c.JSON(apiErr.Status, apierror.NewResponse(apiErr))
return
}
defer h.crawlMu.Unlock()
if _, bypass := c.Get("crawl_bypass_rate_limit"); !bypass && h.crawlRateLimit > 0 {
if since := time.Since(h.crawlLastManual); since < h.crawlRateLimit {
retryIn := (h.crawlRateLimit - since).Seconds()
c.Header("Retry-After", fmt.Sprint(int(retryIn)+1))
apiErr := apierror.TooManyRequests(
fmt.Sprintf("Manual crawl rate-limited; try again in ~%.0fs", retryIn),
)
c.JSON(apiErr.Status, apierror.NewResponse(apiErr))
return
}
h.crawlLastManual = time.Now()
}
summary, err := h.service.Crawl(c.Request.Context())
if err != nil {
slog.ErrorContext(c.Request.Context(), "discovery crawl failed", "error", err)
apiErr := apierror.Internal("crawl failed")
c.JSON(apiErr.Status, apierror.NewResponse(apiErr))
return
}
c.JSON(http.StatusOK, gin.H{"data": summary})
}
func (h *Handler) Stats(c *gin.Context) {
s, err := h.service.Stats(c.Request.Context())
if err != nil {

View File

@@ -0,0 +1,150 @@
package discovery
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"
"github.com/gin-gonic/gin"
"marktvogt.de/backend/internal/domain/discovery/crawler"
)
func init() {
gin.SetMode(gin.TestMode)
}
// blockingCrawlerRunner blocks until its ctx is cancelled.
type blockingCrawlerRunner struct {
started chan struct{}
}
func (b *blockingCrawlerRunner) RunAll(ctx context.Context) (crawler.CrawlResult, error) {
close(b.started)
<-ctx.Done()
return crawler.CrawlResult{}, ctx.Err()
}
// TestCrawlHandlerMutexReentry verifies that a second concurrent Crawl request
// gets HTTP 429 while the first is still running.
func TestCrawlHandlerMutexReentry(t *testing.T) {
bc := &blockingCrawlerRunner{started: make(chan struct{})}
svc := NewServiceWithCrawler(newMockRepo(), bc, noopLinkVerifier{}, noopMarketCreator{})
h := NewHandler(svc, 1)
// First request — runs in a goroutine so the handler blocks.
w1 := httptest.NewRecorder()
c1, _ := gin.CreateTestContext(w1)
ctx1, cancel1 := context.WithCancel(context.Background())
defer cancel1()
c1.Request = httptest.NewRequest(http.MethodPost, "/admin/discovery/crawl", nil).WithContext(ctx1)
c1.Set("crawl_bypass_rate_limit", true) // bypass rate limit; test only mutex
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
h.Crawl(c1)
}()
// Wait until the blocking crawler has started and holds the mutex.
<-bc.started
// Second request — should see 429 because mutex is held.
w2 := httptest.NewRecorder()
c2, _ := gin.CreateTestContext(w2)
c2.Request = httptest.NewRequest(http.MethodPost, "/admin/discovery/crawl", nil)
c2.Set("crawl_bypass_rate_limit", true)
h.Crawl(c2)
if w2.Code != http.StatusTooManyRequests {
t.Errorf("expected 429 from second concurrent request, got %d", w2.Code)
}
// Cancel the first request so the goroutine can unblock.
cancel1()
wg.Wait()
}
// TestCrawlHandlerRateLimit verifies that a second manual (non-bypass) request
// within the rate limit window returns 429 with Retry-After.
func TestCrawlHandlerRateLimit(t *testing.T) {
// Use an instant-returning crawler so the mutex is released quickly.
svc := NewServiceWithCrawler(
newMockRepo(),
&stubCrawlerRunner{result: crawler.CrawlResult{}},
noopLinkVerifier{},
noopMarketCreator{},
)
// Set a long rate limit (1 per hour = 1h window).
h := NewHandler(svc, 1)
doRequest := func(bypass bool) *httptest.ResponseRecorder {
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest(http.MethodPost, "/admin/discovery/crawl-manual", nil)
if bypass {
c.Set("crawl_bypass_rate_limit", true)
}
h.Crawl(c)
return w
}
// First request — not bypassed. Should succeed (200).
w1 := doRequest(false)
if w1.Code != http.StatusOK {
t.Fatalf("first request: expected 200, got %d body=%s", w1.Code, w1.Body.String())
}
// Second request immediately after — should be rate-limited (429).
w2 := doRequest(false)
if w2.Code != http.StatusTooManyRequests {
t.Errorf("second request: expected 429, got %d body=%s", w2.Code, w2.Body.String())
}
if w2.Header().Get("Retry-After") == "" {
t.Error("second request: expected Retry-After header to be set")
}
// Bypassed request should succeed even within the window.
w3 := doRequest(true)
if w3.Code != http.StatusOK {
t.Errorf("bypass request: expected 200, got %d body=%s", w3.Code, w3.Body.String())
}
}
// TestCrawlHandlerRateLimitResets verifies that a manual request succeeds once
// the rate limit window has elapsed.
func TestCrawlHandlerRateLimitResets(t *testing.T) {
svc := NewServiceWithCrawler(
newMockRepo(),
&stubCrawlerRunner{result: crawler.CrawlResult{}},
noopLinkVerifier{},
noopMarketCreator{},
)
h := NewHandler(svc, 1)
// Force crawlLastManual to well in the past so the window has expired.
h.crawlLastManual = time.Now().Add(-2 * time.Hour)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest(http.MethodPost, "/admin/discovery/crawl-manual", nil)
h.Crawl(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200 after window reset, got %d body=%s", w.Code, w.Body.String())
}
var resp map[string]interface{}
if err := json.NewDecoder(w.Body).Decode(&resp); err != nil {
t.Fatalf("response body not valid JSON: %v", err)
}
if _, ok := resp["data"]; !ok {
t.Error("expected 'data' key in response body")
}
}

View File

@@ -24,6 +24,10 @@ type mockRepo struct {
markAcceptedFn func(ctx context.Context, tx pgx.Tx, id, eid, r uuid.UUID) error
markRejectedFn func(ctx context.Context, tx pgx.Tx, id, r uuid.UUID) error
insertRejFn func(ctx context.Context, tx pgx.Tx, rej RejectedDiscovery) error
// inserted captures every DiscoveredMarket passed to InsertDiscovered.
// Populated by newMockRepo(); nil when insertDiscFn is set externally.
inserted []DiscoveredMarket
}
func (m *mockRepo) PickStaleBuckets(ctx context.Context, fm, lim int) ([]Bucket, error) {

View File

@@ -27,7 +27,7 @@ type Bucket struct {
// vorjahr_unbestaetigt|abgesagt).
type DiscoveredMarket struct {
ID uuid.UUID `json:"id"`
BucketID uuid.UUID `json:"bucket_id"`
BucketID *uuid.UUID `json:"bucket_id"`
MarktName string `json:"markt_name"`
Stadt string `json:"stadt"`
Bundesland string `json:"bundesland"`
@@ -107,6 +107,27 @@ const (
StatusRejected = "rejected"
)
// AgentStatus constants.
// Mistral Pass 0 produces: bestaetigt | unklar | vorjahr_unbestaetigt | abgesagt.
// The crawler uses its own sentinel value so the validator's agent-specific
// rules (e.g. bestaetigt+vorjahr_hinweis inconsistency) don't fire on crawler-
// produced rows, and so operators can filter the queue by origin.
const (
AgentStatusBestaetigt = "bestaetigt"
AgentStatusUnklar = "unklar"
AgentStatusVorjahrUnbestaetigt = "vorjahr_unbestaetigt"
AgentStatusAbgesagt = "abgesagt"
AgentStatusCrawler = "crawler"
)
// Konfidenz constants. The three-level scale is used by both Pass 0 (agent-
// reported) and the crawler (derived from source agreement + source rank).
const (
KonfidenzHoch = "hoch"
KonfidenzMittel = "mittel"
KonfidenzNiedrig = "niedrig"
)
// Stats is the discovery health snapshot used by the admin dashboard strip.
type Stats struct {
LastTickAt *time.Time `json:"last_tick_at"`

View File

@@ -1,136 +1,17 @@
package discovery
import (
"strings"
"unicode"
"marktvogt.de/backend/internal/domain/discovery/normalize"
)
// umlautMap expands German diacritics.
var umlautMap = map[rune]string{
'ä': "ae", 'ö': "oe", 'ü': "ue", 'ß': "ss",
'Ä': "ae", 'Ö': "oe", 'Ü': "ue",
}
// stripWords are removed from the start and end of normalized market names.
// Kept short intentionally — over-aggressive stripping collides unrelated markets.
var stripWords = map[string]struct{}{
"mittelaltermarkt": {},
"mittelalterlicher": {},
"markt": {},
"zu": {},
"am": {},
"der": {},
"die": {},
"das": {},
"auf": {},
"dem": {},
"den": {},
"in": {},
"im": {},
}
// NormalizeName returns a stable, dedup-safe form of a market name:
// lowercase, umlauts expanded, punctuation stripped, whitespace collapsed,
// leading/trailing filler words removed.
//
// Used exclusively for matching. NOT suitable for slugs — see slugify() in service.go
// for the URL-safe form that preserves identifying words.
func NormalizeName(s string) string {
s = expandUmlauts(s)
s = toLowerAlnumSpace(s)
return trimStripWords(s)
}
// Used exclusively for matching. NOT suitable for slugs.
func NormalizeName(s string) string { return normalize.Name(s) }
// NormalizeCity lowercases, expands umlauts, and collapses internal whitespace.
// Keeps hyphens (Baden-Baden) and punctuation (St. Wendel). Used for
// pre-filtering series candidates by city.
func NormalizeCity(s string) string {
s = expandUmlauts(s)
s = strings.ToLower(s)
return strings.Join(strings.Fields(s), " ")
}
func expandUmlauts(s string) string {
var b strings.Builder
b.Grow(len(s))
for _, r := range s {
if rep, ok := umlautMap[r]; ok {
b.WriteString(rep)
} else {
b.WriteRune(r)
}
}
return b.String()
}
func toLowerAlnumSpace(s string) string {
var b strings.Builder
b.Grow(len(s))
for _, r := range s {
switch {
case unicode.IsLetter(r) || unicode.IsDigit(r):
b.WriteRune(unicode.ToLower(r))
default:
b.WriteRune(' ')
}
}
return b.String()
}
func trimStripWords(s string) string {
tokens := strings.Fields(s)
// trim from left
for len(tokens) > 0 {
if _, ok := stripWords[tokens[0]]; !ok {
break
}
if shouldStopStripping(tokens[1:]) {
break
}
tokens = tokens[1:]
}
// trim from right
for len(tokens) > 0 {
if _, ok := stripWords[tokens[len(tokens)-1]]; !ok {
break
}
if shouldStopStripping(tokens[:len(tokens)-1]) {
break
}
tokens = tokens[:len(tokens)-1]
}
return strings.Join(tokens, " ")
}
// shouldStopStripping reports whether the tokens remaining after a hypothetical
// edge-strip would be "meaningless" — i.e., contain only stripwords and purely
// numeric tokens. When true, the caller should preserve the edge stripword to
// avoid destroying identifying content (e.g. "Markt 2026" → "2026"). When the
// remaining set is entirely stripwords (no numerics), stripping continues;
// that's what lets "der die das" reduce to "".
func shouldStopStripping(remaining []string) bool {
nonStripwordCount := 0
allNumeric := true
for _, t := range remaining {
if _, ok := stripWords[t]; ok {
continue
}
nonStripwordCount++
if !isNumericOnly(t) {
allNumeric = false
}
}
return nonStripwordCount > 0 && allNumeric
}
func isNumericOnly(s string) bool {
if s == "" {
return false
}
for _, r := range s {
if !unicode.IsDigit(r) {
return false
}
}
return true
}
func NormalizeCity(s string) string { return normalize.City(s) }

View File

@@ -0,0 +1,129 @@
// Package normalize provides pure string normalization helpers shared between
// the discovery package and the crawler subpackage. Keeping these in a leaf
// package breaks the otherwise circular import:
//
// discovery/crawler → discovery (for NormalizeName/NormalizeCity)
// discovery → discovery/crawler (for Service.Crawl)
package normalize
import (
"strings"
"unicode"
)
// umlautMap expands German diacritics.
var umlautMap = map[rune]string{
'ä': "ae", 'ö': "oe", 'ü': "ue", 'ß': "ss",
'Ä': "ae", 'Ö': "oe", 'Ü': "ue",
}
// stripWords are removed from the start and end of normalized market names.
var stripWords = map[string]struct{}{
"mittelaltermarkt": {},
"mittelalterlicher": {},
"markt": {},
"zu": {},
"am": {},
"der": {},
"die": {},
"das": {},
"auf": {},
"dem": {},
"den": {},
"in": {},
"im": {},
}
// Name returns a stable, dedup-safe form of a market name:
// lowercase, umlauts expanded, punctuation stripped, whitespace collapsed,
// leading/trailing filler words removed.
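//
// For example (worked by hand against the strip rules above):
//
//   Name("Mittelalterlicher Markt zu Dresden") // "dresden"
//   Name("Markt 2026")                         // "markt 2026" (numeric-only remainder preserved)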
func Name(s string) string {
s = expandUmlauts(s)
s = toLowerAlnumSpace(s)
return trimStripWords(s)
}
// City lowercases, expands umlauts, and collapses internal whitespace.
// Keeps hyphens (Baden-Baden) and punctuation (St. Wendel).
func City(s string) string {
s = expandUmlauts(s)
s = strings.ToLower(s)
return strings.Join(strings.Fields(s), " ")
}
func expandUmlauts(s string) string {
var b strings.Builder
b.Grow(len(s))
for _, r := range s {
if rep, ok := umlautMap[r]; ok {
b.WriteString(rep)
} else {
b.WriteRune(r)
}
}
return b.String()
}
func toLowerAlnumSpace(s string) string {
var b strings.Builder
b.Grow(len(s))
for _, r := range s {
switch {
case unicode.IsLetter(r) || unicode.IsDigit(r):
b.WriteRune(unicode.ToLower(r))
default:
b.WriteRune(' ')
}
}
return b.String()
}
func trimStripWords(s string) string {
tokens := strings.Fields(s)
for len(tokens) > 0 {
if _, ok := stripWords[tokens[0]]; !ok {
break
}
if shouldStopStripping(tokens[1:]) {
break
}
tokens = tokens[1:]
}
for len(tokens) > 0 {
if _, ok := stripWords[tokens[len(tokens)-1]]; !ok {
break
}
if shouldStopStripping(tokens[:len(tokens)-1]) {
break
}
tokens = tokens[:len(tokens)-1]
}
return strings.Join(tokens, " ")
}
func shouldStopStripping(remaining []string) bool {
nonStripwordCount := 0
allNumeric := true
for _, t := range remaining {
if _, ok := stripWords[t]; ok {
continue
}
nonStripwordCount++
if !isNumericOnly(t) {
allNumeric = false
}
}
return nonStripwordCount > 0 && allNumeric
}
func isNumericOnly(s string) bool {
if s == "" {
return false
}
for _, r := range s {
if !unicode.IsDigit(r) {
return false
}
}
return true
}

View File

@@ -12,7 +12,13 @@ func RegisterRoutes(
// Machine-driven tick (bearer token).
rg.POST("/admin/discovery/tick", requireTickToken, h.Tick)
// Admin-session queue routes.
// Machine-driven crawl (bearer token; bypasses manual rate limit).
rg.POST("/admin/discovery/crawl", requireTickToken, func(c *gin.Context) {
c.Set("crawl_bypass_rate_limit", true)
h.Crawl(c)
})
// Admin-session routes (queue mgmt + manual crawl trigger).
admin := rg.Group("/admin/discovery", requireAuth, requireAdmin)
{
admin.GET("/stats", h.Stats)
@@ -20,5 +26,7 @@ func RegisterRoutes(
admin.PATCH("/queue/:id", h.Update)
admin.POST("/queue/:id/accept", h.Accept)
admin.POST("/queue/:id/reject", h.Reject)
// Manual crawl trigger — subject to hourly rate limit.
admin.POST("/crawl-manual", h.Crawl)
}
}
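// Illustrative invocations (the /api/v1 prefix and the session cookie name are
// assumptions; use whatever prefix rg is mounted under and whatever session
// mechanism requireAuth expects):
//
//   curl -X POST -H "Authorization: Bearer $DISCOVERY_TOKEN" \
//       https://example.test/api/v1/admin/discovery/crawl
//   curl -X POST -b "session=$ADMIN_SESSION" \
//       https://example.test/api/v1/admin/discovery/crawl-manual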

View File

@@ -5,12 +5,15 @@ import (
"errors"
"fmt"
"log/slog"
"sort"
"strings"
"time"
"github.com/google/uuid"
"marktvogt.de/backend/internal/domain/discovery/crawler"
"marktvogt.de/backend/internal/domain/market"
"marktvogt.de/backend/internal/pkg/ai"
)
type marketCreator interface {
@@ -18,7 +21,6 @@ type marketCreator interface {
CreateEditionForSeries(ctx context.Context, seriesID uuid.UUID, req market.CreateEditionRequest) (market.Market, error)
}
// Service orchestrates bucket scheduling, agent invocation, and queue management.
// linkVerifier is the narrow interface Service depends on for URL validation.
// *LinkChecker is the real implementation; tests inject a noop stub.
type linkVerifier interface {
@@ -26,9 +28,17 @@ type linkVerifier interface {
CheckURL(ctx context.Context, url string) bool
}
// crawlerRunner is the narrow interface Service depends on for crawler runs.
// *crawler.Crawler is the real implementation; tests inject a stub.
type crawlerRunner interface {
RunAll(ctx context.Context) (crawler.CrawlResult, error)
}
// Service orchestrates bucket scheduling, agent invocation, and queue management.
type Service struct {
repo Repository
agent *AgentClient
crawler crawlerRunner
marketCreator marketCreator
linkChecker linkVerifier
batchSize int
@@ -47,6 +57,24 @@ func NewService(repo Repository, agent *AgentClient, mc marketCreator, batchSize
}
}
// SetCrawler attaches a crawler instance post-construction. For MR 1, this allows
// wiring both Tick (agent-driven) and Crawl (crawler-driven) paths on the same Service.
func (s *Service) SetCrawler(cr crawlerRunner) {
s.crawler = cr
}
// NewServiceWithCrawler constructs a Service wired for the crawler-driven
// Crawl method. The existing Pass 0 Tick path is not wired here (no agent).
// MR 2 will consolidate this with NewService once the Mistral path is removed.
func NewServiceWithCrawler(repo Repository, cr crawlerRunner, lc linkVerifier, mc marketCreator) *Service {
return &Service{
repo: repo,
crawler: cr,
marketCreator: mc,
linkChecker: lc,
}
}
// PickBuckets returns stale buckets eligible for the next discovery run.
func (s *Service) PickBuckets(ctx context.Context) ([]Bucket, error) {
return s.repo.PickStaleBuckets(ctx, s.forwardMonths, s.batchSize)
@@ -93,7 +121,7 @@ func (s *Service) processOneBucket(ctx context.Context, b Bucket, summary *TickS
resp, err := s.agent.Discover(ctx, b)
if err != nil {
wait := 2 * time.Second
if isRateLimit(err) {
if ai.IsRateLimit(err) {
wait = 10 * time.Second
}
select {
@@ -103,7 +131,7 @@ func (s *Service) processOneBucket(ctx context.Context, b Bucket, summary *TickS
}
resp, err = s.agent.Discover(ctx, b)
if err != nil {
if isRateLimit(err) {
if ai.IsRateLimit(err) {
// Leave last_queried_at unchanged so the bucket is re-picked on the
// next tick, and abort the rest of this tick — Mistral's web_search
// budget is shared across buckets, no point hammering it further.
@@ -129,18 +157,6 @@ func (s *Service) processOneBucket(ctx context.Context, b Bucket, summary *TickS
return false
}
// isRateLimit detects Mistral 429 / web_search rate-limit errors. Matches the
// error string shape surfaced by the SDK: '... rate limit reached. (status 429)'.
func isRateLimit(err error) bool {
if err == nil {
return false
}
msg := err.Error()
return strings.Contains(msg, "rate limit") ||
strings.Contains(msg, "status 429") ||
strings.Contains(msg, "429")
}
func (s *Service) processBucketResponse(ctx context.Context, b Bucket, resp Pass0Response) TickSummary {
var summary TickSummary
seen := make(map[string]bool) // in-request dedup
@@ -216,7 +232,7 @@ func (s *Service) processBucketResponse(ctx context.Context, b Bucket, resp Pass
}
dm := DiscoveredMarket{
BucketID: b.ID,
BucketID: &b.ID,
MarktName: m.MarktName,
Stadt: m.Stadt,
Bundesland: m.Bundesland,
@@ -235,7 +251,7 @@ func (s *Service) processBucketResponse(ctx context.Context, b Bucket, resp Pass
// Semantic validation — catches agent self-contradictions that the
// schema alone cannot. Errors drop the market; warnings would be
// appended to hinweis (none defined yet at Pass 0 scope).
issues := ValidateForInsert(dm, b)
issues := ValidateForInsert(dm, &b)
if HasErrors(issues) {
slog.InfoContext(ctx, "validation failed; skipping market",
"markt", m.MarktName, "stadt", m.Stadt, "issues", formatIssues(issues))
@@ -252,6 +268,203 @@ func (s *Service) processBucketResponse(ctx context.Context, b Bucket, resp Pass
return summary
}
// CrawlSummary reports the outcome of one Service.Crawl run.
type CrawlSummary struct {
StartedAt time.Time `json:"started_at"`
DurationMs int64 `json:"duration_ms"`
PerSource map[string]SourceSummary `json:"per_source"`
Merged int `json:"merged"`
MergedAcrossSites int `json:"merged_across_sites"`
Discovered int `json:"discovered"`
DedupedExisting int `json:"deduped_existing"`
DedupedRejected int `json:"deduped_rejected"`
DedupedQueue int `json:"deduped_queue"`
LinkCheckFailed int `json:"link_check_failed"`
ValidationFailed int `json:"validation_failed"`
DateConflicts int `json:"date_conflicts"`
SourceErrors []map[string]string `json:"source_errors"`
}
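// A run serializes like this (illustrative values; some fields omitted):
//
//   {"started_at":"2026-04-18T15:04:05Z","duration_ms":41230,
//    "per_source":{"marktkalendarium":{"events_fetched":312,"elapsed_ms":9120}},
//    "merged":950,"merged_across_sites":180,"discovered":420}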
// SourceSummary reports per-source fetch statistics.
type SourceSummary struct {
EventsFetched int `json:"events_fetched"`
ElapsedMs int64 `json:"elapsed_ms"`
}
// Crawl orchestrates one crawler run: fetch, merge, and push through the
// existing discovery pipeline (link-verify, validate, dedup, insert).
func (s *Service) Crawl(ctx context.Context) (CrawlSummary, error) {
if s.crawler == nil {
return CrawlSummary{}, errors.New("crawler not configured")
}
summary := CrawlSummary{
StartedAt: time.Now().UTC(),
PerSource: make(map[string]SourceSummary),
}
defer func() { summary.DurationMs = time.Since(summary.StartedAt).Milliseconds() }()
res, err := s.crawler.RunAll(ctx)
if err != nil {
return summary, err
}
// Sort source names for deterministic event ordering across runs;
// Merge's internal bucket order then depends only on input.
sourceNames := make([]string, 0, len(res.PerSource))
for name := range res.PerSource {
sourceNames = append(sourceNames, name)
}
sort.Strings(sourceNames)
var all []crawler.RawEvent
for _, name := range sourceNames {
evs := res.PerSource[name]
summary.PerSource[name] = SourceSummary{
EventsFetched: len(evs),
ElapsedMs: res.PerSourceMS[name],
}
all = append(all, evs...)
}
for _, serr := range res.SourceErrors {
summary.SourceErrors = append(summary.SourceErrors, map[string]string{
"source": serr.Name,
"error": serr.Err.Error(),
})
}
merged := crawler.Merge(all)
summary.Merged = len(merged)
for _, m := range merged {
if len(m.Sources) >= 2 {
summary.MergedAcrossSites++
}
if strings.Contains(m.Hinweis, "date_conflict") {
summary.DateConflicts++
}
}
for _, m := range merged {
quellen := s.linkChecker.FilterURLs(ctx, m.Quellen)
if len(quellen) == 0 {
summary.LinkCheckFailed++
continue
}
website := m.Website
if website != "" && !s.linkChecker.CheckURL(ctx, website) {
website = ""
}
candidates, err := s.repo.ListSeriesByCity(ctx, NormalizeCity(m.City))
if err != nil {
slog.WarnContext(ctx, "list series by city", "city", m.City, "error", err)
continue
}
matchedSeriesID := findSeriesMatch(m.Name, candidates)
year := 0
if m.StartDate != nil {
year = m.StartDate.Year()
}
if matchedSeriesID != nil && year > 0 {
exists, err := s.repo.EditionExists(ctx, *matchedSeriesID, year)
if err != nil {
slog.WarnContext(ctx, "edition exists check", "error", err)
continue
}
if exists {
summary.DedupedExisting++
continue
}
}
nameNorm := NormalizeName(m.Name)
if year > 0 {
rejected, err := s.repo.IsRejected(ctx, nameNorm, m.City, year)
if err != nil {
slog.WarnContext(ctx, "is rejected check", "error", err)
continue
}
if rejected {
summary.DedupedRejected++
continue
}
}
pending, err := s.repo.QueueHasPending(ctx, nameNorm, m.City, m.StartDate)
if err != nil {
slog.WarnContext(ctx, "queue pending check", "error", err)
continue
}
if pending {
summary.DedupedQueue++
continue
}
// Default EndDatum to StartDatum for sources that only reported a
// single date (e.g. festival_alarm one-day events; the suendenfrei
// header regex always yields a day range). Admin can still edit via
// /queue/:id before accept. Avoids a blocking nil-EndDatum check in
// Service.Accept.
endDatum := m.EndDate
if endDatum == nil && m.StartDate != nil {
endDatum = m.StartDate
}
dm := DiscoveredMarket{
BucketID: nil,
MarktName: m.Name,
Stadt: m.City,
Bundesland: m.Bundesland,
Land: m.Land,
StartDatum: m.StartDate,
EndDatum: endDatum,
Website: website,
Quellen: quellen,
Konfidenz: crawlerKonfidenz(m),
AgentStatus: AgentStatusCrawler,
Hinweis: m.Hinweis,
NameNormalized: nameNorm,
MatchedSeriesID: matchedSeriesID,
}
issues := ValidateForInsert(dm, nil)
if HasErrors(issues) {
slog.InfoContext(ctx, "validation failed; skipping market",
"markt", m.Name, "stadt", m.City, "issues", formatIssues(issues))
summary.ValidationFailed++
continue
}
if _, err := s.repo.InsertDiscovered(ctx, dm); err != nil {
slog.WarnContext(ctx, "insert discovered", "error", err)
continue
}
summary.Discovered++
}
return summary, nil
}
// crawlerKonfidenz derives a three-level confidence label for a merged event.
// Signal: cross-source agreement is the strongest indicator — two or more
// independent calendars emitting the same (normalized name, city, start_date)
// triple is high confidence. Single-source rows fall back to source rank:
// Tribe JSON and marktkalendarium curate their data; suendenfrei's prose
// regex is brittle.
func crawlerKonfidenz(m crawler.MergedEvent) string {
if len(m.Sources) >= 2 {
return KonfidenzHoch
}
if len(m.Sources) == 1 {
switch m.Sources[0] {
case "mittelaltermarkt_online", "marktkalendarium":
return KonfidenzMittel
case "mittelalterkalender", "festival_alarm":
return KonfidenzMittel
case "suendenfrei":
return KonfidenzNiedrig
}
}
return KonfidenzNiedrig
}
// formatIssues produces a compact log-friendly summary of validation issues.
func formatIssues(issues []Issue) string {
parts := make([]string, 0, len(issues))

View File

@@ -8,9 +8,54 @@ import (
"github.com/google/uuid"
"github.com/jackc/pgx/v5"
"marktvogt.de/backend/internal/domain/discovery/crawler"
"marktvogt.de/backend/internal/domain/market"
)
// newMockRepo returns a mockRepo with default no-op implementations and an
// inserted field that captures every InsertDiscovered call.
func newMockRepo() *mockRepo {
m := &mockRepo{}
m.insertDiscFn = func(_ context.Context, d DiscoveredMarket) (uuid.UUID, error) {
m.inserted = append(m.inserted, d)
return uuid.New(), nil
}
m.listSeriesFn = func(_ context.Context, _ string) ([]SeriesCandidate, error) { return nil, nil }
m.editionExistsFn = func(_ context.Context, _ uuid.UUID, _ int) (bool, error) { return false, nil }
m.isRejectedFn = func(_ context.Context, _, _ string, _ int) (bool, error) { return false, nil }
m.queuePendingFn = func(_ context.Context, _, _ string, _ *time.Time) (bool, error) { return false, nil }
m.updateBucketFn = func(_ context.Context, _ uuid.UUID, _ string) error { return nil }
m.pickStaleFn = func(_ context.Context, _, _ int) ([]Bucket, error) { return nil, nil }
return m
}
func mustParseDate(t *testing.T, s string) *time.Time {
t.Helper()
tm, err := time.Parse("2006-01-02", s)
if err != nil {
t.Fatalf("mustParseDate(%q): %v", s, err)
}
return &tm
}
type noopMarketCreator struct{}
func (noopMarketCreator) Create(_ context.Context, _ market.CreateMarketRequest) (market.Market, error) {
return market.Market{ID: uuid.New(), SeriesID: uuid.New()}, nil
}
func (noopMarketCreator) CreateEditionForSeries(_ context.Context, sid uuid.UUID, _ market.CreateEditionRequest) (market.Market, error) {
return market.Market{ID: uuid.New(), SeriesID: sid}, nil
}
type stubCrawlerRunner struct {
result crawler.CrawlResult
err error
}
func (s *stubCrawlerRunner) RunAll(ctx context.Context) (crawler.CrawlResult, error) {
return s.result, s.err
}
func TestFindSeriesMatch(t *testing.T) {
target := SeriesCandidate{
ID: uuid.MustParse("11111111-1111-1111-1111-111111111111"),
@@ -222,3 +267,179 @@ func TestAccept_ExistingSeries_CallsCreateEditionForSeries(t *testing.T) {
t.Errorf("expected Create=0 CreateEdition=1, got %d/%d", mc.createCalls, mc.createEditionForSeriesCalls)
}
}
func TestServiceCrawlHappyPath(t *testing.T) {
repo := newMockRepo()
lc := noopLinkVerifier{}
start := mustParseDate(t, "2026-05-01")
end := mustParseDate(t, "2026-05-03")
sc := &stubCrawlerRunner{
result: crawler.CrawlResult{
PerSource: map[string][]crawler.RawEvent{
"marktkalendarium": {
{
SourceName: "marktkalendarium", SourceURL: "https://a/",
Name: "Markt X", City: "Dresden", PLZ: "01067", Land: "Deutschland",
StartDate: start, EndDate: end,
},
},
},
PerSourceMS: map[string]int64{"marktkalendarium": 1},
},
}
svc := NewServiceWithCrawler(repo, sc, lc, noopMarketCreator{})
summary, err := svc.Crawl(context.Background())
if err != nil {
t.Fatal(err)
}
if summary.Discovered != 1 {
t.Errorf("Discovered = %d; want 1", summary.Discovered)
}
if len(repo.inserted) != 1 {
t.Errorf("inserted = %d; want 1", len(repo.inserted))
}
got := repo.inserted[0]
if got.BucketID != nil {
t.Errorf("BucketID = %v; want nil (crawler-produced row)", got.BucketID)
}
if got.AgentStatus != AgentStatusCrawler {
t.Errorf("AgentStatus = %q; want %q", got.AgentStatus, AgentStatusCrawler)
}
if got.Konfidenz != KonfidenzMittel {
t.Errorf("Konfidenz = %q; want %q (single curated source)", got.Konfidenz, KonfidenzMittel)
}
}
// alwaysFailLinkVerifier filters every URL out — simulates a batch where every
// source URL fails link verification.
type alwaysFailLinkVerifier struct{}
func (alwaysFailLinkVerifier) FilterURLs(_ context.Context, _ []string) []string { return nil }
func (alwaysFailLinkVerifier) CheckURL(_ context.Context, _ string) bool { return false }
func TestServiceCrawlLinkCheckFailed(t *testing.T) {
repo := newMockRepo()
start := mustParseDate(t, "2026-05-01")
sc := &stubCrawlerRunner{
result: crawler.CrawlResult{
PerSource: map[string][]crawler.RawEvent{
"marktkalendarium": {
{SourceName: "marktkalendarium", SourceURL: "https://dead/", Name: "X", City: "Y", StartDate: start},
},
},
},
}
svc := NewServiceWithCrawler(repo, sc, alwaysFailLinkVerifier{}, noopMarketCreator{})
summary, err := svc.Crawl(context.Background())
if err != nil {
t.Fatal(err)
}
if summary.LinkCheckFailed != 1 {
t.Errorf("LinkCheckFailed = %d; want 1", summary.LinkCheckFailed)
}
if summary.Discovered != 0 {
t.Errorf("Discovered = %d; want 0", summary.Discovered)
}
if len(repo.inserted) != 0 {
t.Errorf("inserted = %d; want 0 (dead link should block insert)", len(repo.inserted))
}
}
func TestServiceCrawlDedupQueue(t *testing.T) {
repo := newMockRepo()
// Simulate: queue already has a matching pending row.
repo.queuePendingFn = func(_ context.Context, _, _ string, _ *time.Time) (bool, error) {
return true, nil
}
start := mustParseDate(t, "2026-05-01")
sc := &stubCrawlerRunner{
result: crawler.CrawlResult{
PerSource: map[string][]crawler.RawEvent{
"marktkalendarium": {
{SourceName: "marktkalendarium", SourceURL: "https://a/", Name: "X", City: "Y", StartDate: start},
},
},
},
}
svc := NewServiceWithCrawler(repo, sc, noopLinkVerifier{}, noopMarketCreator{})
summary, err := svc.Crawl(context.Background())
if err != nil {
t.Fatal(err)
}
if summary.DedupedQueue != 1 {
t.Errorf("DedupedQueue = %d; want 1", summary.DedupedQueue)
}
if summary.Discovered != 0 {
t.Errorf("Discovered = %d; want 0 (dupe should block insert)", summary.Discovered)
}
if len(repo.inserted) != 0 {
t.Errorf("inserted = %d; want 0", len(repo.inserted))
}
}
func TestServiceCrawlDefaultsEndDate(t *testing.T) {
repo := newMockRepo()
start := mustParseDate(t, "2026-05-01")
// RawEvent with no EndDate (e.g., festival_alarm one-day event).
sc := &stubCrawlerRunner{
result: crawler.CrawlResult{
PerSource: map[string][]crawler.RawEvent{
"marktkalendarium": {
{SourceName: "marktkalendarium", SourceURL: "https://a/", Name: "One Day Fest", City: "Y", StartDate: start, EndDate: nil},
},
},
},
}
svc := NewServiceWithCrawler(repo, sc, noopLinkVerifier{}, noopMarketCreator{})
if _, err := svc.Crawl(context.Background()); err != nil {
t.Fatal(err)
}
if len(repo.inserted) != 1 {
t.Fatalf("inserted = %d; want 1", len(repo.inserted))
}
got := repo.inserted[0]
if got.EndDatum == nil {
t.Error("EndDatum is nil; expected default to StartDatum")
}
if !got.EndDatum.Equal(*got.StartDatum) {
t.Errorf("EndDatum = %v; want equal to StartDatum %v", got.EndDatum, got.StartDatum)
}
}
func TestServiceCrawlMultiSourceHighKonfidenz(t *testing.T) {
repo := newMockRepo()
start := mustParseDate(t, "2026-05-01")
sc := &stubCrawlerRunner{
result: crawler.CrawlResult{
PerSource: map[string][]crawler.RawEvent{
"marktkalendarium": {{SourceName: "marktkalendarium", SourceURL: "https://a/", Name: "X", City: "Y", StartDate: start}},
"mittelaltermarkt_online": {{SourceName: "mittelaltermarkt_online", SourceURL: "https://b/", Name: "X", City: "Y", StartDate: start}},
},
},
}
svc := NewServiceWithCrawler(repo, sc, noopLinkVerifier{}, noopMarketCreator{})
summary, err := svc.Crawl(context.Background())
if err != nil {
t.Fatal(err)
}
if summary.Discovered != 1 {
t.Errorf("Discovered = %d; want 1 (two sources merge into one event)", summary.Discovered)
}
if summary.MergedAcrossSites != 1 {
t.Errorf("MergedAcrossSites = %d; want 1", summary.MergedAcrossSites)
}
if len(repo.inserted) != 1 {
t.Fatalf("inserted = %d; want 1", len(repo.inserted))
}
if repo.inserted[0].Konfidenz != KonfidenzHoch {
t.Errorf("Konfidenz = %q; want %q (2+ sources)", repo.inserted[0].Konfidenz, KonfidenzHoch)
}
}

View File

@@ -36,15 +36,15 @@ type Issue struct {
// from, before it hits the queue. Current checks:
//
// - bundesland_mismatch: m.Bundesland does not equal b.Region (CH "Kanton X"
// prefix is normalized).
// prefix is normalized). Skipped when b is nil.
// - status_hinweis_inconsistent: AgentStatus=="bestaetigt" AND hinweis
// mentions "vorjahr" — the agent contradicted itself.
//
// Returns nil when clean.
func ValidateForInsert(m DiscoveredMarket, b Bucket) []Issue {
func ValidateForInsert(m DiscoveredMarket, b *Bucket) []Issue {
var issues []Issue
if m.Bundesland != "" && !regionsEqual(m.Bundesland, b.Region) {
if b != nil && m.Bundesland != "" && !regionsEqual(m.Bundesland, b.Region) {
issues = append(issues, Issue{
Severity: SeverityError,
Code: "bundesland_mismatch",

View File

@@ -8,27 +8,27 @@ func TestValidateForInsert(t *testing.T) {
tests := []struct {
name string
m DiscoveredMarket
b Bucket
b *Bucket
wantCodes []string
wantErrors bool
}{
{
name: "clean",
m: DiscoveredMarket{Bundesland: "Bayern", AgentStatus: "bestaetigt"},
b: baseBucket,
b: &baseBucket,
wantCodes: nil,
},
{
name: "bundesland mismatch",
m: DiscoveredMarket{Bundesland: "Baden-Württemberg", AgentStatus: "bestaetigt"},
b: baseBucket,
b: &baseBucket,
wantCodes: []string{"bundesland_mismatch"},
wantErrors: true,
},
{
name: "bundesland empty is not an error",
m: DiscoveredMarket{Bundesland: "", AgentStatus: "bestaetigt"},
b: baseBucket,
b: &baseBucket,
wantCodes: nil,
},
{
@@ -38,7 +38,7 @@ func TestValidateForInsert(t *testing.T) {
AgentStatus: "bestaetigt",
Hinweis: "Termin aus Vorjahr, noch nicht bestaetigt",
},
b: baseBucket,
b: &baseBucket,
wantCodes: []string{"status_hinweis_inconsistent"},
wantErrors: true,
},
@@ -49,13 +49,13 @@ func TestValidateForInsert(t *testing.T) {
AgentStatus: "vorjahr_unbestaetigt",
Hinweis: "Aus dem Vorjahr uebernommen",
},
b: baseBucket,
b: &baseBucket,
wantCodes: nil,
},
{
name: "kanton prefix is normalized when bucket is just the kanton name",
m: DiscoveredMarket{Bundesland: "Kanton Zürich", AgentStatus: "bestaetigt"},
b: Bucket{Region: "Zürich"},
b: &Bucket{Region: "Zürich"},
wantCodes: nil,
},
{
@@ -65,10 +65,21 @@ func TestValidateForInsert(t *testing.T) {
AgentStatus: "bestaetigt",
Hinweis: "Termin aus dem Vorjahr uebernommen",
},
b: baseBucket,
b: &baseBucket,
wantCodes: []string{"bundesland_mismatch", "status_hinweis_inconsistent"},
wantErrors: true,
},
{
name: "nil bucket skips bundesland_mismatch check",
m: DiscoveredMarket{
MarktName: "Testmarkt",
Stadt: "Testdorf",
Bundesland: "Bayern",
AgentStatus: "bestaetigt",
},
b: nil,
wantCodes: nil,
},
}
for _, tc := range tests {

View File

@@ -69,9 +69,17 @@ func (h *ResearchHandler) Research(c *gin.Context) {
pass1Prompt := buildPass1Prompt(m)
pass1Result, err := h.aiClient.Pass1(ctx, pass1Prompt)
if err != nil {
slog.ErrorContext(ctx, "pass1 failed, retrying", "market_id", id, "error", err)
if ai.IsRateLimit(err) {
h.respondRateLimited(c, "pass1", id, err)
return
}
slog.WarnContext(ctx, "pass1 failed, retrying", "market_id", id, "error", err)
pass1Result, err = h.aiClient.Pass1(ctx, pass1Prompt)
if err != nil {
if ai.IsRateLimit(err) {
h.respondRateLimited(c, "pass1", id, err)
return
}
slog.ErrorContext(ctx, "pass1 retry failed", "market_id", id, "error", err)
apiErr := apierror.Internal("AI research failed")
c.JSON(apiErr.Status, apierror.NewResponse(apiErr))
@@ -93,14 +101,27 @@ func (h *ResearchHandler) Research(c *gin.Context) {
allSources := pass1Data.Sources
// --- Pass 2: Description + retry fields via chat completions ---
// Pass 2 is enrichment only — if it fails we still return the pass1 result.
// Rate-limit on pass2 is treated as a warning and the partial result ships.
pass2Prompt := buildPass2UserPrompt(m, pass1Data, retryFields, allSources)
pass2Result, err := h.aiClient.Pass2(ctx, pass2SystemPrompt, pass2Prompt)
var pass2Data pass2Response
if err != nil {
slog.WarnContext(ctx, "pass2 failed, retrying", "market_id", id, "error", err)
pass2Result, err = h.aiClient.Pass2(ctx, pass2SystemPrompt, pass2Prompt)
if err != nil {
slog.WarnContext(ctx, "pass2 retry failed, using pass1 results only", "market_id", id, "error", err)
if ai.IsRateLimit(err) {
slog.WarnContext(ctx, "pass2 rate-limited; returning pass1 results only",
"market_id", id)
} else {
slog.WarnContext(ctx, "pass2 failed, retrying", "market_id", id, "error", err)
pass2Result, err = h.aiClient.Pass2(ctx, pass2SystemPrompt, pass2Prompt)
if err != nil {
if ai.IsRateLimit(err) {
slog.WarnContext(ctx, "pass2 rate-limited on retry; returning pass1 results only",
"market_id", id)
} else {
slog.WarnContext(ctx, "pass2 retry failed, using pass1 results only",
"market_id", id, "error", err)
}
}
}
}
if err == nil {
@@ -116,6 +137,20 @@ func (h *ResearchHandler) Research(c *gin.Context) {
c.JSON(http.StatusOK, ResearchResponse{Data: result})
}
// respondRateLimited writes a 429 with a Retry-After header so the admin UI
// can display a clear "try again in N seconds" message instead of a generic
// 500. pass is a short identifier for the log line ("pass1" / "pass2").
func (h *ResearchHandler) respondRateLimited(c *gin.Context, pass string, marketID uuid.UUID, err error) {
retry := ai.DefaultRetryAfterSeconds
slog.WarnContext(c.Request.Context(), "ai rate-limited",
"pass", pass, "market_id", marketID, "retry_after_s", retry, "error", err)
c.Header("Retry-After", fmt.Sprint(retry))
apiErr := apierror.TooManyRequests(
fmt.Sprintf("AI research temporarily rate-limited; try again in ~%ds", retry),
)
c.JSON(apiErr.Status, apierror.NewResponse(apiErr))
}
// --- Pass 1 types and prompt ---
type pass1FieldResult struct {

View File

@@ -0,0 +1,25 @@
package ai
import "strings"
// IsRateLimit reports whether err is a Mistral 429 / web_search rate-limit
// signal. The SDK surfaces these as wrapped errors whose message contains
// "rate limit" and/or "status 429"; a bare "429" token also matches as a
// deliberately loose catch-all (see TestIsRateLimit for the trade-off).
//
// Shared helper so domains (discovery, market/research) treat rate limits
// consistently: log at WARN and return a 429 to the caller with a Retry-After
// hint instead of a generic 500.
func IsRateLimit(err error) bool {
if err == nil {
return false
}
msg := err.Error()
return strings.Contains(msg, "rate limit") ||
strings.Contains(msg, "status 429") ||
strings.Contains(msg, "429")
}
// DefaultRetryAfterSeconds is the hint we send when the SDK error doesn't
// carry a structured Retry-After. 60s matches the typical web_search budget
// window on Mistral's paid tier.
const DefaultRetryAfterSeconds = 60
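Substring matching is deliberately loose. If the SDK exposed a typed error, errors.As would be the sturdier check; a sketch under that assumption (mistral.APIError and its StatusCode field are hypothetical, not confirmed SDK API):

// Hypothetical alternative, not the current implementation. Assumes the
// SDK exports *mistral.APIError with a StatusCode field.
var apiErr *mistral.APIError
if errors.As(err, &apiErr) {
	return apiErr.StatusCode == http.StatusTooManyRequests
}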

View File

@@ -0,0 +1,28 @@
package ai
import (
"errors"
"testing"
)
func TestIsRateLimit(t *testing.T) {
tests := []struct {
name string
err error
want bool
}{
{"nil", nil, false},
{"unrelated error", errors.New("connection refused"), false},
{"mistral web_search 429", errors.New("pass1 conversation: mistral: web_search rate limit reached. (status 429)"), true},
{"bare status 429", errors.New("upstream returned status 429"), true},
{"raw 429 token", errors.New("http 429 received"), true},
{"token 42900 should NOT match bare token but does by substring", errors.New("code 42900 from upstream"), true},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
if got := IsRateLimit(tc.err); got != tc.want {
t.Errorf("IsRateLimit(%v) = %v, want %v", tc.err, got, tc.want)
}
})
}
}
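The table runs under the standard runner; from the backend module root (path assumed, matching the import path used elsewhere in this MR):

go test ./internal/pkg/ai/ -run TestIsRateLimit -v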

View File

@@ -57,11 +57,18 @@ func Conflict(message string) *Error {
return &Error{Status: http.StatusConflict, Code: "conflict", Message: message}
}
func TooManyRequests() *Error {
// TooManyRequests accepts an optional custom message. Callers that want the
// default — "too many requests, please try again later" — pass no arguments;
// callers with context (e.g. "retry in ~60s") pass a single string.
func TooManyRequests(message ...string) *Error {
msg := "too many requests, please try again later"
if len(message) > 0 && message[0] != "" {
msg = message[0]
}
return &Error{
Status: http.StatusTooManyRequests,
Code: "rate_limited",
Message: "too many requests, please try again later",
Message: msg,
}
}
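Existing zero-argument call sites keep compiling unchanged, while the research handler above passes a contextual message:

apierror.TooManyRequests()                      // default message, pre-existing callers
apierror.TooManyRequests("AI research temporarily rate-limited; try again in ~60s")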

View File

@@ -7,6 +7,7 @@ import (
"marktvogt.de/backend/internal/domain/auth"
"marktvogt.de/backend/internal/domain/discovery"
"marktvogt.de/backend/internal/domain/discovery/crawler"
"marktvogt.de/backend/internal/domain/market"
"marktvogt.de/backend/internal/domain/user"
"marktvogt.de/backend/internal/middleware"
@@ -80,7 +81,10 @@ func (s *Server) registerRoutes() {
s.cfg.Discovery.BatchSize,
s.cfg.Discovery.ForwardMonths,
)
discoveryHandler := discovery.NewHandler(discoveryService)
// Wire the crawler for the Crawl path (MR 1 keeps both Tick and Crawl paths)
crawlerInstance := crawler.NewCrawler(s.cfg.Discovery.CrawlerUserAgent, crawler.DefaultSourceConfigs())
discoveryService.SetCrawler(crawlerInstance)
discoveryHandler := discovery.NewHandler(discoveryService, s.cfg.Discovery.CrawlerManualRateLimitPerHour)
requireTickToken := middleware.RequireBearerToken(s.cfg.Discovery.Token)
discovery.RegisterRoutes(v1, discoveryHandler, requireAuth, requireAdmin, requireTickToken)
}
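The setter keeps MR 1 additive: the service is constructed as before and the crawler is attached afterwards, so tests can swap in a stub instead of hitting the five live sources. A sketch of the seam this implies (interface name, method set, and return type are assumptions, not copied from the discovery package):

// Sketch only, inferred from the call sites in this MR; the real
// definition in the discovery package may differ.
type CrawlRunner interface {
	RunAll(ctx context.Context) (crawler.Result, error)
}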

View File

@@ -0,0 +1,8 @@
DO $$
BEGIN
IF EXISTS (SELECT 1 FROM discovered_markets WHERE bucket_id IS NULL) THEN
RAISE EXCEPTION 'Cannot revert: % rows have NULL bucket_id (crawler-produced). Resolve manually before rolling back.',
(SELECT COUNT(*) FROM discovered_markets WHERE bucket_id IS NULL);
END IF;
END $$;
ALTER TABLE discovered_markets ALTER COLUMN bucket_id SET NOT NULL;
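If this guard fires during a rollback, the blocking rows can be listed first; id and bucket_id are the only columns known from this migration, so extend the select list as needed:

SELECT id FROM discovered_markets WHERE bucket_id IS NULL;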

View File

@@ -0,0 +1 @@
ALTER TABLE discovered_markets ALTER COLUMN bucket_id DROP NOT NULL;