Building Real-Time Streaming Pipelines with Hazelcast Jet and Spring Boot

Learn how to build distributed, fault-tolerant streaming data pipelines using Hazelcast Jet with Spring Boot, covering windowing, stateful processing, and custom processors.

Gonnect Team
January 14, 2024 · 15 min read

Tags: Hazelcast Jet, Spring Boot, Java Streams, Distributed Computing

Introduction

Real-time data processing has become essential for modern applications. From monitoring system health to processing financial transactions, organizations need the ability to analyze data as it arrives - not hours or days later. Hazelcast Jet provides a powerful, lightweight engine for building distributed streaming pipelines that can process millions of events per second.

This article explores how to build production-ready streaming pipelines using Hazelcast Jet with Spring Boot, treating data as continuous streams and leveraging distributed processing for scale and fault tolerance.

Key Insight: Hazelcast Jet treats all data as streams - whether from files, databases, or Kafka topics - enabling unified processing of batch and streaming workloads.

Why Hazelcast Jet?


The Stream Processing Paradigm

Traditional batch processing treats data as finite collections. Stream processing inverts this model:

Aspect      | Batch Processing    | Stream Processing (Jet)
Data Model  | Finite datasets     | Unbounded streams
Processing  | Scheduled jobs      | Continuous
Latency     | Minutes to hours    | Milliseconds
State       | Recomputed each run | Incrementally maintained
Scalability | Vertical            | Horizontal (distributed)
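The "State" row is the key practical difference: a batch job recomputes its result from the full dataset on every run, while a streaming stage folds each event into state it already holds. A minimal plain-Java sketch of the two cost models (illustrative only, no Jet involved):

```java
import java.util.List;

public class IncrementalVsBatch {

    // Batch style: recompute the average over the full dataset on every run
    static double batchAverage(List<Double> allEvents) {
        double sum = 0;
        for (double v : allEvents) {
            sum += v;
        }
        return sum / allEvents.size();
    }

    // Stream style: maintain a running sum and count, updated per event
    static final class RunningAverage {
        private double sum;
        private long count;

        void accept(double v) {
            sum += v;
            count++;
        }

        double get() {
            return sum / count;
        }
    }

    public static void main(String[] args) {
        List<Double> events = List.of(10.0, 20.0, 30.0);

        RunningAverage running = new RunningAverage();
        events.forEach(running::accept);   // O(1) work per event

        // Both approaches agree; only the cost model differs
        System.out.println(batchAverage(events)); // 20.0
        System.out.println(running.get());        // 20.0
    }
}
```

The batch version touches every event again on each run; the incremental version does constant work per arriving event, which is what makes millisecond latency feasible.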

Hazelcast Jet Advantages

Feature        | Benefit
Lightweight    | Minimal deployment footprint
Distributed    | Automatic partitioning across cluster
Fault Tolerant | Snapshots and re-execution
Low Latency    | In-memory processing
Java Native    | Familiar Streams-like API

Architecture Overview

┌─────────────────────────────────────────────────────────────────┐
│                      Data Sources                               │
├─────────────┬─────────────┬─────────────┬─────────────┬─────────┤
│   Kafka     │   Files     │  Databases  │   HTTP      │  IMDG   │
└──────┬──────┴──────┬──────┴──────┬──────┴──────┬──────┴────┬────┘
       │             │             │             │           │
       └─────────────┴─────────────┴─────────────┴───────────┘
                                   │
                                   ▼
┌─────────────────────────────────────────────────────────────────┐
│                   Hazelcast Jet Pipeline                        │
├─────────────────────────────────────────────────────────────────┤
│  ┌─────────────────┐                                            │
│  │   Source Stage   │────────────────────────────────────────►  │
│  └─────────────────┘                                            │
│           │                                                     │
│           ▼                                                     │
│  ┌─────────────────┐     ┌─────────────────┐                   │
│  │  Transform Stage │────►│  Window Stage   │                   │
│  └─────────────────┘     └─────────────────┘                   │
│           │                      │                              │
│           └──────────────────────┘                              │
│                      │                                          │
│                      ▼                                          │
│  ┌─────────────────────────────────────────┐                   │
│  │           Aggregate Stage               │                   │
│  │  (Stateful with distributed state)      │                   │
│  └─────────────────────────────────────────┘                   │
│                      │                                          │
│                      ▼                                          │
│  ┌─────────────────┐                                            │
│  │   Sink Stage     │◄───────────────────────────────────────── │
│  └─────────────────┘                                            │
└─────────────────────────────────────────────────────────────────┘
                                   │
                                   ▼
┌─────────────────────────────────────────────────────────────────┐
│                      Data Sinks                                 │
├─────────────┬─────────────┬─────────────┬─────────────┬─────────┤
│   Kafka     │   Files     │  Databases  │   HTTP      │  IMap   │
└─────────────┴─────────────┴─────────────┴─────────────┴─────────┘

Pipeline DSL and DAG Execution

Hazelcast Jet compiles the Pipeline DSL into a Directed Acyclic Graph (DAG) for distributed execution:

Pipeline DSL                          Execution DAG
─────────────                         ─────────────

source()                    ┌─────────────────────────┐
   │                        │     Source Processor    │
   ▼                        │    (Partitioned x N)    │
map()                       └───────────┬─────────────┘
   │                                    │
   ▼                                    ▼
filter()                    ┌─────────────────────────┐
   │                        │   Transform Processor   │
   ▼                        │    (Partitioned x N)    │
groupingKey()               └───────────┬─────────────┘
   │                                    │
   ▼                                    ▼
aggregate()                 ┌─────────────────────────┐
   │                        │  Aggregate Processor    │
   ▼                        │  (Keyed, Partitioned)   │
sink()                      └───────────┬─────────────┘
                                        │
                                        ▼
                            ┌─────────────────────────┐
                            │     Sink Processor      │
                            │    (Partitioned x N)    │
                            └─────────────────────────┘
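The keyed edge into the aggregate processor routes every item with the same grouping key to the same processor instance, which is what keeps per-key state local. Conceptually the routing is hash-based, along these lines (a deliberate simplification; Jet's actual partitioning scheme differs in detail):

```java
public class KeyedRouting {

    // Simplified illustration: route an item to one of N parallel processors
    // by hashing its grouping key
    static int partitionFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 4;

        // All trades for the same symbol land on the same processor
        // instance, so per-key aggregation state never crosses the network
        int p1 = partitionFor("AAPL", parallelism);
        int p2 = partitionFor("AAPL", parallelism);
        System.out.println(p1 == p2); // true
    }
}
```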

Implementation Deep Dive

Spring Boot Configuration

@SpringBootApplication
public class HazelcastJetApplication {

    public static void main(String[] args) {
        SpringApplication.run(HazelcastJetApplication.class, args);
    }

    @Bean
    public JetInstance jetInstance() {
        JetConfig config = new JetConfig();

        // Configure instance
        config.getHazelcastConfig()
            .setClusterName("jet-cluster");

        // Configure Jet engine
        config.getInstanceConfig()
            .setCooperativeThreadCount(
                Runtime.getRuntime().availableProcessors());

        return Jet.newJetInstance(config);
    }
}

Trade Processing Pipeline

A complete example processing trade events with windowing and aggregation:

@Component
public class TradeProcessingPipeline {

    private final JetInstance jet;
    private static final Logger log = LoggerFactory.getLogger(TradeProcessingPipeline.class);

    public TradeProcessingPipeline(JetInstance jet) {
        this.jet = jet;
    }

    @PostConstruct
    public void startPipeline() {
        Pipeline pipeline = buildPipeline();
        Job job = jet.newJobIfAbsent(pipeline,
            new JobConfig().setName("trade-processor"));
        log.info("Trade processing pipeline started: {}", job.getName());
    }

    private Pipeline buildPipeline() {
        Pipeline pipeline = Pipeline.create();

        // Source: Read trades from Kafka
        StreamSource<Trade> source = KafkaSources.kafka(
            kafkaProperties(),
            record -> new Trade(
                record.key(),
                record.value().getSymbol(),
                record.value().getPrice(),
                record.value().getQuantity(),
                record.value().getTimestamp()
            ),
            "trades"
        );

        // Build processing pipeline
        pipeline.readFrom(source)
            // Add timestamps for event-time processing
            .withTimestamps(Trade::getTimestamp, 5000)

            // Filter valid trades
            .filter(trade -> trade.getPrice() > 0 && trade.getQuantity() > 0)

            // Group by symbol
            .groupingKey(Trade::getSymbol)

            // Window: 1-minute tumbling windows
            .window(WindowDefinition.tumbling(60_000))

            // Aggregate: calculate OHLC (Open, High, Low, Close) plus total
            // volume in one pass, using a custom AggregateOperation built
            // from the accumulator below (combine() merges the other partial
            // result into this accumulator in place)
            .aggregate(AggregateOperations.allOf(
                AggregateOperation
                    .withCreate(OHLCAccumulator::new)
                    .<Trade>andAccumulate(
                        (acc, trade) -> acc.accumulate(trade.getPrice()))
                    .andCombine(OHLCAccumulator::combine)
                    .andExportFinish(OHLCAccumulator::finish),
                AggregateOperations.summingLong(Trade::getQuantity)
            ))

            // Map to result
            .map(result -> new TradeAggregate(
                result.getKey(),                    // symbol
                result.end(),                       // window end
                result.getValue().f0(),             // OHLC
                result.getValue().f1()              // total volume
            ))

            // Sink: write to a Hazelcast IMap keyed by symbol
            // (assumes a getSymbol() accessor on TradeAggregate)
            .writeTo(Sinks.map(
                "trade-aggregates",
                TradeAggregate::getSymbol,
                aggregate -> aggregate
            ));

        return pipeline;
    }

    private Properties kafkaProperties() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
            "com.gonnect.jet.TradeDeserializer");
        props.setProperty("auto.offset.reset", "latest");
        return props;
    }
}

// Trade domain object
public class Trade implements Serializable {
    private String tradeId;
    private String symbol;
    private double price;
    private long quantity;
    private long timestamp;

    // Constructor, getters...
}

// OHLC Accumulator for aggregation
public class OHLCAccumulator implements Serializable {
    private double open = Double.NaN;
    // Use infinities as sentinels; Double.MIN_VALUE is the smallest
    // *positive* double and would break on non-positive prices
    private double high = Double.NEGATIVE_INFINITY;
    private double low = Double.POSITIVE_INFINITY;
    private double close;
    private long count = 0;

    public OHLCAccumulator accumulate(double price) {
        if (count == 0) {
            open = price;
        }
        high = Math.max(high, price);
        low = Math.min(low, price);
        close = price;
        count++;
        return this;
    }

    // Merge another partial accumulator into this one, in place
    public OHLCAccumulator combine(OHLCAccumulator other) {
        if (other.count == 0) {
            return this;
        }
        if (count == 0) {
            open = other.open;
        }
        high = Math.max(high, other.high);
        low = Math.min(low, other.low);
        close = other.close;
        count += other.count;
        return this;
    }

    public OHLC finish() {
        return new OHLC(open, high, low, close);
    }
}
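The accumulate/combine/finish trio is easy to sanity-check in isolation. A condensed, self-contained copy of the accumulator, merging two partial results the way two cluster members would:

```java
public class OHLCCheck {

    // Condensed copy of the OHLC accumulator, inlined so this compiles standalone
    static final class Acc {
        double open = Double.NaN;
        double high = Double.NEGATIVE_INFINITY;
        double low = Double.POSITIVE_INFINITY;
        double close;
        long count;

        void accumulate(double price) {
            if (count == 0) open = price;
            high = Math.max(high, price);
            low = Math.min(low, price);
            close = price;
            count++;
        }

        // Merge another partial accumulator into this one, in place
        void combine(Acc other) {
            if (other.count == 0) return;
            if (count == 0) open = other.open;
            high = Math.max(high, other.high);
            low = Math.min(low, other.low);
            close = other.close;
            count += other.count;
        }
    }

    public static void main(String[] args) {
        // Two partial accumulators, as two cluster members would produce
        Acc a = new Acc();
        a.accumulate(100.0);
        a.accumulate(105.0);

        Acc b = new Acc();
        b.accumulate(95.0);
        b.accumulate(102.0);

        a.combine(b);
        System.out.printf("O=%.1f H=%.1f L=%.1f C=%.1f%n",
            a.open, a.high, a.low, a.close); // O=100.0 H=105.0 L=95.0 C=102.0
    }
}
```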

Windowing Strategies

Hazelcast Jet supports multiple windowing strategies:

@Component
public class WindowingExamples {

    // Note: Sources.list() is a batch source and cannot feed windowed,
    // event-time processing, so these examples read from a Hazelcast map
    // journal instead. The "events" map must have its event journal
    // enabled in the Hazelcast configuration.

    // Tumbling Window: fixed, non-overlapping intervals
    public Pipeline tumblingWindowPipeline() {
        Pipeline p = Pipeline.create();

        p.readFrom(Sources.<String, Event>mapJournal("events",
                JournalInitialPosition.START_FROM_OLDEST))
            .withTimestamps(entry -> entry.getValue().getTimestamp(), 1000)
            .map(Map.Entry::getValue)
            .groupingKey(Event::getCategory)
            .window(WindowDefinition.tumbling(60_000))  // 1 minute
            .aggregate(AggregateOperations.counting())
            .writeTo(Sinks.logger());

        return p;
    }

    // Sliding Window: overlapping intervals
    public Pipeline slidingWindowPipeline() {
        Pipeline p = Pipeline.create();

        p.readFrom(Sources.<String, Event>mapJournal("events",
                JournalInitialPosition.START_FROM_OLDEST))
            .withTimestamps(entry -> entry.getValue().getTimestamp(), 1000)
            .map(Map.Entry::getValue)
            .groupingKey(Event::getCategory)
            .window(WindowDefinition.sliding(60_000, 10_000))  // 1 min window, 10s slide
            .aggregate(AggregateOperations.averagingDouble(Event::getValue))
            .writeTo(Sinks.logger());

        return p;
    }

    // Session Window: gap-based, dynamic windows
    public Pipeline sessionWindowPipeline() {
        Pipeline p = Pipeline.create();

        p.readFrom(Sources.<String, Event>mapJournal("events",
                JournalInitialPosition.START_FROM_OLDEST))
            .withTimestamps(entry -> entry.getValue().getTimestamp(), 1000)
            .map(Map.Entry::getValue)
            .groupingKey(Event::getUserId)
            .window(WindowDefinition.session(300_000))  // 5 min inactivity gap
            .aggregate(AggregateOperations.toList())
            .map(result -> new UserSession(
                result.getKey(),
                result.start(),
                result.end(),
                result.getValue()
            ))
            .writeTo(Sinks.logger());

        return p;
    }
}
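The three strategies differ in how many windows a single event falls into: exactly one for tumbling, window-size divided by slide for sliding. The assignment arithmetic can be verified standalone (illustrative helpers, not Jet's internal implementation):

```java
import java.util.ArrayList;
import java.util.List;

public class WindowMath {

    // A tumbling window of size W assigns each timestamp to exactly one window
    static long tumblingWindowStart(long timestamp, long windowSize) {
        return timestamp - Math.floorMod(timestamp, windowSize);
    }

    // A sliding window of size W with slide S assigns each timestamp to W/S
    // overlapping windows
    static List<Long> slidingWindowStarts(long timestamp, long windowSize, long slide) {
        List<Long> starts = new ArrayList<>();
        long first = tumblingWindowStart(timestamp, slide) - windowSize + slide;
        for (long start = first; start <= timestamp; start += slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Event at t=125,000 ms with a 60s tumbling window -> window [120000, 180000)
        System.out.println(tumblingWindowStart(125_000, 60_000)); // 120000

        // Same event with a 60s window sliding every 10s -> 6 overlapping windows
        System.out.println(slidingWindowStarts(125_000, 60_000, 10_000).size()); // 6
    }
}
```

Session windows have no such closed-form assignment: their boundaries depend on the inactivity gaps actually observed in the data, which is why they are tracked per key at runtime.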

Custom Processor Implementation

For advanced use cases, implement custom processors:

public class DataHealthProcessor extends AbstractProcessor {

    private final long windowSizeMs;
    private long processedCount = 0;
    private long failedCount = 0;
    private long windowStart = -1;

    public DataHealthProcessor(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    @Override
    protected boolean tryProcess(int ordinal, Object item) {
        DataEvent event = (DataEvent) item;
        long now = System.currentTimeMillis();

        // Initialize window
        if (windowStart == -1) {
            windowStart = now;
        }

        // Process event
        if (event.isValid()) {
            processedCount++;
        } else {
            failedCount++;
        }

        // Check if window complete
        if (now - windowStart >= windowSizeMs) {
            emitHealthMetric();
            resetWindow(now);
        }

        // Always mark as processed
        return true;
    }

    @Override
    public boolean complete() {
        // Emit final metrics on completion
        if (processedCount > 0 || failedCount > 0) {
            emitHealthMetric();
        }
        return true;
    }

    private void emitHealthMetric() {
        HealthMetric metric = new HealthMetric(
            windowStart,
            processedCount,
            failedCount,
            calculateHealthScore()
        );
        // NOTE: tryEmit() returns false when the outbox is full; production
        // code should handle that case and retry rather than drop the metric
        tryEmit(metric);
    }

    private double calculateHealthScore() {
        long total = processedCount + failedCount;
        return total > 0 ? (double) processedCount / total * 100 : 100.0;
    }

    private void resetWindow(long now) {
        windowStart = now;
        processedCount = 0;
        failedCount = 0;
    }

    // Processor supplier for distributed execution
    public static ProcessorSupplier supplier(long windowSizeMs) {
        return ProcessorSupplier.of(() ->
            new DataHealthProcessor(windowSizeMs));
    }
}

// Using custom processor in pipeline
public Pipeline healthMonitoringPipeline() {
    Pipeline p = Pipeline.create();

    // Kafka sources live in KafkaSources (hazelcast-jet-kafka module),
    // not in the core Sources class
    p.readFrom(KafkaSources.kafka(kafkaProps(), "data-events"))
        .withoutTimestamps()
        .<HealthMetric>customTransform("health-monitor",
            DataHealthProcessor.supplier(60_000))
        // Sinks.map needs key and value functions; keying by window start
        // here (assumes a getWindowStart() accessor on HealthMetric)
        .writeTo(Sinks.map("health-metrics",
            HealthMetric::getWindowStart,
            metric -> metric));

    return p;
}
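The health score in calculateHealthScore() is a plain success ratio with a guard for empty windows. A standalone check of its edge cases (hypothetical class name, mirroring the formula above):

```java
public class HealthScoreCheck {

    // Same formula as calculateHealthScore() in the processor
    static double healthScore(long processed, long failed) {
        long total = processed + failed;
        return total > 0 ? (double) processed / total * 100 : 100.0;
    }

    public static void main(String[] args) {
        System.out.println(healthScore(95, 5)); // 95.0
        System.out.println(healthScore(0, 0));  // 100.0 -- empty window counts as healthy
        System.out.println(healthScore(0, 10)); // 0.0
    }
}
```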

REST API for Pipeline Control

Expose pipeline management through REST:

@RestController
@RequestMapping("/api/pipelines")
public class PipelineController {

    private final JetInstance jet;

    public PipelineController(JetInstance jet) {
        this.jet = jet;
    }

    @GetMapping
    public List<JobSummary> listJobs() {
        return jet.getJobs().stream()
            .map(job -> new JobSummary(
                job.getId(),
                job.getName(),
                job.getStatus().name(),
                job.getSubmissionTime()
            ))
            .collect(Collectors.toList());
    }

    @GetMapping("/{jobId}")
    public JobDetails getJob(@PathVariable long jobId) {
        Job job = jet.getJob(jobId);
        if (job == null) {
            throw new JobNotFoundException(jobId);
        }

        return new JobDetails(
            job.getId(),
            job.getName(),
            job.getStatus().name(),
            job.getSubmissionTime(),
            job.getMetrics()
        );
    }

    @PostMapping("/{jobId}/suspend")
    public ResponseEntity<Void> suspendJob(@PathVariable long jobId) {
        Job job = jet.getJob(jobId);
        if (job == null) {
            throw new JobNotFoundException(jobId);
        }
        job.suspend();
        return ResponseEntity.ok().build();
    }

    @PostMapping("/{jobId}/resume")
    public ResponseEntity<Void> resumeJob(@PathVariable long jobId) {
        Job job = jet.getJob(jobId);
        if (job == null) {
            throw new JobNotFoundException(jobId);
        }
        job.resume();
        return ResponseEntity.ok().build();
    }

    @DeleteMapping("/{jobId}")
    public ResponseEntity<Void> cancelJob(@PathVariable long jobId) {
        Job job = jet.getJob(jobId);
        if (job == null) {
            throw new JobNotFoundException(jobId);
        }
        job.cancel();
        return ResponseEntity.noContent().build();
    }
}

Fault Tolerance and State Management

Hazelcast Jet provides built-in fault tolerance through snapshots:

@Bean
public JobConfig faultTolerantJobConfig() {
    return new JobConfig()
        .setName("fault-tolerant-pipeline")
        // Enable exactly-once processing
        .setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE)
        // Snapshot every 10 seconds
        .setSnapshotIntervalMillis(10_000)
        // Rescale the job automatically when cluster membership changes
        .setAutoScaling(true);
}

// State is snapshotted together with the job for recovery
public Pipeline statefulPipeline() {
    Pipeline p = Pipeline.create();

    // Project each Kafka record to its deserialized value (an Event)
    p.readFrom(KafkaSources.<String, Event, Event>kafka(
            props, ConsumerRecord::value, "events"))
        .withTimestamps(Event::getTimestamp, 5000)
        .groupingKey(Event::getKey)
        // Keyed stateful map: one state object per key, managed and
        // recovered by Jet; the mapping function receives (state, key, item)
        .mapStateful(
            RunningStats::new,
            (state, key, event) -> {
                state.update(event.getValue());
                return new StatsSnapshot(key, state);
            }
        )
        .writeTo(Sinks.logger());

    return p;
}
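RunningStats and StatsSnapshot are referenced above but not shown. One plausible shape for RunningStats is a Welford-style online mean/variance; note that state used with mapStateful should be serializable so it can be included in job snapshots. This sketch is an assumption, not code from the project:

```java
import java.io.Serializable;

// Hypothetical RunningStats: Welford's online algorithm for mean and
// variance, updated in O(1) per event -- a natural fit for mapStateful
public class RunningStats implements Serializable {
    private long count;
    private double mean;
    private double m2; // sum of squared deviations from the running mean

    public void update(double value) {
        count++;
        double delta = value - mean;
        mean += delta / count;
        m2 += delta * (value - mean);
    }

    public long getCount() {
        return count;
    }

    public double getMean() {
        return mean;
    }

    public double getVariance() {
        // Sample variance; zero until we have at least two observations
        return count > 1 ? m2 / (count - 1) : 0.0;
    }

    public static void main(String[] args) {
        RunningStats stats = new RunningStats();
        for (double v : new double[] {2, 4, 6}) {
            stats.update(v);
        }
        System.out.println(stats.getMean());     // 4.0
        System.out.println(stats.getVariance()); // 4.0 (sample variance)
    }
}
```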

Configuration

Application Properties

spring:
  application:
    name: hazelcast-jet-streaming

hazelcast:
  jet:
    cluster-name: jet-cluster
    cooperative-thread-count: 4

server:
  port: 8080

# Swagger UI available at /swagger-ui/index.html
springdoc:
  api-docs:
    path: /api-docs
  swagger-ui:
    path: /swagger-ui.html

Running the Application

# Clone the repository
git clone https://github.com/mgorav/spring-hazelcast-jet.git
cd spring-hazelcast-jet

# Build
mvn clean install

# Run
mvn spring-boot:run

# Access Swagger UI
open http://localhost:8080/swagger-ui/index.html

Performance Optimization

Technique              | Description
Cooperative Threads    | Lightweight execution with work stealing
Partitioned Processing | Automatic data distribution
Zero-Copy              | Direct memory access for efficiency
Backpressure           | Automatic flow control
State Locality         | Process data where state resides

Conclusion

Hazelcast Jet provides a powerful foundation for building real-time streaming pipelines with Spring Boot. Key takeaways:

  • Stream-First approach unifies batch and streaming
  • Pipeline DSL provides intuitive API
  • DAG Execution enables distributed processing
  • Windowing supports tumbling, sliding, and session windows
  • Fault Tolerance through automatic snapshots
  • Custom Processors for advanced use cases

The spring-hazelcast-jet project demonstrates these concepts in a production-ready implementation.


Further Reading