Building Real-Time Streaming Pipelines with Hazelcast Jet and Spring Boot
Learn how to build distributed, fault-tolerant streaming data pipelines using Hazelcast Jet with Spring Boot, covering windowing, stateful processing, and custom processors.
Introduction
Real-time data processing has become essential for modern applications. From monitoring system health to processing financial transactions, organizations need the ability to analyze data as it arrives, not hours or days later. Hazelcast Jet provides a powerful, lightweight engine for building distributed streaming pipelines that can process millions of events per second.
This article explores how to build production-ready streaming pipelines using Hazelcast Jet with Spring Boot, treating data as continuous streams and leveraging distributed processing for scale and fault tolerance.
Key Insight: Hazelcast Jet treats all data as streams, whether it comes from files, databases, or Kafka topics, enabling unified processing of batch and streaming workloads.
Why Hazelcast Jet?
The Stream Processing Paradigm
Traditional batch processing treats data as finite collections. Stream processing inverts this model:
| Aspect | Batch Processing | Stream Processing (Jet) |
|---|---|---|
| Data Model | Finite datasets | Unbounded streams |
| Processing | Scheduled jobs | Continuous |
| Latency | Minutes to hours | Milliseconds |
| State | Recomputed each run | Incrementally maintained |
| Scalability | Vertical | Horizontal (distributed) |
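The "incrementally maintained" state row is the crux of the difference. A plain-Java sketch (independent of any Jet API; class and method names here are illustrative only) contrasts the two cost models:

```java
import java.util.ArrayList;
import java.util.List;

public class IncrementalVsBatch {

    /** Batch style: O(n) work per run, rescanning everything seen so far. */
    static long batchSum(List<Long> allEventsSoFar) {
        long sum = 0;
        for (long v : allEventsSoFar) {
            sum += v;            // full recomputation each time
        }
        return sum;
    }

    /** Streaming style: O(1) work per event, state carried forward. */
    static class StreamingSum {
        private long sum;        // incrementally maintained state

        long accept(long event) {
            sum += event;        // only the new event is touched
            return sum;
        }
    }

    public static void main(String[] args) {
        List<Long> events = new ArrayList<>();
        StreamingSum streaming = new StreamingSum();
        for (long v : new long[] {3, 5, 7}) {
            events.add(v);
            // Both approaches agree on the result; they differ in cost per event.
            if (batchSum(events) != streaming.accept(v)) {
                throw new AssertionError("mismatch");
            }
        }
    }
}
```

Jet's windowed aggregations follow the streaming model: accumulators are updated per event and only finished results are emitted.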
Hazelcast Jet Advantages
| Feature | Benefit |
|---|---|
| Lightweight | Minimal deployment footprint |
| Distributed | Automatic partitioning across cluster |
| Fault Tolerant | Snapshots and re-execution |
| Low Latency | In-memory processing |
| Java Native | Familiar Streams-like API |
Architecture Overview
┌─────────────────────────────────────────────────────────────────┐
│ Data Sources │
├─────────────┬─────────────┬─────────────┬─────────────┬─────────┤
│ Kafka │ Files │ Databases │ HTTP │ IMDG │
└──────┬──────┴──────┬──────┴──────┬──────┴──────┬──────┴────┬────┘
│ │ │ │ │
└─────────────┴─────────────┴─────────────┴───────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ Hazelcast Jet Pipeline │
├─────────────────────────────────────────────────────────────────┤
│ ┌─────────────────┐ │
│ │ Source Stage │────────────────────────────────────────► │
│ └─────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Transform Stage │────►│ Window Stage │ │
│ └─────────────────┘ └─────────────────┘ │
│ │ │ │
│ └──────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────┐ │
│ │ Aggregate Stage │ │
│ │ (Stateful with distributed state) │ │
│ └─────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ Sink Stage │◄───────────────────────────────────────── │
│ └─────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ Data Sinks │
├─────────────┬─────────────┬─────────────┬─────────────┬─────────┤
│ Kafka │ Files │ Databases │ HTTP │ IMap │
└─────────────┴─────────────┴─────────────┴─────────────┴─────────┘
Pipeline DSL and DAG Execution
Hazelcast Jet converts the Pipeline DSL into a Directed Acyclic Graph (DAG) for distributed execution:
Pipeline DSL Execution DAG
───────────── ─────────────
source() ┌─────────────────────────┐
│ │ Source Processor │
▼ │ (Partitioned x N) │
map() └───────────┬─────────────┘
│ │
▼ ▼
filter() ┌─────────────────────────┐
│ │ Transform Processor │
▼ │ (Partitioned x N) │
groupingKey() └───────────┬─────────────┘
│ │
▼ ▼
aggregate() ┌─────────────────────────┐
│ │ Aggregate Processor │
▼ │ (Keyed, Partitioned) │
sink() └───────────┬─────────────┘
│
▼
┌─────────────────────────┐
│ Sink Processor │
│ (Partitioned x N) │
└─────────────────────────┘
Implementation Deep Dive
Spring Boot Configuration
@SpringBootApplication
public class HazelcastJetApplication {
public static void main(String[] args) {
SpringApplication.run(HazelcastJetApplication.class, args);
}
@Bean
public JetInstance jetInstance() {
JetConfig config = new JetConfig();
// Configure instance
config.getHazelcastConfig()
.setClusterName("jet-cluster");
// Configure Jet engine
config.getInstanceConfig()
.setCooperativeThreadCount(
Runtime.getRuntime().availableProcessors());
return Jet.newJetInstance(config);
}
}
Trade Processing Pipeline
A complete example processing trade events with windowing and aggregation:
@Component
public class TradeProcessingPipeline {
private final JetInstance jet;
private static final Logger log = LoggerFactory.getLogger(TradeProcessingPipeline.class);
public TradeProcessingPipeline(JetInstance jet) {
this.jet = jet;
}
@PostConstruct
public void startPipeline() {
Pipeline pipeline = buildPipeline();
Job job = jet.newJobIfAbsent(pipeline,
new JobConfig().setName("trade-processor"));
log.info("Trade processing pipeline started: {}", job.getName());
}
private Pipeline buildPipeline() {
Pipeline pipeline = Pipeline.create();
// Source: Read trades from Kafka
StreamSource<Trade> source = KafkaSources.kafka(
kafkaProperties(),
record -> new Trade(
record.key(),
record.value().getSymbol(),
record.value().getPrice(),
record.value().getQuantity(),
record.value().getTimestamp()
),
"trades"
);
// Build processing pipeline
pipeline.readFrom(source)
// Add timestamps for event-time processing
.withTimestamps(Trade::getTimestamp, 5000)
// Filter valid trades
.filter(trade -> trade.getPrice() > 0 && trade.getQuantity() > 0)
// Group by symbol
.groupingKey(Trade::getSymbol)
// Window: 1-minute tumbling windows
.window(WindowDefinition.tumbling(60_000))
// Aggregate: Calculate OHLC (Open, High, Low, Close)
.aggregate(AggregateOperations.allOf(
    // Custom accumulator built with the AggregateOperation builder
    // (AggregateOperations.reducing expects an identity value and two
    // binary operators, so it does not fit a create/accumulate/combine/
    // finish accumulator like this one)
    AggregateOperation
        .withCreate(OHLCAccumulator::new)
        .<Trade>andAccumulate((acc, trade) -> acc.accumulate(trade.getPrice()))
        .andCombine(OHLCAccumulator::combine)
        .andExportFinish(OHLCAccumulator::finish),
    AggregateOperations.summingLong(Trade::getQuantity)
))
// Map to result
.map(result -> new TradeAggregate(
result.getKey(), // symbol
result.end(), // window end
result.getValue().f0(), // OHLC
result.getValue().f1() // total volume
))
// Sink: Write to Hazelcast IMap
.writeTo(Sinks.map(
"trade-aggregates",
TradeAggregate::getKey,
TradeAggregate::getValue
));
return pipeline;
}
private Properties kafkaProperties() {
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer",
"com.gonnect.jet.TradeDeserializer");
props.setProperty("auto.offset.reset", "latest");
return props;
}
}
// Trade domain object
public class Trade implements Serializable {
private String tradeId;
private String symbol;
private double price;
private long quantity;
private long timestamp;
// Constructor, getters...
}
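The TradeAggregate type written to the sink is not shown above. Since the sink extracts a key and a value from it, a minimal sketch could look like the following (field names, the key format, and the self-valued getValue() are assumptions mirroring the map(...) stage and the Sinks.map(...) call):

```java
import java.io.Serializable;

// Hypothetical sink-side value object; constructor parameters mirror the
// map(...) stage above: symbol, window end, OHLC result, total volume.
public class TradeAggregate implements Serializable {
    private final String symbol;
    private final long windowEnd;
    private final Object ohlc;        // typed Object to keep this sketch self-contained
    private final long totalVolume;

    public TradeAggregate(String symbol, long windowEnd, Object ohlc, long totalVolume) {
        this.symbol = symbol;
        this.windowEnd = windowEnd;
        this.ohlc = ohlc;
        this.totalVolume = totalVolume;
    }

    // Key the IMap entry by symbol plus window end so successive
    // windows for the same symbol don't overwrite each other.
    public String getKey() {
        return symbol + "@" + windowEnd;
    }

    public TradeAggregate getValue() {
        return this;
    }

    public Object getOhlc() { return ohlc; }

    public long getTotalVolume() { return totalVolume; }
}
```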
// OHLC Accumulator for aggregation
public class OHLCAccumulator implements Serializable {
private double open = Double.NaN;
// Use infinities as sentinels: Double.MIN_VALUE is the smallest
// positive double, which would break max() for non-positive prices
private double high = Double.NEGATIVE_INFINITY;
private double low = Double.POSITIVE_INFINITY;
private double close;
private long count = 0;
public OHLCAccumulator accumulate(double price) {
if (count == 0) {
open = price;
}
high = Math.max(high, price);
low = Math.min(low, price);
close = price;
count++;
return this;
}
// Mutates this accumulator in place so it also fits andCombine(),
// which discards the return value
public OHLCAccumulator combine(OHLCAccumulator other) {
    if (other.count == 0) return this;
    if (count == 0) {
        this.open = other.open;
    }
    this.high = Math.max(this.high, other.high);
    this.low = Math.min(this.low, other.low);
    this.close = other.close;
    this.count += other.count;
    return this;
}
public OHLC finish() {
return new OHLC(open, high, low, close);
}
}
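The OHLC result type returned by finish() is also left undefined. A minimal immutable sketch (field and accessor names are assumptions):

```java
import java.io.Serializable;

// Hypothetical immutable result type produced by the accumulator's
// finish() step; adapt names to your domain model.
public class OHLC implements Serializable {
    private final double open;
    private final double high;
    private final double low;
    private final double close;

    public OHLC(double open, double high, double low, double close) {
        this.open = open;
        this.high = high;
        this.low = low;
        this.close = close;
    }

    public double getOpen()  { return open; }
    public double getHigh()  { return high; }
    public double getLow()   { return low; }
    public double getClose() { return close; }
}
```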
Windowing Strategies
Hazelcast Jet supports multiple windowing strategies:
@Component
public class WindowingExamples {
// Tumbling Window: Fixed, non-overlapping intervals
public Pipeline tumblingWindowPipeline() {
Pipeline p = Pipeline.create();
p.readFrom(Sources.list("events"))
.withTimestamps(Event::getTimestamp, 1000)
.groupingKey(Event::getCategory)
.window(WindowDefinition.tumbling(60_000)) // 1 minute
.aggregate(AggregateOperations.counting())
.writeTo(Sinks.logger());
return p;
}
// Sliding Window: Overlapping intervals
public Pipeline slidingWindowPipeline() {
Pipeline p = Pipeline.create();
p.readFrom(Sources.list("events"))
.withTimestamps(Event::getTimestamp, 1000)
.groupingKey(Event::getCategory)
.window(WindowDefinition.sliding(60_000, 10_000)) // 1 min window, 10s slide
.aggregate(AggregateOperations.averaging(Event::getValue))
.writeTo(Sinks.logger());
return p;
}
// Session Window: Gap-based, dynamic windows
public Pipeline sessionWindowPipeline() {
Pipeline p = Pipeline.create();
p.readFrom(Sources.list("events"))
.withTimestamps(Event::getTimestamp, 1000)
.groupingKey(Event::getUserId)
.window(WindowDefinition.session(300_000)) // 5 min inactivity gap
.aggregate(AggregateOperations.toList())
.map(result -> new UserSession(
result.getKey(),
result.start(),
result.end(),
result.getValue()
))
.writeTo(Sinks.logger());
return p;
}
}
Custom Processor Implementation
For advanced use cases, implement custom processors:
public class DataHealthProcessor extends AbstractProcessor {
private final long windowSizeMs;
private long processedCount = 0;
private long failedCount = 0;
private long windowStart = -1;
public DataHealthProcessor(long windowSizeMs) {
this.windowSizeMs = windowSizeMs;
}
@Override
protected boolean tryProcess(int ordinal, Object item) {
DataEvent event = (DataEvent) item;
long now = System.currentTimeMillis();
// Initialize window
if (windowStart == -1) {
windowStart = now;
}
// Process event
if (event.isValid()) {
processedCount++;
} else {
failedCount++;
}
// Check if window complete
if (now - windowStart >= windowSizeMs) {
emitHealthMetric();
resetWindow(now);
}
// Always mark as processed
return true;
}
@Override
public boolean complete() {
// Emit final metrics on completion
if (processedCount > 0 || failedCount > 0) {
emitHealthMetric();
}
return true;
}
private void emitHealthMetric() {
    HealthMetric metric = new HealthMetric(
        windowStart,
        processedCount,
        failedCount,
        calculateHealthScore()
    );
    // Note: tryEmit() returns false when the outbox is full; a
    // production processor should hold the pending item and retry
    // on the next invocation instead of discarding the result
    tryEmit(metric);
}
private double calculateHealthScore() {
long total = processedCount + failedCount;
return total > 0 ? (double) processedCount / total * 100 : 100.0;
}
private void resetWindow(long now) {
windowStart = now;
processedCount = 0;
failedCount = 0;
}
// Processor supplier for distributed execution
public static ProcessorSupplier supplier(long windowSizeMs) {
return ProcessorSupplier.of(() ->
new DataHealthProcessor(windowSizeMs));
}
}
// Using the custom processor in a pipeline
public Pipeline healthMonitoringPipeline() {
    Pipeline p = Pipeline.create();
    p.readFrom(KafkaSources.kafka(kafkaProps(), "data-events"))
        .withoutTimestamps()
        .<HealthMetric>customTransform("health-monitor",
            DataHealthProcessor.supplier(60_000))
        // Sinks.map(name) expects Map.Entry items, so supply explicit
        // key/value functions (assumes a getWindowStart() accessor)
        .writeTo(Sinks.map("health-metrics",
            HealthMetric::getWindowStart,
            metric -> metric));
    return p;
}
REST API for Pipeline Control
Expose pipeline management through REST:
@RestController
@RequestMapping("/api/pipelines")
public class PipelineController {
private final JetInstance jet;
@GetMapping
public List<JobSummary> listJobs() {
return jet.getJobs().stream()
.map(job -> new JobSummary(
job.getId(),
job.getName(),
job.getStatus().name(),
job.getSubmissionTime()
))
.collect(Collectors.toList());
}
@GetMapping("/{jobId}")
public JobDetails getJob(@PathVariable long jobId) {
Job job = jet.getJob(jobId);
if (job == null) {
throw new JobNotFoundException(jobId);
}
return new JobDetails(
job.getId(),
job.getName(),
job.getStatus().name(),
job.getSubmissionTime(),
job.getMetrics()
);
}
@PostMapping("/{jobId}/suspend")
public ResponseEntity<Void> suspendJob(@PathVariable long jobId) {
Job job = jet.getJob(jobId);
if (job == null) {
throw new JobNotFoundException(jobId);
}
job.suspend();
return ResponseEntity.ok().build();
}
@PostMapping("/{jobId}/resume")
public ResponseEntity<Void> resumeJob(@PathVariable long jobId) {
Job job = jet.getJob(jobId);
if (job == null) {
throw new JobNotFoundException(jobId);
}
job.resume();
return ResponseEntity.ok().build();
}
@DeleteMapping("/{jobId}")
public ResponseEntity<Void> cancelJob(@PathVariable long jobId) {
Job job = jet.getJob(jobId);
if (job == null) {
throw new JobNotFoundException(jobId);
}
job.cancel();
return ResponseEntity.noContent().build();
}
}
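The JobSummary and JobDetails response types are not defined above. A sketch of what they could look like, with field names assumed from the constructor calls in listJobs() and getJob():

```java
// Hypothetical response DTOs for the controller above.
public class JobSummary {
    private final long id;
    private final String name;
    private final String status;
    private final long submissionTime;

    public JobSummary(long id, String name, String status, long submissionTime) {
        this.id = id;
        this.name = name;
        this.status = status;
        this.submissionTime = submissionTime;
    }

    public long getId() { return id; }
    public String getName() { return name; }
    public String getStatus() { return status; }
    public long getSubmissionTime() { return submissionTime; }
}

// JobDetails extends the summary with a metrics field; in Jet the value
// returned by job.getMetrics() is a JobMetrics instance, typed Object
// here to keep the sketch dependency-free.
class JobDetails extends JobSummary {
    private final Object metrics;

    JobDetails(long id, String name, String status, long submissionTime, Object metrics) {
        super(id, name, status, submissionTime);
        this.metrics = metrics;
    }

    public Object getMetrics() { return metrics; }
}
```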
Fault Tolerance and State Management
Hazelcast Jet provides built-in fault tolerance through snapshots:
@Bean
public JobConfig faultTolerantJobConfig() {
return new JobConfig()
.setName("fault-tolerant-pipeline")
// Enable exactly-once processing
.setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE)
// Snapshot every 10 seconds
.setSnapshotIntervalMillis(10_000)
// Rescale and restart automatically when cluster membership changes
// (recovery after a failure is driven by the processing guarantee
// and snapshots, not by this flag)
.setAutoScaling(true);
}
// State is automatically persisted to Hazelcast IMaps
public Pipeline statefulPipeline() {
Pipeline p = Pipeline.create();
p.readFrom(Sources.kafka(props, "events"))
.withTimestamps(Event::getTimestamp, 5000)
.groupingKey(Event::getKey)
// State is automatically managed and recovered
.mapStateful(
() -> new RunningStats(),
(state, event) -> {
state.update(event.getValue());
return new StatsSnapshot(event.getKey(), state);
}
)
.writeTo(Sinks.logger());
return p;
}
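The RunningStats class referenced in mapStateful is not shown. A minimal sketch using Welford's online algorithm for mean and variance (class and method names are assumptions) would keep per-key state compact and serializable, which matters because Jet snapshots stateful-stage state for fault tolerance:

```java
import java.io.Serializable;

// Hypothetical state object for mapStateful: tracks count, mean, and
// variance incrementally (Welford's online algorithm), so no event
// history needs to be retained between snapshots.
public class RunningStats implements Serializable {
    private long count;
    private double mean;
    private double m2;   // running sum of squared deviations from the mean

    public void update(double value) {
        count++;
        double delta = value - mean;
        mean += delta / count;
        m2 += delta * (value - mean);
    }

    public long getCount() { return count; }

    public double getMean() { return mean; }

    /** Sample variance; 0 until at least two values have been seen. */
    public double getVariance() {
        return count > 1 ? m2 / (count - 1) : 0.0;
    }
}
```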
Configuration
Application Properties
spring:
application:
name: hazelcast-jet-streaming
hazelcast:
jet:
cluster-name: jet-cluster
cooperative-thread-count: 4
server:
port: 8080
# Swagger UI available at /swagger-ui/index.html
springdoc:
api-docs:
path: /api-docs
swagger-ui:
path: /swagger-ui.html
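The build also needs the Jet artifacts on the classpath. A Maven sketch, assuming the standalone Jet 4.x line (the version number is an assumption; check the project's pom.xml):

```xml
<!-- Core Jet engine -->
<dependency>
    <groupId>com.hazelcast.jet</groupId>
    <artifactId>hazelcast-jet</artifactId>
    <version>4.5.4</version>
</dependency>
<!-- Kafka connector used by KafkaSources.kafka(...) -->
<dependency>
    <groupId>com.hazelcast.jet</groupId>
    <artifactId>hazelcast-jet-kafka</artifactId>
    <version>4.5.4</version>
</dependency>
```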
Running the Application
# Clone the repository
git clone https://github.com/mgorav/spring-hazelcast-jet.git
cd spring-hazelcast-jet
# Build
mvn clean install
# Run
mvn spring-boot:run
# Access Swagger UI
open http://localhost:8080/swagger-ui/index.html
Performance Optimization
| Technique | Description |
|---|---|
| Cooperative Threads | Lightweight execution with work stealing |
| Partitioned Processing | Automatic data distribution |
| Zero-Copy | Direct memory access for efficiency |
| Backpressure | Automatic flow control |
| State Locality | Process data where state resides |
Conclusion
Hazelcast Jet provides a powerful foundation for building real-time streaming pipelines with Spring Boot. Key takeaways:
- Stream-First approach unifies batch and streaming
- Pipeline DSL provides intuitive API
- DAG Execution enables distributed processing
- Windowing supports tumbling, sliding, and session windows
- Fault Tolerance through automatic snapshots
- Custom Processors for advanced use cases
The spring-hazelcast-jet project demonstrates these concepts in a production-ready implementation.