Files
GeoData/geodata_pipeline/lpolpg_split.py
s0wlz (Matthias Puchstein) a8f954805e Extend lpolpg split workflow
Changelog:

- add --split-lpolpg-delete-source to remove source files after split

- auto-split after downloads when lpolpg is enabled in config
2026-01-22 00:59:37 +01:00

151 lines
5.2 KiB
Python

from __future__ import annotations
import os
from dataclasses import dataclass
from typing import Iterable
import numpy as np
from .config import Config
from .gdal_utils import ensure_dir
@dataclass
class SplitStats:
    """Point tallies collected while splitting a single input file."""

    # Number of points routed to the ground output.
    ground_points: int = 0
    # Number of points routed to the object (non-ground) output.
    object_points: int = 0

    @property
    def total(self) -> int:
        """Return the combined count of ground and object points."""
        return sum((self.ground_points, self.object_points))
def _parse_output_base(name: str) -> tuple[str, str]:
    """Derive the two output file stems for a source file name.

    The extension is dropped and a leading ``lpolpg_`` marker, when present,
    is stripped. The remainder is returned twice: once prefixed with ``lpg_``
    and once prefixed with ``lpo_`` (in that order). The prefix match is
    case-sensitive.
    """
    marker = "lpolpg_"
    stem, _ext = os.path.splitext(name)
    tile_id = stem[len(marker):] if stem.startswith(marker) else stem
    return "lpg_" + tile_id, "lpo_" + tile_id
def _write_xyz(handle, points) -> int:
    """Append *points* to *handle* as text rows and return the row count.

    Each row holds the ``x``, ``y`` and ``z`` attributes of one point,
    separated by single spaces and formatted with three decimals. ``None``
    or an empty record writes nothing and yields 0.

    *points* must support ``len()`` and expose ``x``/``y``/``z`` sequences
    (presumably a laspy point record -- confirm against the caller).
    """
    if points is None:
        return 0
    if len(points) == 0:
        return 0
    xyz = np.column_stack((points.x, points.y, points.z))
    # Normalise to float64 without copying when the data already has that dtype.
    xyz = xyz.astype(np.float64, copy=False)
    np.savetxt(handle, xyz, fmt="%.3f %.3f %.3f")
    row_count = xyz.shape[0]
    return row_count
