Files
GeoData/geodata_pipeline/heightmaps.py

104 lines
3.5 KiB
Python

from __future__ import annotations
import glob
import os
from typing import Iterable
from osgeo import gdal
from .config import Config
from .gdal_utils import build_vrt, cleanup_aux_files, ensure_dir, ensure_parent, open_dataset, safe_remove
gdal.UseExceptions()
def _cleanup_patterns(raw_dir: str) -> Iterable[str]:
return [
os.path.join("work", "*_tmp.tif"),
os.path.join("work", "*_tmp.tif.aux.xml"),
os.path.join("work", "*.aux.xml"),
os.path.join(raw_dir, "*.aux.xml"),
]
def export_heightmaps(cfg: Config, *, force_vrt: bool = False) -> int:
ensure_dir(cfg.work.work_dir)
ensure_dir(cfg.export.heightmap_dir)
ensure_parent(cfg.export.manifest_path)
tif_paths = sorted(glob.glob(os.path.join(cfg.raw.dgm1_dir, "*.tif")))
build_vrt(cfg.work.heightmap_vrt, tif_paths, force=force_vrt)
ds = open_dataset(cfg.work.heightmap_vrt, f"Could not open {cfg.work.heightmap_vrt} after attempting to build it.")
band = ds.GetRasterBand(1)
gmin, gmax = band.ComputeRasterMinMax(False)
print(f"GLOBAL_MIN={gmin}, GLOBAL_MAX={gmax}")
with open(cfg.export.manifest_path, "w", encoding="utf-8") as f:
f.write("tile_id,xmin,ymin,xmax,ymax,global_min,global_max,out_res\n")
skipped = 0
written = 0
for tif in tif_paths:
try:
tds = open_dataset(tif, f"Skipping unreadable {tif}")
except SystemExit as exc:
print(exc)
skipped += 1
continue
gt = tds.GetGeoTransform()
ulx, xres, _, uly, _, yres = gt
xmax = ulx + xres * tds.RasterXSize
ymin = uly + yres * tds.RasterYSize
xmin = ulx
ymax = uly
base = os.path.splitext(os.path.basename(tif))[0]
tile_id = base
tmp_path = os.path.join(cfg.work.work_dir, f"{tile_id}_tmp.tif")
out_path = os.path.join(cfg.export.heightmap_dir, f"{tile_id}.png")
warp_opts = gdal.WarpOptions(
outputBounds=(xmin, ymin, xmax, ymax),
width=cfg.heightmap.out_res,
height=cfg.heightmap.out_res,
resampleAlg=cfg.heightmap.resample,
srcNodata=-9999,
dstNodata=gmin,
)
try:
gdal.Warp(tmp_path, ds, options=warp_opts)
except RuntimeError as exc:
print(f"Warp failed for {tile_id}: {exc}")
skipped += 1
continue
trans_opts = gdal.TranslateOptions(
outputType=gdal.GDT_UInt16,
scaleParams=[(gmin, gmax, 0, 65535)],
format="PNG",
creationOptions=["WORLDFILE=YES"],
)
try:
gdal.Translate(out_path, tmp_path, options=trans_opts)
except RuntimeError as exc:
print(f"Translate failed for {tile_id}: {exc}")
skipped += 1
continue
safe_remove(tmp_path)
safe_remove(f"{tmp_path}.aux.xml")
f.write(f"{tile_id},{xmin},{ymin},{xmax},{ymax},{gmin},{gmax},{cfg.heightmap.out_res}\n")
print(f"Wrote {out_path}")
written += 1
print(f"Manifest: {cfg.export.manifest_path}")
print(f"Summary: wrote {written} tiles; skipped {skipped}.")
removed = cleanup_aux_files(_cleanup_patterns(cfg.raw.dgm1_dir))
print(f"Cleanup removed {removed} temporary files/sidecars.")
return 1 if skipped else 0