Add SWE boundary mask pipeline and mask tooling

This commit is contained in:
2026-02-08 03:18:16 +01:00
parent 65e5232a85
commit c63f21cf81
11 changed files with 1583 additions and 4 deletions

73
scripts/fix_masks.py Normal file
View File

@@ -0,0 +1,73 @@
import os
import shutil
from osgeo import gdal
def fix_manual_masks(raw_dir, work_dir):
    """Snap manually painted mask PNGs onto the pixel grid of their work masks.

    For every ``*.png`` in *raw_dir* the matching ``<tile>_1_rp_mask.png`` in
    *work_dir* provides the authoritative geotransform/projection. The manual
    mask is georeferenced to that extent, warped (nearest neighbour) to the
    work mask's exact size, written back in place, and the work mask's
    worldfile is copied alongside it.

    Parameters:
        raw_dir: Directory with manually edited mask PNGs (``*_viz.png`` names).
        work_dir: Directory with reference ``*_1_rp_mask.png`` work masks.
    """
    manual_files = [f for f in os.listdir(raw_dir) if f.endswith(".png")]
    for manual_file in manual_files:
        manual_path = os.path.join(raw_dir, manual_file)
        # Tile key is the first four underscore-separated parts, e.g. dgm1_32_280_5650.
        parts = manual_file.replace("_viz.png", "").split("_")
        if len(parts) < 4:
            # Previously this was skipped silently; report it so typos are noticed.
            print(f"Skipping {manual_file}: cannot derive tile prefix from name.")
            continue
        tile_prefix = f"{parts[0]}_{parts[1]}_{parts[2]}_{parts[3]}"
        work_file = f"{tile_prefix}_1_rp_mask.png"
        work_path = os.path.join(work_dir, work_file)
        if not os.path.exists(work_path):
            print(f"Skipping {manual_file}: Work file not found.")
            continue
        print(f"Fixing {manual_file} using {work_file}...")
        ds_work = gdal.Open(work_path)
        gt_work = ds_work.GetGeoTransform()
        proj_work = ds_work.GetProjection()
        target_w = ds_work.RasterXSize
        target_h = ds_work.RasterYSize
        # Bounds of the work tile from its geotransform (north-up assumed).
        minx = gt_work[0]
        maxy = gt_work[3]
        maxx = minx + (gt_work[1] * target_w)
        miny = maxy + (gt_work[5] * target_h)
        # Open Source as ReadOnly, then CreateCopy to MEM to edit
        ds_raw = gdal.Open(manual_path)
        mem_driver = gdal.GetDriverByName("MEM")
        ds_manual = mem_driver.CreateCopy("", ds_raw)
        ds_raw = None  # Close file
        src_w = ds_manual.RasterXSize
        src_h = ds_manual.RasterYSize
        # Stretch the manual image over the work tile's full extent.
        src_res_x = (maxx - minx) / src_w
        src_res_y = (miny - maxy) / src_h  # negative for north-up rasters
        src_gt = (minx, src_res_x, 0, maxy, 0, src_res_y)
        ds_manual.SetGeoTransform(src_gt)
        ds_manual.SetProjection(proj_work)
        # The PNG driver is CreateCopy-only, so gdal.Warp cannot write PNG
        # directly (format="PNG" fails); warp into an in-memory dataset first
        # and CreateCopy that to disk.
        ds_warped = gdal.Warp(
            "",
            ds_manual,
            options=gdal.WarpOptions(
                format="MEM",
                width=target_w,
                height=target_h,
                outputBounds=(minx, miny, maxx, maxy),
                resampleAlg=gdal.GRA_NearestNeighbour,
            ),
        )
        temp_path = manual_path + ".tmp.png"
        ds_fixed = gdal.GetDriverByName("PNG").CreateCopy(temp_path, ds_warped)
        ds_fixed = None  # Flush to disk.
        ds_warped = None
        ds_manual = None
        ds_work = None
        shutil.move(temp_path, manual_path)
        # PNG CreateCopy may leave a sidecar .aux.xml for the temp name; drop it.
        aux_path = temp_path + ".aux.xml"
        if os.path.exists(aux_path):
            os.remove(aux_path)
        # Reuse the work tile's worldfile so GIS tools see identical placement.
        work_wld = work_path.replace(".png", ".wld")
        if os.path.exists(work_wld):
            shutil.copy(work_wld, manual_path.replace(".png", ".wld"))
        print(" -> Fixed.")
if __name__ == "__main__":
    # Default directories match the repository layout used by the mask pipeline.
    fix_manual_masks("raw/water_masks", "work/river_masks")

View File

@@ -0,0 +1,380 @@
#!/usr/bin/env python3
"""Build global editable master masks from per-tile mask files.
This script keeps 1025-tile semantics intact for river erosion while making
manual source/sink editing easier in one global image.
"""
from __future__ import annotations
import argparse
import json
import math
import os
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Iterable, Optional
import numpy as np
from PIL import Image
@dataclass
class TileRef:
    """One water-mask tile plus its georeferencing and optional source/sink masks.

    Coordinates are in the worldfile's CRS; ``px`` is positive and ``py``
    negative (north-up rasters, as enforced by the collectors).
    """

    key: str            # Canonical tile key, e.g. "dgm1_32_280_5650".
    water_path: Path    # Water mask PNG for this tile.
    water_stem: str     # File stem of the water mask (used to derive output names).
    width: int          # Raster width in pixels.
    height: int         # Raster height in pixels.
    px: float           # Pixel size in x (positive).
    py: float           # Pixel size in y (negative for north-up).
    minx: float         # West edge in world coordinates.
    miny: float         # South edge.
    maxx: float         # East edge.
    maxy: float         # North edge.
    source_path: Optional[Path] = None  # Matching source-ID mask, if any.
    sink_path: Optional[Path] = None    # Matching sink-ID mask, if any.
