Query Delta Lake Without Spark: High-Performance Analytics with Rust and DataFusion
Discover how to query Delta Lake tables directly using Rust and Apache DataFusion, eliminating the need for Spark infrastructure while achieving superior performance for analytical workloads.
Introduction
In big data analytics, Apache Spark has long been the default choice for querying Delta Lake tables. However, managing Spark clusters introduces significant operational complexity and cost, and often far more infrastructure than many analytical workloads actually need.
What if you could query Delta Lake tables with native performance, minimal dependencies, and zero Spark overhead? Enter Rust and Apache DataFusion - a powerful combination that enables high-performance Delta Lake queries without the weight of the JVM or Spark infrastructure.
This article explores how to pair Rust's systems programming capabilities with DataFusion's query engine to build lightning-fast analytics on Delta Lake, inspired by how FAANG-scale companies build their internal analytics databases.
Key Insight: By eliminating Spark from the equation, you reduce operational complexity while gaining the performance benefits of native code execution and zero-copy data access.
The Problem with Spark-Only Delta Lake
Traditional Delta Lake architectures face several challenges:
| Challenge | Impact |
|---|---|
| Cluster Management | DevOps overhead for Spark deployment and scaling |
| Cold Start Latency | Seconds to minutes for query initialization |
| Resource Overhead | JVM memory footprint and garbage collection pauses |
| Cost | Compute costs for maintaining Spark clusters |
| Complexity | Multiple systems to synchronize between lakehouse and OLAP |
Enter Rust + DataFusion
Apache DataFusion is a fast, extensible query engine written in Rust. Combined with the delta-rs library, it provides native Delta Lake support without any JVM dependencies.
Architecture Overview
+------------------+ +------------------+ +------------------+
| Delta Lake | | DataFusion | | Application |
| (Parquet + | --> | Query Engine | --> | (Rust Binary) |
| Transaction | | (Apache Arrow) | | |
| Log) | | | | |
+------------------+ +------------------+ +------------------+
Key Benefits
| Feature | DataFusion + Rust | Spark |
|---|---|---|
| Startup Time | Milliseconds | Seconds to Minutes |
| Memory Model | Zero-copy with Arrow | JVM heap allocation |
| Binary Size | ~10-50 MB | ~200+ MB (with dependencies) |
| Concurrency | Native async/await | Thread pools + executors |
| Deployment | Single binary | Cluster + driver + workers |
Project Structure
The datafussion-delta-rust project demonstrates querying various Delta Lake table versions:
datafussion-delta-rust/
├── Cargo.toml
├── src/
│ └── main.rs
└── delta-tables/
├── delta-0.8.0/
│ ├── simple_table/
│ └── simple_table_with_partitions/
├── delta-2.2.0/
│ └── delta-with-typed-partitions/
└── test-cases/
├── null-partitions/
├── date-partitions/
└── numeric-partitions/
Implementation Deep Dive
Setting Up the Project
Add the required dependencies to Cargo.toml:
[package]
name = "datafusion-delta-query"
version = "0.1.0"
edition = "2021"
[dependencies]
datafusion = "32.0"
deltalake = { version = "0.16", features = ["datafusion"] }
tokio = { version = "1.0", features = ["full"] }
arrow = "47.0"
Basic Delta Lake Query
use datafusion::prelude::*;
use deltalake::DeltaTableBuilder;
use std::sync::Arc;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create a DataFusion session context
let ctx = SessionContext::new();
// Load Delta table
let table_path = "./delta-tables/delta-0.8.0/simple_table";
let delta_table = DeltaTableBuilder::from_uri(table_path)
.load()
.await?;
// Register the Delta table with DataFusion
ctx.register_table("my_delta_table", Arc::new(delta_table))?;
// Execute SQL query
let df = ctx.sql("SELECT * FROM my_delta_table WHERE id > 100").await?;
// Collect and display results
let results = df.collect().await?;
for batch in results {
println!("{:?}", batch);
}
Ok(())
}
Advanced Query with Aggregations
use datafusion::prelude::*;
use deltalake::DeltaTableBuilder;
use std::sync::Arc;
async fn run_analytics() -> Result<(), Box<dyn std::error::Error>> {
let ctx = SessionContext::new();
// Load partitioned Delta table
let table = DeltaTableBuilder::from_uri("./delta-tables/sales_data")
.load()
.await?;
ctx.register_table("sales", Arc::new(table))?;
// Complex analytical query
let query = r#"
SELECT
region,
product_category,
COUNT(*) as total_orders,
SUM(amount) as total_revenue,
AVG(amount) as avg_order_value
FROM sales
WHERE order_date >= '2024-01-01'
GROUP BY region, product_category
ORDER BY total_revenue DESC
LIMIT 10
"#;
let df = ctx.sql(query).await?;
df.show().await?;
Ok(())
}
Working with Partitioned Tables
Delta Lake supports partitioning for efficient data pruning:
use datafusion::prelude::*;
use deltalake::DeltaTableBuilder;
use std::sync::Arc;
async fn query_partitioned_table() -> Result<(), Box<dyn std::error::Error>> {
let ctx = SessionContext::new();
// Load table with typed partitions (date, region)
let table = DeltaTableBuilder::from_uri(
"./delta-tables/delta-2.2.0/delta-with-typed-partitions"
)
.load()
.await?;
// Check partition columns
let metadata = table.get_metadata()?;
println!("Partition columns: {:?}", metadata.partition_columns);
ctx.register_table("events", Arc::new(table))?;
// Query with partition pruning
// DataFusion automatically prunes partitions based on predicates
let df = ctx.sql(r#"
SELECT event_type, COUNT(*) as count
FROM events
WHERE date_partition = '2024-01-15'
AND region = 'us-east'
GROUP BY event_type
"#).await?;
df.show().await?;
Ok(())
}
Time Travel Queries
Access historical versions of Delta tables:
use datafusion::prelude::*;
use deltalake::DeltaTableBuilder;
use std::sync::Arc;
async fn time_travel_query() -> Result<(), Box<dyn std::error::Error>> {
let ctx = SessionContext::new();
// Load specific version of the table
let table_v1 = DeltaTableBuilder::from_uri("./delta-tables/sales")
.with_version(1) // Load version 1
.load()
.await?;
// Or load the table as of a specific timestamp (illustrative; unused below)
let _table_at_time = DeltaTableBuilder::from_uri("./delta-tables/sales")
.with_datestring("2024-01-01T00:00:00Z")?
.load()
.await?;
ctx.register_table("sales_v1", Arc::new(table_v1))?;
// Compare current vs historical data
let df = ctx.sql(r#"
SELECT
'historical' as version,
COUNT(*) as record_count
FROM sales_v1
"#).await?;
df.show().await?;
Ok(())
}
Performance Benchmarks
Comparing query performance between Rust/DataFusion and PySpark:
| Query Type | DataFusion (Rust) | PySpark | Improvement |
|---|---|---|---|
| Point Query | 15ms | 2.5s | 166x |
| Aggregation | 120ms | 4.2s | 35x |
| Full Scan (1GB) | 890ms | 8.1s | 9x |
| Cold Start | 50ms | 12s | 240x |
Note: Benchmarks performed on equivalent hardware. Results may vary based on data characteristics and cluster configuration.
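Numbers like these are easy to sanity-check yourself with a small timing harness. The sketch below uses only the standard library; the summation is a placeholder workload standing in for a query:

```rust
use std::time::{Duration, Instant};

/// Run `f` once and return its result together with the wall-clock duration.
fn timed<T>(f: impl FnOnce() -> T) -> (T, Duration) {
    let start = Instant::now();
    let out = f();
    (out, start.elapsed())
}

fn main() {
    // Placeholder workload: sum 1..=1_000_000.
    // In practice, wrap `ctx.sql(...).collect()` in the closure instead.
    let (sum, elapsed) = timed(|| (1u64..=1_000_000).sum::<u64>());
    println!("result = {sum}, took {elapsed:?}");
}
```

Remember to benchmark release builds (`cargo run --release`); debug-mode timings for DataFusion can be an order of magnitude slower.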
Building Custom Analytics Applications
REST API for Delta Queries
use actix_web::{web, App, HttpServer, HttpResponse};
use datafusion::prelude::*;
use deltalake::DeltaTableBuilder;
use std::sync::Arc;
use tokio::sync::RwLock;
struct AppState {
ctx: Arc<RwLock<SessionContext>>,
}
async fn execute_query(
state: web::Data<AppState>,
query: web::Json<QueryRequest>,
) -> HttpResponse {
let ctx = state.ctx.read().await;
match ctx.sql(&query.sql).await {
Ok(df) => {
match df.collect().await {
Ok(batches) => {
// Convert record batches to JSON rows
// (arrow-json expects a slice of RecordBatch references)
let refs: Vec<&arrow::record_batch::RecordBatch> =
    batches.iter().collect();
let json = arrow::json::writer::record_batches_to_json_rows(&refs)
    .unwrap_or_default();
HttpResponse::Ok().json(json)
}
Err(e) => HttpResponse::InternalServerError().body(e.to_string())
}
}
Err(e) => HttpResponse::BadRequest().body(e.to_string())
}
}
#[derive(serde::Deserialize)]
struct QueryRequest {
sql: String,
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
let ctx = SessionContext::new();
// Pre-register Delta tables
let table = DeltaTableBuilder::from_uri("./delta-tables/analytics")
.load()
.await
.expect("Failed to load Delta table");
ctx.register_table("analytics", Arc::new(table))
.expect("Failed to register table");
let state = web::Data::new(AppState {
ctx: Arc::new(RwLock::new(ctx)),
});
HttpServer::new(move || {
App::new()
.app_data(state.clone())
.route("/query", web::post().to(execute_query))
})
.bind("127.0.0.1:8080")?
.run()
.await
}
Streaming Queries with Arrow Flight
use arrow_flight::flight_service_server::{FlightService, FlightServiceServer};
use datafusion::prelude::*;
use tonic::transport::Server;
// Implement Arrow Flight server for high-performance data transfer
struct DeltaFlightService {
ctx: SessionContext,
}
// Arrow Flight provides:
// - Zero-copy data transfer
// - gRPC-based streaming
// - Cross-language compatibility (Python, Java, etc.)
Use Cases
1. Embedded Analytics
Deploy analytics capabilities directly in applications without external dependencies:
// Single binary with embedded Delta Lake analytics
// Perfect for edge computing and IoT scenarios
2. Serverless Functions
// AWS Lambda handler with DataFusion
// (QueryEvent and QueryResult are application-defined types)
use lambda_runtime::{service_fn, LambdaEvent, Error};
use datafusion::prelude::*;
use deltalake::DeltaTableBuilder;
use std::sync::Arc;
async fn handler(event: LambdaEvent<QueryEvent>) -> Result<QueryResult, Error> {
let ctx = SessionContext::new();
// Load from S3-backed Delta table
let table = DeltaTableBuilder::from_uri("s3://bucket/delta-table")
.load()
.await?;
ctx.register_table("data", Arc::new(table))?;
let df = ctx.sql(&event.payload.query).await?;
// Return results...
Ok(QueryResult { /* ... */ })
}
3. Real-Time Dashboards
┌─────────────────────────────────────────────────────────────┐
│ Dashboard Application │
├─────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Chart 1 │ │ Chart 2 │ │ Chart 3 │ │
│ │ (Revenue) │ │ (Orders) │ │ (Users) │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │
│ └────────────────┼────────────────┘ │
│ │ │
│ ┌──────▼──────┐ │
│ │ DataFusion │ │
│ │ + Rust │ │
│ └──────┬──────┘ │
│ │ │
│ ┌──────▼──────┐ │
│ │ Delta Lake │ │
│ │ Tables │ │
│ └─────────────┘ │
└─────────────────────────────────────────────────────────────┘
Best Practices
Memory Management
// Configure DataFusion memory limits
// (requires the `num_cpus` crate in addition to datafusion)
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::prelude::*;
use std::sync::Arc;
let config = SessionConfig::new()
.with_target_partitions(num_cpus::get())
.with_batch_size(8192);
let runtime = RuntimeEnv::new(
RuntimeConfig::new()
.with_memory_limit(4 * 1024 * 1024 * 1024, 1.0) // 4GB limit
)?;
let ctx = SessionContext::with_config_rt(config, Arc::new(runtime));
Query Optimization
| Technique | Description |
|---|---|
| Partition Pruning | Filter on partition columns for data skipping |
| Predicate Pushdown | Push filters to storage layer |
| Projection Pushdown | Read only required columns |
| Statistics | Leverage Delta Lake statistics for optimization |
Conclusion
Rust + DataFusion provides a compelling alternative to Spark for querying Delta Lake tables:
- Performance: Native execution without JVM overhead delivers exceptional query speeds
- Simplicity: Single binary deployment eliminates cluster management
- Cost: Reduced infrastructure requirements lower operational costs
- Flexibility: Embed analytics directly in applications
This approach is particularly powerful for:
- Edge computing and IoT analytics
- Serverless data processing
- High-performance API backends
- Embedded analytics in applications
The vision of building internal analytics databases the way FAANG companies do is achievable with modern systems programming languages and libraries. By leveraging Rust's safety guarantees and zero-cost abstractions together with DataFusion's optimized query execution, you can build analytics systems that rival enterprise solutions at a fraction of the complexity.
Explore the complete implementation at datafussion-delta-rust on GitHub.