# Kafka Streams application samples

## Kafka Streams word count application as a processor

A Kafka Streams application can run as a `processor` application in a Spring Cloud Data Flow streaming pipeline. The following example shows how to register a Kafka Streams application as a Spring Cloud Data Flow `processor` application and then use it in a streaming data pipeline.

To demonstrate this use case, we have a source application `http-ingest`, which extends the out-of-the-box `http` source application with some additional functionality needed for our examples. To build it:

```
cd http-ingest
./mvnw clean package
```

After this step, the artifact `http-ingest-1.0.0.BUILD-SNAPSHOT.jar` is available in the `target` directory.

You can register this artifact as a Spring Cloud Data Flow `source` application from the Spring Cloud Data Flow shell:

```
dataflow:>app register --name http-ingest --type source --uri file://<Your local parent directory to the forked github repo>/spring-cloud-dataflow-samples/kafka-samples/http-ingest/target/http-ingest-1.0.0.BUILD-SNAPSHOT.jar
Successfully registered application 'source:http-ingest'
```

The application `kstreams-word-count` is a Kafka Streams application written using the Spring Cloud Stream framework. To build it:

```
cd kstreams-word-count
./mvnw clean package
```

After this step, the artifact `kstreams-word-count-1.0.0.BUILD-SNAPSHOT.jar` is available in the `target` directory.

You can register this artifact as a Spring Cloud Data Flow `processor` application from the Spring Cloud Data Flow shell:

```
dataflow:>app register --name kstream-word-count --type processor --uri file://<Your local parent directory to the forked github repo>/spring-cloud-dataflow-samples/kafka-samples/kstreams-word-count/target/kstreams-word-count-1.0.0.BUILD-SNAPSHOT.jar
Successfully registered application 'processor:kstream-word-count'
```

Let's register the out-of-the-box `log` application to display the results from the `kstream-word-count` processor (skip this step if `log` was already registered by a previous app import).

```
dataflow:>app register --name log --type sink --uri maven://org.springframework.cloud.stream.app:log-sink-kafka:2.1.1.RELEASE
Successfully registered application 'sink:log'
```

Create the following stream, which listens on the HTTP endpoint `http://localhost:9100` and sends the data to the `kstream-word-count` processor registered in the previous step. The processor's output is then piped to the `log` sink application.

```
dataflow:>stream create kstream-wc-sample --definition "http-ingest --server.port=9100 | kstream-word-count | log"
Created new stream 'kstream-wc-sample'

dataflow:>stream deploy kstream-wc-sample --properties "deployer.log.local.inheritLogging=true"
Deployment request has been sent for stream 'kstream-wc-sample'

dataflow:>http post --target "http://localhost:9100" --data "Baby shark, doo doo doo doo doo doo"
> POST (text/plain) http://localhost:9100 Baby shark, doo doo doo doo doo doo
> 202 ACCEPTED
```

The log sink application now shows the following output:

2019-03-18 15:40:46.195 INFO 34974 --- [container-0-C-1] log-sink : {"word":"baby","count":1,"start":"2019-03-18T10:10:00.000+0000","end":"2019-03-18T10:10:30.000+0000"}
2019-03-18 15:40:46.198 INFO 34974 --- [container-0-C-1] log-sink : {"word":"shark","count":1,"start":"2019-03-18T10:10:00.000+0000","end":"2019-03-18T10:10:30.000+0000"}
2019-03-18 15:40:46.199 INFO 34974 --- [container-0-C-1] log-sink : {"word":"doo","count":6,"start":"2019-03-18T10:10:00.000+0000","end":"2019-03-18T10:10:30.000+0000"}

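To make the counts in that output easier to follow, here is a minimal plain-Java sketch of the per-word counting for a single posted sentence. It is not the code of `kstreams-word-count`, which is a Kafka Streams application and also attaches a time window (the `start` and `end` fields) to each count. The class name `WordCountSketch`, the method `countWords`, and the tokenization rule (lower-case, split on non-word characters) are assumptions made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

// Plain-Java illustration only; the real processor is a Kafka Streams application.
class WordCountSketch {

    // Lower-cases the text, splits it on runs of non-word characters, and counts
    // each word in first-seen order. The tokenization rule is an assumption.
    static Map<String, Long> countWords(String text) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String word : text.toLowerCase(Locale.ROOT).split("\\W+")) {
            if (!word.isEmpty()) {
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        countWords("Baby shark, doo doo doo doo doo doo")
                .forEach((word, count) -> System.out.println(word + " : " + count));
    }
}
```

For the sentence posted above, this yields 1 for `baby`, 1 for `shark`, and 6 for `doo`, which matches the `count` values in the log sink output.
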
## Kafka Streams application that has multiple inputs and single output

Let's use the `http-ingest` application to create two streaming pipelines that produce the input data (user regions and user clicks).

To build the `http-ingest` application:

```
cd http-ingest
./mvnw clean package
```

You can register this artifact as a Spring Cloud Data Flow `source` application from the Spring Cloud Data Flow shell:

```
dataflow:>app register --name http-ingest --type source --uri file://<Your local parent directory to the forked github repo>/spring-cloud-dataflow-samples/kafka-samples/http-ingest/target/http-ingest-1.0.0.BUILD-SNAPSHOT.jar
Successfully registered application 'source:http-ingest'
```

Create a stream that ingests user regions:

```
stream create ingest-user-regions --definition "http-ingest --server.port=9000 --http.mapped-request-headers=username --spring.cloud.stream.kafka.bindings.output.producer.messageKeyExpression=headers['username'] > :userRegions" --deploy
```

The above stream uses a named destination to send the user region HTTP events to the Kafka topic named `userRegions`. The value of the HTTP header `username` is set as the key of the Kafka message, and the user region is taken from the HTTP payload as a `String`.

This sample application also showcases how to make use of the function composition support in a streaming application.

The `http-ingest` application has a function bean definition that looks like:

```
@Bean
public Function<String, Long> sendAsUserClicks() {
    // Convert the incoming HTTP payload String to a Long click count
    return value -> Long.parseLong(value);
}
```

When this function bean is enabled, the incoming `String` value is converted to a `Long` value.

When sending the user click count as a `Long`, we can enable this function for `http-ingest`, while the same application can still be used for sending the user region `String` value.

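The conversion itself can be tried outside Spring. The following is a minimal sketch that mirrors the body of the `sendAsUserClicks` bean as a plain `java.util.function.Function`. The class name `SendAsUserClicksSketch` is hypothetical, and the `@Bean` wiring is deliberately left out so the snippet runs on its own.

```java
import java.util.function.Function;

// Stand-alone illustration; in http-ingest the function is a Spring @Bean.
class SendAsUserClicksSketch {

    // Same conversion as the sendAsUserClicks bean shown above.
    static Function<String, Long> sendAsUserClicks() {
        return value -> Long.parseLong(value);
    }

    public static void main(String[] args) {
        Function<String, Long> toClicks = sendAsUserClicks();

        // The payload "9" becomes the number 9, so arithmetic works on it.
        System.out.println(toClicks.apply("9") + 1);

        // A non-numeric payload such as a region name cannot be converted.
        try {
            toClicks.apply("americas");
        } catch (NumberFormatException e) {
            System.out.println("not a click count: " + e.getMessage());
        }
    }
}
```

The second call shows why the function is enabled only for the stream that carries click counts: a region payload such as `americas` would fail the conversion.
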
Create a stream that ingests user clicks:

```
stream create ingest-user-clicks --definition "http-ingest --server.port=9001 --mapped-request-headers=username --spring.cloud.stream.kafka.bindings.output.producer.messageKeyExpression=headers['username'] --spring.cloud.stream.kafka.binder.configuration.value.serializer=org.apache.kafka.common.serialization.LongSerializer --spring.cloud.stream.function.definition=sendAsUserClicks > :userClicks" --deploy
```

The above stream uses a named destination to send the user click HTTP events to the Kafka topic named `userClicks`. The value of the HTTP header `username` is set as the key of the Kafka message, and the click count is taken from the HTTP payload. The function bean `sendAsUserClicks` is enabled through the `spring.cloud.stream.function.definition` property to convert the HTTP payload `String` to a `Long`, and the value serializer is set to `LongSerializer`.

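To see why the payload is converted before it is written, it helps to compare the bytes. Kafka's `LongSerializer` writes a `Long` as 8 big-endian bytes, whereas the text payload `15` is only 2 bytes long. The sketch below reproduces that byte layout with `java.nio.ByteBuffer` instead of calling the Kafka classes; the class `LongPayloadSketch` and its methods are hypothetical names used only for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustration of the byte layout only; it does not use the Kafka client library.
class LongPayloadSketch {

    // Encodes a long as 8 big-endian bytes (ByteBuffer is big-endian by default).
    static byte[] encodeLong(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // Decodes 8 big-endian bytes back into a long and rejects any other length.
    static long decodeLong(byte[] data) {
        if (data.length != Long.BYTES) {
            throw new IllegalArgumentException("expected 8 bytes but got " + data.length);
        }
        return ByteBuffer.wrap(data).getLong();
    }

    public static void main(String[] args) {
        byte[] asLong = encodeLong(Long.parseLong("15"));
        byte[] asText = "15".getBytes(StandardCharsets.UTF_8);

        System.out.println("bytes when sent as Long: " + asLong.length);
        System.out.println("bytes when sent as text: " + asText.length);
        System.out.println("decoded click count: " + decodeLong(asLong));
    }
}
```

A consumer that reads the topic values as `Long` needs the 8-byte form, so a click count left as text would not be readable by it.
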
Since the Kafka Streams processor has multiple inputs (one for user click events and another for user region events), this application needs to be registered as the `app` type, and you need to explicitly configure the bindings to the appropriate Kafka topics along with other related configuration.

To build this application:

```
cd kstreams-join-user-clicks-and-region
./mvnw clean package
```

Register this Kafka Streams application as `app` type:

```
dataflow:> app register --name join-user-clicks-and-regions --type app --uri file://<local parent directory of this git repo>/spring-cloud-dataflow-samples/kafka-samples/kstreams-join-user-clicks-and-region/target/kstreams-join-user-clicks-and-region-1.0.0.BUILD-SNAPSHOT.jar
```

We also have a demo application `log-user-clicks-per-region` that logs the result of the Kafka Streams application. Since the `app` type is not compatible with the other stream application types (`source`, `sink`, and `processor`), this logger application also needs to be registered as the `app` type to work with the above Kafka Streams application.

To build this application:

```
cd kstreams-log-user-clicks-per-region
./mvnw clean package
```

Register this logger application as `app` type:

```
dataflow:> app register --name log-user-clicks-per-region --type app --uri file://<local parent directory of this git repo>/spring-cloud-dataflow-samples/kafka-samples/kstreams-log-user-clicks-per-region/target/kstreams-log-user-clicks-per-region-1.0.0.BUILD-SNAPSHOT.jar
```

Let's create the stream that pipes the Kafka Streams application's output into the logger's input.

```
stream create compute-user-clicks-per-region --definition "join-user-clicks-and-regions || log-user-clicks-per-region"

stream deploy compute-user-clicks-per-region --properties "deployer.log-user-clicks-per-region.local.inheritLogging=true"
```

You can now confirm that all three streams (`ingest-user-regions`, `ingest-user-clicks`, and `compute-user-clicks-per-region`) are deployed successfully by using the `stream list` command from the Spring Cloud Data Flow shell.

You can send the following sample data using cURL commands. The `http-ingest` application in the `ingest-user-regions` stream accepts user region data at `http://localhost:9000`:

```
curl -X POST http://localhost:9000 -H "username: Glenn" -d "americas" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Soby" -d "americas" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Janne" -d "europe" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Ilaya" -d "americas" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Mark" -d "americas" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Sabby" -d "americas" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Gunnar" -d "americas" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Ilaya" -d "asia" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Chris" -d "americas" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Damien" -d "europe" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Michael" -d "americas" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Christian" -d "europe" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Oleg" -d "europe" -H "Content-Type: text/plain"
```

The `http-ingest` application in the `ingest-user-clicks` stream accepts user click data at `http://localhost:9001`:

```
curl -X POST http://localhost:9001 -H "username: Glenn" -d 9 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: Soby" -d 15 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: Janne" -d 10 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: Mark" -d 7 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: Sabby" -d 20 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: Gunnar" -d 18 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: Ilaya" -d 10 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: Chris" -d 5 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: Damien" -d 21 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: Michael" -d 10 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: Christian" -d 12 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: Oleg" -d 10 -H "Content-Type: text/plain"
```

Once the above data is published, the Kafka Streams application sends the processed result to the logger application, which shows the following output:

```
2019-03-15 15:50:39.251 INFO 49790 --- [container-0-C-1] ksPerRegion$Logger$1 : europe : 53
2019-03-15 15:50:39.252 INFO 49790 --- [container-0-C-1] ksPerRegion$Logger$1 : asia : 10
2019-03-15 15:50:39.252 INFO 49790 --- [container-0-C-1] ksPerRegion$Logger$1 : americas : 84
```
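
These totals can be checked by hand with a small plain-Java sketch that joins the posted click counts with each user's latest region and sums the clicks per region. It is a batch simplification, not the code of the sample application: a streaming join works event by event, and this version assumes that all region events arrive before the click events, as in the cURL commands above. The class `ClicksPerRegionSketch`, its members, and the `UNKNOWN` fallback for users without a region are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration only; the sample application does this with Kafka Streams.
class ClicksPerRegionSketch {

    // Region events in the order they are posted above (user, region).
    static final List<Map.Entry<String, String>> REGION_EVENTS = List.of(
            Map.entry("Glenn", "americas"), Map.entry("Soby", "americas"),
            Map.entry("Janne", "europe"), Map.entry("Ilaya", "americas"),
            Map.entry("Mark", "americas"), Map.entry("Sabby", "americas"),
            Map.entry("Gunnar", "americas"), Map.entry("Ilaya", "asia"),
            Map.entry("Chris", "americas"), Map.entry("Damien", "europe"),
            Map.entry("Michael", "americas"), Map.entry("Christian", "europe"),
            Map.entry("Oleg", "europe"));

    // Click events in the order they are posted above (user, clicks).
    static final List<Map.Entry<String, Long>> CLICK_EVENTS = List.of(
            Map.entry("Glenn", 9L), Map.entry("Soby", 15L), Map.entry("Janne", 10L),
            Map.entry("Mark", 7L), Map.entry("Sabby", 20L), Map.entry("Gunnar", 18L),
            Map.entry("Ilaya", 10L), Map.entry("Chris", 5L), Map.entry("Damien", 21L),
            Map.entry("Michael", 10L), Map.entry("Christian", 12L), Map.entry("Oleg", 10L));

    static Map<String, Long> clicksPerRegion(List<Map.Entry<String, String>> regionEvents,
                                             List<Map.Entry<String, Long>> clickEvents) {
        // Table-like view: a later region for the same user replaces the earlier one.
        Map<String, String> regionByUser = new HashMap<>();
        for (Map.Entry<String, String> event : regionEvents) {
            regionByUser.put(event.getKey(), event.getValue());
        }

        // Look up each click's region by user name and add the clicks to that region.
        Map<String, Long> totals = new TreeMap<>();
        for (Map.Entry<String, Long> click : clickEvents) {
            String region = regionByUser.getOrDefault(click.getKey(), "UNKNOWN");
            totals.merge(region, click.getValue(), Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        clicksPerRegion(REGION_EVENTS, CLICK_EVENTS)
                .forEach((region, clicks) -> System.out.println(region + " : " + clicks));
    }
}
```

With the sample data this gives 53 for `europe`, 10 for `asia`, and 84 for `americas`, the same totals as in the logger output. Note that `Ilaya` is counted under `asia` because the second region event for that user replaces the first.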

You can keep publishing click data and watch the results update in the logger application.