# Tile keys look like "dgm1_32_280_5650": a dgm tag plus three numeric parts.
_TILE_KEY_RE = re.compile(r"^(dgm\d+_\d+_\d+_\d+)")
def parse_args() -> argparse.Namespace:
    """Parse command-line options for the master-mask build step."""
    parser = argparse.ArgumentParser(
        description="Build master water/source/sink masks from tiled inputs."
    )
    parser.add_argument("--water-dir", default="raw/water_masks",
                        help="Directory with tile water masks.")
    parser.add_argument("--fallback-water-dir", default="work/river_masks",
                        help="Fallback water mask directory used for tiles missing in --water-dir.")
    parser.add_argument("--source-dir", default="raw/water_source_masks",
                        help="Directory with tile source masks.")
    parser.add_argument("--sink-dir", default="raw/water_sink_masks",
                        help="Directory with tile sink masks.")
    parser.add_argument("--out-dir", default="work/mask_master",
                        help="Output directory for master masks.")
    parser.add_argument("--water-pattern", default="*_mask_viz.png",
                        help="Glob for water mask files.")
    parser.add_argument("--fallback-water-pattern", default="*_mask.png",
                        help="Glob for fallback water mask files.")
    parser.add_argument("--write-master-wld", action="store_true",
                        help="Write worldfiles for master images.")
    return parser.parse_args()
def tile_key_from_name(name: str) -> Optional[str]:
    """Extract the canonical tile key from a file name, or None when absent."""
    match = _TILE_KEY_RE.match(Path(name).stem)
    return match.group(1) if match else None
def read_worldfile(path: Path) -> tuple[float, float, float, float, float, float]:
    """Read a 6-value ESRI worldfile (A, D, B, E, C, F line order) as floats.

    Blank lines are ignored; raises ValueError when the file does not contain
    exactly six numeric values.
    """
    values = [
        float(stripped)
        for raw in path.read_text(encoding="utf-8").splitlines()
        if (stripped := raw.strip())
    ]
    if len(values) != 6:
        raise ValueError(f"Expected 6 values in worldfile {path}, got {len(values)}.")
    return values[0], values[1], values[2], values[3], values[4], values[5]
def infer_bounds(width: int, height: int, wld: tuple[float, float, float, float, float, float]) -> tuple[float, float, float, float]:
    """Convert worldfile parameters (pixel-center anchored) into edge bounds.

    Returns (minx, miny, maxx, maxy); raises ValueError for rotated rasters.
    """
    px_x, rot_y, rot_x, px_y, center_x, center_y = wld
    if abs(rot_x) > 1e-9 or abs(rot_y) > 1e-9:
        raise ValueError("Rotated worldfiles are not supported.")
    # Worldfiles anchor at the center of the upper-left pixel; shift half a pixel.
    minx = center_x - px_x / 2.0
    maxy = center_y - px_y / 2.0
    return minx, maxy + px_y * height, minx + px_x * width, maxy
def load_rgb(path: Path, width: int, height: int) -> np.ndarray:
    """Load *path* as an RGB uint8 array, nearest-resampled to width x height."""
    img = Image.open(path).convert("RGB")
    if img.size != (width, height):
        img = img.resize((width, height), resample=Image.Resampling.NEAREST)
    return np.array(img, dtype=np.uint8)
def write_worldfile(path: Path, px: float, py: float, minx: float, maxy: float) -> None:
    """Write a north-up worldfile anchored at the upper-left pixel center."""
    center_x = minx + px / 2.0
    center_y = maxy + py / 2.0  # py is negative, so this steps half a pixel down
    lines = [
        f"{px:.12f}",
        "0.0",
        "0.0",
        f"{py:.12f}",
        f"{center_x:.12f}",
        f"{center_y:.12f}",
    ]
    path.write_text("\n".join(lines) + "\n", encoding="utf-8")
def index_masks_by_tile(mask_dir: Path, suffix: str) -> Dict[str, Path]:
    """Map tile key -> mask path for files matching *suffix* in *mask_dir*.

    The first match in sorted order wins; later duplicates are reported and
    skipped. A missing directory yields an empty mapping.
    """
    index: Dict[str, Path] = {}
    if not mask_dir.exists():
        return index
    for candidate in sorted(mask_dir.glob(f"*{suffix}")):
        key = tile_key_from_name(candidate.name)
        if key is None:
            continue
        if key in index:
            print(
                f"[mask_build_master] Warning: duplicate {suffix} for {key}, "
                f"keeping {index[key].name}, ignoring {candidate.name}"
            )
            continue
        index[key] = candidate
    return index
def collect_tiles(water_dir: Path, water_pattern: str, source_dir: Path, sink_dir: Path) -> list[TileRef]:
    """Collect georeferenced TileRefs for every water mask in *water_dir*.

    A tile is included only when its worldfile exists; rotated worldfiles or
    unexpected pixel-size signs abort the run via SystemExit. Source/sink
    masks are attached by tile key when present.
    """
    source_by_key = index_masks_by_tile(source_dir, ".png")
    sink_by_key = index_masks_by_tile(sink_dir, ".png")
    tiles: list[TileRef] = []
    for p in sorted(water_dir.glob(water_pattern)):
        # Temp files left behind by fix_masks.py must never be merged.
        if p.name.endswith(".tmp.png"):
            continue
        key = tile_key_from_name(p.name)
        if key is None:
            continue
        wld = p.with_suffix(".wld")
        if not wld.exists():
            print(f"[mask_build_master] Skipping {p.name}: missing worldfile {wld.name}")
            continue
        # Only the image size is needed here; close immediately to free the handle.
        img = Image.open(p)
        width, height = img.size
        img.close()
        a, d, b, e, c, f = read_worldfile(wld)
        if abs(b) > 1e-9 or abs(d) > 1e-9:
            raise SystemExit(f"[mask_build_master] Rotated worldfile is not supported: {wld}")
        if a <= 0 or e >= 0:
            # Enforce positive x pixel size and negative y (north-up rasters).
            raise SystemExit(f"[mask_build_master] Unexpected worldfile pixel size signs in {wld}")
        minx, miny, maxx, maxy = infer_bounds(width, height, (a, d, b, e, c, f))
        tiles.append(
            TileRef(
                key=key,
                water_path=p,
                water_stem=p.stem,
                width=width,
                height=height,
                px=a,
                py=e,
                minx=minx,
                miny=miny,
                maxx=maxx,
                maxy=maxy,
                source_path=source_by_key.get(key),
                sink_path=sink_by_key.get(key),
            )
        )
    return tiles
