diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java index f36fbdd7..4afd5058 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java @@ -50,15 +50,23 @@ import org.springframework.context.expression.StandardBeanExpressionResolver; import org.springframework.core.MethodIntrospector; import org.springframework.core.Ordered; import org.springframework.core.annotation.AnnotationUtils; +import org.springframework.format.support.DefaultFormattingConversionService; import org.springframework.kafka.config.KafkaListenerConfigUtils; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerEndpointRegistrar; import org.springframework.kafka.config.KafkaListenerEndpointRegistry; import org.springframework.kafka.config.MethodKafkaListenerEndpoint; import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint; +import org.springframework.kafka.support.KafkaNull; import org.springframework.kafka.support.TopicPartitionInitialOffset; +import org.springframework.messaging.converter.GenericMessageConverter; import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory; +import org.springframework.messaging.handler.annotation.support.HeaderMethodArgumentResolver; +import org.springframework.messaging.handler.annotation.support.HeadersMethodArgumentResolver; import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory; +import org.springframework.messaging.handler.annotation.support.MessageMethodArgumentResolver; +import org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver; +import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver; import org.springframework.messaging.handler.invocation.InvocableHandlerMethod; import org.springframework.util.Assert; import org.springframework.util.ReflectionUtils; @@ -87,6 +95,7 @@ import org.springframework.util.StringUtils; * @author Juergen Hoeller * @author Gary Russell * @author Artem Bilan + * @author Dariusz Szablinski * * @see KafkaListener * @see EnableKafka @@ -626,6 +635,32 @@ public class KafkaListenerAnnotationBeanPostProcessor private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() { DefaultMessageHandlerMethodFactory defaultFactory = new DefaultMessageHandlerMethodFactory(); defaultFactory.setBeanFactory(KafkaListenerAnnotationBeanPostProcessor.this.beanFactory); + + ConfigurableBeanFactory cbf = + (KafkaListenerAnnotationBeanPostProcessor.this.beanFactory instanceof ConfigurableBeanFactory ? + (ConfigurableBeanFactory) KafkaListenerAnnotationBeanPostProcessor.this.beanFactory : null); + + DefaultFormattingConversionService conversionService = new DefaultFormattingConversionService(); + defaultFactory.setConversionService(conversionService); + + List argumentResolvers = new ArrayList<>(); + + // Annotation-based argument resolution + argumentResolvers.add(new HeaderMethodArgumentResolver(conversionService, cbf)); + argumentResolvers.add(new HeadersMethodArgumentResolver()); + + // Type-based argument resolution + argumentResolvers.add(new MessageMethodArgumentResolver()); + argumentResolvers.add(new PayloadArgumentResolver(new GenericMessageConverter(conversionService)) { + + @Override + protected boolean isEmptyPayload(Object payload) { + return payload == null || payload instanceof KafkaNull; + } + + }); + defaultFactory.setArgumentResolvers(argumentResolvers); + defaultFactory.afterPropertiesSet(); return defaultFactory; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/KafkaNull.java b/spring-kafka/src/main/java/org/springframework/kafka/support/KafkaNull.java new file mode 100644 index 00000000..56f0aea1 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/KafkaNull.java @@ -0,0 +1,35 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.support; + +/** + * This class represents NULL Kafka payload. + * + * @author Dariusz Szablinski + * @since 1.0.3 + */ +public final class KafkaNull { + + /** + * Instance of KafkaNull. + */ + public final static KafkaNull INSTANCE = new KafkaNull(); + + private KafkaNull() { + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java index 35e7d0f0..1c06cf62 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java @@ -24,6 +24,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.kafka.support.KafkaNull; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.MessageBuilder; @@ -35,6 +36,7 @@ import org.springframework.messaging.support.MessageBuilder; * * @author Marius Bogoevici * @author Gary Russell + * @author Dariusz Szablinski */ public class MessagingMessageConverter implements MessageConverter { @@ -105,7 +107,7 @@ public class MessagingMessageConverter implements MessageConverter { * @return the value. */ protected Object extractAndConvertValue(ConsumerRecord record, Type type) { - return record.value(); + return record.value() == null ? KafkaNull.INSTANCE : record.value(); } @SuppressWarnings("serial") diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/StringJsonMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/StringJsonMessageConverter.java index 1ccd5074..833584b7 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/StringJsonMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/StringJsonMessageConverter.java @@ -21,6 +21,7 @@ import java.lang.reflect.Type; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.kafka.support.KafkaNull; import org.springframework.messaging.Message; import org.springframework.util.Assert; @@ -36,7 +37,7 @@ import com.fasterxml.jackson.databind.type.TypeFactory; * * @author Gary Russell * @author Artem Bilan - * + * @author Dariusz Szablinski */ public class StringJsonMessageConverter extends MessagingMessageConverter { @@ -66,8 +67,12 @@ public class StringJsonMessageConverter extends MessagingMessageConverter { @Override protected Object extractAndConvertValue(ConsumerRecord record, Type type) { - JavaType javaType = TypeFactory.defaultInstance().constructType(type); Object value = record.value(); + if (record.value() == null) { + return KafkaNull.INSTANCE; + } + + JavaType javaType = TypeFactory.defaultInstance().constructType(type); if (value instanceof String) { try { return this.objectMapper.readValue((String) value, javaType); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java index 28240c46..042adbdd 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java @@ -78,7 +78,7 @@ import org.springframework.transaction.annotation.Transactional; /** * @author Gary Russell * @author Artem Bilan - * + * @author Dariusz Szablinski */ @ContextConfiguration @RunWith(SpringJUnit4ClassRunner.class) @@ -91,7 +91,7 @@ public class EnableKafkaIntegrationTests { @ClassRule public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, "annotated1", "annotated2", "annotated3", "annotated4", "annotated5", "annotated6", "annotated7", "annotated8", "annotated9", "annotated10", - "annotated11"); + "annotated11", "annotated12", "annotated13"); @Autowired public IfaceListenerImpl ifaceListener; @@ -224,6 +224,18 @@ public class EnableKafkaIntegrationTests { assertThat(this.listener.foo.getBar()).isEqualTo("bar"); } + @Test + public void testNulls() throws Exception { + template.send("annotated12", null, null); + assertThat(this.listener.latch8.await(60, TimeUnit.SECONDS)).isTrue(); + } + + @Test + public void testEmpty() throws Exception { + template.send("annotated13", null, ""); + assertThat(this.listener.latch9.await(60, TimeUnit.SECONDS)).isTrue(); + } + @Configuration @EnableKafka @EnableTransactionManagement(proxyTargetClass = true) @@ -406,6 +418,10 @@ public class EnableKafkaIntegrationTests { private final CountDownLatch latch7 = new CountDownLatch(1); + private final CountDownLatch latch8 = new CountDownLatch(1); + + private final CountDownLatch latch9 = new CountDownLatch(1); + private final CountDownLatch eventLatch = new CountDownLatch(1); private volatile Integer partition; @@ -486,6 +502,18 @@ public class EnableKafkaIntegrationTests { this.latch7.countDown(); } + @KafkaListener(id = "quux", topics = "annotated12") + public void listen8(@Payload(required = false) String none) { + assertThat(none).isNull(); + this.latch8.countDown(); + } + + @KafkaListener(id = "corge", topics = "annotated13") + public void listen9(Object payload) { + assertThat(payload).isNotNull(); + this.latch9.countDown(); + } + } interface IfaceListener {