#!/usr/bin/env python3 """Split triangulated CityJSON files by semantic surface type.""" from __future__ import annotations import argparse import json import sys from pathlib import Path from typing import Any, Iterable SEMANTIC_PRESETS = { "roof": {"RoofSurface"}, "wall": {"WallSurface"}, "ground": {"GroundSurface"}, "closure": {"ClosureSurface"}, } def parse_args(argv: Iterable[str] | None = None) -> argparse.Namespace: parser = argparse.ArgumentParser( description="Split triangulated CityJSON files into per-semantic outputs (e.g., roof/wall)." ) parser.add_argument( "--input-dir", type=Path, default=Path("work/cityjson_tri"), help="Directory containing triangulated CityJSON files.", ) parser.add_argument( "--output-dir", type=Path, default=Path("work/cityjson_split"), help="Directory to write filtered CityJSON files.", ) parser.add_argument( "--targets", nargs="+", choices=sorted(SEMANTIC_PRESETS), default=["roof", "wall"], help="Semantic presets to emit. Defaults to roof and wall.", ) parser.add_argument( "--pattern", default="*.city.json", help="Glob pattern for input files (defaults to all .city.json files).", ) return parser.parse_args(argv) def read_json(path: Path) -> dict[str, Any]: with path.open("r", encoding="utf-8") as handle: return json.load(handle) def write_json(path: Path, payload: dict[str, Any]) -> None: path.parent.mkdir(parents=True, exist_ok=True) with path.open("w", encoding="utf-8") as handle: json.dump(payload, handle, ensure_ascii=True, indent=2) handle.write("\n") def base_name(path: Path) -> str: name = path.name if name.endswith(".tri.city.json"): return name[: -len(".tri.city.json")] if name.endswith(".city.json"): return name[: -len(".city.json")] return path.stem def allowed_semantic_indices(semantics: dict[str, Any], allowed_types: set[str]) -> set[int]: surfaces = semantics.get("surfaces") or [] return {idx for idx, surface in enumerate(surfaces) if surface.get("type") in allowed_types} def filter_boundaries( boundaries: list[Any], values: list[Any], allowed_indices: set[int], ) -> tuple[list[Any], list[Any]]: filtered_boundaries: list[Any] = [] filtered_values: list[Any] = [] for boundary, value in zip(boundaries, values): if isinstance(value, list): nested_boundaries, nested_values = filter_boundaries(boundary, value, allowed_indices) if nested_boundaries: filtered_boundaries.append(nested_boundaries) filtered_values.append(nested_values) continue if value is None: continue if value in allowed_indices: filtered_boundaries.append(boundary) filtered_values.append(value) return filtered_boundaries, filtered_values def filter_geometry(geometry: dict[str, Any], allowed_types: set[str]) -> dict[str, Any] | None: semantics = geometry.get("semantics") boundaries = geometry.get("boundaries") if not semantics or boundaries is None: return None values = semantics.get("values") if values is None: return None allowed_indices = allowed_semantic_indices(semantics, allowed_types) if not allowed_indices: return None filtered_boundaries, filtered_values = filter_boundaries(boundaries, values, allowed_indices) if not filtered_boundaries: return None filtered_geometry = dict(geometry) filtered_geometry["boundaries"] = filtered_boundaries filtered_geometry["semantics"] = dict(semantics) filtered_geometry["semantics"]["values"] = filtered_values return filtered_geometry def filter_cityobject(cityobject: dict[str, Any], allowed_types: set[str]) -> dict[str, Any] | None: geometries = cityobject.get("geometry") or [] filtered_geometries = [] for geometry in geometries: filtered_geometry = filter_geometry(geometry, allowed_types) if filtered_geometry: filtered_geometries.append(filtered_geometry) if not filtered_geometries: return None filtered_object = dict(cityobject) filtered_object["geometry"] = filtered_geometries return filtered_object def filter_cityjson(cityjson: dict[str, Any], allowed_types: set[str]) -> dict[str, Any] | None: cityobjects = cityjson.get("CityObjects") or {} filtered_objects = {} for object_id, cityobject in cityobjects.items(): filtered_object = filter_cityobject(cityobject, allowed_types) if filtered_object: filtered_objects[object_id] = filtered_object if not filtered_objects: return None filtered_cityjson = {k: v for k, v in cityjson.items() if k != "CityObjects"} filtered_cityjson["CityObjects"] = filtered_objects return filtered_cityjson def process_file(path: Path, targets: list[str], output_dir: Path) -> int: cityjson = read_json(path) written = 0 base = base_name(path) for target in targets: allowed_types = SEMANTIC_PRESETS[target] filtered = filter_cityjson(cityjson, allowed_types) if not filtered: print(f"[skip] {base} has no geometry for target '{target}'", file=sys.stderr) continue output_path = output_dir / f"{base}.{target}.city.json" write_json(output_path, filtered) written += 1 return written def main(argv: Iterable[str] | None = None) -> int: args = parse_args(argv) files = sorted(args.input_dir.glob(args.pattern)) if not files: print(f"No input files matched pattern '{args.pattern}' in {args.input_dir}", file=sys.stderr) return 1 total_written = 0 for path in files: total_written += process_file(path, args.targets, args.output_dir) print(f"Wrote {total_written} files to {args.output_dir}") return 0 if total_written else 1 if __name__ == "__main__": sys.exit(main())