def collect_fallback_tiles(
    fallback_dir: Path,
    fallback_pattern: str,
    existing_keys: set[str],
    source_dir: Path,
    sink_dir: Path,
) -> list[TileRef]:
    """Collect TileRefs from the fallback water dir for keys not yet covered.

    Applies the same validation as collect_tiles; tiles whose key already
    appears in *existing_keys* (i.e. present in the primary dir) are skipped.
    """
    source_by_key = index_masks_by_tile(source_dir, ".png")
    sink_by_key = index_masks_by_tile(sink_dir, ".png")
    tiles: list[TileRef] = []
    if not fallback_dir.exists():
        return tiles
    for p in sorted(fallback_dir.glob(fallback_pattern)):
        key = tile_key_from_name(p.name)
        if key is None or key in existing_keys:
            continue
        # Temp files left behind by fix_masks.py must never be merged.
        if p.name.endswith(".tmp.png"):
            continue
        wld = p.with_suffix(".wld")
        if not wld.exists():
            print(f"[mask_build_master] Skipping fallback {p.name}: missing worldfile {wld.name}")
            continue
        # Only the image size is needed; close the handle right away.
        img = Image.open(p)
        width, height = img.size
        img.close()
        a, d, b, e, c, f = read_worldfile(wld)
        if abs(b) > 1e-9 or abs(d) > 1e-9:
            raise SystemExit(f"[mask_build_master] Rotated worldfile is not supported: {wld}")
        if a <= 0 or e >= 0:
            # Enforce positive x pixel size and negative y (north-up rasters).
            raise SystemExit(f"[mask_build_master] Unexpected worldfile pixel size signs in {wld}")
        minx, miny, maxx, maxy = infer_bounds(width, height, (a, d, b, e, c, f))
        tiles.append(
            TileRef(
                key=key,
                water_path=p,
                water_stem=p.stem,
                width=width,
                height=height,
                px=a,
                py=e,
                minx=minx,
                miny=miny,
                maxx=maxx,
                maxy=maxy,
                source_path=source_by_key.get(key),
                sink_path=sink_by_key.get(key),
            )
        )
    return tiles
def check_resolution_consistency(tiles: Iterable[TileRef]) -> tuple[float, float]:
    """Verify all tiles share one pixel size and return it as (px, py).

    The median is the reference value so a single outlier is reported rather
    than skewing the target. Raises SystemExit on any mismatch.
    """
    # Materialize first: the parameter is typed Iterable, and iterating it
    # three times (as the original did) silently produces empty passes and a
    # nan median when a generator is passed.
    tile_list = list(tiles)
    px = float(np.median([t.px for t in tile_list]))
    py = float(np.median([t.py for t in tile_list]))
    for t in tile_list:
        if not math.isclose(t.px, px, rel_tol=0.0, abs_tol=1e-9):
            raise SystemExit(f"[mask_build_master] Inconsistent px: {t.water_path} has {t.px}, expected {px}")
        if not math.isclose(t.py, py, rel_tol=0.0, abs_tol=1e-9):
            raise SystemExit(f"[mask_build_master] Inconsistent py: {t.water_path} has {t.py}, expected {py}")
    return px, py
def merge_non_black(dst: np.ndarray, src: np.ndarray) -> None:
    """Copy every pixel of *src* that is not pure black (0,0,0) into *dst*, in place."""
    mask = (src != 0).any(axis=2)
    dst[mask] = src[mask]
