Update PartitionHandler partition properties during initialization

- The PartitionHandler has per-binding partition properties derived from the producer properties for the binding
   - These partition properties (especially PartitionKeyExtractorStrategy and PartitionSelectorStrategy)  need not be initialized when the messages are processed; instead they can be set during the PartitionHandler instantiation
 - Add tests

This resolves #772

Refactor PartitionHandler
This commit is contained in:
Ilayaperumal Gopinathan
2017-02-07 12:13:56 +05:30
committed by Marius Bogoevici
parent 48c53ef278
commit 2df9f9b4ee
7 changed files with 257 additions and 96 deletions

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2016 the original author or authors.
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,11 +16,9 @@
package org.springframework.cloud.stream.binder;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.expression.EvaluationContext;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
/**
* Utility class to determine if a binding is configured for partitioning
@@ -36,34 +34,30 @@ import org.springframework.util.ClassUtils;
*/
public class PartitionHandler {
private final ConfigurableListableBeanFactory beanFactory;
private final EvaluationContext evaluationContext;
private final PartitionSelectorStrategy partitionSelector;
private final ProducerProperties producerProperties;
private final PartitionKeyExtractorStrategy partitionKeyExtractorStrategy;
private final PartitionSelectorStrategy partitionSelectorStrategy;
/**
* Construct a {@code PartitionHandler}.
*
* @param beanFactory bean factory for binder
* @param evaluationContext evaluation context for binder
* @param partitionSelector configured partition selector; may be {@code null}
* @param properties binder properties
* @param evaluationContext evaluation context for binder
* @param properties binder properties
* @param partitionKeyExtractorStrategy PartitionKeyExtractor strategy
* @param partitionSelectorStrategy PartitionSelector strategy
*/
public PartitionHandler(ConfigurableListableBeanFactory beanFactory,
EvaluationContext evaluationContext,
PartitionSelectorStrategy partitionSelector,
ProducerProperties properties) {
Assert.notNull(beanFactory, "BeanFactory must not be null");
this.beanFactory = beanFactory;
public PartitionHandler(EvaluationContext evaluationContext,
ProducerProperties properties,
PartitionKeyExtractorStrategy partitionKeyExtractorStrategy,
PartitionSelectorStrategy partitionSelectorStrategy) {
this.evaluationContext = evaluationContext;
this.partitionSelector = partitionSelector == null
? new DefaultPartitionSelector()
: partitionSelector;
this.producerProperties = properties;
this.partitionKeyExtractorStrategy = partitionKeyExtractorStrategy;
this.partitionSelectorStrategy = partitionSelectorStrategy;
}
/**
@@ -88,15 +82,12 @@ public class PartitionHandler {
Object key = extractKey(message);
int partition;
if (this.producerProperties.getPartitionSelectorClass() != null) {
partition = invokePartitionSelector(key);
}
else if (this.producerProperties.getPartitionSelectorExpression() != null) {
if (this.producerProperties.getPartitionSelectorExpression() != null) {
partition = this.producerProperties.getPartitionSelectorExpression().getValue(
this.evaluationContext, key, Integer.class);
}
else {
partition = this.partitionSelector.selectPartition(key, producerProperties.getPartitionCount());
partition = this.partitionSelectorStrategy.selectPartition(key, producerProperties.getPartitionCount());
}
// protection in case a user selector returns a negative.
return Math.abs(partition % producerProperties.getPartitionCount());
@@ -116,68 +107,11 @@ public class PartitionHandler {
}
private Object invokeKeyExtractor(Message<?> message) {
PartitionKeyExtractorStrategy strategy = getBean(
producerProperties.getPartitionKeyExtractorClass().getName(),
PartitionKeyExtractorStrategy.class);
return strategy.extractKey(message);
return this.partitionKeyExtractorStrategy.extractKey(message);
}
private int invokePartitionSelector(Object key) {
PartitionSelectorStrategy strategy = getBean(
producerProperties.getPartitionSelectorClass().getName(),
PartitionSelectorStrategy.class);
return strategy.selectPartition(key, producerProperties.getPartitionCount());
}
@SuppressWarnings("unchecked")
private <T> T getBean(String className, Class<T> type) {
if (this.beanFactory.containsBean(className)) {
return this.beanFactory.getBean(className, type);
}
else {
synchronized (this) {
T bean;
if (this.beanFactory.containsBean(className)) {
bean = this.beanFactory.getBean(className, type);
}
else {
Class<?> clazz;
try {
clazz = ClassUtils.forName(className, this.beanFactory.getBeanClassLoader());
}
catch (Exception e) {
throw new BinderException("Failed to load class: " + className, e);
}
try {
bean = (T) clazz.newInstance();
Assert.isInstanceOf(type, bean);
this.beanFactory.registerSingleton(className, bean);
this.beanFactory.initializeBean(bean, className);
}
catch (Exception e) {
throw new BinderException("Failed to instantiate class: " + className, e);
}
}
return bean;
}
}
}
/**
* Default partition strategy; only works on keys with "real" hash codes,
* such as String. Caller now always applies modulo so no need to do so here.
*/
private static class DefaultPartitionSelector implements PartitionSelectorStrategy {
@Override
public int selectPartition(Object key, int partitionCount) {
int hashCode = key.hashCode();
if (hashCode == Integer.MIN_VALUE) {
hashCode = 0;
}
return Math.abs(hashCode);
}
return this.partitionSelectorStrategy.selectPartition(key, producerProperties.getPartitionCount());
}
}

View File

@@ -22,8 +22,12 @@ import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.cloud.stream.binder.BinderException;
import org.springframework.cloud.stream.binder.BinderHeaders;
import org.springframework.cloud.stream.binder.PartitionHandler;
import org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy;
import org.springframework.cloud.stream.binder.PartitionSelectorStrategy;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory;
@@ -42,6 +46,7 @@ import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.support.ChannelInterceptorAdapter;
import org.springframework.tuple.Tuple;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.MimeType;
import org.springframework.util.StringUtils;
@@ -95,6 +100,7 @@ public class MessageConverterConfigurer implements MessageChannelConfigurer, Bea
/**
* Setup data-type and message converters for the given message channel.
*
* @param channel message channel to set the data-type and message converters
* @param channelName the channel name
*/
@@ -104,14 +110,62 @@ public class MessageConverterConfigurer implements MessageChannelConfigurer, Bea
final BindingProperties bindingProperties = this.bindingServiceProperties.getBindingProperties(
channelName);
final String contentType = bindingProperties.getContentType();
if (!input && bindingProperties.getProducer() != null && bindingProperties.getProducer().isPartitioned()) {
messageChannel.addInterceptor(new PartitioningInterceptor(bindingProperties));
ProducerProperties producerProperties = bindingProperties.getProducer();
if (!input && producerProperties != null && producerProperties.isPartitioned()) {
messageChannel.addInterceptor(new PartitioningInterceptor(bindingProperties,
getPartitionKeyExtractorStrategy(producerProperties), getPartitionSelectorStrategy(producerProperties)));
}
if (StringUtils.hasText(contentType)) {
messageChannel.addInterceptor(new ContentTypeConvertingInterceptor(contentType, input));
}
}
private PartitionKeyExtractorStrategy getPartitionKeyExtractorStrategy(ProducerProperties producerProperties) {
if (producerProperties.getPartitionKeyExtractorClass() != null) {
return getBean(
producerProperties.getPartitionKeyExtractorClass().getName(),
PartitionKeyExtractorStrategy.class);
}
return null;
}
private PartitionSelectorStrategy getPartitionSelectorStrategy(ProducerProperties producerProperties) {
if (producerProperties.getPartitionSelectorClass() != null) {
return getBean(
producerProperties.getPartitionSelectorClass().getName(),
PartitionSelectorStrategy.class);
}
return new DefaultPartitionSelector();
}
@SuppressWarnings("unchecked")
private <T> T getBean(String className, Class<T> type) {
if (this.beanFactory.containsBean(className)) {
return this.beanFactory.getBean(className, type);
}
else {
synchronized (this) {
T bean;
Class<?> clazz;
try {
clazz = ClassUtils.forName(className, this.beanFactory.getBeanClassLoader());
}
catch (Exception e) {
throw new BinderException("Failed to load class: " + className, e);
}
try {
bean = (T) clazz.newInstance();
Assert.isInstanceOf(type, bean);
this.beanFactory.registerSingleton(className, bean);
this.beanFactory.initializeBean(bean, className);
}
catch (Exception e) {
throw new BinderException("Failed to instantiate class: " + className, e);
}
return bean;
}
}
}
private final class ContentTypeConvertingInterceptor extends ChannelInterceptorAdapter {
@@ -205,18 +259,18 @@ public class MessageConverterConfigurer implements MessageChannelConfigurer, Bea
}
}
private final class PartitioningInterceptor extends ChannelInterceptorAdapter {
protected final class PartitioningInterceptor extends ChannelInterceptorAdapter {
private final BindingProperties bindingProperties;
private final PartitionHandler partitionHandler;
PartitioningInterceptor(BindingProperties bindingProperties) {
PartitioningInterceptor(BindingProperties bindingProperties,
PartitionKeyExtractorStrategy partitionKeyExtractorStrategy,
PartitionSelectorStrategy partitionSelectorStrategy) {
this.bindingProperties = bindingProperties;
this.partitionHandler = new PartitionHandler(MessageConverterConfigurer.this.beanFactory,
ExpressionUtils.createStandardEvaluationContext(MessageConverterConfigurer.this.beanFactory),
null,
this.bindingProperties.getProducer());
this.partitionHandler = new PartitionHandler(ExpressionUtils.createStandardEvaluationContext(MessageConverterConfigurer.this.beanFactory),
this.bindingProperties.getProducer(), partitionKeyExtractorStrategy, partitionSelectorStrategy);
}
@Override
@@ -228,6 +282,22 @@ public class MessageConverterConfigurer implements MessageChannelConfigurer, Bea
.build();
}
}
/**
* Default partition strategy; only works on keys with "real" hash codes,
* such as String. Caller now always applies modulo so no need to do so here.
*/
private static class DefaultPartitionSelector implements PartitionSelectorStrategy {
@Override
public int selectPartition(Object key, int partitionCount) {
int hashCode = key.hashCode();
if (hashCode == Integer.MIN_VALUE) {
hashCode = 0;
}
return Math.abs(hashCode);
}
}

View File

@@ -0,0 +1,95 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binding;
import java.lang.reflect.Field;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.binder.PartitionHandler;
import org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy;
import org.springframework.cloud.stream.binder.PartitionSelectorStrategy;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.cloud.stream.partitioning.CustomPartitionKeyExtractorClass;
import org.springframework.cloud.stream.partitioning.CustomPartitionSelectorClass;
import org.springframework.cloud.stream.utils.MockBinderRegistryConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.PropertySource;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.util.ReflectionUtils;
/**
* @author Ilayaperumal Gopinathan
*/
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = CustomPartitionedProducerTest.TestSource.class)
public class CustomPartitionedProducerTest {
@Autowired
private Source testSource;
@Test
@SuppressWarnings("unchecked")
public void testCustomPartitionedProducer() {
DirectChannel messageChannel = (DirectChannel) this.testSource.output();
for (ChannelInterceptor channelInterceptor : messageChannel.getChannelInterceptors()) {
if (channelInterceptor instanceof MessageConverterConfigurer.PartitioningInterceptor) {
Field partitionHandlerField = ReflectionUtils.findField(MessageConverterConfigurer.PartitioningInterceptor.class, "partitionHandler");
ReflectionUtils.makeAccessible(partitionHandlerField);
PartitionHandler partitionHandler = (PartitionHandler) ReflectionUtils.getField(partitionHandlerField, channelInterceptor);
Field partitonKeyExtractorField = ReflectionUtils.findField(PartitionHandler.class, "partitionKeyExtractorStrategy");
ReflectionUtils.makeAccessible(partitonKeyExtractorField);
Field partitonSelectorField = ReflectionUtils.findField(PartitionHandler.class, "partitionSelectorStrategy");
ReflectionUtils.makeAccessible(partitonSelectorField);
Assert.assertTrue(((PartitionKeyExtractorStrategy) ReflectionUtils.getField(partitonKeyExtractorField, partitionHandler)).getClass().equals(CustomPartitionKeyExtractorClass.class));
Assert.assertTrue(((PartitionSelectorStrategy) ReflectionUtils.getField(partitonSelectorField, partitionHandler)).getClass().equals(CustomPartitionSelectorClass.class));
}
}
}
@EnableBinding(Source.class)
@EnableAutoConfiguration
@Import(MockBinderRegistryConfiguration.class)
@PropertySource("classpath:/org/springframework/cloud/stream/binder/custom-partitioned-producer-test.properties")
public static class TestSource {
@Bean
@InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "5000", maxMessagesPerPoll = "1"))
public MessageSource<String> timerMessageSource() {
return new MessageSource<String>() {
@Override
public Message<String> receive() {
throw new MessagingException("test");
}
};
}
}
}

