diff --git a/basic/barrier/src/main/java/org/springframework/integration/samples/barrier/Application.java b/basic/barrier/src/main/java/org/springframework/integration/samples/barrier/Application.java index 0706ffc4..bad15f03 100644 --- a/basic/barrier/src/main/java/org/springframework/integration/samples/barrier/Application.java +++ b/basic/barrier/src/main/java/org/springframework/integration/samples/barrier/Application.java @@ -1,5 +1,5 @@ /* - * Copyright 2015 the original author or authors. + * Copyright 2015-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ package org.springframework.integration.samples.barrier; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.boot.Banner.Mode; import org.springframework.boot.SpringApplication; +import org.springframework.boot.WebApplicationType; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.context.ConfigurableApplicationContext; @@ -43,7 +44,7 @@ public class Application { ConfigurableApplicationContext client = new SpringApplicationBuilder("/META-INF/spring/integration/client-context.xml") - .web(false) + .web(WebApplicationType.NONE) .bannerMode(Mode.OFF) .run(args); RequestGateway requestGateway = client.getBean("requestGateway", RequestGateway.class); diff --git a/basic/barrier/src/main/java/org/springframework/integration/samples/barrier2/ErrorHandlingApplication.java b/basic/barrier/src/main/java/org/springframework/integration/samples/barrier2/ErrorHandlingApplication.java index 4abc2bad..e68fb1ea 100644 --- a/basic/barrier/src/main/java/org/springframework/integration/samples/barrier2/ErrorHandlingApplication.java +++ b/basic/barrier/src/main/java/org/springframework/integration/samples/barrier2/ErrorHandlingApplication.java @@ -1,5 +1,5 @@ /* - * Copyright 2015 the original author or authors. + * Copyright 2015-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ package org.springframework.integration.samples.barrier2; import java.util.Arrays; +import org.springframework.boot.WebApplicationType; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.context.ConfigurableApplicationContext; @@ -33,7 +34,7 @@ public class ErrorHandlingApplication { public static void main(String[] args) throws Exception { ConfigurableApplicationContext test = new SpringApplicationBuilder(ErrorHandlingApplication.class) - .web(false) + .web(WebApplicationType.NONE) .run(args); Gateway gateway = test.getBean(Gateway.class); try { diff --git a/basic/jpa/pom.xml b/basic/jpa/pom.xml index edafd6cc..fd8d6afd 100644 --- a/basic/jpa/pom.xml +++ b/basic/jpa/pom.xml @@ -62,6 +62,26 @@ spring-boot-starter-data-jpa compile + + jboss-interceptors-api_1.1_spec + org.jboss.spec.javax.interceptor + + + jsr250-api + javax.annotation + + + cdi-api + javax.enterprise + + + el-api + javax.el + + + javax.inject + javax.inject + commons-logging commons-logging @@ -74,6 +94,26 @@ 5.0.0.BUILD-SNAPSHOT compile + + jboss-interceptors-api_1.1_spec + org.jboss.spec.javax.interceptor + + + jsr250-api + javax.annotation + + + cdi-api + javax.enterprise + + + el-api + javax.el + + + javax.inject + javax.inject + commons-logging commons-logging @@ -87,17 +127,25 @@ compile - commons-logging - commons-logging + jboss-interceptors-api_1.1_spec + org.jboss.spec.javax.interceptor + + + jsr250-api + javax.annotation + + + cdi-api + javax.enterprise + + + el-api + javax.el + + + javax.inject + javax.inject - - - - org.hibernate.javax.persistence - hibernate-jpa-2.1-api - 1.0.0.Final - compile - commons-logging commons-logging @@ -110,6 +158,26 @@ 5.0.0.BUILD-SNAPSHOT runtime + + jboss-interceptors-api_1.1_spec + org.jboss.spec.javax.interceptor + + + jsr250-api + javax.annotation + + + cdi-api + javax.enterprise + + + el-api + javax.el + + + javax.inject + javax.inject + commons-logging commons-logging @@ -117,11 +185,31 @@ - org.hibernate - hibernate-entitymanager - 5.0.9.Final + org.hibernate.javax.persistence + hibernate-jpa-2.1-api + 1.0.0.Final runtime + + jboss-interceptors-api_1.1_spec + org.jboss.spec.javax.interceptor + + + jsr250-api + javax.annotation + + + cdi-api + javax.enterprise + + + el-api + javax.el + + + javax.inject + javax.inject + commons-logging commons-logging @@ -134,6 +222,26 @@ 2.4.2 runtime + + jboss-interceptors-api_1.1_spec + org.jboss.spec.javax.interceptor + + + jsr250-api + javax.annotation + + + cdi-api + javax.enterprise + + + el-api + javax.el + + + javax.inject + javax.inject + commons-logging commons-logging diff --git a/basic/kafka/README.md b/basic/kafka/README.md index 550d0e44..426d3e56 100644 --- a/basic/kafka/README.md +++ b/basic/kafka/README.md @@ -7,7 +7,7 @@ It uses Java configuration for the adapters. ## Running the sample -Start Apache Zookeeper and Apache Kafka according to the documentation for the Apache Kafka project. +Start Apache Zookeeper and Apache Kafka according to the documentation for the Apache Kafka project. $ gradlew :kafka:run @@ -19,21 +19,36 @@ In STS (Eclipse), go to package **org.springframework.integration.samples.kafka* ### Output -The application sends 10 messages (`foo0` ... `foo9`) to a kafka topic `si.topic` (which is created if necessary). +The application sends 10 messages (`foo0` ... `foo9`) to a kafka topic `si.topic`; this must exist or the broker must be configured to auto-create topics. + +It then sends a message with a `KafkaNull` payload. + +It then dynamically creates a new inbound adapter for a different topic `si.new.topic` and sends 10 messages there. The message-driven adapter receives the messages and places them in a `QueueChannel` which the application reads and writes to stdout: - GenericMessage [payload=foo0, headers={kafka_offset=21, kafka_messageKey=si.key, kafka_topic=si.topic, kafka_partitionId=0, kafka_nextOffset=22}] - GenericMessage [payload=foo1, headers={kafka_offset=22, kafka_messageKey=si.key, kafka_topic=si.topic, kafka_partitionId=0, kafka_nextOffset=23}] - GenericMessage [payload=foo2, headers={kafka_offset=23, kafka_messageKey=si.key, kafka_topic=si.topic, kafka_partitionId=0, kafka_nextOffset=24}] - GenericMessage [payload=foo3, headers={kafka_offset=24, kafka_messageKey=si.key, kafka_topic=si.topic, kafka_partitionId=0, kafka_nextOffset=25}] - GenericMessage [payload=foo4, headers={kafka_offset=25, kafka_messageKey=si.key, kafka_topic=si.topic, kafka_partitionId=0, kafka_nextOffset=26}] - GenericMessage [payload=foo5, headers={kafka_offset=26, kafka_messageKey=si.key, kafka_topic=si.topic, kafka_partitionId=0, kafka_nextOffset=27}] - GenericMessage [payload=foo6, headers={kafka_offset=27, kafka_messageKey=si.key, kafka_topic=si.topic, kafka_partitionId=0, kafka_nextOffset=28}] - GenericMessage [payload=foo7, headers={kafka_offset=28, kafka_messageKey=si.key, kafka_topic=si.topic, kafka_partitionId=0, kafka_nextOffset=29}] - GenericMessage [payload=foo8, headers={kafka_offset=29, kafka_messageKey=si.key, kafka_topic=si.topic, kafka_partitionId=0, kafka_nextOffset=30}] - GenericMessage [payload=foo9, headers={kafka_offset=30, kafka_messageKey=si.key, kafka_topic=si.topic, kafka_partitionId=0, kafka_nextOffset=31}] + Sending 10 messages... + Sending a null message... + GenericMessage [payload=foo0, headers={kafka_offset=657, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] + GenericMessage [payload=foo1, headers={kafka_offset=658, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] + GenericMessage [payload=foo2, headers={kafka_offset=659, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] + GenericMessage [payload=foo3, headers={kafka_offset=660, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] + GenericMessage [payload=foo4, headers={kafka_offset=661, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] + GenericMessage [payload=foo5, headers={kafka_offset=662, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] + GenericMessage [payload=foo6, headers={kafka_offset=663, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] + GenericMessage [payload=foo7, headers={kafka_offset=664, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] + GenericMessage [payload=foo8, headers={kafka_offset=665, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] + GenericMessage [payload=org.springframework.kafka.support.KafkaNull@4d9d1b69, headers={kafka_offset=667, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.topic}] + Adding an adapter for a second topic and sending 10 messages... + GenericMessage [payload=foo0, headers={kafka_offset=70, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] + GenericMessage [payload=foo1, headers={kafka_offset=71, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] + GenericMessage [payload=foo2, headers={kafka_offset=72, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] + GenericMessage [payload=foo3, headers={kafka_offset=73, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] + GenericMessage [payload=foo4, headers={kafka_offset=74, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] + GenericMessage [payload=foo5, headers={kafka_offset=75, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] + GenericMessage [payload=foo6, headers={kafka_offset=76, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] + GenericMessage [payload=foo7, headers={kafka_offset=77, kafka_receivedMessageKey=si.key, kafka_receivedPartitionId=0, kafka_receivedTopic=si.new.topic}] Notice that the offset header increases on each run (the topic is not removed, to demonstrate that the offset is retained between executions). diff --git a/basic/kafka/pom.xml b/basic/kafka/pom.xml index f1f888f0..48755eef 100644 --- a/basic/kafka/pom.xml +++ b/basic/kafka/pom.xml @@ -77,13 +77,7 @@ org.springframework.kafka spring-kafka - 1.1.2.BUILD-SNAPSHOT - compile - - - org.springframework.kafka - spring-kafka-test - 1.1.2.BUILD-SNAPSHOT + 2.0.0.BUILD-SNAPSHOT compile diff --git a/basic/kafka/src/main/java/org/springframework/integration/samples/kafka/Application.java b/basic/kafka/src/main/java/org/springframework/integration/samples/kafka/Application.java index c7a19ad5..d4b89861 100644 --- a/basic/kafka/src/main/java/org/springframework/integration/samples/kafka/Application.java +++ b/basic/kafka/src/main/java/org/springframework/integration/samples/kafka/Application.java @@ -1,5 +1,5 @@ /* - * Copyright 2015 the original author or authors. + * Copyright 2015-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,24 +16,27 @@ package org.springframework.integration.samples.kafka; +import java.util.Collections; import java.util.Map; -import java.util.Properties; -import org.I0Itec.zkclient.ZkClient; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.errors.TopicExistsException; -import org.springframework.beans.factory.annotation.Value; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.WebApplicationType; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.ConfigurableApplicationContext; -import org.springframework.context.SmartLifecycle; import org.springframework.context.annotation.Bean; import org.springframework.expression.common.LiteralExpression; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.QueueChannel; +import org.springframework.integration.dsl.IntegrationFlow; +import org.springframework.integration.dsl.IntegrationFlows; +import org.springframework.integration.dsl.context.IntegrationFlowContext; +import org.springframework.integration.kafka.dsl.Kafka; import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter; import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler; import org.springframework.kafka.core.ConsumerFactory; @@ -43,6 +46,7 @@ import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.listener.KafkaMessageListenerContainer; import org.springframework.kafka.listener.config.ContainerProperties; +import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.support.KafkaNull; import org.springframework.kafka.support.TopicPartitionInitialOffset; import org.springframework.messaging.Message; @@ -51,44 +55,54 @@ import org.springframework.messaging.MessageHandler; import org.springframework.messaging.PollableChannel; import org.springframework.messaging.support.GenericMessage; -import kafka.admin.AdminUtils; -import kafka.utils.ZKStringSerializer$; -import kafka.utils.ZkUtils; - /** * @author Gary Russell * @since 4.2 */ @SpringBootApplication +@EnableConfigurationProperties(KafkaAppProperties.class) public class Application { - @Value("${kafka.topic}") - private String topic; - - @Value("${kafka.messageKey}") - private String messageKey; - - @Value("${kafka.zookeeper.connect}") - private String zookeeperConnect; + @Autowired + private KafkaAppProperties properties; public static void main(String[] args) throws Exception { ConfigurableApplicationContext context = new SpringApplicationBuilder(Application.class) - .web(false) - .run(args); + .web(WebApplicationType.NONE) + .run(args); + context.getBean(Application.class).runDemo(context); + context.close(); + } + + private void runDemo(ConfigurableApplicationContext context) { MessageChannel toKafka = context.getBean("toKafka", MessageChannel.class); + System.out.println("Sending 10 messages..."); + Map headers = Collections.singletonMap(KafkaHeaders.TOPIC, this.properties.getTopic()); for (int i = 0; i < 10; i++) { - toKafka.send(new GenericMessage<>("foo" + i)); + toKafka.send(new GenericMessage<>("foo" + i, headers)); } - toKafka.send(new GenericMessage<>(KafkaNull.INSTANCE)); - PollableChannel fromKafka = context.getBean("received", PollableChannel.class); + System.out.println("Sending a null message..."); + toKafka.send(new GenericMessage<>(KafkaNull.INSTANCE, headers)); + PollableChannel fromKafka = context.getBean("fromKafka", PollableChannel.class); Message received = fromKafka.receive(10000); + int count = 0; while (received != null) { System.out.println(received); - received = fromKafka.receive(10000); + received = fromKafka.receive(++count < 11 ? 10000 : 1000); + } + System.out.println("Adding an adapter for a second topic and sending 10 messages..."); + addAnotherListenerForTopics(this.properties.getNewTopic()); + headers = Collections.singletonMap(KafkaHeaders.TOPIC, this.properties.getNewTopic()); + for (int i = 0; i < 10; i++) { + toKafka.send(new GenericMessage<>("foo" + i, headers)); + } + received = fromKafka.receive(10000); + count = 0; + while (received != null) { + System.out.println(received); + received = fromKafka.receive(++count < 10 ? 10000 : 1000); } - context.close(); - System.exit(0); } @Bean @@ -103,8 +117,7 @@ public class Application { public MessageHandler handler(KafkaTemplate kafkaTemplate) { KafkaProducerMessageHandler handler = new KafkaProducerMessageHandler<>(kafkaTemplate); - handler.setTopicExpression(new LiteralExpression(this.topic)); - handler.setMessageKeyExpression(new LiteralExpression(this.messageKey)); + handler.setMessageKeyExpression(new LiteralExpression(this.properties.getMessageKey())); return handler; } @@ -120,7 +133,7 @@ public class Application { public KafkaMessageListenerContainer container( ConsumerFactory kafkaConsumerFactory) { return new KafkaMessageListenerContainer<>(kafkaConsumerFactory, - new ContainerProperties(new TopicPartitionInitialOffset(this.topic, 0))); + new ContainerProperties(new TopicPartitionInitialOffset(this.properties.getTopic(), 0))); } @Bean @@ -128,70 +141,33 @@ public class Application { adapter(KafkaMessageListenerContainer container) { KafkaMessageDrivenChannelAdapter kafkaMessageDrivenChannelAdapter = new KafkaMessageDrivenChannelAdapter<>(container); - kafkaMessageDrivenChannelAdapter.setOutputChannel(received()); + kafkaMessageDrivenChannelAdapter.setOutputChannel(fromKafka()); return kafkaMessageDrivenChannelAdapter; } @Bean - public PollableChannel received() { + public PollableChannel fromKafka() { return new QueueChannel(); } - @Bean - public TopicCreator topicCreator() { - return new TopicCreator(this.topic, this.zookeeperConnect); - } + @Autowired + private IntegrationFlowContext flowContext; - public static class TopicCreator implements SmartLifecycle { - - private final String topic; - - private final String zkConnect; - - private volatile boolean running; - - public TopicCreator(String topic, String zkConnect) { - this.topic = topic; - this.zkConnect = zkConnect; - } - - @Override - public void start() { - ZkUtils zkUtils = new ZkUtils(new ZkClient(this.zkConnect, 6000, 6000, - ZKStringSerializer$.MODULE$), null, false); - try { - AdminUtils.createTopic(zkUtils, topic, 1, 1, new Properties(), null); - } - catch (TopicExistsException e) { - // no-op - } - this.running = true; - } - - @Override - public void stop() { - } - - @Override - public boolean isRunning() { - return this.running; - } - - @Override - public int getPhase() { - return Integer.MIN_VALUE; - } - - @Override - public boolean isAutoStartup() { - return true; - } - - @Override - public void stop(Runnable callback) { - callback.run(); - } + @Autowired + private KafkaProperties kafkaProperties; + public void addAnotherListenerForTopics(String... topics) { + Map consumerProperties = kafkaProperties.buildConsumerProperties(); + // change the group id so we don't revoke the other partitions. + consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, + consumerProperties.get(ConsumerConfig.GROUP_ID_CONFIG) + "x"); + IntegrationFlow flow = + IntegrationFlows + .from(Kafka.messageDrivenChannelAdapter( + new DefaultKafkaConsumerFactory(consumerProperties), topics)) + .channel("fromKafka") + .get(); + this.flowContext.registration(flow).register(); } } diff --git a/basic/kafka/src/main/java/org/springframework/integration/samples/kafka/KafkaAppProperties.java b/basic/kafka/src/main/java/org/springframework/integration/samples/kafka/KafkaAppProperties.java new file mode 100644 index 00000000..85f8f1ac --- /dev/null +++ b/basic/kafka/src/main/java/org/springframework/integration/samples/kafka/KafkaAppProperties.java @@ -0,0 +1,61 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.samples.kafka; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +/** + * Properties for the kafka sample app. + * + * @author Gary Russell + * @since 5.0 + * + */ +@ConfigurationProperties("kafka") +public class KafkaAppProperties { + + private String topic; + + private String newTopic; + + private String messageKey; + + public String getTopic() { + return this.topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public String getNewTopic() { + return this.newTopic; + } + + public void setNewTopic(String newTopic) { + this.newTopic = newTopic; + } + + public String getMessageKey() { + return this.messageKey; + } + + public void setMessageKey(String messageKey) { + this.messageKey = messageKey; + } + +} diff --git a/basic/kafka/src/main/resources/application.yml b/basic/kafka/src/main/resources/application.yml index 6bf3b6fb..91928553 100644 --- a/basic/kafka/src/main/resources/application.yml +++ b/basic/kafka/src/main/resources/application.yml @@ -1,14 +1,13 @@ kafka: - zookeeper: - connect: localhost:2181 topic: si.topic + newTopic: si.new.topic messageKey: si.key spring: kafka: consumer: group-id: siTestGroup - enable-auto-commit: true - auto-commit-interval: 100 + auto-offset-reset: earliest + enable-auto-commit: false value-deserializer: org.apache.kafka.common.serialization.StringDeserializer key-deserializer: org.apache.kafka.common.serialization.StringDeserializer producer: diff --git a/build.gradle b/build.gradle index c935ee16..e00f44ed 100644 --- a/build.gradle +++ b/build.gradle @@ -198,10 +198,10 @@ subprojects { subproject -> postgresVersion = '9.1-901-1.jdbc4' subethasmtpVersion = '1.2' slf4jVersion = '1.7.11' - springIntegrationVersion = '5.0.0.M1' + springIntegrationVersion = '5.0.0.BUILD-SNAPSHOT' springIntegrationKafkaVersion = '3.0.0.BUILD-SNAPSHOT' springIntegrationSplunkVersion = '1.1.0.RELEASE' - springKafkaVersion = '1.1.2.BUILD-SNAPSHOT' + springKafkaVersion = '2.0.0.BUILD-SNAPSHOT' springVersion = '5.0.0.BUILD-SNAPSHOT' springSecurityVersion = '4.2.0.RELEASE' springWebFlowVersion = '2.3.3.RELEASE' @@ -611,9 +611,6 @@ project('kafka') { } compile "org.springframework.integration:spring-integration-core:$springIntegrationVersion" compile "org.springframework.kafka:spring-kafka:$springKafkaVersion" - compile ("org.springframework.kafka:spring-kafka-test:$springKafkaVersion") { - exclude group: 'org.slf4j' - } compile "log4j:log4j:$log4jVersion" diff --git a/dsl/kafka-dsl/pom.xml b/dsl/kafka-dsl/pom.xml index 72b4c5b3..4025da73 100644 --- a/dsl/kafka-dsl/pom.xml +++ b/dsl/kafka-dsl/pom.xml @@ -77,7 +77,7 @@ org.springframework.kafka spring-kafka-test - 1.1.2.BUILD-SNAPSHOT + 2.0.0.BUILD-SNAPSHOT compile