def main() -> int:
    """Merge all tile masks into master water/source/sink images plus metadata.

    Returns 0 on success; raises SystemExit on unusable inputs.
    """
    args = parse_args()
    water_dir = Path(args.water_dir)
    fallback_water_dir = Path(args.fallback_water_dir)
    source_dir = Path(args.source_dir)
    sink_dir = Path(args.sink_dir)
    out_dir = Path(args.out_dir)
    if not water_dir.exists():
        raise SystemExit(f"[mask_build_master] water-dir not found: {water_dir}")
    primary_tiles = collect_tiles(water_dir, args.water_pattern, source_dir, sink_dir)
    # Fallback tiles fill gaps only; primary keys always win.
    fallback_tiles = collect_fallback_tiles(
        fallback_water_dir,
        args.fallback_water_pattern,
        {t.key for t in primary_tiles},
        source_dir,
        sink_dir,
    )
    tiles = primary_tiles + fallback_tiles
    if not tiles:
        raise SystemExit("[mask_build_master] No water tiles found.")
    px, py = check_resolution_consistency(tiles)
    # Union of all tile extents defines the master raster bounds.
    minx = min(t.minx for t in tiles)
    miny = min(t.miny for t in tiles)
    maxx = max(t.maxx for t in tiles)
    maxy = max(t.maxy for t in tiles)
    master_w = int(round((maxx - minx) / px))
    master_h = int(round((maxy - miny) / abs(py)))
    if master_w <= 0 or master_h <= 0:
        raise SystemExit("[mask_build_master] Invalid master size.")
    water_master = np.zeros((master_h, master_w, 3), dtype=np.uint8)
    source_master = np.zeros((master_h, master_w, 3), dtype=np.uint8)
    sink_master = np.zeros((master_h, master_w, 3), dtype=np.uint8)
    tile_rows = []
    for t in sorted(tiles, key=lambda k: (k.key, k.minx, k.miny)):
        # Pixel offset of this tile inside the master (y measured from the top edge).
        xoff = int(round((t.minx - minx) / px))
        yoff = int(round((maxy - t.maxy) / abs(py)))
        if xoff < 0 or yoff < 0 or xoff + t.width > master_w or yoff + t.height > master_h:
            raise SystemExit(f"[mask_build_master] Tile placement out of bounds for {t.water_path}")
        w_arr = load_rgb(t.water_path, t.width, t.height)
        # Only non-black pixels overwrite, so overlapping tiles merge additively.
        merge_non_black(water_master[yoff : yoff + t.height, xoff : xoff + t.width], w_arr)
        if t.source_path and t.source_path.exists():
            s_arr = load_rgb(t.source_path, t.width, t.height)
            merge_non_black(source_master[yoff : yoff + t.height, xoff : xoff + t.width], s_arr)
        if t.sink_path and t.sink_path.exists():
            k_arr = load_rgb(t.sink_path, t.width, t.height)
            merge_non_black(sink_master[yoff : yoff + t.height, xoff : xoff + t.width], k_arr)
        # Record placement so mask_split_master can cut tiles back out exactly.
        tile_rows.append(
            {
                "key": t.key,
                "water_file": t.water_path.name,
                "water_stem": t.water_stem,
                "source_file": t.source_path.name if t.source_path else "",
                "sink_file": t.sink_path.name if t.sink_path else "",
                "xoff": xoff,
                "yoff": yoff,
                "width": t.width,
                "height": t.height,
                "minx": t.minx,
                "miny": t.miny,
                "maxx": t.maxx,
                "maxy": t.maxy,
            }
        )
    out_dir.mkdir(parents=True, exist_ok=True)
    water_out = out_dir / "water_master.png"
    source_out = out_dir / "source_master.png"
    sink_out = out_dir / "sink_master.png"
    Image.fromarray(water_master, mode="RGB").save(water_out)
    Image.fromarray(source_master, mode="RGB").save(source_out)
    Image.fromarray(sink_master, mode="RGB").save(sink_out)
    if args.write_master_wld:
        write_worldfile(water_out.with_suffix(".wld"), px, py, minx, maxy)
        write_worldfile(source_out.with_suffix(".wld"), px, py, minx, maxy)
        write_worldfile(sink_out.with_suffix(".wld"), px, py, minx, maxy)
    meta = {
        "schema_version": 1,
        "master": {
            "width": master_w,
            "height": master_h,
            "minx": minx,
            "miny": miny,
            "maxx": maxx,
            "maxy": maxy,
            "px": px,
            "py": py,
            "water_master": water_out.name,
            "source_master": source_out.name,
            "sink_master": sink_out.name,
        },
        "inputs": {
            "water_dir": str(water_dir),
            "fallback_water_dir": str(fallback_water_dir),
            "source_dir": str(source_dir),
            "sink_dir": str(sink_dir),
            "water_pattern": args.water_pattern,
            "fallback_water_pattern": args.fallback_water_pattern,
        },
        "tiles": tile_rows,
    }
    (out_dir / "master_meta.json").write_text(json.dumps(meta, indent=2), encoding="utf-8")
    print(f"[mask_build_master] Wrote {water_out}")
    print(f"[mask_build_master] Wrote {source_out}")
    print(f"[mask_build_master] Wrote {sink_out}")
    print(f"[mask_build_master] Wrote {out_dir / 'master_meta.json'}")
    print(f"[mask_build_master] Master size: {master_w}x{master_h}")
    print(f"[mask_build_master] Tiles merged: {len(tile_rows)}")
    print(f"[mask_build_master] from primary: {len(primary_tiles)}")
    print(f"[mask_build_master] from fallback: {len(fallback_tiles)}")
    return 0
if __name__ == "__main__":
    # Propagate main()'s exit code to the shell.
    raise SystemExit(main())

View File

@@ -0,0 +1,247 @@
#!/usr/bin/env python3
"""Split global master masks back into tile masks.
Optional filtering modes:
- keep only source/sink tiles that contain non-black pixels
- keep only water tiles that differ from a reference river mask directory
"""
from __future__ import annotations
import argparse
import json
import re
from pathlib import Path
from typing import Dict
import numpy as np
from PIL import Image
# Tile keys look like "dgm1_32_280_5650": a dgm tag plus three numeric parts.
_TILE_KEY_RE = re.compile(r"^(dgm\d+_\d+_\d+_\d+)")
# Master masks can exceed PIL's safety threshold; these files are trusted local data.
Image.MAX_IMAGE_PIXELS = None
def parse_args() -> argparse.Namespace:
    """Parse command-line options for splitting master masks back into tiles."""
    parser = argparse.ArgumentParser(
        description="Split master masks into per-tile water/source/sink masks."
    )
    parser.add_argument("--master-dir", default="work/mask_master",
                        help="Directory containing master_meta.json and master images.")
    parser.add_argument("--out-water-dir", default="raw/water_masks",
                        help="Output directory for water tile masks.")
    parser.add_argument("--out-source-dir", default="raw/water_source_masks",
                        help="Output directory for source tile masks.")
    parser.add_argument("--out-sink-dir", default="raw/water_sink_masks",
                        help="Output directory for sink tile masks.")
    parser.add_argument("--ref-water-dir", default="work/river_masks",
                        help="Reference water mask dir used to keep only changed water tiles.")
    parser.add_argument("--keep-informative-only", action="store_true",
                        help="Write only informative masks (non-black source/sink; water changed vs ref water dir).")
    parser.add_argument("--prune-existing", action="store_true",
                        help="Remove existing PNG/WLD outputs in out dirs before writing.")
    parser.add_argument("--write-wld", action="store_true",
                        help="Write world files for output masks.")
    return parser.parse_args()
def write_worldfile(path: Path, px: float, py: float, minx: float, maxy: float) -> None:
    """Write a north-up worldfile anchored at the upper-left pixel center."""
    center_x = minx + px / 2.0
    center_y = maxy + py / 2.0  # py is negative, so this steps half a pixel down
    lines = [
        f"{px:.12f}",
        "0.0",
        "0.0",
        f"{py:.12f}",
        f"{center_x:.12f}",
        f"{center_y:.12f}",
    ]
    path.write_text("\n".join(lines) + "\n", encoding="utf-8")
def ensure_dirs(*dirs: Path) -> None:
    """Create every given directory (including parents) if missing."""
    for directory in dirs:
        directory.mkdir(parents=True, exist_ok=True)
