Files
GeoData/scripts/mask_build_master.py

381 lines
13 KiB
Python

#!/usr/bin/env python3
"""Build global editable master masks from per-tile mask files.
This script keeps 1025-tile semantics intact for river erosion while making
manual source/sink editing easier in one global image.
"""
from __future__ import annotations
import argparse
import json
import math
import os
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Iterable, Optional
import numpy as np
from PIL import Image
@dataclass
class TileRef:
key: str
water_path: Path
water_stem: str
width: int
height: int
px: float
py: float
minx: float
miny: float
maxx: float
maxy: float
source_path: Optional[Path] = None
sink_path: Optional[Path] = None
_TILE_KEY_RE = re.compile(r"^(dgm\d+_\d+_\d+_\d+)")
def parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(description="Build master water/source/sink masks from tiled inputs.")
p.add_argument("--water-dir", default="raw/water_masks", help="Directory with tile water masks.")
p.add_argument(
"--fallback-water-dir",
default="work/river_masks",
help="Fallback water mask directory used for tiles missing in --water-dir.",
)
p.add_argument("--source-dir", default="raw/water_source_masks", help="Directory with tile source masks.")
p.add_argument("--sink-dir", default="raw/water_sink_masks", help="Directory with tile sink masks.")
p.add_argument("--out-dir", default="work/mask_master", help="Output directory for master masks.")
p.add_argument("--water-pattern", default="*_mask_viz.png", help="Glob for water mask files.")
p.add_argument("--fallback-water-pattern", default="*_mask.png", help="Glob for fallback water mask files.")
p.add_argument("--write-master-wld", action="store_true", help="Write worldfiles for master images.")
return p.parse_args()
def tile_key_from_name(name: str) -> Optional[str]:
stem = Path(name).stem
m = _TILE_KEY_RE.match(stem)
if not m:
return None
return m.group(1)
def read_worldfile(path: Path) -> tuple[float, float, float, float, float, float]:
vals = []
for line in path.read_text(encoding="utf-8").splitlines():
line = line.strip()
if line:
vals.append(float(line))
if len(vals) != 6:
raise ValueError(f"Expected 6 values in worldfile {path}, got {len(vals)}.")
return vals[0], vals[1], vals[2], vals[3], vals[4], vals[5]
def infer_bounds(width: int, height: int, wld: tuple[float, float, float, float, float, float]) -> tuple[float, float, float, float]:
a, d, b, e, c, f = wld
if abs(b) > 1e-9 or abs(d) > 1e-9:
raise ValueError("Rotated worldfiles are not supported.")
minx = c - (a / 2.0)
maxy = f - (e / 2.0)
maxx = minx + (a * width)
miny = maxy + (e * height)
return minx, miny, maxx, maxy
def load_rgb(path: Path, width: int, height: int) -> np.ndarray:
img = Image.open(path).convert("RGB")
arr = np.array(img, dtype=np.uint8)
if arr.shape[0] == height and arr.shape[1] == width:
return arr
resized = img.resize((width, height), resample=Image.Resampling.NEAREST)
return np.array(resized, dtype=np.uint8)
def write_worldfile(path: Path, px: float, py: float, minx: float, maxy: float) -> None:
c = minx + (px / 2.0)
f = maxy + (py / 2.0)
text = "\n".join(
[
f"{px:.12f}",
"0.0",
"0.0",
f"{py:.12f}",
f"{c:.12f}",
f"{f:.12f}",
]
)
path.write_text(text + "\n", encoding="utf-8")
def index_masks_by_tile(mask_dir: Path, suffix: str) -> Dict[str, Path]:
out: Dict[str, Path] = {}
if not mask_dir.exists():
return out
for p in sorted(mask_dir.glob(f"*{suffix}")):
key = tile_key_from_name(p.name)
if key is None:
continue
if key in out:
print(f"[mask_build_master] Warning: duplicate {suffix} for {key}, keeping {out[key].name}, ignoring {p.name}")
continue
out[key] = p
return out
def collect_tiles(water_dir: Path, water_pattern: str, source_dir: Path, sink_dir: Path) -> list[TileRef]:
source_by_key = index_masks_by_tile(source_dir, ".png")
sink_by_key = index_masks_by_tile(sink_dir, ".png")
tiles: list[TileRef] = []
for p in sorted(water_dir.glob(water_pattern)):
if p.name.endswith(".tmp.png"):
continue
key = tile_key_from_name(p.name)
if key is None:
continue
wld = p.with_suffix(".wld")
if not wld.exists():
print(f"[mask_build_master] Skipping {p.name}: missing worldfile {wld.name}")
continue
img = Image.open(p)
width, height = img.size
img.close()
a, d, b, e, c, f = read_worldfile(wld)
if abs(b) > 1e-9 or abs(d) > 1e-9:
raise SystemExit(f"[mask_build_master] Rotated worldfile is not supported: {wld}")
if a <= 0 or e >= 0:
raise SystemExit(f"[mask_build_master] Unexpected worldfile pixel size signs in {wld}")
minx, miny, maxx, maxy = infer_bounds(width, height, (a, d, b, e, c, f))
tiles.append(
TileRef(
key=key,
water_path=p,
water_stem=p.stem,
width=width,
height=height,
px=a,
py=e,
minx=minx,
miny=miny,
maxx=maxx,
maxy=maxy,
source_path=source_by_key.get(key),
sink_path=sink_by_key.get(key),
)
)
return tiles
def collect_fallback_tiles(
fallback_dir: Path,
fallback_pattern: str,
existing_keys: set[str],
source_dir: Path,
sink_dir: Path,
) -> list[TileRef]:
source_by_key = index_masks_by_tile(source_dir, ".png")
sink_by_key = index_masks_by_tile(sink_dir, ".png")
tiles: list[TileRef] = []
if not fallback_dir.exists():
return tiles
for p in sorted(fallback_dir.glob(fallback_pattern)):
key = tile_key_from_name(p.name)
if key is None or key in existing_keys:
continue
if p.name.endswith(".tmp.png"):
continue
wld = p.with_suffix(".wld")
if not wld.exists():
print(f"[mask_build_master] Skipping fallback {p.name}: missing worldfile {wld.name}")
continue
img = Image.open(p)
width, height = img.size
img.close()
a, d, b, e, c, f = read_worldfile(wld)
if abs(b) > 1e-9 or abs(d) > 1e-9:
raise SystemExit(f"[mask_build_master] Rotated worldfile is not supported: {wld}")
if a <= 0 or e >= 0:
raise SystemExit(f"[mask_build_master] Unexpected worldfile pixel size signs in {wld}")
minx, miny, maxx, maxy = infer_bounds(width, height, (a, d, b, e, c, f))
tiles.append(
TileRef(
key=key,
water_path=p,
water_stem=p.stem,
width=width,
height=height,
px=a,
py=e,
minx=minx,
miny=miny,
maxx=maxx,
maxy=maxy,
source_path=source_by_key.get(key),
sink_path=sink_by_key.get(key),
)
)
return tiles
def check_resolution_consistency(tiles: Iterable[TileRef]) -> tuple[float, float]:
pxs = [t.px for t in tiles]
pys = [t.py for t in tiles]
px = float(np.median(pxs))
py = float(np.median(pys))
for t in tiles:
if not math.isclose(t.px, px, rel_tol=0.0, abs_tol=1e-9):
raise SystemExit(f"[mask_build_master] Inconsistent px: {t.water_path} has {t.px}, expected {px}")
if not math.isclose(t.py, py, rel_tol=0.0, abs_tol=1e-9):
raise SystemExit(f"[mask_build_master] Inconsistent py: {t.water_path} has {t.py}, expected {py}")
return px, py
def merge_non_black(dst: np.ndarray, src: np.ndarray) -> None:
nz = np.any(src != 0, axis=2)
dst[nz] = src[nz]
def main() -> int:
args = parse_args()
water_dir = Path(args.water_dir)
fallback_water_dir = Path(args.fallback_water_dir)
source_dir = Path(args.source_dir)
sink_dir = Path(args.sink_dir)
out_dir = Path(args.out_dir)
if not water_dir.exists():
raise SystemExit(f"[mask_build_master] water-dir not found: {water_dir}")
primary_tiles = collect_tiles(water_dir, args.water_pattern, source_dir, sink_dir)
fallback_tiles = collect_fallback_tiles(
fallback_water_dir,
args.fallback_water_pattern,
{t.key for t in primary_tiles},
source_dir,
sink_dir,
)
tiles = primary_tiles + fallback_tiles
if not tiles:
raise SystemExit("[mask_build_master] No water tiles found.")
px, py = check_resolution_consistency(tiles)
minx = min(t.minx for t in tiles)
miny = min(t.miny for t in tiles)
maxx = max(t.maxx for t in tiles)
maxy = max(t.maxy for t in tiles)
master_w = int(round((maxx - minx) / px))
master_h = int(round((maxy - miny) / abs(py)))
if master_w <= 0 or master_h <= 0:
raise SystemExit("[mask_build_master] Invalid master size.")
water_master = np.zeros((master_h, master_w, 3), dtype=np.uint8)
source_master = np.zeros((master_h, master_w, 3), dtype=np.uint8)
sink_master = np.zeros((master_h, master_w, 3), dtype=np.uint8)
tile_rows = []
for t in sorted(tiles, key=lambda k: (k.key, k.minx, k.miny)):
xoff = int(round((t.minx - minx) / px))
yoff = int(round((maxy - t.maxy) / abs(py)))
if xoff < 0 or yoff < 0 or xoff + t.width > master_w or yoff + t.height > master_h:
raise SystemExit(f"[mask_build_master] Tile placement out of bounds for {t.water_path}")
w_arr = load_rgb(t.water_path, t.width, t.height)
merge_non_black(water_master[yoff : yoff + t.height, xoff : xoff + t.width], w_arr)
if t.source_path and t.source_path.exists():
s_arr = load_rgb(t.source_path, t.width, t.height)
merge_non_black(source_master[yoff : yoff + t.height, xoff : xoff + t.width], s_arr)
if t.sink_path and t.sink_path.exists():
k_arr = load_rgb(t.sink_path, t.width, t.height)
merge_non_black(sink_master[yoff : yoff + t.height, xoff : xoff + t.width], k_arr)
tile_rows.append(
{
"key": t.key,
"water_file": t.water_path.name,
"water_stem": t.water_stem,
"source_file": t.source_path.name if t.source_path else "",
"sink_file": t.sink_path.name if t.sink_path else "",
"xoff": xoff,
"yoff": yoff,
"width": t.width,
"height": t.height,
"minx": t.minx,
"miny": t.miny,
"maxx": t.maxx,
"maxy": t.maxy,
}
)
out_dir.mkdir(parents=True, exist_ok=True)
water_out = out_dir / "water_master.png"
source_out = out_dir / "source_master.png"
sink_out = out_dir / "sink_master.png"
Image.fromarray(water_master, mode="RGB").save(water_out)
Image.fromarray(source_master, mode="RGB").save(source_out)
Image.fromarray(sink_master, mode="RGB").save(sink_out)
if args.write_master_wld:
write_worldfile(water_out.with_suffix(".wld"), px, py, minx, maxy)
write_worldfile(source_out.with_suffix(".wld"), px, py, minx, maxy)
write_worldfile(sink_out.with_suffix(".wld"), px, py, minx, maxy)
meta = {
"schema_version": 1,
"master": {
"width": master_w,
"height": master_h,
"minx": minx,
"miny": miny,
"maxx": maxx,
"maxy": maxy,
"px": px,
"py": py,
"water_master": water_out.name,
"source_master": source_out.name,
"sink_master": sink_out.name,
},
"inputs": {
"water_dir": str(water_dir),
"fallback_water_dir": str(fallback_water_dir),
"source_dir": str(source_dir),
"sink_dir": str(sink_dir),
"water_pattern": args.water_pattern,
"fallback_water_pattern": args.fallback_water_pattern,
},
"tiles": tile_rows,
}
(out_dir / "master_meta.json").write_text(json.dumps(meta, indent=2), encoding="utf-8")
print(f"[mask_build_master] Wrote {water_out}")
print(f"[mask_build_master] Wrote {source_out}")
print(f"[mask_build_master] Wrote {sink_out}")
print(f"[mask_build_master] Wrote {out_dir / 'master_meta.json'}")
print(f"[mask_build_master] Master size: {master_w}x{master_h}")
print(f"[mask_build_master] Tiles merged: {len(tile_rows)}")
print(f"[mask_build_master] from primary: {len(primary_tiles)}")
print(f"[mask_build_master] from fallback: {len(fallback_tiles)}")
return 0
if __name__ == "__main__":
raise SystemExit(main())