# Kafka Streams application samples


## Kafka Streams word count application as a processor


A Kafka Streams application can be used as a `processor` application in a Spring Cloud Data Flow streaming pipeline. The following example shows how to register a Kafka Streams application as a Spring Cloud Data Flow `processor` application and then use it in a streaming data pipeline.

To demonstrate this use case, we have a source application, `http-ingest`, which extends the existing out-of-the-box `http` source application with some additional functionality needed for these examples. To build it:

```
cd http-ingest
./mvnw clean package
```

After this step, the artifact `http-ingest-2.0.0-SNAPSHOT.jar` is available in the `target` directory.

From the Spring Cloud Data Flow shell, you can register this artifact as a Spring Cloud Data Flow `source` application as follows:

```
dataflow:>app register --name http-ingest --type source --uri file://<Your local parent directory to the forked github repo>/spring-cloud-dataflow-samples/kafka-samples/http-ingest/target/http-ingest-2.0.0-SNAPSHOT.jar
Successfully registered application 'source:http-ingest'
```

The `kstreams-word-count` application is a Kafka Streams application written with the Spring Cloud Stream framework. To build it:

```
cd kstreams-word-count
./mvnw clean package
```

After this step, the artifact `kstreams-word-count-2.0.0-SNAPSHOT.jar` is available.

From the Spring Cloud Data Flow shell, you can register this artifact as a Spring Cloud Data Flow `processor` application as follows:

```
dataflow:>app register --name kstream-word-count --type processor --uri file://<Your local parent directory to the forked github repo>/spring-cloud-dataflow-samples/kafka-samples/kstreams-word-count/target/kstreams-word-count-2.0.0-SNAPSHOT.jar
Successfully registered application 'processor:kstream-word-count'
```

Let's register the out-of-the-box `log` sink application to display the results from the `kstream-word-count` processor (skip this step if it is already registered from a previous app import).

```
dataflow:>app register --name log --type sink --uri maven://org.springframework.cloud.stream.app:log-sink-kafka:2.1.1.RELEASE
Successfully registered application 'sink:log'

```

Create the following stream, which listens on the HTTP endpoint `http://localhost:9100` and sends the data to the `kstream-word-count` processor registered in the previous step. The KStream processor's output is then piped to the `log` sink application.

```
dataflow:>stream create kstream-wc-sample --definition "http-ingest --server.port=9100 | kstream-word-count | log"
Created new stream 'kstream-wc-sample'

dataflow:>stream deploy kstream-wc-sample --properties "deployer.log.local.inheritLogging=true"
Deployment request has been sent for stream 'kstream-wc-sample'

dataflow:>http post --target "http://localhost:9100" --data "Baby shark, doo doo doo doo doo doo"
> POST (text/plain) http://localhost:9100 Baby shark, doo doo doo doo doo doo
> 202 ACCEPTED

```

The log sink application now logs output similar to the following:

```
2019-03-18 15:40:46.195  INFO 34974 --- [container-0-C-1] log-sink                                 : {"word":"baby","count":1,"start":"2019-03-18T10:10:00.000+0000","end":"2019-03-18T10:10:30.000+0000"}
2019-03-18 15:40:46.198  INFO 34974 --- [container-0-C-1] log-sink                                 : {"word":"shark","count":1,"start":"2019-03-18T10:10:00.000+0000","end":"2019-03-18T10:10:30.000+0000"}
2019-03-18 15:40:46.199  INFO 34974 --- [container-0-C-1] log-sink                                 : {"word":"doo","count":6,"start":"2019-03-18T10:10:00.000+0000","end":"2019-03-18T10:10:30.000+0000"}
```
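
The code of `kstreams-word-count` is not reproduced here. As a rough illustration of what the sample output implies, the following plain-Java sketch (no Kafka dependencies; the class `WordCountSketch` is hypothetical) lower-cases the payload, splits it on non-word characters, and counts each word. The 30-second window size is an assumption inferred from the `start` and `end` values in the log. The window arithmetic assumes epoch-aligned tumbling windows, which is how Kafka Streams aligns time windows.

```java
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

// Plain-Java sketch (no Kafka) of the counting suggested by the sample output.
// Assumptions: words are lower-cased and split on non-word characters, and counts are
// kept per 30-second tumbling window. This is not the code of the kstreams-word-count sample.
class WordCountSketch {

    // Assumed window size, inferred from the start/end values in the sample log.
    static final long WINDOW_SIZE_MS = 30_000L;

    // Lower-cases the text, splits it on runs of non-word characters and counts each word.
    static Map<String, Long> countWords(String text) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String word : text.toLowerCase(Locale.ROOT).split("\\W+")) {
            if (!word.isEmpty()) {
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    // Epoch-aligned tumbling window: it starts at the largest multiple of the
    // window size that is not after the record timestamp.
    static Instant windowStart(long timestampMs) {
        return Instant.ofEpochMilli(timestampMs - (timestampMs % WINDOW_SIZE_MS));
    }

    // The window end is exclusive: start + size.
    static Instant windowEnd(long timestampMs) {
        return windowStart(timestampMs).plusMillis(WINDOW_SIZE_MS);
    }

    public static void main(String[] args) {
        long timestamp = System.currentTimeMillis();
        countWords("Baby shark, doo doo doo doo doo doo").forEach((word, count) ->
            System.out.printf("word=%s count=%d start=%s end=%s%n",
                word, count, windowStart(timestamp), windowEnd(timestamp)));
    }
}
```

For the sample payload, the sketch yields the same counts as the log: `baby` 1, `shark` 1, and `doo` 6.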

## Kafka Streams application that has multiple inputs and a single output


Let's use the `http-ingest` application to create a couple of streaming pipelines that produce the input data (user regions and user clicks).

To build the `http-ingest` application:

```
cd http-ingest
./mvnw clean package
```

From the Spring Cloud Data Flow shell, you can register this artifact as a Spring Cloud Data Flow `source` application as follows:

```
dataflow:>app register --name http-ingest --type source --uri file://<Your local parent directory to the forked github repo>/spring-cloud-dataflow-samples/kafka-samples/http-ingest/target/http-ingest-2.0.0-SNAPSHOT.jar
Successfully registered application 'source:http-ingest'
```

Create a stream that ingests user regions:

```
stream create ingest-user-regions --definition "http-ingest --server.port=9000 --http.mapped-request-headers=username --spring.cloud.stream.kafka.bindings.output.producer.messageKeyExpression=headers['username'] > :userRegions" --deploy

```

The above stream uses a named destination to send the user region HTTP events to the Kafka topic `userRegions`. The value of the `username` HTTP header is used as the `key` of the Kafka message, and the user region `String` is taken from the HTTP payload.

This sample application also shows how to use the function composition support in a streaming application.

The `http-ingest` has a function bean definition that looks like:

```java
// Converts each incoming HTTP payload String (for example "15") into a Long click count.
@Bean
public Function<String, Long> sendAsUserClicks() {
  return value -> Long.parseLong(value);
}
```

When this function bean is enabled, the incoming `String` value is converted to a `Long` value.

This lets the same `http-ingest` application send both kinds of data: the function is enabled when sending user click counts as `Long` values and left disabled when sending user region `String` values.
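
To illustrate why the function is enabled only on the clicks stream, here is a plain-Java sketch with Spring left out (the class `FunctionToggleSketch` is hypothetical). It assumes that, without a function definition, `http-ingest` forwards the payload `String` unchanged. A region such as `americas` cannot be parsed as a number, so applying `sendAsUserClicks` to it throws a `NumberFormatException`.

```java
import java.util.function.Function;

// Plain-Java sketch (no Spring) of why one http-ingest build can serve both ingest streams.
// Assumption: when no function is enabled, the payload String is forwarded unchanged.
class FunctionToggleSketch {

    // Same conversion as the sendAsUserClicks bean, minus the Spring @Bean wiring.
    static final Function<String, Long> SEND_AS_USER_CLICKS = Long::parseLong;

    // Models one HTTP payload passing through http-ingest with or without the function enabled.
    static Object process(String payload, boolean sendAsUserClicksEnabled) {
        if (sendAsUserClicksEnabled) {
            return SEND_AS_USER_CLICKS.apply(payload); // "15" becomes the Long 15
        }
        return payload; // "americas" stays a String
    }

    public static void main(String[] args) {
        System.out.println(process("15", true).getClass().getSimpleName());        // Long
        System.out.println(process("americas", false).getClass().getSimpleName()); // String
        try {
            process("americas", true); // a region is not a number
        } catch (NumberFormatException e) {
            System.out.println("Region payloads must not go through sendAsUserClicks: " + e.getMessage());
        }
    }
}
```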

Create a stream that ingests user clicks:

```
stream create ingest-user-clicks --definition "http-ingest --server.port=9001 --http.mapped-request-headers=username --spring.cloud.stream.kafka.bindings.output.producer.messageKeyExpression=headers['username'] --spring.cloud.stream.kafka.binder.configuration.value.serializer=org.apache.kafka.common.serialization.LongSerializer --spring.cloud.stream.function.definition=sendAsUserClicks > :userClicks" --deploy

```

The above stream uses a named destination to send the user click HTTP events to the Kafka topic `userClicks`.
The value of the `username` HTTP header is used as the `key` of the Kafka message, and the click count is taken from the HTTP payload. The `sendAsUserClicks` function bean is enabled through the `spring.cloud.stream.function.definition` property, so the HTTP payload `String` is converted to a `Long`, and the Kafka producer's value serializer is set to `LongSerializer` to write it.
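
The value serializer setting changes what ends up on the `userClicks` topic. Kafka's `LongSerializer` writes a `Long` as eight big-endian bytes, whereas the raw payload `"15"` would be two UTF-8 characters. The following sketch uses `java.nio.ByteBuffer` (big-endian by default) as a dependency-free stand-in for the Kafka classes; `LongValueEncodingSketch` is a hypothetical name. A consumer that reads these values with a Long deserializer expects exactly this eight-byte format.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Plain-Java sketch of the bytes a Long value has when written with Kafka's LongSerializer:
// eight bytes, big-endian. ByteBuffer stands in for the Kafka class so there are no dependencies.
class LongValueEncodingSketch {

    static byte[] serialize(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // Kafka's LongDeserializer likewise rejects input that is not exactly eight bytes.
    static long deserialize(byte[] bytes) {
        if (bytes.length != Long.BYTES) {
            throw new IllegalArgumentException("Expected 8 bytes but got " + bytes.length);
        }
        return ByteBuffer.wrap(bytes).getLong();
    }

    public static void main(String[] args) {
        byte[] asLong = serialize(Long.parseLong("15"));
        System.out.println(Arrays.toString(asLong)); // [0, 0, 0, 0, 0, 0, 0, 15]
        byte[] asString = "15".getBytes(StandardCharsets.UTF_8);
        System.out.println(Arrays.toString(asString)); // [49, 53]
    }
}
```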

Since the Kafka Streams processor has multiple inputs (one for user click events and another for user region events), this application needs to be registered and deployed as the `app` type, and you need to explicitly configure the bindings to the appropriate Kafka topics and other related configuration.

To build this application:

```
cd kstreams-join-user-clicks-and-region
./mvnw clean package
```

Register this Kafka Streams application as the `app` type:

```
dataflow:> app register --name join-user-clicks-and-regions --type app --uri file://<local parent directory of this git repo>/spring-cloud-dataflow-samples/kafka-samples/kstreams-join-user-clicks-and-region/target/kstreams-join-user-clicks-and-region-2.0.0-SNAPSHOT.jar

```

We also have a demo application, `log-user-clicks-per-region`, that logs the result of the Kafka Streams application. Since the `app` type is not compatible with the other stream application types (`source`, `processor`, and `sink`), this logger application also needs to be registered as the `app` type to work with the above KStream application.

To build this application:

```
cd kstreams-log-user-clicks-per-region
./mvnw clean package
```

Register this logger application as the `app` type:

```
dataflow:> app register --name log-user-clicks-per-region --type app --uri file://<local parent directory of this git repo>/spring-cloud-dataflow-samples/kafka-samples/kstreams-log-user-clicks-per-region/target/kstreams-log-user-clicks-per-region-2.0.0-SNAPSHOT.jar

```

Let's create the stream that pipes the Kafka Streams application's output into the logger's input.

```
stream create compute-user-clicks-per-region --definition "join-user-clicks-and-regions || log-user-clicks-per-region"

stream deploy compute-user-clicks-per-region --properties "deployer.log-user-clicks-per-region.local.inheritLogging=true"
```

Now you can confirm that all three streams (`ingest-user-regions`, `ingest-user-clicks`, and `compute-user-clicks-per-region`) are deployed successfully by running the `stream list` command from the Spring Cloud Data Flow shell.

The `http-ingest` application in the `ingest-user-regions` stream accepts user region data at `http://localhost:9000`. You can send the following sample user regions with cURL:

```
curl -X POST http://localhost:9000 -H "username: Glenn" -d "americas" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Soby" -d "americas" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Janne" -d "europe" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Ilaya" -d "americas" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Mark" -d "americas" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Sabby" -d "americas" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Gunnar" -d "americas" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Ilaya" -d "asia" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Chris" -d "americas" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Damien" -d "europe" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Michael" -d "americas" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Christian" -d "europe" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Oleg" -d "europe" -H "Content-Type: text/plain"
```

The `http-ingest` application in the `ingest-user-clicks` stream accepts user click data at `http://localhost:9001`:

```
curl -X POST http://localhost:9001 -H "username: Glenn" -d 9 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: Soby" -d 15 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: Janne" -d 10 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: Mark" -d 7 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: Sabby" -d 20 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: Gunnar" -d 18 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: Ilaya" -d 10 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: Chris" -d 5 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: Damien" -d 21 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: Michael" -d 10 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: Christian" -d 12 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: Oleg" -d 10 -H "Content-Type: text/plain"

```
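
Instead of running each cURL command by hand, a small Java 11+ program using `java.net.http.HttpClient` can send the same data. This is a sketch: the class `IngestPoster` and its arrays are illustrative names, while the URLs, the `username` header, the `Content-Type`, and the payloads are copied from the commands above. It forces HTTP/1.1 to mirror a plain cURL request.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Java 11+ sketch that sends the same requests as the cURL commands above.
// Class and array names are illustrative; URLs, headers and payloads come from the commands.
class IngestPoster {

    static final String[][] USER_REGIONS = {
        {"Glenn", "americas"}, {"Soby", "americas"}, {"Janne", "europe"}, {"Ilaya", "americas"},
        {"Mark", "americas"}, {"Sabby", "americas"}, {"Gunnar", "americas"}, {"Ilaya", "asia"},
        {"Chris", "americas"}, {"Damien", "europe"}, {"Michael", "americas"},
        {"Christian", "europe"}, {"Oleg", "europe"}
    };

    static final String[][] USER_CLICKS = {
        {"Glenn", "9"}, {"Soby", "15"}, {"Janne", "10"}, {"Mark", "7"}, {"Sabby", "20"},
        {"Gunnar", "18"}, {"Ilaya", "10"}, {"Chris", "5"}, {"Damien", "21"}, {"Michael", "10"},
        {"Christian", "12"}, {"Oleg", "10"}
    };

    // Equivalent of: curl -X POST <target> -H "username: <user>" -d <body> -H "Content-Type: text/plain"
    static HttpRequest buildRequest(String target, String username, String body) {
        return HttpRequest.newBuilder(URI.create(target))
            .header("username", username)
            .header("Content-Type", "text/plain")
            .POST(HttpRequest.BodyPublishers.ofString(body))
            .build();
    }

    // Posts every (username, payload) row to the target and prints the HTTP status.
    static void postAll(HttpClient client, String target, String[][] rows)
            throws IOException, InterruptedException {
        for (String[] row : rows) {
            HttpResponse<Void> response =
                client.send(buildRequest(target, row[0], row[1]), HttpResponse.BodyHandlers.discarding());
            System.out.println(target + " " + row[0] + " -> HTTP " + response.statusCode());
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        HttpClient client = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).build();
        // Regions first, then clicks, mirroring the order of the commands above.
        postAll(client, "http://localhost:9000", USER_REGIONS);
        postAll(client, "http://localhost:9001", USER_CLICKS);
    }
}
```

Sending the regions before the clicks keeps the same ordering as the cURL commands, which matters for which region each click is joined with.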

Once this data is published, the KStream application sends the processed result to the logger application, which logs the following result:

```
2019-03-15 15:50:39.251  INFO 49790 --- [container-0-C-1] ksPerRegion$Logger$1 : europe : 53
2019-03-15 15:50:39.252  INFO 49790 --- [container-0-C-1] ksPerRegion$Logger$1 : asia : 10
2019-03-15 15:50:39.252  INFO 49790 --- [container-0-C-1] ksPerRegion$Logger$1 : americas : 84
```
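
The logged totals can be checked by hand. The following plain-Java simulation (no Kafka; `ClicksPerRegionSketch` is a hypothetical name, not the code of `kstreams-join-user-clicks-and-region`) assumes that the user regions behave like a table, so only the latest region per username counts, and that all region events are processed before the click events. Under those assumptions, Ilaya's second region (`asia`) replaces the first (`americas`), which is consistent with the `asia : 10` line in the output.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation (no Kafka) of the aggregation behind the logged totals.
// Assumptions: user regions act as a table (latest region per username wins), each click is
// joined to its user's region by the username key, users without a region are skipped, and
// all region events are processed before the click events.
class ClicksPerRegionSketch {

    static final String[][] USER_REGIONS = {
        {"Glenn", "americas"}, {"Soby", "americas"}, {"Janne", "europe"}, {"Ilaya", "americas"},
        {"Mark", "americas"}, {"Sabby", "americas"}, {"Gunnar", "americas"}, {"Ilaya", "asia"},
        {"Chris", "americas"}, {"Damien", "europe"}, {"Michael", "americas"},
        {"Christian", "europe"}, {"Oleg", "europe"}
    };

    static final String[][] USER_CLICKS = {
        {"Glenn", "9"}, {"Soby", "15"}, {"Janne", "10"}, {"Mark", "7"}, {"Sabby", "20"},
        {"Gunnar", "18"}, {"Ilaya", "10"}, {"Chris", "5"}, {"Damien", "21"}, {"Michael", "10"},
        {"Christian", "12"}, {"Oleg", "10"}
    };

    // Keeps only the most recent region for each username, as a table would.
    static Map<String, String> latestRegionPerUser(String[][] regionEvents) {
        Map<String, String> table = new LinkedHashMap<>();
        for (String[] event : regionEvents) {
            table.put(event[0], event[1]); // a later event for the same key replaces the earlier one
        }
        return table;
    }

    // Joins each click with the user's region and sums the clicks per region.
    static Map<String, Long> clicksPerRegion(String[][] regionEvents, String[][] clickEvents) {
        Map<String, String> regions = latestRegionPerUser(regionEvents);
        Map<String, Long> totals = new TreeMap<>();
        for (String[] click : clickEvents) {
            String region = regions.get(click[0]);
            if (region == null) {
                continue; // no region known for this user (inner-join behavior assumed)
            }
            totals.merge(region, Long.parseLong(click[1]), Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        clicksPerRegion(USER_REGIONS, USER_CLICKS)
            .forEach((region, clicks) -> System.out.println(region + " : " + clicks));
        // americas : 84
        // asia : 10
        // europe : 53
    }
}
```

The simulation prints the regions in alphabetical order, whereas the logger output above lists them in the order the results arrived.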

You can keep publishing more click data and watch the updated results in the logger application's output.