Apache Camel AWS EIS: Building Data Pipelines with Enterprise Integration Patterns
A guide to implementing Enterprise Integration Patterns (EIP) with Apache Camel, using AWS S3, AWS Kinesis, and Apache Kafka to build robust data engineering pipelines.
Introduction
What do a navigation system and data engineering have in common? Both move something from A to B with location transparency, meaning that "A" does not need to know anything about "B". A navigation system achieves this by defining "routes": it works out how to get from "A" to "B" and picks the right highway. As a driver, you don't need to know the intricate details of how it works.
Can we apply the same principle to Data Engineering? The answer is a resounding Yes. Location transparency is achieved through the implementation of the Router Pattern - one of the most powerful Enterprise Integration Patterns (EIP) for building scalable, maintainable data pipelines.
Key Insight: The Router pattern transforms monolithic, non-modular applications into elegant, configurable pieces of art through location transparency and route-based design.
Understanding Enterprise Integration Patterns
Enterprise Integration Patterns (EIP) provide a catalog of standard approaches for integrating enterprise applications. Apache Camel implements these patterns as first-class citizens, making it the ideal framework for building integration solutions.
The Router Pattern
The Router pattern is recognized as an excellent way to accomplish Enterprise Application Integration (EAI). A router is a component that connects its consumer to one of multiple output strategies, following the Strategy design pattern. This pattern is particularly powerful for microservices architecture as it enables:
- Decoupling: Source systems remain unaware of destination systems
- Flexibility: Routes can be reconfigured without code changes
- Scalability: Each route can scale independently
- Maintainability: Clear separation of concerns
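The decoupling described above can be sketched in plain Java, without Camel. This is a minimal illustration, not the project's code: the class `SimpleRouter`, the channel name `orders`, and the destination labels are all hypothetical. The producer only names its own channel; which output strategy receives the message is decided by the routing table.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Toy router: producers send to a channel and never see the destination. */
final class SimpleRouter {
    // Routing table: source channel -> output strategy (Strategy pattern).
    private final Map<String, Function<String, String>> routes = new HashMap<>();

    /** Adds or replaces the strategy for a channel; producers are unaffected. */
    SimpleRouter route(String channel, Function<String, String> strategy) {
        routes.put(channel, strategy);
        return this;
    }

    /** Delivers the body through whatever strategy is currently configured. */
    String send(String channel, String body) {
        Function<String, String> strategy = routes.get(channel);
        if (strategy == null) {
            throw new IllegalArgumentException("No route for channel: " + channel);
        }
        return strategy.apply(body);
    }

    public static void main(String[] args) {
        SimpleRouter router = new SimpleRouter()
                .route("orders", body -> "kafka <- " + body);
        System.out.println(router.send("orders", "order-1"));

        // Re-pointing the route changes the destination, not the producer call.
        router.route("orders", body -> "s3 <- " + body);
        System.out.println(router.send("orders", "order-1"));
    }
}
```

In Camel the routing table is the set of `from(...).to(...)` definitions, so the same idea applies at the level of endpoints rather than lambdas.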
Architecture Overview
The Apache Camel AWS EIS project demonstrates a comprehensive data engineering architecture that implements multiple EIP patterns using Spring Boot and Apache Camel:
Medallion Data Architecture
EIP Patterns Demonstrated
The project implements several critical Enterprise Integration Patterns:
| Pattern | Description | Use Case |
|---|---|---|
| Router | Routes messages to different destinations | S3, Kinesis, Kafka routing |
| Content-Based Router (CBR) | Routes based on message content | Country-based Kafka routing |
| Message Translator | Transforms message format | JSON to stream data |
| Message Filter | Filters messages based on criteria | Conditional processing |
| Splitter | Breaks composite messages into parts | File line processing |
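To make three of the patterns in the table concrete, here is a small plain-Java sketch of a Splitter, a Message Filter, and a Message Translator chained together. It does not use Camel, and the input format (`country,amount` lines) and the class name `PatternPipeline` are assumptions chosen only for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class PatternPipeline {

    /** Splitter: break a composite message (a file body) into one message per line. */
    static List<String> split(String fileBody) {
        return Arrays.stream(fileBody.split("\\R")).collect(Collectors.toList());
    }

    /** Message Filter: drop messages that fail the criterion (here: blank lines). */
    static List<String> filter(List<String> messages) {
        return messages.stream()
                .filter(message -> !message.trim().isEmpty())
                .collect(Collectors.toList());
    }

    /** Message Translator: change the format (here: "country,amount" to JSON text). */
    static String translate(String csvLine) {
        String[] parts = csvLine.split(",");
        return "{\"country\":\"" + parts[0].trim() + "\",\"amount\":" + parts[1].trim() + "}";
    }

    /** Split, then filter, then translate each surviving message. */
    static List<String> process(String fileBody) {
        return filter(split(fileBody)).stream()
                .map(PatternPipeline::translate)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        process("US,100\n\nNL,50").forEach(System.out::println);
    }
}
```

In a Camel route the same steps would typically be expressed with `split()`, `filter()`, and `transform()` or a processor.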
Route to AWS S3 (Data Lake)
The S3 route demonstrates uploading generated content to AWS S3 for data lake storage:
@Component
public class S3Route extends RouteBuilder {

    @Autowired
    private RandomDataGenerator randomDataGenerator;

    @Override
    public void configure() throws Exception {
        from("timer:trigger?period=12h")
            .routeId("RandomTextGeneratorRoute")
            .process(exchange -> {
                StringBuilder stringBuilder = new StringBuilder();
                for (int i = 0; i < 1000; i++) {
                    String str = RandomStringUtils
                        .randomAlphabetic(randomDataGenerator.nextInt(1, 10));
                    stringBuilder.append(str).append(" ");
                }
                exchange.getIn().setBody(stringBuilder.toString());
            })
            .log(LoggingLevel.INFO, "Started Uploading to S3 bucket")
            // Set required headers for S3 upload
            .setHeader(S3Constants.CONTENT_LENGTH, simple("${body.length}"))
            .setHeader(S3Constants.KEY, constant("random-text.txt"))
            .setHeader(S3Constants.CONTENT_TYPE, constant("text/plain"))
            .setHeader(S3Constants.CONTENT_ENCODING, constant("UTF-8"))
            .setHeader(S3Constants.CANNED_ACL, constant("PublicRead"))
            .to("aws-s3://test-bucket?amazonS3Client=#amazonS3Client&region=US_EAST_1")
            .log("Completed uploading to S3 bucket");
    }
}
Key Points:
- Timer-based trigger for scheduled uploads
- Dynamic content generation using processors
- S3 headers configuration for proper file handling
- Injected S3 client for flexible environment configuration
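The content-generation step can be tried outside Camel with the standalone sketch below. It is an approximation, not the route's code: `java.util.Random` stands in for `RandomStringUtils` and `RandomDataGenerator`, and the class name `RandomTextPayload` is hypothetical. It also shows one detail worth keeping in mind for the content-length header: the length that matters for an upload is the encoded byte count, which equals the character count only for ASCII text such as the alphabetic payload here.

```java
import java.nio.charset.StandardCharsets;
import java.util.Random;

final class RandomTextPayload {

    /** Builds wordCount words of 1 to 10 random ASCII letters, each followed by a space. */
    static String generate(int wordCount, Random random) {
        StringBuilder builder = new StringBuilder();
        for (int i = 0; i < wordCount; i++) {
            int length = 1 + random.nextInt(10); // 1..10 inclusive
            for (int j = 0; j < length; j++) {
                char base = random.nextBoolean() ? 'A' : 'a';
                builder.append((char) (base + random.nextInt(26)));
            }
            builder.append(' ');
        }
        return builder.toString();
    }

    /** Length in bytes after UTF-8 encoding, as opposed to the Java char count. */
    static long contentLength(String body) {
        return body.getBytes(StandardCharsets.UTF_8).length;
    }

    public static void main(String[] args) {
        String body = generate(1000, new Random());
        System.out.println("chars=" + body.length() + ", bytes=" + contentLength(body));
    }
}
```

If the payload ever contains non-ASCII characters, a character-based length would undercount the bytes, so computing the header from the encoded bytes is the safer choice.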
Route to AWS Kinesis (Real-Time Streaming)
The Kinesis route demonstrates real-time data streaming from REST APIs to AWS Kinesis:
@Component
public class KinesisRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        restConfiguration().host("localhost").port(4001);

        from("timer:hi?period={{timer.period}}")
            .setHeader("id", simple("${random(1,3)}"))
            .to("rest:get:cars/{id}")
            .log("[going to Kinesis]" + "${body}")
            .setHeader(KinesisConstants.PARTITION_KEY, simple("1"))
            .setHeader(KinesisConstants.SHARD_ID, simple("1"))
            .to("aws-kinesis://mykinesisstream?amazonKinesisClient=#amazonKinesisClient")
            .to("log:out?showAll=true")
            .log("Completed Writing to Kinesis");
    }
}
Data Flow:
Medallion Data Architecture
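The route above always uses the partition key "1", so every record lands on the same shard. Kinesis maps a partition key to a shard by taking the MD5 hash of the key as a 128-bit integer and finding the shard whose hash key range contains it. The sketch below reproduces that mapping in plain Java under one stated assumption: the stream's shards split the hash space into equal-width ranges. The class name `PartitionKeyHashing` is hypothetical.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

final class PartitionKeyHashing {
    // Size of the hash key space: 2^128.
    private static final BigInteger HASH_SPACE = BigInteger.ONE.shiftLeft(128);

    /** MD5 of the partition key, read as an unsigned 128-bit integer. */
    static BigInteger hashKey(String partitionKey) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, digest);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    /** Shard index for a key, ASSUMING shardCount equal-width hash key ranges. */
    static int shardIndex(String partitionKey, int shardCount) {
        return hashKey(partitionKey)
                .multiply(BigInteger.valueOf(shardCount))
                .divide(HASH_SPACE)
                .intValue();
    }

    public static void main(String[] args) {
        // A constant key always maps to the same shard.
        System.out.println(shardIndex("1", 4));
        // Varying the key (for example, by car id) spreads records across shards.
        for (String key : new String[] {"car-1", "car-2", "car-3"}) {
            System.out.println(key + " -> shard " + shardIndex(key, 4));
        }
    }
}
```

To spread load across shards, a route would typically derive the partition key from the message, for instance from the `id` header, rather than a constant.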
Content-Based Routing to Kafka
The Content-Based Router (CBR) pattern routes messages to different destinations based on message content - in this case, routing orders to Kafka based on the country field:
@Component
public class CbrKafkaRoute extends RouteBuilder {

    @Value("classpath:kafkainput")
    private Resource resource;

    @Override
    public void configure() throws Exception {
        from("file:" + resource.getURI().getPath())
            .choice()
                .when().jsonpath("$[?(@.country == 'IND')]")
                    .to("kafka:{{producer.topic}}?partitioner={{camel.component.kafka.configuration.partitioner}}")
                    .log("[CBR] publish message to kafka")
                    .log("[Routed Payload]: " + "${body}")
                .when().jsonpath("$[?(@.country == 'NL')]")
                    .to("kafka:{{producer.topic}}?partitioner={{camel.component.kafka.configuration.partitioner}}")
                    .log("[CBR] publish message to kafka")
                .when().jsonpath("$[?(@.country == 'UK')]")
                    .to("kafka:{{producer.topic}}?partitioner={{camel.component.kafka.configuration.partitioner}}")
                    .log("[CBR] publish message to kafka")
                .when().jsonpath("$[?(@.country == 'US')]")
                    .to("kafka:{{producer.topic}}?partitioner={{camel.component.kafka.configuration.partitioner}}")
                    .log("[CBR] publish message to kafka")
            .end();
    }
}
CBR Pattern Visualization:
Medallion Data Architecture
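The decision logic of a content-based router can be sketched in plain Java as an ordered list of predicates in which the first match wins. This is an illustration only: the class `CountryRouter`, the per-country topic names, and the fallback destination are hypothetical. In the route above every branch publishes to the same `{{producer.topic}}`, and there is no fallback branch, so orders from other countries are not published.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Predicate;

final class CountryRouter {
    // Insertion-ordered, like the when() clauses of a choice(): first match wins.
    private final Map<Predicate<Map<String, Object>>, String> branches = new LinkedHashMap<>();
    private final String otherwise;

    CountryRouter(String otherwise) {
        this.otherwise = otherwise;
    }

    CountryRouter when(Predicate<Map<String, Object>> condition, String destination) {
        branches.put(condition, destination);
        return this;
    }

    /** Returns the destination of the first matching branch, else the fallback. */
    String destinationFor(Map<String, Object> order) {
        for (Map.Entry<Predicate<Map<String, Object>>, String> branch : branches.entrySet()) {
            if (branch.getKey().test(order)) {
                return branch.getValue();
            }
        }
        return otherwise;
    }

    /** Predicate on the message content: does the country field equal the code? */
    static Predicate<Map<String, Object>> countryIs(String code) {
        return order -> code.equals(order.get("country"));
    }

    public static void main(String[] args) {
        CountryRouter router = new CountryRouter("orders-unrouted") // hypothetical names
                .when(countryIs("IND"), "orders-ind")
                .when(countryIs("US"), "orders-us");

        Map<String, Object> order = new HashMap<>();
        order.put("orderNumber", 1);
        order.put("country", "US");
        System.out.println(router.destinationFor(order));
    }
}
```

In Camel the fallback corresponds to an `.otherwise()` clause on the `choice()`, which is a common place to send unmatched messages to a dead-letter or review destination.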
Kafka Producer and Consumer Routes
The Kafka integration demonstrates bidirectional message flow:
Producer Route:
@Component
public class KafkaProducerRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("timer:helloKafkaProducer?period={{timer.period}}")
            .routeGroup("kafka-route-group")
            .routeId("kafkaStartWithPartitioner")
            .transform().method("springBean", "greet")
            .filter(simple("${body} contains 'foo'"))
                .to("log:foo")
            .end()
            .to("kafka:{{producer.topic}}?partitioner={{camel.component.kafka.configuration.partitioner}}")
            .log("publish message to kafka")
            .log("${body}");
    }
}
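One detail of the producer route is easy to misread: the `filter(...)` block only scopes the steps nested inside it, up to `.end()`. Messages that do not contain "foo" skip the `log:foo` step but are still published to Kafka. The plain-Java sketch below mirrors that control flow; the class `FilterScopeDemo` and its two lists are stand-ins invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class FilterScopeDemo {
    final List<String> logged = new ArrayList<>();    // stands in for log:foo
    final List<String> published = new ArrayList<>(); // stands in for the Kafka endpoint

    void process(String body) {
        if (body.contains("foo")) { // .filter(simple("${body} contains 'foo'"))
            logged.add(body);       //     .to("log:foo")
        }                           // .end()
        published.add(body);        // .to("kafka:...") runs for every message
    }

    public static void main(String[] args) {
        FilterScopeDemo demo = new FilterScopeDemo();
        demo.process("foo bar");
        demo.process("Hello from Gonnect");
        System.out.println("logged=" + demo.logged + ", published=" + demo.published);
    }
}
```

To publish only the matching messages, the Kafka endpoint would need to sit inside the filter block, before `.end()`.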
Consumer Route:
@Component
public class KafkaConsumerRoute extends RouteBuilder {

    @Autowired
    private KafkaComponent kafkaComponent;

    @Override
    public void configure() throws Exception {
        kafkaComponent.setBrokers("{{camel.component.kafka.brokers}}");

        from("kafka:{{consumer.topic}}?brokers={{camel.component.kafka.brokers}}"
                + "&maxPollRecords={{camel.component.kafka.configuration.max-poll-records}}"
                + "&consumersCount={{camel.component.kafka.configuration.consumers-count}}"
                + "&seekTo={{camel.component.kafka.configuration.seek-to}}"
                + "&groupId={{camel.component.kafka.configuration.group-id}}")
            .routeId("FromKafka")
            .log("consumed message from Kafka")
            .log("${body}");
    }
}
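The endpoint URIs in these routes rely on `{{key}}` property placeholders, which are replaced with values from the application's configuration before the endpoint is created. The toy resolver below shows the substitution idea only; it is not Camel's implementation, and the broker address used in `main` is an assumed value for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class PlaceholderResolver {
    // Matches {{some.key}} and captures the key.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\{\\{([^{}]+)\\}\\}");

    /** Replaces every {{key}} in the URI with its property value; fails on unknown keys. */
    static String resolve(String uri, Map<String, String> properties) {
        Matcher matcher = PLACEHOLDER.matcher(uri);
        StringBuffer resolved = new StringBuffer();
        while (matcher.find()) {
            String key = matcher.group(1);
            String value = properties.get(key);
            if (value == null) {
                throw new IllegalArgumentException("Missing property: " + key);
            }
            matcher.appendReplacement(resolved, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(resolved);
        return resolved.toString();
    }

    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        properties.put("consumer.topic", "TestTopic");
        properties.put("camel.component.kafka.brokers", "localhost:9092"); // assumed value
        System.out.println(resolve(
                "kafka:{{consumer.topic}}?brokers={{camel.component.kafka.brokers}}",
                properties));
    }
}
```

Keeping topics, brokers, and consumer settings in properties is what lets the same route definitions run against different environments.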
Local Development with AWS LocalStack
One of the most powerful features of this project is the ability to develop and test AWS integrations locally without an AWS account. The same code runs seamlessly on AWS cloud with no modifications.
Setting Up LocalStack
# Step 1: Create Python virtual environment
python3 -m virtualenv localstackenv
# Step 2: Activate virtual environment
source localstackenv/bin/activate
# Step 3: Install AWS LocalStack
pip install localstack
# Step 4: Start LocalStack with Docker
localstack start --docker
# Step 5: Start Kafka with Zookeeper
docker-compose up
Running the Application
# Run the Spring Boot application
mvn spring-boot:run
Sample Output:
2019-04-06 11:50:50.010 INFO [ucer[TestTopic]] route2:
{
  "orderNumber": 1,
  "country": "US",
  "amount": 100,
  "items": [
    {
      "itemId": 123,
      "itemCost": 33,
      "itemQty": 12
    }
  ]
}
2019-04-06 11:50:50.013 INFO [umer[TestTopic]] FromKafka: consumed message from Kafka
2019-04-06 11:50:50.013 INFO [umer[TestTopic]] FromKafka: Hello from Gonnect
Monitoring Routes with Actuator
Apache Camel integrates with Spring Boot Actuator for route monitoring:
List All Routes:
curl -XGET -s http://localhost:4001/actuator/camelroutes
[
  {
    "id": "route1",
    "uptime": "9.780 seconds",
    "uptimeMillis": 9781,
    "status": "Started"
  },
  {
    "id": "RandomTextGeneratorRoute",
    "uptime": "9.780 seconds",
    "status": "Started"
  },
  {
    "id": "FromKafka",
    "uptime": "9.736 seconds",
    "status": "Started"
  },
  {
    "id": "kafkaStartWithPartitioner",
    "group": "kafka-route-group",
    "uptime": "9.735 seconds",
    "status": "Started"
  }
]
Route Details:
curl -XGET -s http://localhost:4001/actuator/camelroutes/route1/detail
{
  "id": "route1",
  "uptime": "2 minutes",
  "status": "Started",
  "details": {
    "exchangesTotal": 82,
    "exchangesInflight": 0,
    "failuresHandled": 0,
    "lastProcessingTime": 10,
    "maxProcessingTime": 408,
    "meanProcessingTime": 21,
    "minProcessingTime": 8,
    "totalProcessingTime": 1769
  }
}
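The detail fields can be cross-checked against each other. Assuming the mean is the total processing time divided by the number of exchanges with the fraction dropped, the sample values agree: 1769 / 82 is about 21.57, reported as 21. The helper below is a hypothetical sketch of that arithmetic, not part of Camel or Actuator.

```java
final class RouteStats {

    /** Mean processing time as an integer quotient; 0 when nothing has been processed. */
    static long meanProcessingTime(long totalProcessingTime, long exchangesTotal) {
        return exchangesTotal == 0 ? 0 : totalProcessingTime / exchangesTotal;
    }

    public static void main(String[] args) {
        // Values taken from the route detail response above.
        System.out.println(meanProcessingTime(1769, 82)); // prints 21
    }
}
```

A large gap between the mean (21) and the maximum (408) usually points to a few slow exchanges, such as the first call that warms up connections.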
Complete Integration Flow
Medallion Data Architecture
Benefits of This Architecture
| Benefit | Description |
|---|---|
| Location Transparency | Sources and destinations are decoupled through routes |
| Local Development | Full AWS simulation with LocalStack |
| Production Ready | Same code deploys to AWS without changes |
| Observable | Built-in monitoring via Actuator endpoints |
| Extensible | Easy to add new routes and destinations |
| Configurable | Externalized configuration for all components |
Key Takeaways
- Router Pattern Excellence: Apache Camel's implementation of the Router pattern enables clean separation between data sources and destinations, making data engineering as intuitive as navigation.
- AWS Integration Made Simple: The Camel AWS components (S3, Kinesis) provide a consistent, declarative approach to AWS service integration.
- Content-Based Routing: CBR enables intelligent message routing based on content, essential for multi-tenant and region-specific data processing.
- Local-First Development: LocalStack integration enables complete AWS development without cloud costs or account setup.
- Spring Boot Integration: Native Spring Boot support brings dependency injection, configuration management, and production-ready features.
Conclusion
The Apache Camel AWS EIS project demonstrates how Enterprise Integration Patterns, when combined with modern cloud services and streaming platforms, create a powerful foundation for data engineering. By treating data movement as routes - similar to a navigation system - we achieve location transparency, scalability, and maintainability.
The combination of Apache Camel, AWS services (S3, Kinesis), and Apache Kafka provides a complete toolkit for building production-grade data pipelines. Whether you're moving data to a data lake, streaming to real-time pipelines, or implementing content-based routing, these patterns and tools provide an elegant, proven approach.
Explore the complete implementation at apache-camel-aws-eis and start building your own data engineering routes today.