440 lines
13 KiB
Python
440 lines
13 KiB
Python
"""Point cloud utilities for XYZ and LAZ file handling.
|
|
|
|
Provides unified reading for:
|
|
- LPG (ground points) and LPO (object points) in XYZ or LAZ format
|
|
- BDOM20RGBI in LAZ format with RGB data
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import glob
|
|
import os
|
|
from dataclasses import dataclass
|
|
from typing import Iterator, Optional, Tuple
|
|
|
|
import numpy as np
|
|
|
|
try:
|
|
import laspy
|
|
HAS_LASPY = True
|
|
except ImportError:
|
|
HAS_LASPY = False
|
|
|
|
try:
|
|
from shapely.geometry import Polygon, Point
|
|
from shapely.prepared import prep
|
|
HAS_SHAPELY = True
|
|
except ImportError:
|
|
HAS_SHAPELY = False
|
|
|
|
|
|
@dataclass
|
|
class PointCloud:
|
|
"""Container for point cloud data with optional RGB."""
|
|
x: np.ndarray
|
|
y: np.ndarray
|
|
z: np.ndarray
|
|
r: Optional[np.ndarray] = None
|
|
g: Optional[np.ndarray] = None
|
|
b: Optional[np.ndarray] = None
|
|
intensity: Optional[np.ndarray] = None
|
|
|
|
def __len__(self) -> int:
|
|
return len(self.x)
|
|
|
|
@property
|
|
def has_rgb(self) -> bool:
|
|
return self.r is not None and self.g is not None and self.b is not None
|
|
|
|
@property
|
|
def xyz(self) -> np.ndarray:
|
|
"""Return Nx3 array of XYZ coordinates."""
|
|
return np.column_stack([self.x, self.y, self.z])
|
|
|
|
@property
|
|
def rgb(self) -> Optional[np.ndarray]:
|
|
"""Return Nx3 array of RGB values (0-255) if available."""
|
|
if not self.has_rgb:
|
|
return None
|
|
return np.column_stack([self.r, self.g, self.b])
|
|
|
|
def filter_bounds(self, xmin: float, ymin: float, xmax: float, ymax: float) -> "PointCloud":
|
|
"""Filter points to bounding box."""
|
|
mask = (
|
|
(self.x >= xmin) & (self.x <= xmax) &
|
|
(self.y >= ymin) & (self.y <= ymax)
|
|
)
|
|
return PointCloud(
|
|
x=self.x[mask],
|
|
y=self.y[mask],
|
|
z=self.z[mask],
|
|
r=self.r[mask] if self.r is not None else None,
|
|
g=self.g[mask] if self.g is not None else None,
|
|
b=self.b[mask] if self.b is not None else None,
|
|
intensity=self.intensity[mask] if self.intensity is not None else None,
|
|
)
|
|
|
|
def filter_polygon(self, polygon: "Polygon", buffer_m: float = 0.0) -> "PointCloud":
|
|
"""Filter points within a polygon (with optional buffer)."""
|
|
if not HAS_SHAPELY:
|
|
raise ImportError("shapely required for polygon filtering")
|
|
|
|
if buffer_m > 0:
|
|
polygon = polygon.buffer(buffer_m)
|
|
|
|
prepared = prep(polygon)
|
|
mask = np.array([prepared.contains(Point(x, y)) for x, y in zip(self.x, self.y)])
|
|
|
|
return PointCloud(
|
|
x=self.x[mask],
|
|
y=self.y[mask],
|
|
z=self.z[mask],
|
|
r=self.r[mask] if self.r is not None else None,
|
|
g=self.g[mask] if self.g is not None else None,
|
|
b=self.b[mask] if self.b is not None else None,
|
|
intensity=self.intensity[mask] if self.intensity is not None else None,
|
|
)
|
|
|
|
|
|
def read_xyz_file(
|
|
path: str,
|
|
bounds: Optional[Tuple[float, float, float, float]] = None,
|
|
chunk_size: int = 1_000_000,
|
|
) -> PointCloud:
|
|
"""Read XYZ text file (space/tab delimited) with optional bounds filtering.
|
|
|
|
Args:
|
|
path: Path to XYZ file
|
|
bounds: Optional (xmin, ymin, xmax, ymax) to filter points
|
|
chunk_size: Number of lines to read at once for memory efficiency
|
|
|
|
Returns:
|
|
PointCloud with x, y, z arrays
|
|
"""
|
|
all_x, all_y, all_z = [], [], []
|
|
|
|
with open(path, "r") as f:
|
|
while True:
|
|
lines = []
|
|
for _ in range(chunk_size):
|
|
line = f.readline()
|
|
if not line:
|
|
break
|
|
lines.append(line)
|
|
|
|
if not lines:
|
|
break
|
|
|
|
# Parse chunk
|
|
chunk_data = []
|
|
for line in lines:
|
|
parts = line.strip().split()
|
|
if len(parts) >= 3:
|
|
try:
|
|
x, y, z = float(parts[0]), float(parts[1]), float(parts[2])
|
|
chunk_data.append((x, y, z))
|
|
except ValueError:
|
|
continue
|
|
|
|
if not chunk_data:
|
|
continue
|
|
|
|
chunk_arr = np.array(chunk_data, dtype=np.float64)
|
|
|
|
# Apply bounds filter if specified
|
|
if bounds is not None:
|
|
xmin, ymin, xmax, ymax = bounds
|
|
mask = (
|
|
(chunk_arr[:, 0] >= xmin) & (chunk_arr[:, 0] <= xmax) &
|
|
(chunk_arr[:, 1] >= ymin) & (chunk_arr[:, 1] <= ymax)
|
|
)
|
|
chunk_arr = chunk_arr[mask]
|
|
|
|
if len(chunk_arr) > 0:
|
|
all_x.append(chunk_arr[:, 0])
|
|
all_y.append(chunk_arr[:, 1])
|
|
all_z.append(chunk_arr[:, 2])
|
|
|
|
if not all_x:
|
|
return PointCloud(
|
|
x=np.array([], dtype=np.float64),
|
|
y=np.array([], dtype=np.float64),
|
|
z=np.array([], dtype=np.float64),
|
|
)
|
|
|
|
return PointCloud(
|
|
x=np.concatenate(all_x),
|
|
y=np.concatenate(all_y),
|
|
z=np.concatenate(all_z),
|
|
)
|
|
|
|
|
|
def read_laz_file(
|
|
path: str,
|
|
bounds: Optional[Tuple[float, float, float, float]] = None,
|
|
) -> PointCloud:
|
|
"""Read LAZ/LAS file with optional bounds filtering.
|
|
|
|
Args:
|
|
path: Path to LAZ/LAS file
|
|
bounds: Optional (xmin, ymin, xmax, ymax) to filter points
|
|
|
|
Returns:
|
|
PointCloud with x, y, z and optional r, g, b, intensity
|
|
"""
|
|
if not HAS_LASPY:
|
|
raise ImportError("laspy required for LAZ file reading. Install with: pip install laspy[lazrs]")
|
|
|
|
with laspy.open(path) as reader:
|
|
# Read all points (laspy handles decompression)
|
|
las = reader.read()
|
|
|
|
x = np.array(las.x, dtype=np.float64)
|
|
y = np.array(las.y, dtype=np.float64)
|
|
z = np.array(las.z, dtype=np.float64)
|
|
|
|
# Extract RGB if available (typically 16-bit, scale to 8-bit)
|
|
r, g, b = None, None, None
|
|
if hasattr(las, 'red') and hasattr(las, 'green') and hasattr(las, 'blue'):
|
|
# LAZ RGB is often 16-bit, convert to 8-bit
|
|
r = (np.array(las.red, dtype=np.float32) / 256).astype(np.uint8)
|
|
g = (np.array(las.green, dtype=np.float32) / 256).astype(np.uint8)
|
|
b = (np.array(las.blue, dtype=np.float32) / 256).astype(np.uint8)
|
|
|
|
# Extract intensity if available
|
|
intensity = None
|
|
if hasattr(las, 'intensity'):
|
|
intensity = np.array(las.intensity, dtype=np.uint16)
|
|
|
|
pc = PointCloud(x=x, y=y, z=z, r=r, g=g, b=b, intensity=intensity)
|
|
|
|
# Apply bounds filter
|
|
if bounds is not None:
|
|
pc = pc.filter_bounds(*bounds)
|
|
|
|
return pc
|
|
|
|
|
|
def read_pointcloud_file(
|
|
path: str,
|
|
bounds: Optional[Tuple[float, float, float, float]] = None,
|
|
chunk_size: int = 1_000_000,
|
|
) -> PointCloud:
|
|
"""Read XYZ/LAZ/LAS point cloud file with optional bounds filtering."""
|
|
ext = os.path.splitext(path)[1].lower()
|
|
if ext in (".laz", ".las"):
|
|
return read_laz_file(path, bounds=bounds)
|
|
if ext == ".xyz":
|
|
return read_xyz_file(path, bounds=bounds, chunk_size=chunk_size)
|
|
raise ValueError(f"Unsupported point cloud format: {path}")
|
|
|
|
|
|
def _extract_tile_coords(tile_id: str) -> str:
|
|
"""Extract tile coordinates from tile ID.
|
|
|
|
E.g., 'dgm1_32_328_5511' -> '328_5511'
|
|
"""
|
|
parts = tile_id.split("_")
|
|
# Find numeric parts that look like coordinates
|
|
coords = []
|
|
for p in parts:
|
|
if p.isdigit() and len(p) >= 3:
|
|
coords.append(p)
|
|
if len(coords) >= 2:
|
|
return f"{coords[-2]}_{coords[-1]}"
|
|
return tile_id
|
|
|
|
|
|
def find_xyz_file(directory: str, tile_id: str) -> Optional[str]:
|
|
"""Find XYZ file for a tile in directory.
|
|
|
|
Args:
|
|
directory: Directory to search (e.g., 'raw/lpg' or 'raw/lpo')
|
|
tile_id: Tile ID (e.g., 'dgm1_32_328_5511' or '32_328_5511')
|
|
|
|
Returns:
|
|
Path to XYZ file or None if not found
|
|
"""
|
|
return find_pointcloud_file(directory, tile_id, extensions=("xyz",))
|
|
|
|
|
|
def find_pointcloud_file(
|
|
directory: str,
|
|
tile_id: str,
|
|
extensions: Tuple[str, ...] = ("xyz", "laz", "las"),
|
|
) -> Optional[str]:
|
|
"""Find XYZ/LAZ/LAS file for a tile in directory."""
|
|
# Extract coordinate suffix (e.g., '328_5511')
|
|
coords = _extract_tile_coords(tile_id)
|
|
|
|
# Try common naming patterns
|
|
patterns = []
|
|
for ext in extensions:
|
|
patterns.extend([
|
|
os.path.join(directory, f"*{coords}*.{ext}"),
|
|
os.path.join(directory, f"*{tile_id}*.{ext}"),
|
|
os.path.join(directory, f"*_{tile_id}.{ext}"),
|
|
])
|
|
|
|
for pattern in patterns:
|
|
matches = glob.glob(pattern)
|
|
if matches:
|
|
return matches[0]
|
|
|
|
return None
|
|
|
|
|
|
def find_laz_file(directory: str, tile_id: str) -> Optional[str]:
|
|
"""Find LAZ file for a tile in directory.
|
|
|
|
Args:
|
|
directory: Directory to search (e.g., 'raw/bdom20rgbi')
|
|
tile_id: Tile ID (e.g., '32_328_5511')
|
|
|
|
Returns:
|
|
Path to LAZ file or None if not found
|
|
"""
|
|
coords = _extract_tile_coords(tile_id)
|
|
patterns = [
|
|
os.path.join(directory, f"*{coords}*.laz"),
|
|
os.path.join(directory, f"*{tile_id}*.laz"),
|
|
os.path.join(directory, f"*_{tile_id}.laz"),
|
|
os.path.join(directory, f"*{coords}*.las"),
|
|
os.path.join(directory, f"*{tile_id}*.las"),
|
|
os.path.join(directory, f"*_{tile_id}.las"),
|
|
]
|
|
|
|
for pattern in patterns:
|
|
matches = glob.glob(pattern)
|
|
if matches:
|
|
return matches[0]
|
|
|
|
return None
|
|
|
|
|
|
def has_lpg_data(tile_id: str, lpg_dir: str = "raw/lpg") -> bool:
|
|
"""Check if LPG (ground points) data exists for tile."""
|
|
return find_pointcloud_file(lpg_dir, tile_id) is not None
|
|
|
|
|
|
def has_lpo_data(tile_id: str, lpo_dir: str = "raw/lpo") -> bool:
|
|
"""Check if LPO (object points) data exists for tile."""
|
|
return find_pointcloud_file(lpo_dir, tile_id) is not None
|
|
|
|
|
|
def has_lpolpg_data(tile_id: str, lpolpg_dir: str = "raw/lpolpg") -> bool:
|
|
"""Check if combined LPO/LPG data exists for tile."""
|
|
return find_pointcloud_file(lpolpg_dir, tile_id) is not None
|
|
|
|
|
|
def has_bdom_data(tile_id: str, bdom_dir: str = "raw/bdom20rgbi") -> bool:
|
|
"""Check if BDOM20RGBI data exists for tile."""
|
|
return find_laz_file(bdom_dir, tile_id) is not None
|
|
|
|
|
|
def compute_point_density(
|
|
points: PointCloud,
|
|
cell_size: float = 1.0,
|
|
bounds: Optional[Tuple[float, float, float, float]] = None,
|
|
) -> Tuple[np.ndarray, Tuple[float, float, float, float]]:
|
|
"""Compute point density raster.
|
|
|
|
Args:
|
|
points: PointCloud to analyze
|
|
cell_size: Grid cell size in meters
|
|
bounds: Optional bounds, otherwise computed from points
|
|
|
|
Returns:
|
|
Tuple of (density_array, (xmin, ymin, xmax, ymax))
|
|
"""
|
|
if len(points) == 0:
|
|
return np.array([[0]]), (0, 0, 1, 1)
|
|
|
|
if bounds is None:
|
|
xmin, xmax = points.x.min(), points.x.max()
|
|
ymin, ymax = points.y.min(), points.y.max()
|
|
else:
|
|
xmin, ymin, xmax, ymax = bounds
|
|
|
|
# Compute grid dimensions
|
|
nx = int(np.ceil((xmax - xmin) / cell_size))
|
|
ny = int(np.ceil((ymax - ymin) / cell_size))
|
|
|
|
if nx <= 0 or ny <= 0:
|
|
return np.array([[0]]), (xmin, ymin, xmax, ymax)
|
|
|
|
# Compute cell indices for each point
|
|
ix = ((points.x - xmin) / cell_size).astype(int)
|
|
iy = ((points.y - ymin) / cell_size).astype(int)
|
|
|
|
# Clamp to grid bounds
|
|
ix = np.clip(ix, 0, nx - 1)
|
|
iy = np.clip(iy, 0, ny - 1)
|
|
|
|
# Count points per cell
|
|
density = np.zeros((ny, nx), dtype=np.int32)
|
|
np.add.at(density, (iy, ix), 1)
|
|
|
|
return density, (xmin, ymin, xmax, ymax)
|
|
|
|
|
|
def compute_height_percentiles(
|
|
points: PointCloud,
|
|
percentiles: list[float] = [5, 25, 50, 75, 95],
|
|
) -> dict[str, float]:
|
|
"""Compute height percentiles from point cloud.
|
|
|
|
Args:
|
|
points: PointCloud to analyze
|
|
percentiles: List of percentiles to compute
|
|
|
|
Returns:
|
|
Dict mapping percentile name (e.g., 'p50') to value
|
|
"""
|
|
if len(points) == 0:
|
|
return {f"p{int(p)}": 0.0 for p in percentiles}
|
|
|
|
result = {}
|
|
for p in percentiles:
|
|
result[f"p{int(p)}"] = float(np.percentile(points.z, p))
|
|
|
|
result["min"] = float(points.z.min())
|
|
result["max"] = float(points.z.max())
|
|
result["mean"] = float(points.z.mean())
|
|
result["std"] = float(points.z.std())
|
|
|
|
return result
|
|
|
|
|
|
def sample_rgb_average(
|
|
points: PointCloud,
|
|
center_x: float,
|
|
center_y: float,
|
|
radius: float,
|
|
) -> Optional[Tuple[int, int, int]]:
|
|
"""Sample average RGB color from points within radius of center.
|
|
|
|
Args:
|
|
points: PointCloud with RGB data
|
|
center_x, center_y: Center point
|
|
radius: Search radius in meters
|
|
|
|
Returns:
|
|
Tuple of (r, g, b) in 0-255 range, or None if no points/no RGB
|
|
"""
|
|
if not points.has_rgb or len(points) == 0:
|
|
return None
|
|
|
|
# Find points within radius
|
|
dist_sq = (points.x - center_x) ** 2 + (points.y - center_y) ** 2
|
|
mask = dist_sq <= radius ** 2
|
|
|
|
if not mask.any():
|
|
return None
|
|
|
|
r_avg = int(np.mean(points.r[mask]))
|
|
g_avg = int(np.mean(points.g[mask]))
|
|
b_avg = int(np.mean(points.b[mask]))
|
|
|
|
return (r_avg, g_avg, b_avg)
|