381 lines
13 KiB
Python
381 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
"""Build global editable master masks from per-tile mask files.
|
|
|
|
This script keeps 1025-tile semantics intact for river erosion while making
|
|
manual source/sink editing easier in one global image.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import math
|
|
import os
|
|
import re
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Dict, Iterable, Optional
|
|
|
|
import numpy as np
|
|
from PIL import Image
|
|
|
|
|
|
@dataclass
|
|
class TileRef:
|
|
key: str
|
|
water_path: Path
|
|
water_stem: str
|
|
width: int
|
|
height: int
|
|
px: float
|
|
py: float
|
|
minx: float
|
|
miny: float
|
|
maxx: float
|
|
maxy: float
|
|
source_path: Optional[Path] = None
|
|
sink_path: Optional[Path] = None
|
|
|
|
|
|
_TILE_KEY_RE = re.compile(r"^(dgm\d+_\d+_\d+_\d+)")
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
|
|
p = argparse.ArgumentParser(description="Build master water/source/sink masks from tiled inputs.")
|
|
p.add_argument("--water-dir", default="raw/water_masks", help="Directory with tile water masks.")
|
|
p.add_argument(
|
|
"--fallback-water-dir",
|
|
default="work/river_masks",
|
|
help="Fallback water mask directory used for tiles missing in --water-dir.",
|
|
)
|
|
p.add_argument("--source-dir", default="raw/water_source_masks", help="Directory with tile source masks.")
|
|
p.add_argument("--sink-dir", default="raw/water_sink_masks", help="Directory with tile sink masks.")
|
|
p.add_argument("--out-dir", default="work/mask_master", help="Output directory for master masks.")
|
|
p.add_argument("--water-pattern", default="*_mask_viz.png", help="Glob for water mask files.")
|
|
p.add_argument("--fallback-water-pattern", default="*_mask.png", help="Glob for fallback water mask files.")
|
|
p.add_argument("--write-master-wld", action="store_true", help="Write worldfiles for master images.")
|
|
return p.parse_args()
|
|
|
|
|
|
def tile_key_from_name(name: str) -> Optional[str]:
|
|
stem = Path(name).stem
|
|
m = _TILE_KEY_RE.match(stem)
|
|
if not m:
|
|
return None
|
|
return m.group(1)
|
|
|
|
|
|
def read_worldfile(path: Path) -> tuple[float, float, float, float, float, float]:
|
|
vals = []
|
|
for line in path.read_text(encoding="utf-8").splitlines():
|
|
line = line.strip()
|
|
if line:
|
|
vals.append(float(line))
|
|
if len(vals) != 6:
|
|
raise ValueError(f"Expected 6 values in worldfile {path}, got {len(vals)}.")
|
|
return vals[0], vals[1], vals[2], vals[3], vals[4], vals[5]
|
|
|
|
|
|
def infer_bounds(width: int, height: int, wld: tuple[float, float, float, float, float, float]) -> tuple[float, float, float, float]:
|
|
a, d, b, e, c, f = wld
|
|
if abs(b) > 1e-9 or abs(d) > 1e-9:
|
|
raise ValueError("Rotated worldfiles are not supported.")
|
|
minx = c - (a / 2.0)
|
|
maxy = f - (e / 2.0)
|
|
maxx = minx + (a * width)
|
|
miny = maxy + (e * height)
|
|
return minx, miny, maxx, maxy
|
|
|
|
|
|
def load_rgb(path: Path, width: int, height: int) -> np.ndarray:
|
|
img = Image.open(path).convert("RGB")
|
|
arr = np.array(img, dtype=np.uint8)
|
|
if arr.shape[0] == height and arr.shape[1] == width:
|
|
return arr
|
|
resized = img.resize((width, height), resample=Image.Resampling.NEAREST)
|
|
return np.array(resized, dtype=np.uint8)
|
|
|
|
|
|
def write_worldfile(path: Path, px: float, py: float, minx: float, maxy: float) -> None:
|
|
c = minx + (px / 2.0)
|
|
f = maxy + (py / 2.0)
|
|
text = "\n".join(
|
|
[
|
|
f"{px:.12f}",
|
|
"0.0",
|
|
"0.0",
|
|
f"{py:.12f}",
|
|
f"{c:.12f}",
|
|
f"{f:.12f}",
|
|
]
|
|
)
|
|
path.write_text(text + "\n", encoding="utf-8")
|
|
|
|
|
|
def index_masks_by_tile(mask_dir: Path, suffix: str) -> Dict[str, Path]:
|
|
out: Dict[str, Path] = {}
|
|
if not mask_dir.exists():
|
|
return out
|
|
for p in sorted(mask_dir.glob(f"*{suffix}")):
|
|
key = tile_key_from_name(p.name)
|
|
if key is None:
|
|
continue
|
|
if key in out:
|
|
print(f"[mask_build_master] Warning: duplicate {suffix} for {key}, keeping {out[key].name}, ignoring {p.name}")
|
|
continue
|
|
out[key] = p
|
|
return out
|
|
|
|
|
|
def collect_tiles(water_dir: Path, water_pattern: str, source_dir: Path, sink_dir: Path) -> list[TileRef]:
|
|
source_by_key = index_masks_by_tile(source_dir, ".png")
|
|
sink_by_key = index_masks_by_tile(sink_dir, ".png")
|
|
tiles: list[TileRef] = []
|
|
|
|
for p in sorted(water_dir.glob(water_pattern)):
|
|
if p.name.endswith(".tmp.png"):
|
|
continue
|
|
key = tile_key_from_name(p.name)
|
|
if key is None:
|
|
continue
|
|
|
|
wld = p.with_suffix(".wld")
|
|
if not wld.exists():
|
|
print(f"[mask_build_master] Skipping {p.name}: missing worldfile {wld.name}")
|
|
continue
|
|
|
|
img = Image.open(p)
|
|
width, height = img.size
|
|
img.close()
|
|
|
|
a, d, b, e, c, f = read_worldfile(wld)
|
|
if abs(b) > 1e-9 or abs(d) > 1e-9:
|
|
raise SystemExit(f"[mask_build_master] Rotated worldfile is not supported: {wld}")
|
|
if a <= 0 or e >= 0:
|
|
raise SystemExit(f"[mask_build_master] Unexpected worldfile pixel size signs in {wld}")
|
|
|
|
minx, miny, maxx, maxy = infer_bounds(width, height, (a, d, b, e, c, f))
|
|
tiles.append(
|
|
TileRef(
|
|
key=key,
|
|
water_path=p,
|
|
water_stem=p.stem,
|
|
width=width,
|
|
height=height,
|
|
px=a,
|
|
py=e,
|
|
minx=minx,
|
|
miny=miny,
|
|
maxx=maxx,
|
|
maxy=maxy,
|
|
source_path=source_by_key.get(key),
|
|
sink_path=sink_by_key.get(key),
|
|
)
|
|
)
|
|
return tiles
|
|
|
|
|
|
def collect_fallback_tiles(
|
|
fallback_dir: Path,
|
|
fallback_pattern: str,
|
|
existing_keys: set[str],
|
|
source_dir: Path,
|
|
sink_dir: Path,
|
|
) -> list[TileRef]:
|
|
source_by_key = index_masks_by_tile(source_dir, ".png")
|
|
sink_by_key = index_masks_by_tile(sink_dir, ".png")
|
|
tiles: list[TileRef] = []
|
|
if not fallback_dir.exists():
|
|
return tiles
|
|
|
|
for p in sorted(fallback_dir.glob(fallback_pattern)):
|
|
key = tile_key_from_name(p.name)
|
|
if key is None or key in existing_keys:
|
|
continue
|
|
if p.name.endswith(".tmp.png"):
|
|
continue
|
|
|
|
wld = p.with_suffix(".wld")
|
|
if not wld.exists():
|
|
print(f"[mask_build_master] Skipping fallback {p.name}: missing worldfile {wld.name}")
|
|
continue
|
|
|
|
img = Image.open(p)
|
|
width, height = img.size
|
|
img.close()
|
|
|
|
a, d, b, e, c, f = read_worldfile(wld)
|
|
if abs(b) > 1e-9 or abs(d) > 1e-9:
|
|
raise SystemExit(f"[mask_build_master] Rotated worldfile is not supported: {wld}")
|
|
if a <= 0 or e >= 0:
|
|
raise SystemExit(f"[mask_build_master] Unexpected worldfile pixel size signs in {wld}")
|
|
|
|
minx, miny, maxx, maxy = infer_bounds(width, height, (a, d, b, e, c, f))
|
|
tiles.append(
|
|
TileRef(
|
|
key=key,
|
|
water_path=p,
|
|
water_stem=p.stem,
|
|
width=width,
|
|
height=height,
|
|
px=a,
|
|
py=e,
|
|
minx=minx,
|
|
miny=miny,
|
|
maxx=maxx,
|
|
maxy=maxy,
|
|
source_path=source_by_key.get(key),
|
|
sink_path=sink_by_key.get(key),
|
|
)
|
|
)
|
|
return tiles
|
|
|
|
|
|
def check_resolution_consistency(tiles: Iterable[TileRef]) -> tuple[float, float]:
|
|
pxs = [t.px for t in tiles]
|
|
pys = [t.py for t in tiles]
|
|
px = float(np.median(pxs))
|
|
py = float(np.median(pys))
|
|
for t in tiles:
|
|
if not math.isclose(t.px, px, rel_tol=0.0, abs_tol=1e-9):
|
|
raise SystemExit(f"[mask_build_master] Inconsistent px: {t.water_path} has {t.px}, expected {px}")
|
|
if not math.isclose(t.py, py, rel_tol=0.0, abs_tol=1e-9):
|
|
raise SystemExit(f"[mask_build_master] Inconsistent py: {t.water_path} has {t.py}, expected {py}")
|
|
return px, py
|
|
|
|
|
|
def merge_non_black(dst: np.ndarray, src: np.ndarray) -> None:
|
|
nz = np.any(src != 0, axis=2)
|
|
dst[nz] = src[nz]
|
|
|
|
|
|
def main() -> int:
|
|
args = parse_args()
|
|
|
|
water_dir = Path(args.water_dir)
|
|
fallback_water_dir = Path(args.fallback_water_dir)
|
|
source_dir = Path(args.source_dir)
|
|
sink_dir = Path(args.sink_dir)
|
|
out_dir = Path(args.out_dir)
|
|
|
|
if not water_dir.exists():
|
|
raise SystemExit(f"[mask_build_master] water-dir not found: {water_dir}")
|
|
|
|
primary_tiles = collect_tiles(water_dir, args.water_pattern, source_dir, sink_dir)
|
|
fallback_tiles = collect_fallback_tiles(
|
|
fallback_water_dir,
|
|
args.fallback_water_pattern,
|
|
{t.key for t in primary_tiles},
|
|
source_dir,
|
|
sink_dir,
|
|
)
|
|
tiles = primary_tiles + fallback_tiles
|
|
if not tiles:
|
|
raise SystemExit("[mask_build_master] No water tiles found.")
|
|
|
|
px, py = check_resolution_consistency(tiles)
|
|
minx = min(t.minx for t in tiles)
|
|
miny = min(t.miny for t in tiles)
|
|
maxx = max(t.maxx for t in tiles)
|
|
maxy = max(t.maxy for t in tiles)
|
|
|
|
master_w = int(round((maxx - minx) / px))
|
|
master_h = int(round((maxy - miny) / abs(py)))
|
|
if master_w <= 0 or master_h <= 0:
|
|
raise SystemExit("[mask_build_master] Invalid master size.")
|
|
|
|
water_master = np.zeros((master_h, master_w, 3), dtype=np.uint8)
|
|
source_master = np.zeros((master_h, master_w, 3), dtype=np.uint8)
|
|
sink_master = np.zeros((master_h, master_w, 3), dtype=np.uint8)
|
|
|
|
tile_rows = []
|
|
for t in sorted(tiles, key=lambda k: (k.key, k.minx, k.miny)):
|
|
xoff = int(round((t.minx - minx) / px))
|
|
yoff = int(round((maxy - t.maxy) / abs(py)))
|
|
|
|
if xoff < 0 or yoff < 0 or xoff + t.width > master_w or yoff + t.height > master_h:
|
|
raise SystemExit(f"[mask_build_master] Tile placement out of bounds for {t.water_path}")
|
|
|
|
w_arr = load_rgb(t.water_path, t.width, t.height)
|
|
merge_non_black(water_master[yoff : yoff + t.height, xoff : xoff + t.width], w_arr)
|
|
|
|
if t.source_path and t.source_path.exists():
|
|
s_arr = load_rgb(t.source_path, t.width, t.height)
|
|
merge_non_black(source_master[yoff : yoff + t.height, xoff : xoff + t.width], s_arr)
|
|
|
|
if t.sink_path and t.sink_path.exists():
|
|
k_arr = load_rgb(t.sink_path, t.width, t.height)
|
|
merge_non_black(sink_master[yoff : yoff + t.height, xoff : xoff + t.width], k_arr)
|
|
|
|
tile_rows.append(
|
|
{
|
|
"key": t.key,
|
|
"water_file": t.water_path.name,
|
|
"water_stem": t.water_stem,
|
|
"source_file": t.source_path.name if t.source_path else "",
|
|
"sink_file": t.sink_path.name if t.sink_path else "",
|
|
"xoff": xoff,
|
|
"yoff": yoff,
|
|
"width": t.width,
|
|
"height": t.height,
|
|
"minx": t.minx,
|
|
"miny": t.miny,
|
|
"maxx": t.maxx,
|
|
"maxy": t.maxy,
|
|
}
|
|
)
|
|
|
|
out_dir.mkdir(parents=True, exist_ok=True)
|
|
water_out = out_dir / "water_master.png"
|
|
source_out = out_dir / "source_master.png"
|
|
sink_out = out_dir / "sink_master.png"
|
|
Image.fromarray(water_master, mode="RGB").save(water_out)
|
|
Image.fromarray(source_master, mode="RGB").save(source_out)
|
|
Image.fromarray(sink_master, mode="RGB").save(sink_out)
|
|
|
|
if args.write_master_wld:
|
|
write_worldfile(water_out.with_suffix(".wld"), px, py, minx, maxy)
|
|
write_worldfile(source_out.with_suffix(".wld"), px, py, minx, maxy)
|
|
write_worldfile(sink_out.with_suffix(".wld"), px, py, minx, maxy)
|
|
|
|
meta = {
|
|
"schema_version": 1,
|
|
"master": {
|
|
"width": master_w,
|
|
"height": master_h,
|
|
"minx": minx,
|
|
"miny": miny,
|
|
"maxx": maxx,
|
|
"maxy": maxy,
|
|
"px": px,
|
|
"py": py,
|
|
"water_master": water_out.name,
|
|
"source_master": source_out.name,
|
|
"sink_master": sink_out.name,
|
|
},
|
|
"inputs": {
|
|
"water_dir": str(water_dir),
|
|
"fallback_water_dir": str(fallback_water_dir),
|
|
"source_dir": str(source_dir),
|
|
"sink_dir": str(sink_dir),
|
|
"water_pattern": args.water_pattern,
|
|
"fallback_water_pattern": args.fallback_water_pattern,
|
|
},
|
|
"tiles": tile_rows,
|
|
}
|
|
(out_dir / "master_meta.json").write_text(json.dumps(meta, indent=2), encoding="utf-8")
|
|
|
|
print(f"[mask_build_master] Wrote {water_out}")
|
|
print(f"[mask_build_master] Wrote {source_out}")
|
|
print(f"[mask_build_master] Wrote {sink_out}")
|
|
print(f"[mask_build_master] Wrote {out_dir / 'master_meta.json'}")
|
|
print(f"[mask_build_master] Master size: {master_w}x{master_h}")
|
|
print(f"[mask_build_master] Tiles merged: {len(tile_rows)}")
|
|
print(f"[mask_build_master] from primary: {len(primary_tiles)}")
|
|
print(f"[mask_build_master] from fallback: {len(fallback_tiles)}")
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|