40.3. Temporal GNNs & Dynamic Graphs

Status: Draft | Version: 1.0.0 | Tags: #GNN, #Temporal, #TGN, #Rust, #Streaming | Author: MLOps Team


Table of Contents

  1. The Myth of the Static Graph
  2. Dynamic Graph Types: Discrete vs Continuous
  3. TGN (Temporal Graph Networks) Architecture
  4. Rust Implementation: Temporal Memory Module
  5. Streaming Architecture: Feature Stores for TGNs
  6. Training Strategies: Snapshot vs Event-Based
  7. Handling Late-Arriving Events
  8. Infrastructure: Kafka to Graph Store
  9. Troubleshooting: TGN Training Issues
  10. Future Trends: Causal GNNs
  11. MLOps Interview Questions
  12. Glossary
  13. Summary Checklist

Prerequisites

Before diving into this chapter, ensure you have the following installed:

  • Rust: 1.70+
  • Kafka: For streaming edge events.
  • PyTorch: torch-geometric-temporal.

The Myth of the Static Graph

Most GNN tutorials assume the graph $G$ is fixed. In reality:

  • Users follow new people (Edge Addition).
  • Users unfollow (Edge Deletion).
  • Users change their profile (Node Feature Update).
  • Transactions happen at timestamp $t$ (Temporal Edge).

If you train a static GCN on yesterday’s graph, it will fail to detect fraud happening now. You need Dynamic Graphs.


Dynamic Graph Types: Discrete vs Continuous

1. Discrete Time Dynamic Graphs (DTDG)

Snapshots taken at fixed intervals ($t_0, t_1, t_2$).

  • $G_0$: Graph snapshot at Monday 00:00.
  • $G_1$: Graph snapshot at Tuesday 00:00.
  • Model: stacked snapshots fed to a 3D-GCN, or an RNN over per-snapshot GCN embeddings (e.g., EvolveGCN).
  • Pros: Easy to implement (just stack the adjacency matrices).
  • Cons: Loss of fine-grained timing. Was the transaction at 00:01 or 23:59?

2. Continuous Time Dynamic Graphs (CTDG)

A stream of events: $(u, v, t, feat)$.

  • Event 1: Alice buys Bread (10:00).
  • Event 2: Bob sends Money (10:05).
  • Model: TGN (Temporal Graph Networks).
  • Pros: Exact timing. Immediate updates.
  • Cons: Complex state management.
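
These two representations map onto different data structures. A minimal Rust sketch (type and field names are illustrative, not taken from any particular library):

use std::collections::HashMap;

/// DTDG: one static snapshot per interval, stored as adjacency lists.
struct Snapshot {
    taken_at: f64,                         // e.g. Monday 00:00 as a Unix timestamp
    adjacency: HashMap<usize, Vec<usize>>, // node -> neighbors at that instant
}

/// CTDG: a single, time-ordered stream of edge events.
struct EdgeEvent {
    src: usize,
    dst: usize,
    timestamp: f64,
    features: Vec<f32>, // edge features, e.g. transaction amount
}

/// DTDG model input: a sequence of snapshots.
type DiscreteGraph = Vec<Snapshot>;
/// CTDG model input: a timestamp-sorted event log.
type ContinuousGraph = Vec<EdgeEvent>;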

TGN (Temporal Graph Networks) Architecture

TGN is a state-of-the-art framework for CTDGs. It introduces a Memory Module: a state vector $s_u(t)$ maintained for each node $u$.

Components:

  1. Memory: A vector $s_u$ storing the node’s history.
  2. Message Function: $m_u(t) = \text{MLP}(s_u, s_v, \Delta t, e_{uv})$.
  3. Memory Updater: $s_u(t) = \text{GRU}(m_u(t), s_u(t^-))$, where $t^-$ is the time of the node's previous update.
  4. Embedding Module: $z_u(t) = \text{GNN}(s_u(t), \text{neighbors})$.

Time Encoding: Neural networks cannot consume raw timestamps directly. We map elapsed time to a vector using Harmonic Encoding (like Transformer positional encodings), with frequencies $\omega_i$ that are typically learned: $$ \Phi(t) = [\cos(\omega_1 t), \sin(\omega_1 t), \dots, \cos(\omega_d t), \sin(\omega_d t)] $$
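
A minimal sketch of this encoding in Rust; the frequencies here are fixed on a log scale, whereas TGN-style models typically learn them:

/// Harmonic time encoding: map an elapsed time `dt` to a 2*d vector
/// [cos(w_1*dt), sin(w_1*dt), ..., cos(w_d*dt), sin(w_d*dt)].
fn harmonic_encode(dt: f64, d: usize) -> Vec<f32> {
    let mut out = Vec::with_capacity(2 * d);
    for i in 0..d {
        // Log-spaced frequencies, analogous to Transformer positional encodings.
        let omega = 1.0 / 10_000f64.powf(i as f64 / d as f64);
        out.push((omega * dt).cos() as f32);
        out.push((omega * dt).sin() as f32);
    }
    out
}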


Rust Implementation: Temporal Memory Module

In Python, looping over 1 million events to update GRU states is slow ($O(N)$ Python overhead). We implement the Memory Updater in Rust using concurrent hash maps.

Project Structure

tgn-memory/
├── Cargo.toml
└── src/
    └── lib.rs

Cargo.toml:

[package]
name = "tgn-memory"
version = "0.1.0"
edition = "2021"

[dependencies]
dashmap = "5.5"  # Concurrent HashMap
ndarray = "0.15" # Math
rayon = "1.7"
serde = { version = "1.0", features = ["derive"] }

src/lib.rs:

//! Temporal Graph Memory Module.
//! Manages state vectors for millions of nodes with thread-safety.
//! Designed to handle high-throughput event streams from Kafka.

use dashmap::DashMap;
use ndarray::Array1;
use std::sync::Arc;

const MEMORY_DIM: usize = 128;

#[derive(Clone, Debug)]
pub struct NodeMemory {
    /// The hidden state of the node (e.g. from GRU)
    /// Represents the "compressed history" of the node.
    pub state: Array1<f32>,
    /// Last time this node was updated. 
    /// Used to calculate dt for time encoding.
    pub last_update: f64,
}

impl NodeMemory {
    pub fn new() -> Self {
        Self {
            state: Array1::zeros(MEMORY_DIM),
            last_update: 0.0,
        }
    }
}

pub struct TemporalMemory {
    /// Thread-safe Map: NodeID -> MemoryState
    /// DashMap uses sharding to reduce lock contention.
    store: Arc<DashMap<usize, NodeMemory>>,
}

impl TemporalMemory {
    pub fn new() -> Self {
        Self {
            store: Arc::new(DashMap::new()),
        }
    }

    /// Process a batch of events (Source, Dest, Timestamp).
    /// Updates the memory of both the source and the destination node.
    pub fn update_batch(&self, events: Vec<(usize, usize, f64)>) {
        // Parallel updates are tricky because the endpoints of different events
        // may collide (data race). DashMap locks per shard, so concurrent access
        // cannot panic, but a real TGN must process events strictly in time order,
        // which limits parallelism within a batch unless node collisions are excluded.
        for &(src, dst, t) in &events {
            self.touch(src, t);
            self.touch(dst, t);
        }
    }

    /// Apply the (mock) memory update for a single node at time `t`.
    fn touch(&self, node: usize, t: f64) {
        self.store.entry(node)
            .and_modify(|mem| {
                // In a full TGN, dt feeds the harmonic time encoding.
                let _dt = t - mem.last_update;
                // Mock GRU: decay the old state and add a constant input.
                // s(t) = s(t-1) * 0.9 + 0.1
                mem.state.mapv_inplace(|x| x * 0.9 + 0.1);
                mem.last_update = t;
            })
            .or_insert_with(|| {
                let mut mem = NodeMemory::new();
                mem.last_update = t;
                mem
            });
    }

    pub fn get_state(&self, node: usize) -> Option<Array1<f32>> {
        self.store.get(&node).map(|m| m.state.clone())
    }
}
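
A small test sketches the intended usage of this module (assuming events arrive pre-sorted by timestamp):

#[test]
fn memory_updates_on_events() {
    let memory = TemporalMemory::new();
    // (src, dst, timestamp) triples, pre-sorted by timestamp.
    memory.update_batch(vec![(1, 2, 10.0), (2, 3, 10.5), (1, 3, 11.0)]);
    assert!(memory.get_state(1).is_some());
    assert!(memory.get_state(42).is_none()); // unseen node has no memory yet
}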

Streaming Architecture: Feature Stores for TGNs

TGN requires reading both the “Memory” ($S_u$) and the “Raw Features” ($X_u$). Since the Memory changes with every interaction, it must live in memory (Redis or an in-process store such as the DashMap above).

[ Kafka: Edge Stream ]
       |
       v
[ Rust TGN Ingestor ]
       |
       +---> [ Update Memory $S_u$ (In-RAM DashMap) ]
       |
       +---> [ Append to Graph Store (CSR) ]
       |
       +---> [ Publish "Enriched Event" ] ---> [ Inference Service ]

Consistency: The Inference Service must use the exact same Memory state $S_u$ that the model expects. This means the Ingestor is also the State Server.
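
A hedged sketch of that ingestor loop, using a standard-library channel as a stand-in for the Kafka consumer; the graph-store append and downstream publish are placeholders, not real APIs:

use std::sync::mpsc::Receiver;

struct EdgeEvent {
    src: usize,
    dst: usize,
    timestamp: f64,
}

/// Single-threaded ingestor: every event updates the in-RAM memory first,
/// then would be appended to the graph store and republished downstream.
fn run_ingestor(rx: Receiver<EdgeEvent>, memory: &TemporalMemory) {
    for ev in rx {
        memory.update_batch(vec![(ev.src, ev.dst, ev.timestamp)]);
        // append_to_graph_store(&ev);   // placeholder: CSR append
        // publish_enriched(&ev);        // placeholder: produce to the inference topic
    }
}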


Training Strategies: Snapshot vs Event-Based

1. Backprop Through Time (BPTT)

Like training an RNN. Split the event stream into batches of 200 events. Run TGN. Update Weights.

  • Problem: Gradients vanish over long time horizons.

2. Snapshot Training (Discrete Approximation)

Accumulate events for 1 hour. Build a static graph. Train GraphSAGE.

  • Problem: Latency. User A acted 55 minutes ago, but the model only sees it now.

Recommendation: Use TGN for critical “Attack Detection” (milliseconds matter). Use snapshot GraphSAGE for “Friend Recommendation” (daily updates suffice).


Handling Late-Arriving Events

Events in distributed systems arrive out of order. Event A (10:00) can arrive after Event B (10:05). If the TGN updates memory with B and then A arrives, the state is corrupted (a causality violation).

Solutions:

  1. Buffer & Sort: Wait 10 seconds, sort by timestamp, then process.
  2. Optimistic Processing: Process anyway. Accept noise.
  3. Watermarks: Flink-style watermarking. Drop events older than $T_{late}$.
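
A sketch combining options 1 and 3: buffer events in a min-heap keyed by timestamp and release them only once they fall behind a watermark (max observed time minus an allowed lateness). The lateness value and field layout are illustrative:

use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Buffers out-of-order events and releases them in timestamp order
/// once they fall behind the watermark (max_seen_ts - allowed_lateness).
struct ReorderBuffer {
    heap: BinaryHeap<Reverse<(u64, usize, usize)>>, // (ts_millis, src, dst)
    max_seen_ts: u64,
    allowed_lateness_ms: u64,
}

impl ReorderBuffer {
    fn new(allowed_lateness_ms: u64) -> Self {
        Self { heap: BinaryHeap::new(), max_seen_ts: 0, allowed_lateness_ms }
    }

    /// Push an event; drop it if it is already older than the watermark.
    fn push(&mut self, ts: u64, src: usize, dst: usize) {
        self.max_seen_ts = self.max_seen_ts.max(ts);
        if ts + self.allowed_lateness_ms >= self.max_seen_ts {
            self.heap.push(Reverse((ts, src, dst)));
        }
        // else: too late -> drop (or route to a dead-letter topic).
    }

    /// Pop all events that are now safely behind the watermark, in time order.
    fn drain_ready(&mut self) -> Vec<(u64, usize, usize)> {
        let watermark = self.max_seen_ts.saturating_sub(self.allowed_lateness_ms);
        let mut ready = Vec::new();
        while let Some(Reverse((ts, _, _))) = self.heap.peek().copied() {
            if ts > watermark {
                break;
            }
            let Reverse(event) = self.heap.pop().unwrap();
            ready.push(event);
        }
        ready
    }
}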

Infrastructure: Kafka to Graph Store

A robust setup uses Change Data Capture (CDC) from the core DB to drive the Graph.

# pipeline.yaml
sources:
  - name: transactions_db
    type: postgres-cdc
    
transforms:
  - name: to_edge_format
    query: "SELECT user_id as src, merchant_id as dst, amount as weight, ts FROM stream"

sinks:
  - name: graph_topic
    type: kafka
    topic: edges_v1

The GNN Service consumes edges_v1.
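
The consuming side could look like the following sketch, assuming the rdkafka crate; the broker address, group id, and payload handling are illustrative:

use rdkafka::config::ClientConfig;
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::message::Message;
use std::time::Duration;

fn consume_edges() -> Result<(), Box<dyn std::error::Error>> {
    let consumer: BaseConsumer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092") // illustrative broker address
        .set("group.id", "gnn-service")
        .set("auto.offset.reset", "earliest")
        .create()?;
    consumer.subscribe(&["edges_v1"])?;

    loop {
        // Poll one message at a time; a production ingestor would batch these.
        if let Some(result) = consumer.poll(Duration::from_millis(100)) {
            let msg = result?;
            if let Some(payload) = msg.payload() {
                // Deserialize (src, dst, weight, ts) here and hand it to the
                // TGN ingestor / TemporalMemory from the earlier section.
                println!("edge event: {} bytes", payload.len());
            }
        }
    }
}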


Troubleshooting: TGN Training Issues

Scenario 1: Memory Staleness

  • Symptom: Validation accuracy drops over time.
  • Cause: The “last update time” for many nodes is very old (e.g., inactive users). The TGN behaves erratically for these very large $\Delta t$ values.
  • Fix: Implement a time decay in the Memory Updater. Force the state to zero if $\Delta t > 30$ days (see the sketch below).
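
A sketch of such a decay, written against the NodeMemory state from the earlier module; the 30-day cutoff and the exponential rate are illustrative choices:

const SECONDS_PER_DAY: f64 = 86_400.0;
const STALE_AFTER_DAYS: f64 = 30.0;

/// Apply a time-based decay to a node's memory before using it.
/// `state` is the node's memory vector, `dt_seconds` the time since its last update.
fn decay_memory(state: &mut ndarray::Array1<f32>, dt_seconds: f64) {
    if dt_seconds > STALE_AFTER_DAYS * SECONDS_PER_DAY {
        // Hard reset: treat a long-inactive node like a brand-new node.
        state.fill(0.0);
    } else {
        // Soft exponential decay toward zero as the node goes quiet.
        let decay = (-dt_seconds / (STALE_AFTER_DAYS * SECONDS_PER_DAY)).exp() as f32;
        state.mapv_inplace(|x| x * decay);
    }
}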

Scenario 2: Exploding Gradients

  • Symptom: Loss becomes NaN.
  • Cause: The GRU is unrolled for too many steps (Backprop through 1000 interactions).
  • Fix: Truncated BPTT. Detach gradients after 20 steps.

Scenario 3: Leakage

  • Symptom: Test AUC is 0.99 (suspiciously high).
  • Cause: You are using edges from the future (Target) to update the Memory (Input).
  • Fix: Strict ordering:
    1. Predict Interaction $(u, v, t)$.
    2. Calculate Loss.
    3. Update Memory with $(u, v, t)$. Never update the memory before the prediction and loss have been computed (see the sketch below).
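
A sketch of this ordering against the Rust memory module above; the dot-product score and the hard-coded dimension of 128 (MEMORY_DIM) stand in for the real embedding module and loss:

use ndarray::Array1;

/// Score an interaction using the *pre-event* memory, then update the memory.
/// Updating first would leak the target edge into its own input.
fn predict_then_update(memory: &TemporalMemory, src: usize, dst: usize, t: f64) -> f32 {
    let s_u = memory.get_state(src).unwrap_or_else(|| Array1::zeros(128));
    let s_v = memory.get_state(dst).unwrap_or_else(|| Array1::zeros(128));
    let score = s_u.dot(&s_v);                // 1-2. predict; loss is computed upstream
    memory.update_batch(vec![(src, dst, t)]); // 3. only now fold the event into memory
    score
}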

Future Trends: Causal GNNs

Current GNNs look at correlations: “People who bought X also bought Y.” Causal GNNs ask: “If I recommend X, will they buy Y?” This requires Intervention Modeling (Do-calculus on Graphs). This is the next frontier for “Actionable RecSys”.


MLOps Interview Questions

  1. Q: Why not just put “Time” as a feature in a static GNN? A: A static GNN aggregates all neighbors equally. It cannot distinguish “Neighbor from 2010” vs “Neighbor from 2024”. TGN’s memory module explicitly decays old information.

  2. Q: What is the bottleneck in TGN inference? A: Sequential dependency. To compute $S_u(t)$, you strictly need $S_u(t-1)$. You cannot parallelize processing of a single node’s history. But you can parallelize across different nodes.

  3. Q: How do you evaluate a Dynamic Graph model? A: You cannot use random K-Fold Split. You must use Temporal Split.

    • Train: Jan - Nov.
    • Test: Dec.
    • Eval: Metric (AP/AUC) on future edges.
  4. Q: Explain “Inductive” capability in TGNs. A: New nodes start with empty Memory $S_{new} = \vec{0}$. The model can still process them immediately using their raw features and the interactions they have just participated in. No re-training needed.

  5. Q: What is “Temporal Neighbor Sampling”? A: When aggregating neighbors for a node at time $t$, we only look at interactions in $[t - \delta, t]$. We ignore the future (no leakage) and the very distant past (irrelevant).
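
A minimal sketch of temporal neighbor sampling over a per-node, time-sorted adjacency list, using binary search on the timestamps (function and variable names are illustrative):

/// Return the neighbors of a node whose interactions fall in [t - delta, t],
/// given that `timeline` is sorted by timestamp ascending.
fn temporal_neighbors(timeline: &[(usize, f64)], t: f64, delta: f64) -> &[(usize, f64)] {
    let lo = timeline.partition_point(|&(_, ts)| ts < t - delta);
    let hi = timeline.partition_point(|&(_, ts)| ts <= t);
    &timeline[lo..hi]
}

In practice the returned window is usually also capped to the most recent k interactions before aggregation.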


Glossary

  • TGN (Temporal Graph Network): Architecture combining GNNs and RNNs (Memory).
  • CTDG (Continuous Time Dynamic Graph): Graph defined by a stream of timestamped events.
  • Harmonic Encoding: Using sine/cosine functions to represent continuous time values.
  • Snapshot: A static view of the graph at a specific point in time.
  • BPTT (Backpropagation Through Time): Gradient descent method for recurrent networks.

Summary Checklist

  1. Timestamping: Ensure every edge in your DB has a created_at timestamp.
  2. Sorting: Always sort interaction batches by time before feeding to TGN.
  3. State Persistence: Periodically checkpoint the Rust DashMap (Memory) to disk/S3 so you can recover from crashes.
  4. Latency: Measure the “Event-to-Memory-Update” latency. Should be < 100ms.
  5. Validation: Check for “Future Leakage”. Ensure Test Set start time > Train Set end time.
  6. Baselines: Always compare TGN against a simple “Recent Activity” heuristic or static GNN. TGN adds massive complexity; ensure it beats the baseline.