View File

@@ -0,0 +1,31 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.partitioning;
import org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy;
import org.springframework.messaging.Message;
/**
* @author Ilayaperumal Gopinathan
*/
public class CustomPartitionKeyExtractorClass implements PartitionKeyExtractorStrategy {
@Override
public String extractKey(Message<?> message) {
return (String) message.getHeaders().get("key");
}
}

View File

@@ -0,0 +1,30 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.partitioning;
import org.springframework.cloud.stream.binder.PartitionSelectorStrategy;
/**
* @author Ilayaperumal Gopinathan
*/
public class CustomPartitionSelectorClass implements PartitionSelectorStrategy {
@Override
public int selectPartition(Object key, int partitionCount) {
return Integer.valueOf((String) key);
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2015 the original author or authors.
* Copyright 2015-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,7 +24,6 @@ import org.mockito.ArgumentCaptor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.annotation.Bindings;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.BinderFactory;
@@ -54,7 +53,6 @@ public class PartitionedProducerTest {
private BinderFactory binderFactory;
@Autowired
@Bindings(TestSource.class)
private Source testSource;
@Test
@@ -77,5 +75,4 @@ public class PartitionedProducerTest {
public static class TestSource {
}
}

View File

@@ -0,0 +1,4 @@
spring.cloud.stream.bindings.output.destination=partOut
spring.cloud.stream.bindings.output.producer.partitionCount=3
spring.cloud.stream.bindings.output.producer.partitionKeyExtractorClass=org.springframework.cloud.stream.partitioning.CustomPartitionKeyExtractorClass
spring.cloud.stream.bindings.output.producer.partitionSelectorClass=org.springframework.cloud.stream.partitioning.CustomPartitionSelectorClass