diff --git a/basic/kafka/README.md b/basic/kafka/README.md index 426d3e56..a15bea09 100644 --- a/basic/kafka/README.md +++ b/basic/kafka/README.md @@ -30,25 +30,28 @@ writes to stdout: Sending 10 messages... Sending a null message... - GenericMessage [payload=foo0, headers={kafka_offset=657, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] - GenericMessage [payload=foo1, headers={kafka_offset=658, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] - GenericMessage [payload=foo2, headers={kafka_offset=659, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] - GenericMessage [payload=foo3, headers={kafka_offset=660, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] - GenericMessage [payload=foo4, headers={kafka_offset=661, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] - GenericMessage [payload=foo5, headers={kafka_offset=662, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] - GenericMessage [payload=foo6, headers={kafka_offset=663, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] - GenericMessage [payload=foo7, headers={kafka_offset=664, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] - GenericMessage [payload=foo8, headers={kafka_offset=665, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] - GenericMessage [payload=org.springframework.kafka.support.KafkaNull@4d9d1b69, headers={kafka_offset=667, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] + GenericMessage [payload=foo0, headers={kafka_offset=857, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] + GenericMessage [payload=foo1, headers={kafka_offset=858, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] + GenericMessage [payload=foo2, headers={kafka_offset=859, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] + GenericMessage [payload=foo3, headers={kafka_offset=860, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] + GenericMessage [payload=foo4, headers={kafka_offset=861, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] + GenericMessage [payload=foo5, headers={kafka_offset=862, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] + GenericMessage [payload=foo6, headers={kafka_offset=863, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] + GenericMessage [payload=foo7, headers={kafka_offset=864, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] + GenericMessage [payload=foo8, headers={kafka_offset=865, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] + GenericMessage [payload=foo9, headers={kafka_offset=866, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] + GenericMessage [payload=org.springframework.kafka.support.KafkaNull@3546d80f, headers={kafka_offset=867, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] Adding an adapter for a second topic and sending 10 messages... - GenericMessage [payload=foo0, headers={kafka_offset=70, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] - GenericMessage [payload=foo1, headers={kafka_offset=71, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] - GenericMessage [payload=foo2, headers={kafka_offset=72, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] - GenericMessage [payload=foo3, headers={kafka_offset=73, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] - GenericMessage [payload=foo4, headers={kafka_offset=74, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] - GenericMessage [payload=foo5, headers={kafka_offset=75, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] - GenericMessage [payload=foo6, headers={kafka_offset=76, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] - GenericMessage [payload=foo7, headers={kafka_offset=77, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] + GenericMessage [payload=bar0, headers={kafka_offset=200, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] + GenericMessage [payload=bar1, headers={kafka_offset=201, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] + GenericMessage [payload=bar2, headers={kafka_offset=202, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] + GenericMessage [payload=bar3, headers={kafka_offset=203, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] + GenericMessage [payload=bar4, headers={kafka_offset=204, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] + GenericMessage [payload=bar5, headers={kafka_offset=205, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] + GenericMessage [payload=bar6, headers={kafka_offset=206, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] + GenericMessage [payload=bar7, headers={kafka_offset=207, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] + GenericMessage [payload=bar8, headers={kafka_offset=208, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] + GenericMessage [payload=bar9, headers={kafka_offset=209, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] Notice that the offset header increases on each run (the topic is not removed, to demonstrate that the offset is retained between executions). diff --git a/basic/kafka/src/main/java/org/springframework/integration/samples/kafka/Application.java b/basic/kafka/src/main/java/org/springframework/integration/samples/kafka/Application.java index d4b89861..d69d5b34 100644 --- a/basic/kafka/src/main/java/org/springframework/integration/samples/kafka/Application.java +++ b/basic/kafka/src/main/java/org/springframework/integration/samples/kafka/Application.java @@ -95,7 +95,7 @@ public class Application { addAnotherListenerForTopics(this.properties.getNewTopic()); headers = Collections.singletonMap(KafkaHeaders.TOPIC, this.properties.getNewTopic()); for (int i = 0; i < 10; i++) { - toKafka.send(new GenericMessage<>("foo" + i, headers)); + toKafka.send(new GenericMessage<>("bar" + i, headers)); } received = fromKafka.receive(10000); count = 0; diff --git a/basic/sftp/src/test/java/org/springframework/integration/samples/sftp/SftpOutboundTransferSample.java b/basic/sftp/src/test/java/org/springframework/integration/samples/sftp/SftpOutboundTransferSample.java index c9ddc96d..0a51c504 100644 --- a/basic/sftp/src/test/java/org/springframework/integration/samples/sftp/SftpOutboundTransferSample.java +++ b/basic/sftp/src/test/java/org/springframework/integration/samples/sftp/SftpOutboundTransferSample.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2014 the original author or authors. + * Copyright 2002-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.springframework.integration.samples.sftp; import java.io.File; @@ -46,7 +47,8 @@ public class SftpOutboundTransferSample { final String destinationFileName = sourceFileName +"_foo"; final ClassPathXmlApplicationContext ac = - new ClassPathXmlApplicationContext("/META-INF/spring/integration/SftpOutboundTransferSample-context.xml", SftpOutboundTransferSample.class); + new ClassPathXmlApplicationContext("/META-INF/spring/integration/SftpOutboundTransferSample-context.xml", + SftpOutboundTransferSample.class); @SuppressWarnings("unchecked") SessionFactory sessionFactory = ac.getBean(CachingSessionFactory.class); RemoteFileTemplate template = new RemoteFileTemplate(sessionFactory); @@ -63,7 +65,8 @@ public class SftpOutboundTransferSample { inputChannel.send(message); Thread.sleep(2000); - Assert.isTrue(SftpTestUtils.fileExists(template, destinationFileName)); + Assert.isTrue(SftpTestUtils.fileExists(template, destinationFileName), + String.format("File '%s' does not exist.", destinationFileName)); System.out.println(String.format("Successfully transferred '%s' file to a " + "remote location under the name '%s'", sourceFileName, destinationFileName)); diff --git a/build.gradle b/build.gradle index e00f44ed..652d3071 100644 --- a/build.gradle +++ b/build.gradle @@ -1336,9 +1336,6 @@ project('kafka-dsl') { compile 'org.springframework.boot:spring-boot-starter-integration' compile "org.springframework.integration:spring-integration-core:$springIntegrationVersion" compile ("org.springframework.integration:spring-integration-kafka:$springIntegrationKafkaVersion") - compile ("org.springframework.kafka:spring-kafka-test:$springKafkaVersion") { - exclude group: 'org.slf4j' - } compile "log4j:log4j:$log4jVersion" testCompile 'org.springframework.boot:spring-boot-starter-test' } diff --git a/dsl/kafka-dsl/README.md b/dsl/kafka-dsl/README.md index 5512facd..07e7aace 100644 --- a/dsl/kafka-dsl/README.md +++ b/dsl/kafka-dsl/README.md @@ -17,20 +17,53 @@ In STS (Eclipse), go to package **org.springframework.integration.samples.kafka* ### Output -The application sends 10 messages (`foo0` ... `foo9`) to a kafka topic `si.topic` (which is created if necessary). +The application sends 10 messages (`foo0` ... `foo9`) to a kafka topic `si.topic`; this must exist or the broker must be configured to auto-create topics. -The message-driven adapter receives the messages and places them in a `QueueChannel` which the application reads and -writes to stdout: +It then dynamically creates a new inbound adapter for a different topic `si.new.topic` and sends 10 messages there. - GenericMessage [payload=foo0, headers={kafka_offset=21, kafka_messageKey=si.key, kafka_topic=si.topic, kafka_partitionId=0, kafka_nextOffset=22}] - GenericMessage [payload=foo1, headers={kafka_offset=22, kafka_messageKey=si.key, kafka_topic=si.topic, kafka_partitionId=0, kafka_nextOffset=23}] - GenericMessage [payload=foo2, headers={kafka_offset=23, kafka_messageKey=si.key, kafka_topic=si.topic, kafka_partitionId=0, kafka_nextOffset=24}] - GenericMessage [payload=foo3, headers={kafka_offset=24, kafka_messageKey=si.key, kafka_topic=si.topic, kafka_partitionId=0, kafka_nextOffset=25}] - GenericMessage [payload=foo4, headers={kafka_offset=25, kafka_messageKey=si.key, kafka_topic=si.topic, kafka_partitionId=0, kafka_nextOffset=26}] - GenericMessage [payload=foo5, headers={kafka_offset=26, kafka_messageKey=si.key, kafka_topic=si.topic, kafka_partitionId=0, kafka_nextOffset=27}] - GenericMessage [payload=foo6, headers={kafka_offset=27, kafka_messageKey=si.key, kafka_topic=si.topic, kafka_partitionId=0, kafka_nextOffset=28}] - GenericMessage [payload=foo7, headers={kafka_offset=28, kafka_messageKey=si.key, kafka_topic=si.topic, kafka_partitionId=0, kafka_nextOffset=29}] - GenericMessage [payload=foo8, headers={kafka_offset=29, kafka_messageKey=si.key, kafka_topic=si.topic, kafka_partitionId=0, kafka_nextOffset=30}] - GenericMessage [payload=foo9, headers={kafka_offset=30, kafka_messageKey=si.key, kafka_topic=si.topic, kafka_partitionId=0, kafka_nextOffset=31}] +The message-driven adapter receives the messages and places them in a `QueueChannel` which the application reads using a no-arg gateway method and writes to stdout: + + Sending 10 messages... + Send to Kafka: foo0 + Send to Kafka: foo1 + Send to Kafka: foo2 + Send to Kafka: foo3 + Send to Kafka: foo4 + Send to Kafka: foo5 + Send to Kafka: foo6 + Send to Kafka: foo7 + Send to Kafka: foo8 + Send to Kafka: foo9 + GenericMessage [payload=foo0, headers={kafka_offset=847, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] + GenericMessage [payload=foo1, headers={kafka_offset=848, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] + GenericMessage [payload=foo2, headers={kafka_offset=849, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] + GenericMessage [payload=foo3, headers={kafka_offset=850, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] + GenericMessage [payload=foo4, headers={kafka_offset=851, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] + GenericMessage [payload=foo5, headers={kafka_offset=852, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] + GenericMessage [payload=foo6, headers={kafka_offset=853, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] + GenericMessage [payload=foo7, headers={kafka_offset=854, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] + GenericMessage [payload=foo8, headers={kafka_offset=855, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] + GenericMessage [payload=foo9, headers={kafka_offset=856, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] + Adding an adapter for a second topic and sending 10 messages... + Send to Kafka: bar0 + Send to Kafka: bar1 + Send to Kafka: bar2 + Send to Kafka: bar3 + Send to Kafka: bar4 + Send to Kafka: bar5 + Send to Kafka: bar6 + Send to Kafka: bar7 + Send to Kafka: bar8 + Send to Kafka: bar9 + GenericMessage [payload=bar0, headers={kafka_offset=190, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] + GenericMessage [payload=bar1, headers={kafka_offset=191, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] + GenericMessage [payload=bar2, headers={kafka_offset=192, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] + GenericMessage [payload=bar3, headers={kafka_offset=193, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] + GenericMessage [payload=bar4, headers={kafka_offset=194, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] + GenericMessage [payload=bar5, headers={kafka_offset=195, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] + GenericMessage [payload=bar6, headers={kafka_offset=196, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] + GenericMessage [payload=bar7, headers={kafka_offset=197, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] + GenericMessage [payload=bar8, headers={kafka_offset=198, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] + GenericMessage [payload=bar9, headers={kafka_offset=199, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] Notice that the offset header increases on each run (the topic is not removed, to demonstrate that the offset is retained between executions). diff --git a/dsl/kafka-dsl/pom.xml b/dsl/kafka-dsl/pom.xml index 4025da73..6dad0017 100644 --- a/dsl/kafka-dsl/pom.xml +++ b/dsl/kafka-dsl/pom.xml @@ -74,12 +74,6 @@ 3.0.0.BUILD-SNAPSHOT compile - - org.springframework.kafka - spring-kafka-test - 2.0.0.BUILD-SNAPSHOT - compile - log4j log4j diff --git a/dsl/kafka-dsl/src/main/java/org/springframework/integration/samples/dsl/kafka/Application.java b/dsl/kafka-dsl/src/main/java/org/springframework/integration/samples/dsl/kafka/Application.java index e16b1648..e4ecb701 100644 --- a/dsl/kafka-dsl/src/main/java/org/springframework/integration/samples/dsl/kafka/Application.java +++ b/dsl/kafka-dsl/src/main/java/org/springframework/integration/samples/dsl/kafka/Application.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 the original author or authors. + * Copyright 2016-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,30 +16,30 @@ package org.springframework.integration.samples.dsl.kafka; -import java.util.Properties; +import java.util.Map; -import javax.annotation.PostConstruct; +import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.I0Itec.zkclient.ZkClient; -import org.apache.kafka.common.errors.TopicExistsException; - -import org.springframework.beans.factory.annotation.Value; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.WebApplicationType; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.integration.annotation.Gateway; import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlows; +import org.springframework.integration.dsl.context.IntegrationFlowContext; import org.springframework.integration.kafka.dsl.Kafka; import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.Message; - -import kafka.admin.AdminUtils; -import kafka.utils.ZKStringSerializer$; -import kafka.utils.ZkUtils; +import org.springframework.messaging.handler.annotation.Header; /** * @author Gary Russell @@ -47,57 +47,53 @@ import kafka.utils.ZkUtils; * @since 4.3 */ @SpringBootApplication +@EnableConfigurationProperties(KafkaAppProperties.class) public class Application { public static void main(String[] args) throws Exception { ConfigurableApplicationContext context = new SpringApplicationBuilder(Application.class) - .web(false) + .web(WebApplicationType.NONE) .run(args); + context.getBean(Application.class).runDemo(context); + context.close(); + } + private void runDemo(ConfigurableApplicationContext context) { KafkaGateway kafkaGateway = context.getBean(KafkaGateway.class); + System.out.println("Sending 10 messages..."); for (int i = 0; i < 10; i++) { String message = "foo" + i; System.out.println("Send to Kafka: " + message); - kafkaGateway.sendToKafka(message); + kafkaGateway.sendToKafka(message, this.properties.getTopic()); } - Message received = kafkaGateway.receiveFromKafka(); - while (received != null) { + for (int i = 0; i < 10; i++) { + Message received = kafkaGateway.receiveFromKafka(); + System.out.println(received); + } + System.out.println("Adding an adapter for a second topic and sending 10 messages..."); + addAnotherListenerForTopics(this.properties.getNewTopic()); + for (int i = 0; i < 10; i++) { + String message = "bar" + i; + System.out.println("Send to Kafka: " + message); + kafkaGateway.sendToKafka(message, this.properties.getNewTopic()); + } + for (int i = 0; i < 10; i++) { + Message received = kafkaGateway.receiveFromKafka(); System.out.println(received); - received = kafkaGateway.receiveFromKafka(); } - context.close(); - System.exit(0); } - @Value("${kafka.topic}") - private String topic; - - @Value("${kafka.messageKey}") - private String messageKey; - - @Value("${kafka.zookeeper.connect}") - private String zookeeperConnect; - - @PostConstruct - public void init() { - ZkClient zkClient = new ZkClient(this.zookeeperConnect, 6000, 6000, ZKStringSerializer$.MODULE$); - ZkUtils zkUtils = new ZkUtils(zkClient, null, false); - try { - AdminUtils.createTopic(zkUtils, topic, 1, 1, new Properties(), null); - } - catch (TopicExistsException e) { - // no-op - } - } + @Autowired + private KafkaAppProperties properties; @MessagingGateway public interface KafkaGateway { @Gateway(requestChannel = "toKafka.input") - void sendToKafka(String payload); + void sendToKafka(String payload, @Header(KafkaHeaders.TOPIC) String topic); @Gateway(replyChannel = "fromKafka", replyTimeout = 10000) Message receiveFromKafka(); @@ -108,16 +104,35 @@ public class Application { public IntegrationFlow toKafka(KafkaTemplate kafkaTemplate) { return f -> f .handle(Kafka.outboundChannelAdapter(kafkaTemplate) - .topic(this.topic) - .messageKey(this.messageKey)); + .messageKey(this.properties.getMessageKey())); } @Bean public IntegrationFlow fromKafka(ConsumerFactory consumerFactory) { return IntegrationFlows - .from(Kafka.messageDrivenChannelAdapter(consumerFactory, this.topic)) + .from(Kafka.messageDrivenChannelAdapter(consumerFactory, this.properties.getTopic())) .channel(c -> c.queue("fromKafka")) .get(); } + @Autowired + private IntegrationFlowContext flowContext; + + @Autowired + private KafkaProperties kafkaProperties; + + public void addAnotherListenerForTopics(String... topics) { + Map consumerProperties = kafkaProperties.buildConsumerProperties(); + // change the group id so we don't revoke the other partitions. + consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, + consumerProperties.get(ConsumerConfig.GROUP_ID_CONFIG) + "x"); + IntegrationFlow flow = + IntegrationFlows + .from(Kafka.messageDrivenChannelAdapter( + new DefaultKafkaConsumerFactory(consumerProperties), topics)) + .channel("fromKafka") + .get(); + this.flowContext.registration(flow).register(); + } + } diff --git a/dsl/kafka-dsl/src/main/java/org/springframework/integration/samples/dsl/kafka/KafkaAppProperties.java b/dsl/kafka-dsl/src/main/java/org/springframework/integration/samples/dsl/kafka/KafkaAppProperties.java new file mode 100644 index 00000000..2bc5b1a1 --- /dev/null +++ b/dsl/kafka-dsl/src/main/java/org/springframework/integration/samples/dsl/kafka/KafkaAppProperties.java @@ -0,0 +1,61 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.samples.dsl.kafka; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +/** + * Properties for the kafka sample app. + * + * @author Gary Russell + * @since 5.0 + * + */ +@ConfigurationProperties("kafka") +public class KafkaAppProperties { + + private String topic; + + private String newTopic; + + private String messageKey; + + public String getTopic() { + return this.topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public String getNewTopic() { + return this.newTopic; + } + + public void setNewTopic(String newTopic) { + this.newTopic = newTopic; + } + + public String getMessageKey() { + return this.messageKey; + } + + public void setMessageKey(String messageKey) { + this.messageKey = messageKey; + } + +} diff --git a/dsl/kafka-dsl/src/main/resources/application.yml b/dsl/kafka-dsl/src/main/resources/application.yml index d0efe847..cf48f86c 100644 --- a/dsl/kafka-dsl/src/main/resources/application.yml +++ b/dsl/kafka-dsl/src/main/resources/application.yml @@ -1,14 +1,13 @@ kafka: - zookeeper: - connect: localhost:2181 topic: si.topic + newTopic: si.new.topic messageKey: si.key spring: kafka: consumer: group-id: siTestGroup - enable-auto-commit: true - auto-commit-interval: 100 + auto-offset-reset: earliest + enable-auto-commit: false value-deserializer: org.apache.kafka.common.serialization.StringDeserializer key-deserializer: org.apache.kafka.common.serialization.StringDeserializer producer: