Kafka_DSL Sample Improvements
- get rid of topic creator and scala deps - use boot properties - demonstrate using the DSL to dynamically add adapters
This commit is contained in:
committed by
Artem Bilan
parent
038ee4576a
commit
a198e45239
@@ -30,25 +30,28 @@ writes to stdout:
|
||||
|
||||
Sending 10 messages...
|
||||
Sending a null message...
|
||||
GenericMessage [payload=foo0, headers={kafka_offset=657, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}]
|
||||
GenericMessage [payload=foo1, headers={kafka_offset=658, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}]
|
||||
GenericMessage [payload=foo2, headers={kafka_offset=659, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}]
|
||||
GenericMessage [payload=foo3, headers={kafka_offset=660, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}]
|
||||
GenericMessage [payload=foo4, headers={kafka_offset=661, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}]
|
||||
GenericMessage [payload=foo5, headers={kafka_offset=662, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}]
|
||||
GenericMessage [payload=foo6, headers={kafka_offset=663, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}]
|
||||
GenericMessage [payload=foo7, headers={kafka_offset=664, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}]
|
||||
GenericMessage [payload=foo8, headers={kafka_offset=665, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}]
|
||||
GenericMessage [payload=org.springframework.kafka.support.KafkaNull@4d9d1b69, headers={kafka_offset=667, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}]
|
||||
GenericMessage [payload=foo0, headers={kafka_offset=857, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}]
|
||||
GenericMessage [payload=foo1, headers={kafka_offset=858, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}]
|
||||
GenericMessage [payload=foo2, headers={kafka_offset=859, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}]
|
||||
GenericMessage [payload=foo3, headers={kafka_offset=860, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}]
|
||||
GenericMessage [payload=foo4, headers={kafka_offset=861, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}]
|
||||
GenericMessage [payload=foo5, headers={kafka_offset=862, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}]
|
||||
GenericMessage [payload=foo6, headers={kafka_offset=863, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}]
|
||||
GenericMessage [payload=foo7, headers={kafka_offset=864, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}]
|
||||
GenericMessage [payload=foo8, headers={kafka_offset=865, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}]
|
||||
GenericMessage [payload=foo9, headers={kafka_offset=866, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}]
|
||||
GenericMessage [payload=org.springframework.kafka.support.KafkaNull@3546d80f, headers={kafka_offset=867, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}]
|
||||
Adding an adapter for a second topic and sending 10 messages...
|
||||
GenericMessage [payload=foo0, headers={kafka_offset=70, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}]
|
||||
GenericMessage [payload=foo1, headers={kafka_offset=71, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}]
|
||||
GenericMessage [payload=foo2, headers={kafka_offset=72, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}]
|
||||
GenericMessage [payload=foo3, headers={kafka_offset=73, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}]
|
||||
GenericMessage [payload=foo4, headers={kafka_offset=74, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}]
|
||||
GenericMessage [payload=foo5, headers={kafka_offset=75, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}]
|
||||
GenericMessage [payload=foo6, headers={kafka_offset=76, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}]
|
||||
GenericMessage [payload=foo7, headers={kafka_offset=77, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}]
|
||||
GenericMessage [payload=bar0, headers={kafka_offset=200, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}]
|
||||
GenericMessage [payload=bar1, headers={kafka_offset=201, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}]
|
||||
GenericMessage [payload=bar2, headers={kafka_offset=202, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}]
|
||||
GenericMessage [payload=bar3, headers={kafka_offset=203, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}]
|
||||
GenericMessage [payload=bar4, headers={kafka_offset=204, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}]
|
||||
GenericMessage [payload=bar5, headers={kafka_offset=205, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}]
|
||||
GenericMessage [payload=bar6, headers={kafka_offset=206, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}]
|
||||
GenericMessage [payload=bar7, headers={kafka_offset=207, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}]
|
||||
GenericMessage [payload=bar8, headers={kafka_offset=208, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}]
|
||||
GenericMessage [payload=bar9, headers={kafka_offset=209, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}]
|
||||
|
||||
Notice that the offset header increases on each run (the topic is not removed, to demonstrate that the offset is retained
|
||||
between executions).
|
||||
|
||||
@@ -95,7 +95,7 @@ public class Application {
|
||||
addAnotherListenerForTopics(this.properties.getNewTopic());
|
||||
headers = Collections.singletonMap(KafkaHeaders.TOPIC, this.properties.getNewTopic());
|
||||
for (int i = 0; i < 10; i++) {
|
||||
toKafka.send(new GenericMessage<>("foo" + i, headers));
|
||||
toKafka.send(new GenericMessage<>("bar" + i, headers));
|
||||
}
|
||||
received = fromKafka.receive(10000);
|
||||
count = 0;
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2002-2014 the original author or authors.
|
||||
* Copyright 2002-2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -13,6 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.integration.samples.sftp;
|
||||
|
||||
import java.io.File;
|
||||
@@ -46,7 +47,8 @@ public class SftpOutboundTransferSample {
|
||||
final String destinationFileName = sourceFileName +"_foo";
|
||||
|
||||
final ClassPathXmlApplicationContext ac =
|
||||
new ClassPathXmlApplicationContext("/META-INF/spring/integration/SftpOutboundTransferSample-context.xml", SftpOutboundTransferSample.class);
|
||||
new ClassPathXmlApplicationContext("/META-INF/spring/integration/SftpOutboundTransferSample-context.xml",
|
||||
SftpOutboundTransferSample.class);
|
||||
@SuppressWarnings("unchecked")
|
||||
SessionFactory<LsEntry> sessionFactory = ac.getBean(CachingSessionFactory.class);
|
||||
RemoteFileTemplate<LsEntry> template = new RemoteFileTemplate<LsEntry>(sessionFactory);
|
||||
@@ -63,7 +65,8 @@ public class SftpOutboundTransferSample {
|
||||
inputChannel.send(message);
|
||||
Thread.sleep(2000);
|
||||
|
||||
Assert.isTrue(SftpTestUtils.fileExists(template, destinationFileName));
|
||||
Assert.isTrue(SftpTestUtils.fileExists(template, destinationFileName),
|
||||
String.format("File '%s' does not exist.", destinationFileName));
|
||||
|
||||
System.out.println(String.format("Successfully transferred '%s' file to a " +
|
||||
"remote location under the name '%s'", sourceFileName, destinationFileName));
|
||||
|
||||
@@ -1336,9 +1336,6 @@ project('kafka-dsl') {
|
||||
compile 'org.springframework.boot:spring-boot-starter-integration'
|
||||
compile "org.springframework.integration:spring-integration-core:$springIntegrationVersion"
|
||||
compile ("org.springframework.integration:spring-integration-kafka:$springIntegrationKafkaVersion")
|
||||
compile ("org.springframework.kafka:spring-kafka-test:$springKafkaVersion") {
|
||||
exclude group: 'org.slf4j'
|
||||
}
|
||||
compile "log4j:log4j:$log4jVersion"
|
||||
testCompile 'org.springframework.boot:spring-boot-starter-test'
|
||||
}
|
||||
|
||||
@@ -17,20 +17,53 @@ In STS (Eclipse), go to package **org.springframework.integration.samples.kafka*
|
||||
|
||||
### Output
|
||||
|
||||
The application sends 10 messages (`foo0` ... `foo9`) to a kafka topic `si.topic` (which is created if necessary).
|
||||
The application sends 10 messages (`foo0` ... `foo9`) to a kafka topic `si.topic`; this must exist or the broker must be configured to auto-create topics.
|
||||
|
||||
The message-driven adapter receives the messages and places them in a `QueueChannel` which the application reads and
|
||||
writes to stdout:
|
||||
It then dynamically creates a new inbound adapter for a different topic `si.new.topic` and sends 10 messages there.
|
||||
|
||||
GenericMessage [payload=foo0, headers={kafka_offset=21, kafka_messageKey=si.key, kafka_topic=si.topic, kafka_partitionId=0, kafka_nextOffset=22}]
|
||||
GenericMessage [payload=foo1, headers={kafka_offset=22, kafka_messageKey=si.key, kafka_topic=si.topic, kafka_partitionId=0, kafka_nextOffset=23}]
|
||||
GenericMessage [payload=foo2, headers={kafka_offset=23, kafka_messageKey=si.key, kafka_topic=si.topic, kafka_partitionId=0, kafka_nextOffset=24}]
|
||||
GenericMessage [payload=foo3, headers={kafka_offset=24, kafka_messageKey=si.key, kafka_topic=si.topic, kafka_partitionId=0, kafka_nextOffset=25}]
|
||||
GenericMessage [payload=foo4, headers={kafka_offset=25, kafka_messageKey=si.key, kafka_topic=si.topic, kafka_partitionId=0, kafka_nextOffset=26}]
|
||||
GenericMessage [payload=foo5, headers={kafka_offset=26, kafka_messageKey=si.key, kafka_topic=si.topic, kafka_partitionId=0, kafka_nextOffset=27}]
|
||||
GenericMessage [payload=foo6, headers={kafka_offset=27, kafka_messageKey=si.key, kafka_topic=si.topic, kafka_partitionId=0, kafka_nextOffset=28}]
|
||||
GenericMessage [payload=foo7, headers={kafka_offset=28, kafka_messageKey=si.key, kafka_topic=si.topic, kafka_partitionId=0, kafka_nextOffset=29}]
|
||||
GenericMessage [payload=foo8, headers={kafka_offset=29, kafka_messageKey=si.key, kafka_topic=si.topic, kafka_partitionId=0, kafka_nextOffset=30}]
|
||||
GenericMessage [payload=foo9, headers={kafka_offset=30, kafka_messageKey=si.key, kafka_topic=si.topic, kafka_partitionId=0, kafka_nextOffset=31}]
|
||||
The message-driven adapter receives the messages and places them in a `QueueChannel` which the application reads using a no-arg gateway method and writes to stdout:
|
||||
|
||||
Sending 10 messages...
|
||||
Send to Kafka: foo0
|
||||
Send to Kafka: foo1
|
||||
Send to Kafka: foo2
|
||||
Send to Kafka: foo3
|
||||
Send to Kafka: foo4
|
||||
Send to Kafka: foo5
|
||||
Send to Kafka: foo6
|
||||
Send to Kafka: foo7
|
||||
Send to Kafka: foo8
|
||||
Send to Kafka: foo9
|
||||
GenericMessage [payload=foo0, headers={kafka_offset=847, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}]
|
||||
GenericMessage [payload=foo1, headers={kafka_offset=848, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}]
|
||||
GenericMessage [payload=foo2, headers={kafka_offset=849, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}]
|
||||
GenericMessage [payload=foo3, headers={kafka_offset=850, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}]
|
||||
GenericMessage [payload=foo4, headers={kafka_offset=851, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}]
|
||||
GenericMessage [payload=foo5, headers={kafka_offset=852, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}]
|
||||
GenericMessage [payload=foo6, headers={kafka_offset=853, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}]
|
||||
GenericMessage [payload=foo7, headers={kafka_offset=854, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}]
|
||||
GenericMessage [payload=foo8, headers={kafka_offset=855, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}]
|
||||
GenericMessage [payload=foo9, headers={kafka_offset=856, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}]
|
||||
Adding an adapter for a second topic and sending 10 messages...
|
||||
Send to Kafka: bar0
|
||||
Send to Kafka: bar1
|
||||
Send to Kafka: bar2
|
||||
Send to Kafka: bar3
|
||||
Send to Kafka: bar4
|
||||
Send to Kafka: bar5
|
||||
Send to Kafka: bar6
|
||||
Send to Kafka: bar7
|
||||
Send to Kafka: bar8
|
||||
Send to Kafka: bar9
|
||||
GenericMessage [payload=bar0, headers={kafka_offset=190, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}]
|
||||
GenericMessage [payload=bar1, headers={kafka_offset=191, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}]
|
||||
GenericMessage [payload=bar2, headers={kafka_offset=192, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}]
|
||||
GenericMessage [payload=bar3, headers={kafka_offset=193, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}]
|
||||
GenericMessage [payload=bar4, headers={kafka_offset=194, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}]
|
||||
GenericMessage [payload=bar5, headers={kafka_offset=195, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}]
|
||||
GenericMessage [payload=bar6, headers={kafka_offset=196, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}]
|
||||
GenericMessage [payload=bar7, headers={kafka_offset=197, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}]
|
||||
GenericMessage [payload=bar8, headers={kafka_offset=198, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}]
|
||||
GenericMessage [payload=bar9, headers={kafka_offset=199, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}]
|
||||
|
||||
Notice that the offset header increases on each run (the topic is not removed, to demonstrate that the offset is retained between executions).
|
||||
|
||||
@@ -74,12 +74,6 @@
|
||||
<version>3.0.0.BUILD-SNAPSHOT</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka-test</artifactId>
|
||||
<version>2.0.0.BUILD-SNAPSHOT</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>log4j</groupId>
|
||||
<artifactId>log4j</artifactId>
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2016 the original author or authors.
|
||||
* Copyright 2016-2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -16,30 +16,30 @@
|
||||
|
||||
package org.springframework.integration.samples.dsl.kafka;
|
||||
|
||||
import java.util.Properties;
|
||||
import java.util.Map;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
|
||||
import org.I0Itec.zkclient.ZkClient;
|
||||
import org.apache.kafka.common.errors.TopicExistsException;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.WebApplicationType;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
|
||||
import org.springframework.boot.builder.SpringApplicationBuilder;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.integration.annotation.Gateway;
|
||||
import org.springframework.integration.annotation.MessagingGateway;
|
||||
import org.springframework.integration.dsl.IntegrationFlow;
|
||||
import org.springframework.integration.dsl.IntegrationFlows;
|
||||
import org.springframework.integration.dsl.context.IntegrationFlowContext;
|
||||
import org.springframework.integration.kafka.dsl.Kafka;
|
||||
import org.springframework.kafka.core.ConsumerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.kafka.support.KafkaHeaders;
|
||||
import org.springframework.messaging.Message;
|
||||
|
||||
import kafka.admin.AdminUtils;
|
||||
import kafka.utils.ZKStringSerializer$;
|
||||
import kafka.utils.ZkUtils;
|
||||
import org.springframework.messaging.handler.annotation.Header;
|
||||
|
||||
/**
|
||||
* @author Gary Russell
|
||||
@@ -47,57 +47,53 @@ import kafka.utils.ZkUtils;
|
||||
* @since 4.3
|
||||
*/
|
||||
@SpringBootApplication
|
||||
@EnableConfigurationProperties(KafkaAppProperties.class)
|
||||
public class Application {
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
ConfigurableApplicationContext context =
|
||||
new SpringApplicationBuilder(Application.class)
|
||||
.web(false)
|
||||
.web(WebApplicationType.NONE)
|
||||
.run(args);
|
||||
context.getBean(Application.class).runDemo(context);
|
||||
context.close();
|
||||
}
|
||||
|
||||
private void runDemo(ConfigurableApplicationContext context) {
|
||||
KafkaGateway kafkaGateway = context.getBean(KafkaGateway.class);
|
||||
System.out.println("Sending 10 messages...");
|
||||
for (int i = 0; i < 10; i++) {
|
||||
String message = "foo" + i;
|
||||
System.out.println("Send to Kafka: " + message);
|
||||
kafkaGateway.sendToKafka(message);
|
||||
kafkaGateway.sendToKafka(message, this.properties.getTopic());
|
||||
}
|
||||
|
||||
Message<?> received = kafkaGateway.receiveFromKafka();
|
||||
while (received != null) {
|
||||
for (int i = 0; i < 10; i++) {
|
||||
Message<?> received = kafkaGateway.receiveFromKafka();
|
||||
System.out.println(received);
|
||||
}
|
||||
System.out.println("Adding an adapter for a second topic and sending 10 messages...");
|
||||
addAnotherListenerForTopics(this.properties.getNewTopic());
|
||||
for (int i = 0; i < 10; i++) {
|
||||
String message = "bar" + i;
|
||||
System.out.println("Send to Kafka: " + message);
|
||||
kafkaGateway.sendToKafka(message, this.properties.getNewTopic());
|
||||
}
|
||||
for (int i = 0; i < 10; i++) {
|
||||
Message<?> received = kafkaGateway.receiveFromKafka();
|
||||
System.out.println(received);
|
||||
received = kafkaGateway.receiveFromKafka();
|
||||
}
|
||||
|
||||
context.close();
|
||||
System.exit(0);
|
||||
}
|
||||
|
||||
@Value("${kafka.topic}")
|
||||
private String topic;
|
||||
|
||||
@Value("${kafka.messageKey}")
|
||||
private String messageKey;
|
||||
|
||||
@Value("${kafka.zookeeper.connect}")
|
||||
private String zookeeperConnect;
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
ZkClient zkClient = new ZkClient(this.zookeeperConnect, 6000, 6000, ZKStringSerializer$.MODULE$);
|
||||
ZkUtils zkUtils = new ZkUtils(zkClient, null, false);
|
||||
try {
|
||||
AdminUtils.createTopic(zkUtils, topic, 1, 1, new Properties(), null);
|
||||
}
|
||||
catch (TopicExistsException e) {
|
||||
// no-op
|
||||
}
|
||||
}
|
||||
@Autowired
|
||||
private KafkaAppProperties properties;
|
||||
|
||||
@MessagingGateway
|
||||
public interface KafkaGateway {
|
||||
|
||||
@Gateway(requestChannel = "toKafka.input")
|
||||
void sendToKafka(String payload);
|
||||
void sendToKafka(String payload, @Header(KafkaHeaders.TOPIC) String topic);
|
||||
|
||||
@Gateway(replyChannel = "fromKafka", replyTimeout = 10000)
|
||||
Message<?> receiveFromKafka();
|
||||
@@ -108,16 +104,35 @@ public class Application {
|
||||
public IntegrationFlow toKafka(KafkaTemplate<?, ?> kafkaTemplate) {
|
||||
return f -> f
|
||||
.handle(Kafka.outboundChannelAdapter(kafkaTemplate)
|
||||
.topic(this.topic)
|
||||
.messageKey(this.messageKey));
|
||||
.messageKey(this.properties.getMessageKey()));
|
||||
}
|
||||
|
||||
@Bean
|
||||
public IntegrationFlow fromKafka(ConsumerFactory<?, ?> consumerFactory) {
|
||||
return IntegrationFlows
|
||||
.from(Kafka.messageDrivenChannelAdapter(consumerFactory, this.topic))
|
||||
.from(Kafka.messageDrivenChannelAdapter(consumerFactory, this.properties.getTopic()))
|
||||
.channel(c -> c.queue("fromKafka"))
|
||||
.get();
|
||||
}
|
||||
|
||||
@Autowired
|
||||
private IntegrationFlowContext flowContext;
|
||||
|
||||
@Autowired
|
||||
private KafkaProperties kafkaProperties;
|
||||
|
||||
public void addAnotherListenerForTopics(String... topics) {
|
||||
Map<String, Object> consumerProperties = kafkaProperties.buildConsumerProperties();
|
||||
// change the group id so we don't revoke the other partitions.
|
||||
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG,
|
||||
consumerProperties.get(ConsumerConfig.GROUP_ID_CONFIG) + "x");
|
||||
IntegrationFlow flow =
|
||||
IntegrationFlows
|
||||
.from(Kafka.messageDrivenChannelAdapter(
|
||||
new DefaultKafkaConsumerFactory<String, String>(consumerProperties), topics))
|
||||
.channel("fromKafka")
|
||||
.get();
|
||||
this.flowContext.registration(flow).register();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,61 @@
|
||||
/*
|
||||
* Copyright 2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.integration.samples.dsl.kafka;
|
||||
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
|
||||
/**
|
||||
* Properties for the kafka sample app.
|
||||
*
|
||||
* @author Gary Russell
|
||||
* @since 5.0
|
||||
*
|
||||
*/
|
||||
@ConfigurationProperties("kafka")
|
||||
public class KafkaAppProperties {
|
||||
|
||||
private String topic;
|
||||
|
||||
private String newTopic;
|
||||
|
||||
private String messageKey;
|
||||
|
||||
public String getTopic() {
|
||||
return this.topic;
|
||||
}
|
||||
|
||||
public void setTopic(String topic) {
|
||||
this.topic = topic;
|
||||
}
|
||||
|
||||
public String getNewTopic() {
|
||||
return this.newTopic;
|
||||
}
|
||||
|
||||
public void setNewTopic(String newTopic) {
|
||||
this.newTopic = newTopic;
|
||||
}
|
||||
|
||||
public String getMessageKey() {
|
||||
return this.messageKey;
|
||||
}
|
||||
|
||||
public void setMessageKey(String messageKey) {
|
||||
this.messageKey = messageKey;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,14 +1,13 @@
|
||||
kafka:
|
||||
zookeeper:
|
||||
connect: localhost:2181
|
||||
topic: si.topic
|
||||
newTopic: si.new.topic
|
||||
messageKey: si.key
|
||||
spring:
|
||||
kafka:
|
||||
consumer:
|
||||
group-id: siTestGroup
|
||||
enable-auto-commit: true
|
||||
auto-commit-interval: 100
|
||||
auto-offset-reset: earliest
|
||||
enable-auto-commit: false
|
||||
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
|
||||
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
|
||||
producer:
|
||||
|
||||
Reference in New Issue
Block a user