GH-160: Add support for null payload from Kafka

Fixes GH-160 (https://github.com/spring-projects/spring-kafka/issues/160)
  * Add `KafkaNull`
  * Override `PayloadArgumentResolver#isEmptyPayload` to support new `KafkaNull`

Polishing code style.
Add commit author to the `@author` list
This commit is contained in:
Dariusz Szablinski
2016-08-08 23:28:35 +02:00
committed by Artem Bilan
parent ed6523be06
commit 0f0ebed1d8
5 changed files with 110 additions and 5 deletions

View File

@@ -50,15 +50,23 @@ import org.springframework.context.expression.StandardBeanExpressionResolver;
import org.springframework.core.MethodIntrospector;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.format.support.DefaultFormattingConversionService;
import org.springframework.kafka.config.KafkaListenerConfigUtils;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint;
import org.springframework.kafka.support.KafkaNull;
import org.springframework.kafka.support.TopicPartitionInitialOffset;
import org.springframework.messaging.converter.GenericMessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.messaging.handler.annotation.support.HeaderMethodArgumentResolver;
import org.springframework.messaging.handler.annotation.support.HeadersMethodArgumentResolver;
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
import org.springframework.messaging.handler.annotation.support.MessageMethodArgumentResolver;
import org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver;
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;
@@ -87,6 +95,7 @@ import org.springframework.util.StringUtils;
* @author Juergen Hoeller
* @author Gary Russell
* @author Artem Bilan
* @author Dariusz Szablinski
*
* @see KafkaListener
* @see EnableKafka
@@ -612,6 +621,32 @@ public class KafkaListenerAnnotationBeanPostProcessor<K, V>
private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory defaultFactory = new DefaultMessageHandlerMethodFactory();
defaultFactory.setBeanFactory(KafkaListenerAnnotationBeanPostProcessor.this.beanFactory);
ConfigurableBeanFactory cbf =
(KafkaListenerAnnotationBeanPostProcessor.this.beanFactory instanceof ConfigurableBeanFactory ?
(ConfigurableBeanFactory) KafkaListenerAnnotationBeanPostProcessor.this.beanFactory : null);
DefaultFormattingConversionService conversionService = new DefaultFormattingConversionService();
defaultFactory.setConversionService(conversionService);
List<HandlerMethodArgumentResolver> argumentResolvers = new ArrayList<>();
// Annotation-based argument resolution
argumentResolvers.add(new HeaderMethodArgumentResolver(conversionService, cbf));
argumentResolvers.add(new HeadersMethodArgumentResolver());
// Type-based argument resolution
argumentResolvers.add(new MessageMethodArgumentResolver());
argumentResolvers.add(new PayloadArgumentResolver(new GenericMessageConverter(conversionService)) {
@Override
protected boolean isEmptyPayload(Object payload) {
return payload == null || payload instanceof KafkaNull;
}
});
defaultFactory.setArgumentResolvers(argumentResolvers);
defaultFactory.afterPropertiesSet();
return defaultFactory;
}

View File

@@ -0,0 +1,35 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.kafka.support;
/**
* This class represents NULL Kafka payload.
*
* @author Dariusz Szablinski
* @since 1.0.3
*/
public final class KafkaNull {
/**
* Instance of KafkaNull.
*/
public final static KafkaNull INSTANCE = new KafkaNull();
private KafkaNull() {
}
}

View File

@@ -24,6 +24,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.KafkaNull;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
@@ -35,6 +36,7 @@ import org.springframework.messaging.support.MessageBuilder;
*
* @author Marius Bogoevici
* @author Gary Russell
* @author Dariusz Szablinski
*/
public class MessagingMessageConverter implements MessageConverter {
@@ -105,7 +107,7 @@ public class MessagingMessageConverter implements MessageConverter {
* @return the value.
*/
protected Object extractAndConvertValue(ConsumerRecord<?, ?> record, Type type) {
return record.value();
return record.value() == null ? KafkaNull.INSTANCE : record.value();
}
@SuppressWarnings("serial")

View File

@@ -21,6 +21,7 @@ import java.lang.reflect.Type;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.support.KafkaNull;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
@@ -36,7 +37,7 @@ import com.fasterxml.jackson.databind.type.TypeFactory;
*
* @author Gary Russell
* @author Artem Bilan
*
* @author Dariusz Szablinski
*/
public class StringJsonMessageConverter extends MessagingMessageConverter {
@@ -66,8 +67,12 @@ public class StringJsonMessageConverter extends MessagingMessageConverter {
@Override
protected Object extractAndConvertValue(ConsumerRecord<?, ?> record, Type type) {
JavaType javaType = TypeFactory.defaultInstance().constructType(type);
Object value = record.value();
if (record.value() == null) {
return KafkaNull.INSTANCE;
}
JavaType javaType = TypeFactory.defaultInstance().constructType(type);
if (value instanceof String) {
try {
return this.objectMapper.readValue((String) value, javaType);

View File

@@ -76,7 +76,7 @@ import org.springframework.transaction.annotation.Transactional;
/**
* @author Gary Russell
* @author Artem Bilan
*
* @author Dariusz Szablinski
*/
@ContextConfiguration
@RunWith(SpringJUnit4ClassRunner.class)
@@ -89,7 +89,7 @@ public class EnableKafkaIntegrationTests {
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, "annotated1", "annotated2", "annotated3",
"annotated4", "annotated5", "annotated6", "annotated7", "annotated8", "annotated9", "annotated10",
"annotated11");
"annotated11", "annotated12", "annotated13");
@Autowired
public IfaceListenerImpl ifaceListener;
@@ -212,6 +212,18 @@ public class EnableKafkaIntegrationTests {
assertThat(this.listener.foo.getBar()).isEqualTo("bar");
}
@Test
public void testNulls() throws Exception {
template.send("annotated12", null, null);
assertThat(this.listener.latch8.await(60, TimeUnit.SECONDS)).isTrue();
}
@Test
public void testEmpty() throws Exception {
template.send("annotated13", null, "");
assertThat(this.listener.latch9.await(60, TimeUnit.SECONDS)).isTrue();
}
@Configuration
@EnableKafka
@EnableTransactionManagement(proxyTargetClass = true)
@@ -394,6 +406,10 @@ public class EnableKafkaIntegrationTests {
private final CountDownLatch latch7 = new CountDownLatch(1);
private final CountDownLatch latch8 = new CountDownLatch(1);
private final CountDownLatch latch9 = new CountDownLatch(1);
private final CountDownLatch eventLatch = new CountDownLatch(1);
private volatile Integer partition;
@@ -473,6 +489,18 @@ public class EnableKafkaIntegrationTests {
this.latch7.countDown();
}
@KafkaListener(id = "quux", topics = "annotated12")
public void listen8(@Payload(required = false) String none) {
assertThat(none).isNull();
this.latch8.countDown();
}
@KafkaListener(id = "corge", topics = "annotated13")
public void listen9(Object payload) {
assertThat(payload).isNotNull();
this.latch9.countDown();
}
}
interface IfaceListener<T> {