def tile_key_from_name(name: str) -> str:
    """Return the canonical tile key embedded in *name*, or "" when absent."""
    match = _TILE_KEY_RE.match(Path(name).stem)
    if match is None:
        return ""
    return match.group(1)
def remove_existing_outputs(out_dir: Path) -> None:
    """Delete all PNG and WLD files in *out_dir* (no-op when it does not exist)."""
    if not out_dir.exists():
        return
    for pattern in ("*.png", "*.wld"):
        for stale in out_dir.glob(pattern):
            stale.unlink(missing_ok=True)
def has_non_black(arr: np.ndarray) -> bool:
    """True when any channel of any pixel is non-zero."""
    return bool(arr.any())
def index_reference_masks(ref_dir: Path) -> Dict[str, Path]:
    """Map tile key -> reference mask path; first match per key wins."""
    index: Dict[str, Path] = {}
    if not ref_dir.exists():
        return index
    for candidate in sorted(ref_dir.glob("*.png")):
        key = tile_key_from_name(candidate.name)
        if key and key not in index:
            index[key] = candidate
    return index
def resize_nearest(arr: np.ndarray, width: int, height: int) -> np.ndarray:
    """Return *arr* resized to (height, width, 3) via nearest-neighbour sampling.

    The input array is returned unchanged when it already has the target size.
    """
    if arr.shape[:2] == (height, width):
        return arr
    resized = Image.fromarray(arr, mode="RGB").resize(
        (width, height), resample=Image.Resampling.NEAREST
    )
    return np.array(resized, dtype=np.uint8)
def water_differs_from_reference(water_arr: np.ndarray, key: str, ref_index: Dict[str, Path]) -> bool:
    """True when the tile's water mask deviates from its reference mask.

    Tiles without a reference count as changed so manual work is never lost.
    """
    ref_path = ref_index.get(key)
    if ref_path is None:
        # No baseline tile: treat as informative so we do not lose manual work.
        return True
    reference = np.array(Image.open(ref_path).convert("RGB"), dtype=np.uint8)
    reference = resize_nearest(reference, water_arr.shape[1], water_arr.shape[0])
    return not np.array_equal(water_arr, reference)
def source_name_for_tile(tile: Dict[str, object]) -> str:
    """Pick the output file name for a tile's source mask.

    Prefers the recorded source_file, then derives one from the water stem,
    and finally falls back to the tile key.
    """
    explicit = str(tile.get("source_file") or "").strip()
    if explicit:
        return explicit
    stem = str(tile.get("water_stem") or "").strip()
    base = stem if stem else str(tile["key"])
    return f"{base}_source_mask.png"
def sink_name_for_tile(tile: Dict[str, object]) -> str:
    """Pick the output file name for a tile's sink mask.

    Prefers the recorded sink_file, then derives one from the water stem,
    and finally falls back to the tile key.
    """
    explicit = str(tile.get("sink_file") or "").strip()
    if explicit:
        return explicit
    stem = str(tile.get("water_stem") or "").strip()
    base = stem if stem else str(tile["key"])
    return f"{base}_sink_mask.png"
def crop_tile(master: np.ndarray, xoff: int, yoff: int, width: int, height: int) -> np.ndarray:
    """Return an independent copy of the (height, width) window at (yoff, xoff)."""
    window = master[yoff : yoff + height, xoff : xoff + width]
    return window.copy()
def main() -> int:
    """Cut the master water/source/sink images back into per-tile masks.

    Tile placement comes from master_meta.json written by the build step; with
    --keep-informative-only, uninformative tiles are counted and skipped.
    """
    args = parse_args()
    master_dir = Path(args.master_dir)
    meta_path = master_dir / "master_meta.json"
    if not meta_path.exists():
        raise SystemExit(f"[mask_split_master] Missing meta: {meta_path}")
    meta = json.loads(meta_path.read_text(encoding="utf-8"))
    m = meta["master"]
    tiles = meta["tiles"]
    px = float(m["px"])
    py = float(m["py"])
    water_master = np.array(Image.open(master_dir / m["water_master"]).convert("RGB"), dtype=np.uint8)
    source_master = np.array(Image.open(master_dir / m["source_master"]).convert("RGB"), dtype=np.uint8)
    sink_master = np.array(Image.open(master_dir / m["sink_master"]).convert("RGB"), dtype=np.uint8)
    ensure_dirs(Path(args.out_water_dir), Path(args.out_source_dir), Path(args.out_sink_dir))
    out_water = Path(args.out_water_dir)
    out_source = Path(args.out_source_dir)
    out_sink = Path(args.out_sink_dir)
    if args.prune_existing:
        remove_existing_outputs(out_water)
        remove_existing_outputs(out_source)
        remove_existing_outputs(out_sink)
    ref_index = index_reference_masks(Path(args.ref_water_dir))
    written_water = 0
    written_source = 0
    written_sink = 0
    skipped_water = 0
    skipped_source = 0
    skipped_sink = 0
    for tile in tiles:
        # Placement of this tile inside the master raster (pixel units).
        xoff = int(tile["xoff"])
        yoff = int(tile["yoff"])
        width = int(tile["width"])
        height = int(tile["height"])
        water_arr = crop_tile(water_master, xoff, yoff, width, height)
        source_arr = crop_tile(source_master, xoff, yoff, width, height)
        sink_arr = crop_tile(sink_master, xoff, yoff, width, height)
        water_name = str(tile.get("water_file") or f"{tile['key']}_mask_viz.png")
        source_name = source_name_for_tile(tile)
        sink_name = sink_name_for_tile(tile)
        water_path = out_water / water_name
        source_path = out_source / source_name
        sink_path = out_sink / sink_name
        write_water = True
        write_source = True
        write_sink = True
        if args.keep_informative_only:
            # Source/sink tiles count as informative when any pixel is non-black;
            # water tiles only when they deviate from the reference masks.
            write_source = has_non_black(source_arr)
            write_sink = has_non_black(sink_arr)
            write_water = water_differs_from_reference(water_arr, str(tile["key"]), ref_index)
        if write_water:
            Image.fromarray(water_arr, mode="RGB").save(water_path)
            if args.write_wld:
                minx = float(tile["minx"])
                maxy = float(tile["maxy"])
                write_worldfile(water_path.with_suffix(".wld"), px, py, minx, maxy)
            written_water += 1
        else:
            skipped_water += 1
        if write_source:
            Image.fromarray(source_arr, mode="RGB").save(source_path)
            if args.write_wld:
                minx = float(tile["minx"])
                maxy = float(tile["maxy"])
                write_worldfile(source_path.with_suffix(".wld"), px, py, minx, maxy)
            written_source += 1
        else:
            skipped_source += 1
        if write_sink:
            Image.fromarray(sink_arr, mode="RGB").save(sink_path)
            if args.write_wld:
                minx = float(tile["minx"])
                maxy = float(tile["maxy"])
                write_worldfile(sink_path.with_suffix(".wld"), px, py, minx, maxy)
            written_sink += 1
        else:
            skipped_sink += 1
    total_tiles = len(tiles)
    print(f"[mask_split_master] Tiles processed: {total_tiles}")
    print(f"[mask_split_master] Water written: {written_water}, skipped: {skipped_water}")
    print(f"[mask_split_master] Source written: {written_source}, skipped: {skipped_source}")
    print(f"[mask_split_master] Sink written: {written_sink}, skipped: {skipped_sink}")
    print(f"[mask_split_master] Water dir: {out_water}")
    print(f"[mask_split_master] Source dir: {out_source}")
    print(f"[mask_split_master] Sink dir: {out_sink}")
    return 0
if __name__ == "__main__":
    # Propagate main()'s exit code to the shell.
    raise SystemExit(main())

View File

@@ -0,0 +1,246 @@
#!/usr/bin/env python3
"""Validate source/sink ID mask integrity.
ID encoding:
id = B + 256*G + 65536*R
"""
from __future__ import annotations
import argparse
import json
import re
from pathlib import Path
from typing import Dict, Optional
import numpy as np
from PIL import Image
# Tile keys look like "dgm1_32_280_5650": a dgm tag plus three numeric parts.
_TILE_KEY_RE = re.compile(r"^(dgm\d+_\d+_\d+_\d+)")
def parse_args() -> argparse.Namespace:
    """Parse command-line options for the source/sink mask validator."""
    parser = argparse.ArgumentParser(description="Validate source/sink masks and report ID usage.")
    parser.add_argument("--source-dir", default="raw/water_source_masks", help="Directory with source masks.")
    parser.add_argument("--sink-dir", default="raw/water_sink_masks", help="Directory with sink masks.")
    parser.add_argument("--allowed-source-ids", default="", help="Comma-separated nonzero IDs allowed in source masks.")
    parser.add_argument("--allowed-sink-ids", default="", help="Comma-separated nonzero IDs allowed in sink masks.")
    parser.add_argument("--max-id", type=int, default=0, help="If >0, fail when any ID exceeds this value.")
    parser.add_argument("--report", default="work/mask_master/validation_report.json",
                        help="Output JSON report path.")
    parser.add_argument("--fail-on-overlap", action="store_true",
                        help="Fail if source and sink overlap on same tile pixels.")
    return parser.parse_args()
def parse_allowed_ids(text: str) -> Optional[set[int]]:
    """Parse a comma-separated ID list; None means "no restriction"."""
    cleaned = text.strip()
    if not cleaned:
        return None
    return {int(token.strip()) for token in cleaned.split(",") if token.strip()}
def tile_key_from_name(name: str) -> Optional[str]:
    """Extract the canonical tile key from a file name, or None when absent."""
    match = _TILE_KEY_RE.match(Path(name).stem)
    return match.group(1) if match else None
def decode_ids(rgb: np.ndarray) -> np.ndarray:
    """Decode an RGB mask into integer IDs: id = B + 256*G + 65536*R."""
    channels = rgb.astype(np.uint32)
    return channels[..., 2] + (channels[..., 1] << 8) + (channels[..., 0] << 16)
def analyze_mask(path: Path) -> dict:
    """Load a mask PNG and summarize its ID usage.

    Returns a dict with the path, [width, height] shape, unique/max ID stats,
    per-nonzero-ID pixel counts, and the decoded ID array itself (callers strip
    ``ids_array`` before serializing to JSON).
    """
    arr = np.array(Image.open(path).convert("RGB"), dtype=np.uint8)
    ids = decode_ids(arr)
    u, c = np.unique(ids, return_counts=True)
    # Per-ID pixel counts; zero is background and excluded.
    nonzero = [(int(i), int(n)) for i, n in zip(u.tolist(), c.tolist()) if i != 0]
    return {
        "path": str(path),
        "shape": [int(arr.shape[1]), int(arr.shape[0])],  # [width, height]
        "unique_count": int(len(u)),
        "max_id": int(u[-1]) if len(u) > 0 else 0,  # np.unique output is sorted ascending
        "nonzero_count": int(np.count_nonzero(ids)),
        "nonzero_ids": nonzero,
        "ids_array": ids,
    }
def collect_pngs(path: Path) -> list[Path]:
    """List all regular *.png files in *path*, sorted; [] when the dir is missing."""
    if not path.exists():
        return []
    return sorted(p for p in path.glob("*.png") if p.is_file())
def unexpected_ids(found: list[tuple[int, int]], allowed: Optional[set[int]]) -> list[int]:
    """Return the IDs from (id, count) pairs that are not in *allowed*, sorted.

    An *allowed* of None disables the check and always yields [].
    """
    if allowed is None:
        return []
    return sorted(ident for ident, _count in found if ident not in allowed)
def build_overview(files: list[dict]) -> list[dict]:
    """Aggregate per-file nonzero ID stats into one summary row per ID.

    Each row carries the total pixel count, the number of files containing
    the ID, and the sorted tile keys it appears in.
    """
    totals: Dict[int, dict] = {}
    for entry in files:
        tile_key = str(entry.get("tile_key") or "").strip()
        for ident, pixels in entry.get("nonzero_ids", []):
            ident = int(ident)
            if ident not in totals:
                totals[ident] = {"id": ident, "total_pixels": 0, "file_count": 0, "tile_keys": set()}
            node = totals[ident]
            node["total_pixels"] += int(pixels)
            node["file_count"] += 1
            if tile_key:
                node["tile_keys"].add(tile_key)
    return [
        {
            "id": int(totals[ident]["id"]),
            "total_pixels": int(totals[ident]["total_pixels"]),
            "file_count": int(totals[ident]["file_count"]),
            "tile_keys": sorted(totals[ident]["tile_keys"]),
        }
        for ident in sorted(totals)
    ]
def main() -> int:
    """Validate all source/sink masks and write a JSON report.

    Checks allowed-ID sets, an optional max-ID ceiling, and per-tile
    source/sink pixel overlap. Returns 1 when any check failed, else 0.
    """
    args = parse_args()
    allowed_source = parse_allowed_ids(args.allowed_source_ids)
    allowed_sink = parse_allowed_ids(args.allowed_sink_ids)
    report = {
        "schema_version": 1,
        "config": {
            "source_dir": args.source_dir,
            "sink_dir": args.sink_dir,
            "allowed_source_ids": sorted(list(allowed_source)) if allowed_source is not None else [],
            "allowed_sink_ids": sorted(list(allowed_sink)) if allowed_sink is not None else [],
            "max_id": args.max_id,
            "fail_on_overlap": bool(args.fail_on_overlap),
        },
        "source": [],
        "source_overview": [],
        "sink": [],
        "sink_overview": [],
        "overlaps": [],
        "issues": [],
    }
    failed = False
    # Decoded ID arrays per tile, kept for the overlap check below.
    source_by_tile: Dict[str, np.ndarray] = {}
    sink_by_tile: Dict[str, np.ndarray] = {}
    for p in collect_pngs(Path(args.source_dir)):
        a = analyze_mask(p)
        key = tile_key_from_name(p.name)
        bad_ids = unexpected_ids(a["nonzero_ids"], allowed_source)
        if bad_ids:
            msg = f"[mask_validate_ids] Source {p.name} has unexpected IDs: {bad_ids}"
            report["issues"].append(msg)
            print(msg)
            failed = True
        if args.max_id > 0 and a["max_id"] > args.max_id:
            msg = f"[mask_validate_ids] Source {p.name} max_id={a['max_id']} exceeds max-id={args.max_id}"
            report["issues"].append(msg)
            print(msg)
            failed = True
        # ids_array is intentionally excluded so the report stays JSON-serializable.
        report["source"].append(
            {
                "path": a["path"],
                "tile_key": key or "",
                "shape": a["shape"],
                "unique_count": a["unique_count"],
                "max_id": a["max_id"],
                "nonzero_count": a["nonzero_count"],
                "nonzero_ids": a["nonzero_ids"],
            }
        )
        if key:
            source_by_tile[key] = a["ids_array"]
    for p in collect_pngs(Path(args.sink_dir)):
        a = analyze_mask(p)
        key = tile_key_from_name(p.name)
        bad_ids = unexpected_ids(a["nonzero_ids"], allowed_sink)
        if bad_ids:
            msg = f"[mask_validate_ids] Sink {p.name} has unexpected IDs: {bad_ids}"
            report["issues"].append(msg)
            print(msg)
            failed = True
        if args.max_id > 0 and a["max_id"] > args.max_id:
            msg = f"[mask_validate_ids] Sink {p.name} max_id={a['max_id']} exceeds max-id={args.max_id}"
            report["issues"].append(msg)
            print(msg)
            failed = True
        report["sink"].append(
            {
                "path": a["path"],
                "tile_key": key or "",
                "shape": a["shape"],
                "unique_count": a["unique_count"],
                "max_id": a["max_id"],
                "nonzero_count": a["nonzero_count"],
                "nonzero_ids": a["nonzero_ids"],
            }
        )
        if key:
            sink_by_tile[key] = a["ids_array"]
    # Overlap check runs only on tiles that have both a source and a sink mask.
    shared_tiles = sorted(set(source_by_tile.keys()) & set(sink_by_tile.keys()))
    for key in shared_tiles:
        s = source_by_tile[key]
        k = sink_by_tile[key]
        if s.shape != k.shape:
            msg = f"[mask_validate_ids] Shape mismatch for tile {key}: source{s.shape} sink{k.shape}"
            report["issues"].append(msg)
            print(msg)
            failed = True
            continue
        overlap = int(np.count_nonzero((s > 0) & (k > 0)))
        if overlap > 0:
            entry = {"tile_key": key, "overlap_pixels": overlap}
            report["overlaps"].append(entry)
            msg = f"[mask_validate_ids] Overlap on tile {key}: {overlap} pixels"
            print(msg)
            # Overlap is only a warning unless --fail-on-overlap was requested.
            if args.fail_on_overlap:
                report["issues"].append(msg)
                failed = True
    report["source_overview"] = build_overview(report["source"])
    report["sink_overview"] = build_overview(report["sink"])
    out_path = Path(args.report)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(json.dumps(report, indent=2), encoding="utf-8")
    print(f"[mask_validate_ids] Report written: {out_path}")
    print(f"[mask_validate_ids] Source files: {len(report['source'])}, Sink files: {len(report['sink'])}")
    print(
        "[mask_validate_ids] Source IDs: "
        f"{len(report['source_overview'])}, Sink IDs: {len(report['sink_overview'])}"
    )
    print(f"[mask_validate_ids] Overlap records: {len(report['overlaps'])}")
    return 1 if failed else 0
if __name__ == "__main__":
    # Propagate main()'s exit code to the shell.
    raise SystemExit(main())

View File

@@ -0,0 +1,158 @@
#!/usr/bin/env python3
"""Build one global orthophoto master from tiled orthophoto JPGs."""
from __future__ import annotations
import argparse
import json
from pathlib import Path
from osgeo import gdal
def parse_args() -> argparse.Namespace:
    """Parse command-line options for building the ortho master mosaic."""
    parser = argparse.ArgumentParser(
        description="Create a global ortho master mosaic from ortho tile JPGs."
    )
    parser.add_argument("--input-dir", action="append", default=[],
                        help="Input directory containing ortho JPG tiles. Can be specified multiple times.")
    parser.add_argument("--pattern", default="*.jpg", help="Input filename pattern.")
    parser.add_argument("--out-dir", default="work/mask_master", help="Output directory.")
    parser.add_argument("--vrt-name", default="ortho_master.vrt", help="Output VRT name.")
    parser.add_argument("--tif-name", default="ortho_master.tif", help="Output GeoTIFF name.")
    parser.add_argument("--preview-name", default="ortho_master.jpg", help="Output preview JPG name.")
    parser.add_argument("--no-preview", action="store_true", help="Skip preview JPG generation.")
    parser.add_argument("--preview-max-size", type=int, default=8192,
                        help="Longest preview edge in pixels (aspect preserved).")
    parser.add_argument("--compress", default="LZW",
                        help="GeoTIFF compression (e.g., LZW, DEFLATE, JPEG, NONE).")
    parser.add_argument("--resample", default="nearest",
                        help="Resample algorithm for preview (nearest|bilinear|cubic...).")
    return parser.parse_args()
def collect_inputs(input_dirs: list[str], pattern: str) -> list[Path]:
    """Gather ortho JPG tiles matching *pattern* from the given directories.

    Falls back to export_unity/ortho_jpg when no directory is given; missing
    directories are reported and skipped, and non-.jpg matches are filtered.
    """
    search_dirs = [Path(d) for d in input_dirs] or [Path("export_unity/ortho_jpg")]
    collected: list[Path] = []
    for directory in search_dirs:
        if not directory.exists():
            print(f"[ortho_build_master] Warning: input dir missing: {directory}")
            continue
        collected.extend(
            f for f in sorted(directory.glob(pattern)) if f.suffix.lower() == ".jpg"
        )
    return collected
def preview_size(width: int, height: int, max_edge: int) -> tuple[int, int]:
    """Scale (width, height) so the longest edge is at most *max_edge*.

    Degenerate (non-positive) sizes and already-small images pass through
    unchanged; scaled results are rounded and clamped to at least 1 pixel.
    """
    longest = max(width, height)
    if width <= 0 or height <= 0 or longest <= max_edge:
        return width, height
    factor = max_edge / float(longest)
    return (
        max(1, int(round(width * factor))),
        max(1, int(round(height * factor))),
    )
def main() -> int:
    """Mosaic ortho tiles into a VRT + GeoTIFF (and an optional JPG preview).

    Writes ortho_master_meta.json describing inputs, outputs, and raster
    georeferencing. Returns 0 on success; raises SystemExit on failure.
    """
    args = parse_args()
    gdal.UseExceptions()
    inputs = collect_inputs(args.input_dir, args.pattern)
    if not inputs:
        raise SystemExit("[ortho_build_master] No input ortho tiles found.")
    out_dir = Path(args.out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    vrt_path = out_dir / args.vrt_name
    tif_path = out_dir / args.tif_name
    preview_path = out_dir / args.preview_name
    meta_path = out_dir / "ortho_master_meta.json"
    print(f"[ortho_build_master] Input tiles: {len(inputs)}")
    print(f"[ortho_build_master] Building VRT: {vrt_path}")
    # The VRT is a lightweight mosaic index; the heavy pixels stay in the JPGs.
    vrt = gdal.BuildVRT(str(vrt_path), [str(p) for p in inputs])
    if vrt is None:
        raise SystemExit("[ortho_build_master] gdal.BuildVRT failed.")
    width = vrt.RasterXSize
    height = vrt.RasterYSize
    gt = vrt.GetGeoTransform(can_return_null=True)
    proj = vrt.GetProjectionRef()
    print(f"[ortho_build_master] Translating GeoTIFF: {tif_path}")
    tif_ds = gdal.Translate(
        str(tif_path),
        vrt,
        options=gdal.TranslateOptions(
            format="GTiff",
            creationOptions=[
                "TILED=YES",
                f"COMPRESS={args.compress}",
                "BIGTIFF=IF_SAFER",
            ],
        ),
    )
    if tif_ds is None:
        raise SystemExit("[ortho_build_master] gdal.Translate to GeoTIFF failed.")
    tif_ds = None  # Close and flush the GeoTIFF.
    if not args.no_preview:
        out_w, out_h = preview_size(width, height, args.preview_max_size)
        print(f"[ortho_build_master] Writing preview JPG: {preview_path} ({out_w}x{out_h})")
        jpg_ds = gdal.Translate(
            str(preview_path),
            vrt,
            options=gdal.TranslateOptions(
                format="JPEG",
                width=out_w,
                height=out_h,
                resampleAlg=args.resample,
                creationOptions=["QUALITY=92"],
            ),
        )
        if jpg_ds is None:
            raise SystemExit("[ortho_build_master] gdal.Translate to JPEG preview failed.")
        jpg_ds = None  # Close and flush the preview.
    vrt = None  # Close the VRT dataset.
    meta = {
        "schema_version": 1,
        "inputs": [str(p) for p in inputs],
        "outputs": {
            "vrt": str(vrt_path),
            "tif": str(tif_path),
            "preview": None if args.no_preview else str(preview_path),
        },
        "raster": {
            "width": width,
            "height": height,
            "geotransform": list(gt) if gt else None,
            "projection": proj,
        },
        "settings": {
            "compress": args.compress,
            "preview_max_size": args.preview_max_size,
            "resample": args.resample,
            "pattern": args.pattern,
            "input_dirs": args.input_dir if args.input_dir else ["export_unity/ortho_jpg"],
        },
    }
    meta_path.write_text(json.dumps(meta, indent=2), encoding="utf-8")
    print(f"[ortho_build_master] Wrote metadata: {meta_path}")
    return 0
if __name__ == "__main__":
    # Propagate main()'s exit code to the shell.
    raise SystemExit(main())