Building Graph-Based Fault Trees in Python

Cascading alarm storms across multi-domain telecom infrastructures routinely inflate mean time to resolution (MTTR) by burying primary failures beneath hundreds of derivative symptoms. When a core fiber cut triggers optical layer degradation, IP routing reconvergence, and downstream BGP session flaps, traditional threshold-based alerting generates ticket duplication and forces NOC engineers into manual root-cause deduction. Transitioning from flat-rule correlation to graph-based fault trees enables deterministic failure isolation by modeling network dependencies as directed acyclic graphs (DAGs). This architectural shift suppresses symptom tickets, accelerates L2/L3 assignment, and establishes a repeatable automation pattern for fault routing pipelines.

Graph Schema and Dependency Mapping

A production-grade fault tree requires a strict node-edge schema that mirrors physical and logical topology. Nodes represent discrete network elements or interfaces, while directed edges encode failure propagation paths. Each node must carry immutable identifiers (node_id, layer, vendor), mutable state attributes (alarm_severity, last_seen_ts, health_score), and a suppression_weight that controls how aggressively its alarms are suppressed. Edges define causal relationships and carry the propagation metadata (propagation probability, expected_latency_ms): an optical port failure propagates to attached router interfaces, which in turn propagate to BGP peers and downstream service endpoints.

The graph must be constructed as a DAG to guarantee that traversal terminates. Telecom routing protocols occasionally introduce logical loops during reconvergence, so the fault tree implementation must enforce acyclicity through protocol-aware pruning. When ingesting topology data, map physical adjacency to upstream edges and logical dependency to downstream edges. Assign edge weights in proportion to propagation probability; for example, a DWDM amplifier failure carries a higher propagation weight to attached transponders than a transient interface CRC error carries to a routing protocol adjacency. Implementing Topology-Aware Correlation at this layer ensures that dependency graphs reflect actual service paths rather than static inventory snapshots.

Diagram: a fault-propagation DAG across network layers.

graph TD
  accTitle: Fault propagation DAG
  accDescr: Optical failures propagate to the interface, BFD and BGP sessions, then to the service endpoint.
  OPT["Optical port / DWDM"] --> IF["Router interface"]
  IF --> BFD["BFD session"]
  IF --> BGP["BGP peer session"]
  BFD --> BGP
  BGP --> SVC["Service endpoint"]
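
The diagram can be reproduced as a small networkx graph to check the properties the fault tree relies on. This is a minimal sketch: the node names and the propagation_prob values are illustrative assumptions, not measured figures.

```python
import networkx as nx

# Edges from the diagram; the probabilities are made-up placeholders.
EDGES = [
    ("optical_port", "router_if", 0.95),
    ("router_if", "bfd_session", 0.90),
    ("router_if", "bgp_session", 0.80),
    ("bfd_session", "bgp_session", 0.85),
    ("bgp_session", "service_endpoint", 0.70),
]


def build_diagram_graph() -> nx.DiGraph:
    """Build the five-node propagation graph shown in the diagram."""
    graph = nx.DiGraph()
    for source, target, prob in EDGES:
        graph.add_edge(source, target, propagation_prob=prob)
    return graph


graph = build_diagram_graph()

# A fault tree must be acyclic, otherwise backward traversal may not terminate.
is_dag = nx.is_directed_acyclic_graph(graph)

# Every node upstream of the service endpoint is a potential root cause.
upstream = nx.ancestors(graph, "service_endpoint")

# Nodes without incoming edges are where a failure can originate.
roots = [node for node, degree in graph.in_degree() if degree == 0]
```

Running the acyclicity check after every topology sync is a cheap way to catch edges that were ingested in the wrong direction.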

Core Implementation: DAG Initialization & Event Ingestion

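The following implementation uses networkx for graph operations and dataclasses for schema enforcement. It rejects edges that would break the DAG constraint and merges repeated node updates into the existing node state. The asyncio import is intended for non-blocking event ingestion, and heterogeneous alarm payloads are assumed to be normalized before they reach add_node; neither step is shown in this listing.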

import asyncio
import logging
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Dict, List, Optional, Any
import networkx as nx

logger = logging.getLogger("fault_tree_engine")

@dataclass
class FaultNode:
    node_id: str
    layer: str  # e.g., "optical", "ip_transport", "bgp", "service"
    vendor: str
    alarm_severity: float = 0.0
    last_seen_ts: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    health_score: float = 1.0
    suppression_weight: float = 1.0

@dataclass
class FaultEdge:
    source_id: str
    target_id: str
    propagation_prob: float  # 0.0 - 1.0
    expected_latency_ms: int
    causal_type: str  # "physical", "logical", "protocol"

class GraphFaultTree:
    def __init__(self, correlation_window_ms: int = 300_000):
        self.graph = nx.DiGraph()
        self.graph.graph["correlation_window_ms"] = correlation_window_ms
        self.graph.graph["strict_dag"] = True
        self._node_registry: Dict[str, FaultNode] = {}
        self._edge_registry: Dict[tuple, FaultEdge] = {}

    def add_node(self, node: FaultNode) -> None:
        if node.node_id in self._node_registry:
            self._update_node_state(node)
            return
        self._node_registry[node.node_id] = node
        self.graph.add_node(
            node.node_id,
            layer=node.layer,
            vendor=node.vendor,
            health_score=node.health_score,
            severity=node.alarm_severity
        )

    def add_edge(self, edge: FaultEdge) -> None:
        key = (edge.source_id, edge.target_id)
        if key in self._edge_registry:
            return
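        # Enforce DAG constraint before insertion: reject self-loops and any
        # edge whose target can already reach its source (it would close a cycle)
        if edge.source_id == edge.target_id or (
            edge.source_id in self.graph
            and edge.target_id in self.graph
            and nx.has_path(self.graph, edge.target_id, edge.source_id)
        ):
            logger.warning("Cyclic dependency rejected: %s -> %s", edge.source_id, edge.target_id)
            return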
        self.graph.add_edge(
            edge.source_id, edge.target_id,
            propagation_prob=edge.propagation_prob,
            expected_latency_ms=edge.expected_latency_ms,
            causal_type=edge.causal_type
        )
        self._edge_registry[key] = edge

    def _update_node_state(self, node: FaultNode) -> None:
        existing = self._node_registry[node.node_id]
        existing.alarm_severity = max(existing.alarm_severity, node.alarm_severity)
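        existing.last_seen_ts = node.last_seen_ts
        existing.health_score = node.health_score
        # Mirror the registry state onto the graph node attributes
        self.graph.nodes[node.node_id]["severity"] = existing.alarm_severity
        self.graph.nodes[node.node_id]["health_score"] = existing.health_score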
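
Payload normalization is not part of the listing above. The sketch below shows one way it could look, under stated assumptions: the function name normalize_alarm, the payload field names (source, severity, epoch_s, perceived_severity, timestamp), and the severity scale are all hypothetical. Only the syslog range of 0 (emergency) to 7 (debug) is standard.

```python
from datetime import datetime, timezone
from typing import Any, Dict

# Hypothetical mapping of X.733-style perceived severities to a 0.0-1.0 scale.
PERCEIVED_SEVERITY = {
    "cleared": 0.0,
    "warning": 0.25,
    "minor": 0.5,
    "major": 0.75,
    "critical": 1.0,
}


def normalize_alarm(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Map a raw alarm payload to the per-node fields the fault tree stores."""
    source = payload.get("source")
    if source == "syslog":
        # Syslog severities run from 0 (emergency) to 7 (debug); invert and scale.
        level = min(max(int(payload["severity"]), 0), 7)
        severity = (7 - level) / 7
        ts = datetime.fromtimestamp(payload["epoch_s"], tz=timezone.utc)
    elif source == "snmp":
        # Unknown severity strings fall back to a mid-scale value (assumption).
        label = str(payload["perceived_severity"]).lower()
        severity = PERCEIVED_SEVERITY.get(label, 0.5)
        ts = datetime.fromisoformat(payload["timestamp"])
        if ts.tzinfo is None:
            # Treat naive timestamps as UTC so comparisons stay timezone-aware.
            ts = ts.replace(tzinfo=timezone.utc)
    else:
        raise ValueError(f"unsupported alarm source: {source!r}")
    return {
        "node_id": payload["node_id"],
        "alarm_severity": round(severity, 3),
        "last_seen_ts": ts,
    }
```

The returned dictionary carries the same field names as FaultNode, so a caller could look up layer and vendor from inventory and construct the node from it.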

Traversal Logic & Severity Scoring

Once alarms are ingested, the engine performs time-windowed backward traversal to identify the highest-probability root cause. Severity scoring combines edge propagation probability, temporal decay, and node criticality to rank candidates; the listing below implements the first two, and node criticality can be applied as a further multiplier. Cross-source event linking merges syslog, SNMP traps, and telemetry streams into unified fault contexts before scoring.

# Method of GraphFaultTree, shown without class indentation
def correlate_root_cause(self, triggered_nodes: List[str]) -> Dict[str, Any]:
    """
    Executes backward DAG traversal within the correlation window.
    Returns ranked root-cause candidates with confidence scores.
    """
    candidates: Dict[str, float] = {}
    window_ms = self.graph.graph["correlation_window_ms"]
    now = datetime.now(timezone.utc)

    for start_node in triggered_nodes:
        if start_node not in self.graph:
            continue
        # Visit every upstream node that can reach the triggered node
        for ancestor in nx.ancestors(self.graph, start_node):
            # Fewest-hop path; not necessarily the most probable one
            path = nx.shortest_path(self.graph, ancestor, start_node)
            # Propagation confidence is the product of edge probabilities
            confidence = 1.0
            for i in range(len(path) - 1):
                edge_data = self.graph[path[i]][path[i+1]]
                confidence *= edge_data["propagation_prob"]
            # Temporal decay: penalize once if the ancestor alarm is outside the window
            registered = self._node_registry.get(ancestor)
            if registered is not None:
                age_ms = (now - registered.last_seen_ts).total_seconds() * 1000
                if age_ms > window_ms:
                    confidence *= 0.5
            candidates[ancestor] = max(candidates.get(ancestor, 0.0), confidence)

    # Sort by confidence descending
    ranked = sorted(candidates.items(), key=lambda x: x[1], reverse=True)
    return {"root_causes": ranked, "evaluated_at": now.isoformat()}

For details on the traversal functions used here (ancestors, shortest_path) and their complexity, refer to the official NetworkX documentation.
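
The traversal above scores each ancestor along the path returned by nx.shortest_path, which minimizes hop count. When several paths connect an ancestor to a triggered node, the fewest-hop path is not always the most probable one. A common alternative, sketched here as an assumption rather than as part of the engine, is to run Dijkstra over -log(propagation_prob) so that the cheapest path is the one with the highest probability product. The helper name most_probable_path is hypothetical, and probabilities are assumed to lie in (0, 1].

```python
import math
from typing import List, Optional, Tuple

import networkx as nx


def _neg_log_prob(u: str, v: str, data: dict) -> Optional[float]:
    """Edge cost for Dijkstra: minimizing summed -log(p) maximizes the product of p."""
    prob = data.get("propagation_prob", 0.0)
    # Returning None hides the edge: a zero-probability edge cannot propagate.
    return -math.log(prob) if prob > 0 else None


def most_probable_path(
    graph: nx.DiGraph, source: str, target: str
) -> Optional[Tuple[List[str], float]]:
    """Return the highest-probability path and its confidence, or None if unreachable."""
    try:
        path = nx.dijkstra_path(graph, source, target, weight=_neg_log_prob)
    except nx.NetworkXNoPath:
        return None
    confidence = math.prod(
        graph[u][v]["propagation_prob"] for u, v in zip(path, path[1:])
    )
    return path, confidence


# Toy graph: the direct edge is weaker than the two-hop route.
demo = nx.DiGraph()
demo.add_edge("A", "C", propagation_prob=0.5)
demo.add_edge("A", "B", propagation_prob=0.9)
demo.add_edge("B", "C", propagation_prob=0.9)

hop_path = nx.shortest_path(demo, "A", "C")   # fewest hops
best = most_probable_path(demo, "A", "C")     # highest probability product
```

In the toy graph the fewest-hop path is the direct edge with probability 0.5, while the two-hop route has a product of 0.81, so the two strategies rank the same ancestor differently.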

Threshold Tuning & False Positive Flood Control

Static thresholds fail under reconvergence storms. Production fault trees implement dynamic threshold tuning methods that adjust suppression_weight based on historical fault frequency and current network load. False positive flood control is achieved through three mechanisms:

  1. Hysteresis Windows: Alarms must persist for min_hold_time_ms before triggering graph traversal, filtering transient interface flaps.
  2. Symptom Aggregation: Derivative alarms within the same propagation subtree are collapsed into a single parent ticket, suppressing downstream noise.
  3. Confidence Floor Routing: Only candidates exceeding a configurable min_confidence_threshold (e.g., 0.75) are forwarded to ITSM systems. Lower-confidence events route to a staging queue for Fault Correlation & Rule Engines enrichment rather than immediate ticket creation.
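
The three mechanisms can be sketched as small, independent functions. The function names and the 5-second default for min_hold_time_ms are assumptions chosen for illustration; min_confidence_threshold uses the 0.75 example from the list above.

```python
from typing import Dict, List, Set, Tuple

import networkx as nx


def passes_hysteresis(first_seen_ms: int, now_ms: int, min_hold_time_ms: int = 5_000) -> bool:
    """True once an alarm has persisted for at least the hold time."""
    return now_ms - first_seen_ms >= min_hold_time_ms


def aggregate_symptoms(graph: nx.DiGraph, alarmed: Set[str]) -> Dict[str, List[str]]:
    """Group alarmed nodes under the alarmed nodes that have no alarmed ancestor."""
    known = alarmed & set(graph.nodes)  # ignore alarms for nodes missing from the graph
    parents = [n for n in known if not (nx.ancestors(graph, n) & known)]
    # Each parent becomes one ticket; its alarmed descendants are the collapsed symptoms.
    return {p: sorted(nx.descendants(graph, p) & known) for p in sorted(parents)}


def route_by_confidence(
    ranked: List[Tuple[str, float]], min_confidence_threshold: float = 0.75
) -> Tuple[List[Tuple[str, float]], List[Tuple[str, float]]]:
    """Split ranked candidates into (forward to ITSM, hold in staging queue)."""
    itsm = [(n, c) for n, c in ranked if c > min_confidence_threshold]
    staging = [(n, c) for n, c in ranked if c <= min_confidence_threshold]
    return itsm, staging


# Toy chain: optical -> interface -> BGP -> service.
chain = nx.DiGraph([("opt", "if"), ("if", "bgp"), ("bgp", "svc")])
tickets = aggregate_symptoms(chain, {"if", "bgp", "svc", "ghost"})
itsm, staging = route_by_confidence([("opt", 0.9), ("if", 0.4)])
```

Keeping the three checks separate makes each threshold tunable on its own, which matters when hold times differ per layer.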

Production Deployment & Mitigation Paths

Deploying this architecture requires strict operational guardrails. The following mitigation paths address common production failure modes:

| Failure Mode | Mitigation Path |
| --- | --- |
| Topology Drift | Run scheduled graph reconciliation against source-of-truth inventory (NetBox, CMDB). Prune orphaned nodes and validate edge directionality before sync. |
| Traversal Timeouts | Limit BFS/DFS depth to max_hops=6 for telecom domains. Implement asyncio.wait_for() with a 2s timeout per correlation batch. |
| State Memory Bloat | Implement TTL-based eviction for nodes with health_score > 0.9 and no active alarms for >24h. Use networkx subgraph extraction for hot-path caching. |
| Ticket Routing Loops | Attach a monotonic correlation_id to each fault tree instance. Reject duplicate ITSM payloads matching active correlation_id within the suppression window. |
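
The traversal-timeout mitigation can be sketched as follows. The helper names upstream_within and correlate_batch are hypothetical, and asyncio.to_thread requires Python 3.9 or later. Note one limitation of this approach: asyncio.wait_for stops waiting after the timeout, but the worker thread itself keeps running until its traversal finishes.

```python
import asyncio
from typing import Dict, List

import networkx as nx

MAX_HOPS = 6           # depth limit from the mitigation table
BATCH_TIMEOUT_S = 2.0  # per-batch timeout from the mitigation table


def upstream_within(graph: nx.DiGraph, node: str, max_hops: int = MAX_HOPS) -> Dict[str, int]:
    """Ancestors of node reachable within max_hops, mapped to their hop distance."""
    # A reversed view flips edge direction, so a forward BFS walks upstream.
    reverse_view = graph.reverse(copy=False)
    distances = dict(
        nx.single_source_shortest_path_length(reverse_view, node, cutoff=max_hops)
    )
    distances.pop(node, None)  # drop the start node itself (distance 0)
    return distances


async def correlate_batch(graph: nx.DiGraph, triggered: List[str]) -> Dict[str, Dict[str, int]]:
    """Run depth-limited traversal for a batch; return {} if the batch times out."""

    def work() -> Dict[str, Dict[str, int]]:
        return {n: upstream_within(graph, n) for n in triggered if n in graph}

    try:
        return await asyncio.wait_for(asyncio.to_thread(work), timeout=BATCH_TIMEOUT_S)
    except asyncio.TimeoutError:
        return {}


# Ten-node chain n0 -> n1 -> ... -> n9 to exercise the depth limit.
chain = nx.DiGraph([(f"n{i}", f"n{i + 1}") for i in range(9)])
result = asyncio.run(correlate_batch(chain, ["n9", "missing"]))
```

With the chain above, only the six nearest upstream nodes of n9 are considered; n0 through n2 fall outside the hop limit and are never scored.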

Concurrency should be managed via a worker pool that processes alarm batches independently. Each worker maintains a thread-local GraphFaultTree instance to avoid lock contention, and worker state is periodically merged into a central topology cache that the workers themselves treat as read-only.

Advanced Patterns: Predictive & AI-Driven RCA

Once deterministic fault trees stabilize MTTR, the graph structure becomes a foundation for predictive fault modeling. By exporting historical traversal outcomes, propagation weights, and node health trajectories, SRE teams can train gradient-boosted models to forecast failure likelihood before alarms trigger. AI-driven root cause analysis extends this by applying graph neural networks (GNNs) to identify non-obvious cross-layer dependencies that static rules miss. The transition from reactive DAG traversal to predictive graph analytics requires consistent telemetry normalization, but the underlying node-edge schema remains identical, ensuring backward compatibility and incremental rollout safety.