|
|
""" |
|
|
Utility helpers for loading BRep extractor-processed STEP data as PyG graphs. |
|
|
""" |
|
|
from __future__ import annotations |
|
|
|
|
|
from pathlib import Path |
|
|
from typing import Dict, Iterable, Tuple |
|
|
|
|
|
import numpy as np |
|
|
import torch |
|
|
from torch_geometric.data import HeteroData |
|
|
|
|
|
|
|
|
LABELS: Dict[str, int] = {"pipe": 0, "elbow": 1, "tjoint": 2, "random": 3} |
|
|
STEP_EXTS = ("*.step", "*.stp", "*.STEP", "*.STP") |
|
|
|
|
|
|
|
|
def build_label_map(step_root: Path) -> Dict[str, int]: |
|
|
""" |
|
|
Scan the STEP directory tree (containing /pipe, /elbow, /tjoint, ...) |
|
|
and build a mapping from file stem to integer label. |
|
|
""" |
|
|
mapping: Dict[str, int] = {} |
|
|
for cls, label in LABELS.items(): |
|
|
cls_dir = step_root / cls |
|
|
if not cls_dir.exists(): |
|
|
continue |
|
|
for ext in STEP_EXTS: |
|
|
for file in cls_dir.glob(ext): |
|
|
mapping[file.stem] = label |
|
|
if not mapping: |
|
|
raise RuntimeError(f"No STEP files found under {step_root} for any of {tuple(LABELS)}") |
|
|
return mapping |
|
|
|
|
|
|
|
|
def _flatten(arr: np.ndarray) -> np.ndarray: |
|
|
return np.asarray(arr, dtype=np.float32).reshape(arr.shape[0], -1) |
|
|
|
|
|
def _face_grid_stats(face_grids: np.ndarray) -> np.ndarray: |
|
|
""" |
|
|
Summarize face point grids into compact stats per face. |
|
|
Returns [F, 10]: xyz_mean (3), xyz_std (3), nrm_mean (3), mask_frac (1). |
|
|
""" |
|
|
face_grids = np.asarray(face_grids, dtype=np.float32) |
|
|
f = face_grids.shape[0] |
|
|
xyz = face_grids[:, 0:3, :, :].reshape(f, 3, -1) |
|
|
nrm = face_grids[:, 3:6, :, :].reshape(f, 3, -1) |
|
|
msk = face_grids[:, 6, :, :].reshape(f, -1) |
|
|
|
|
|
mask = (msk > 0.5).astype(np.float32) |
|
|
mask_frac = mask.mean(axis=1, keepdims=True) |
|
|
w = mask / (mask.sum(axis=1, keepdims=True) + 1e-6) |
|
|
|
|
|
xyz_mean = (xyz * w[:, None, :]).sum(axis=2) |
|
|
xyz_var = (w[:, None, :] * (xyz - xyz_mean[:, :, None]) ** 2).sum(axis=2) |
|
|
xyz_std = np.sqrt(np.maximum(xyz_var, 1e-12)) |
|
|
nrm_mean = (nrm * w[:, None, :]).sum(axis=2) |
|
|
return np.concatenate([xyz_mean, xyz_std, nrm_mean, mask_frac], axis=1) |
|
|
|
|
|
def compute_global_geom_features(data) -> np.ndarray: |
|
|
""" |
|
|
Compute compact global geometry descriptors from face/coedge point samples. |
|
|
Returns [5] float32: pca_ev_ratio_1/2/3, line_fit_rmse, plane_fit_rmse. |
|
|
""" |
|
|
points = [] |
|
|
face_grids = np.asarray(data["face_point_grids"], dtype=np.float32) |
|
|
if face_grids.size: |
|
|
xyz = face_grids[:, 0:3, :, :].transpose(0, 2, 3, 1).reshape(-1, 3) |
|
|
mask = face_grids[:, 6, :, :].reshape(-1) > 0.5 |
|
|
if mask.any(): |
|
|
points.append(xyz[mask]) |
|
|
|
|
|
coedge_grids = np.asarray(data["coedge_point_grids"], dtype=np.float32) |
|
|
if coedge_grids.size: |
|
|
co_xyz = coedge_grids[:, 0:3, :].transpose(0, 2, 1).reshape(-1, 3) |
|
|
points.append(co_xyz) |
|
|
|
|
|
if not points: |
|
|
return np.zeros(5, dtype=np.float32) |
|
|
|
|
|
pts = np.concatenate(points, axis=0) |
|
|
if pts.shape[0] < 3: |
|
|
return np.zeros(5, dtype=np.float32) |
|
|
pts = pts[np.isfinite(pts).all(axis=1)] |
|
|
if pts.shape[0] < 3: |
|
|
return np.zeros(5, dtype=np.float32) |
|
|
|
|
|
mean = pts.mean(axis=0, keepdims=True) |
|
|
centered = pts - mean |
|
|
scale = np.sqrt(np.mean(np.sum(centered ** 2, axis=1))) |
|
|
centered = centered / (scale + 1e-6) |
|
|
cov = (centered.T @ centered) / max(1, centered.shape[0]) |
|
|
if not np.isfinite(cov).all(): |
|
|
return np.zeros(5, dtype=np.float32) |
|
|
|
|
|
ev = np.linalg.eigvalsh(cov) |
|
|
ev = np.sort(ev)[::-1] |
|
|
ev = np.maximum(ev, 0.0) |
|
|
total = ev.sum() |
|
|
if not np.isfinite(total) or total <= 0.0: |
|
|
return np.zeros(5, dtype=np.float32) |
|
|
|
|
|
ratios = ev / total |
|
|
line_rmse = np.sqrt(max(ev[1] + ev[2], 0.0)) |
|
|
plane_rmse = np.sqrt(max(ev[2], 0.0)) |
|
|
feats = np.array( |
|
|
[ratios[0], ratios[1], ratios[2], line_rmse, plane_rmse], |
|
|
dtype=np.float32, |
|
|
) |
|
|
if not np.isfinite(feats).all(): |
|
|
return np.zeros(5, dtype=np.float32) |
|
|
return feats |
|
|
|
|
|
def load_coedge_arrays(npz_path: Path) -> Dict[str, np.ndarray]: |
|
|
""" |
|
|
Load node features and adjacency indices from a BRep extractor npz. |
|
|
Returns a dict with coedge/face/edge/global features and topology arrays. |
|
|
""" |
|
|
with np.load(npz_path) as data: |
|
|
coedge_feats = _flatten(data["coedge_features"]) |
|
|
scale = np.asarray(data["coedge_scale_factors"], dtype=np.float32)[:, None] |
|
|
reverse = np.asarray(data["coedge_reverse_flags"], dtype=np.float32)[:, None] |
|
|
point_grids = _flatten(data["coedge_point_grids"]) |
|
|
lcs = _flatten(data["coedge_lcs"]) |
|
|
|
|
|
face_idx = np.asarray(data["face"], dtype=np.int64) |
|
|
edge_idx = np.asarray(data["edge"], dtype=np.int64) |
|
|
face_feats = np.asarray(data["face_features"], dtype=np.float32) |
|
|
edge_feats = np.asarray(data["edge_features"], dtype=np.float32) |
|
|
|
|
|
face_grid_stats = _face_grid_stats(data["face_point_grids"]) |
|
|
|
|
|
coedge_x = np.concatenate( |
|
|
[coedge_feats, scale, reverse, point_grids, lcs], axis=1 |
|
|
) |
|
|
face_x = np.concatenate([face_feats, face_grid_stats], axis=1) |
|
|
edge_x = edge_feats |
|
|
next_index = np.asarray(data["next"], dtype=np.int64) |
|
|
mate_index = np.asarray(data["mate"], dtype=np.int64) |
|
|
global_features = compute_global_geom_features(data) |
|
|
|
|
|
return { |
|
|
"coedge_x": coedge_x, |
|
|
"face_x": face_x, |
|
|
"edge_x": edge_x, |
|
|
"next": next_index, |
|
|
"mate": mate_index, |
|
|
"coedge_face": face_idx, |
|
|
"coedge_edge": edge_idx, |
|
|
"global_x": global_features, |
|
|
} |
|
|
|
|
|
|
|
|
def make_edge_index(source: np.ndarray, target: np.ndarray) -> torch.Tensor: |
|
|
""" |
|
|
Build a 2 x E tensor of edge indices (with both directions, deduplicated). |
|
|
""" |
|
|
pairs = np.stack([source, target], axis=1) |
|
|
flipped = pairs[:, ::-1] |
|
|
all_pairs = np.concatenate([pairs, flipped], axis=0) |
|
|
all_pairs = np.unique(all_pairs, axis=0) |
|
|
return torch.tensor(all_pairs.T, dtype=torch.long) |
|
|
|
|
|
def make_directed_edge_index(source: np.ndarray, target: np.ndarray) -> torch.Tensor: |
|
|
""" |
|
|
Build a 2 x E tensor of directed edge indices (no deduplication). |
|
|
""" |
|
|
return torch.tensor(np.stack([source, target], axis=0), dtype=torch.long) |
|
|
|
|
|
def make_bipartite_edge_index(source: np.ndarray, target: np.ndarray) -> torch.Tensor: |
|
|
""" |
|
|
Build a 2 x E tensor of directed bipartite edge indices (deduplicated). |
|
|
""" |
|
|
pairs = np.stack([source, target], axis=1) |
|
|
pairs = np.unique(pairs, axis=0) |
|
|
return torch.tensor(pairs.T, dtype=torch.long) |
|
|
|
|
|
def make_heterodata( |
|
|
coedge_x: np.ndarray, |
|
|
face_x: np.ndarray, |
|
|
edge_x: np.ndarray, |
|
|
next_index: np.ndarray, |
|
|
mate_index: np.ndarray, |
|
|
coedge_face: np.ndarray, |
|
|
coedge_edge: np.ndarray, |
|
|
global_features: np.ndarray, |
|
|
label: int | None, |
|
|
norm_stats: Dict[str, Dict[str, np.ndarray | torch.Tensor]] | None = None, |
|
|
) -> HeteroData: |
|
|
""" |
|
|
Create a PyG HeteroData graph for the coedge features/relations. |
|
|
When mean/std are provided the features are normalised element-wise. |
|
|
""" |
|
|
def _normalize(x_arr: np.ndarray, stats: Dict[str, np.ndarray | torch.Tensor] | None) -> torch.Tensor: |
|
|
x_t = torch.tensor(x_arr, dtype=torch.float32) |
|
|
if stats is None: |
|
|
return x_t |
|
|
mean = stats.get("mean") |
|
|
std = stats.get("std") |
|
|
if mean is None or std is None: |
|
|
return x_t |
|
|
mean_t = torch.as_tensor(mean, dtype=torch.float32) |
|
|
std_t = torch.as_tensor(std, dtype=torch.float32) |
|
|
return (x_t - mean_t) / std_t |
|
|
|
|
|
coedge_stats = norm_stats.get("coedge") if norm_stats else None |
|
|
face_stats = norm_stats.get("face") if norm_stats else None |
|
|
edge_stats = norm_stats.get("edge") if norm_stats else None |
|
|
|
|
|
x_coedge = _normalize(coedge_x, coedge_stats) |
|
|
x_face = _normalize(face_x, face_stats) |
|
|
x_edge = _normalize(edge_x, edge_stats) |
|
|
|
|
|
idx = np.arange(coedge_x.shape[0], dtype=np.int64) |
|
|
edge_next = make_directed_edge_index(idx, next_index) |
|
|
edge_prev = make_directed_edge_index(next_index, idx) |
|
|
edge_mate = make_edge_index(idx, mate_index) |
|
|
edge_coedge_face = make_directed_edge_index(idx, coedge_face) |
|
|
edge_face_coedge = make_directed_edge_index(coedge_face, idx) |
|
|
edge_coedge_edge = make_directed_edge_index(idx, coedge_edge) |
|
|
edge_edge_coedge = make_directed_edge_index(coedge_edge, idx) |
|
|
edge_face_edge = make_bipartite_edge_index(coedge_face, coedge_edge) |
|
|
edge_edge_face = make_bipartite_edge_index(coedge_edge, coedge_face) |
|
|
|
|
|
data = HeteroData() |
|
|
data["coedge"].x = x_coedge |
|
|
data["face"].x = x_face |
|
|
data["edge"].x = x_edge |
|
|
data["global"].x = torch.tensor(global_features, dtype=torch.float32).view(1, -1) |
|
|
data["coedge", "next", "coedge"].edge_index = edge_next |
|
|
data["coedge", "prev", "coedge"].edge_index = edge_prev |
|
|
data["coedge", "mate", "coedge"].edge_index = edge_mate |
|
|
data["coedge", "to_face", "face"].edge_index = edge_coedge_face |
|
|
data["face", "to_coedge", "coedge"].edge_index = edge_face_coedge |
|
|
data["coedge", "to_edge", "edge"].edge_index = edge_coedge_edge |
|
|
data["edge", "to_coedge", "coedge"].edge_index = edge_edge_coedge |
|
|
data["face", "to_edge", "edge"].edge_index = edge_face_edge |
|
|
data["edge", "to_face", "face"].edge_index = edge_edge_face |
|
|
if label is not None: |
|
|
data.y = torch.tensor([int(label)], dtype=torch.long) |
|
|
return data |
|
|
|
|
|
|
|
|
def compute_feature_stats(npz_paths: Iterable[Path]) -> Dict[str, np.ndarray]: |
|
|
""" |
|
|
Compute mean and std (per feature dimension) across all node features in the dataset. |
|
|
""" |
|
|
totals = {"coedge": 0, "face": 0, "edge": 0} |
|
|
sum_vec: Dict[str, np.ndarray | None] = {"coedge": None, "face": None, "edge": None} |
|
|
sum_sq: Dict[str, np.ndarray | None] = {"coedge": None, "face": None, "edge": None} |
|
|
|
|
|
for path in npz_paths: |
|
|
graph = load_coedge_arrays(path) |
|
|
for key, x in (("coedge", graph["coedge_x"]), ("face", graph["face_x"]), ("edge", graph["edge_x"])): |
|
|
if sum_vec[key] is None: |
|
|
sum_vec[key] = np.zeros(x.shape[1], dtype=np.float64) |
|
|
sum_sq[key] = np.zeros(x.shape[1], dtype=np.float64) |
|
|
sum_vec[key] += x.sum(axis=0) |
|
|
sum_sq[key] += (x * x).sum(axis=0) |
|
|
totals[key] += x.shape[0] |
|
|
|
|
|
out = {} |
|
|
for key in ("coedge", "face", "edge"): |
|
|
if sum_vec[key] is None or totals[key] == 0: |
|
|
raise RuntimeError(f"Cannot compute feature stats: no {key} features observed.") |
|
|
mean = sum_vec[key] / totals[key] |
|
|
var = sum_sq[key] / totals[key] - mean * mean |
|
|
var = np.maximum(var, 1e-12) |
|
|
std = np.sqrt(var) |
|
|
out[key] = {"mean": mean.astype(np.float32), "std": std.astype(np.float32)} |
|
|
return out |
|
|
|