Source code for gradgraph.graph.features

#!/usr/bin/env python3
# 
# features.py
# 
# Created by Nicolas Fricker on 08/20/2025.
# 

import numpy as np
import networkx as nx

from typing import Iterator

from gradgraph.graph.paths import find_apical_paths
from gradgraph.graph.hash import hash
from gradgraph.graph.utils import (
    add_missing_times,
    split_into_span,
    euclidean_distance
)

def apical_features(
    G: nx.Graph,
    pos: dict[int, tuple[float, float]],
    time_attr: str,
) -> Iterator[tuple[str, dict[str, np.ndarray]]]:
    """
    Yield time/distance features for every apical path of a graph.

    Paths are enumerated with :func:`find_apical_paths` (called with
    ``weight=time_attr``). Paths holding fewer than two nodes are skipped.
    For each remaining path the function derives its node and edge arrays,
    the arrival time of every edge, and the path length accumulated over
    those times.

    Parameters
    ----------
    G : networkx.Graph
        Graph whose nodes carry the attribute named by ``time_attr``.
    pos : dict[int, tuple[float, float]]
        Node ID -> coordinates. Every node on a yielded path is looked up
        here, so a missing entry raises ``KeyError``.
    time_attr : str
        Name of the node attribute holding the time value. For an edge
        ``(u, v)`` of a path, the time stored on ``v`` (the head node) is
        used as the arrival time of that edge.

    Yields
    ------
    key : str
        Identifier of the path as returned by
        :func:`gradgraph.graph.hash.hash` (intended to be identical for a
        path and its reverse).
    features : dict of np.ndarray
        - ``"nodes"`` : shape (n,), node IDs in path order.
        - ``"edges"`` : shape (n-1, 2), consecutive node pairs.
        - ``"times"`` : shape (m,), sorted unique edge arrival times.
        - ``"dists"`` : shape (m,), cumulative Euclidean length, one value
          per entry of ``"times"``.

    Notes
    -----
    * Edge lengths come from :func:`euclidean_distance` applied to the
      coordinates of both endpoints.
    * Edges sharing an arrival time have their lengths summed into a
      single bucket before the cumulative sum is taken.
    * Node times are read with ``dict.get``; a node without ``time_attr``
      therefore contributes ``None`` to the time array.

    Examples
    --------
    >>> import networkx as nx, numpy as np
    >>> G = nx.Graph()
    >>> G.add_nodes_from([
    ...     (1, {"t": 4., "pos": (0., 0.)}),
    ...     (2, {"t": 3., "pos": (1., 1.)}),
    ...     (3, {"t": 2., "pos": (2., 2.)}),
    ...     (4, {"t": 1., "pos": (1., 3.)}),
    ...     (5, {"t": 1., "pos": (3., 3.)}),
    ... ])
    >>> G.add_edges_from([(1, 2), (2, 3), (3, 4), (3, 5)])
    >>> pos = nx.get_node_attributes(G, "pos")
    >>> feats = dict(apical_features(G, pos=pos, time_attr="t"))
    >>> v = next(iter(feats.values()))
    >>> v["nodes"]
    array([3, 2, 1])
    >>> v["edges"]
    array([[3, 2],
           [2, 1]])
    >>> v["times"]
    array([3., 4.])
    >>> v["dists"]
    array([1.41421356, 2.82842712])
    """
    for apical_path in find_apical_paths(G, weight=time_attr):
        nodes = np.asarray(apical_path)
        # A single node has no edge, hence nothing to measure.
        if nodes.size < 2:
            continue

        key = hash(nodes)
        tails, heads = nodes[:-1], nodes[1:]
        edges = np.column_stack((tails, heads))

        # The head node's time is the arrival time of its incoming edge,
        # so the first node of the path contributes no time.
        per_node_time = np.array(
            [G.nodes[int(node)].get(time_attr) for node in nodes]
        )
        arrival = per_node_time[1:]

        lengths = np.array(
            [euclidean_distance(pos[tail], pos[head]) for tail, head in edges]
        )

        # Bucket edge lengths by arrival time: `bucket[i]` is the index of
        # edge i's time inside the sorted unique array.
        unique_times, bucket = np.unique(arrival, return_inverse=True)
        length_per_time = np.bincount(
            bucket,
            weights=lengths,
            minlength=len(unique_times),
        ).astype(float)

        features = {
            "nodes": nodes,
            "edges": edges,
            "times": unique_times,
            "dists": np.cumsum(length_per_time),
        }
        yield key, features
