Source code for fabulous.fabric_cad.timing_model.hdlnx.sdfnx.sdf_to_graph

"""SDF to Timing Graph Conversion Module.

This module provides functionality to convert SDF files into timing graphs represented
as NetworkX directed graphs. It is the main class used to create timing graphs from SDF
files. It is derived from SDFTimingGraphBase which provides basic functionality.

New algorithms can be added here. Note that this is a low level module focused on graph
algorithms based on the SDF, and should not contain high-level algorithms based on
verilog netlists.
"""

from math import isclose

import networkx as nx

from fabulous.fabric_cad.timing_model.hdlnx.sdfnx.sdf_to_graph_base import (
    SDFTimingGraphBase,
)


[docs] class SDFTimingGraph(SDFTimingGraphBase): """Class to represent a timing graph generated from an SDF file. It extends SDFTimingGraphBase to allow for additional algorithms specific to timing analysis on the SDF timing graph. Inherits all attributes and methods from SDFTimingGraphBase. """ ### Public Methods ###
[docs] def has_path(self, source: str, target: str) -> bool: """Check if there is a path from source to target in the timing graph. Parameters ---------- source : str The source node. target : str The target node. Returns ------- bool True if a path exists, False otherwise. Examples -------- exists = sdf_graph.has_path("nodeA/pin", "nodeB/pin") """ return nx.has_path(self.graph, source=source, target=target)
[docs] def single_delay(self, source: str, target: str) -> float: """Find path with delay between source and target nodes in the timing graph. Note: The delay value depends on delay_type_str when creating the graph. For example, if delay_type_str="max_all", then the delay represents the maximum delay along the path. If delay_type_str="min_all", then the delay represents the minimum delay along the path. Fastest way to obtain only a single delay value along the path. Parameters ---------- source : str The source node. target : str The target node. Returns ------- float The total delay between the source and target nodes. Examples -------- length = sdf_graph.single_delay("nodeA/pin", "nodeB/pin") """ length: float = nx.dijkstra_path_length( self.graph, source=source, target=target, weight="weight" ) return length
[docs] def earliest_common_nodes( self, sources: list[str], mode: str = "max", sentinel: str | None = None, prefer_sentinel_for_single_source: bool = False, follow_steps_to_sentinel: int = 0, stop: float | None = None, ) -> tuple[list[str], float | None, dict[str, dict[str, float]]]: """Find the structurally earliest node reachable from ALL given sources. The function first finds all nodes reachable from every source. It then restricts to the structurally earliest common region(s), using SCCs of the common-reachable subgraph. Among those candidates it minimizes: cost(v) = max_i dist(s_i, v) if mode == "max" cost(v) = sum_i dist(s_i, v) if mode == "sum" If several candidates still tie, it prefers the one that can still reach the largest downstream common region. If there is still a tie, it prefers the one that can reach more total downstream nodes. Final fallback is lexicographic node order. For a single source, the earliest common node is normally the source itself. If `prefer_sentinel_for_single_source` is True and the source can reach the sentinel, we follow the shortest path to the sentinel and return the node we walk follow_steps_to_sentinel edges along that path. Parameters ---------- sources : list[str] Source nodes. mode : str "max" to minimize worst distance, "sum" to minimize total distance. sentinel : str | None Optional node that can be returned if only one source is given. prefer_sentinel_for_single_source : bool If True and exactly one source is given, return the sentinel instead of the source when the source can reach the sentinel. follow_steps_to_sentinel : int Number of steps to follow along the path to the sentinel before returning the node. stop : float | None Optional cutoff for path length. Returns ------- tuple[list[str], float | None, dict[str, dict[str, float]]] - best_nodes: a single-element list containing the chosen node, or [] if none exists - best_cost: minimal cost of the chosen node, or None if no common node exists - dists: source -> node -> distance Raises ------ ValueError If `mode` is invalid or if a source node is not in the graph. """ if mode not in {"max", "sum"}: raise ValueError("mode must be 'max' or 'sum'") sources = list(dict.fromkeys(sources)) if not sources: return [], None, {} missing = [s for s in sources if s not in self.graph] if missing: raise ValueError(f"Source node(s) not in graph: {missing}") # Compute distances from each source to all reachable nodes. dists: dict[str, dict[str, float]] = {} for s in sources: # Compute shortest-path distances from each source. dists[s] = nx.single_source_shortest_path_length(self.graph, s, cutoff=stop) # Fast path for single source: just return the source. # Or follow the path to the sentinel if requested and possible # and return that follwed node as the earliest node instead. if len(sources) == 1: source = sources[0] if ( prefer_sentinel_for_single_source and sentinel is not None and sentinel in self.graph and sentinel in dists[source] ): path = nx.shortest_path(self.graph, source=source, target=sentinel) step_idx = min(max(follow_steps_to_sentinel, 0), len(path) - 1) chosen = path[step_idx] return [chosen], dists[source][chosen], dists return [source], 0.0, dists # Keep only nodes reachable from every source. common = set(dists[sources[0]].keys()) for s in sources[1:]: common &= set(dists[s].keys()) if not common: return [], None, dists # Builds a new graph containing only the nodes that are reachable # from all sources. So from now on, the code ignores nodes that are # not common to all sources. common_subgraph = self.graph.subgraph(common).copy() # Finds groups of nodes where every node can reach every other node # in the same group. In a directed graph, that means they form a mutually # reachable region. Example: if A -> B, B -> C, and C -> A, then {A, B, C} # is one SCC sccs = list(nx.strongly_connected_components(common_subgraph)) node_to_scc: dict[str, int] = {} for idx, comp in enumerate(sccs): for node in comp: node_to_scc[node] = idx # Creates a counter for each SCC # This will count how many edges come into that SCC from a different SCC scc_indegree = {i: 0 for i in range(len(sccs))} for u, v in common_subgraph.edges(): su = node_to_scc[u] sv = node_to_scc[v] if su != sv: scc_indegree[sv] += 1 # Earliest common regions are SCCs with no incoming edge # from another common SCC. earliest_scc_ids = {i for i, indeg in scc_indegree.items() if indeg == 0} candidates = [node for node in common if node_to_scc[node] in earliest_scc_ids] def cost(v: str) -> float: """Compute the cost of a node based on the selected mode.""" if mode == "sum": return sum(dists[s][v] for s in sources) return max(dists[s][v] for s in sources) candidate_costs = {v: cost(v) for v in candidates} best_cost = min(candidate_costs.values()) # First tie-break step: keep only nodes with minimal cost. cost_tied = [ v for v, c in candidate_costs.items() if isclose(c, best_cost, rel_tol=1e-12, abs_tol=1e-12) ] if len(cost_tied) == 1: return [cost_tied[0]], best_cost, dists def common_reach_score(v: str) -> int: """Prefer nodes that still reach more of the common downstream region.""" return 1 + len(nx.descendants(common_subgraph, v)) common_scores = {v: common_reach_score(v) for v in cost_tied} max_common_score = max(common_scores.values()) common_tied = [v for v in cost_tied if common_scores[v] == max_common_score] if len(common_tied) == 1: return [common_tied[0]], best_cost, dists def total_reach_score(v: str) -> int: """Second tie-break: prefer nodes that reach more of the full graph.""" return 1 + len(nx.descendants(self.graph, v)) total_scores = {v: total_reach_score(v) for v in common_tied} max_total_score = max(total_scores.values()) total_tied = [v for v in common_tied if total_scores[v] == max_total_score] # Final deterministic fallback. chosen = sorted(total_tied)[0] return [chosen], best_cost, dists
[docs] def follow_first_fanout_from_pins( self, hier_pin_path: str, num_follow: int = 1 ) -> str: """Follow the first fanout path from a given hierarchical pin path. Can do multiple hops if num_follow > 1, following the first fanout at each step. Parameters ---------- hier_pin_path : str Hierarchical pin path to start from. num_follow : int Number of fanout hops to follow. Returns ------- str The hierarchical pin path reached after following the fanout. """ current_pin: str = hier_pin_path for _ in range(num_follow): successors = next(self.graph.successors(current_pin), None) if successors is None: break current_pin = successors return current_pin
[docs] def path_to_nearest_target_sentinel( self, source: str, targets: list[str], weight: str | None = None, sentinel_prefix: str = "_sentinel_", reverse: bool = False, ) -> tuple[list[str], str]: """Shortest path to nearest target using sentinel-node trick. Find the shortest path from `source` to the nearest node in `targets` in a (directed) NetworkX graph using the sentinel-node trick. https://networkx.org/documentation/stable/reference/algorithms/shortest_paths.html Parameters ---------- source : str Source node. targets : list[str] List of target nodes. weight : str | None, optional Edge attribute name to use as weight. If None, the graph is treated as unweighted (hop count). sentinel_prefix : str, optional Base name for the temporary sentinel node (ensured to be unique). reverse : bool If True, find the shortest path from the nearest target to the source instead (i.e., reverse the graph direction). Returns ------- path : list[str] List of nodes from `source` to the closest target (no sentinel), or None if no target is reachable. closest_target : str The closest target node, or None if no target is reachable. Raises ------ ValueError If `targets` is empty. """ G = self.reverse_graph if reverse else self.graph targets: set[str] = set(targets) if not targets: raise ValueError("targets must be a non-empty iterable of nodes") # Pick a sentinel name that doesn't collide with existing nodes sentinel: str = f"{sentinel_prefix}_i89f9j9g58f7g6e5d4c3b2a1" G.add_node(sentinel) # Add zero-cost edges from each target to the sentinel if weight is None: for t in targets: G.add_edge(t, sentinel) else: for t in targets: G.add_edge(t, sentinel, weight=0) try: # Shortest path (directed) source -> sentinel path: list[str] = nx.shortest_path( G, source=source, target=sentinel, weight=weight ) except nx.NetworkXNoPath: # Clean up and signal no reachable target G.remove_node(sentinel) return None, None finally: # If shortest_path raised, sentinel is still removed here. if sentinel in G: G.remove_node(sentinel) # Remove sentinel from the path # The real closest target is the node before the sentinel closest_target: str = path[-2] path_without_sentinel: list[str] = path[:-1] return path_without_sentinel, closest_target