GH-160: Add support for null payload from Kafka
Fixes GH-160 (https://github.com/spring-projects/spring-kafka/issues/160)
* Add `KafkaNull`
* Override `PayloadArgumentResolver#isEmptyPayload` to support new `KafkaNull`
Polishing code style.
Add commit author to the `@author` list
(cherry picked from commit 0f0ebed)
This commit is contained in:
committed by
Artem Bilan
parent
9e0e248ba5
commit
d295c9f71d
@@ -50,15 +50,23 @@ import org.springframework.context.expression.StandardBeanExpressionResolver;
|
||||
import org.springframework.core.MethodIntrospector;
|
||||
import org.springframework.core.Ordered;
|
||||
import org.springframework.core.annotation.AnnotationUtils;
|
||||
import org.springframework.format.support.DefaultFormattingConversionService;
|
||||
import org.springframework.kafka.config.KafkaListenerConfigUtils;
|
||||
import org.springframework.kafka.config.KafkaListenerContainerFactory;
|
||||
import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
|
||||
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
|
||||
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
|
||||
import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint;
|
||||
import org.springframework.kafka.support.KafkaNull;
|
||||
import org.springframework.kafka.support.TopicPartitionInitialOffset;
|
||||
import org.springframework.messaging.converter.GenericMessageConverter;
|
||||
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
|
||||
import org.springframework.messaging.handler.annotation.support.HeaderMethodArgumentResolver;
|
||||
import org.springframework.messaging.handler.annotation.support.HeadersMethodArgumentResolver;
|
||||
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
|
||||
import org.springframework.messaging.handler.annotation.support.MessageMethodArgumentResolver;
|
||||
import org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver;
|
||||
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
|
||||
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.ReflectionUtils;
|
||||
@@ -87,6 +95,7 @@ import org.springframework.util.StringUtils;
|
||||
* @author Juergen Hoeller
|
||||
* @author Gary Russell
|
||||
* @author Artem Bilan
|
||||
* @author Dariusz Szablinski
|
||||
*
|
||||
* @see KafkaListener
|
||||
* @see EnableKafka
|
||||
@@ -626,6 +635,32 @@ public class KafkaListenerAnnotationBeanPostProcessor<K, V>
|
||||
private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() {
|
||||
DefaultMessageHandlerMethodFactory defaultFactory = new DefaultMessageHandlerMethodFactory();
|
||||
defaultFactory.setBeanFactory(KafkaListenerAnnotationBeanPostProcessor.this.beanFactory);
|
||||
|
||||
ConfigurableBeanFactory cbf =
|
||||
(KafkaListenerAnnotationBeanPostProcessor.this.beanFactory instanceof ConfigurableBeanFactory ?
|
||||
(ConfigurableBeanFactory) KafkaListenerAnnotationBeanPostProcessor.this.beanFactory : null);
|
||||
|
||||
DefaultFormattingConversionService conversionService = new DefaultFormattingConversionService();
|
||||
defaultFactory.setConversionService(conversionService);
|
||||
|
||||
List<HandlerMethodArgumentResolver> argumentResolvers = new ArrayList<>();
|
||||
|
||||
// Annotation-based argument resolution
|
||||
argumentResolvers.add(new HeaderMethodArgumentResolver(conversionService, cbf));
|
||||
argumentResolvers.add(new HeadersMethodArgumentResolver());
|
||||
|
||||
// Type-based argument resolution
|
||||
argumentResolvers.add(new MessageMethodArgumentResolver());
|
||||
argumentResolvers.add(new PayloadArgumentResolver(new GenericMessageConverter(conversionService)) {
|
||||
|
||||
@Override
|
||||
protected boolean isEmptyPayload(Object payload) {
|
||||
return payload == null || payload instanceof KafkaNull;
|
||||
}
|
||||
|
||||
});
|
||||
defaultFactory.setArgumentResolvers(argumentResolvers);
|
||||
|
||||
defaultFactory.afterPropertiesSet();
|
||||
return defaultFactory;
|
||||
}
|
||||
|
||||
@@ -0,0 +1,35 @@
|
||||
/*
|
||||
* Copyright 2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.kafka.support;
|
||||
|
||||
/**
|
||||
* This class represents NULL Kafka payload.
|
||||
*
|
||||
* @author Dariusz Szablinski
|
||||
* @since 1.0.3
|
||||
*/
|
||||
public final class KafkaNull {
|
||||
|
||||
/**
|
||||
* Instance of KafkaNull.
|
||||
*/
|
||||
public final static KafkaNull INSTANCE = new KafkaNull();
|
||||
|
||||
private KafkaNull() {
|
||||
}
|
||||
|
||||
}
|
||||
@@ -24,6 +24,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
|
||||
import org.springframework.kafka.support.Acknowledgment;
|
||||
import org.springframework.kafka.support.KafkaHeaders;
|
||||
import org.springframework.kafka.support.KafkaNull;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
@@ -35,6 +36,7 @@ import org.springframework.messaging.support.MessageBuilder;
|
||||
*
|
||||
* @author Marius Bogoevici
|
||||
* @author Gary Russell
|
||||
* @author Dariusz Szablinski
|
||||
*/
|
||||
public class MessagingMessageConverter implements MessageConverter {
|
||||
|
||||
@@ -105,7 +107,7 @@ public class MessagingMessageConverter implements MessageConverter {
|
||||
* @return the value.
|
||||
*/
|
||||
protected Object extractAndConvertValue(ConsumerRecord<?, ?> record, Type type) {
|
||||
return record.value();
|
||||
return record.value() == null ? KafkaNull.INSTANCE : record.value();
|
||||
}
|
||||
|
||||
@SuppressWarnings("serial")
|
||||
|
||||
@@ -21,6 +21,7 @@ import java.lang.reflect.Type;
|
||||
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
|
||||
import org.springframework.kafka.support.KafkaNull;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
@@ -36,7 +37,7 @@ import com.fasterxml.jackson.databind.type.TypeFactory;
|
||||
*
|
||||
* @author Gary Russell
|
||||
* @author Artem Bilan
|
||||
*
|
||||
* @author Dariusz Szablinski
|
||||
*/
|
||||
public class StringJsonMessageConverter extends MessagingMessageConverter {
|
||||
|
||||
@@ -66,8 +67,12 @@ public class StringJsonMessageConverter extends MessagingMessageConverter {
|
||||
|
||||
@Override
|
||||
protected Object extractAndConvertValue(ConsumerRecord<?, ?> record, Type type) {
|
||||
JavaType javaType = TypeFactory.defaultInstance().constructType(type);
|
||||
Object value = record.value();
|
||||
if (record.value() == null) {
|
||||
return KafkaNull.INSTANCE;
|
||||
}
|
||||
|
||||
JavaType javaType = TypeFactory.defaultInstance().constructType(type);
|
||||
if (value instanceof String) {
|
||||
try {
|
||||
return this.objectMapper.readValue((String) value, javaType);
|
||||
|
||||
@@ -78,7 +78,7 @@ import org.springframework.transaction.annotation.Transactional;
|
||||
/**
|
||||
* @author Gary Russell
|
||||
* @author Artem Bilan
|
||||
*
|
||||
* @author Dariusz Szablinski
|
||||
*/
|
||||
@ContextConfiguration
|
||||
@RunWith(SpringJUnit4ClassRunner.class)
|
||||
@@ -91,7 +91,7 @@ public class EnableKafkaIntegrationTests {
|
||||
@ClassRule
|
||||
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, "annotated1", "annotated2", "annotated3",
|
||||
"annotated4", "annotated5", "annotated6", "annotated7", "annotated8", "annotated9", "annotated10",
|
||||
"annotated11");
|
||||
"annotated11", "annotated12", "annotated13");
|
||||
|
||||
@Autowired
|
||||
public IfaceListenerImpl ifaceListener;
|
||||
@@ -224,6 +224,18 @@ public class EnableKafkaIntegrationTests {
|
||||
assertThat(this.listener.foo.getBar()).isEqualTo("bar");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNulls() throws Exception {
|
||||
template.send("annotated12", null, null);
|
||||
assertThat(this.listener.latch8.await(60, TimeUnit.SECONDS)).isTrue();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEmpty() throws Exception {
|
||||
template.send("annotated13", null, "");
|
||||
assertThat(this.listener.latch9.await(60, TimeUnit.SECONDS)).isTrue();
|
||||
}
|
||||
|
||||
@Configuration
|
||||
@EnableKafka
|
||||
@EnableTransactionManagement(proxyTargetClass = true)
|
||||
@@ -406,6 +418,10 @@ public class EnableKafkaIntegrationTests {
|
||||
|
||||
private final CountDownLatch latch7 = new CountDownLatch(1);
|
||||
|
||||
private final CountDownLatch latch8 = new CountDownLatch(1);
|
||||
|
||||
private final CountDownLatch latch9 = new CountDownLatch(1);
|
||||
|
||||
private final CountDownLatch eventLatch = new CountDownLatch(1);
|
||||
|
||||
private volatile Integer partition;
|
||||
@@ -486,6 +502,18 @@ public class EnableKafkaIntegrationTests {
|
||||
this.latch7.countDown();
|
||||
}
|
||||
|
||||
@KafkaListener(id = "quux", topics = "annotated12")
|
||||
public void listen8(@Payload(required = false) String none) {
|
||||
assertThat(none).isNull();
|
||||
this.latch8.countDown();
|
||||
}
|
||||
|
||||
@KafkaListener(id = "corge", topics = "annotated13")
|
||||
public void listen9(Object payload) {
|
||||
assertThat(payload).isNotNull();
|
||||
this.latch9.countDown();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
interface IfaceListener<T> {
|
||||
|
||||
Reference in New Issue
Block a user