216 lines
6.7 KiB
Python
216 lines
6.7 KiB
Python
#!/usr/bin/env python3
|
|
"""Split triangulated CityJSON files by semantic surface type."""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Any, Iterable
|
|
|
|
# Preset name (as accepted by ``--targets``) -> set of semantic surface
# ``type`` strings that the preset keeps when filtering a geometry.
SEMANTIC_PRESETS = dict(
    roof={"RoofSurface"},
    wall={"WallSurface"},
    ground={"GroundSurface"},
    closure={"ClosureSurface"},
)
|
|
|
|
|
|
def parse_args(argv: Iterable[str] | None = None) -> argparse.Namespace:
    """Parse the command-line options of the splitter.

    Args:
        argv: Argument strings to parse. ``None`` makes argparse read the
            process arguments instead.

    Returns:
        Namespace exposing ``input_dir`` and ``output_dir`` (``Path``),
        ``targets`` (list of preset names) and ``pattern`` (glob string).
    """
    # Declared as data so every option is registered the same way, in order.
    option_table = (
        (
            "--input-dir",
            {
                "type": Path,
                "default": Path("work/cityjson_tri"),
                "help": "Directory containing triangulated CityJSON files.",
            },
        ),
        (
            "--output-dir",
            {
                "type": Path,
                "default": Path("work/cityjson_split"),
                "help": "Directory to write filtered CityJSON files.",
            },
        ),
        (
            "--targets",
            {
                "nargs": "+",
                "choices": sorted(SEMANTIC_PRESETS),
                "default": ["roof", "wall"],
                "help": "Semantic presets to emit. Defaults to roof and wall.",
            },
        ),
        (
            "--pattern",
            {
                "default": "**/*.json",
                "help": "Glob pattern for input files (defaults to any .json under the input dir).",
            },
        ),
    )

    cli = argparse.ArgumentParser(
        description="Split triangulated CityJSON files into per-semantic outputs (e.g., roof/wall)."
    )
    for flag, settings in option_table:
        cli.add_argument(flag, **settings)
    return cli.parse_args(argv)
|
|
|
|
|
|
def read_json(path: Path) -> dict[str, Any]:
    """Return the parsed JSON document stored at *path* (decoded as UTF-8)."""
    raw_text = path.read_text(encoding="utf-8")
    return json.loads(raw_text)
|
|
|
|
|
|
def write_json(path: Path, payload: dict[str, Any]) -> None:
    """Serialize *payload* to *path* as indented, ASCII-escaped JSON.

    Missing parent directories are created first, and the file is terminated
    with a single trailing newline.
    """
    destination_dir = path.parent
    destination_dir.mkdir(parents=True, exist_ok=True)
    with open(path, mode="w", encoding="utf-8") as sink:
        json.dump(payload, sink, indent=2, ensure_ascii=True)
        sink.write("\n")
|
|
|
|
|
|
def base_name(path: Path) -> str:
    """Return the file name of *path* without its CityJSON-style suffix.

    The first matching suffix among ``.tri.city.json``, ``.city.json`` and
    ``.json`` is removed; any other name falls back to ``path.stem``.
    """
    filename = path.name
    # Longest suffix first, so "x.tri.city.json" is not cut at ".json" only.
    for suffix in (".tri.city.json", ".city.json", ".json"):
        if filename.endswith(suffix):
            return filename[: len(filename) - len(suffix)]
    return path.stem
