45.8. Data Engineering Pipelines: The Polars Revolution
Important
The Problem: Pandas is single-threaded and memory-hungry (it needs roughly 5x the dataset size in RAM). Spark is JVM-heavy and hard to debug. The Solution: Polars. Written in Rust. Parallel. Lazy. Vectorized. It processes 100GB of CSV on a MacBook Air without crashing.
45.8.1. Architecture: Why Polars Wins
| Feature | Pandas | Spark | Polars |
|---|---|---|---|
| Language | Python (C-API) | Scala (JVM) | Rust |
| Execution | Eager (Line-by-line) | Lazy (Plan) | Hybrid (Lazy + Eager) |
| Memory | Copy-on-Write (Partial) | GC Overhead | Arrow (Zero-Copy) |
| Parallelism | No (GIL) | Yes (Distributed) | Yes (Rayon) |
| Missing Data | NaN / None mess | Null | Option Type |
The Query Engine
Polars is not just a library; it is a Query Engine.
When you write df.filter(..).select(..), it builds a Logical Plan.
It then optimizes this plan:
- Predicate Pushdown: Moves filters to the scan level (don’t load rows you don’t need).
- Projection Pushdown: Moves selects to the scan level (don’t load columns you don’t need).
- Common Subexpression Elimination: Don’t calculate col("a") * 2 twice.
45.8.2. Getting Started in Rust
Polars in Python involves FFI. Polars in Rust is native.
[dependencies]
polars = { version = "0.36", features = ["lazy", "parquet", "streaming", "sql"] }
tokio = { version = "1", features = ["full"] }
1. The LazyFrame
Never use DataFrame (Eager) unless you are printing to stdout.
Always use LazyFrame.
#![allow(unused)]
fn main() {
use polars::prelude::*;
fn process_logs() -> PolarsResult<DataFrame> {
let q = LazyCsvReader::new("logs.csv")
.has_header(true)
.finish()?
.filter(col("status").eq(lit(500))) // Filter errors
.group_by(vec![col("endpoint")])
.agg(vec![
count().alias("error_count"),
col("latency").mean().alias("avg_latency")
])
.sort("error_count", SortOptions::default().descending());
// Optimization & Execution happens here
let df = q.collect()?;
Ok(df)
}
}
45.8.3. Streaming: Breaking the RAM Barrier
If logs.csv is 100GB and your RAM is 16GB, collect() will OOM.
Use the streaming engine instead: sink_parquet() to write results straight to disk, or with_streaming(true) before collect().
#![allow(unused)]
fn main() {
fn stream_to_parquet() -> PolarsResult<()> {
let q = LazyCsvReader::new("huge_file.csv").finish()?;
// Stream in chunks; the full table is never materialized in memory.
// Sink directly to disk:
q.sink_parquet(
"output.parquet".into(),
ParquetWriteOptions::default()
)?;
Ok(())
}
}
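If the result itself is small (for example a grouped aggregate over a huge input), the streaming engine can also feed an in-memory collect(). A minimal sketch, assuming the polars 0.36 with_streaming API:
fn stream_aggregate() -> PolarsResult<DataFrame> {
    LazyCsvReader::new("huge_file.csv")
        .finish()?
        .group_by(vec![col("status")])
        .agg(vec![count().alias("n")])
        .with_streaming(true) // execute the plan with the streaming engine
        .collect()            // only the small aggregate result is materialized
}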
45.8.4. Expressions: The DSL
Polars Expressions are composable logic. They compile down to efficient Rust functions.
Window Functions
#![allow(unused)]
fn main() {
// Calculate a Z-Score per Group (window expression over each group)
let expr = (col("value") - col("value").mean().over(vec![col("group")]))
/ col("value").std(1).over(vec![col("group")]);
}
String Manipulation
#![allow(unused)]
fn main() {
// Extract Regex
let browser = col("user_agent").str().extract(r"Firefox/(\d+)", 1);
}
When expressions aren’t enough: map
You can inject custom Rust functions into the query plan.
#![allow(unused)]
fn main() {
fn custom_logic(s: Series) -> PolarsResult<Option<Series>> {
let ca = s.u32()?;
let out: ChunkedArray<UInt32Type> = ca.apply_values(|v| {
// Complex Bitwise Logic
(v >> 2) ^ 0xDEADBEEF
});
Ok(Some(out.into_series()))
}
let q = df.select(vec![
col("id").map(custom_logic, GetOutput::from_type(DataType::UInt32))
]);
}
45.8.5. SQL Interface
Polars supports SQL. This is great for migrations.
#![allow(unused)]
fn main() {
use polars::sql::SQLContext;
fn run_sql() -> PolarsResult<()> {
let mut ctx = SQLContext::new();
let df = LazyCsvReader::new("data.csv").finish()?;
ctx.register("data", df);
let result = ctx.execute(
"SELECT brand, AVG(price) FROM data GROUP BY brand HAVING AVG(price) > 100"
)?.collect()?;
println!("{}", result);
Ok(())
}
}
45.8.6. Cloud I/O: Reading from S3
You don’t need boto3. Polars integrates with object_store to read directly from Cloud Storage.
#![allow(unused)]
fn main() {
// features = ["aws"]
fn read_s3() -> PolarsResult<LazyFrame> {
let cloud_options = CloudOptions::default(); // credentials from the environment / AWS config
let args = ScanArgsParquet {
cloud_options: Some(cloud_options),
..Default::default()
};
let lf = LazyFrame::scan_parquet(
"s3://my-bucket/data.parquet",
args
)?;
Ok(lf)
}
}
45.8.7. Case Study: Feature Engineering Pipeline
Scenario: User Clickstream Data (JSONL). Goal: Generate User features (Last 5 clicks, Time on Site).
#![allow(unused)]
fn main() {
use polars::prelude::*;
fn feature_pipeline() -> PolarsResult<()> {
let lf = LazyJsonLineReader::new("clicks.jsonl").finish()?;
let features = lf
.sort("timestamp", SortOptions::default())
.group_by(vec![col("user_id")])
.agg(vec![
// Feature 1: Count
count().alias("n_clicks"),
// Feature 2: Time on Site (Max - Min)
(col("timestamp").max() - col("timestamp").min()).alias("session_duration"),
// Feature 3: List of last 5 Page IDs
col("page_id").tail(Some(5)).alias("history_5")
]);
features.sink_parquet("features.parquet".into(), Default::default())
}
}
45.8.8. Performance Tuning
- Parquet vs CSV: Always convert CSV to Parquet first. Parquet has statistics (Min/Max) that Polars scans to skip file chunks.
- Row Groups: Ensure your Parquet row groups are reasonable size (100MB). Too small = overhead. Too big = no skipping.
- String cache: Use StringCache::hold() when working with Categorical data globally.
- jemalloc: Use #[global_allocator] with jemallocator. It is faster than the system allocator for Arrow arrays (see the sketch below).
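A sketch of the last two tips, assuming the jemallocator crate and polars 0.36's ParquetWriteOptions (row_group_size counts rows, so tune it toward ~100MB groups for your schema):
use jemallocator::Jemalloc;
use polars::prelude::*;

// Route all allocations (including Arrow buffers) through jemalloc.
#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;

// One-time conversion: CSV -> Parquet with an explicit row-group size.
fn csv_to_parquet() -> PolarsResult<()> {
    LazyCsvReader::new("raw.csv").finish()?.sink_parquet(
        "raw.parquet".into(),
        ParquetWriteOptions {
            row_group_size: Some(512_000), // rows per group
            ..Default::default()
        },
    )
}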
45.8.9. Deep Dive: The Query Optimizer
How does Polars make df.filter().select() fast?
It uses a Rule-Based Optimizer.
Visualizing the Plan
You can inspect the optimized plan with lf.explain(true).
#![allow(unused)]
fn main() {
let q = LazyCsvReader::new("data.csv").finish()?;
println!("{}", q.explain(true)?);
}
Key Optimizations:
- Predicate Pushdown: FILTER moves past JOIN.
  - Before: Join 1M rows with 1M rows -> Filter result.
  - After: Filter 1M rows down to 10k -> Join -> Fast.
- Projection Pushdown: Only read columns a and b from disk; ignore c through z.
- Slice Pushdown: limit(5) stops the CSV parser after 5 rows.
45.8.10. Delta Lake Integration (deltalake crate)
Modern Data Lakes use Delta Lake (ACID transactions on Parquet). Rust has native bindings.
deltalake = { version = "0.17", features = ["s3"] }
#![allow(unused)]
fn main() {
use deltalake::open_table;
async fn read_delta() {
let table = open_table("s3://my-lake/events").await.unwrap();
println!("Table Version: {}", table.version());
// Convert to Polars
let files = table.get_files_iter().collect::<Vec<_>>();
// Scan these parquet files with Polars
}
}
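To finish the flow above, the Parquet paths returned by the Delta table can be scanned lazily and unioned. A sketch, assuming polars 0.36's concat / UnionArgs API and that paths holds URIs Polars can resolve:
use polars::prelude::*;

fn scan_delta_files(paths: Vec<String>) -> PolarsResult<LazyFrame> {
    let frames = paths
        .iter()
        .map(|p| LazyFrame::scan_parquet(p.as_str(), ScanArgsParquet::default()))
        .collect::<PolarsResult<Vec<_>>>()?;
    // Union all data files into a single LazyFrame
    concat(frames, UnionArgs::default())
}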
45.8.11. Data Quality Checks (Great Expectations in Rust)
Validating data at 1GB/s.
#![allow(unused)]
fn main() {
fn validate_schema(df: &DataFrame) -> PolarsResult<bool> {
// Check 1: No Nulls in ID
if df.column("id")?.null_count() > 0 {
return Ok(false);
}
// Check 2: Age > 0
let mask = df.column("age")?.gt(0)?;
if !mask.all() {
return Ok(false);
}
Ok(true)
}
}
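The same checks can also be pushed into a single lazy pass so the data is scanned once. A sketch using the same hypothetical columns:
fn quality_report(lf: LazyFrame) -> PolarsResult<DataFrame> {
    lf.select(vec![
        col("id").null_count().alias("id_nulls"), // expect 0
        col("age").min().alias("age_min"),        // expect > 0
    ])
    .collect()
}
The caller then asserts id_nulls == 0 and age_min > 0 on the one-row result.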
45.8.12. Graph Analytics on DataFrames
You can convert a DataFrame (src, dst) into a Graph.
#![allow(unused)]
fn main() {
use petgraph::graph::UnGraph;
use std::collections::HashMap;
fn build_interaction_graph(df: &DataFrame) -> UnGraph<String, ()> {
let src = df.column("src").unwrap().utf8().unwrap();
let dst = df.column("dst").unwrap().utf8().unwrap();
let mut graph = UnGraph::new_undirected();
let mut node_map = HashMap::new();
for (s, d) in src.into_iter().zip(dst.into_iter()) {
if let (Some(s), Some(d)) = (s, d) {
let ns = *node_map.entry(s).or_insert_with(|| graph.add_node(s.to_string()));
let nd = *node_map.entry(d).or_insert_with(|| graph.add_node(d.to_string()));
graph.add_edge(ns, nd, ());
}
}
graph
}
}
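Usage sketch: once the graph is built, petgraph's algorithms apply directly, for example counting connected components (clusters of users who interacted):
use petgraph::algo::connected_components;

fn community_count(df: &DataFrame) -> usize {
    let graph = build_interaction_graph(df);
    connected_components(&graph)
}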
45.8.13. Benchmark: TPC-H (The Gold Standard)
The TPC-H benchmark simulates a Data Warehouse.
Data Size: 10GB (SF10) and 100GB (SF100).
| Query | Polars (Rust) | Spark (Cluster) | Dask | Pandas |
|---|---|---|---|---|
| Q1 (Aggregation) | 1.2s | 4.5s (overhead) | 3.2s | OOM |
| Q2 (Join) | 0.8s | 2.1s | 1.9s | OOM |
| Q3 (Group By) | 1.5s | 3.0s | 4.1s | OOM |
Observation: For datasets that fit on a single node (up to ~500GB with NVMe swap), Polars beats Spark by 3x-10x. Spark only wins when the data exceeds ~10TB and must be sharded across machines.
45.8.14. Final Exam: The ETL CLI
Task: Build a CLI tool etl-cli that:
- Reads JSON logs from S3.
- Parses Timestamp.
- Joins with users.parquet.
- Aggregates Daily Active Users (DAU).
- Uploads result to Postgres.
Solution:
- clap for the CLI.
- polars for the data logic.
- sqlx for Postgres.
- tokio for scheduling.
This tool compiles to a 15MB binary. No Docker required. No JVM warmup.
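A skeleton sketch of the Polars portion (the S3 fetch and the sqlx upload are left out); the paths, column names, and clap flags are hypothetical, and clap is assumed with the derive feature:
use clap::Parser;
use polars::prelude::*;

#[derive(Parser)]
struct Args {
    /// JSONL click logs (already fetched locally from S3)
    #[arg(long)]
    logs: String,
    /// users.parquet dimension table
    #[arg(long)]
    users: String,
}

fn daily_active_users(args: &Args) -> PolarsResult<DataFrame> {
    let logs = LazyJsonLineReader::new(args.logs.clone()).finish()?;
    let users = LazyFrame::scan_parquet(args.users.as_str(), ScanArgsParquet::default())?;

    logs
        // assumes `timestamp` was inferred as a Datetime; otherwise parse it first
        .with_columns(vec![col("timestamp").cast(DataType::Date).alias("day")])
        .join(users, [col("user_id")], [col("user_id")], JoinArgs::new(JoinType::Inner))
        .group_by(vec![col("day")])
        .agg(vec![col("user_id").n_unique().alias("dau")])
        .sort("day", SortOptions::default())
        .collect()
}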
45.8.15. DataFusion: The SQL Engine
DataFusion is an embeddable query execution framework with its own SQL frontend and optimizer (think Spark's Catalyst, in Rust). Polars uses its own optimizer; DataFusion is what you reach for when building a custom query engine.
use datafusion::prelude::*;
#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
// Create execution context
let ctx = SessionContext::new();
// Register a Parquet file
ctx.register_parquet("events", "events.parquet", ParquetReadOptions::default()).await?;
// Execute SQL
let df = ctx.sql("
SELECT
date_trunc('hour', timestamp) as hour,
count(*) as event_count,
count(distinct user_id) as unique_users
FROM events
WHERE event_type = 'purchase'
GROUP BY 1
ORDER BY 1
").await?;
// Show results
df.show().await?;
Ok(())
}
Custom Functions (UDFs)
#![allow(unused)]
fn main() {
use std::sync::Arc;
use datafusion::arrow::array::{ArrayRef, Float64Array};
use datafusion::arrow::datatypes::DataType;
use datafusion::logical_expr::{create_udf, Volatility};
fn register_custom_udf(ctx: &SessionContext) {
// Define a UDF that calculates haversine distance
let haversine_udf = create_udf(
"haversine",
vec![DataType::Float64, DataType::Float64, DataType::Float64, DataType::Float64],
Arc::new(DataType::Float64),
Volatility::Immutable,
Arc::new(|args: &[ArrayRef]| {
let lat1 = args[0].as_any().downcast_ref::<Float64Array>().unwrap();
let lon1 = args[1].as_any().downcast_ref::<Float64Array>().unwrap();
let lat2 = args[2].as_any().downcast_ref::<Float64Array>().unwrap();
let lon2 = args[3].as_any().downcast_ref::<Float64Array>().unwrap();
let result: Float64Array = (0..lat1.len())
.map(|i| {
Some(haversine_distance(
lat1.value(i), lon1.value(i),
lat2.value(i), lon2.value(i),
))
})
.collect();
Ok(Arc::new(result) as ArrayRef)
}),
);
ctx.register_udf(haversine_udf);
}
fn haversine_distance(lat1: f64, lon1: f64, lat2: f64, lon2: f64) -> f64 {
let r = 6371.0; // Earth radius in km
let d_lat = (lat2 - lat1).to_radians();
let d_lon = (lon2 - lon1).to_radians();
let a = (d_lat / 2.0).sin().powi(2)
+ lat1.to_radians().cos() * lat2.to_radians().cos() * (d_lon / 2.0).sin().powi(2);
let c = 2.0 * a.sqrt().asin();
r * c
}
}
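Usage sketch: once registered, the UDF is callable from SQL like any built-in; the table and column names here are hypothetical:
async fn trip_distances(ctx: &SessionContext) -> datafusion::error::Result<()> {
    register_custom_udf(ctx);
    let df = ctx
        .sql("SELECT haversine(pickup_lat, pickup_lon, dropoff_lat, dropoff_lon) AS km FROM events")
        .await?;
    df.show().await?;
    Ok(())
}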
45.8.16. Apache Iceberg Integration
Iceberg is a table format for huge analytic datasets (alternative to Delta Lake).
Rust has native Iceberg support via iceberg-rust.
#![allow(unused)]
fn main() {
use iceberg_rust::catalog::Catalog;
use iceberg_rust::table::Table;
async fn read_iceberg_table() -> Result<(), Box<dyn std::error::Error>> {
// Connect to Iceberg catalog (e.g., AWS Glue, Hive Metastore)
let catalog = Catalog::from_uri("glue://my-catalog").await?;
// Load table
let table = catalog.load_table("my_database.events").await?;
// Get current snapshot
let snapshot = table.current_snapshot()?;
println!("Snapshot ID: {}", snapshot.snapshot_id());
println!("Timestamp: {:?}", snapshot.timestamp());
// List data files
for file in table.data_files()? {
println!("File: {} ({} records)", file.path, file.record_count);
}
// Time travel: read a previous version (previous_snapshot_id comes from the table's snapshot history)
let old_table = table.at_snapshot(previous_snapshot_id)?;
Ok(())
}
}
Writing to Iceberg
#![allow(unused)]
fn main() {
async fn write_iceberg_table(df: &DataFrame) -> Result<(), Box<dyn std::error::Error>> {
let catalog = Catalog::from_uri("glue://my-catalog").await?;
// Create table if not exists
let schema = df.schema();
let table = catalog.create_table(
"my_database.new_events",
schema,
PartitionSpec::builder()
.year("timestamp")
.identity("region")
.build(),
).await?;
// Append data
let batches = df.to_arrow_batches()?;
table.append(batches).await?;
// Commit transaction
table.commit().await?;
Ok(())
}
}
45.8.17. Real-Time Streaming with Apache Arrow Flight
Arrow Flight is a high-performance RPC protocol for transferring Arrow data. It runs over gRPC but streams Arrow record batches instead of serializing rows into Protobuf, which makes it roughly 10x faster for large datasets.
use arrow_flight::flight_service_server::{FlightService, FlightServiceServer};
use arrow_flight::{FlightData, Ticket};
use futures::stream::BoxStream;
use futures::StreamExt;
use std::collections::HashMap;
use tonic::transport::Server;
use tonic::{Request, Response, Status};
pub struct DataServer {
datasets: HashMap<String, DataFrame>,
}
#[tonic::async_trait]
impl FlightService for DataServer {
type DoGetStream = BoxStream<'static, Result<FlightData, Status>>;
async fn do_get(
&self,
request: Request<Ticket>,
) -> Result<Response<Self::DoGetStream>, Status> {
let ticket = request.into_inner();
let dataset_name = std::str::from_utf8(&ticket.ticket)
.map_err(|_| Status::invalid_argument("Invalid ticket"))?;
let df = self.datasets.get(dataset_name)
.ok_or_else(|| Status::not_found("Dataset not found"))?;
// Convert DataFrame to Arrow RecordBatches
// (to_arrow_batches is a stand-in: polars chunks must be mapped to arrow-rs RecordBatches)
let batches = df.to_arrow_batches()?;
// Stream batches
let stream = futures::stream::iter(batches)
.map(|batch| {
let flight_data = FlightData::from(&batch);
Ok(flight_data)
});
Ok(Response::new(Box::pin(stream)))
}
}
// Usage: Start server
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let server = DataServer { datasets: HashMap::new() };
Server::builder()
.add_service(FlightServiceServer::new(server))
.serve("[::1]:50051".parse()?)
.await?;
Ok(())
}
Flight Client
#![allow(unused)]
fn main() {
use arrow_flight::flight_service_client::FlightServiceClient;
async fn fetch_data() -> Result<DataFrame, Box<dyn std::error::Error>> {
let mut client = FlightServiceClient::connect("http://localhost:50051").await?;
let ticket = Ticket {
ticket: b"my_dataset".to_vec().into(),
};
let mut stream = client.do_get(ticket).await?.into_inner();
let mut batches = vec![];
while let Some(flight_data) = stream.message().await? {
let batch = RecordBatch::try_from(&flight_data)?;
batches.push(batch);
}
let df = DataFrame::from_batches(&batches)?;
Ok(df)
}
}
45.8.18. Database Connectors
PostgreSQL with SQLx
#![allow(unused)]
fn main() {
use sqlx::postgres::{PgPool, PgPoolOptions};
use polars::prelude::*;
async fn load_from_postgres() -> Result<DataFrame, Box<dyn std::error::Error>> {
let pool = PgPoolOptions::new()
.max_connections(5)
.connect("postgres://user:pass@localhost/db")
.await?;
// Execute query
let rows = sqlx::query!(
"SELECT id, name, value, created_at FROM metrics WHERE value > $1",
100.0
)
.fetch_all(&pool)
.await?;
// Convert to Polars
let ids: Vec<i64> = rows.iter().map(|r| r.id).collect();
let names: Vec<String> = rows.iter().map(|r| r.name.clone()).collect();
let values: Vec<f64> = rows.iter().map(|r| r.value).collect();
let df = df! {
"id" => ids,
"name" => names,
"value" => values,
}?;
Ok(df)
}
async fn write_to_postgres(df: &DataFrame, pool: &PgPool) -> Result<(), sqlx::Error> {
let ids = df.column("id")?.i64()?;
let values = df.column("value")?.f64()?;
for (id, value) in ids.into_iter().zip(values.into_iter()) {
if let (Some(id), Some(value)) = (id, value) {
sqlx::query!(
"INSERT INTO results (id, value) VALUES ($1, $2)",
id, value
)
.execute(pool)
.await?;
}
}
Ok(())
}
}
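The row-by-row INSERT above is easy to read but slow for large frames. One common Postgres technique is to bind whole columns as arrays and UNNEST them server-side; a sketch, assuming the same results(id bigint, value double precision) table (nulls dropped for brevity):
async fn write_to_postgres_bulk(df: &DataFrame, pool: &PgPool) -> Result<(), Box<dyn std::error::Error>> {
    let ids: Vec<i64> = df.column("id")?.i64()?.into_no_null_iter().collect();
    let values: Vec<f64> = df.column("value")?.f64()?.into_no_null_iter().collect();

    // One round-trip: Postgres expands the arrays back into rows.
    sqlx::query("INSERT INTO results (id, value) SELECT * FROM UNNEST($1::bigint[], $2::float8[])")
        .bind(ids)
        .bind(values)
        .execute(pool)
        .await?;
    Ok(())
}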
ClickHouse for OLAP
#![allow(unused)]
fn main() {
use clickhouse::{Client, Row};
use serde::{Deserialize, Serialize};
#[derive(Row, Serialize, Deserialize, Debug)]
struct Event {
timestamp: u64,
user_id: String,
event_type: String,
value: f64,
}
async fn query_clickhouse() -> Result<Vec<Event>, clickhouse::error::Error> {
let client = Client::default()
.with_url("http://localhost:8123")
.with_database("analytics");
let events = client
.query("SELECT timestamp, user_id, event_type, value FROM events WHERE timestamp > ?")
.bind(1700000000)
.fetch_all::<Event>()
.await?;
Ok(events)
}
async fn insert_clickhouse(events: &[Event]) -> Result<(), clickhouse::error::Error> {
let client = Client::default().with_url("http://localhost:8123");
let mut insert = client.insert("events")?;
for event in events {
insert.write(event).await?;
}
insert.end().await?;
Ok(())
}
}
45.8.19. CDC (Change Data Capture) Pipeline
Capture database changes in real-time and process with Polars.
#![allow(unused)]
fn main() {
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::Message;
use serde::Deserialize;
async fn cdc_pipeline() {
// Debezium sends CDC events to Kafka
let consumer: StreamConsumer = ClientConfig::new()
.set("group.id", "polars-cdc-consumer")
.set("bootstrap.servers", "localhost:9092")
.set("auto.offset.reset", "earliest")
.create()
.expect("Consumer creation failed");
consumer.subscribe(&["postgres.public.users"]).unwrap();
let mut batch = vec![];
loop {
match consumer.recv().await {
Ok(message) => {
if let Some(payload) = message.payload() {
let event: DebeziumEvent = serde_json::from_slice(payload).unwrap();
match event.op.as_str() {
"c" | "u" => {
// Create or Update
batch.push(event.after.unwrap());
}
"d" => {
// Delete - handle tombstone
// ...
}
_ => {}
}
// Process batch every 1000 events
if batch.len() >= 1000 {
let df = records_to_dataframe(&batch);
process_updates(&df).await;
batch.clear();
}
}
}
Err(e) => eprintln!("Kafka error: {}", e),
}
}
}
#[derive(Deserialize)]
struct DebeziumEvent {
op: String,
before: Option<UserRecord>,
after: Option<UserRecord>,
}
}
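A sketch of the helpers referenced above. UserRecord is a hypothetical row type matching the Debezium after payload, and records_to_dataframe simply columnarizes a batch:
use polars::prelude::*;
use serde::Deserialize;

#[derive(Deserialize, Clone)]
struct UserRecord {
    id: i64,
    name: String,
    email: String,
}

fn records_to_dataframe(records: &[UserRecord]) -> DataFrame {
    df! {
        "id" => records.iter().map(|r| r.id).collect::<Vec<_>>(),
        "name" => records.iter().map(|r| r.name.clone()).collect::<Vec<_>>(),
        "email" => records.iter().map(|r| r.email.clone()).collect::<Vec<_>>(),
    }
    .expect("valid CDC batch")
}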
45.8.20. Data Pipeline Orchestration
Build a complete ETL pipeline with error handling and checkpointing.
#![allow(unused)]
fn main() {
use async_trait::async_trait;
use polars::prelude::*;
use std::path::PathBuf;
pub struct Pipeline {
name: String,
steps: Vec<Box<dyn PipelineStep>>,
source_path: PathBuf,
checkpoint_dir: PathBuf,
}
#[async_trait]
pub trait PipelineStep: Send + Sync {
fn name(&self) -> &str;
async fn execute(&self, input: LazyFrame) -> PolarsResult<LazyFrame>;
}
impl Pipeline {
pub async fn run(&self) -> PolarsResult<()> {
let mut current = self.load_checkpoint().await?;
for (i, step) in self.steps.iter().enumerate() {
tracing::info!(step = step.name(), "Executing step");
let start = std::time::Instant::now();
match step.execute(current.clone()).await {
Ok(result) => {
current = result;
self.save_checkpoint(i, &current).await?;
tracing::info!(
step = step.name(),
duration_ms = start.elapsed().as_millis(),
"Step completed"
);
}
Err(e) => {
tracing::error!(
step = step.name(),
error = ?e,
"Step failed"
);
return Err(e);
}
}
}
Ok(())
}
async fn load_checkpoint(&self) -> PolarsResult<LazyFrame> {
let checkpoint_path = self.checkpoint_dir.join("latest.parquet");
if checkpoint_path.exists() {
LazyFrame::scan_parquet(&checkpoint_path, ScanArgsParquet::default())
} else {
// Start from source
LazyCsvReader::new(&self.source_path).finish()
}
}
async fn save_checkpoint(&self, step_idx: usize, df: &LazyFrame) -> PolarsResult<()> {
let path = self.checkpoint_dir.join(format!("step_{}.parquet", step_idx));
df.clone().sink_parquet(path, Default::default())
}
}
// Example steps
struct FilterNullsStep;
#[async_trait]
impl PipelineStep for FilterNullsStep {
fn name(&self) -> &str { "filter_nulls" }
async fn execute(&self, input: LazyFrame) -> PolarsResult<LazyFrame> {
Ok(input.drop_nulls(None))
}
}
struct NormalizeStep {
columns: Vec<String>,
}
#[async_trait]
impl PipelineStep for NormalizeStep {
fn name(&self) -> &str { "normalize" }
async fn execute(&self, input: LazyFrame) -> PolarsResult<LazyFrame> {
let mut exprs = vec![];
for col_name in &self.columns {
let col_expr = col(col_name);
let normalized = (col_expr.clone() - col_expr.clone().min())
/ (col_expr.clone().max() - col_expr.min());
exprs.push(normalized.alias(col_name));
}
Ok(input.with_columns(exprs))
}
}
}
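Usage sketch (same module, hypothetical paths), wiring the two example steps into a pipeline and running it:
async fn run_daily_etl() -> PolarsResult<()> {
    let pipeline = Pipeline {
        name: "daily_etl".into(),
        steps: vec![
            Box::new(FilterNullsStep),
            Box::new(NormalizeStep { columns: vec!["value".into()] }),
        ],
        checkpoint_dir: PathBuf::from("/tmp/etl_checkpoints"),
        source_path: PathBuf::from("raw_events.csv"),
    };
    pipeline.run().await
}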
45.8.21. Production Monitoring
Track data quality and pipeline health.
#![allow(unused)]
fn main() {
use metrics::{counter, histogram, gauge};
pub struct DataQualityMetrics;
impl DataQualityMetrics {
pub fn record(df: &DataFrame, dataset_name: &str) {
let labels = vec![("dataset", dataset_name.to_string())];
// Row count
gauge!("data_row_count", &labels).set(df.height() as f64);
// Null percentages per column
for col_name in df.get_column_names() {
let null_count = df.column(col_name)
.map(|c| c.null_count())
.unwrap_or(0);
let null_pct = null_count as f64 / df.height() as f64;
gauge!(
"data_null_percentage",
&[("dataset", dataset_name.to_string()), ("column", col_name.to_string())]
).set(null_pct);
}
// Schema drift detection
let current_schema = df.schema();
if let Some(expected) = EXPECTED_SCHEMAS.get(dataset_name) { // EXPECTED_SCHEMAS: static map of known-good schemas
if &current_schema != expected {
counter!("data_schema_drift", &labels).increment(1);
}
}
}
}
}
45.8.22. Final Architecture: The Modern Data Stack in Rust
┌─────────────────────────────────────────────────────────────────────┐
│ Rust Data Engineering Stack │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ Sources │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ S3 │ │ Kafka │ │ Postgres│ │ API │ │ Files │ │
│ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │ │ │
│ ┌────▼───────────▼───────────▼───────────▼───────────▼────────────┐│
│ │ Ingestion Layer ││
│ │ • object_store (S3/GCS/Azure) ││
│ │ • rdkafka (Kafka consumer) ││
│ │ • sqlx (Database connectors) ││
│ └──────────────────────────────┬───────────────────────────────────┘│
│ │ │
│ ┌──────────────────────────────▼───────────────────────────────────┐│
│ │ Processing Layer ││
│ │ • Polars (DataFrame operations) ││
│ │ • DataFusion (SQL engine) ││
│ │ • Custom operators (Rayon parallel) ││
│ └──────────────────────────────┬───────────────────────────────────┘│
│ │ │
│ ┌──────────────────────────────▼───────────────────────────────────┐│
│ │ Storage Layer ││
│ │ • Parquet (columnar files) ││
│ │ • Delta Lake / Iceberg (table formats) ││
│ │ • Lance (ML vector storage) ││
│ └──────────────────────────────┬───────────────────────────────────┘│
│ │ │
│ ┌──────────────────────────────▼───────────────────────────────────┐│
│ │ Serving Layer ││
│ │ • Arrow Flight (high-speed data transfer) ││
│ │ • Axum (REST APIs) ││
│ │ • ClickHouse connector (OLAP queries) ││
│ └─────────────────────────────────────────────────────────────────┘│
│ │
└─────────────────────────────────────────────────────────────────────┘
Total Stack Benefits:
- 10x faster than Python + Pandas
- 90% less memory than Spark
- Single binary deployment (no JVM, no Python env)
- Type-safe transforms (catch errors at compile time)
[End of Section 45.8]