45.8. Data Engineering Pipelines: The Polars Revolution

Important

The Problem: Pandas is single-threaded and memory-hungry (it often needs roughly 5x the dataset size in RAM). Spark is JVM-heavy and hard to debug. The Solution: Polars. Written in Rust. Parallel. Lazy. Vectorized. It can chew through 100GB of CSV on a MacBook Air without crashing.

45.8.1. Architecture: Why Polars Wins

| Feature      | Pandas                  | Spark             | Polars                |
|--------------|-------------------------|-------------------|-----------------------|
| Language     | Python (C-API)          | Scala (JVM)       | Rust                  |
| Execution    | Eager (line-by-line)    | Lazy (plan)       | Hybrid (lazy + eager) |
| Memory       | Copy-on-Write (partial) | GC overhead       | Arrow (zero-copy)     |
| Parallelism  | No (GIL)                | Yes (distributed) | Yes (Rayon)           |
| Missing data | NaN / None mess         | Null              | Option type           |

The Query Engine

Polars is not just a library; it is a Query Engine. When you write df.filter(..).select(..), it builds a Logical Plan. It then optimizes this plan:

  1. Predicate Pushdown: Moves filters to the scan level (don’t load rows you don’t need).
  2. Projection Pushdown: Moves selects to the scan level (don’t load columns you don’t need).
  3. Common Subexpression Elimination: Don’t calculate col("a") * 2 twice (a sketch follows this list).
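
For instance, here is a minimal sketch of CSE in action; the frame and column names are illustrative, and whether CSE is enabled by default depends on the Polars version:

#![allow(unused)]
fn main() {
use polars::prelude::*;

fn doubled_twice() -> PolarsResult<DataFrame> {
    df! { "a" => [1i64, 2, 3] }?
        .lazy()
        .with_columns([
            // Both projections share the sub-expression `col("a") * 2`;
            // with CSE active the engine computes it a single time.
            (col("a") * lit(2)).alias("double"),
            ((col("a") * lit(2)) + lit(1)).alias("double_plus_one"),
        ])
        .collect()
}
}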

45.8.2. Getting Started in Rust

Polars in Python involves FFI. Polars in Rust is native.

[dependencies]
polars = { version = "0.36", features = ["lazy", "parquet", "streaming", "sql"] }
tokio = { version = "1", features = ["full"] }

1. The LazyFrame

Never use DataFrame (Eager) unless you are printing to stdout. Always use LazyFrame.

#![allow(unused)]
fn main() {
use polars::prelude::*;

fn process_logs() -> PolarsResult<DataFrame> {
    let q = LazyCsvReader::new("logs.csv")
        .has_header(true)
        .finish()?
        .filter(col("status").eq(lit(500))) // Filter errors
        .group_by(vec![col("endpoint")])
        .agg(vec![
            count().alias("error_count"),
            col("latency").mean().alias("avg_latency")
        ])
        .sort("error_count", SortOptions::default().descending());
        
    // Optimization & Execution happens here
    let df = q.collect()?;
    Ok(df)
}
}

45.8.3. Streaming: Breaking the RAM Barrier

If logs.csv is 100GB and your RAM is 16GB, collect() will OOM. Enable the streaming engine with with_streaming(true) before calling collect(), or skip materialization entirely and sink_parquet straight to disk (a streaming-collect sketch follows the example below).

#![allow(unused)]
fn main() {
fn stream_to_parquet() -> PolarsResult<()> {
    let q = LazyCsvReader::new("huge_file.csv").finish()?;
    
    // Process in chunks, never materializing the whole table via streaming
    // Sink directly to disk
    q.sink_parquet(
        "output.parquet".into(),
        ParquetWriteOptions::default()
    )?;
    
    Ok(())
}
}
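
If you need the result in memory rather than on disk, here is a minimal sketch of the streaming-collect variant, assuming the streaming feature enabled in the Cargo.toml above (file and column names are illustrative):

#![allow(unused)]
fn main() {
use polars::prelude::*;

fn stream_aggregate() -> PolarsResult<DataFrame> {
    LazyCsvReader::new("huge_file.csv")
        .finish()?
        .group_by([col("endpoint")])
        .agg([count().alias("hits")])
        // Ask the engine to execute the plan out-of-core; only the (small)
        // aggregated result is materialized in memory.
        .with_streaming(true)
        .collect()
}
}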

45.8.4. Expressions: The DSL

Polars Expressions are composable logic. They compile down to efficient Rust functions.

Window Functions

#![allow(unused)]
fn main() {
// Calculate Rolling Z-Score per Group
let expr = (col("value") - col("value").mean().over(vec![col("group")]))
    / col("value").std(1).over(vec![col("group")]);
}

String Manipulation

#![allow(unused)]
fn main() {
// Extract Regex
let browser = col("user_agent").str().extract(r"Firefox/(\d+)", 1);
}

When expressions aren’t enough: map

You can inject custom Rust functions into the query plan.

#![allow(unused)]
fn main() {
fn custom_logic(s: Series) -> PolarsResult<Option<Series>> {
    let ca = s.u32()?;
    let out: ChunkedArray<UInt32Type> = ca.apply_values(|v| {
        // Complex Bitwise Logic
        (v >> 2) ^ 0xDEADBEEF
    });
    Ok(Some(out.into_series()))
}

// `df` here is a LazyFrame built earlier in the pipeline.
let q = df.select(vec![
    col("id").map(custom_logic, GetOutput::from_type(DataType::UInt32))
]);
}

45.8.5. SQL Interface

Polars supports SQL. This is great for migrations.

#![allow(unused)]
fn main() {
use polars::sql::SQLContext;

fn run_sql() -> PolarsResult<()> {
    let mut ctx = SQLContext::new();
    
    let df = LazyCsvReader::new("data.csv").finish()?;
    ctx.register("data", df);
    
    let result = ctx.execute(
        "SELECT brand, AVG(price) FROM data GROUP BY brand HAVING AVG(price) > 100"
    )?.collect()?;
    
    println!("{}", result);
    Ok(())
}
}

45.8.6. Cloud I/O: Reading from S3

You don’t need boto3. Polars integrates with object_store to read directly from Cloud Storage.

#![allow(unused)]
fn main() {
// features = ["aws"]

fn read_s3() -> PolarsResult<LazyFrame> {
    // With the `aws` feature, credentials are resolved from the environment
    // (env vars, ~/.aws config, instance profile).
    let cloud_options = CloudOptions::default();

    // Attach the cloud options to the scan arguments so they are actually used.
    let args = ScanArgsParquet {
        cloud_options: Some(cloud_options),
        ..Default::default()
    };

    let lf = LazyFrame::scan_parquet(
        "s3://my-bucket/data.parquet",
        args
    )?;
    
    Ok(lf)
}
}

45.8.7. Case Study: Feature Engineering Pipeline

Scenario: User Clickstream Data (JSONL). Goal: Generate User features (Last 5 clicks, Time on Site).

#![allow(unused)]
fn main() {
use polars::prelude::*;

fn feature_pipeline() -> PolarsResult<()> {
    let lf = LazyJsonLineReader::new("clicks.jsonl").finish()?;
    
    let features = lf
        .sort("timestamp", SortOptions::default())
        .group_by(vec![col("user_id")])
        .agg(vec![
            // Feature 1: Count
            count().alias("n_clicks"),
            
            // Feature 2: Time on Site (Max - Min)
            (col("timestamp").max() - col("timestamp").min()).alias("session_duration"),
            
            // Feature 3: List of last 5 Page IDs
            col("page_id").tail(Some(5)).alias("history_5")
        ]);
        
    features.sink_parquet("features.parquet".into(), Default::default())
}
}

45.8.8. Performance Tuning

  1. Parquet vs CSV: Always convert CSV to Parquet first. Parquet carries per-chunk statistics (min/max) that Polars uses to skip row groups entirely.
  2. Row Groups: Keep your Parquet row groups a reasonable size (~100 MB). Too small means per-group overhead; too big means no skipping.
  3. String cache: Hold the global string cache when working with Categorical data across multiple frames.
  4. jemalloc: Register jemalloc as the #[global_allocator] (see the sketch below). It is faster than the system allocator for Arrow-style allocation patterns.
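
A minimal sketch of the allocator swap, assuming the jemallocator crate has been added as a dependency:

// Cargo.toml: add the `jemallocator` crate as a dependency.
use jemallocator::Jemalloc;

// Route every heap allocation in the binary (including Polars' Arrow buffers)
// through jemalloc instead of the system allocator.
#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;

fn main() {
    // ... run your pipeline as usual ...
}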


45.8.9. Deep Dive: The Query Optimizer

How does Polars make df.filter().select() fast? It uses a Rule-Based Optimizer.

Visualizing the Plan

You can inspect the optimized plan with lf.explain(true).

#![allow(unused)]
fn main() {
let q = LazyCsvReader::new("data.csv").finish()?;
println!("{}", q.explain(true)?);
}

Key Optimizations:

  1. Predicate Pushdown: FILTER moves past JOIN.
    • Before: Join 1M rows with 1M rows -> Filter result.
    • After: Filter 1M rows to 10k -> Join -> Fast.
  2. Projection Pushdown: Only read columns a and b from disk; ignore c through z.
  3. Slice Pushdown: limit(5) stops the CSV parser after 5 rows.

45.8.10. Delta Lake Integration (deltalake crate)

Modern Data Lakes use Delta Lake (ACID transactions on Parquet). Rust has native bindings.

deltalake = { version = "0.17", features = ["s3"] }

#![allow(unused)]
fn main() {
use deltalake::open_table;

async fn read_delta() {
    let table = open_table("s3://my-lake/events").await.unwrap();
    println!("Table Version: {}", table.version());
    
    // Convert to Polars
    let files = table.get_files_iter().collect::<Vec<_>>();
    // Scan these parquet files with Polars
}
}
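
The last comment is left as an exercise; here is a rough sketch of that step, assuming the Delta file list has already been resolved to full s3:// parquet URIs (the helper name scan_delta_files is ours):

#![allow(unused)]
fn main() {
use polars::prelude::*;

// Build one LazyFrame over all data files the Delta table currently points at.
fn scan_delta_files(uris: &[String]) -> PolarsResult<LazyFrame> {
    let frames = uris
        .iter()
        .map(|uri| LazyFrame::scan_parquet(uri, ScanArgsParquet::default()))
        .collect::<PolarsResult<Vec<_>>>()?;
    // Union the per-file scans into a single lazy plan.
    concat(frames, UnionArgs::default())
}
}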

45.8.11. Data Quality Checks (Great Expectations in Rust)

Validating data at 1GB/s.

#![allow(unused)]
fn main() {
fn validate_schema(df: &DataFrame) -> PolarsResult<bool> {
    // Check 1: No Nulls in ID
    if df.column("id")?.null_count() > 0 {
        return Ok(false);
    }
    
    // Check 2: Age > 0
    let mask = df.column("age")?.gt(0)?;
    if !mask.all() {
        return Ok(false);
    }
    
    Ok(true)
}
}

45.8.12. Graph Analytics on DataFrames

You can convert a DataFrame (src, dst) into a Graph.

#![allow(unused)]
fn main() {
use std::collections::HashMap;

use petgraph::graph::UnGraph;
use polars::prelude::*;

fn build_interaction_graph(df: &DataFrame) -> UnGraph<String, ()> {
    let src = df.column("src").unwrap().utf8().unwrap();
    let dst = df.column("dst").unwrap().utf8().unwrap();
    
    let mut graph = UnGraph::new_undirected();
    let mut node_map = HashMap::new();
    
    for (s, d) in src.into_iter().zip(dst.into_iter()) {
        if let (Some(s), Some(d)) = (s, d) {
             let ns = *node_map.entry(s).or_insert_with(|| graph.add_node(s.to_string()));
             let nd = *node_map.entry(d).or_insert_with(|| graph.add_node(d.to_string()));
             graph.add_edge(ns, nd, ());
        }
    }
    graph
}
}

45.8.13. Benchmark: TPC-H (The Gold Standard)

The TPC-H benchmark simulates a Data Warehouse. Data Size: 10GB (SF10) and 100GB (SF100).

| Query            | Polars (Rust) | Spark (Cluster) | Dask | Pandas |
|------------------|---------------|-----------------|------|--------|
| Q1 (Aggregation) | 1.2s          | 4.5s (overhead) | 3.2s | OOM    |
| Q2 (Join)        | 0.8s          | 2.1s            | 1.9s | OOM    |
| Q3 (Group By)    | 1.5s          | 3.0s            | 4.1s | OOM    |

Observation: For datasets that fit on a single node (up to ~500GB with NVMe swap), Polars beats Spark by 3x-10x. Spark only wins once the data exceeds roughly 10TB and must be sharded across machines.

45.8.14. Final Exam: The ETL CLI

Task: Build a CLI tool etl-cli that:

  1. Reads JSON logs from S3.
  2. Parses Timestamp.
  3. Joins with users.parquet.
  4. Aggregates Daily Active Users (DAU).
  5. Uploads result to Postgres.

Solution:

  • clap for CLI.
  • polars for Logic.
  • sqlx for Postgres.
  • tokio for scheduling.

This tool compiles to a 15MB binary. No Docker required. No JVM warmup.
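A minimal skeleton of how those crates slot together; the argument names here are illustrative, and the actual transform steps are the ones sketched throughout this section:

use clap::Parser;

/// etl-cli: aggregate daily active users from raw click logs.
#[derive(Parser)]
struct Args {
    /// S3 prefix holding the JSON logs, e.g. s3://bucket/logs/
    #[arg(long)]
    input: String,
    /// Path to the users dimension table
    #[arg(long, default_value = "users.parquet")]
    users: String,
    /// Postgres connection string
    #[arg(long)]
    database_url: String,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let args = Args::parse();
    // 1. Lazily scan the logs and users.parquet (see the S3 and LazyFrame
    //    examples above), 2. parse timestamps, 3. join, 4. aggregate DAU,
    //    5. write the small result table to Postgres with sqlx.
    println!("etl-cli: input={} users={}", args.input, args.users);
    Ok(())
}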


45.8.15. DataFusion: The SQL Engine

DataFusion is a query execution framework (comparable to Spark’s Catalyst optimizer plus its execution engine). Polars ships its own optimizer; DataFusion is what you reach for when building a custom query engine.

use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Create execution context
    let ctx = SessionContext::new();
    
    // Register a Parquet file
    ctx.register_parquet("events", "events.parquet", ParquetReadOptions::default()).await?;
    
    // Execute SQL
    let df = ctx.sql("
        SELECT 
            date_trunc('hour', timestamp) as hour,
            count(*) as event_count,
            count(distinct user_id) as unique_users
        FROM events
        WHERE event_type = 'purchase'
        GROUP BY 1
        ORDER BY 1
    ").await?;
    
    // Show results
    df.show().await?;
    
    Ok(())
}

Custom Functions (UDFs)

#![allow(unused)]
fn main() {
use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, Float64Array};
use datafusion::arrow::datatypes::DataType;
use datafusion::logical_expr::{create_udf, Volatility};
use datafusion::prelude::SessionContext;

fn register_custom_udf(ctx: &SessionContext) {
    // Define a UDF that calculates haversine distance
    let haversine_udf = create_udf(
        "haversine",
        vec![DataType::Float64, DataType::Float64, DataType::Float64, DataType::Float64],
        Arc::new(DataType::Float64),
        Volatility::Immutable,
        // NOTE: depending on the DataFusion version, this closure may need to
        // accept `&[ColumnarValue]` or be wrapped with a scalar-function
        // adapter instead of taking `&[ArrayRef]` directly.
        Arc::new(|args: &[ArrayRef]| {
            let lat1 = args[0].as_any().downcast_ref::<Float64Array>().unwrap();
            let lon1 = args[1].as_any().downcast_ref::<Float64Array>().unwrap();
            let lat2 = args[2].as_any().downcast_ref::<Float64Array>().unwrap();
            let lon2 = args[3].as_any().downcast_ref::<Float64Array>().unwrap();
            
            let result: Float64Array = (0..lat1.len())
                .map(|i| {
                    Some(haversine_distance(
                        lat1.value(i), lon1.value(i),
                        lat2.value(i), lon2.value(i),
                    ))
                })
                .collect();
            
            Ok(Arc::new(result) as ArrayRef)
        }),
    );
    
    ctx.register_udf(haversine_udf);
}

fn haversine_distance(lat1: f64, lon1: f64, lat2: f64, lon2: f64) -> f64 {
    let r = 6371.0; // Earth radius in km
    let d_lat = (lat2 - lat1).to_radians();
    let d_lon = (lon2 - lon1).to_radians();
    let a = (d_lat / 2.0).sin().powi(2)
        + lat1.to_radians().cos() * lat2.to_radians().cos() * (d_lon / 2.0).sin().powi(2);
    let c = 2.0 * a.sqrt().asin();
    r * c
}
}

45.8.16. Apache Iceberg Integration

Iceberg is a table format for huge analytic datasets (alternative to Delta Lake). Rust has native Iceberg support via iceberg-rust.

#![allow(unused)]
fn main() {
use iceberg_rust::catalog::Catalog;
use iceberg_rust::table::Table;

async fn read_iceberg_table() -> Result<(), Box<dyn std::error::Error>> {
    // Connect to Iceberg catalog (e.g., AWS Glue, Hive Metastore)
    let catalog = Catalog::from_uri("glue://my-catalog").await?;
    
    // Load table
    let table = catalog.load_table("my_database.events").await?;
    
    // Get current snapshot
    let snapshot = table.current_snapshot()?;
    println!("Snapshot ID: {}", snapshot.snapshot_id());
    println!("Timestamp: {:?}", snapshot.timestamp());
    
    // List data files
    for file in table.data_files()? {
        println!("File: {} ({} records)", file.path, file.record_count);
    }
    
    // Time travel: read a previous version ("previous_snapshot_id" is a
    // placeholder obtained from the table's snapshot history)
    let old_table = table.at_snapshot(previous_snapshot_id)?;
    
    Ok(())
}
}

Writing to Iceberg

#![allow(unused)]
fn main() {
async fn write_iceberg_table(df: &DataFrame) -> Result<(), Box<dyn std::error::Error>> {
    let catalog = Catalog::from_uri("glue://my-catalog").await?;
    
    // Create table if not exists
    let schema = df.schema();
    let table = catalog.create_table(
        "my_database.new_events",
        schema,
        PartitionSpec::builder()
            .year("timestamp")
            .identity("region")
            .build(),
    ).await?;
    
    // Append data
    let batches = df.to_arrow_batches()?;
    table.append(batches).await?;
    
    // Commit transaction
    table.commit().await?;
    
    Ok(())
}
}

45.8.17. Real-Time Streaming with Apache Arrow Flight

Arrow Flight is a high-performance RPC protocol for transferring Arrow data. For large tabular payloads it can be roughly 10x faster than gRPC+Protobuf because it avoids row-by-row serialization.

use std::collections::HashMap;

use arrow_flight::flight_service_server::{FlightService, FlightServiceServer};
use arrow_flight::{FlightData, SchemaAsIpc, Ticket};
use futures::stream::BoxStream;
use futures::StreamExt;
use polars::prelude::*;
use tonic::transport::Server;
use tonic::{Request, Response, Status};

pub struct DataServer {
    datasets: HashMap<String, DataFrame>,
}

#[tonic::async_trait]
impl FlightService for DataServer {
    // The other required FlightService methods are omitted for brevity.
    type DoGetStream = BoxStream<'static, Result<FlightData, Status>>;
    
    async fn do_get(
        &self,
        request: Request<Ticket>,
    ) -> Result<Response<Self::DoGetStream>, Status> {
        let ticket = request.into_inner();
        let dataset_name = std::str::from_utf8(&ticket.ticket)
            .map_err(|_| Status::invalid_argument("Invalid ticket"))?;
        
        let df = self.datasets.get(dataset_name)
            .ok_or_else(|| Status::not_found("Dataset not found"))?;
        
        // Convert the DataFrame to Arrow RecordBatches (simplified here; in
        // practice, walk the DataFrame's Arrow chunks and use the arrow-flight
        // encoding utilities)
        let batches = df.to_arrow_batches()
            .map_err(|e| Status::internal(e.to_string()))?;
        
        // Stream batches
        let stream = futures::stream::iter(batches)
            .map(|batch| {
                let flight_data = FlightData::from(&batch);
                Ok(flight_data)
            });
        
        Ok(Response::new(Box::pin(stream)))
    }
}

// Usage: Start server
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let server = DataServer { datasets: HashMap::new() };
    
    Server::builder()
        .add_service(FlightServiceServer::new(server))
        .serve("[::1]:50051".parse()?)
        .await?;
    
    Ok(())
}

Flight Client

#![allow(unused)]
fn main() {
use arrow_flight::flight_service_client::FlightServiceClient;
use arrow_flight::Ticket;

async fn fetch_data() -> Result<DataFrame, Box<dyn std::error::Error>> {
    let mut client = FlightServiceClient::connect("http://localhost:50051").await?;
    
    let ticket = Ticket {
        ticket: b"my_dataset".to_vec().into(),
    };
    
    let mut stream = client.do_get(ticket).await?.into_inner();
    let mut batches = vec![];
    
    while let Some(flight_data) = stream.message().await? {
        // Decode the Arrow payload (simplified; real decoding tracks the
        // schema message and any dictionary batches)
        let batch = RecordBatch::try_from(&flight_data)?;
        batches.push(batch);
    }
    
    let df = DataFrame::from_batches(&batches)?;
    Ok(df)
}
}

45.8.18. Database Connectors

PostgreSQL with SQLx

#![allow(unused)]
fn main() {
use sqlx::postgres::{PgPool, PgPoolOptions};
use polars::prelude::*;

async fn load_from_postgres() -> Result<DataFrame, Box<dyn std::error::Error>> {
    let pool = PgPoolOptions::new()
        .max_connections(5)
        .connect("postgres://user:pass@localhost/db")
        .await?;
    
    // Execute the query (the query! macro checks the SQL against the database
    // schema at compile time via DATABASE_URL)
    let rows = sqlx::query!(
        "SELECT id, name, value, created_at FROM metrics WHERE value > $1",
        100.0
    )
    .fetch_all(&pool)
    .await?;
    
    // Convert to Polars
    let ids: Vec<i64> = rows.iter().map(|r| r.id).collect();
    let names: Vec<String> = rows.iter().map(|r| r.name.clone()).collect();
    let values: Vec<f64> = rows.iter().map(|r| r.value).collect();
    
    let df = df! {
        "id" => ids,
        "name" => names,
        "value" => values,
    }?;
    
    Ok(df)
}

async fn write_to_postgres(df: &DataFrame, pool: &PgPool) -> Result<(), Box<dyn std::error::Error>> {
    let ids = df.column("id")?.i64()?;
    let values = df.column("value")?.f64()?;
    
    for (id, value) in ids.into_iter().zip(values.into_iter()) {
        if let (Some(id), Some(value)) = (id, value) {
            sqlx::query!(
                "INSERT INTO results (id, value) VALUES ($1, $2)",
                id, value
            )
            .execute(pool)
            .await?;
        }
    }
    
    Ok(())
}
}

ClickHouse for OLAP

#![allow(unused)]
fn main() {
use clickhouse::{Client, Row};

#[derive(Row, Debug)]
struct Event {
    timestamp: u64,
    user_id: String,
    event_type: String,
    value: f64,
}

async fn query_clickhouse() -> Result<Vec<Event>, clickhouse::error::Error> {
    let client = Client::default()
        .with_url("http://localhost:8123")
        .with_database("analytics");
    
    let events = client
        .query("SELECT timestamp, user_id, event_type, value FROM events WHERE timestamp > ?")
        .bind(1700000000)
        .fetch_all::<Event>()
        .await?;
    
    Ok(events)
}

async fn insert_clickhouse(events: &[Event]) -> Result<(), clickhouse::error::Error> {
    let client = Client::default().with_url("http://localhost:8123");
    
    let mut insert = client.insert("events")?;
    for event in events {
        insert.write(event).await?;
    }
    insert.end().await?;
    
    Ok(())
}
}
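
To hand the fetched rows to Polars, here is a minimal conversion sketch (the helper name events_to_dataframe is ours; Event is the struct defined above):

#![allow(unused)]
fn main() {
use polars::prelude::*;

// Turn the fetched ClickHouse rows into a Polars DataFrame column by column.
fn events_to_dataframe(events: &[Event]) -> PolarsResult<DataFrame> {
    df! {
        "timestamp"  => events.iter().map(|e| e.timestamp).collect::<Vec<u64>>(),
        "user_id"    => events.iter().map(|e| e.user_id.clone()).collect::<Vec<String>>(),
        "event_type" => events.iter().map(|e| e.event_type.clone()).collect::<Vec<String>>(),
        "value"      => events.iter().map(|e| e.value).collect::<Vec<f64>>(),
    }
}
}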

45.8.19. CDC (Change Data Capture) Pipeline

Capture database changes in real-time and process with Polars.

#![allow(unused)]
fn main() {
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::{ClientConfig, Message};
use serde::Deserialize;

async fn cdc_pipeline() {
    // Debezium sends CDC events to Kafka
    let consumer: StreamConsumer = ClientConfig::new()
        .set("group.id", "polars-cdc-consumer")
        .set("bootstrap.servers", "localhost:9092")
        .set("auto.offset.reset", "earliest")
        .create()
        .expect("Consumer creation failed");
    
    consumer.subscribe(&["postgres.public.users"]).unwrap();
    
    let mut batch = vec![];
    
    loop {
        match consumer.recv().await {
            Ok(message) => {
                if let Some(payload) = message.payload() {
                    let event: DebeziumEvent = serde_json::from_slice(payload).unwrap();
                    
                    match event.op.as_str() {
                        "c" | "u" => {
                            // Create or Update
                            batch.push(event.after.unwrap());
                        }
                        "d" => {
                            // Delete - handle tombstone
                            // ...
                        }
                        _ => {}
                    }
                    
                    // Process batch every 1000 events
                    if batch.len() >= 1000 {
                        let df = records_to_dataframe(&batch);
                        process_updates(&df).await;
                        batch.clear();
                    }
                }
            }
            Err(e) => eprintln!("Kafka error: {}", e),
        }
    }
}

#[derive(Deserialize)]
struct DebeziumEvent {
    op: String,
    before: Option<UserRecord>,
    after: Option<UserRecord>,
}
}
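
The snippet above leaves UserRecord, records_to_dataframe, and process_updates undefined. Here is a minimal sketch of the first two (the field names are hypothetical; process_updates would hold your own sink logic):

#![allow(unused)]
fn main() {
use polars::prelude::*;
use serde::Deserialize;

// Hypothetical shape of the row images Debezium emits for the `users` table.
#[derive(Deserialize, Clone)]
struct UserRecord {
    id: i64,
    email: String,
    plan: String,
}

// Collect a batch of decoded records into a DataFrame for downstream Polars logic.
fn records_to_dataframe(records: &[UserRecord]) -> PolarsResult<DataFrame> {
    df! {
        "id"    => records.iter().map(|r| r.id).collect::<Vec<i64>>(),
        "email" => records.iter().map(|r| r.email.clone()).collect::<Vec<String>>(),
        "plan"  => records.iter().map(|r| r.plan.clone()).collect::<Vec<String>>(),
    }
}
}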

45.8.20. Data Pipeline Orchestration

Build a complete ETL pipeline with error handling and checkpointing.

#![allow(unused)]
fn main() {
use std::path::PathBuf;

use async_trait::async_trait;
use polars::prelude::*;

pub struct Pipeline {
    name: String,
    source_path: PathBuf,
    steps: Vec<Box<dyn PipelineStep>>,
    checkpoint_dir: PathBuf,
}

#[async_trait]
pub trait PipelineStep: Send + Sync {
    fn name(&self) -> &str;
    async fn execute(&self, input: LazyFrame) -> PolarsResult<LazyFrame>;
}

impl Pipeline {
    pub async fn run(&self) -> PolarsResult<()> {
        let mut current = self.load_checkpoint().await?;
        
        for (i, step) in self.steps.iter().enumerate() {
            tracing::info!(step = step.name(), "Executing step");
            let start = std::time::Instant::now();
            
            match step.execute(current.clone()).await {
                Ok(result) => {
                    current = result;
                    self.save_checkpoint(i, &current).await?;
                    
                    tracing::info!(
                        step = step.name(),
                        duration_ms = start.elapsed().as_millis(),
                        "Step completed"
                    );
                }
                Err(e) => {
                    tracing::error!(
                        step = step.name(),
                        error = ?e,
                        "Step failed"
                    );
                    return Err(e);
                }
            }
        }
        
        Ok(())
    }
    
    async fn load_checkpoint(&self) -> PolarsResult<LazyFrame> {
        let checkpoint_path = self.checkpoint_dir.join("latest.parquet");
        
        if checkpoint_path.exists() {
            LazyFrame::scan_parquet(&checkpoint_path, ScanArgsParquet::default())
        } else {
            // Start from source
            LazyCsvReader::new(&self.source_path).finish()
        }
    }
    
    async fn save_checkpoint(&self, step_idx: usize, df: &LazyFrame) -> PolarsResult<()> {
        let path = self.checkpoint_dir.join(format!("step_{}.parquet", step_idx));
        df.clone().sink_parquet(path, Default::default())
    }
}

// Example steps
struct FilterNullsStep;

#[async_trait]
impl PipelineStep for FilterNullsStep {
    fn name(&self) -> &str { "filter_nulls" }
    
    async fn execute(&self, input: LazyFrame) -> PolarsResult<LazyFrame> {
        Ok(input.drop_nulls(None))
    }
}

struct NormalizeStep {
    columns: Vec<String>,
}

#[async_trait]
impl PipelineStep for NormalizeStep {
    fn name(&self) -> &str { "normalize" }
    
    async fn execute(&self, input: LazyFrame) -> PolarsResult<LazyFrame> {
        let mut exprs = vec![];
        
        for col_name in &self.columns {
            let col_expr = col(col_name);
            let normalized = (col_expr.clone() - col_expr.clone().min())
                / (col_expr.clone().max() - col_expr.min());
            exprs.push(normalized.alias(col_name));
        }
        
        Ok(input.with_columns(exprs))
    }
}
}
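
Wiring it together might look like this; a sketch only, assuming the Pipeline fields are constructed from the same module, with illustrative step and file names:

use std::path::PathBuf;
use polars::prelude::*;

#[tokio::main]
async fn main() -> PolarsResult<()> {
    let pipeline = Pipeline {
        name: "clickstream_features".to_string(),
        source_path: PathBuf::from("clicks.csv"),
        steps: vec![
            Box::new(FilterNullsStep),
            Box::new(NormalizeStep { columns: vec!["latency".to_string()] }),
        ],
        checkpoint_dir: PathBuf::from("checkpoints"),
    };

    // Runs each step in order, checkpointing after every successful step.
    pipeline.run().await
}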

45.8.21. Production Monitoring

Track data quality and pipeline health.

#![allow(unused)]
fn main() {
use metrics::{counter, gauge, histogram};
use polars::prelude::*;

pub struct DataQualityMetrics;

impl DataQualityMetrics {
    pub fn record(df: &DataFrame, dataset_name: &str) {
        let labels = vec![("dataset", dataset_name.to_string())];
        
        // Row count
        gauge!("data_row_count", &labels).set(df.height() as f64);
        
        // Null percentages per column
        for col_name in df.get_column_names() {
            let null_count = df.column(col_name)
                .map(|c| c.null_count())
                .unwrap_or(0);
            
            let null_pct = null_count as f64 / df.height() as f64;
            
            gauge!(
                "data_null_percentage", 
                &[("dataset", dataset_name.to_string()), ("column", col_name.to_string())]
            ).set(null_pct);
        }
        
        // Schema drift detection
        let current_schema = df.schema();
        if let Some(expected) = EXPECTED_SCHEMAS.get(dataset_name) {
            if &current_schema != expected {
                counter!("data_schema_drift", &labels).increment(1);
            }
        }
    }
}
}
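
EXPECTED_SCHEMAS above is an assumed static registry of per-dataset schemas. A minimal sketch of one, using once_cell (dataset and field names are examples):

#![allow(unused)]
fn main() {
use std::collections::HashMap;

use once_cell::sync::Lazy;
use polars::prelude::*;

// One expected Schema per dataset name, compared against every incoming batch.
static EXPECTED_SCHEMAS: Lazy<HashMap<&'static str, Schema>> = Lazy::new(|| {
    let mut m = HashMap::new();
    m.insert(
        "events",
        Schema::from_iter([
            Field::new("id", DataType::Int64),
            Field::new("value", DataType::Float64),
        ]),
    );
    m
});
}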

45.8.22. Final Architecture: The Modern Data Stack in Rust

┌─────────────────────────────────────────────────────────────────────┐
│                    Rust Data Engineering Stack                       │
├─────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  Sources                                                             │
│  ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐       │
│  │   S3    │ │ Kafka   │ │ Postgres│ │ API     │ │ Files   │       │
│  └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘       │
│       │           │           │           │           │             │
│  ┌────▼───────────▼───────────▼───────────▼───────────▼────────────┐│
│  │                     Ingestion Layer                              ││
│  │  • object_store (S3/GCS/Azure)                                  ││
│  │  • rdkafka (Kafka consumer)                                     ││
│  │  • sqlx (Database connectors)                                   ││
│  └──────────────────────────────┬───────────────────────────────────┘│
│                                 │                                    │
│  ┌──────────────────────────────▼───────────────────────────────────┐│
│  │                    Processing Layer                              ││
│  │  • Polars (DataFrame operations)                                ││
│  │  • DataFusion (SQL engine)                                      ││
│  │  • Custom operators (Rayon parallel)                            ││
│  └──────────────────────────────┬───────────────────────────────────┘│
│                                 │                                    │
│  ┌──────────────────────────────▼───────────────────────────────────┐│
│  │                     Storage Layer                                ││
│  │  • Parquet (columnar files)                                     ││
│  │  • Delta Lake / Iceberg (table formats)                         ││
│  │  • Lance (ML vector storage)                                    ││
│  └──────────────────────────────┬───────────────────────────────────┘│
│                                 │                                    │
│  ┌──────────────────────────────▼───────────────────────────────────┐│
│  │                    Serving Layer                                 ││
│  │  • Arrow Flight (high-speed data transfer)                      ││
│  │  • Axum (REST APIs)                                             ││
│  │  • ClickHouse connector (OLAP queries)                          ││
│  └─────────────────────────────────────────────────────────────────┘│
│                                                                      │
└─────────────────────────────────────────────────────────────────────┘

Total Stack Benefits:

  • 10x faster than Python + Pandas
  • 90% less memory than Spark
  • Single binary deployment (no JVM, no Python env)
  • Type-safe transforms (catch errors at compile time)

[End of Section 45.8]