Files
GeoData/geodata_pipeline/pointcloud.py

440 lines
13 KiB
Python

"""Point cloud utilities for XYZ and LAZ file handling.
Provides unified reading for:
- LPG (ground points) and LPO (object points) in XYZ or LAZ format
- BDOM20RGBI in LAZ format with RGB data
"""
from __future__ import annotations
import glob
import os
from dataclasses import dataclass
from typing import Iterator, Optional, Tuple
import numpy as np
try:
import laspy
HAS_LASPY = True
except ImportError:
HAS_LASPY = False
try:
from shapely.geometry import Polygon, Point
from shapely.prepared import prep
HAS_SHAPELY = True
except ImportError:
HAS_SHAPELY = False
@dataclass
class PointCloud:
"""Container for point cloud data with optional RGB."""
x: np.ndarray
y: np.ndarray
z: np.ndarray
r: Optional[np.ndarray] = None
g: Optional[np.ndarray] = None
b: Optional[np.ndarray] = None
intensity: Optional[np.ndarray] = None
def __len__(self) -> int:
return len(self.x)
@property
def has_rgb(self) -> bool:
return self.r is not None and self.g is not None and self.b is not None
@property
def xyz(self) -> np.ndarray:
"""Return Nx3 array of XYZ coordinates."""
return np.column_stack([self.x, self.y, self.z])
@property
def rgb(self) -> Optional[np.ndarray]:
"""Return Nx3 array of RGB values (0-255) if available."""
if not self.has_rgb:
return None
return np.column_stack([self.r, self.g, self.b])
def filter_bounds(self, xmin: float, ymin: float, xmax: float, ymax: float) -> "PointCloud":
"""Filter points to bounding box."""
mask = (
(self.x >= xmin) & (self.x <= xmax) &
(self.y >= ymin) & (self.y <= ymax)
)
return PointCloud(
x=self.x[mask],
y=self.y[mask],
z=self.z[mask],
r=self.r[mask] if self.r is not None else None,
g=self.g[mask] if self.g is not None else None,
b=self.b[mask] if self.b is not None else None,
intensity=self.intensity[mask] if self.intensity is not None else None,
)
def filter_polygon(self, polygon: "Polygon", buffer_m: float = 0.0) -> "PointCloud":
"""Filter points within a polygon (with optional buffer)."""
if not HAS_SHAPELY:
raise ImportError("shapely required for polygon filtering")
if buffer_m > 0:
polygon = polygon.buffer(buffer_m)
prepared = prep(polygon)
mask = np.array([prepared.contains(Point(x, y)) for x, y in zip(self.x, self.y)])
return PointCloud(
x=self.x[mask],
y=self.y[mask],
z=self.z[mask],
r=self.r[mask] if self.r is not None else None,
g=self.g[mask] if self.g is not None else None,
b=self.b[mask] if self.b is not None else None,
intensity=self.intensity[mask] if self.intensity is not None else None,
)
def read_xyz_file(
path: str,
bounds: Optional[Tuple[float, float, float, float]] = None,
chunk_size: int = 1_000_000,
) -> PointCloud:
"""Read XYZ text file (space/tab delimited) with optional bounds filtering.
Args:
path: Path to XYZ file
bounds: Optional (xmin, ymin, xmax, ymax) to filter points
chunk_size: Number of lines to read at once for memory efficiency
Returns:
PointCloud with x, y, z arrays
"""
all_x, all_y, all_z = [], [], []
with open(path, "r") as f:
while True:
lines = []
for _ in range(chunk_size):
line = f.readline()
if not line:
break
lines.append(line)
if not lines:
break
# Parse chunk
chunk_data = []
for line in lines:
parts = line.strip().split()
if len(parts) >= 3:
try:
x, y, z = float(parts[0]), float(parts[1]), float(parts[2])
chunk_data.append((x, y, z))
except ValueError:
continue
if not chunk_data:
continue
chunk_arr = np.array(chunk_data, dtype=np.float64)
# Apply bounds filter if specified
if bounds is not None:
xmin, ymin, xmax, ymax = bounds
mask = (
(chunk_arr[:, 0] >= xmin) & (chunk_arr[:, 0] <= xmax) &
(chunk_arr[:, 1] >= ymin) & (chunk_arr[:, 1] <= ymax)
)
chunk_arr = chunk_arr[mask]
if len(chunk_arr) > 0:
all_x.append(chunk_arr[:, 0])
all_y.append(chunk_arr[:, 1])
all_z.append(chunk_arr[:, 2])
if not all_x:
return PointCloud(
x=np.array([], dtype=np.float64),
y=np.array([], dtype=np.float64),
z=np.array([], dtype=np.float64),
)
return PointCloud(
x=np.concatenate(all_x),
y=np.concatenate(all_y),
z=np.concatenate(all_z),
)
def read_laz_file(
path: str,
bounds: Optional[Tuple[float, float, float, float]] = None,
) -> PointCloud:
"""Read LAZ/LAS file with optional bounds filtering.
Args:
path: Path to LAZ/LAS file
bounds: Optional (xmin, ymin, xmax, ymax) to filter points
Returns:
PointCloud with x, y, z and optional r, g, b, intensity
"""
if not HAS_LASPY:
raise ImportError("laspy required for LAZ file reading. Install with: pip install laspy[lazrs]")
with laspy.open(path) as reader:
# Read all points (laspy handles decompression)
las = reader.read()
x = np.array(las.x, dtype=np.float64)
y = np.array(las.y, dtype=np.float64)
z = np.array(las.z, dtype=np.float64)
# Extract RGB if available (typically 16-bit, scale to 8-bit)
r, g, b = None, None, None
if hasattr(las, 'red') and hasattr(las, 'green') and hasattr(las, 'blue'):
# LAZ RGB is often 16-bit, convert to 8-bit
r = (np.array(las.red, dtype=np.float32) / 256).astype(np.uint8)
g = (np.array(las.green, dtype=np.float32) / 256).astype(np.uint8)
b = (np.array(las.blue, dtype=np.float32) / 256).astype(np.uint8)
# Extract intensity if available
intensity = None
if hasattr(las, 'intensity'):
intensity = np.array(las.intensity, dtype=np.uint16)
pc = PointCloud(x=x, y=y, z=z, r=r, g=g, b=b, intensity=intensity)
# Apply bounds filter
if bounds is not None:
pc = pc.filter_bounds(*bounds)
return pc
def read_pointcloud_file(
path: str,
bounds: Optional[Tuple[float, float, float, float]] = None,
chunk_size: int = 1_000_000,
) -> PointCloud:
"""Read XYZ/LAZ/LAS point cloud file with optional bounds filtering."""
ext = os.path.splitext(path)[1].lower()
if ext in (".laz", ".las"):
return read_laz_file(path, bounds=bounds)
if ext == ".xyz":
return read_xyz_file(path, bounds=bounds, chunk_size=chunk_size)
raise ValueError(f"Unsupported point cloud format: {path}")
def _extract_tile_coords(tile_id: str) -> str:
"""Extract tile coordinates from tile ID.
E.g., 'dgm1_32_328_5511' -> '328_5511'
"""
parts = tile_id.split("_")
# Find numeric parts that look like coordinates
coords = []
for p in parts:
if p.isdigit() and len(p) >= 3:
coords.append(p)
if len(coords) >= 2:
return f"{coords[-2]}_{coords[-1]}"
return tile_id
def find_xyz_file(directory: str, tile_id: str) -> Optional[str]:
"""Find XYZ file for a tile in directory.
Args:
directory: Directory to search (e.g., 'raw/lpg' or 'raw/lpo')
tile_id: Tile ID (e.g., 'dgm1_32_328_5511' or '32_328_5511')
Returns:
Path to XYZ file or None if not found
"""
return find_pointcloud_file(directory, tile_id, extensions=("xyz",))
def find_pointcloud_file(
directory: str,
tile_id: str,
extensions: Tuple[str, ...] = ("xyz", "laz", "las"),
) -> Optional[str]:
"""Find XYZ/LAZ/LAS file for a tile in directory."""
# Extract coordinate suffix (e.g., '328_5511')
coords = _extract_tile_coords(tile_id)
# Try common naming patterns
patterns = []
for ext in extensions:
patterns.extend([
os.path.join(directory, f"*{coords}*.{ext}"),
os.path.join(directory, f"*{tile_id}*.{ext}"),
os.path.join(directory, f"*_{tile_id}.{ext}"),
])
for pattern in patterns:
matches = glob.glob(pattern)
if matches:
return matches[0]
return None
def find_laz_file(directory: str, tile_id: str) -> Optional[str]:
"""Find LAZ file for a tile in directory.
Args:
directory: Directory to search (e.g., 'raw/bdom20rgbi')
tile_id: Tile ID (e.g., '32_328_5511')
Returns:
Path to LAZ file or None if not found
"""
coords = _extract_tile_coords(tile_id)
patterns = [
os.path.join(directory, f"*{coords}*.laz"),
os.path.join(directory, f"*{tile_id}*.laz"),
os.path.join(directory, f"*_{tile_id}.laz"),
os.path.join(directory, f"*{coords}*.las"),
os.path.join(directory, f"*{tile_id}*.las"),
os.path.join(directory, f"*_{tile_id}.las"),
]
for pattern in patterns:
matches = glob.glob(pattern)
if matches:
return matches[0]
return None
def has_lpg_data(tile_id: str, lpg_dir: str = "raw/lpg") -> bool:
"""Check if LPG (ground points) data exists for tile."""
return find_pointcloud_file(lpg_dir, tile_id) is not None
def has_lpo_data(tile_id: str, lpo_dir: str = "raw/lpo") -> bool:
"""Check if LPO (object points) data exists for tile."""
return find_pointcloud_file(lpo_dir, tile_id) is not None
def has_lpolpg_data(tile_id: str, lpolpg_dir: str = "raw/lpolpg") -> bool:
"""Check if combined LPO/LPG data exists for tile."""
return find_pointcloud_file(lpolpg_dir, tile_id) is not None
def has_bdom_data(tile_id: str, bdom_dir: str = "raw/bdom20rgbi") -> bool:
"""Check if BDOM20RGBI data exists for tile."""
return find_laz_file(bdom_dir, tile_id) is not None
def compute_point_density(
points: PointCloud,
cell_size: float = 1.0,
bounds: Optional[Tuple[float, float, float, float]] = None,
) -> Tuple[np.ndarray, Tuple[float, float, float, float]]:
"""Compute point density raster.
Args:
points: PointCloud to analyze
cell_size: Grid cell size in meters
bounds: Optional bounds, otherwise computed from points
Returns:
Tuple of (density_array, (xmin, ymin, xmax, ymax))
"""
if len(points) == 0:
return np.array([[0]]), (0, 0, 1, 1)
if bounds is None:
xmin, xmax = points.x.min(), points.x.max()
ymin, ymax = points.y.min(), points.y.max()
else:
xmin, ymin, xmax, ymax = bounds
# Compute grid dimensions
nx = int(np.ceil((xmax - xmin) / cell_size))
ny = int(np.ceil((ymax - ymin) / cell_size))
if nx <= 0 or ny <= 0:
return np.array([[0]]), (xmin, ymin, xmax, ymax)
# Compute cell indices for each point
ix = ((points.x - xmin) / cell_size).astype(int)
iy = ((points.y - ymin) / cell_size).astype(int)
# Clamp to grid bounds
ix = np.clip(ix, 0, nx - 1)
iy = np.clip(iy, 0, ny - 1)
# Count points per cell
density = np.zeros((ny, nx), dtype=np.int32)
np.add.at(density, (iy, ix), 1)
return density, (xmin, ymin, xmax, ymax)
def compute_height_percentiles(
points: PointCloud,
percentiles: list[float] = [5, 25, 50, 75, 95],
) -> dict[str, float]:
"""Compute height percentiles from point cloud.
Args:
points: PointCloud to analyze
percentiles: List of percentiles to compute
Returns:
Dict mapping percentile name (e.g., 'p50') to value
"""
if len(points) == 0:
return {f"p{int(p)}": 0.0 for p in percentiles}
result = {}
for p in percentiles:
result[f"p{int(p)}"] = float(np.percentile(points.z, p))
result["min"] = float(points.z.min())
result["max"] = float(points.z.max())
result["mean"] = float(points.z.mean())
result["std"] = float(points.z.std())
return result
def sample_rgb_average(
points: PointCloud,
center_x: float,
center_y: float,
radius: float,
) -> Optional[Tuple[int, int, int]]:
"""Sample average RGB color from points within radius of center.
Args:
points: PointCloud with RGB data
center_x, center_y: Center point
radius: Search radius in meters
Returns:
Tuple of (r, g, b) in 0-255 range, or None if no points/no RGB
"""
if not points.has_rgb or len(points) == 0:
return None
# Find points within radius
dist_sq = (points.x - center_x) ** 2 + (points.y - center_y) ** 2
mask = dist_sq <= radius ** 2
if not mask.any():
return None
r_avg = int(np.mean(points.r[mask]))
g_avg = int(np.mean(points.g[mask]))
b_avg = int(np.mean(points.b[mask]))
return (r_avg, g_avg, b_avg)