Apache Camel AWS EIS: Building Data Pipelines with Enterprise Integration Patterns

A guide to implementing Enterprise Integration Patterns (EIP) using Apache Camel with AWS services (S3 and Kinesis) and Apache Kafka for building robust data engineering pipelines.

Gonnect Team
January 20, 2024 (12 min read)
Tags: Apache Camel, AWS, Kinesis, S3, Kafka

Introduction

What do a navigation system and data engineering have in common? Both move something from A to B with location transparency. Location transparency means that "A" is unaware of "B". A navigation system handles location transparency by defining "routes": how to go from "A" to "B" by picking the correct highway. As a driver, you don't need to know the intricate details of how it works.

Can we apply the same principle to Data Engineering? Yes. Location transparency is achieved by implementing the Router pattern, one of the most useful Enterprise Integration Patterns (EIP) for building scalable, maintainable data pipelines.

Key Insight: The Router pattern helps break monolithic, non-modular applications into small, configurable routes through location transparency and route-based design.

Understanding Enterprise Integration Patterns

Enterprise Integration Patterns (EIP) provide a catalog of standard approaches for integrating enterprise applications. Apache Camel implements these patterns as first-class constructs in its routing DSL, which makes it a natural fit for building integration solutions.

The Router Pattern

The Router pattern is a well-established way to accomplish Enterprise Application Integration (EAI). A router is a component that connects its consumer to one of multiple output strategies, following the Strategy design pattern. This pattern is particularly useful in microservice architectures because it enables:

  • Decoupling: Source systems remain unaware of destination systems
  • Flexibility: Routes can be reconfigured without code changes
  • Scalability: Each route can scale independently
  • Maintainability: Clear separation of concerns
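The decoupling listed above can be sketched in plain Java, without Camel. This is a minimal illustration under stated assumptions: `SimpleRouter`, the route names `lake` and `stream`, and the message strings are all hypothetical and are not part of the project or of Camel's API. The point is only that the sender names a logical route and never touches the destination.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical names throughout: a plain-Java illustration, not Camel's API.
final class SimpleRouter {
    // Each destination is a strategy: a function that accepts a message
    // and reports what it did with it.
    private final Map<String, Function<String, String>> destinations = new LinkedHashMap<>();

    SimpleRouter register(String routeName, Function<String, String> destination) {
        destinations.put(routeName, destination);
        return this;
    }

    // The sender only names a logical route; it never sees the destination itself.
    String send(String routeName, String message) {
        Function<String, String> destination = destinations.get(routeName);
        if (destination == null) {
            throw new IllegalArgumentException("No route named " + routeName);
        }
        return destination.apply(message);
    }

    public static void main(String[] args) {
        SimpleRouter router = new SimpleRouter()
            .register("lake", msg -> "stored in data lake: " + msg)
            .register("stream", msg -> "published to stream: " + msg);
        System.out.println(router.send("lake", "order-1"));
        System.out.println(router.send("stream", "order-2"));
    }
}
```

Swapping the function registered under `lake` changes where messages go without touching any caller, which is the property the bullets describe.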


Architecture Overview

The Apache Camel AWS EIS project demonstrates a data engineering architecture that implements multiple EIPs using Spring Boot and Apache Camel.

EIP Patterns Demonstrated

The project implements several critical Enterprise Integration Patterns:

| Pattern | Description | Use Case |
| --- | --- | --- |
| Router | Routes messages to different destinations | S3, Kinesis, Kafka routing |
| Content-Based Router (CBR) | Routes based on message content | Country-based Kafka routing |
| Message Translator | Transforms message format | JSON to stream data |
| Message Filter | Filters messages based on criteria | Conditional processing |
| Splitter | Breaks composite messages into parts | File line processing |
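To make three of these patterns concrete, here is a minimal plain-Java sketch that chains a Splitter, a Message Filter, and a Message Translator. It does not use Camel; `PatternPipeline` and the `country,amount` line format are assumptions made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical class and input format, used only to illustrate the patterns.
final class PatternPipeline {
    static List<String> process(String fileContent) {
        return Arrays.stream(fileContent.split("\\R"))   // Splitter: one message per line
            .map(String::trim)
            .filter(line -> !line.isEmpty())              // Message Filter: drop blank lines
            .map(PatternPipeline::toJson)                 // Message Translator: CSV line to JSON
            .collect(Collectors.toList());
    }

    // Translates a "country,amount" line into a small JSON object string.
    static String toJson(String csvLine) {
        String[] parts = csvLine.split(",");
        return "{\"country\":\"" + parts[0] + "\",\"amount\":" + parts[1] + "}";
    }

    public static void main(String[] args) {
        // Prints two JSON strings; the blank line in the input is filtered out.
        process("US,100\n\nIND,250\n").forEach(System.out::println);
    }
}
```

In Camel the same steps would be expressed with the DSL's `split`, `filter`, and `transform` constructs rather than Java streams.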

Route to AWS S3 (Data Lake)

The S3 route demonstrates uploading generated content to AWS S3 for data lake storage:

@Component
public class S3Route extends RouteBuilder {
    @Autowired
    private RandomDataGenerator randomDataGenerator;

    @Override
    public void configure() throws Exception {
        from("timer:trigger?period=12h")
            .routeId("RandomTextGeneratorRoute")
            .process(exchange -> {
                StringBuilder stringBuilder = new StringBuilder();
                for (int i = 0; i < 1000; i++) {
                    String str = RandomStringUtils
                        .randomAlphabetic(randomDataGenerator.nextInt(1, 10));
                    stringBuilder.append(str).append(" ");
                }
                exchange.getIn().setBody(stringBuilder.toString());
            })
            .log(LoggingLevel.INFO, "Started Uploading to S3 bucket")
            // Set required headers for S3 upload
            .setHeader(S3Constants.CONTENT_LENGTH, simple("${body.length}"))
            .setHeader(S3Constants.KEY, constant("random-text.txt"))
            .setHeader(S3Constants.CONTENT_TYPE, constant("text/plain"))
            .setHeader(S3Constants.CONTENT_ENCODING, constant("UTF-8"))
            .setHeader(S3Constants.CANNED_ACL, constant("PublicRead"))
            .to("aws-s3://test-bucket?amazonS3Client=#amazonS3Client&region=US_EAST_1")
            .log("Completed uploading to S3 bucket");
    }
}

Key Points:

  • Timer-based trigger for scheduled uploads
  • Dynamic content generation using processors
  • S3 headers configuration for proper file handling
  • Injected S3 client for flexible environment configuration
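One detail worth checking when setting the content-length header: `${body.length}` on a String body yields a character count, while a content length is a byte count. The two agree for the random alphabetic text generated here, but they can differ once the body contains non-ASCII characters. A minimal sketch of the difference, using a hypothetical helper named `utf8Length`:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper; not part of the project.
final class ContentLengthCheck {
    // Number of bytes the body occupies once encoded as UTF-8.
    static int utf8Length(String body) {
        return body.getBytes(StandardCharsets.UTF_8).length;
    }

    public static void main(String[] args) {
        String ascii = "hello";
        String accented = "h\u00e9llo"; // the accented e takes two bytes in UTF-8
        System.out.println(ascii.length() + " chars, " + utf8Length(ascii) + " bytes");       // 5 chars, 5 bytes
        System.out.println(accented.length() + " chars, " + utf8Length(accented) + " bytes"); // 5 chars, 6 bytes
    }
}
```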

Route to AWS Kinesis (Real-Time Streaming)

The Kinesis route demonstrates real-time data streaming from REST APIs to AWS Kinesis:

@Component
public class KinesisRoute extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        restConfiguration().host("localhost").port(4001);

        from("timer:hi?period={{timer.period}}")
            .setHeader("id", simple("${random(1,3)}"))
            .to("rest:get:cars/{id}")
            .log("[going to Kinesis] ${body}")
            // Fixed values, so constant(...) is enough; no expression needs evaluating
            .setHeader(KinesisConstants.PARTITION_KEY, constant("1"))
            .setHeader(KinesisConstants.SHARD_ID, constant("1"))
            .to("aws-kinesis://mykinesisstream?amazonKinesisClient=#amazonKinesisClient")
            .to("log:out?showAll=true")
            .log("Completed Writing to Kinesis");
    }
}
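The route above uses a fixed partition key. Kinesis hashes the partition key with MD5 into a 128-bit integer and delivers the record to the shard whose hash key range contains that value, so a constant key sends every record to the same shard. The sketch below illustrates that mapping under a simplifying assumption: the stream has `shardCount` shards that split the hash space evenly. `ShardSketch` and `shardFor` are hypothetical names, and the returned index is only a position in that assumed layout, not a real shard ID.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical sketch of partition-key-to-shard mapping; not the AWS SDK.
final class ShardSketch {
    private static final BigInteger HASH_SPACE = BigInteger.ONE.shiftLeft(128); // 2^128

    // Index of the shard whose range contains MD5(partitionKey), assuming
    // shardCount shards that divide the 128-bit hash space into equal ranges.
    static int shardFor(String partitionKey, int shardCount) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            BigInteger hash = new BigInteger(1, digest); // unsigned 128-bit value
            return hash.multiply(BigInteger.valueOf(shardCount)).divide(HASH_SPACE).intValue();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    public static void main(String[] args) {
        // The same key always lands on the same shard index.
        System.out.println(shardFor("1", 4) == shardFor("1", 4)); // prints true
    }
}
```

If records should spread across shards, a key that varies per record (for example the car `id` header set earlier in the route) is the usual choice.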

Data Flow:

timer ({{timer.period}}) -> REST GET cars/{id} -> aws-kinesis://mykinesisstream -> log:out

Content-Based Routing to Kafka

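The Content-Based Router (CBR) pattern routes messages to different destinations based on message content. In this case, orders are routed to Kafka based on the country field. Note that in this demo every branch publishes to the same configured topic; in a real pipeline each branch would typically target its own topic or endpoint.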

@Component
public class CbrKafkaRoute extends RouteBuilder {
    @Value("classpath:kafkainput")
    private Resource resource;

    @Override
    public void configure() throws Exception {
        from("file:" + resource.getURI().getPath())
            .choice()
                .when().jsonpath("$[?(@.country == 'IND')]")
                    .to("kafka:{{producer.topic}}?partitioner={{camel.component.kafka.configuration.partitioner}}")
                    .log("[CBR] publish message to kafka")
                    .log("[Routed Payload]: ${body}")
                .when().jsonpath("$[?(@.country == 'NL')]")
                    .to("kafka:{{producer.topic}}?partitioner={{camel.component.kafka.configuration.partitioner}}")
                    .log("[CBR] publish message to kafka")
                .when().jsonpath("$[?(@.country == 'UK')]")
                    .to("kafka:{{producer.topic}}?partitioner={{camel.component.kafka.configuration.partitioner}}")
                    .log("[CBR] publish message to kafka")
                .when().jsonpath("$[?(@.country == 'US')]")
                    .to("kafka:{{producer.topic}}?partitioner={{camel.component.kafka.configuration.partitioner}}")
                    .log("[CBR] publish message to kafka")
            .end();
    }
}
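The route above has no `otherwise()` branch, so an order whose country matches none of the `when` clauses passes through the `choice()` without being published. The plain-Java sketch below shows the same decision with an explicit fallback. It does not use Camel; `CountryRouter` and every topic name are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java illustration; topic names are hypothetical, not taken from the project.
final class CountryRouter {
    private final Map<String, String> topicByCountry = new HashMap<>();
    private final String fallbackTopic;

    CountryRouter(String fallbackTopic) {
        this.fallbackTopic = fallbackTopic;
    }

    // Plays the role of one when(...) branch.
    CountryRouter when(String country, String topic) {
        topicByCountry.put(country, topic);
        return this;
    }

    // Inspects the message content and picks a destination;
    // the fallback plays the role of otherwise().
    String topicFor(Map<String, Object> order) {
        Object country = order.get("country");
        return topicByCountry.getOrDefault(String.valueOf(country), fallbackTopic);
    }

    public static void main(String[] args) {
        CountryRouter router = new CountryRouter("orders-unrouted")
            .when("IND", "orders-ind")
            .when("US", "orders-us");
        Map<String, Object> order = new HashMap<>();
        order.put("orderNumber", 1);
        order.put("country", "US");
        System.out.println(router.topicFor(order)); // prints orders-us
    }
}
```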

CBR Pattern Visualization:

file (kafkainput) -> choice on country (IND | NL | UK | US) -> kafka:{{producer.topic}}

Kafka Producer and Consumer Routes

The Kafka integration demonstrates both directions of message flow: producing to a topic and consuming from one.

Producer Route:

@Component
public class KafkaProducerRoute extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("timer:helloKafkaProducer?period={{timer.period}}")
            .routeGroup("kafka-route-group")
            .routeId("kafkaStartWithPartitioner")
            .transform().method("springBean", "greet")
            .filter(simple("${body} contains 'foo'"))
                .to("log:foo")
            .end()
            .to("kafka:{{producer.topic}}?partitioner={{camel.component.kafka.configuration.partitioner}}")
            .log("publish message to kafka")
            .log("${body}");
    }
}
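A point that is easy to misread in the producer route: the `filter(...)` block ends at `.end()`, so only the `log:foo` step is conditional, and the Kafka publish after it runs for every message. The plain-Java sketch below mirrors that control flow. `FilterScopeSketch` and its two lists are hypothetical stand-ins for the log endpoint and the Kafka topic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the route's control flow; not Camel code.
final class FilterScopeSketch {
    final List<String> fooLog = new ArrayList<>();    // stands in for log:foo
    final List<String> published = new ArrayList<>(); // stands in for the Kafka topic

    void handle(String body) {
        // filter(...) ... end(): only the steps inside the block are conditional
        if (body.contains("foo")) {
            fooLog.add(body);
        }
        // Steps after end() run for every message, matching or not
        published.add(body);
    }

    public static void main(String[] args) {
        FilterScopeSketch route = new FilterScopeSketch();
        route.handle("Hello from Gonnect");
        route.handle("foo bar");
        System.out.println(route.fooLog.size() + " logged, " + route.published.size() + " published"); // 1 logged, 2 published
    }
}
```

To drop non-matching messages entirely, the publish step would have to sit inside the filter block, before `.end()`.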

Consumer Route:

@Component
public class KafkaConsumerRoute extends RouteBuilder {
    @Autowired
    private KafkaComponent kafkaComponent;

    @Override
    public void configure() throws Exception {
        kafkaComponent.setBrokers("{{camel.component.kafka.brokers}}");

        from("kafka:{{consumer.topic}}?brokers={{camel.component.kafka.brokers}}"
                + "&maxPollRecords={{camel.component.kafka.configuration.max-poll-records}}"
                + "&consumersCount={{camel.component.kafka.configuration.consumers-count}}"
                + "&seekTo={{camel.component.kafka.configuration.seek-to}}"
                + "&groupId={{camel.component.kafka.configuration.group-id}}")
            .routeId("FromKafka")
            .log("consumed message from Kafka")
            .log("${body}");
    }
}
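The consumer endpoint URI above is assembled by string concatenation, which becomes hard to read as options accumulate. One alternative, sketched here in plain Java, is to build the query string from an ordered map. `KafkaUriSketch` is a hypothetical helper, the option values are made up, and the sketch does no URL encoding, so it only suits simple values like the ones shown.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper for assembling an endpoint URI string; not part of Camel.
final class KafkaUriSketch {
    // Joins options as key=value pairs in insertion order. No URL encoding is applied.
    static String consumerUri(String topic, Map<String, String> options) {
        String query = options.entrySet().stream()
            .map(e -> e.getKey() + "=" + e.getValue())
            .collect(Collectors.joining("&"));
        return "kafka:" + topic + (query.isEmpty() ? "" : "?" + query);
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("brokers", "localhost:9092"); // assumed local broker address
        options.put("groupId", "demo-group");     // assumed consumer group name
        options.put("seekTo", "beginning");
        System.out.println(consumerUri("TestTopic", options));
        // prints kafka:TestTopic?brokers=localhost:9092&groupId=demo-group&seekTo=beginning
    }
}
```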

Local Development with AWS LocalStack

A particularly useful feature of this project is the ability to develop and test AWS integrations locally without an AWS account. The same route code runs on AWS without modification; only the client configuration, such as endpoints and credentials, differs between environments.
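One common way to keep route code identical across environments is to decide the service endpoint in configuration and inject the resulting client. The sketch below shows only that decision, in plain Java. Everything in it is an assumption for illustration: the class name, the `local` profile name, and the LocalStack URL (port 4566 is the default edge port in current LocalStack releases; older releases exposed one port per service).

```java
import java.net.URI;
import java.util.Optional;

// Hypothetical configuration helper; not taken from the project.
final class AwsEndpointSketch {
    // Assumed LocalStack edge URL.
    static final URI LOCALSTACK = URI.create("http://localhost:4566");

    // An empty result means "let the AWS SDK resolve the real regional endpoint".
    static Optional<URI> endpointOverride(String profile) {
        return "local".equals(profile) ? Optional.of(LOCALSTACK) : Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(endpointOverride("local")); // prints Optional[http://localhost:4566]
        System.out.println(endpointOverride("prod"));  // prints Optional.empty
    }
}
```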

Setting Up LocalStack

# Step 1: Create Python virtual environment
python3 -m virtualenv localstackenv

# Step 2: Activate virtual environment
source localstackenv/bin/activate

# Step 3: Install AWS LocalStack
pip install localstack

# Step 4: Start LocalStack with Docker
localstack start --docker

# Step 5: Start Kafka with Zookeeper
docker-compose up

Running the Application

# Run the Spring Boot application
mvn spring-boot:run

Sample Output:

2019-04-06 11:50:50.010 INFO [ucer[TestTopic]] route2:
{
  "orderNumber": 1,
  "country": "US",
  "amount": 100,
  "items": [
    {
      "itemId": 123,
      "itemCost": 33,
      "itemQty": 12
    }
  ]
}
2019-04-06 11:50:50.013 INFO [umer[TestTopic]] FromKafka: consumed message from Kafka
2019-04-06 11:50:50.013 INFO [umer[TestTopic]] FromKafka: Hello from Gonnect

Monitoring Routes with Actuator

Apache Camel integrates with Spring Boot Actuator for route monitoring:

List All Routes:

curl -XGET -s http://localhost:4001/actuator/camelroutes
[
  {
    "id": "route1",
    "uptime": "9.780 seconds",
    "uptimeMillis": 9781,
    "status": "Started"
  },
  {
    "id": "RandomTextGeneratorRoute",
    "uptime": "9.780 seconds",
    "status": "Started"
  },
  {
    "id": "FromKafka",
    "uptime": "9.736 seconds",
    "status": "Started"
  },
  {
    "id": "kafkaStartWithPartitioner",
    "group": "kafka-route-group",
    "uptime": "9.735 seconds",
    "status": "Started"
  }
]

Route Details:

curl -XGET -s http://localhost:4001/actuator/camelroutes/route1/detail
{
  "id": "route1",
  "uptime": "2 minutes",
  "status": "Started",
  "details": {
    "exchangesTotal": 82,
    "exchangesInflight": 0,
    "failuresHandled": 0,
    "lastProcessingTime": 10,
    "maxProcessingTime": 408,
    "meanProcessingTime": 21,
    "minProcessingTime": 8,
    "totalProcessingTime": 1769
  }
}
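The statistics in this sample are consistent with each other: dividing `totalProcessingTime` by `exchangesTotal` and discarding the remainder gives the reported `meanProcessingTime`. The sketch below reproduces that arithmetic. It assumes the mean is computed by integer division, which matches the sample but is not confirmed by the source; `RouteStats` is a hypothetical name.

```java
// Hypothetical helper that reproduces the arithmetic in the sample output.
final class RouteStats {
    // Mean time per exchange in milliseconds, truncated to a whole number.
    static long meanProcessingTime(long totalProcessingTime, long exchangesTotal) {
        if (exchangesTotal == 0) {
            return 0; // no exchanges yet, so there is nothing to average
        }
        return totalProcessingTime / exchangesTotal;
    }

    public static void main(String[] args) {
        // Values from the sample: 1769 ms total across 82 exchanges
        System.out.println(meanProcessingTime(1769, 82)); // prints 21
    }
}
```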

Complete Integration Flow

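  • timer -> random text generator -> S3 bucket (data lake)
  • timer -> REST GET cars/{id} -> Kinesis stream
  • file input -> content-based router on country -> Kafka topic
  • timer -> Kafka producer -> Kafka topic -> Kafka consumer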

Benefits of This Architecture

| Benefit | Description |
| --- | --- |
| Location Transparency | Sources and destinations are decoupled through routes |
| Local Development | Full AWS simulation with LocalStack |
| Production Ready | Same code deploys to AWS without changes |
| Observable | Built-in monitoring via Actuator endpoints |
| Extensible | Easy to add new routes and destinations |
| Configurable | Externalized configuration for all components |

Key Takeaways

  1. Router Pattern Excellence: Apache Camel's implementation of the Router pattern enables clean separation between data sources and destinations, making data engineering as intuitive as navigation.

  2. AWS Integration Made Simple: The Camel AWS components (S3, Kinesis) provide a consistent, declarative approach to AWS service integration.

  3. Content-Based Routing: CBR enables intelligent message routing based on content, essential for multi-tenant and region-specific data processing.

  4. Local-First Development: LocalStack integration enables complete AWS development without cloud costs or account setup.

  5. Spring Boot Integration: Native Spring Boot support brings dependency injection, configuration management, and production-ready features.

Conclusion

The Apache Camel AWS EIS project demonstrates how Enterprise Integration Patterns, when combined with modern cloud services and streaming platforms, create a powerful foundation for data engineering. By treating data movement as routes - similar to a navigation system - we achieve location transparency, scalability, and maintainability.

The combination of Apache Camel, AWS services (S3, Kinesis), and Apache Kafka provides a complete toolkit for building production-grade data pipelines. Whether you're moving data to a data lake, streaming to real-time pipelines, or implementing content-based routing, these patterns and tools provide an elegant, proven approach.

Explore the complete implementation in the apache-camel-aws-eis project on GitHub and start building your own data engineering routes today.

