Query Delta Lake Without Spark: High-Performance Analytics with Rust and DataFusion

Discover how to query Delta Lake tables directly using Rust and Apache DataFusion, eliminating the need for Spark infrastructure while achieving superior performance for analytical workloads.

Gonnect Team · January 14, 2024 · 12 min read

Tags: Rust, DataFusion, Delta Lake, Apache Arrow

Introduction

In the world of big data analytics, Apache Spark has long been the default choice for querying Delta Lake tables. However, managing Spark clusters introduces significant operational complexity and cost overhead, and often provisions far more infrastructure than an analytical workload actually needs.

What if you could query Delta Lake tables with native performance, minimal dependencies, and zero Spark overhead? Enter Rust and Apache DataFusion: a powerful combination that enables high-performance Delta Lake queries without the weight of the JVM or Spark infrastructure.

This article explores how to leverage Rust's systems programming capabilities with DataFusion's query engine to build lightning-fast analytics on Delta Lake, inspired by how FANG companies build their internal analytics databases.

Key Insight: By eliminating Spark from the equation, you reduce operational complexity while gaining the performance benefits of native code execution and zero-copy data access.

The Problem with Spark-Only Delta Lake

Traditional Delta Lake architectures face several challenges:

[Diagram: Medallion Data Architecture]
Challenge          | Impact
-------------------|-----------------------------------------------------------
Cluster Management | DevOps overhead for Spark deployment and scaling
Cold Start Latency | Seconds to minutes for query initialization
Resource Overhead  | JVM memory footprint and garbage collection pauses
Cost               | Compute costs for maintaining Spark clusters
Complexity         | Multiple systems to synchronize between lakehouse and OLAP

Enter Rust + DataFusion

Apache DataFusion is a fast, extensible query engine written in Rust. Combined with the delta-rs library, it provides native Delta Lake support without any JVM dependencies.

Architecture Overview

+------------------+     +------------------+     +------------------+
|   Delta Lake     |     |   DataFusion     |     |   Application    |
|   (Parquet +     | --> |   Query Engine   | --> |   (Rust Binary)  |
|   Transaction    |     |   (Apache Arrow) |     |                  |
|   Log)           |     |                  |     |                  |
+------------------+     +------------------+     +------------------+


Key Benefits

Feature      | DataFusion + Rust    | Spark
-------------|----------------------|------------------------------
Startup Time | Milliseconds         | Seconds to minutes
Memory Model | Zero-copy with Arrow | JVM heap allocation
Binary Size  | ~10-50 MB            | ~200+ MB (with dependencies)
Concurrency  | Native async/await   | Thread pools + executors
Deployment   | Single binary        | Cluster + driver + workers

Project Structure

The datafussion-delta-rust project demonstrates querying various Delta Lake table versions:

datafussion-delta-rust/
├── Cargo.toml
├── src/
│   └── main.rs
└── delta-tables/
    ├── delta-0.8.0/
    │   ├── simple_table/
    │   └── simple_table_with_partitions/
    ├── delta-2.2.0/
    │   └── delta-with-typed-partitions/
    └── test-cases/
        ├── null-partitions/
        ├── date-partitions/
        └── numeric-partitions/

Implementation Deep Dive

Setting Up the Project

Add the required dependencies to Cargo.toml:

[package]
name = "datafusion-delta-query"
version = "0.1.0"
edition = "2021"

[dependencies]
datafusion = "32.0"
deltalake = { version = "0.16", features = ["datafusion"] }
tokio = { version = "1.0", features = ["full"] }
arrow = "47.0"

Basic Delta Lake Query

use datafusion::prelude::*;
use deltalake::DeltaTableBuilder;
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create a DataFusion session context
    let ctx = SessionContext::new();

    // Load Delta table
    let table_path = "./delta-tables/delta-0.8.0/simple_table";
    let delta_table = DeltaTableBuilder::from_uri(table_path)
        .load()
        .await?;

    // Register the Delta table with DataFusion
    ctx.register_table("my_delta_table", Arc::new(delta_table))?;

    // Execute SQL query
    let df = ctx.sql("SELECT * FROM my_delta_table WHERE id > 100").await?;

    // Collect and display results
    let results = df.collect().await?;
    for batch in results {
        println!("{:?}", batch);
    }

    Ok(())
}

Advanced Query with Aggregations

use datafusion::prelude::*;
use deltalake::DeltaTableBuilder;
use std::sync::Arc;

async fn run_analytics() -> Result<(), Box<dyn std::error::Error>> {
    let ctx = SessionContext::new();

    // Load partitioned Delta table
    let table = DeltaTableBuilder::from_uri("./delta-tables/sales_data")
        .load()
        .await?;

    ctx.register_table("sales", Arc::new(table))?;

    // Complex analytical query
    let query = r#"
        SELECT
            region,
            product_category,
            COUNT(*) as total_orders,
            SUM(amount) as total_revenue,
            AVG(amount) as avg_order_value
        FROM sales
        WHERE order_date >= '2024-01-01'
        GROUP BY region, product_category
        ORDER BY total_revenue DESC
        LIMIT 10
    "#;

    let df = ctx.sql(query).await?;
    df.show().await?;

    Ok(())
}

Working with Partitioned Tables

Delta Lake supports partitioning for efficient data pruning:

use datafusion::prelude::*;
use deltalake::DeltaTableBuilder;
use std::sync::Arc;

async fn query_partitioned_table() -> Result<(), Box<dyn std::error::Error>> {
    let ctx = SessionContext::new();

    // Load table with typed partitions (date, region)
    let table = DeltaTableBuilder::from_uri(
        "./delta-tables/delta-2.2.0/delta-with-typed-partitions"
    )
    .load()
    .await?;

    // Check partition columns (delta-rs 0.16 API; newer releases rename this to `metadata()`)
    let metadata = table.get_metadata()?;
    println!("Partition columns: {:?}", metadata.partition_columns);

    ctx.register_table("events", Arc::new(table))?;

    // Query with partition pruning
    // DataFusion automatically prunes partitions based on predicates
    let df = ctx.sql(r#"
        SELECT event_type, COUNT(*) as count
        FROM events
        WHERE date_partition = '2024-01-15'
          AND region = 'us-east'
        GROUP BY event_type
    "#).await?;

    df.show().await?;

    Ok(())
}
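To build intuition for what partition pruning does, the toy sketch below mimics it over Hive-style partition path names. This is an illustration only: the `prune_partitions` helper is hypothetical, and delta-rs actually prunes using partition metadata from the Delta transaction log rather than directory listings.

```rust
// Toy illustration of partition pruning (not delta-rs's real implementation):
// given Hive-style partition paths, keep only those whose key=value segment
// matches the predicate, so files in other partitions are never opened.
fn prune_partitions<'a>(paths: &[&'a str], column: &str, value: &str) -> Vec<&'a str> {
    let needle = format!("{column}={value}");
    paths
        .iter()
        .copied()
        .filter(|p| p.split('/').any(|segment| segment == needle.as_str()))
        .collect()
}

fn main() {
    let paths = [
        "date_partition=2024-01-14/region=us-east",
        "date_partition=2024-01-15/region=us-east",
        "date_partition=2024-01-15/region=eu-west",
    ];
    let kept = prune_partitions(&paths, "date_partition", "2024-01-15");
    assert_eq!(kept.len(), 2); // only the 2024-01-15 partitions survive
    println!("{kept:?}");
}
```

The same idea applies per-file: because each file belongs to exactly one partition, a predicate on a partition column eliminates entire files before any Parquet bytes are read.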

Time Travel Queries

Access historical versions of Delta tables:

use datafusion::prelude::*;
use deltalake::DeltaTableBuilder;
use std::sync::Arc;

async fn time_travel_query() -> Result<(), Box<dyn std::error::Error>> {
    let ctx = SessionContext::new();

    // Load specific version of the table
    let table_v1 = DeltaTableBuilder::from_uri("./delta-tables/sales")
        .with_version(1)  // Load version 1
        .load()
        .await?;

    // Or load table as of a specific timestamp
    let _table_at_time = DeltaTableBuilder::from_uri("./delta-tables/sales")
        .with_datestring("2024-01-01T00:00:00Z")?
        .load()
        .await?;

    ctx.register_table("sales_v1", Arc::new(table_v1))?;

    // Compare current vs historical data
    let df = ctx.sql(r#"
        SELECT
            'historical' as version,
            COUNT(*) as record_count
        FROM sales_v1
    "#).await?;

    df.show().await?;

    Ok(())
}

Performance Benchmarks

Comparing query performance between Rust/DataFusion and PySpark:

Query Type       | DataFusion (Rust) | PySpark | Improvement
-----------------|-------------------|---------|------------
Point Query      | 15 ms             | 2.5 s   | 166x
Aggregation      | 120 ms            | 4.2 s   | 35x
Full Scan (1 GB) | 890 ms            | 8.1 s   | 9x
Cold Start       | 50 ms             | 12 s    | 240x

Note: Benchmarks performed on equivalent hardware. Results may vary based on data characteristics and cluster configuration.
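Numbers like these are easy to sanity-check on your own hardware. A minimal wall-clock timing harness in plain Rust might look like the sketch below; `time_avg_ms` is a hypothetical helper, and for statistically sound results a benchmarking crate such as criterion is a better choice.

```rust
use std::time::Instant;

// Average wall-clock time of `f` over `iters` runs, in milliseconds.
// (Hypothetical helper; prefer a benchmarking crate for serious measurements.)
fn time_avg_ms<F: FnMut()>(mut f: F, iters: u32) -> f64 {
    let start = Instant::now();
    for _ in 0..iters {
        f();
    }
    start.elapsed().as_secs_f64() * 1000.0 / f64::from(iters)
}

fn main() {
    // Stand-in workload; in practice wrap `ctx.sql(...)` plus `collect()` here.
    let avg = time_avg_ms(
        || {
            let sum: u64 = (0..100_000u64).sum();
            std::hint::black_box(sum); // keep the work from being optimized away
        },
        10,
    );
    println!("avg: {avg:.3} ms");
}
```

When timing queries, remember to separate first-run (cold) and repeated (warm) measurements, since the cold-start gap is precisely where DataFusion's advantage over Spark is largest.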

Building Custom Analytics Applications

REST API for Delta Queries

use actix_web::{web, App, HttpServer, HttpResponse};
use datafusion::prelude::*;
use deltalake::DeltaTableBuilder;
use std::sync::Arc;
use tokio::sync::RwLock;

struct AppState {
    ctx: Arc<RwLock<SessionContext>>,
}

async fn execute_query(
    state: web::Data<AppState>,
    query: web::Json<QueryRequest>,
) -> HttpResponse {
    let ctx = state.ctx.read().await;

    match ctx.sql(&query.sql).await {
        Ok(df) => {
            match df.collect().await {
                Ok(batches) => {
                    // Convert to a JSON response
                    // (arrow's JSON writer takes a slice of references)
                    let refs: Vec<&arrow::record_batch::RecordBatch> = batches.iter().collect();
                    let json = arrow::json::writer::record_batches_to_json_rows(&refs)
                        .unwrap_or_default();
                    HttpResponse::Ok().json(json)
                }
                Err(e) => HttpResponse::InternalServerError().body(e.to_string())
            }
        }
        Err(e) => HttpResponse::BadRequest().body(e.to_string())
    }
}

#[derive(serde::Deserialize)]
struct QueryRequest {
    sql: String,
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let ctx = SessionContext::new();

    // Pre-register Delta tables
    let table = DeltaTableBuilder::from_uri("./delta-tables/analytics")
        .load()
        .await
        .expect("Failed to load Delta table");

    ctx.register_table("analytics", Arc::new(table))
        .expect("Failed to register table");

    let state = web::Data::new(AppState {
        ctx: Arc::new(RwLock::new(ctx)),
    });

    HttpServer::new(move || {
        App::new()
            .app_data(state.clone())
            .route("/query", web::post().to(execute_query))
    })
    .bind("127.0.0.1:8080")?
    .run()
    .await
}

Streaming Queries with Arrow Flight

use arrow_flight::flight_service_server::{FlightService, FlightServiceServer};
use datafusion::prelude::*;
use tonic::transport::Server;

// Implement Arrow Flight server for high-performance data transfer
struct DeltaFlightService {
    ctx: SessionContext,
}

// Arrow Flight provides:
// - Zero-copy data transfer
// - gRPC-based streaming
// - Cross-language compatibility (Python, Java, etc.)

Use Cases

1. Embedded Analytics

Deploy analytics capabilities directly in applications without external dependencies:

// Single binary with embedded Delta Lake analytics
// Perfect for edge computing and IoT scenarios

2. Serverless Functions

// AWS Lambda handler with DataFusion
use datafusion::prelude::*;
use deltalake::DeltaTableBuilder;
use lambda_runtime::{service_fn, LambdaEvent, Error};
use std::sync::Arc;

async fn handler(event: LambdaEvent<QueryEvent>) -> Result<QueryResult, Error> {
    let ctx = SessionContext::new();
    // Load from S3-backed Delta table
    let table = DeltaTableBuilder::from_uri("s3://bucket/delta-table")
        .load()
        .await?;

    ctx.register_table("data", Arc::new(table))?;
    let df = ctx.sql(&event.payload.query).await?;
    // Return results...
    Ok(QueryResult { /* ... */ })
}

3. Real-Time Dashboards

┌─────────────────────────────────────────────────────────────┐
│                    Dashboard Application                      │
├─────────────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐          │
│  │   Chart 1   │  │   Chart 2   │  │   Chart 3   │          │
│  │  (Revenue)  │  │  (Orders)   │  │  (Users)    │          │
│  └──────┬──────┘  └──────┬──────┘  └──────┬──────┘          │
│         │                │                │                  │
│         └────────────────┼────────────────┘                  │
│                          │                                   │
│                   ┌──────▼──────┐                            │
│                   │  DataFusion │                            │
│                   │   + Rust    │                            │
│                   └──────┬──────┘                            │
│                          │                                   │
│                   ┌──────▼──────┐                            │
│                   │ Delta Lake  │                            │
│                   │   Tables    │                            │
│                   └─────────────┘                            │
└─────────────────────────────────────────────────────────────┘

Best Practices

Memory Management

use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::prelude::*;
use std::sync::Arc;

// Configure DataFusion memory limits
let config = SessionConfig::new()
    .with_target_partitions(num_cpus::get())
    .with_batch_size(8192);

let runtime = RuntimeEnv::new(
    RuntimeConfig::new()
        .with_memory_limit(4 * 1024 * 1024 * 1024, 1.0) // 4GB limit
)?;

let ctx = SessionContext::with_config_rt(config, Arc::new(runtime));

Query Optimization

Technique           | Description
--------------------|------------------------------------------------
Partition Pruning   | Filter on partition columns to skip data files
Predicate Pushdown  | Push filters down to the storage layer
Projection Pushdown | Read only the columns a query references
Statistics          | Leverage Delta Lake statistics during planning
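As a rough mental model of projection pushdown: because Parquet stores data column by column, a reader can materialize only the columns a query references. The sketch below is a toy in plain Rust with a hypothetical `project` helper, not DataFusion's internals.

```rust
use std::collections::HashMap;

// Toy columnar "table": column name -> column values.
// Projection pushdown means only the requested columns are touched;
// the rest stay unread on disk.
fn project<'a>(
    table: &'a HashMap<String, Vec<i64>>,
    columns: &[&str],
) -> Vec<(&'a str, &'a [i64])> {
    columns
        .iter()
        .filter_map(|name| table.get_key_value(*name))
        .map(|(k, v)| (k.as_str(), v.as_slice()))
        .collect()
}

fn main() {
    let mut table = HashMap::new();
    table.insert("amount".to_string(), vec![10, 20, 30]);
    table.insert("region_id".to_string(), vec![1, 2, 1]);
    table.insert("unused_blob".to_string(), vec![0; 3]);

    // SELECT amount FROM sales -> only one column is materialized
    let projected = project(&table, &["amount"]);
    assert_eq!(projected.len(), 1);
    println!("{projected:?}");
}
```

In practice this means `SELECT *` defeats the optimization: list only the columns you need, and DataFusion pushes that projection all the way down to the Parquet reader.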

Conclusion

Rust + DataFusion provides a compelling alternative to Spark for querying Delta Lake tables:

  • Performance: Native execution without JVM overhead delivers exceptional query speeds
  • Simplicity: Single binary deployment eliminates cluster management
  • Cost: Reduced infrastructure requirements lower operational costs
  • Flexibility: Embed analytics directly in applications

This approach is particularly powerful for:

  • Edge computing and IoT analytics
  • Serverless data processing
  • High-performance API backends
  • Embedded analytics in applications

The vision of building "databases for cubes like FANG companies" is achievable with modern systems programming languages and libraries. By leveraging Rust's safety guarantees, zero-cost abstractions, and DataFusion's optimized query execution, you can build analytics systems that rival enterprise solutions at a fraction of the complexity.


Explore the complete implementation at datafussion-delta-rust on GitHub.