From 2df9f9b4ee30dff6aad7773d8d033111c1a5348d Mon Sep 17 00:00:00 2001 From: Ilayaperumal Gopinathan Date: Tue, 7 Feb 2017 12:13:56 +0530 Subject: [PATCH] Update PartitionHandler partition properties during initialization - The PartitionHandler has per-binding partition properties derived from the producer properties for the binding - These partition properties (especially PartitionKeyExtractorStrategy and PartitionSelectorStrategy) need not be initialized when the messages are processed; instead they can be set during the PartitionHandler instantiation - Add tests This resolves #772 Refactor PartitionHandler --- .../cloud/stream/binder/PartitionHandler.java | 102 ++++-------------- .../binding/MessageConverterConfigurer.java | 86 +++++++++++++-- .../CustomPartitionedProducerTest.java | 95 ++++++++++++++++ .../CustomPartitionKeyExtractorClass.java | 31 ++++++ .../CustomPartitionSelectorClass.java | 30 ++++++ .../partitioning/PartitionedProducerTest.java | 5 +- ...ustom-partitioned-producer-test.properties | 4 + 7 files changed, 257 insertions(+), 96 deletions(-) create mode 100644 spring-cloud-stream/src/test/java/org/springframework/cloud/stream/binding/CustomPartitionedProducerTest.java create mode 100644 spring-cloud-stream/src/test/java/org/springframework/cloud/stream/partitioning/CustomPartitionKeyExtractorClass.java create mode 100644 spring-cloud-stream/src/test/java/org/springframework/cloud/stream/partitioning/CustomPartitionSelectorClass.java create mode 100644 spring-cloud-stream/src/test/resources/org/springframework/cloud/stream/binder/custom-partitioned-producer-test.properties diff --git a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/PartitionHandler.java b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/PartitionHandler.java index 1d7078b02..49b7d25d5 100644 --- a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/PartitionHandler.java +++ b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/PartitionHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 the original author or authors. + * Copyright 2016-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,11 +16,9 @@ package org.springframework.cloud.stream.binder; -import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; import org.springframework.expression.EvaluationContext; import org.springframework.messaging.Message; import org.springframework.util.Assert; -import org.springframework.util.ClassUtils; /** * Utility class to determine if a binding is configured for partitioning @@ -36,34 +34,30 @@ import org.springframework.util.ClassUtils; */ public class PartitionHandler { - private final ConfigurableListableBeanFactory beanFactory; - private final EvaluationContext evaluationContext; - private final PartitionSelectorStrategy partitionSelector; - private final ProducerProperties producerProperties; + private final PartitionKeyExtractorStrategy partitionKeyExtractorStrategy; + + private final PartitionSelectorStrategy partitionSelectorStrategy; /** * Construct a {@code PartitionHandler}. * - * @param beanFactory bean factory for binder - * @param evaluationContext evaluation context for binder - * @param partitionSelector configured partition selector; may be {@code null} - * @param properties binder properties + * @param evaluationContext evaluation context for binder + * @param properties binder properties + * @param partitionKeyExtractorStrategy PartitionKeyExtractor strategy + * @param partitionSelectorStrategy PartitionSelector strategy */ - public PartitionHandler(ConfigurableListableBeanFactory beanFactory, - EvaluationContext evaluationContext, - PartitionSelectorStrategy partitionSelector, - ProducerProperties properties) { - Assert.notNull(beanFactory, "BeanFactory must not be null"); - this.beanFactory = beanFactory; + public PartitionHandler(EvaluationContext evaluationContext, + ProducerProperties properties, + PartitionKeyExtractorStrategy partitionKeyExtractorStrategy, + PartitionSelectorStrategy partitionSelectorStrategy) { this.evaluationContext = evaluationContext; - this.partitionSelector = partitionSelector == null - ? new DefaultPartitionSelector() - : partitionSelector; this.producerProperties = properties; + this.partitionKeyExtractorStrategy = partitionKeyExtractorStrategy; + this.partitionSelectorStrategy = partitionSelectorStrategy; } /** @@ -88,15 +82,12 @@ public class PartitionHandler { Object key = extractKey(message); int partition; - if (this.producerProperties.getPartitionSelectorClass() != null) { - partition = invokePartitionSelector(key); - } - else if (this.producerProperties.getPartitionSelectorExpression() != null) { + if (this.producerProperties.getPartitionSelectorExpression() != null) { partition = this.producerProperties.getPartitionSelectorExpression().getValue( this.evaluationContext, key, Integer.class); } else { - partition = this.partitionSelector.selectPartition(key, producerProperties.getPartitionCount()); + partition = this.partitionSelectorStrategy.selectPartition(key, producerProperties.getPartitionCount()); } // protection in case a user selector returns a negative. return Math.abs(partition % producerProperties.getPartitionCount()); @@ -116,68 +107,11 @@ public class PartitionHandler { } private Object invokeKeyExtractor(Message message) { - PartitionKeyExtractorStrategy strategy = getBean( - producerProperties.getPartitionKeyExtractorClass().getName(), - PartitionKeyExtractorStrategy.class); - return strategy.extractKey(message); + return this.partitionKeyExtractorStrategy.extractKey(message); } private int invokePartitionSelector(Object key) { - PartitionSelectorStrategy strategy = getBean( - producerProperties.getPartitionSelectorClass().getName(), - PartitionSelectorStrategy.class); - return strategy.selectPartition(key, producerProperties.getPartitionCount()); - } - - @SuppressWarnings("unchecked") - private T getBean(String className, Class type) { - if (this.beanFactory.containsBean(className)) { - return this.beanFactory.getBean(className, type); - } - else { - synchronized (this) { - T bean; - if (this.beanFactory.containsBean(className)) { - bean = this.beanFactory.getBean(className, type); - } - else { - Class clazz; - try { - clazz = ClassUtils.forName(className, this.beanFactory.getBeanClassLoader()); - } - catch (Exception e) { - throw new BinderException("Failed to load class: " + className, e); - } - try { - bean = (T) clazz.newInstance(); - Assert.isInstanceOf(type, bean); - this.beanFactory.registerSingleton(className, bean); - this.beanFactory.initializeBean(bean, className); - } - catch (Exception e) { - throw new BinderException("Failed to instantiate class: " + className, e); - } - } - return bean; - } - } - } - - /** - * Default partition strategy; only works on keys with "real" hash codes, - * such as String. Caller now always applies modulo so no need to do so here. - */ - private static class DefaultPartitionSelector implements PartitionSelectorStrategy { - - @Override - public int selectPartition(Object key, int partitionCount) { - int hashCode = key.hashCode(); - if (hashCode == Integer.MIN_VALUE) { - hashCode = 0; - } - return Math.abs(hashCode); - } - + return this.partitionSelectorStrategy.selectPartition(key, producerProperties.getPartitionCount()); } } diff --git a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/MessageConverterConfigurer.java b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/MessageConverterConfigurer.java index a8c3f4b92..735863552 100644 --- a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/MessageConverterConfigurer.java +++ b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/MessageConverterConfigurer.java @@ -22,8 +22,12 @@ import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.BeanFactoryAware; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; +import org.springframework.cloud.stream.binder.BinderException; import org.springframework.cloud.stream.binder.BinderHeaders; import org.springframework.cloud.stream.binder.PartitionHandler; +import org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy; +import org.springframework.cloud.stream.binder.PartitionSelectorStrategy; +import org.springframework.cloud.stream.binder.ProducerProperties; import org.springframework.cloud.stream.config.BindingProperties; import org.springframework.cloud.stream.config.BindingServiceProperties; import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory; @@ -42,6 +46,7 @@ import org.springframework.messaging.converter.MessageConverter; import org.springframework.messaging.support.ChannelInterceptorAdapter; import org.springframework.tuple.Tuple; import org.springframework.util.Assert; +import org.springframework.util.ClassUtils; import org.springframework.util.MimeType; import org.springframework.util.StringUtils; @@ -95,6 +100,7 @@ public class MessageConverterConfigurer implements MessageChannelConfigurer, Bea /** * Setup data-type and message converters for the given message channel. + * * @param channel message channel to set the data-type and message converters * @param channelName the channel name */ @@ -104,14 +110,62 @@ public class MessageConverterConfigurer implements MessageChannelConfigurer, Bea final BindingProperties bindingProperties = this.bindingServiceProperties.getBindingProperties( channelName); final String contentType = bindingProperties.getContentType(); - if (!input && bindingProperties.getProducer() != null && bindingProperties.getProducer().isPartitioned()) { - messageChannel.addInterceptor(new PartitioningInterceptor(bindingProperties)); + ProducerProperties producerProperties = bindingProperties.getProducer(); + if (!input && producerProperties != null && producerProperties.isPartitioned()) { + messageChannel.addInterceptor(new PartitioningInterceptor(bindingProperties, + getPartitionKeyExtractorStrategy(producerProperties), getPartitionSelectorStrategy(producerProperties))); } if (StringUtils.hasText(contentType)) { messageChannel.addInterceptor(new ContentTypeConvertingInterceptor(contentType, input)); } } + private PartitionKeyExtractorStrategy getPartitionKeyExtractorStrategy(ProducerProperties producerProperties) { + if (producerProperties.getPartitionKeyExtractorClass() != null) { + return getBean( + producerProperties.getPartitionKeyExtractorClass().getName(), + PartitionKeyExtractorStrategy.class); + } + return null; + } + + private PartitionSelectorStrategy getPartitionSelectorStrategy(ProducerProperties producerProperties) { + if (producerProperties.getPartitionSelectorClass() != null) { + return getBean( + producerProperties.getPartitionSelectorClass().getName(), + PartitionSelectorStrategy.class); + } + return new DefaultPartitionSelector(); + } + + @SuppressWarnings("unchecked") + private T getBean(String className, Class type) { + if (this.beanFactory.containsBean(className)) { + return this.beanFactory.getBean(className, type); + } + else { + synchronized (this) { + T bean; + Class clazz; + try { + clazz = ClassUtils.forName(className, this.beanFactory.getBeanClassLoader()); + } + catch (Exception e) { + throw new BinderException("Failed to load class: " + className, e); + } + try { + bean = (T) clazz.newInstance(); + Assert.isInstanceOf(type, bean); + this.beanFactory.registerSingleton(className, bean); + this.beanFactory.initializeBean(bean, className); + } + catch (Exception e) { + throw new BinderException("Failed to instantiate class: " + className, e); + } + return bean; + } + } + } private final class ContentTypeConvertingInterceptor extends ChannelInterceptorAdapter { @@ -205,18 +259,18 @@ public class MessageConverterConfigurer implements MessageChannelConfigurer, Bea } } - private final class PartitioningInterceptor extends ChannelInterceptorAdapter { + protected final class PartitioningInterceptor extends ChannelInterceptorAdapter { private final BindingProperties bindingProperties; private final PartitionHandler partitionHandler; - PartitioningInterceptor(BindingProperties bindingProperties) { + PartitioningInterceptor(BindingProperties bindingProperties, + PartitionKeyExtractorStrategy partitionKeyExtractorStrategy, + PartitionSelectorStrategy partitionSelectorStrategy) { this.bindingProperties = bindingProperties; - this.partitionHandler = new PartitionHandler(MessageConverterConfigurer.this.beanFactory, - ExpressionUtils.createStandardEvaluationContext(MessageConverterConfigurer.this.beanFactory), - null, - this.bindingProperties.getProducer()); + this.partitionHandler = new PartitionHandler(ExpressionUtils.createStandardEvaluationContext(MessageConverterConfigurer.this.beanFactory), + this.bindingProperties.getProducer(), partitionKeyExtractorStrategy, partitionSelectorStrategy); } @Override @@ -228,6 +282,22 @@ public class MessageConverterConfigurer implements MessageChannelConfigurer, Bea .build(); } + } + + /** + * Default partition strategy; only works on keys with "real" hash codes, + * such as String. Caller now always applies modulo so no need to do so here. + */ + private static class DefaultPartitionSelector implements PartitionSelectorStrategy { + + @Override + public int selectPartition(Object key, int partitionCount) { + int hashCode = key.hashCode(); + if (hashCode == Integer.MIN_VALUE) { + hashCode = 0; + } + return Math.abs(hashCode); + } } diff --git a/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/binding/CustomPartitionedProducerTest.java b/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/binding/CustomPartitionedProducerTest.java new file mode 100644 index 000000000..a52ef25ed --- /dev/null +++ b/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/binding/CustomPartitionedProducerTest.java @@ -0,0 +1,95 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binding; + +import java.lang.reflect.Field; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.binder.PartitionHandler; +import org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy; +import org.springframework.cloud.stream.binder.PartitionSelectorStrategy; +import org.springframework.cloud.stream.messaging.Source; +import org.springframework.cloud.stream.partitioning.CustomPartitionKeyExtractorClass; +import org.springframework.cloud.stream.partitioning.CustomPartitionSelectorClass; +import org.springframework.cloud.stream.utils.MockBinderRegistryConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Import; +import org.springframework.context.annotation.PropertySource; +import org.springframework.integration.annotation.InboundChannelAdapter; +import org.springframework.integration.annotation.Poller; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.core.MessageSource; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessagingException; +import org.springframework.messaging.support.ChannelInterceptor; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; +import org.springframework.util.ReflectionUtils; + +/** + * @author Ilayaperumal Gopinathan + */ +@RunWith(SpringJUnit4ClassRunner.class) +@SpringBootTest(classes = CustomPartitionedProducerTest.TestSource.class) +public class CustomPartitionedProducerTest { + + @Autowired + private Source testSource; + + @Test + @SuppressWarnings("unchecked") + public void testCustomPartitionedProducer() { + DirectChannel messageChannel = (DirectChannel) this.testSource.output(); + for (ChannelInterceptor channelInterceptor : messageChannel.getChannelInterceptors()) { + if (channelInterceptor instanceof MessageConverterConfigurer.PartitioningInterceptor) { + Field partitionHandlerField = ReflectionUtils.findField(MessageConverterConfigurer.PartitioningInterceptor.class, "partitionHandler"); + ReflectionUtils.makeAccessible(partitionHandlerField); + PartitionHandler partitionHandler = (PartitionHandler) ReflectionUtils.getField(partitionHandlerField, channelInterceptor); + Field partitonKeyExtractorField = ReflectionUtils.findField(PartitionHandler.class, "partitionKeyExtractorStrategy"); + ReflectionUtils.makeAccessible(partitonKeyExtractorField); + Field partitonSelectorField = ReflectionUtils.findField(PartitionHandler.class, "partitionSelectorStrategy"); + ReflectionUtils.makeAccessible(partitonSelectorField); + Assert.assertTrue(((PartitionKeyExtractorStrategy) ReflectionUtils.getField(partitonKeyExtractorField, partitionHandler)).getClass().equals(CustomPartitionKeyExtractorClass.class)); + Assert.assertTrue(((PartitionSelectorStrategy) ReflectionUtils.getField(partitonSelectorField, partitionHandler)).getClass().equals(CustomPartitionSelectorClass.class)); + } + } + } + + @EnableBinding(Source.class) + @EnableAutoConfiguration + @Import(MockBinderRegistryConfiguration.class) + @PropertySource("classpath:/org/springframework/cloud/stream/binder/custom-partitioned-producer-test.properties") + public static class TestSource { + + @Bean + @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "5000", maxMessagesPerPoll = "1")) + public MessageSource timerMessageSource() { + return new MessageSource() { + @Override + public Message receive() { + throw new MessagingException("test"); + } + }; + } + } +} diff --git a/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/partitioning/CustomPartitionKeyExtractorClass.java b/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/partitioning/CustomPartitionKeyExtractorClass.java new file mode 100644 index 000000000..e4c02d4a1 --- /dev/null +++ b/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/partitioning/CustomPartitionKeyExtractorClass.java @@ -0,0 +1,31 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.partitioning; + +import org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy; +import org.springframework.messaging.Message; + +/** + * @author Ilayaperumal Gopinathan + */ +public class CustomPartitionKeyExtractorClass implements PartitionKeyExtractorStrategy { + + @Override + public String extractKey(Message message) { + return (String) message.getHeaders().get("key"); + } +} diff --git a/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/partitioning/CustomPartitionSelectorClass.java b/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/partitioning/CustomPartitionSelectorClass.java new file mode 100644 index 000000000..b4f19cc66 --- /dev/null +++ b/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/partitioning/CustomPartitionSelectorClass.java @@ -0,0 +1,30 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.partitioning; + +import org.springframework.cloud.stream.binder.PartitionSelectorStrategy; + +/** + * @author Ilayaperumal Gopinathan + */ +public class CustomPartitionSelectorClass implements PartitionSelectorStrategy { + + @Override + public int selectPartition(Object key, int partitionCount) { + return Integer.valueOf((String) key); + } +} diff --git a/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/partitioning/PartitionedProducerTest.java b/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/partitioning/PartitionedProducerTest.java index d57efa63f..7d0b7d6aa 100644 --- a/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/partitioning/PartitionedProducerTest.java +++ b/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/partitioning/PartitionedProducerTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2015 the original author or authors. + * Copyright 2015-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,7 +24,6 @@ import org.mockito.ArgumentCaptor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.cloud.stream.annotation.Bindings; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.binder.Binder; import org.springframework.cloud.stream.binder.BinderFactory; @@ -54,7 +53,6 @@ public class PartitionedProducerTest { private BinderFactory binderFactory; @Autowired - @Bindings(TestSource.class) private Source testSource; @Test @@ -77,5 +75,4 @@ public class PartitionedProducerTest { public static class TestSource { } - } diff --git a/spring-cloud-stream/src/test/resources/org/springframework/cloud/stream/binder/custom-partitioned-producer-test.properties b/spring-cloud-stream/src/test/resources/org/springframework/cloud/stream/binder/custom-partitioned-producer-test.properties new file mode 100644 index 000000000..99efd2692 --- /dev/null +++ b/spring-cloud-stream/src/test/resources/org/springframework/cloud/stream/binder/custom-partitioned-producer-test.properties @@ -0,0 +1,4 @@ +spring.cloud.stream.bindings.output.destination=partOut +spring.cloud.stream.bindings.output.producer.partitionCount=3 +spring.cloud.stream.bindings.output.producer.partitionKeyExtractorClass=org.springframework.cloud.stream.partitioning.CustomPartitionKeyExtractorClass +spring.cloud.stream.bindings.output.producer.partitionSelectorClass=org.springframework.cloud.stream.partitioning.CustomPartitionSelectorClass