37.2. Feature Stores for Time Series
In standard MLOps, a Feature Store is a dictionary of entity_id -> feature_value.
In Time Series MLOps, a Feature Store is a time-travel machine. It must answer: “What was the value of avg_clicks_last_7d for User X as of last Tuesday at 4:32 PM?”
This is the hardest engineering problem in forecasting pipelines. A single mistake here leaks future information into the past (“Look-ahead Bias”), creating highly accurate models that fail spectacularly in production.
The Core Problem: Point-in-Time Correctness
Imagine you are training a model to predict churn on Jan 15th.
- Feature: “Number of Support Tickets”
- Raw Data: User X filed a ticket on Jan 10th (closed Jan 12th) and another on Jan 16th.
- The Trap: If you naively perform a groupby("user_id").count(), you get 2 tickets.
- The Leak: The model sees the Jan 16th ticket. But on Jan 15th, that ticket didn’t exist yet.
You need an AS-OF Join (also known as a Point-in-Time Join).
Efficient “AS OF” Joins in Rust
Standard SQL joins (INNER/LEFT) are set-based. AS-OF joins are temporal. SQL Logic:
-- The Slow Way (O(N*M)): a correlated subquery per training row
SELECT
    t.timestamp,
    (SELECT f.feature_value
     FROM features f
     WHERE f.entity_id = t.entity_id
       AND f.event_timestamp <= t.timestamp
     ORDER BY f.event_timestamp DESC
     LIMIT 1) AS feature_value
FROM training_labels t
Running this subquery for every training row is $O(N \times M)$ and painfully slow. In Rust/Polars, we can do this in $O(N + M)$ using sorted merge strategies.
Rust Implementation: The Time-Travel Joiner
#![allow(unused)]
fn main() {
    use polars::prelude::*;

    pub fn point_in_time_join(
        events: &DataFrame,   // The "trigger" events (e.g., transactions)
        features: &DataFrame, // The feature updates (e.g., user profile changes)
        on: &str,             // Entity ID column
        time_col: &str,       // Timestamp column
    ) -> PolarsResult<DataFrame> {
        // Polars has a native as-of join that is heavily optimized for sorted
        // data. It works like a "zipline", merging two sorted iterators.
        // Both sides must be sorted by the time column; the entity ID goes in
        // as the "by" key so each entity gets its own backward search.
        // (The exact argument list varies slightly between Polars versions.)
        let out = events.sort([time_col], false, false)?.join_asof_by(
            &features.sort([time_col], false, false)?,
            time_col,               // left_on: event timestamp
            time_col,               // right_on: feature timestamp
            [on],                   // left_by: entity ID
            [on],                   // right_by: entity ID
            AsofStrategy::Backward, // look backward in time only
            None,                   // optional tolerance (e.g., "3d") to avoid joining to ancient history
        )?;
        Ok(out)
    }
}
Implementing AsOf Logic from Scratch
To understand the complexity, let’s implement the core loop without Polars. This is effectively the “Zipline” algorithm.
#![allow(unused)]
fn main() {
    struct Event { time: u64, val: f64 }

    fn zipline_join(triggers: &[Event], features: &[Event]) -> Vec<(u64, f64)> {
        let mut joined = Vec::new();
        let mut f_idx = 0;
        // Both slices MUST be sorted by time.
        for t in triggers {
            // Advance the feature pointer while the *next* feature is still
            // in the past relative to the trigger.
            while f_idx + 1 < features.len() && features[f_idx + 1].time <= t.time {
                f_idx += 1;
            }
            // Now features[f_idx] is the latest feature <= t.time, if any exists.
            match features.get(f_idx) {
                Some(f) if f.time <= t.time => joined.push((t.time, f.val)),
                // No valid feature yet (start of history, or no features at all).
                _ => joined.push((t.time, f64::NAN)),
            }
        }
        joined
    }
}
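A quick sanity check using the struct and function above (values are illustrative): a trigger that fires before the first feature update gets NaN, while later triggers pick up the most recent preceding value.
fn demo() {
    let features = vec![Event { time: 5, val: 1.0 }, Event { time: 12, val: 2.0 }];
    let triggers = vec![
        Event { time: 3, val: 0.0 },  // before any feature update -> NaN
        Event { time: 10, val: 0.0 }, // picks up the value from time 5
        Event { time: 20, val: 0.0 }, // picks up the value from time 12
    ];
    let joined = zipline_join(&triggers, &features);
    assert!(joined[0].1.is_nan());
    assert_eq!(joined[1], (10, 1.0));
    assert_eq!(joined[2], (20, 2.0));
}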
Sliding Window Aggregations
Forecasting models live on “Lag Features” and “Rolling Windows”.
- sales_lag_7d: Sales exactly 7 days ago.
- sales_rolling_mean_30d: Average sales over the last 30 days.
The “Tumbling” vs “Hopping” vs “Sliding” Confusion
- Tumbling: Fixed non-overlapping. [12:00-12:05], [12:05-12:10]. (Good for metrics).
- Hopping: Overlapping fixed stride. Window 5m, Slide 1m. (Good for alerts).
- Sliding: Calculated for every event. (Required for Transaction Fraud).
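To make the distinction concrete, here is a minimal sketch (hypothetical helper functions, epoch-second timestamps assumed) mapping an event timestamp to its tumbling bucket versus every hopping window it belongs to:
/// Start of the tumbling window (size `window` seconds) that contains `ts`.
fn tumbling_window_start(ts: u64, window: u64) -> u64 {
    ts - ts % window
}

/// Starts of every hopping window (size `window`, stride `slide`) containing `ts`.
/// A sliding aggregate is the degenerate case: it is re-evaluated for every event.
fn hopping_window_starts(ts: u64, window: u64, slide: u64) -> Vec<u64> {
    let mut starts = Vec::new();
    let mut start = ts - ts % slide; // latest window starting at or before ts
    loop {
        starts.push(start);
        // Stop once the previous window would no longer contain ts (or we hit epoch 0).
        if start < slide || start - slide + window <= ts {
            break;
        }
        start -= slide;
    }
    starts
}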
Implementation Strategy: Online vs Offline
1. Offline (Training)
Compute features on batch Parquet files using Polars window functions.
#![allow(unused)]
fn main() {
    use polars::prelude::*;

    // 30-day rolling mean of sales, expressed lazily.
    // Note: a temporal window like "30d" also needs the time column to roll
    // over; depending on the Polars version this is the `by` field of
    // RollingOptions or a dedicated `rolling_mean_by` expression.
    let q = col("sales")
        .rolling_mean(RollingOptions {
            window_size: Duration::parse("30d"),
            min_periods: 1,
            ..Default::default()
        });
}
2. Online (Inference)
You cannot run a 30-day scan over a database for every API call when the latency budget is under 10 ms.
Architecture: The Speed Layer with Redis
The “Speed Layer” must maintain the running state of the window.
For an unbounded running SUM this is easy: SUM += new_val.
For any bounded window (e.g., a mean over the last 30 days), we also need to know which values to subtract once they fall out of the window (retraction).
The Bucket Method (Approximation): Store 30 daily buckets in a Redis list.
- On Day 31: drop the oldest bucket (Day 1) and add the newest (Day 31).
- Sum = Sum(List).
Rust + Redis Lua Script for Atomicity:
-- Add today's bucket to the head of the list
redis.call('LPUSH', KEYS[1], ARGV[1])
-- Trim to the 30 most recent buckets
redis.call('LTRIM', KEYS[1], 0, 29)
-- Calculate the window sum (Lua loop)
local sum = 0
for _, val in ipairs(redis.call('LRANGE', KEYS[1], 0, 29)) do
    sum = sum + tonumber(val)
end
-- Return as a string: Redis truncates Lua numbers to integers otherwise
return tostring(sum)
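On the Rust side, the redis crate's Script helper can load and invoke this atomically; a minimal sketch (the key naming and the f64 bucket value are illustrative):
use redis::Script;

// Push today's bucket, trim to 30 buckets, and get the window sum back,
// all in one atomic server-side script.
fn update_daily_buckets(
    con: &mut redis::Connection,
    key: &str,
    todays_value: f64,
) -> redis::RedisResult<f64> {
    let script = Script::new(
        r#"
        redis.call('LPUSH', KEYS[1], ARGV[1])
        redis.call('LTRIM', KEYS[1], 0, 29)
        local sum = 0
        for _, val in ipairs(redis.call('LRANGE', KEYS[1], 0, 29)) do
            sum = sum + tonumber(val)
        end
        return tostring(sum)
        "#,
    );
    let sum: String = script.key(key).arg(todays_value).invoke(con)?;
    Ok(sum.parse().unwrap_or(f64::NAN))
}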
Exponential Moving Average (EMA): If exact windows aren’t required, EMA is O(1) storage. $$ S_t = \alpha \cdot x_t + (1-\alpha) \cdot S_{t-1} $$
- Pros: Only requires storing 1 float ($S_{t-1}$). Infinite history. Non-blocking.
- State: Store (last_ema, last_timestamp) in Redis.
- Update (see the sketch below):
  - Calculate delta_t = now - last_timestamp.
  - Adjust alpha based on delta_t (to handle irregular time intervals).
  - Update new_ema.
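A minimal update routine, assuming epoch-second timestamps and the common time-decay choice alpha = 1 - exp(-delta_t / tau); the decay constant tau is an assumption you tune per feature:
struct EmaState {
    last_ema: f64,
    last_timestamp: f64, // epoch seconds
}

// Time-aware EMA update for irregularly spaced events.
fn update_ema(state: &EmaState, x: f64, now: f64, tau_secs: f64) -> EmaState {
    let delta_t = (now - state.last_timestamp).max(0.0);
    // The longer the gap since the last event, the more weight the new value gets.
    let alpha = 1.0 - (-delta_t / tau_secs).exp();
    EmaState {
        last_ema: alpha * x + (1.0 - alpha) * state.last_ema,
        last_timestamp: now,
    }
}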
Materialization Engine (Batch-to-Online Sync)
How do features get from your Data Warehouse (Snowflake/BigQuery) to Redis? You need a Materialization Job.
The job must be:
- Idempotent: Running it twice shouldn’t double-count events.
- Low Latency: Features must appear in Redis within minutes of computation.
Rust Worker for Materialization
#![allow(unused)]
fn main() {
    use arrow::array::{Array, Int64Array, StringArray};
    use arrow::record_batch::RecordBatch;

    pub fn materialize_batch(
        batch: &RecordBatch,
        redis_client: &redis::Client,
    ) -> redis::RedisResult<()> {
        let mut con = redis_client.get_connection()?;
        let mut pipe = redis::pipe();

        // Arrow is columnar: fetch each column once, then walk the row indices.
        let user_ids = batch
            .column_by_name("user_id")
            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
            .expect("user_id must be a string column");
        let clicks = batch
            .column_by_name("click_count")
            .and_then(|c| c.as_any().downcast_ref::<Int64Array>())
            .expect("click_count must be an int64 column");

        for i in 0..batch.num_rows() {
            let key = format!("user:{}:features", user_ids.value(i));
            // HSET user:123:features click_count 55
            // HSET overwrites the field, so re-running the job is idempotent.
            pipe.hset(&key, "click_count", clicks.value(i));
            pipe.expire(&key, 86400); // 1 day TTL
        }

        // One round-trip: the whole batch is flushed as a single pipeline.
        pipe.query(&mut con)
    }
}
Infrastructure as Code (Terraform)
Do not manually click “Create Redis”.
resource "aws_elasticache_cluster" "feature_store_speed" {
cluster_id = "fs-speed-layer"
engine = "redis"
node_type = "cache.t4g.medium"
num_cache_nodes = 1
parameter_group_name = "default.redis6.x"
engine_version = "6.2"
port = 6379
subnet_group_name = aws_elasticache_subnet_group.default.name
security_group_ids = [aws_security_group.redis.id]
}
resource "aws_security_group" "redis" {
name = "feature-store-redis-sg"
ingress {
from_port = 6379
to_port = 6379
protocol = "tcp"
cidr_blocks = ["10.0.0.0/8"]
}
}
Schema Evolution
Feature definitions change. “Clicks” becomes “Weighted Clicks”.
Strategy 1: Versioning via Key Prefixes
- v1:user:123:clicks
- v2:user:123:clicks
Pros: Safe; parallel/canary deployment is possible. Cons: Double storage cost.
Strategy 2: Expansive structs (Protobuf)
Store features as a serialized Protobuf blob.
- Add a new field weighted_clicks (id = 2).
- Old readers just ignore it.
- New readers use it.
The Comparison Matrix: Picking a Backend
| Feature | Redis | DynamoDB | TimescaleDB | BigQuery |
|---|---|---|---|---|
| Role | Hot (Speed Layer) | Warm (Lookup) | Warm (History) | Cold (Batch) |
| Latency | < 1ms | < 10ms | < 50ms | Minutes |
| Throughput | 1M ops/sec | Scalable | Medium | High |
| Cost | $$$$ (RAM) | $$$ (WCU) | $$ (Disk) | $ (Storage) |
| TTL Support | Native | Native | Partition Drop | Partition Drop |
| Data Model | Key-Value | Key-Value | Relational | Columnar |
Troubleshooting Guide
1. “Redis is OOMing”
- Cause: You are storing infinite history in lists without LTRIM or EXPIRE.
- Fix: Implement aggressive TTLs (Time To Live). If a user hasn’t logged in for 30 days, their session features should expire.
2. “Feature Store Latency Spikes”
- Cause: Using KEYS * or large HGETALL commands.
- Fix: Use SCAN for iteration. Use MGET (Multi-Get) to fetch 50 features in one RTT.
3. “Training Data doesn’t match Production”
- Cause: UTC vs Local Time timezone mismatch in the aggregation window.
- Fix: Force all timestamps to UTC ISO-8601 (2023-01-01T00:00:00Z) at the ingest gate.
Glossary
- Entity: The object the feature belongs to (User, Product, Store).
- Feature View: A logical group of features computed together (e.g., “User Clicks View”).
- Materialization: The process of computing features and saving them to the Hot Store.
- Point-in-Time: The state of the world at a specific timestamp $T$, ignoring all events $> T$.
- Watermark: A timestamp indicating that no events older than $T$ will arrive in the stream.
Feature Freshness & The “Late Arrival” Problem
A feature is only useful if it’s fresh. Consider a “Real-time” feature: “Number of clicks in last 5 minutes”.
- Event: User clicks at 12:00:01.
- Ingest: Kafka lag is 5 seconds. Processed at 12:00:06.
- Inference Request: Arrives at 12:00:03.
The catch: The inference service cannot know about the click at 12:00:01 yet.
Solution Strategies:
- Wait: Add a “Feature Wait” buffer (e.g., 50ms) before inference. (High latency).
- Model: Train the model to expect slightly stale features (lower accuracy, fast), i.e. use sales_lag_T_minus_5_seconds as the input feature during training.
- Watermarking: In streaming engines (Flink), wait until the watermark passes 12:00:05 before emitting the “12:00-12:05” window.
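Whichever strategy you choose, the serving path should know how stale each feature actually is. A minimal freshness guard (std-only sketch; max_age_secs is an assumed per-feature budget):
use std::time::{SystemTime, UNIX_EPOCH};

/// Returns true if the feature was updated within the allowed staleness budget.
/// Callers can fall back to a default value (or reject the request) when false.
fn is_fresh(feature_updated_at_epoch_secs: u64, max_age_secs: u64) -> bool {
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_secs())
        .unwrap_or(0);
    now.saturating_sub(feature_updated_at_epoch_secs) <= max_age_secs
}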
Offline-Online Consistency Check
You must prove that your Python/Polars batch logic produces the exact same float as your Rust/Redis online logic.
Verification Script:
- Replay 1 day of production Kafka logs through the Rust Online implementation. Capture outputs.
- Run the Polars Batch implementation on the same day’s Parquet dump.
- Join on entity_id and timestamp.
- Assert abs(online_val - batch_val) < epsilon.
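The final assertion can be as small as this (hypothetical row struct; epsilon depends on your serialization precision):
/// One joined row from the offline/online replay comparison.
struct JoinedRow {
    entity_id: String,
    timestamp: u64,
    online_val: f64,
    batch_val: f64,
}

fn assert_consistency(rows: &[JoinedRow], epsilon: f64) -> Result<(), String> {
    for r in rows {
        if (r.online_val - r.batch_val).abs() >= epsilon {
            return Err(format!(
                "Drift for entity {} at {}: online={} batch={}",
                r.entity_id, r.timestamp, r.online_val, r.batch_val
            ));
        }
    }
    Ok(())
}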
Common causes of drift:
- Floating-point precision: 32-bit (Redis serialization) vs 64-bit (Polars).
- Time boundaries: Identifying “Start of Day” (UTC vs Local Time).
- Sort order: Processing events with identical timestamps in different orders.
The “Feature Definition” DSL
To ensure consistency, do not write logic twice (once in Python for training, once in Java/Rust for serving). Write it once in a generic DSL.
features:
  - name: user_clicks_7d
    entity: user
    aggr: count
    window: 7d
    source: click_stream
    implementation:
      batch: polars_expr
      stream: flink_window
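One way to consume such a definition from Rust is a small serde model that both the batch and streaming code-gen read (a sketch assuming the serde and serde_yaml crates and the field names shown above):
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct FeatureDef {
    name: String,
    entity: String,
    aggr: String,   // "count", "sum", "mean", ...
    window: String, // e.g., "7d"
    source: String,
    implementation: Implementation,
}

#[derive(Debug, Deserialize)]
struct Implementation {
    batch: String,  // e.g., "polars_expr"
    stream: String, // e.g., "flink_window"
}

#[derive(Debug, Deserialize)]
struct FeatureRegistry {
    features: Vec<FeatureDef>,
}

fn load_registry(yaml: &str) -> Result<FeatureRegistry, serde_yaml::Error> {
    serde_yaml::from_str(yaml)
}
The batch side lowers each FeatureDef into a Polars expression, while the streaming side lowers it into a window definition, so the aggregation logic is written exactly once.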
Storage Hierarchy for Time Series Features
| Tier | Technology | Latency | Cost | Use Case |
|---|---|---|---|---|
| Hot | Redis / KeyDB | < 1ms | $$$$ | Real-time sliding windows. Last known value. |
| Warm | TimescaleDB / ClickHouse | < 50ms | $$ | Historical lookups (e.g., “Last 5 logins”). |
| Cold | S3 (Parquet) | Seconds | $ | Batch Training. |
Rust Tip: Use the redis crate’s redis::pipe() to batch commands and fetch 50 features in a single round-trip, or MGET for bulk key retrieval.
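A minimal read-path sketch (key and field names are illustrative):
// Fetch several features for one entity in a single round-trip.
fn fetch_features(
    con: &mut redis::Connection,
    user_id: &str,
) -> redis::RedisResult<(i64, f64)> {
    let key = format!("user:{}:features", user_id);
    // Both HGETs travel in one pipeline, so the caller pays a single RTT.
    redis::pipe()
        .hget(&key, "click_count")
        .hget(&key, "spend_30d")
        .query(con)
}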
Summary Checklist
- AS-OF Joins: Use them exclusively for creating training sets. Never use standard Left Joins.
- Partitioning: Partition the Feature Store by Date to enable efficient time-travel.
- State Compactness: Prefer EMA over exact sliding windows if strict precision isn’t required.
- Consistency Test: Automate the Offline-Online replay test in CI.
- Lag Awareness: Explicitly model data arrival delays in your features.
- Retraction: Ensure your streaming window logic correctly handles “Event Expiry”.
- Materialization: Ensure batch jobs are idempotent to prevent double counting.
- Schema: Use Protobuf for schema evolution if possible to avoid breaking changes.
- Monitoring: Track “Feature Freshness” (Age of last update) as a P0 metric.