299 lines
9.0 KiB
Python
299 lines
9.0 KiB
Python
"""Enhanced heightmap export with LPG ground point validation.
|
|
|
|
Extends the standard heightmap export by:
|
|
1. Validating DGM1 raster against LPG ground points
|
|
2. Flagging areas where DGM1 and LPG diverge significantly
|
|
3. Optionally exporting quality metrics
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import csv
|
|
import os
|
|
from typing import Optional, Tuple
|
|
|
|
import numpy as np
|
|
from osgeo import gdal
|
|
|
|
from .config import Config
|
|
from .gdal_utils import build_vrt, open_dataset
|
|
from .heightmaps import export_heightmaps # Reuse existing export
|
|
from .pointcloud import find_pointcloud_file, read_pointcloud_file
|
|
|
|
|
|
def validate_tile_with_lpg(
|
|
tile_id: str,
|
|
dgm1_path: str,
|
|
xmin: float,
|
|
ymin: float,
|
|
xmax: float,
|
|
ymax: float,
|
|
cfg: Config,
|
|
) -> Optional[dict]:
|
|
"""Validate DGM1 tile against LPG ground points.
|
|
|
|
Args:
|
|
tile_id: Tile identifier
|
|
dgm1_path: Path to DGM1 raster
|
|
xmin, ymin, xmax, ymax: Tile bounds
|
|
cfg: Configuration
|
|
|
|
Returns:
|
|
Dict with validation metrics, or None if no LPG data
|
|
"""
|
|
pc_cfg = cfg.pointcloud
|
|
eh_cfg = cfg.heightmap_enhanced
|
|
|
|
# Check for LPG data
|
|
lpg_file = find_pointcloud_file(pc_cfg.lpg_dir, tile_id)
|
|
if not lpg_file:
|
|
return None
|
|
|
|
# Load DGM1 raster
|
|
dgm1_ds = open_dataset(dgm1_path, required=False)
|
|
if dgm1_ds is None:
|
|
return None
|
|
|
|
dgm1 = dgm1_ds.GetRasterBand(1).ReadAsArray().astype(np.float32)
|
|
gt = dgm1_ds.GetGeoTransform()
|
|
nodata = dgm1_ds.GetRasterBand(1).GetNoDataValue() or -9999
|
|
|
|
# Load LPG points for tile
|
|
lpg = read_pointcloud_file(lpg_file, bounds=(xmin, ymin, xmax, ymax))
|
|
if len(lpg) == 0:
|
|
return {"point_count": 0, "valid": True}
|
|
|
|
# Sample DGM1 at LPG point locations
|
|
differences = []
|
|
for x, y, z in zip(lpg.x, lpg.y, lpg.z):
|
|
# Convert world coords to pixel coords
|
|
col = int((x - gt[0]) / gt[1])
|
|
row = int((y - gt[3]) / gt[5])
|
|
|
|
if 0 <= row < dgm1.shape[0] and 0 <= col < dgm1.shape[1]:
|
|
dgm_val = dgm1[row, col]
|
|
if dgm_val > nodata + 1:
|
|
diff = z - dgm_val
|
|
differences.append(diff)
|
|
|
|
if not differences:
|
|
return {"point_count": len(lpg), "sampled": 0, "valid": True}
|
|
|
|
diffs = np.array(differences)
|
|
|
|
# Compute validation metrics
|
|
metrics = {
|
|
"point_count": len(lpg),
|
|
"sampled": len(diffs),
|
|
"mean_diff": float(np.mean(diffs)),
|
|
"std_diff": float(np.std(diffs)),
|
|
"median_diff": float(np.median(diffs)),
|
|
"min_diff": float(np.min(diffs)),
|
|
"max_diff": float(np.max(diffs)),
|
|
"rmse": float(np.sqrt(np.mean(diffs ** 2))),
|
|
"outlier_count": int(np.sum(np.abs(diffs) > eh_cfg.lpg_validation_threshold_m)),
|
|
"outlier_pct": float(np.mean(np.abs(diffs) > eh_cfg.lpg_validation_threshold_m) * 100),
|
|
"valid": bool(np.abs(np.median(diffs)) < eh_cfg.lpg_validation_threshold_m),
|
|
}
|
|
|
|
return metrics
|
|
|
|
|
|
def export_quality_map(
|
|
tile_id: str,
|
|
dgm1_path: str,
|
|
xmin: float,
|
|
ymin: float,
|
|
xmax: float,
|
|
ymax: float,
|
|
cfg: Config,
|
|
) -> Optional[str]:
|
|
"""Export quality/difference map comparing DGM1 to LPG.
|
|
|
|
Args:
|
|
tile_id: Tile identifier
|
|
dgm1_path: Path to DGM1 raster
|
|
xmin, ymin, xmax, ymax: Tile bounds
|
|
cfg: Configuration
|
|
|
|
Returns:
|
|
Path to quality map PNG, or None if no LPG data
|
|
"""
|
|
pc_cfg = cfg.pointcloud
|
|
eh_cfg = cfg.heightmap_enhanced
|
|
|
|
lpg_file = find_pointcloud_file(pc_cfg.lpg_dir, tile_id)
|
|
if not lpg_file:
|
|
return None
|
|
|
|
dgm1_ds = open_dataset(dgm1_path, required=False)
|
|
if dgm1_ds is None:
|
|
return None
|
|
|
|
dgm1 = dgm1_ds.GetRasterBand(1).ReadAsArray().astype(np.float32)
|
|
gt = dgm1_ds.GetGeoTransform()
|
|
nodata = dgm1_ds.GetRasterBand(1).GetNoDataValue() or -9999
|
|
|
|
# Load LPG points
|
|
lpg = read_pointcloud_file(lpg_file, bounds=(xmin, ymin, xmax, ymax))
|
|
if len(lpg) == 0:
|
|
return None
|
|
|
|
# Create difference accumulator
|
|
diff_sum = np.zeros_like(dgm1)
|
|
diff_count = np.zeros_like(dgm1, dtype=np.int32)
|
|
|
|
for x, y, z in zip(lpg.x, lpg.y, lpg.z):
|
|
col = int((x - gt[0]) / gt[1])
|
|
row = int((y - gt[3]) / gt[5])
|
|
|
|
if 0 <= row < dgm1.shape[0] and 0 <= col < dgm1.shape[1]:
|
|
dgm_val = dgm1[row, col]
|
|
if dgm_val > nodata + 1:
|
|
diff_sum[row, col] += (z - dgm_val)
|
|
diff_count[row, col] += 1
|
|
|
|
# Compute mean difference per cell
|
|
with np.errstate(divide='ignore', invalid='ignore'):
|
|
diff_mean = np.where(diff_count > 0, diff_sum / diff_count, 0)
|
|
|
|
# Scale to 8-bit for visualization
|
|
# Map [-threshold, +threshold] to [0, 255]
|
|
threshold = eh_cfg.lpg_validation_threshold_m * 2
|
|
quality = (diff_mean + threshold) / (2 * threshold) * 255
|
|
quality = np.clip(quality, 0, 255).astype(np.uint8)
|
|
|
|
# Export as PNG
|
|
os.makedirs(eh_cfg.quality_map_dir, exist_ok=True)
|
|
out_path = os.path.join(eh_cfg.quality_map_dir, f"{tile_id}_quality.png")
|
|
|
|
driver = gdal.GetDriverByName("PNG")
|
|
out_ds = driver.Create(out_path, dgm1.shape[1], dgm1.shape[0], 1, gdal.GDT_Byte)
|
|
out_ds.SetGeoTransform(gt)
|
|
out_ds.GetRasterBand(1).WriteArray(quality)
|
|
out_ds = None
|
|
|
|
return out_path
|
|
|
|
|
|
def export_heightmaps_enhanced(cfg: Config) -> int:
|
|
"""Export heightmaps with LPG validation.
|
|
|
|
This wraps the standard heightmap export and adds:
|
|
1. LPG validation metrics per tile
|
|
2. Optional quality map export
|
|
|
|
Args:
|
|
cfg: Configuration
|
|
|
|
Returns:
|
|
Exit code (0 = success)
|
|
"""
|
|
pc_cfg = cfg.pointcloud
|
|
eh_cfg = cfg.heightmap_enhanced
|
|
|
|
# First, run standard heightmap export
|
|
result = export_heightmaps(cfg)
|
|
if result != 0:
|
|
return result
|
|
|
|
# Now validate with LPG
|
|
if not pc_cfg.use_lpg_validation:
|
|
print("[heightmaps_enhanced] LPG validation disabled, skipping.")
|
|
return 0
|
|
|
|
# Load tile manifest
|
|
if not os.path.exists(cfg.export.manifest_path):
|
|
print(f"[heightmaps_enhanced] Manifest not found: {cfg.export.manifest_path}")
|
|
return 1
|
|
|
|
tiles = []
|
|
with open(cfg.export.manifest_path, "r") as f:
|
|
reader = csv.DictReader(f)
|
|
for row in reader:
|
|
tiles.append({
|
|
"tile_id": row["tile_id"],
|
|
"xmin": float(row["xmin"]),
|
|
"ymin": float(row["ymin"]),
|
|
"xmax": float(row["xmax"]),
|
|
"ymax": float(row["ymax"]),
|
|
})
|
|
|
|
validation_results = []
|
|
quality_maps = []
|
|
|
|
for tile in tiles:
|
|
tile_id = tile["tile_id"]
|
|
|
|
# Find DGM1 file
|
|
import glob
|
|
dgm1_pattern = os.path.join(cfg.raw.dgm1_dir, f"*{tile_id}*.tif")
|
|
dgm1_files = glob.glob(dgm1_pattern)
|
|
|
|
if not dgm1_files:
|
|
print(f"[heightmaps_enhanced] No DGM1 for {tile_id}, skipping validation")
|
|
continue
|
|
|
|
dgm1_path = dgm1_files[0]
|
|
|
|
# Validate against LPG
|
|
metrics = validate_tile_with_lpg(
|
|
tile_id,
|
|
dgm1_path,
|
|
tile["xmin"],
|
|
tile["ymin"],
|
|
tile["xmax"],
|
|
tile["ymax"],
|
|
cfg,
|
|
)
|
|
|
|
if metrics:
|
|
metrics["tile_id"] = tile_id
|
|
validation_results.append(metrics)
|
|
|
|
status = "OK" if metrics.get("valid", True) else "WARN"
|
|
rmse = metrics.get("rmse", 0)
|
|
outlier_pct = metrics.get("outlier_pct", 0)
|
|
print(f"[heightmaps_enhanced] {tile_id}: {status} (RMSE={rmse:.3f}m, outliers={outlier_pct:.1f}%)")
|
|
|
|
# Export quality map if enabled
|
|
if eh_cfg.export_quality_map:
|
|
qm_path = export_quality_map(
|
|
tile_id,
|
|
dgm1_path,
|
|
tile["xmin"],
|
|
tile["ymin"],
|
|
tile["xmax"],
|
|
tile["ymax"],
|
|
cfg,
|
|
)
|
|
if qm_path:
|
|
quality_maps.append(qm_path)
|
|
else:
|
|
print(f"[heightmaps_enhanced] {tile_id}: No LPG data")
|
|
|
|
# Export validation summary
|
|
if validation_results:
|
|
summary_path = os.path.join(eh_cfg.quality_map_dir, "validation_summary.csv")
|
|
os.makedirs(eh_cfg.quality_map_dir, exist_ok=True)
|
|
|
|
fieldnames = ["tile_id", "point_count", "sampled", "mean_diff", "std_diff",
|
|
"median_diff", "min_diff", "max_diff", "rmse", "outlier_count",
|
|
"outlier_pct", "valid"]
|
|
|
|
with open(summary_path, "w", newline="") as f:
|
|
writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction="ignore")
|
|
writer.writeheader()
|
|
writer.writerows(validation_results)
|
|
|
|
print(f"[heightmaps_enhanced] Validation summary: {summary_path}")
|
|
|
|
valid_count = sum(1 for r in validation_results if r.get("valid", True))
|
|
total_count = len(validation_results)
|
|
print(f"[heightmaps_enhanced] DONE. {valid_count}/{total_count} tiles passed validation.")
|
|
|
|
if quality_maps:
|
|
print(f"[heightmaps_enhanced] Quality maps exported: {len(quality_maps)}")
|
|
|
|
return 0
|