"""Point cloud utilities for XYZ and LAZ file handling. Provides unified reading for: - LPG (ground points) and LPO (object points) in XYZ or LAZ format - BDOM20RGBI in LAZ format with RGB data """ from __future__ import annotations import glob import os from dataclasses import dataclass from typing import Iterator, Optional, Tuple import numpy as np try: import laspy HAS_LASPY = True except ImportError: HAS_LASPY = False try: from shapely.geometry import Polygon, Point from shapely.prepared import prep HAS_SHAPELY = True except ImportError: HAS_SHAPELY = False @dataclass class PointCloud: """Container for point cloud data with optional RGB.""" x: np.ndarray y: np.ndarray z: np.ndarray r: Optional[np.ndarray] = None g: Optional[np.ndarray] = None b: Optional[np.ndarray] = None intensity: Optional[np.ndarray] = None def __len__(self) -> int: return len(self.x) @property def has_rgb(self) -> bool: return self.r is not None and self.g is not None and self.b is not None @property def xyz(self) -> np.ndarray: """Return Nx3 array of XYZ coordinates.""" return np.column_stack([self.x, self.y, self.z]) @property def rgb(self) -> Optional[np.ndarray]: """Return Nx3 array of RGB values (0-255) if available.""" if not self.has_rgb: return None return np.column_stack([self.r, self.g, self.b]) def filter_bounds(self, xmin: float, ymin: float, xmax: float, ymax: float) -> "PointCloud": """Filter points to bounding box.""" mask = ( (self.x >= xmin) & (self.x <= xmax) & (self.y >= ymin) & (self.y <= ymax) ) return PointCloud( x=self.x[mask], y=self.y[mask], z=self.z[mask], r=self.r[mask] if self.r is not None else None, g=self.g[mask] if self.g is not None else None, b=self.b[mask] if self.b is not None else None, intensity=self.intensity[mask] if self.intensity is not None else None, ) def filter_polygon(self, polygon: "Polygon", buffer_m: float = 0.0) -> "PointCloud": """Filter points within a polygon (with optional buffer).""" if not HAS_SHAPELY: raise ImportError("shapely required for polygon filtering") if buffer_m > 0: polygon = polygon.buffer(buffer_m) prepared = prep(polygon) mask = np.array([prepared.contains(Point(x, y)) for x, y in zip(self.x, self.y)]) return PointCloud( x=self.x[mask], y=self.y[mask], z=self.z[mask], r=self.r[mask] if self.r is not None else None, g=self.g[mask] if self.g is not None else None, b=self.b[mask] if self.b is not None else None, intensity=self.intensity[mask] if self.intensity is not None else None, ) def read_xyz_file( path: str, bounds: Optional[Tuple[float, float, float, float]] = None, chunk_size: int = 1_000_000, ) -> PointCloud: """Read XYZ text file (space/tab delimited) with optional bounds filtering. Args: path: Path to XYZ file bounds: Optional (xmin, ymin, xmax, ymax) to filter points chunk_size: Number of lines to read at once for memory efficiency Returns: PointCloud with x, y, z arrays """ all_x, all_y, all_z = [], [], [] with open(path, "r") as f: while True: lines = [] for _ in range(chunk_size): line = f.readline() if not line: break lines.append(line) if not lines: break # Parse chunk chunk_data = [] for line in lines: parts = line.strip().split() if len(parts) >= 3: try: x, y, z = float(parts[0]), float(parts[1]), float(parts[2]) chunk_data.append((x, y, z)) except ValueError: continue if not chunk_data: continue chunk_arr = np.array(chunk_data, dtype=np.float64) # Apply bounds filter if specified if bounds is not None: xmin, ymin, xmax, ymax = bounds mask = ( (chunk_arr[:, 0] >= xmin) & (chunk_arr[:, 0] <= xmax) & (chunk_arr[:, 1] >= ymin) & (chunk_arr[:, 1] <= ymax) ) chunk_arr = chunk_arr[mask] if len(chunk_arr) > 0: all_x.append(chunk_arr[:, 0]) all_y.append(chunk_arr[:, 1]) all_z.append(chunk_arr[:, 2]) if not all_x: return PointCloud( x=np.array([], dtype=np.float64), y=np.array([], dtype=np.float64), z=np.array([], dtype=np.float64), ) return PointCloud( x=np.concatenate(all_x), y=np.concatenate(all_y), z=np.concatenate(all_z), ) def read_laz_file( path: str, bounds: Optional[Tuple[float, float, float, float]] = None, ) -> PointCloud: """Read LAZ/LAS file with optional bounds filtering. Args: path: Path to LAZ/LAS file bounds: Optional (xmin, ymin, xmax, ymax) to filter points Returns: PointCloud with x, y, z and optional r, g, b, intensity """ if not HAS_LASPY: raise ImportError("laspy required for LAZ file reading. Install with: pip install laspy[lazrs]") with laspy.open(path) as reader: # Read all points (laspy handles decompression) las = reader.read() x = np.array(las.x, dtype=np.float64) y = np.array(las.y, dtype=np.float64) z = np.array(las.z, dtype=np.float64) # Extract RGB if available (typically 16-bit, scale to 8-bit) r, g, b = None, None, None if hasattr(las, 'red') and hasattr(las, 'green') and hasattr(las, 'blue'): # LAZ RGB is often 16-bit, convert to 8-bit r = (np.array(las.red, dtype=np.float32) / 256).astype(np.uint8) g = (np.array(las.green, dtype=np.float32) / 256).astype(np.uint8) b = (np.array(las.blue, dtype=np.float32) / 256).astype(np.uint8) # Extract intensity if available intensity = None if hasattr(las, 'intensity'): intensity = np.array(las.intensity, dtype=np.uint16) pc = PointCloud(x=x, y=y, z=z, r=r, g=g, b=b, intensity=intensity) # Apply bounds filter if bounds is not None: pc = pc.filter_bounds(*bounds) return pc def read_pointcloud_file( path: str, bounds: Optional[Tuple[float, float, float, float]] = None, chunk_size: int = 1_000_000, ) -> PointCloud: """Read XYZ/LAZ/LAS point cloud file with optional bounds filtering.""" ext = os.path.splitext(path)[1].lower() if ext in (".laz", ".las"): return read_laz_file(path, bounds=bounds) if ext == ".xyz": return read_xyz_file(path, bounds=bounds, chunk_size=chunk_size) raise ValueError(f"Unsupported point cloud format: {path}") def _extract_tile_coords(tile_id: str) -> str: """Extract tile coordinates from tile ID. E.g., 'dgm1_32_328_5511' -> '328_5511' """ parts = tile_id.split("_") # Find numeric parts that look like coordinates coords = [] for p in parts: if p.isdigit() and len(p) >= 3: coords.append(p) if len(coords) >= 2: return f"{coords[-2]}_{coords[-1]}" return tile_id def find_xyz_file(directory: str, tile_id: str) -> Optional[str]: """Find XYZ file for a tile in directory. Args: directory: Directory to search (e.g., 'raw/lpg' or 'raw/lpo') tile_id: Tile ID (e.g., 'dgm1_32_328_5511' or '32_328_5511') Returns: Path to XYZ file or None if not found """ return find_pointcloud_file(directory, tile_id, extensions=("xyz",)) def find_pointcloud_file( directory: str, tile_id: str, extensions: Tuple[str, ...] = ("xyz", "laz", "las"), ) -> Optional[str]: """Find XYZ/LAZ/LAS file for a tile in directory.""" # Extract coordinate suffix (e.g., '328_5511') coords = _extract_tile_coords(tile_id) # Try common naming patterns patterns = [] for ext in extensions: patterns.extend([ os.path.join(directory, f"*{coords}*.{ext}"), os.path.join(directory, f"*{tile_id}*.{ext}"), os.path.join(directory, f"*_{tile_id}.{ext}"), ]) for pattern in patterns: matches = glob.glob(pattern) if matches: return matches[0] return None def find_laz_file(directory: str, tile_id: str) -> Optional[str]: """Find LAZ file for a tile in directory. Args: directory: Directory to search (e.g., 'raw/bdom20rgbi') tile_id: Tile ID (e.g., '32_328_5511') Returns: Path to LAZ file or None if not found """ coords = _extract_tile_coords(tile_id) patterns = [ os.path.join(directory, f"*{coords}*.laz"), os.path.join(directory, f"*{tile_id}*.laz"), os.path.join(directory, f"*_{tile_id}.laz"), os.path.join(directory, f"*{coords}*.las"), os.path.join(directory, f"*{tile_id}*.las"), os.path.join(directory, f"*_{tile_id}.las"), ] for pattern in patterns: matches = glob.glob(pattern) if matches: return matches[0] return None def has_lpg_data(tile_id: str, lpg_dir: str = "raw/lpg") -> bool: """Check if LPG (ground points) data exists for tile.""" return find_pointcloud_file(lpg_dir, tile_id) is not None def has_lpo_data(tile_id: str, lpo_dir: str = "raw/lpo") -> bool: """Check if LPO (object points) data exists for tile.""" return find_pointcloud_file(lpo_dir, tile_id) is not None def has_lpolpg_data(tile_id: str, lpolpg_dir: str = "raw/lpolpg") -> bool: """Check if combined LPO/LPG data exists for tile.""" return find_pointcloud_file(lpolpg_dir, tile_id) is not None def has_bdom_data(tile_id: str, bdom_dir: str = "raw/bdom20rgbi") -> bool: """Check if BDOM20RGBI data exists for tile.""" return find_laz_file(bdom_dir, tile_id) is not None def compute_point_density( points: PointCloud, cell_size: float = 1.0, bounds: Optional[Tuple[float, float, float, float]] = None, ) -> Tuple[np.ndarray, Tuple[float, float, float, float]]: """Compute point density raster. Args: points: PointCloud to analyze cell_size: Grid cell size in meters bounds: Optional bounds, otherwise computed from points Returns: Tuple of (density_array, (xmin, ymin, xmax, ymax)) """ if len(points) == 0: return np.array([[0]]), (0, 0, 1, 1) if bounds is None: xmin, xmax = points.x.min(), points.x.max() ymin, ymax = points.y.min(), points.y.max() else: xmin, ymin, xmax, ymax = bounds # Compute grid dimensions nx = int(np.ceil((xmax - xmin) / cell_size)) ny = int(np.ceil((ymax - ymin) / cell_size)) if nx <= 0 or ny <= 0: return np.array([[0]]), (xmin, ymin, xmax, ymax) # Compute cell indices for each point ix = ((points.x - xmin) / cell_size).astype(int) iy = ((points.y - ymin) / cell_size).astype(int) # Clamp to grid bounds ix = np.clip(ix, 0, nx - 1) iy = np.clip(iy, 0, ny - 1) # Count points per cell density = np.zeros((ny, nx), dtype=np.int32) np.add.at(density, (iy, ix), 1) return density, (xmin, ymin, xmax, ymax) def compute_height_percentiles( points: PointCloud, percentiles: list[float] = [5, 25, 50, 75, 95], ) -> dict[str, float]: """Compute height percentiles from point cloud. Args: points: PointCloud to analyze percentiles: List of percentiles to compute Returns: Dict mapping percentile name (e.g., 'p50') to value """ if len(points) == 0: return {f"p{int(p)}": 0.0 for p in percentiles} result = {} for p in percentiles: result[f"p{int(p)}"] = float(np.percentile(points.z, p)) result["min"] = float(points.z.min()) result["max"] = float(points.z.max()) result["mean"] = float(points.z.mean()) result["std"] = float(points.z.std()) return result def sample_rgb_average( points: PointCloud, center_x: float, center_y: float, radius: float, ) -> Optional[Tuple[int, int, int]]: """Sample average RGB color from points within radius of center. Args: points: PointCloud with RGB data center_x, center_y: Center point radius: Search radius in meters Returns: Tuple of (r, g, b) in 0-255 range, or None if no points/no RGB """ if not points.has_rgb or len(points) == 0: return None # Find points within radius dist_sq = (points.x - center_x) ** 2 + (points.y - center_y) ** 2 mask = dist_sq <= radius ** 2 if not mask.any(): return None r_avg = int(np.mean(points.r[mask])) g_avg = int(np.mean(points.g[mask])) b_avg = int(np.mean(points.b[mask])) return (r_avg, g_avg, b_avg)