def temporal_apical_features(
    G: nx.Graph,
    pos: dict[int, tuple[float, float]],
    time_attr: str,
) -> Iterator[tuple[str, dict[str, np.ndarray]]]:
    """
    Yield apical features for a series of time-thresholded snapshots.

    Every distinct value of the node attribute ``time_attr`` defines one
    snapshot: the subgraph induced by all nodes whose time is less than or
    equal to that value. Each snapshot is handed to
    :func:`apical_features` and its results are re-yielded unchanged.

    Parameters
    ----------
    G : networkx.Graph
        Graph whose nodes carry the attribute named by ``time_attr``.
    pos : dict[int, tuple[float, float]]
        Node ID -> coordinates, forwarded to :func:`apical_features`.
    time_attr : str
        Name of the node attribute holding the appearance time.

    Yields
    ------
    key : str
        Path identifier as produced by :func:`apical_features`.
    value : dict of np.ndarray
        Feature dictionary as produced by :func:`apical_features`, with
        the entries ``"nodes"``, ``"edges"``, ``"times"`` and ``"dists"``.

    Notes
    -----
    - Snapshots are visited from the largest time value down to the
      smallest, i.e. the full graph comes first.
    - Results of different snapshots are not deduplicated, so a key can be
      yielded more than once.
    - Time values are collected with ``networkx.get_node_attributes``,
      which only reports nodes that have the attribute; a node of ``G``
      without ``time_attr`` raises ``KeyError`` during snapshot filtering.
    - Paths are recomputed for every snapshot, which can be expensive on
      large graphs.

    Examples
    --------
    >>> import networkx as nx
    >>> G = nx.Graph()
    >>> G.add_nodes_from([
    ...     (8, {"t": 7., "pos": (0., 3.)}),
    ...     (7, {"t": 6., "pos": (0., 2.)}),
    ...     (6, {"t": 5., "pos": (0., 1.)}),
    ...     (1, {"t": 4., "pos": (0., 0.)}),
    ...     (2, {"t": 3., "pos": (1., 1.)}),
    ...     (3, {"t": 2., "pos": (2., 2.)}),
    ...     (4, {"t": 1., "pos": (1., 3.)}),
    ...     (5, {"t": 1., "pos": (3., 3.)}),
    ... ])
    >>> G.add_edges_from([(1, 2), (2, 3), (3, 4), (3, 5), (6, 1), (7, 6), (8, 7)])
    >>> pos = nx.get_node_attributes(G, "pos")
    >>> feats_iter = temporal_apical_features(G, pos, time_attr="t")
    >>> for k, v in feats_iter:
    ...     print(k, v["times"], v["dists"])
    """
    node_time = nx.get_node_attributes(G, time_attr)

    # Distinct time values, latest first.
    cutoffs = sorted(set(node_time.values()))[::-1]

    for cutoff in cutoffs:
        visible = (node for node in G.nodes if node_time[node] <= cutoff)
        snapshot = G.subgraph(visible)
        yield from apical_features(snapshot, pos, time_attr)
def windowed_temporal_apical_features(
    G: nx.Graph,
    pos: dict[int, tuple[float, float]],
    span: int,
    dt: float,
    weight: str = "t",
) -> Iterator[tuple[str, tuple[np.ndarray, np.ndarray]]]:
    """
    Generate fixed-size temporal windows of apical features.

    Apical path features are obtained from
    :func:`temporal_apical_features`. For every path, the ``"times"`` /
    ``"dists"`` series is regularized with :func:`add_missing_times` and
    then cut into windows of length ``span`` with
    :func:`split_into_span`.

    Parameters
    ----------
    G : networkx.Graph
        Graph whose nodes carry the time attribute named by ``weight``.
    pos : dict[int, tuple[float, float]]
        Node ID -> coordinates, used for the Euclidean path lengths.
    span : int
        Length of each output window. Paths whose regularized series has
        fewer than ``span`` entries are skipped.
    dt : float
        Temporal resolution forwarded to :func:`add_missing_times`.
    weight : str, default "t"
        Name of the node attribute holding the time value. It is passed
        to :func:`temporal_apical_features` as ``time_attr``.

    Yields
    ------
    key : str
        The path key followed by the window index formatted as five
        zero-padded digits (``f"{key}{i:05d}"``).
    window : tuple of (np.ndarray, np.ndarray)
        ``(times, dists)`` for the window, as returned by
        :func:`split_into_span`.

    Notes
    -----
    * Each yielded item is a 2-tuple ``(key, (times, dists))``; unpack it
      as ``for key, (times, dists) in ...``.
    * Window layout (overlap / stride) is decided by
      :func:`split_into_span`; it is expected to slide by one time step --
      confirm against that helper.
    * :func:`temporal_apical_features` re-extracts paths for every
      snapshot without deduplication, so the same key may be yielded more
      than once.

    Examples
    --------
    >>> import networkx as nx, numpy as np
    >>> G = nx.Graph()
    >>> G.add_nodes_from([
    ...     (1, {"t": 5, "pos": (0., 0.)}),
    ...     (2, {"t": 4, "pos": (1., 1.)}),
    ...     (3, {"t": 3, "pos": (2., 2.)}),
    ...     (4, {"t": 2, "pos": (3., 3.)}),
    ...     (5, {"t": 1, "pos": (4., 4.)}),
    ...     (6, {"t": 1, "pos": (4., 5.)}),
    ...     (7, {"t": 1, "pos": (3., 4.)}),
    ... ])
    >>> G.add_edges_from([(1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (5, 7)])
    >>> pos = nx.get_node_attributes(G, "pos")
    >>> windows = list(
    ...     windowed_temporal_apical_features(G, pos=pos, span=2, dt=1)
    ... )
    >>> for key, (times, dists) in windows:  # doctest: +SKIP
    ...     print(key, times, dists)
    ea0d7246...00000 [2 3] [1.41421356 2.82842712]
    ea0d7246...00001 [3 4] [2.82842712 4.24264069]
    ea0d7246...00002 [4 5] [4.24264069 5.65685425]

    The output above is illustrative: hashes are abbreviated and the exact
    set of paths depends on :func:`find_apical_paths`.
    """
    features = temporal_apical_features(G, pos=pos, time_attr=weight)
    for key, values in features:
        # Regularize the series onto the `dt` grid before windowing.
        times, dists = add_missing_times(
            values["times"],
            values["dists"],
            dt=dt,
        )
        # Not enough samples to fill even a single window.
        if len(dists) < span:
            continue

        windows = split_into_span(times, dists, span=span)
        for i, (split_times, split_dists) in enumerate(windows):
            yield f"{key}{i:05d}", (split_times, split_dists)