def split_lpolpg(
    cfg: Config,
    *,
    formats: Iterable[str] = ("laz", "xyz"),
    ground_classes: Iterable[int] = (2,),
    overwrite: bool = False,
    delete_source: bool = False,
) -> int:
    """Split every LAZ/LAS file in the lpolpg directory into ground and object outputs.

    Each input file is read in chunks. Points whose ``classification`` is in
    *ground_classes* go to the ``lpg`` outputs, all other points go to the
    ``lpo`` outputs.

    Args:
        cfg: Configuration; reads ``cfg.pointcloud.lpolpg_dir`` (input),
            ``lpg_dir`` / ``lpo_dir`` (outputs) and ``chunk_size``.
        formats: Output formats. ``"laz"`` or ``"las"`` enables the ``.laz``
            outputs, ``"xyz"`` enables the text outputs. Entries are
            stripped and lower-cased; empty entries are ignored.
        ground_classes: Classification codes treated as ground.
        overwrite: When false, a file is skipped if all of its requested
            outputs already exist.
        delete_source: When true, the input file is removed after it was
            split successfully. Skipped files are not removed.

    Returns:
        Always 0.

    Raises:
        SystemExit: If laspy is not installed, the input directory does not
            exist, no output format was requested, or the input directory
            contains no ``.laz``/``.las`` files.

    If splitting a file fails or is interrupted, the outputs started for
    that file are removed before the exception propagates, so a later run
    with ``overwrite=False`` does not mistake truncated files for finished
    ones.
    """
    import contextlib

    try:
        import laspy
    except ImportError as exc:
        raise SystemExit("laspy is required to split lpolpg LAZ files.") from exc

    input_dir = cfg.pointcloud.lpolpg_dir
    if not os.path.isdir(input_dir):
        raise SystemExit(f"lpolpg directory not found: {input_dir}")

    format_set = {fmt.strip().lower() for fmt in formats if fmt}
    write_laz = "laz" in format_set or "las" in format_set
    write_xyz = "xyz" in format_set
    if not write_laz and not write_xyz:
        raise SystemExit("No output formats requested for lpolpg split.")

    lpg_dir = cfg.pointcloud.lpg_dir
    lpo_dir = cfg.pointcloud.lpo_dir
    ensure_dir(lpg_dir)
    ensure_dir(lpo_dir)

    # Built once: the class list is the same for every chunk of every file.
    ground_values = sorted({int(v) for v in ground_classes})
    # Never read fewer than 1000 points per chunk, whatever the config says.
    chunk_size = max(1_000, int(cfg.pointcloud.chunk_size))

    files = sorted(
        name for name in os.listdir(input_dir)
        if name.lower().endswith((".laz", ".las"))
    )
    if not files:
        raise SystemExit(f"No LAZ/LAS files found in {input_dir}")

    for name in files:
        in_path = os.path.join(input_dir, name)
        lpg_base, lpo_base = _parse_output_base(name)
        lpg_laz = os.path.join(lpg_dir, f"{lpg_base}.laz")
        lpo_laz = os.path.join(lpo_dir, f"{lpo_base}.laz")
        lpg_xyz = os.path.join(lpg_dir, f"{lpg_base}.xyz")
        lpo_xyz = os.path.join(lpo_dir, f"{lpo_base}.xyz")

        outputs = []
        if write_laz:
            outputs.extend([lpg_laz, lpo_laz])
        if write_xyz:
            outputs.extend([lpg_xyz, lpo_xyz])

        if not overwrite and outputs and all(os.path.exists(p) for p in outputs):
            print(f"[lpolpg] skip {name}: outputs already exist")
            continue

        print(f"[lpolpg] splitting {name}...")
        stats = SplitStats()
        # Outputs this run has (tried to) open for writing; only these are
        # removed on failure, so untouched results of earlier runs survive.
        started: list[str] = []
        try:
            # ExitStack closes every handle that was opened, even if opening
            # a later one or closing an earlier one raises.
            with contextlib.ExitStack() as stack:
                reader = stack.enter_context(laspy.open(in_path))
                lpg_writer = lpo_writer = None
                lpg_xyz_handle = lpo_xyz_handle = None

                if write_laz:
                    # NOTE(review): relies on the header object providing
                    # ``copy()`` -- confirm for the pinned laspy version.
                    header_lpg = reader.header.copy()
                    header_lpg.point_count = 0
                    header_lpo = reader.header.copy()
                    header_lpo.point_count = 0
                    started.append(lpg_laz)
                    lpg_writer = stack.enter_context(
                        laspy.open(lpg_laz, mode="w", header=header_lpg)
                    )
                    started.append(lpo_laz)
                    lpo_writer = stack.enter_context(
                        laspy.open(lpo_laz, mode="w", header=header_lpo)
                    )
                if write_xyz:
                    started.append(lpg_xyz)
                    lpg_xyz_handle = stack.enter_context(
                        open(lpg_xyz, "w", encoding="utf-8")
                    )
                    started.append(lpo_xyz)
                    lpo_xyz_handle = stack.enter_context(
                        open(lpo_xyz, "w", encoding="utf-8")
                    )

                for points in reader.chunk_iterator(chunk_size):
                    classes = np.array(points.classification)
                    ground_mask = np.isin(classes, ground_values)
                    ground_count = int(np.count_nonzero(ground_mask))
                    object_count = int(ground_mask.size) - ground_count

                    # Select each subset once and reuse it for both formats.
                    ground_pts = points[ground_mask]
                    object_pts = points[~ground_mask]

                    if write_laz:
                        if ground_count:
                            lpg_writer.write_points(ground_pts)
                        if object_count:
                            lpo_writer.write_points(object_pts)
                    if write_xyz:
                        _write_xyz(lpg_xyz_handle, ground_pts)
                        _write_xyz(lpo_xyz_handle, object_pts)

                    stats.ground_points += ground_count
                    stats.object_points += object_count
        except BaseException:
            # Includes KeyboardInterrupt: an aborted run must not leave
            # truncated outputs that a later run would skip as complete.
            # All handles are already closed here (the ExitStack has exited).
            for path in started:
                try:
                    os.remove(path)
                except FileNotFoundError:
                    pass
                except OSError as exc:
                    print(
                        f"[lpolpg] warn: could not remove partial output {path}: {exc}"
                    )
            raise

        print(
            f"[lpolpg] {name}: ground={stats.ground_points}, "
            f"objects={stats.object_points}, total={stats.total}"
        )

        # Only reached after a successful split; the reader is closed by now.
        if delete_source:
            try:
                os.remove(in_path)
                print(f"[lpolpg] removed source {name}")
            except OSError as exc:
                print(f"[lpolpg] warn: could not remove {name}: {exc}")

    return 0