37.2. Feature Stores for Time Series

In standard MLOps, a Feature Store is a dictionary of entity_id -> feature_value. In Time Series MLOps, a Feature Store is a time-travel machine. It must answer: “What was the value of avg_clicks_last_7d for User X as of last Tuesday at 4:32 PM?”

This is the hardest engineering problem in forecasting pipelines. A single mistake here leaks future information into the past (“Look-ahead Bias”), creating highly accurate models that fail spectacularly in production.

The Core Problem: Point-in-Time Correctness

Imagine you are training a model to predict churn on Jan 15th.

  • Feature: “Number of Support Tickets”
  • Raw Data: User X filed a ticket on Jan 10th (closed Jan 12th) and another on Jan 16th.
  • The Trap: If you naively perform a groupby("user_id").count(), you get 2 tickets.
  • The Leak: The model sees the Jan 16th ticket. But on Jan 15th, that ticket didn’t exist yet.

You need an AS-OF Join (also known as a Point-in-Time Join).

Efficient “AS OF” Joins in Rust

Standard SQL joins (INNER/LEFT) are set-based. AS-OF joins are temporal. SQL Logic:

-- The Slow Way (O(N*M)): a correlated subquery per training row
SELECT
    t.timestamp,
    (
        SELECT f.feature_value
        FROM features f
        WHERE f.entity_id = t.entity_id
          AND f.event_timestamp <= t.timestamp
        ORDER BY f.event_timestamp DESC
        LIMIT 1
    ) AS feature_value
FROM training_labels t;

Running this subquery for every training row is $O(N \times M)$ and painfully slow. In Rust/Polars, we can do this in $O(N + M)$ using sorted merge strategies.

Rust Implementation: The Time-Travel Joiner

#![allow(unused)]
fn main() {
use polars::prelude::*;

// Point-in-time ("AS OF") join: for every trigger event, attach the most
// recent feature value that was known at or before the event's timestamp.
// NOTE: the eager asof-join signatures differ between Polars versions; this
// sketch follows the classic `join_asof_by` form.
pub fn point_in_time_join(
    events: &DataFrame,    // The "trigger" events (e.g., transactions)
    features: &DataFrame,  // The feature updates (e.g., user profile changes)
    on: &str,              // Entity ID column
    time_col: &str,        // Timestamp column
) -> PolarsResult<DataFrame> {
    // Polars has a native asof join which is extremely optimized for sorted
    // data. It works like a "zipline", merging two sorted iterators.
    // `join_asof_by` additionally restricts matches to the same entity, so
    // User X is never joined to User Y's history.
    let events = events.sort([time_col], false, false)?;
    let features = features.sort([time_col], false, false)?;

    events.join_asof_by(
        &features,
        time_col,               // left time column
        time_col,               // right time column
        [on],                   // left "by" (entity) column
        [on],                   // right "by" (entity) column
        AsofStrategy::Backward, // look backward in time only
        None,                   // optional tolerance, e.g. limit the lookback
                                // to 3 days to avoid joining to ancient history
    )
}
}

Implementing AsOf Logic from Scratch

To understand the complexity, let’s implement the core loop without Polars. This is effectively the “Zipline” algorithm.

#![allow(unused)]
fn main() {
struct Event { time: u64, val: f64 }

fn zipline_join(triggers: &[Event], features: &[Event]) -> Vec<(u64, f64)> {
    let mut joined = Vec::new();
    let mut f_idx = 0;
    
    // Both arrays MUST be sorted by time
    for t in triggers {
        // Advance feature pointer while it is still "in the past" relative to trigger
        while f_idx + 1 < features.len() && features[f_idx + 1].time <= t.time {
            f_idx += 1;
        }
        
        // Now features[f_idx] is the latest feature <= t.time (if any)
        match features.get(f_idx) {
            Some(f) if f.time <= t.time => joined.push((t.time, f.val)),
            // No valid feature found (start of history, or empty `features`)
            _ => joined.push((t.time, f64::NAN)),
        }
    }
    joined
}
}

Sliding Window Aggregations

Forecasting models live on “Lag Features” and “Rolling Windows”.

  • sales_lag_7d: Sales exactly 7 days ago.
  • sales_rolling_mean_30d: Average sales over the last 30 days.

The “Tumbling” vs “Hopping” vs “Sliding” Confusion

  • Tumbling: Fixed non-overlapping. [12:00-12:05], [12:05-12:10]. (Good for metrics).
  • Hopping: Overlapping fixed stride. Window 5m, Slide 1m. (Good for alerts).
  • Sliding: Calculated for every event. (Required for Transaction Fraud).
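
To make the distinction concrete, here is a minimal sketch, assuming integer epoch-second timestamps (the function names are illustrative, not a library API):

#![allow(unused)]
fn main() {
// Tumbling: each event maps to exactly one non-overlapping bucket.
fn tumbling_window_id(ts: u64, window_size: u64) -> u64 {
    ts / window_size
}

// Hopping: a new window of length `size` starts every `slide` seconds,
// so one event lands in up to size/slide overlapping windows.
fn hopping_window_starts(ts: u64, size: u64, slide: u64) -> Vec<u64> {
    let latest_start = (ts / slide) * slide;
    (0..size / slide)
        .filter_map(|i| latest_start.checked_sub(i * slide))
        .collect()
}

// Sliding: the window is anchored at the event itself, i.e. [ts - size, ts],
// so it must be re-evaluated for every incoming event.
fn sliding_window_bounds(ts: u64, size: u64) -> (u64, u64) {
    (ts.saturating_sub(size), ts)
}
}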

Implementation Strategy: Online vs Offline

1. Offline (Training): compute on batch Parquet files using Polars window functions.

#![allow(unused)]
fn main() {
// Rolling 30-day mean over `sales`. Note: with a duration-based window
// ("30d") Polars needs to know which column holds the timestamps; here we
// assume a `date` column supplied via the `by` option.
let q = col("sales")
    .rolling_mean(RollingOptions {
        window_size: Duration::parse("30d"),
        min_periods: 1,
        by: Some("date".into()),
        ..Default::default()
    });
}

2. Online (Inference): you cannot run a 30-day scan over a database for every API call that must return in under 10 ms.

Architecture: The Speed Layer with Redis

The “Speed Layer” must maintain the running state of the window. For a SUM window, this is easy: SUM += new_val. For a MEAN window over time (last 30 days), we need to know what values to subtract (retraction).

The Bucket Method (Approximation): Store 30 daily buckets in a Redis List.

  • On Day 31: Pop Left (Day 1), Push Right (Day 31).
  • Sum = Sum(List).

Rust + Redis Lua Script for Atomicity:

-- Add to head
redis.call('LPUSH', KEYS[1], ARGV[1])
-- Trim to size 30
redis.call('LTRIM', KEYS[1], 0, 29)
-- Calculate Sum (Lua loop)
local sum = 0
for _, val in ipairs(redis.call('LRANGE', KEYS[1], 0, 29)) do
    sum = sum + tonumber(val)
end
return sum
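
A sketch of calling this script atomically from Rust via the redis crate's Script helper (the key layout and the integer bucket values are illustrative assumptions):

#![allow(unused)]
fn main() {
use redis::Script;

// Same script as above, embedded as a string constant.
const LUA_PUSH_AND_SUM: &str = r#"
redis.call('LPUSH', KEYS[1], ARGV[1])
redis.call('LTRIM', KEYS[1], 0, 29)
local sum = 0
for _, val in ipairs(redis.call('LRANGE', KEYS[1], 0, 29)) do
    sum = sum + tonumber(val)
end
return sum
"#;

fn push_and_sum(
    con: &mut redis::Connection,
    user_id: u64,
    todays_value: i64, // integer buckets: Redis returns Lua numbers as integer replies
) -> redis::RedisResult<i64> {
    let script = Script::new(LUA_PUSH_AND_SUM);
    script
        .key(format!("user:{user_id}:daily_clicks")) // hypothetical key layout
        .arg(todays_value)
        .invoke(con)
}
}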

Exponential Moving Average (EMA): If exact windows aren’t required, EMA is O(1) storage. $$ S_t = \alpha \cdot x_t + (1-\alpha) \cdot S_{t-1} $$

  • Pros: Only requires storing 1 float ($S_{t-1}$). Infinite history. Non-blocking.
  • State: Store (last_ema, last_timestamp) in Redis.
  • Update:
    1. Calculate delta_t = now - last_timestamp.
    2. Adjust alpha based on delta_t (irregular time intervals).
    3. Update new_ema.
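
A minimal sketch of that update, assuming the common time-decay form $\alpha_{\text{eff}} = 1 - e^{-\Delta t / \tau}$ for irregular intervals (the struct and field names are illustrative):

#![allow(unused)]
fn main() {
// State stored per entity in Redis: (last_ema, last_timestamp).
struct EmaState {
    last_ema: f64,
    last_timestamp: f64, // epoch seconds
}

fn update_ema(state: &EmaState, x: f64, now: f64, tau_seconds: f64) -> EmaState {
    // 1. Elapsed time since the previous observation.
    let delta_t = (now - state.last_timestamp).max(0.0);
    // 2. Longer gaps -> larger effective alpha -> the old state decays more.
    let alpha = 1.0 - (-delta_t / tau_seconds).exp();
    // 3. Standard EMA update with the time-adjusted alpha.
    EmaState {
        last_ema: alpha * x + (1.0 - alpha) * state.last_ema,
        last_timestamp: now,
    }
}
}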

Materialization Engine (Batch-to-Online Sync)

How do features get from your Data Warehouse (Snowflake/BigQuery) to Redis? You need a Materialization Job.

The job must be:

  1. Idempotent: Running it twice shouldn’t double-count events.
  2. Low Latency: Features must appear in Redis within minutes of computation.

Rust Worker for Materialization

#![allow(unused)]
fn main() {
use redis::Commands;
use arrow::array::{Int64Array, StringArray};
use arrow::record_batch::RecordBatch;

pub fn materialize_batch(
    batch: &RecordBatch,
    redis_client: &redis::Client,
) -> redis::RedisResult<()> {
    let mut con = redis_client.get_connection()?;
    let mut pipe = redis::pipe();

    // Downcast the Arrow columns we need (assumes the batch schema has a
    // Utf8 `user_id` column and an Int64 `click_count` column).
    let user_ids = batch
        .column(batch.schema().index_of("user_id").unwrap())
        .as_any()
        .downcast_ref::<StringArray>()
        .expect("user_id must be a Utf8 column");
    let clicks = batch
        .column(batch.schema().index_of("click_count").unwrap())
        .as_any()
        .downcast_ref::<Int64Array>()
        .expect("click_count must be an Int64 column");

    for i in 0..batch.num_rows() {
        let key = format!("user:{}:features", user_ids.value(i));

        // HSET user:123:features click_count 55
        // HSET overwrites the field, so re-running the job is idempotent.
        pipe.hset(&key, "click_count", clicks.value(i));
        pipe.expire(&key, 86400); // 1 day TTL
    }

    // One pipelined round-trip for the whole batch.
    pipe.query(&mut con)
}
}

Infrastructure as Code (Terraform)

Do not manually click “Create Redis”.

resource "aws_elasticache_cluster" "feature_store_speed" {
  cluster_id           = "fs-speed-layer"
  engine               = "redis"
  node_type            = "cache.t4g.medium"
  num_cache_nodes      = 1
  parameter_group_name = "default.redis6.x"
  engine_version       = "6.2"
  port                 = 6379
  
  subnet_group_name = aws_elasticache_subnet_group.default.name
  security_group_ids = [aws_security_group.redis.id]
}

resource "aws_security_group" "redis" {
  name = "feature-store-redis-sg"
  ingress {
    from_port = 6379
    to_port   = 6379
    protocol  = "tcp"
    cidr_blocks = ["10.0.0.0/8"]
  }
}

Schema Evolution

Feature definitions change. “Clicks” becomes “Weighted Clicks”.

Strategy 1: Versioning via Key Prefixes

  • v1:user:123:clicks
  • v2:user:123:clicks

Pros: Safe. Parallel/Canary deployment possible.
Cons: Double storage cost.

Strategy 2: Extensible structs (Protobuf)

Store features as a serialized Protobuf blob.

  • Add new field weighted_clicks (id=2).
  • Old readers just ignore it.
  • New readers use it.
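
A sketch of what this looks like in Rust with the prost crate (the crate choice, struct name, and field types are assumptions; any Protobuf implementation behaves the same way):

#![allow(unused)]
fn main() {
use prost::Message;

#[derive(Clone, PartialEq, Message)]
pub struct UserFeatures {
    // Original field.
    #[prost(uint64, tag = "1")]
    pub clicks: u64,
    // Added later as field id=2; old readers simply skip it.
    #[prost(double, optional, tag = "2")]
    pub weighted_clicks: Option<f64>,
}

fn roundtrip() -> Result<(), prost::DecodeError> {
    let v2 = UserFeatures { clicks: 55, weighted_clicks: Some(41.7) };
    let blob = v2.encode_to_vec();                         // store this blob in Redis
    let decoded = UserFeatures::decode(blob.as_slice())?;  // readers decode what they know
    assert_eq!(decoded.clicks, 55);
    Ok(())
}
}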

The Comparison Matrix: Picking a Backend

| Feature     | Redis             | DynamoDB      | TimescaleDB    | BigQuery       |
|-------------|-------------------|---------------|----------------|----------------|
| Role        | Hot (Speed Layer) | Warm (Lookup) | Warm (History) | Cold (Batch)   |
| Latency     | < 1ms             | < 10ms        | < 50ms         | Minutes        |
| Throughput  | 1M ops/sec        | Scalable      | Medium         | High           |
| Cost        | $$$$ (RAM)        | $$$ (WCU)     | $$ (Disk)      | $ (Storage)    |
| TTL Support | Native            | Native        | Partition Drop | Partition Drop |
| Data Model  | Key-Value         | Key-Value     | Relational     | Columnar       |

Troubleshooting Guide

1. “Redis is OOMing”

  • Cause: You are storing infinite history in lists without LTRIM or EXPIRE.
  • Fix: Implement aggressive TTLs (Time To Live). If a user hasn’t logged in for 30 days, their session features should expire.

2. “Feature Store Latency Spikes”

  • Cause: Using KEYS * or large HGETALL commands.
  • Fix: Use SCAN for iteration. Use MGET (Multi-Get) to fetch 50 features in one RTT.

3. “Training Data doesn’t match Production”

  • Cause: UTC vs Local Time timezone mismatch in the aggregation window.
  • Fix: Force all timestamps to UTC ISO-8601 (2023-01-01T00:00:00Z) at the ingest gate.
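
A minimal sketch of such an ingest-gate normalization using chrono (the function name is illustrative):

#![allow(unused)]
fn main() {
use chrono::{DateTime, Utc};

// Parse any RFC 3339 / ISO-8601 timestamp (with any offset) and normalize
// it to UTC before it enters the feature pipeline.
fn normalize_to_utc(raw: &str) -> Result<DateTime<Utc>, chrono::ParseError> {
    let parsed = DateTime::parse_from_rfc3339(raw)?; // keeps the original offset
    Ok(parsed.with_timezone(&Utc))                   // convert to UTC
}
}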

Glossary

  • Entity: The object the feature belongs to (User, Product, Store).
  • Feature View: A logical group of features computed together (e.g., “User Clicks View”).
  • Materialization: The process of computing features and saving them to the Hot Store.
  • Point-in-Time: The state of the world at a specific timestamp $T$, ignoring all events $> T$.
  • Watermark: A timestamp indicating that no events older than $T$ will arrive in the stream.

Feature Freshness & The “Late Arrival” Problem

A feature is only useful if it’s fresh. Consider a “Real-time” feature: “Number of clicks in last 5 minutes”.

  • Event: User clicks at 12:00:01.
  • Ingest: Kafka lag is 5 seconds. Processed at 12:00:06.
  • Inference Request: Arrives at 12:00:03.

The catch: The inference service cannot know about the click at 12:00:01 yet.

Solution Strategies:

  1. Wait: Add a “Feature Wait” buffer (e.g., 50ms) before inference. (High latency).
  2. Model: Train the model to expect slightly stale features (lower accuracy, fast), i.e., build the training feature with the same delay the serving path will see (e.g., sales_lag_T_minus_5_seconds).
  3. Watermarking: In streaming engines (Flink), wait until watermark passes 12:00:05 before emitting the “12:00-12:05” window.
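
A minimal sketch of the watermark gate in strategy 3 (the types and names are illustrative, not a Flink API):

#![allow(unused)]
fn main() {
struct Window {
    end: u64, // exclusive end of the window, epoch seconds
    sum: f64, // running aggregate
}

// Emit only windows whose end the watermark has passed, i.e. windows that
// can no longer receive late events; keep the rest buffered.
fn emit_ready(pending: &mut Vec<Window>, watermark: u64) -> Vec<Window> {
    let (ready, still_pending): (Vec<Window>, Vec<Window>) =
        pending.drain(..).partition(|w| w.end <= watermark);
    *pending = still_pending;
    ready
}
}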

Offline-Online Consistency Check

You must prove that your Python/Polars batch logic produces the exact same float as your Rust/Redis online logic.

Verification Script:

  1. Replay 1 day of production Kafka logs through the Rust Online implementation. Capture outputs.
  2. Run the Polars Batch implementation on the same day’s Parquet dump.
  3. Join on entity_id and timestamp.
  4. Assert abs(online_val - batch_val) < epsilon.
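
A minimal sketch of step 4, assuming the joined rows have already been collected as (entity_id, timestamp, online_val, batch_val) tuples:

#![allow(unused)]
fn main() {
fn assert_consistent(joined: &[(String, i64, f64, f64)], epsilon: f64) {
    for (entity_id, ts, online_val, batch_val) in joined {
        assert!(
            (online_val - batch_val).abs() < epsilon,
            "drift for {entity_id} at {ts}: online={online_val}, batch={batch_val}"
        );
    }
}
}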

Common causes of drift:

  • Floating-point precision: 32-bit (Redis) vs 64-bit (Polars).
  • Time boundaries: Identifying “Start of Day” (UTC vs Local Time).
  • Sort order: Processing events with identical timestamps in different orders.

The “Feature Definition” DSL

To ensure consistency, do not write logic twice (once in Python for training, once in Java/Rust for serving). Write it once in a generic DSL.

features:
  - name: user_clicks_7d
    entity: user
    aggr: count
    window: 7d
    source: click_stream
    implementation:
      batch: polars_expr
      stream: flink_window
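
One way to make that single definition drive both code paths is to deserialize it into a shared Rust type; a sketch assuming serde and serde_yaml:

#![allow(unused)]
fn main() {
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct FeatureFile {
    features: Vec<FeatureSpec>,
}

#[derive(Debug, Deserialize)]
struct FeatureSpec {
    name: String,
    entity: String,
    aggr: String,   // e.g. "count", "sum", "mean"
    window: String, // e.g. "7d"
    source: String,
    implementation: Implementation,
}

#[derive(Debug, Deserialize)]
struct Implementation {
    batch: String,  // e.g. "polars_expr"
    stream: String, // e.g. "flink_window"
}

fn load_feature_definitions(yaml: &str) -> Result<FeatureFile, serde_yaml::Error> {
    serde_yaml::from_str(yaml)
}
}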

Storage Hierarchy for Time Series Features

| Tier | Technology               | Latency | Cost | Use Case                                     |
|------|--------------------------|---------|------|----------------------------------------------|
| Hot  | Redis / KeyDB            | < 1ms   | $$$$ | Real-time sliding windows. Last known value. |
| Warm | TimescaleDB / ClickHouse | < 50ms  | $$   | Historical lookups (e.g., "Last 5 logins").  |
| Cold | S3 (Parquet)             | Seconds | $    | Batch Training.                              |

Rust Tip: Use the redis crate's pipelining (redis::pipe()) to fetch 50 features in a single round-trip, and MGET for bulk retrieval.
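
A sketch of the bulk fetch, assuming one Redis key per feature (the key layout is illustrative):

#![allow(unused)]
fn main() {
// One MGET = one network round-trip, however many features we request.
fn fetch_features(
    con: &mut redis::Connection,
    user_id: u64,
    feature_names: &[&str],
) -> redis::RedisResult<Vec<Option<String>>> {
    let keys: Vec<String> = feature_names
        .iter()
        .map(|f| format!("user:{user_id}:{f}")) // hypothetical key layout
        .collect();

    redis::cmd("MGET").arg(&keys).query(con)
}
}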

Summary Checklist

  1. AS-OF Joins: Use them exclusively for creating training sets. Never use standard Left Joins.
  2. Partitioning: Partition Feature Store by Date to enable efficient time-travel.
  3. State Compactness: Prefer EMA over exact sliding windows if strict precision isn’t required.
  4. Consistency Test: Automate the Offline-Online replay test in CI.
  5. Lag Awareness: Explicitly model data arrival delays in your features.
  6. Retraction: Ensure your streaming window logic correctly handles “Event Expiry”.
  7. Materialization: Ensure batch jobs are idempotent to prevent double counting.
  8. Schema: Use Protobuf for schema evolution if possible to avoid breaking changes.
  9. Monitoring: Track “Feature Freshness” (Age of last update) as a P0 metric.