Kafka Sample Improvements
- get rid of topic creator and scala deps - use boot properties - demonstrate using the DSL to dynamically add adapters
This commit is contained in:
committed by
Artem Bilan
parent
65368f7959
commit
038ee4576a
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2015 the original author or authors.
|
||||
* Copyright 2015-2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -19,6 +19,7 @@ package org.springframework.integration.samples.barrier;
|
||||
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
|
||||
import org.springframework.boot.Banner.Mode;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.WebApplicationType;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.boot.builder.SpringApplicationBuilder;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
@@ -43,7 +44,7 @@ public class Application {
|
||||
|
||||
ConfigurableApplicationContext client
|
||||
= new SpringApplicationBuilder("/META-INF/spring/integration/client-context.xml")
|
||||
.web(false)
|
||||
.web(WebApplicationType.NONE)
|
||||
.bannerMode(Mode.OFF)
|
||||
.run(args);
|
||||
RequestGateway requestGateway = client.getBean("requestGateway", RequestGateway.class);
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2015 the original author or authors.
|
||||
* Copyright 2015-2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -18,6 +18,7 @@ package org.springframework.integration.samples.barrier2;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.springframework.boot.WebApplicationType;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.boot.builder.SpringApplicationBuilder;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
@@ -33,7 +34,7 @@ public class ErrorHandlingApplication {
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
ConfigurableApplicationContext test = new SpringApplicationBuilder(ErrorHandlingApplication.class)
|
||||
.web(false)
|
||||
.web(WebApplicationType.NONE)
|
||||
.run(args);
|
||||
Gateway gateway = test.getBean(Gateway.class);
|
||||
try {
|
||||
|
||||
@@ -62,6 +62,26 @@
|
||||
<artifactId>spring-boot-starter-data-jpa</artifactId>
|
||||
<scope>compile</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<artifactId>jboss-interceptors-api_1.1_spec</artifactId>
|
||||
<groupId>org.jboss.spec.javax.interceptor</groupId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<artifactId>jsr250-api</artifactId>
|
||||
<groupId>javax.annotation</groupId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<artifactId>cdi-api</artifactId>
|
||||
<groupId>javax.enterprise</groupId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<artifactId>el-api</artifactId>
|
||||
<groupId>javax.el</groupId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<artifactId>javax.inject</artifactId>
|
||||
<groupId>javax.inject</groupId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<artifactId>commons-logging</artifactId>
|
||||
<groupId>commons-logging</groupId>
|
||||
@@ -74,6 +94,26 @@
|
||||
<version>5.0.0.BUILD-SNAPSHOT</version>
|
||||
<scope>compile</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<artifactId>jboss-interceptors-api_1.1_spec</artifactId>
|
||||
<groupId>org.jboss.spec.javax.interceptor</groupId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<artifactId>jsr250-api</artifactId>
|
||||
<groupId>javax.annotation</groupId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<artifactId>cdi-api</artifactId>
|
||||
<groupId>javax.enterprise</groupId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<artifactId>el-api</artifactId>
|
||||
<groupId>javax.el</groupId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<artifactId>javax.inject</artifactId>
|
||||
<groupId>javax.inject</groupId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<artifactId>commons-logging</artifactId>
|
||||
<groupId>commons-logging</groupId>
|
||||
@@ -87,17 +127,25 @@
|
||||
<scope>compile</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<artifactId>commons-logging</artifactId>
|
||||
<groupId>commons-logging</groupId>
|
||||
<artifactId>jboss-interceptors-api_1.1_spec</artifactId>
|
||||
<groupId>org.jboss.spec.javax.interceptor</groupId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<artifactId>jsr250-api</artifactId>
|
||||
<groupId>javax.annotation</groupId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<artifactId>cdi-api</artifactId>
|
||||
<groupId>javax.enterprise</groupId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<artifactId>el-api</artifactId>
|
||||
<groupId>javax.el</groupId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<artifactId>javax.inject</artifactId>
|
||||
<groupId>javax.inject</groupId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.hibernate.javax.persistence</groupId>
|
||||
<artifactId>hibernate-jpa-2.1-api</artifactId>
|
||||
<version>1.0.0.Final</version>
|
||||
<scope>compile</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<artifactId>commons-logging</artifactId>
|
||||
<groupId>commons-logging</groupId>
|
||||
@@ -110,6 +158,26 @@
|
||||
<version>5.0.0.BUILD-SNAPSHOT</version>
|
||||
<scope>runtime</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<artifactId>jboss-interceptors-api_1.1_spec</artifactId>
|
||||
<groupId>org.jboss.spec.javax.interceptor</groupId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<artifactId>jsr250-api</artifactId>
|
||||
<groupId>javax.annotation</groupId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<artifactId>cdi-api</artifactId>
|
||||
<groupId>javax.enterprise</groupId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<artifactId>el-api</artifactId>
|
||||
<groupId>javax.el</groupId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<artifactId>javax.inject</artifactId>
|
||||
<groupId>javax.inject</groupId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<artifactId>commons-logging</artifactId>
|
||||
<groupId>commons-logging</groupId>
|
||||
@@ -117,11 +185,31 @@
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.hibernate</groupId>
|
||||
<artifactId>hibernate-entitymanager</artifactId>
|
||||
<version>5.0.9.Final</version>
|
||||
<groupId>org.hibernate.javax.persistence</groupId>
|
||||
<artifactId>hibernate-jpa-2.1-api</artifactId>
|
||||
<version>1.0.0.Final</version>
|
||||
<scope>runtime</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<artifactId>jboss-interceptors-api_1.1_spec</artifactId>
|
||||
<groupId>org.jboss.spec.javax.interceptor</groupId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<artifactId>jsr250-api</artifactId>
|
||||
<groupId>javax.annotation</groupId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<artifactId>cdi-api</artifactId>
|
||||
<groupId>javax.enterprise</groupId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<artifactId>el-api</artifactId>
|
||||
<groupId>javax.el</groupId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<artifactId>javax.inject</artifactId>
|
||||
<groupId>javax.inject</groupId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<artifactId>commons-logging</artifactId>
|
||||
<groupId>commons-logging</groupId>
|
||||
@@ -134,6 +222,26 @@
|
||||
<version>2.4.2</version>
|
||||
<scope>runtime</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<artifactId>jboss-interceptors-api_1.1_spec</artifactId>
|
||||
<groupId>org.jboss.spec.javax.interceptor</groupId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<artifactId>jsr250-api</artifactId>
|
||||
<groupId>javax.annotation</groupId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<artifactId>cdi-api</artifactId>
|
||||
<groupId>javax.enterprise</groupId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<artifactId>el-api</artifactId>
|
||||
<groupId>javax.el</groupId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<artifactId>javax.inject</artifactId>
|
||||
<groupId>javax.inject</groupId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<artifactId>commons-logging</artifactId>
|
||||
<groupId>commons-logging</groupId>
|
||||
|
||||
@@ -7,7 +7,7 @@ It uses Java configuration for the adapters.
|
||||
|
||||
## Running the sample
|
||||
|
||||
Start Apache Zookeeper and Apache Kafka according to the documentation for the Apache Kafka project.
|
||||
Start Apache Zookeeper and Apache Kafka according to the documentation for the Apache Kafka project.
|
||||
|
||||
$ gradlew :kafka:run
|
||||
|
||||
@@ -19,21 +19,36 @@ In STS (Eclipse), go to package **org.springframework.integration.samples.kafka*
|
||||
|
||||
### Output
|
||||
|
||||
The application sends 10 messages (`foo0` ... `foo9`) to a kafka topic `si.topic` (which is created if necessary).
|
||||
The application sends 10 messages (`foo0` ... `foo9`) to a kafka topic `si.topic`; this must exist or the broker must be configured to auto-create topics.
|
||||
|
||||
It then sends a message with a `KafkaNull` payload.
|
||||
|
||||
It then dynamically creates a new inbound adapter for a different topic `si.new.topic` and sends 10 messages there.
|
||||
|
||||
The message-driven adapter receives the messages and places them in a `QueueChannel` which the application reads and
|
||||
writes to stdout:
|
||||
|
||||
GenericMessage [payload=foo0, headers={kafka_offset=21, kafka_messageKey=si.key, kafka_topic=si.topic, kafka_partitionId=0, kafka_nextOffset=22}]
|
||||
GenericMessage [payload=foo1, headers={kafka_offset=22, kafka_messageKey=si.key, kafka_topic=si.topic, kafka_partitionId=0, kafka_nextOffset=23}]
|
||||
GenericMessage [payload=foo2, headers={kafka_offset=23, kafka_messageKey=si.key, kafka_topic=si.topic, kafka_partitionId=0, kafka_nextOffset=24}]
|
||||
GenericMessage [payload=foo3, headers={kafka_offset=24, kafka_messageKey=si.key, kafka_topic=si.topic, kafka_partitionId=0, kafka_nextOffset=25}]
|
||||
GenericMessage [payload=foo4, headers={kafka_offset=25, kafka_messageKey=si.key, kafka_topic=si.topic, kafka_partitionId=0, kafka_nextOffset=26}]
|
||||
GenericMessage [payload=foo5, headers={kafka_offset=26, kafka_messageKey=si.key, kafka_topic=si.topic, kafka_partitionId=0, kafka_nextOffset=27}]
|
||||
GenericMessage [payload=foo6, headers={kafka_offset=27, kafka_messageKey=si.key, kafka_topic=si.topic, kafka_partitionId=0, kafka_nextOffset=28}]
|
||||
GenericMessage [payload=foo7, headers={kafka_offset=28, kafka_messageKey=si.key, kafka_topic=si.topic, kafka_partitionId=0, kafka_nextOffset=29}]
|
||||
GenericMessage [payload=foo8, headers={kafka_offset=29, kafka_messageKey=si.key, kafka_topic=si.topic, kafka_partitionId=0, kafka_nextOffset=30}]
|
||||
GenericMessage [payload=foo9, headers={kafka_offset=30, kafka_messageKey=si.key, kafka_topic=si.topic, kafka_partitionId=0, kafka_nextOffset=31}]
|
||||
Sending 10 messages...
|
||||
Sending a null message...
|
||||
GenericMessage [payload=foo0, headers={kafka_offset=657, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}]
|
||||
GenericMessage [payload=foo1, headers={kafka_offset=658, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}]
|
||||
GenericMessage [payload=foo2, headers={kafka_offset=659, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}]
|
||||
GenericMessage [payload=foo3, headers={kafka_offset=660, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}]
|
||||
GenericMessage [payload=foo4, headers={kafka_offset=661, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}]
|
||||
GenericMessage [payload=foo5, headers={kafka_offset=662, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}]
|
||||
GenericMessage [payload=foo6, headers={kafka_offset=663, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}]
|
||||
GenericMessage [payload=foo7, headers={kafka_offset=664, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}]
|
||||
GenericMessage [payload=foo8, headers={kafka_offset=665, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}]
|
||||
GenericMessage [payload=org.springframework.kafka.support.KafkaNull@4d9d1b69, headers={kafka_offset=667, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}]
|
||||
Adding an adapter for a second topic and sending 10 messages...
|
||||
GenericMessage [payload=foo0, headers={kafka_offset=70, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}]
|
||||
GenericMessage [payload=foo1, headers={kafka_offset=71, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}]
|
||||
GenericMessage [payload=foo2, headers={kafka_offset=72, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}]
|
||||
GenericMessage [payload=foo3, headers={kafka_offset=73, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}]
|
||||
GenericMessage [payload=foo4, headers={kafka_offset=74, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}]
|
||||
GenericMessage [payload=foo5, headers={kafka_offset=75, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}]
|
||||
GenericMessage [payload=foo6, headers={kafka_offset=76, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}]
|
||||
GenericMessage [payload=foo7, headers={kafka_offset=77, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}]
|
||||
|
||||
Notice that the offset header increases on each run (the topic is not removed, to demonstrate that the offset is retained
|
||||
between executions).
|
||||
|
||||
@@ -77,13 +77,7 @@
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka</artifactId>
|
||||
<version>1.1.2.BUILD-SNAPSHOT</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka-test</artifactId>
|
||||
<version>1.1.2.BUILD-SNAPSHOT</version>
|
||||
<version>2.0.0.BUILD-SNAPSHOT</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2015 the original author or authors.
|
||||
* Copyright 2015-2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -16,24 +16,27 @@
|
||||
|
||||
package org.springframework.integration.samples.kafka;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.I0Itec.zkclient.ZkClient;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.apache.kafka.common.errors.TopicExistsException;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.WebApplicationType;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
|
||||
import org.springframework.boot.builder.SpringApplicationBuilder;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
import org.springframework.context.SmartLifecycle;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.expression.common.LiteralExpression;
|
||||
import org.springframework.integration.annotation.ServiceActivator;
|
||||
import org.springframework.integration.channel.QueueChannel;
|
||||
import org.springframework.integration.dsl.IntegrationFlow;
|
||||
import org.springframework.integration.dsl.IntegrationFlows;
|
||||
import org.springframework.integration.dsl.context.IntegrationFlowContext;
|
||||
import org.springframework.integration.kafka.dsl.Kafka;
|
||||
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
|
||||
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
|
||||
import org.springframework.kafka.core.ConsumerFactory;
|
||||
@@ -43,6 +46,7 @@ import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.kafka.core.ProducerFactory;
|
||||
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
|
||||
import org.springframework.kafka.listener.config.ContainerProperties;
|
||||
import org.springframework.kafka.support.KafkaHeaders;
|
||||
import org.springframework.kafka.support.KafkaNull;
|
||||
import org.springframework.kafka.support.TopicPartitionInitialOffset;
|
||||
import org.springframework.messaging.Message;
|
||||
@@ -51,44 +55,54 @@ import org.springframework.messaging.MessageHandler;
|
||||
import org.springframework.messaging.PollableChannel;
|
||||
import org.springframework.messaging.support.GenericMessage;
|
||||
|
||||
import kafka.admin.AdminUtils;
|
||||
import kafka.utils.ZKStringSerializer$;
|
||||
import kafka.utils.ZkUtils;
|
||||
|
||||
/**
|
||||
* @author Gary Russell
|
||||
* @since 4.2
|
||||
*/
|
||||
@SpringBootApplication
|
||||
@EnableConfigurationProperties(KafkaAppProperties.class)
|
||||
public class Application {
|
||||
|
||||
@Value("${kafka.topic}")
|
||||
private String topic;
|
||||
|
||||
@Value("${kafka.messageKey}")
|
||||
private String messageKey;
|
||||
|
||||
@Value("${kafka.zookeeper.connect}")
|
||||
private String zookeeperConnect;
|
||||
@Autowired
|
||||
private KafkaAppProperties properties;
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
ConfigurableApplicationContext context
|
||||
= new SpringApplicationBuilder(Application.class)
|
||||
.web(false)
|
||||
.run(args);
|
||||
.web(WebApplicationType.NONE)
|
||||
.run(args);
|
||||
context.getBean(Application.class).runDemo(context);
|
||||
context.close();
|
||||
}
|
||||
|
||||
private void runDemo(ConfigurableApplicationContext context) {
|
||||
MessageChannel toKafka = context.getBean("toKafka", MessageChannel.class);
|
||||
System.out.println("Sending 10 messages...");
|
||||
Map<String, Object> headers = Collections.singletonMap(KafkaHeaders.TOPIC, this.properties.getTopic());
|
||||
for (int i = 0; i < 10; i++) {
|
||||
toKafka.send(new GenericMessage<>("foo" + i));
|
||||
toKafka.send(new GenericMessage<>("foo" + i, headers));
|
||||
}
|
||||
toKafka.send(new GenericMessage<>(KafkaNull.INSTANCE));
|
||||
PollableChannel fromKafka = context.getBean("received", PollableChannel.class);
|
||||
System.out.println("Sending a null message...");
|
||||
toKafka.send(new GenericMessage<>(KafkaNull.INSTANCE, headers));
|
||||
PollableChannel fromKafka = context.getBean("fromKafka", PollableChannel.class);
|
||||
Message<?> received = fromKafka.receive(10000);
|
||||
int count = 0;
|
||||
while (received != null) {
|
||||
System.out.println(received);
|
||||
received = fromKafka.receive(10000);
|
||||
received = fromKafka.receive(++count < 11 ? 10000 : 1000);
|
||||
}
|
||||
System.out.println("Adding an adapter for a second topic and sending 10 messages...");
|
||||
addAnotherListenerForTopics(this.properties.getNewTopic());
|
||||
headers = Collections.singletonMap(KafkaHeaders.TOPIC, this.properties.getNewTopic());
|
||||
for (int i = 0; i < 10; i++) {
|
||||
toKafka.send(new GenericMessage<>("foo" + i, headers));
|
||||
}
|
||||
received = fromKafka.receive(10000);
|
||||
count = 0;
|
||||
while (received != null) {
|
||||
System.out.println(received);
|
||||
received = fromKafka.receive(++count < 10 ? 10000 : 1000);
|
||||
}
|
||||
context.close();
|
||||
System.exit(0);
|
||||
}
|
||||
|
||||
@Bean
|
||||
@@ -103,8 +117,7 @@ public class Application {
|
||||
public MessageHandler handler(KafkaTemplate<String, String> kafkaTemplate) {
|
||||
KafkaProducerMessageHandler<String, String> handler =
|
||||
new KafkaProducerMessageHandler<>(kafkaTemplate);
|
||||
handler.setTopicExpression(new LiteralExpression(this.topic));
|
||||
handler.setMessageKeyExpression(new LiteralExpression(this.messageKey));
|
||||
handler.setMessageKeyExpression(new LiteralExpression(this.properties.getMessageKey()));
|
||||
return handler;
|
||||
}
|
||||
|
||||
@@ -120,7 +133,7 @@ public class Application {
|
||||
public KafkaMessageListenerContainer<String, String> container(
|
||||
ConsumerFactory<String, String> kafkaConsumerFactory) {
|
||||
return new KafkaMessageListenerContainer<>(kafkaConsumerFactory,
|
||||
new ContainerProperties(new TopicPartitionInitialOffset(this.topic, 0)));
|
||||
new ContainerProperties(new TopicPartitionInitialOffset(this.properties.getTopic(), 0)));
|
||||
}
|
||||
|
||||
@Bean
|
||||
@@ -128,70 +141,33 @@ public class Application {
|
||||
adapter(KafkaMessageListenerContainer<String, String> container) {
|
||||
KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
|
||||
new KafkaMessageDrivenChannelAdapter<>(container);
|
||||
kafkaMessageDrivenChannelAdapter.setOutputChannel(received());
|
||||
kafkaMessageDrivenChannelAdapter.setOutputChannel(fromKafka());
|
||||
return kafkaMessageDrivenChannelAdapter;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public PollableChannel received() {
|
||||
public PollableChannel fromKafka() {
|
||||
return new QueueChannel();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public TopicCreator topicCreator() {
|
||||
return new TopicCreator(this.topic, this.zookeeperConnect);
|
||||
}
|
||||
@Autowired
|
||||
private IntegrationFlowContext flowContext;
|
||||
|
||||
public static class TopicCreator implements SmartLifecycle {
|
||||
|
||||
private final String topic;
|
||||
|
||||
private final String zkConnect;
|
||||
|
||||
private volatile boolean running;
|
||||
|
||||
public TopicCreator(String topic, String zkConnect) {
|
||||
this.topic = topic;
|
||||
this.zkConnect = zkConnect;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
ZkUtils zkUtils = new ZkUtils(new ZkClient(this.zkConnect, 6000, 6000,
|
||||
ZKStringSerializer$.MODULE$), null, false);
|
||||
try {
|
||||
AdminUtils.createTopic(zkUtils, topic, 1, 1, new Properties(), null);
|
||||
}
|
||||
catch (TopicExistsException e) {
|
||||
// no-op
|
||||
}
|
||||
this.running = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRunning() {
|
||||
return this.running;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPhase() {
|
||||
return Integer.MIN_VALUE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAutoStartup() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop(Runnable callback) {
|
||||
callback.run();
|
||||
}
|
||||
@Autowired
|
||||
private KafkaProperties kafkaProperties;
|
||||
|
||||
public void addAnotherListenerForTopics(String... topics) {
|
||||
Map<String, Object> consumerProperties = kafkaProperties.buildConsumerProperties();
|
||||
// change the group id so we don't revoke the other partitions.
|
||||
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG,
|
||||
consumerProperties.get(ConsumerConfig.GROUP_ID_CONFIG) + "x");
|
||||
IntegrationFlow flow =
|
||||
IntegrationFlows
|
||||
.from(Kafka.messageDrivenChannelAdapter(
|
||||
new DefaultKafkaConsumerFactory<String, String>(consumerProperties), topics))
|
||||
.channel("fromKafka")
|
||||
.get();
|
||||
this.flowContext.registration(flow).register();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,61 @@
|
||||
/*
|
||||
* Copyright 2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.integration.samples.kafka;
|
||||
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
|
||||
/**
|
||||
* Properties for the kafka sample app.
|
||||
*
|
||||
* @author Gary Russell
|
||||
* @since 5.0
|
||||
*
|
||||
*/
|
||||
@ConfigurationProperties("kafka")
|
||||
public class KafkaAppProperties {
|
||||
|
||||
private String topic;
|
||||
|
||||
private String newTopic;
|
||||
|
||||
private String messageKey;
|
||||
|
||||
public String getTopic() {
|
||||
return this.topic;
|
||||
}
|
||||
|
||||
public void setTopic(String topic) {
|
||||
this.topic = topic;
|
||||
}
|
||||
|
||||
public String getNewTopic() {
|
||||
return this.newTopic;
|
||||
}
|
||||
|
||||
public void setNewTopic(String newTopic) {
|
||||
this.newTopic = newTopic;
|
||||
}
|
||||
|
||||
public String getMessageKey() {
|
||||
return this.messageKey;
|
||||
}
|
||||
|
||||
public void setMessageKey(String messageKey) {
|
||||
this.messageKey = messageKey;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,14 +1,13 @@
|
||||
kafka:
|
||||
zookeeper:
|
||||
connect: localhost:2181
|
||||
topic: si.topic
|
||||
newTopic: si.new.topic
|
||||
messageKey: si.key
|
||||
spring:
|
||||
kafka:
|
||||
consumer:
|
||||
group-id: siTestGroup
|
||||
enable-auto-commit: true
|
||||
auto-commit-interval: 100
|
||||
auto-offset-reset: earliest
|
||||
enable-auto-commit: false
|
||||
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
|
||||
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
|
||||
producer:
|
||||
|
||||
@@ -198,10 +198,10 @@ subprojects { subproject ->
|
||||
postgresVersion = '9.1-901-1.jdbc4'
|
||||
subethasmtpVersion = '1.2'
|
||||
slf4jVersion = '1.7.11'
|
||||
springIntegrationVersion = '5.0.0.M1'
|
||||
springIntegrationVersion = '5.0.0.BUILD-SNAPSHOT'
|
||||
springIntegrationKafkaVersion = '3.0.0.BUILD-SNAPSHOT'
|
||||
springIntegrationSplunkVersion = '1.1.0.RELEASE'
|
||||
springKafkaVersion = '1.1.2.BUILD-SNAPSHOT'
|
||||
springKafkaVersion = '2.0.0.BUILD-SNAPSHOT'
|
||||
springVersion = '5.0.0.BUILD-SNAPSHOT'
|
||||
springSecurityVersion = '4.2.0.RELEASE'
|
||||
springWebFlowVersion = '2.3.3.RELEASE'
|
||||
@@ -611,9 +611,6 @@ project('kafka') {
|
||||
}
|
||||
compile "org.springframework.integration:spring-integration-core:$springIntegrationVersion"
|
||||
compile "org.springframework.kafka:spring-kafka:$springKafkaVersion"
|
||||
compile ("org.springframework.kafka:spring-kafka-test:$springKafkaVersion") {
|
||||
exclude group: 'org.slf4j'
|
||||
}
|
||||
|
||||
compile "log4j:log4j:$log4jVersion"
|
||||
|
||||
|
||||
@@ -77,7 +77,7 @@
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka-test</artifactId>
|
||||
<version>1.1.2.BUILD-SNAPSHOT</version>
|
||||
<version>2.0.0.BUILD-SNAPSHOT</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
|
||||
Reference in New Issue
Block a user