|
|
|
|
|
|
def resolve_input_file(path: Path) -> Path | None:
    """Handle both flat files and citygml-tools style output directories.

    Args:
        path: A regular file, or a directory expected to wrap one JSON file.

    Returns:
        ``path`` itself when it is a file. For a directory, the first existing
        file among ``<stem>.json`` and ``<name>.json`` (``<name>`` being the
        directory name without its ``.tri.city.json`` / ``.city.json`` /
        ``.json`` suffix), otherwise the only regular ``*.json`` file directly
        inside it. ``None`` when nothing can be chosen unambiguously.
    """
    if path.is_file():
        return path
    if not path.is_dir():
        return None

    # citygml-tools writes <name>.city.json/<name>.json
    # ``path.stem`` only drops the last suffix ("<name>.city"), so the
    # documented layout needs the fully stripped name as a second candidate.
    # The historical ``<stem>.json`` candidate stays first for compatibility.
    dir_name = path.name
    stems = [path.stem]
    for suffix in (".tri.city.json", ".city.json", ".json"):
        if dir_name.endswith(suffix):
            stripped = dir_name[: -len(suffix)]
            if stripped and stripped not in stems:
                stems.append(stripped)
            break

    for stem in stems:
        candidate = path / f"{stem}.json"
        if candidate.is_file():
            return candidate

    # Fall back to a lone JSON file; nested directories whose names happen to
    # end in ".json" must not make the choice look ambiguous.
    matches = [match for match in path.glob("*.json") if match.is_file()]
    if len(matches) == 1:
        return matches[0]
    return None
|
|
|
|
|
|
def allowed_semantic_indices(semantics: dict[str, Any], allowed_types: set[str]) -> set[int]:
    """Return the positions in ``semantics["surfaces"]`` whose ``type`` is allowed.

    A missing or empty ``surfaces`` entry yields an empty set.
    """
    matching: set[int] = set()
    for position, entry in enumerate(semantics.get("surfaces") or ()):
        if entry.get("type") in allowed_types:
            matching.add(position)
    return matching
|
|
|
|
|
|
def filter_boundaries(
    boundaries: list[Any],
    values: list[Any],
    allowed_indices: set[int],
) -> tuple[list[Any], list[Any]]:
    """Keep only the boundary entries whose semantic value is allowed.

    ``boundaries`` and ``values`` are walked pairwise. Where a value is itself
    a list, the matching boundary is filtered recursively and kept only if
    something survives inside it. ``None`` values and values outside
    ``allowed_indices`` are dropped.

    Returns:
        ``(kept_boundaries, kept_values)``, two lists with matching nesting.
    """
    kept_boundaries: list[Any] = []
    kept_values: list[Any] = []

    for part, semantic in zip(boundaries, values):
        if not isinstance(semantic, list):
            # Leaf level: one semantic index (or None) for this boundary.
            if semantic is not None and semantic in allowed_indices:
                kept_boundaries.append(part)
                kept_values.append(semantic)
            continue

        # Nested level: descend and keep the group only when it is non-empty.
        inner_boundaries, inner_values = filter_boundaries(part, semantic, allowed_indices)
        if inner_boundaries:
            kept_boundaries.append(inner_boundaries)
            kept_values.append(inner_values)

    return kept_boundaries, kept_values
|
|
|
|
|
|
def filter_geometry(geometry: dict[str, Any], allowed_types: set[str]) -> dict[str, Any] | None:
    """Return a copy of *geometry* limited to surfaces of the allowed types.

    Only ``boundaries`` and ``semantics["values"]`` are replaced; every other
    key of the geometry and of its semantics is copied over unchanged.

    Returns:
        The filtered geometry, or ``None`` when the geometry has no semantics,
        no boundaries, no semantic values, no surface of an allowed type, or
        nothing left after filtering.
    """
    semantic_block = geometry.get("semantics")
    source_boundaries = geometry.get("boundaries")
    if source_boundaries is None or not semantic_block:
        return None

    semantic_values = semantic_block.get("values")
    if semantic_values is None:
        return None

    wanted = allowed_semantic_indices(semantic_block, allowed_types)
    if not wanted:
        return None

    kept_boundaries, kept_values = filter_boundaries(source_boundaries, semantic_values, wanted)
    if not kept_boundaries:
        return None

    # Shallow copies: the input geometry and its semantics dict stay untouched.
    return {
        **geometry,
        "boundaries": kept_boundaries,
        "semantics": {**semantic_block, "values": kept_values},
    }
|
|
|
|
|
|
def filter_cityobject(cityobject: dict[str, Any], allowed_types: set[str]) -> dict[str, Any] | None:
    """Return a copy of *cityobject* whose geometries are filtered by type.

    Each entry of ``cityobject["geometry"]`` goes through ``filter_geometry``;
    entries that come back empty are discarded.

    Returns:
        A shallow copy with the surviving geometries, or ``None`` when the
        object has no geometry or none of it survives.
    """
    filtered = (
        filter_geometry(item, allowed_types)
        for item in cityobject.get("geometry") or []
    )
    survivors = [item for item in filtered if item]
    if not survivors:
        return None
    return {**cityobject, "geometry": survivors}
|
|
|
|
|
|
def filter_cityjson(cityjson: dict[str, Any], allowed_types: set[str]) -> dict[str, Any] | None:
    """Return a copy of *cityjson* keeping only objects with allowed surfaces.

    Every entry of ``cityjson["CityObjects"]`` goes through
    ``filter_cityobject``; objects that come back empty are left out. All other
    top-level keys are carried over as they are.

    Returns:
        The filtered document with ``CityObjects`` as its last key, or ``None``
        when no object survives.
    """
    source_objects = cityjson.get("CityObjects") or {}
    filtered_pairs = (
        (object_id, filter_cityobject(entry, allowed_types))
        for object_id, entry in source_objects.items()
    )
    survivors = {object_id: trimmed for object_id, trimmed in filtered_pairs if trimmed}
    if not survivors:
        return None

    # Copy, drop and re-add so "CityObjects" ends up after the other keys.
    result = dict(cityjson)
    result.pop("CityObjects", None)
    result["CityObjects"] = survivors
    return result
|
|
|
|
|
|
def process_file(path: Path, targets: list[str], output_dir: Path) -> int:
    """Split one input into one CityJSON file per requested target.

    Args:
        path: Matched input path (a file, or a directory wrapping one).
        targets: Preset names, used as keys into ``SEMANTIC_PRESETS``.
        output_dir: Directory receiving ``<base>.<target>.city.json`` files.

    Returns:
        The number of files written. Inputs that cannot be resolved, and
        targets that leave nothing to write, are reported on stderr.
    """
    source = resolve_input_file(path)
    if not source:
        print(f"[skip] cannot resolve CityJSON file for {path}", file=sys.stderr)
        return 0

    document = read_json(source)
    # Output names derive from the matched path, not from the resolved file.
    base = base_name(path)

    emitted = 0
    for target in targets:
        subset = filter_cityjson(document, SEMANTIC_PRESETS[target])
        if subset:
            write_json(output_dir / f"{base}.{target}.city.json", subset)
            emitted += 1
        else:
            print(f"[skip] {base} has no geometry for target '{target}'", file=sys.stderr)

    return emitted
|
|
|
|
|
|
def main(argv: Iterable[str] | None = None) -> int:
    """Run the splitter over every path matching the input pattern.

    Args:
        argv: Command-line arguments; ``None`` defers to the process arguments.

    Returns:
        Process exit status: ``0`` when at least one file was written, ``1``
        when no input matched or nothing was written.
    """
    args = parse_args(argv)
    files = sorted(args.input_dir.glob(args.pattern))
    if not files:
        print(f"No input files matched pattern '{args.pattern}' in {args.input_dir}", file=sys.stderr)
        return 1

    total_written = 0
    # A wrapper directory named "<name>.city.json" and the JSON file inside it
    # can both match the glob. Remember which concrete files were already
    # handled so the same input is not processed (and counted) twice.
    handled = set()
    for path in files:
        resolved = resolve_input_file(path)
        if resolved is not None:
            identity = resolved.resolve()
            if identity in handled:
                continue
            handled.add(identity)
        # Unresolvable paths still go through process_file so that its
        # "[skip]" diagnostic is printed.
        total_written += process_file(path, args.targets, args.output_dir)

    print(f"Wrote {total_written} files to {args.output_dir}")
    return 0 if total_written else 1
|
|
|
|
|
|
# Script entry point: the value returned by main() becomes the exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|