Applying Content Based Routing EIP
At the end of this chapter you will be able to:
- Integrate Apache Kafka and Camel-K
- Apply the Content Based Routing (CBR) Enterprise Integration Pattern (EIP)
Apache Camel supports numerous Enterprise Integration Patterns (EIPs) out of the box; you can find the complete list of patterns on the Apache Camel website.
Application Overview
We will deploy a simple data streaming application that uses Camel-K and Knative to process incoming data; the processed data is then pushed to a reactive web application via Server-Sent Events (SSE), as shown below:

The application has the following components:
- Data Producer: a Camel-K integration that simulates streaming data by sending records to Apache Kafka
- Data Processor: a Camel-K integration that processes the streaming data from Kafka and sends it to the default Knative Eventing Broker
- Event Subscriber (Fruits UI): a Quarkus Java application that displays the processed data from the Data Processor
- Event Trigger: a Knative Eventing Trigger that filters the processed data before it is sent to the Event Subscriber
The upcoming samples will deploy these individual components and after that we will test the integration by wiring them all together.
Before you begin, make sure that:
- You have reviewed the Knative Eventing module to refresh the concepts
- The Apache Kafka cluster my-cluster is running
Create default Broker
If you have deleted the default broker, then you need to create the default broker to be used by this chapter’s exercises.
Navigate to the eventing chapter folder eventing:
cd $TUTORIAL_HOME/eventing
---
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  name: default
Create the broker using either kn or kubectl:
kn broker create default
kubectl apply -f ${TUTORIAL_HOME}/eventing/default-broker.yaml
Verify the created resources:
kn broker list
NAME URL AGE CONDITIONS READY REASON
default http://broker-ingress.knative-eventing.svc.cluster.local/knativetutorial/default 112s 5 OK / 5 True
kubectl get broker.eventing.knative.dev -n knativetutorial
NAME URL AGE READY REASON
default http://broker-ingress.knative-eventing.svc.cluster.local/knativetutorial/default 2m57s True
Finally navigate to the tutorial chapter’s folder advanced/camel-k:
cd $TUTORIAL_HOME/advanced/camel-k
Deploy Data Producer
This is a Knative Camel-K integration called fruits-producer, which uses a public fruits API to retrieve information about fruits and streams the data to Apache Kafka. The fruits-producer service retrieves the data from the fruits API, splits it using the Split EIP, and then sends the data to a Kafka topic called fruits.
- from:
    uri: timer:tick
    parameters:
      period: 5000 # (1)
    steps:
      - set-header:
          name: CamelHttpMethod
          constant: GET
      - to: "https://fruityvice.com/api/fruit/all" # (2)
      - split:
          jsonpath: "$.[*]" # (3)
      - marshal:
          json: {}
      - log:
          message: "${body}"
      - to: "kafka:fruits?brokers=my-cluster-kafka-bootstrap.kafka:9092" # (4)
1 | Fire the timer every 5 seconds (the period is in milliseconds) to poll the REST API. |
2 | Call the external REST API https://fruityvice.com to get the list of fruits, simulating a data stream. |
3 | Split the JSON array returned by the API into individual fruit records. |
4 | Send the processed data, i.e. each individual fruit record as JSON, to the Apache Kafka topic fruits. |
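To make the Split step more concrete, here is a minimal Python sketch of what the route does to each API response: it takes one JSON array and emits one JSON document per element. This is not Camel code. The function name `split_records` and the two-element sample payload are illustrative assumptions, not part of the tutorial sources.

```python
import json
from typing import List


def split_records(response_body: str) -> List[str]:
    """Mimic split (jsonpath "$.[*]") followed by marshal (json)."""
    records = json.loads(response_body)  # the API returns one JSON array
    if not isinstance(records, list):
        raise ValueError("expected a JSON array of fruit records")
    # One compact JSON string per element, as each would be sent to the topic.
    return [json.dumps(record, separators=(",", ":")) for record in records]


# Hypothetical, trimmed-down response used only for illustration.
sample = '[{"name": "Watermelon", "id": 25}, {"name": "Lime", "id": 44}]'
messages = split_records(sample)
for message in messages:
    print(message)
```

Each printed line corresponds to one message that the route would publish to the fruits topic.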
Run the following command to deploy the fruits-producer integration:
kamel -n knativetutorial run \
--wait \
--dependency camel:log \
--dependency camel:jackson \
--dependency camel:jsonpath \
eip/fruits-producer.yaml
The service deployment may take several minutes to become available. To monitor the status, run:
watch kubectl -n knativetutorial get pods
A successful deploy will show the following pods:
NAME READY STATUS AGE
camel-k-operator-5d74595cdf-4v9qz 1/1 Running 4h4m
fruits-producer-nfngm-deployment-759c797c44-d6r52 2/2 Running 70s
kn -n knativetutorial service ls
NAME URL LATESTCREATED LATESTREADY READY REASON
fruits-producer http://fruits-producer.knativetutorial.192.168.64.13.nip.io fruits-producer-gl575 fruits-producer-gl575 True
Verify Fruit Producer
Open a new terminal and start the Kafka consumer using the script $TUTORIAL_HOME/bin/kafka-consumer.sh with the parameter fruits:
$TUTORIAL_HOME/bin/kafka-consumer.sh fruits
If the fruits-producer ran successfully, the Kafka consumer terminal should show something like:
{"genus":"Citrullus","name":"Watermelon","id":25,"family":"Cucurbitaceae","order":"Cucurbitales","nutritions":{"carbohydrates":8,"protein":0.6,"fat":0.2,"calories":30,"sugar":6}}
Since the fruits API consistently returns a static set of fruit data, you can call it as often as needed to simulate data streaming; it will always return the same data.
Deploy Data Processor
Let us now deploy a Kamelet called fruits-processor that can handle and process the streaming data from the Kafka topic fruits.
The fruits-processor Kamelet applies the Content Based Router EIP to process the data:
apiVersion: camel.apache.org/v1alpha1
kind: Kamelet
metadata:
  name: fruits-processor
  annotations:
    camel.apache.org/kamelet.support.level: "Preview"
    camel.apache.org/catalog.version: "main-SNAPSHOT"
    camel.apache.org/provider: "Apache Software Foundation"
    camel.apache.org/kamelet.group: "Kafka"
  labels:
    camel.apache.org/kamelet.type: "source"
spec:
  definition:
    title: "Kafka Not Secured Source"
    description: |-
      Receive data from Kafka topics on an insecure broker.
    required:
      - topic
      - brokers
    type: object
    properties:
      topic:
        title: Topic Names
        description: Comma separated list of Kafka topic names
        type: string
      brokers:
        title: Brokers
        description: Comma separated list of Kafka Broker URLs
        type: string
  dependencies:
    - "mvn:org.apache.camel.kamelets:camel-kamelets-utils:0.10.0"
    - "camel:kafka"
    - "camel:kamelet"
    - "camel:jackson"
    - "camel:core"
    - "camel:log"
  template:
    from:
      uri: "kafka:fruits?brokers=my-cluster-kafka-bootstrap.kafka:9092" # (1)
      steps:
        - log:
            message: "Received Body ${body}"
        - unmarshal:
            json: {} # (2)
        - choice: # (3)
            when:
              - simple: "${body[nutritions][sugar]} <= 5"
                steps:
                  - remove-headers: "*"
                  - marshal:
                      json: {}
                  - set-header: # (4)
                      name: ce-type
                      constant: low-sugar
                  - set-header:
                      name: fruit-sugar-level
                      constant: low
                  - to: "log:low?showAll=true&multiline=true"
              - simple: "${body[nutritions][sugar]} > 5 && ${body[nutritions][sugar]} <= 10"
                steps:
                  - remove-headers: "*"
                  - marshal:
                      json: {}
                  - set-header:
                      name: ce-type
                      constant: medium-sugar
                  - set-header:
                      name: fruit-sugar-level
                      constant: medium
                  - to: "log:medium?showAll=true&multiline=true"
            otherwise:
              steps:
                - remove-headers: "*"
                - marshal:
                    json: {}
                - set-header:
                    name: ce-type
                    constant: high-sugar
                - set-header:
                    name: fruit-sugar-level
                    constant: high
                - to: "log:high?showAll=true&multiline=true"
1 | The Camel route connects to the Apache Kafka broker and consumes from the topic fruits. |
2 | Once the data is received, it is unmarshalled from JSON so that the route can inspect its fields. |
3 | The Content Based Router pattern, implemented using the Choice EIP. During processing, the fruits are classified as low (sugar <= 5), medium (sugar greater than 5 and up to 10), or high (sugar > 10), based on the sugar level in their nutritions data. |
4 | Based on the classification, the CloudEvents type header is set to low-sugar, medium-sugar, or high-sugar. This header is used as one of the filter attributes in the Knative Eventing Trigger. |
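The routing decision can be sketched outside Camel as a plain function. The Python below is an illustration only, not the Kamelet's implementation. The thresholds and event type names come from the callouts above, the sample record is a trimmed copy of the Watermelon message shown earlier in this chapter, and the function name `classify_sugar` is an assumption made for this example.

```python
import json


def classify_sugar(fruit: dict) -> str:
    """Return the CloudEvents type for one fruit record.

    Conditions are checked in order, like the when/otherwise branches
    of a choice: <= 5 is low, > 5 and <= 10 is medium, the rest is high.
    """
    sugar = fruit["nutritions"]["sugar"]
    if sugar <= 5:
        return "low-sugar"
    if sugar <= 10:  # already known to be > 5 at this point
        return "medium-sugar"
    return "high-sugar"


record = json.loads(
    '{"name":"Watermelon","nutritions":{"carbohydrates":8,"protein":0.6,'
    '"fat":0.2,"calories":30,"sugar":6}}'
)
print(classify_sugar(record))  # prints "medium-sugar"
```

Because the branches are evaluated in order, a record is handled by the first condition it satisfies; the fallback branch only runs when no earlier condition matched.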
Let’s deploy the Kamelet using the following command:
kubectl apply -n knativetutorial -f eip/fruits-processor-kamelet.yaml
The last step is to send the processed data to the Knative Eventing Broker named default.
We can do that by using a KameletBinding:
apiVersion: camel.apache.org/v1alpha1
kind: KameletBinding
metadata:
  name: fruits-processor-to-knative
spec:
  source:
    ref: # (1)
      kind: Kamelet
      apiVersion: camel.apache.org/v1alpha1
      name: fruits-processor
    properties:
      topic: "fruits"
      brokers: "my-cluster-kafka-bootstrap.kafka:9092"
  sink: # (2)
    ref:
      kind: Broker
      apiVersion: eventing.knative.dev/v1
      name: default
1 | Kubernetes reference to the previously created Kamelet. |
2 | Send the processed events to the default broker. |
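For context on why the route sets a ce-type header: in the CloudEvents HTTP binding's binary content mode, event attributes travel as HTTP headers prefixed with ce-, and the payload is the request body. The Python sketch below only builds such a header set to show its shape. The function name and the source value are hypothetical, and how Camel K populates the remaining attributes is not covered in this chapter.

```python
import uuid


def binary_cloudevent_headers(event_type: str, source: str) -> dict:
    """Build HTTP headers for a CloudEvent in binary content mode."""
    return {
        "ce-specversion": "1.0",
        "ce-id": str(uuid.uuid4()),  # unique identifier for this event
        "ce-source": source,         # hypothetical value in the usage below
        "ce-type": event_type,       # the attribute the route sets
        "content-type": "application/json",
    }


# "urn:example:fruits-processor" is an assumed source, for illustration only.
headers = binary_cloudevent_headers("low-sugar", "urn:example:fruits-processor")
for name, value in headers.items():
    print(f"{name}: {value}")
```

The ce-type value is the part that matters later in this chapter, since it is the attribute the Trigger filters on.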
Let’s deploy the KameletBinding using:
kubectl apply -n knativetutorial -f eip/fruits-processor-to-knative.yaml
As the Camel-K controller takes a few minutes to deploy the KameletBinding, you can watch the pods of the knativetutorial namespace for its status:
watch kubectl -n knativetutorial get pods
NAME READY STATUS AGE
camel-k-operator-5d74595cdf-4v9qz 1/1 Running 4h17m
fruit-processor-to-knative-595945f8d7-wc9qz 1/1 Running 4h42m
You can check whether your KameletBinding is Ready by running:
kn source kamelet binding list -n knativetutorial
When the KameletBinding is successful you will see it in the READY state as shown:
NAME READY REASON AGE
fruits-processor-to-knative True 2m22s
Deploy Event Subscriber
Now deploy a reactive web application called fruit-events-display. It is a Quarkus Java application that updates its UI reactively as and when it receives the processed data from the Knative Eventing backend.
You can deploy the fruit-events-display application using the command:
kubectl apply -n knativetutorial \
-f $TUTORIAL_HOME/install/utils/fruit-events-display.yaml
Verify that the fruit-events-display application is up and running:
watch kubectl -n knativetutorial get pods
Once fruit-events-display is running, you will see the following pods in the knativetutorial namespace:
NAME READY STATUS AGE
camel-k-operator-5d74595cdf-4v9qz 1/1 Running 4h21m
fruit-events-display-8d47bc98f-6r7zt 1/1 Running 15s
fruits-processor-h45f7-6fdfd74cf9-nmfkn 1/1 Running 4m12s
The fruit-events-display web application refreshes its UI as and when it receives the processed data. To see this, you need to open the web application in your browser.
On minikube, open the application with:
minikube -p knativetutorial -n knativetutorial service fruit-events-display
On OpenShift, expose the service first:
oc expose -n knativetutorial service fruit-events-display
Once you have exposed the service, you can open the OpenShift route in the web browser:
oc get -n knativetutorial route fruit-events-display
The fruit-events-display UI will be empty as shown below:

Apply Knative Filter
As a last step we need to deploy a Knative Eventing Trigger called sugary-fruits. The trigger consumes events from the Knative Eventing Broker named default; when a matching fruit event is received, it dispatches the event to the subscriber, which is the fruit-events-display service.
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: sugary-fruits
spec:
  broker: default # (1)
  filter: # (2)
    attributes:
      type: low-sugar
  subscriber: # (3)
    ref:
      apiVersion: v1
      kind: Service
      name: fruit-events-display
1 | The Knative Eventing Broker that this Trigger listens to for events. Events originate from the fruits-processor Kamelet and are sent to the Knative Eventing Broker named default. |
2 | The filter attribute restricts the events that fruit-events-display will receive. In this example, it is configured to pass only events of type low-sugar. You could also use the other fruit classifications, medium-sugar or high-sugar. |
3 | Set the subscriber to the fruit-events-display Kubernetes service, which receives the filtered event data. |
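The effect of the attribute filter can be sketched as an exact-match check on event attributes. The Python below is a simplified illustration of that idea, not Knative's implementation. The function name `matches_filter` and the sample events, including their fruit names, are hypothetical.

```python
def matches_filter(event_attributes: dict, filter_attributes: dict) -> bool:
    """Exact-match filter: every filter attribute must equal the event's value."""
    return all(
        event_attributes.get(name) == value
        for name, value in filter_attributes.items()
    )


# Mirrors the filter in the Trigger above.
trigger_filter = {"type": "low-sugar"}

# Hypothetical events, for illustration only.
events = [
    {"type": "low-sugar", "fruit": "example-a"},
    {"type": "medium-sugar", "fruit": "example-b"},
    {"type": "high-sugar", "fruit": "example-c"},
]

delivered = [event for event in events if matches_filter(event, trigger_filter)]
print(delivered)
```

Only the event whose type equals low-sugar passes; changing the filter value to medium-sugar or high-sugar would select a different subset.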
You can deploy the Knative Event Trigger using the following command:
kubectl apply -n knativetutorial -f eip/sugary-fruits.yaml
Check the status of the Trigger using the following command, which should return one trigger called sugary-fruits in the ready state:
kubectl -n knativetutorial get triggers
As the trigger dispatches its filtered events to fruit-events-display, the subscriber URI of the Trigger will be the fruit-events-display service:
NAME READY BROKER SUBSCRIBER_URI
sugary-fruits True default http://fruit-events-display.knativetutorial.svc.cluster.local/
Verify end to end
Now that we have deployed all the components described in the Application Overview, let’s verify the end-to-end flow.
To verify the data flow and processing, call the fruits-producer service using the script $TUTORIAL_HOME/bin/call.sh with the parameters fruits-producer and '':
$TUTORIAL_HOME/bin/call.sh fruits-producer ''
Assuming everything worked well, you should see the low-sugar fruits listed in the fruit-events-display UI as shown below:

Cleanup
kamel delete -n knativetutorial fruits-producer
kubectl delete -n knativetutorial -f eip/fruits-processor-kamelet.yaml
kubectl delete -n knativetutorial -f eip/fruits-processor-to-knative.yaml
kubectl delete -n knativetutorial -f eip/sugary-fruits.yaml
kubectl delete -n knativetutorial -f $TUTORIAL_HOME/install/utils/fruit-events-display.yaml
kn broker delete -n knativetutorial default
kamel -n knativetutorial reset
kamel -n knativetutorial uninstall