Files
spring-integration-samples/basic/kafka
Spring Operator 4f48dce420 URL Cleanup
This commit updates URLs to prefer the https protocol. Redirects are not followed to avoid accidentally expanding intentionally shortened URLs (i.e. if using a URL shortener).

# Fixed URLs

## Fixed Success
These URLs were fixed successfully.

* http://www.apache.org/licenses/LICENSE-2.0.txt migrated to:
  https://www.apache.org/licenses/LICENSE-2.0.txt ([https](https://www.apache.org/licenses/LICENSE-2.0.txt) result 200).
* http://projects.spring.io/spring-integration migrated to:
  https://projects.spring.io/spring-integration ([https](https://projects.spring.io/spring-integration) result 301).
* http://repo.spring.io/libs-milestone migrated to:
  https://repo.spring.io/libs-milestone ([https](https://repo.spring.io/libs-milestone) result 302).
* http://repo.spring.io/libs-snapshot migrated to:
  https://repo.spring.io/libs-snapshot ([https](https://repo.spring.io/libs-snapshot) result 302).
* http://repo.spring.io/libs-staging-local migrated to:
  https://repo.spring.io/libs-staging-local ([https](https://repo.spring.io/libs-staging-local) result 302).

# Ignored
These URLs were intentionally ignored.

* http://maven.apache.org/POM/4.0.0
* http://maven.apache.org/xsd/maven-4.0.0.xsd
* http://www.w3.org/2001/XMLSchema-instance
2019-03-06 10:03:19 -05:00
..
2019-03-06 10:03:19 -05:00
2017-02-02 13:42:51 -05:00

Apache Kafka Sample

This example demonstrates the use Kafka Outbound Channel Adapter and Message-Driven Channel Adapter.

It uses Java configuration for the adapters.

Running the sample

Start Apache Zookeeper and Apache Kafka according to the documentation for the Apache Kafka project.

$ gradlew :kafka:run

This will package the application and run it using the Gradle Application Plugin

Using an IDE such as SpringSource Tool Suite™ (STS)

In STS (Eclipse), go to package org.springframework.integration.samples.kafka, right-click Application and select Run as --> Java Application (or Spring Boot Application).

Output

The application sends 10 messages (foo0 ... foo9) to a kafka topic si.topic; this must exist or the broker must be configured to auto-create topics.

It then sends a message with a KafkaNull payload.

It then dynamically creates a new inbound adapter for a different topic si.new.topic and sends 10 messages there.

The message-driven adapter receives the messages and places them in a QueueChannel which the application reads and writes to stdout:

Sending 10 messages...
Sending a null message...
GenericMessage [payload=foo0, headers={kafka_offset=857, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}]
GenericMessage [payload=foo1, headers={kafka_offset=858, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}]
GenericMessage [payload=foo2, headers={kafka_offset=859, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}]
GenericMessage [payload=foo3, headers={kafka_offset=860, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}]
GenericMessage [payload=foo4, headers={kafka_offset=861, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}]
GenericMessage [payload=foo5, headers={kafka_offset=862, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}]
GenericMessage [payload=foo6, headers={kafka_offset=863, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}]
GenericMessage [payload=foo7, headers={kafka_offset=864, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}]
GenericMessage [payload=foo8, headers={kafka_offset=865, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}]
GenericMessage [payload=foo9, headers={kafka_offset=866, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}]
GenericMessage [payload=org.springframework.kafka.support.KafkaNull@3546d80f, headers={kafka_offset=867, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}]
Adding an adapter for a second topic and sending 10 messages...
GenericMessage [payload=bar0, headers={kafka_offset=200, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}]
GenericMessage [payload=bar1, headers={kafka_offset=201, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}]
GenericMessage [payload=bar2, headers={kafka_offset=202, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}]
GenericMessage [payload=bar3, headers={kafka_offset=203, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}]
GenericMessage [payload=bar4, headers={kafka_offset=204, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}]
GenericMessage [payload=bar5, headers={kafka_offset=205, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}]
GenericMessage [payload=bar6, headers={kafka_offset=206, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}]
GenericMessage [payload=bar7, headers={kafka_offset=207, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}]
GenericMessage [payload=bar8, headers={kafka_offset=208, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}]
GenericMessage [payload=bar9, headers={kafka_offset=209, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}]

Notice that the offset header increases on each run (the topic is not removed, to demonstrate that the offset is retained between executions).