Implement remainder of Spring Boot Checkstyle code rules
Fixes #532 Implements most of the remaining rules from Spring Boot, except JavaDoc + a few additional ones
This commit is contained in:
@@ -13,6 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
@@ -23,11 +23,11 @@ public class KafkaConsumerProperties {
|
||||
|
||||
private boolean autoCommitOffset = true;
|
||||
|
||||
private boolean resetOffsets = false;
|
||||
private boolean resetOffsets;
|
||||
|
||||
private KafkaMessageChannelBinder.StartOffset startOffset = null;
|
||||
private KafkaMessageChannelBinder.StartOffset startOffset;
|
||||
|
||||
private boolean enableDlq = false;
|
||||
private boolean enableDlq;
|
||||
|
||||
public boolean isAutoCommitOffset() {
|
||||
return autoCommitOffset;
|
||||
|
||||
@@ -129,7 +129,7 @@ public class KafkaMessageChannelBinder
|
||||
|
||||
private boolean autoCreateTopics = true;
|
||||
|
||||
private boolean autoAddPartitions = false;
|
||||
private boolean autoAddPartitions;
|
||||
|
||||
private RetryOperations metadataRetryOperations;
|
||||
|
||||
@@ -163,7 +163,7 @@ public class KafkaMessageChannelBinder
|
||||
|
||||
private int offsetUpdateTimeWindow = 10000;
|
||||
|
||||
private int offsetUpdateCount = 0;
|
||||
private int offsetUpdateCount;
|
||||
|
||||
private int offsetUpdateShutdownTimeout = 2000;
|
||||
|
||||
@@ -173,7 +173,7 @@ public class KafkaMessageChannelBinder
|
||||
|
||||
private ProducerListener producerListener;
|
||||
|
||||
private volatile Producer<byte[],byte[]> dlqProducer;
|
||||
private volatile Producer<byte[], byte[]> dlqProducer;
|
||||
|
||||
private KafkaExtendedBindingProperties extendedBindingProperties = new KafkaExtendedBindingProperties();
|
||||
|
||||
@@ -783,11 +783,11 @@ public class KafkaMessageChannelBinder
|
||||
}
|
||||
}
|
||||
|
||||
private class ReceivingHandler extends AbstractReplyProducingMessageHandler {
|
||||
private final class ReceivingHandler extends AbstractReplyProducingMessageHandler {
|
||||
|
||||
private final ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties;
|
||||
|
||||
public ReceivingHandler(ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties) {
|
||||
private ReceivingHandler(ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties) {
|
||||
this.consumerProperties = consumerProperties;
|
||||
}
|
||||
|
||||
@@ -802,22 +802,21 @@ public class KafkaMessageChannelBinder
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("serial")
|
||||
private final class KafkaBinderHeaders extends MessageHeaders {
|
||||
|
||||
KafkaBinderHeaders(Map<String, Object> headers) {
|
||||
super(headers, MessageHeaders.ID_VALUE_NONE, -1L);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean shouldCopyRequestHeaders() {
|
||||
// prevent the message from being copied again in superclass
|
||||
return false;
|
||||
}
|
||||
|
||||
@SuppressWarnings("serial")
|
||||
private final class KafkaBinderHeaders extends MessageHeaders {
|
||||
KafkaBinderHeaders(Map<String, Object> headers) {
|
||||
super(headers, MessageHeaders.ID_VALUE_NONE, -1L);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private class SendingHandler extends AbstractMessageHandler {
|
||||
private final class SendingHandler extends AbstractMessageHandler {
|
||||
|
||||
private final AtomicInteger roundRobinCount = new AtomicInteger();
|
||||
|
||||
|
||||
@@ -29,9 +29,9 @@ public class KafkaProducerProperties {
|
||||
|
||||
private ProducerMetadata.CompressionType compressionType = ProducerMetadata.CompressionType.none;
|
||||
|
||||
private boolean sync = false;
|
||||
private boolean sync;
|
||||
|
||||
private int batchTimeout = 0;
|
||||
private int batchTimeout;
|
||||
|
||||
public int getBufferSize() {
|
||||
return bufferSize;
|
||||
|
||||
@@ -68,7 +68,7 @@ public class WindowingOffsetManager implements OffsetManager, InitializingBean,
|
||||
|
||||
private long timespan = 10 * 1000;
|
||||
|
||||
private int count = 0;
|
||||
private int count;
|
||||
|
||||
private Subject<PartitionAndOffset, PartitionAndOffset> offsets;
|
||||
|
||||
@@ -187,13 +187,13 @@ public class WindowingOffsetManager implements OffsetManager, InitializingBean,
|
||||
delegate.flush();
|
||||
}
|
||||
|
||||
class PartitionAndOffset {
|
||||
private final class PartitionAndOffset {
|
||||
|
||||
private final Partition partition;
|
||||
|
||||
private final Long offset;
|
||||
|
||||
public PartitionAndOffset(Partition partition, Long offset) {
|
||||
private PartitionAndOffset(Partition partition, Long offset) {
|
||||
this.partition = partition;
|
||||
this.offset = offset;
|
||||
}
|
||||
|
||||
@@ -39,7 +39,7 @@ public class KafkaBinderConfigurationProperties {
|
||||
|
||||
private int offsetUpdateTimeWindow = 10000;
|
||||
|
||||
private int offsetUpdateCount = 0;
|
||||
private int offsetUpdateCount;
|
||||
|
||||
private int offsetUpdateShutdownTimeout = 2000;
|
||||
|
||||
@@ -47,7 +47,7 @@ public class KafkaBinderConfigurationProperties {
|
||||
|
||||
private boolean autoCreateTopics = true;
|
||||
|
||||
private boolean autoAddPartitions = false;
|
||||
private boolean autoAddPartitions;
|
||||
|
||||
/**
|
||||
* ZK session timeout in milliseconds.
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
|
||||
@@ -79,7 +79,8 @@ import static org.junit.Assert.fail;
|
||||
* @author Mark Fisher
|
||||
* @author Ilayaperumal Gopinathan
|
||||
*/
|
||||
public class KafkaBinderTests extends PartitionCapableBinderTests<KafkaTestBinder, ExtendedConsumerProperties<KafkaConsumerProperties>, ExtendedProducerProperties<KafkaProducerProperties>> {
|
||||
public class KafkaBinderTests extends
|
||||
PartitionCapableBinderTests<KafkaTestBinder, ExtendedConsumerProperties<KafkaConsumerProperties>, ExtendedProducerProperties<KafkaProducerProperties>> {
|
||||
|
||||
private final String CLASS_UNDER_TEST_NAME = KafkaMessageChannelBinder.class.getSimpleName();
|
||||
|
||||
@@ -614,24 +615,6 @@ public class KafkaBinderTests extends PartitionCapableBinderTests<KafkaTestBinde
|
||||
binding.unbind();
|
||||
}
|
||||
|
||||
private static class FailingInvocationCountingMessageHandler implements MessageHandler {
|
||||
|
||||
private int invocationCount = 0;
|
||||
|
||||
public FailingInvocationCountingMessageHandler() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleMessage(Message<?> message) throws MessagingException {
|
||||
invocationCount++;
|
||||
throw new RuntimeException();
|
||||
}
|
||||
|
||||
public int getInvocationCount() {
|
||||
return invocationCount;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPartitionCountNotReduced() throws Exception {
|
||||
String testTopicName = "existing" + System.currentTimeMillis();
|
||||
@@ -689,4 +672,22 @@ public class KafkaBinderTests extends PartitionCapableBinderTests<KafkaTestBinde
|
||||
TopicMetadata topicMetadata = AdminUtils.fetchTopicMetadataFromZk(testTopicName, kafkaTestSupport.getZkClient());
|
||||
assertThat(topicMetadata.partitionsMetadata().size(), equalTo(6));
|
||||
}
|
||||
|
||||
private static final class FailingInvocationCountingMessageHandler implements MessageHandler {
|
||||
|
||||
private int invocationCount;
|
||||
|
||||
private FailingInvocationCountingMessageHandler() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleMessage(Message<?> message) throws MessagingException {
|
||||
invocationCount++;
|
||||
throw new RuntimeException();
|
||||
}
|
||||
|
||||
public int getInvocationCount() {
|
||||
return invocationCount;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,16 +20,14 @@ import org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy;
|
||||
import org.springframework.cloud.stream.binder.PartitionSelectorStrategy;
|
||||
import org.springframework.messaging.Message;
|
||||
|
||||
|
||||
/**
|
||||
*
|
||||
* @author Marius Bogoevici
|
||||
*/
|
||||
public class RawKafkaPartitionTestSupport implements PartitionKeyExtractorStrategy, PartitionSelectorStrategy {
|
||||
|
||||
@Override
|
||||
public int selectPartition(Object key, int divisor) {
|
||||
return ((byte[])key)[0] % divisor;
|
||||
return ((byte[]) key)[0] % divisor;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -83,9 +83,9 @@ public class RawModeKafkaBinderTests extends KafkaBinderTests {
|
||||
input2.setBeanName("test.input2J");
|
||||
Binding<MessageChannel> input2Binding = binder.bindConsumer("partJ.0", "test", input2, consumerProperties);
|
||||
|
||||
output.send(new GenericMessage<>(new byte[]{(byte)0}));
|
||||
output.send(new GenericMessage<>(new byte[]{(byte)1}));
|
||||
output.send(new GenericMessage<>(new byte[]{(byte)2}));
|
||||
output.send(new GenericMessage<>(new byte[] { (byte) 0 }));
|
||||
output.send(new GenericMessage<>(new byte[] { (byte) 1 }));
|
||||
output.send(new GenericMessage<>(new byte[] { (byte) 2 }));
|
||||
|
||||
Message<?> receive0 = receive(input0);
|
||||
assertNotNull(receive0);
|
||||
@@ -94,11 +94,8 @@ public class RawModeKafkaBinderTests extends KafkaBinderTests {
|
||||
Message<?> receive2 = receive(input2);
|
||||
assertNotNull(receive2);
|
||||
|
||||
assertThat(Arrays.asList(
|
||||
((byte[]) receive0.getPayload())[0],
|
||||
((byte[]) receive1.getPayload())[0],
|
||||
((byte[]) receive2.getPayload())[0]),
|
||||
containsInAnyOrder((byte)0, (byte)1, (byte)2));
|
||||
assertThat(Arrays.asList(((byte[]) receive0.getPayload())[0], ((byte[]) receive1.getPayload())[0],
|
||||
((byte[]) receive2.getPayload())[0]), containsInAnyOrder((byte) 0, (byte) 1, (byte) 2));
|
||||
|
||||
input0Binding.unbind();
|
||||
input1Binding.unbind();
|
||||
@@ -166,7 +163,7 @@ public class RawModeKafkaBinderTests extends KafkaBinderTests {
|
||||
((byte[]) receive0.getPayload())[0],
|
||||
((byte[]) receive1.getPayload())[0],
|
||||
((byte[]) receive2.getPayload())[0]),
|
||||
containsInAnyOrder((byte)0, (byte)1, (byte)2));
|
||||
containsInAnyOrder((byte) 0, (byte) 1, (byte) 2));
|
||||
|
||||
input0Binding.unbind();
|
||||
input1Binding.unbind();
|
||||
@@ -192,7 +189,7 @@ public class RawModeKafkaBinderTests extends KafkaBinderTests {
|
||||
moduleOutputChannel.send(message);
|
||||
Message<?> inbound = receive(moduleInputChannel);
|
||||
assertNotNull(inbound);
|
||||
assertEquals("foo", new String((byte[])inbound.getPayload()));
|
||||
assertEquals("foo", new String((byte[]) inbound.getPayload()));
|
||||
producerBinding.unbind();
|
||||
consumerBinding.unbind();
|
||||
}
|
||||
@@ -233,7 +230,7 @@ public class RawModeKafkaBinderTests extends KafkaBinderTests {
|
||||
moduleOutputChannel.send(message);
|
||||
Message<?> inbound = receive(module1InputChannel);
|
||||
assertNotNull(inbound);
|
||||
assertEquals("foo", new String((byte[])inbound.getPayload()));
|
||||
assertEquals("foo", new String((byte[]) inbound.getPayload()));
|
||||
|
||||
Message<?> tapped1 = receive(module2InputChannel);
|
||||
Message<?> tapped2 = receive(module3InputChannel);
|
||||
@@ -277,7 +274,7 @@ public class RawModeKafkaBinderTests extends KafkaBinderTests {
|
||||
private void assertMessageReceive(QueueChannel moduleInputChannel, String payload) {
|
||||
Message<?> inbound = receive(moduleInputChannel);
|
||||
assertNotNull(inbound);
|
||||
assertEquals(payload, new String((byte[])inbound.getPayload()));
|
||||
assertEquals(payload, new String((byte[]) inbound.getPayload()));
|
||||
assertNull(inbound.getHeaders().get(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE));
|
||||
}
|
||||
|
||||
|
||||
@@ -47,7 +47,7 @@ public class RabbitBindingCleaner implements BindingCleaner {
|
||||
|
||||
@Override
|
||||
public Map<String, List<String>> clean(String entity, boolean isJob) {
|
||||
return clean("http://localhost:15672", "guest", "guest", "/", BINDER_PREFIX , entity, isJob);
|
||||
return clean("http://localhost:15672", "guest", "guest", "/", BINDER_PREFIX, entity, isJob);
|
||||
}
|
||||
|
||||
public Map<String, List<String>> clean(String adminUri, String user, String pw, String vhost,
|
||||
|
||||
@@ -28,7 +28,7 @@ public class RabbitConsumerProperties {
|
||||
|
||||
private String prefix = "";
|
||||
|
||||
private boolean transacted = false;
|
||||
private boolean transacted;
|
||||
|
||||
private AcknowledgeMode acknowledgeMode = AcknowledgeMode.AUTO;
|
||||
|
||||
@@ -40,11 +40,11 @@ public class RabbitConsumerProperties {
|
||||
|
||||
private int txSize = 1;
|
||||
|
||||
private boolean autoBindDlq = false;
|
||||
private boolean autoBindDlq;
|
||||
|
||||
private boolean durableSubscription = true;
|
||||
|
||||
private boolean republishToDlq = false;
|
||||
private boolean republishToDlq;
|
||||
|
||||
private boolean requeueRejected = true;
|
||||
|
||||
|
||||
@@ -42,7 +42,7 @@ import org.springframework.web.client.RestTemplate;
|
||||
* @author Gary Russell
|
||||
* @since 1.2
|
||||
*/
|
||||
public class RabbitManagementUtils {
|
||||
public abstract class RabbitManagementUtils {
|
||||
|
||||
public static RestTemplate buildRestTemplate(String adminUri, String user, String password) {
|
||||
BasicCredentialsProvider credsProvider = new BasicCredentialsProvider();
|
||||
|
||||
@@ -524,10 +524,10 @@ public class RabbitMessageChannelBinder extends AbstractBinder<MessageChannel, E
|
||||
}
|
||||
|
||||
public void cleanAutoDeclareContext(String prefix, String name) {
|
||||
synchronized(this.autoDeclareContext) {
|
||||
removeSingleton(applyPrefix(prefix,name) + ".binding");
|
||||
removeSingleton(applyPrefix(prefix,name));
|
||||
String dlq = applyPrefix(prefix,name) + ".dlq";
|
||||
synchronized (this.autoDeclareContext) {
|
||||
removeSingleton(applyPrefix(prefix, name) + ".binding");
|
||||
removeSingleton(applyPrefix(prefix, name));
|
||||
String dlq = applyPrefix(prefix, name) + ".dlq";
|
||||
removeSingleton(dlq + ".binding");
|
||||
removeSingleton(dlq);
|
||||
}
|
||||
@@ -564,7 +564,7 @@ public class RabbitMessageChannelBinder extends AbstractBinder<MessageChannel, E
|
||||
}
|
||||
}
|
||||
|
||||
private class SendingHandler extends AbstractMessageHandler implements Lifecycle {
|
||||
private final class SendingHandler extends AbstractMessageHandler implements Lifecycle {
|
||||
|
||||
private final MessageHandler delegate;
|
||||
|
||||
@@ -626,9 +626,9 @@ public class RabbitMessageChannelBinder extends AbstractBinder<MessageChannel, E
|
||||
|
||||
}
|
||||
|
||||
private class ReceivingHandler extends AbstractReplyProducingMessageHandler {
|
||||
private final class ReceivingHandler extends AbstractReplyProducingMessageHandler {
|
||||
|
||||
public ReceivingHandler() {
|
||||
private ReceivingHandler() {
|
||||
super();
|
||||
this.setBeanFactory(RabbitMessageChannelBinder.this.getBeanFactory());
|
||||
}
|
||||
|
||||
@@ -30,11 +30,11 @@ public class RabbitProducerProperties {
|
||||
|
||||
private String[] requestHeaderPatterns = new String[] {"STANDARD_REQUEST_HEADERS", "*"};
|
||||
|
||||
private boolean autoBindDlq = false;
|
||||
private boolean autoBindDlq;
|
||||
|
||||
private boolean compress = false;
|
||||
private boolean compress;
|
||||
|
||||
private boolean batchingEnabled = false;
|
||||
private boolean batchingEnabled;
|
||||
|
||||
private int batchSize = 100;
|
||||
|
||||
|
||||
@@ -48,6 +48,11 @@ import org.springframework.context.annotation.Profile;
|
||||
@AutoConfigureBefore({CloudAutoConfiguration.class, RabbitAutoConfiguration.class})
|
||||
public class RabbitServiceAutoConfiguration {
|
||||
|
||||
@Bean
|
||||
public HealthIndicator binderHealthIndicator(RabbitTemplate rabbitTemplate) {
|
||||
return new RabbitHealthIndicator(rabbitTemplate);
|
||||
}
|
||||
|
||||
@Configuration
|
||||
@Profile("cloud")
|
||||
@ConditionalOnClass(Cloud.class)
|
||||
@@ -70,9 +75,4 @@ public class RabbitServiceAutoConfiguration {
|
||||
@Import(RabbitAutoConfiguration.class)
|
||||
protected static class NoCloudConfig {
|
||||
}
|
||||
|
||||
@Bean
|
||||
public HealthIndicator binderHealthIndicator(RabbitTemplate rabbitTemplate) {
|
||||
return new RabbitHealthIndicator(rabbitTemplate);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,21 @@
|
||||
/*
|
||||
* Copyright 2015-2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/**
|
||||
* This package contains an implementation of the {@link org.springframework.cloud.stream.binder.Binder} for RabbitMQ.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.rabbit;
|
||||
package org.springframework.cloud.stream.binder.rabbit;
|
||||
|
||||
@@ -584,9 +584,6 @@ public class RabbitBinderTests extends PartitionCapableBinderTests<RabbitTestBin
|
||||
output.setBeanName("batchingProducer");
|
||||
Binding<MessageChannel> producerBinding = binder.bindProducer("batching.0", output, properties);
|
||||
|
||||
while (template.receive(properties.getExtension().getPrefix() + "batching.0.default") != null) {
|
||||
}
|
||||
|
||||
Log logger = spy(TestUtils.getPropertyValue(binder, "binder.compressingPostProcessor.logger", Log.class));
|
||||
new DirectFieldAccessor(TestUtils.getPropertyValue(binder, "binder.compressingPostProcessor"))
|
||||
.setPropertyValue("logger", logger);
|
||||
|
||||
@@ -66,10 +66,10 @@ public class RabbitBinderModuleTests {
|
||||
@ClassRule
|
||||
public static RabbitTestSupport rabbitTestSupport = new RabbitTestSupport();
|
||||
|
||||
private ConfigurableApplicationContext context = null;
|
||||
private ConfigurableApplicationContext context;
|
||||
|
||||
public static final ConnectionFactory MOCK_CONNECTION_FACTORY =
|
||||
Mockito.mock(ConnectionFactory.class, Mockito.RETURNS_MOCKS);
|
||||
public static final ConnectionFactory MOCK_CONNECTION_FACTORY = Mockito.mock(ConnectionFactory.class,
|
||||
Mockito.RETURNS_MOCKS);
|
||||
|
||||
@After
|
||||
public void tearDown() {
|
||||
@@ -91,26 +91,26 @@ public class RabbitBinderModuleTests {
|
||||
Binder binder = binderFactory.getBinder(null);
|
||||
assertThat(binder, instanceOf(RabbitMessageChannelBinder.class));
|
||||
DirectFieldAccessor binderFieldAccessor = new DirectFieldAccessor(binder);
|
||||
ConnectionFactory binderConnectionFactory =
|
||||
(ConnectionFactory) binderFieldAccessor.getPropertyValue("connectionFactory");
|
||||
ConnectionFactory binderConnectionFactory = (ConnectionFactory) binderFieldAccessor
|
||||
.getPropertyValue("connectionFactory");
|
||||
assertThat(binderConnectionFactory, instanceOf(CachingConnectionFactory.class));
|
||||
ConnectionFactory connectionFactory = context.getBean(ConnectionFactory.class);
|
||||
assertThat(binderConnectionFactory, is(connectionFactory));
|
||||
CompositeHealthIndicator bindersHealthIndicator =
|
||||
context.getBean("bindersHealthIndicator", CompositeHealthIndicator.class);
|
||||
CompositeHealthIndicator bindersHealthIndicator = context.getBean("bindersHealthIndicator",
|
||||
CompositeHealthIndicator.class);
|
||||
DirectFieldAccessor directFieldAccessor = new DirectFieldAccessor(bindersHealthIndicator);
|
||||
assertNotNull(bindersHealthIndicator);
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String,HealthIndicator> healthIndicators =
|
||||
(Map<String, HealthIndicator>) directFieldAccessor.getPropertyValue("indicators");
|
||||
Map<String, HealthIndicator> healthIndicators = (Map<String, HealthIndicator>) directFieldAccessor
|
||||
.getPropertyValue("indicators");
|
||||
assertThat(healthIndicators, hasKey("rabbit"));
|
||||
assertThat(healthIndicators.get("rabbit").health().getStatus(), equalTo(Status.UP));
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testParentConnectionFactoryInheritedByDefaultAndRabbitSettingsPropagated() {
|
||||
context = SpringApplication.run(SimpleProcessor.class,
|
||||
"--server.port=0",
|
||||
context = SpringApplication.run(SimpleProcessor.class, "--server.port=0",
|
||||
"--spring.cloud.stream.rabbit.bindings.input.consumer.transacted=true",
|
||||
"--spring.cloud.stream.rabbit.bindings.output.producer.transacted=true");
|
||||
BinderFactory<?> binderFactory = context.getBean(BinderFactory.class);
|
||||
@@ -118,53 +118,54 @@ public class RabbitBinderModuleTests {
|
||||
assertThat(binder, instanceOf(RabbitMessageChannelBinder.class));
|
||||
ChannelBindingService channelBindingService = context.getBean(ChannelBindingService.class);
|
||||
DirectFieldAccessor channelBindingServiceAccessor = new DirectFieldAccessor(channelBindingService);
|
||||
Map<String, List<Binding<MessageChannel>>> consumerBindings = (Map<String, List<Binding<MessageChannel>>>)
|
||||
channelBindingServiceAccessor.getPropertyValue("consumerBindings");
|
||||
Map<String, List<Binding<MessageChannel>>> consumerBindings = (Map<String, List<Binding<MessageChannel>>>) channelBindingServiceAccessor
|
||||
.getPropertyValue("consumerBindings");
|
||||
Binding<MessageChannel> inputBinding = consumerBindings.get("input").get(0);
|
||||
SimpleMessageListenerContainer container = TestUtils.getPropertyValue(inputBinding,
|
||||
"endpoint.messageListenerContainer",
|
||||
SimpleMessageListenerContainer.class);
|
||||
"endpoint.messageListenerContainer", SimpleMessageListenerContainer.class);
|
||||
assertTrue(TestUtils.getPropertyValue(container, "transactional", Boolean.class));
|
||||
Map<String, Binding<MessageChannel>> producerBindings =
|
||||
(Map<String, Binding<MessageChannel>>) TestUtils.getPropertyValue(channelBindingService, "producerBindings");
|
||||
Map<String, Binding<MessageChannel>> producerBindings = (Map<String, Binding<MessageChannel>>) TestUtils
|
||||
.getPropertyValue(channelBindingService, "producerBindings");
|
||||
Binding<MessageChannel> outputBinding = producerBindings.get("output");
|
||||
assertTrue(TestUtils.getPropertyValue(outputBinding, "endpoint.handler.delegate.amqpTemplate.transactional", Boolean.class));
|
||||
assertTrue(TestUtils.getPropertyValue(outputBinding, "endpoint.handler.delegate.amqpTemplate.transactional",
|
||||
Boolean.class));
|
||||
DirectFieldAccessor binderFieldAccessor = new DirectFieldAccessor(binder);
|
||||
ConnectionFactory binderConnectionFactory =
|
||||
(ConnectionFactory) binderFieldAccessor.getPropertyValue("connectionFactory");
|
||||
ConnectionFactory binderConnectionFactory = (ConnectionFactory) binderFieldAccessor
|
||||
.getPropertyValue("connectionFactory");
|
||||
assertThat(binderConnectionFactory, instanceOf(CachingConnectionFactory.class));
|
||||
ConnectionFactory connectionFactory = context.getBean(ConnectionFactory.class);
|
||||
assertThat(binderConnectionFactory, is(connectionFactory));
|
||||
CompositeHealthIndicator bindersHealthIndicator =
|
||||
context.getBean("bindersHealthIndicator", CompositeHealthIndicator.class);
|
||||
CompositeHealthIndicator bindersHealthIndicator = context.getBean("bindersHealthIndicator",
|
||||
CompositeHealthIndicator.class);
|
||||
DirectFieldAccessor directFieldAccessor = new DirectFieldAccessor(bindersHealthIndicator);
|
||||
assertNotNull(bindersHealthIndicator);
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, HealthIndicator> healthIndicators =
|
||||
(Map<String, HealthIndicator>) directFieldAccessor.getPropertyValue("indicators");
|
||||
Map<String, HealthIndicator> healthIndicators = (Map<String, HealthIndicator>) directFieldAccessor
|
||||
.getPropertyValue("indicators");
|
||||
assertThat(healthIndicators, hasKey("rabbit"));
|
||||
assertThat(healthIndicators.get("rabbit").health().getStatus(), equalTo(Status.UP));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParentConnectionFactoryInheritedIfOverridden() {
|
||||
context = new SpringApplication(SimpleProcessor.class, ConnectionFactoryConfiguration.class).run("--server.port=0");
|
||||
context = new SpringApplication(SimpleProcessor.class, ConnectionFactoryConfiguration.class)
|
||||
.run("--server.port=0");
|
||||
BinderFactory<?> binderFactory = context.getBean(BinderFactory.class);
|
||||
Binder binder = binderFactory.getBinder(null);
|
||||
assertThat(binder, instanceOf(RabbitMessageChannelBinder.class));
|
||||
DirectFieldAccessor binderFieldAccessor = new DirectFieldAccessor(binder);
|
||||
ConnectionFactory binderConnectionFactory =
|
||||
(ConnectionFactory) binderFieldAccessor.getPropertyValue("connectionFactory");
|
||||
ConnectionFactory binderConnectionFactory = (ConnectionFactory) binderFieldAccessor
|
||||
.getPropertyValue("connectionFactory");
|
||||
assertThat(binderConnectionFactory, is(MOCK_CONNECTION_FACTORY));
|
||||
ConnectionFactory connectionFactory = context.getBean(ConnectionFactory.class);
|
||||
assertThat(binderConnectionFactory, is(connectionFactory));
|
||||
CompositeHealthIndicator bindersHealthIndicator =
|
||||
context.getBean("bindersHealthIndicator", CompositeHealthIndicator.class);
|
||||
CompositeHealthIndicator bindersHealthIndicator = context.getBean("bindersHealthIndicator",
|
||||
CompositeHealthIndicator.class);
|
||||
assertNotNull(bindersHealthIndicator);
|
||||
DirectFieldAccessor directFieldAccessor = new DirectFieldAccessor(bindersHealthIndicator);
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String,HealthIndicator> healthIndicators =
|
||||
(Map<String, HealthIndicator>) directFieldAccessor.getPropertyValue("indicators");
|
||||
Map<String, HealthIndicator> healthIndicators = (Map<String, HealthIndicator>) directFieldAccessor
|
||||
.getPropertyValue("indicators");
|
||||
assertThat(healthIndicators, hasKey("rabbit"));
|
||||
// mock connection factory behaves as if down
|
||||
assertThat(healthIndicators.get("rabbit").health().getStatus(), equalTo(Status.DOWN));
|
||||
@@ -183,17 +184,17 @@ public class RabbitBinderModuleTests {
|
||||
Binder binder = binderFactory.getBinder(null);
|
||||
assertThat(binder, instanceOf(RabbitMessageChannelBinder.class));
|
||||
DirectFieldAccessor binderFieldAccessor = new DirectFieldAccessor(binder);
|
||||
ConnectionFactory binderConnectionFactory =
|
||||
(ConnectionFactory) binderFieldAccessor.getPropertyValue("connectionFactory");
|
||||
ConnectionFactory binderConnectionFactory = (ConnectionFactory) binderFieldAccessor
|
||||
.getPropertyValue("connectionFactory");
|
||||
ConnectionFactory connectionFactory = context.getBean(ConnectionFactory.class);
|
||||
assertThat(binderConnectionFactory, not(is(connectionFactory)));
|
||||
CompositeHealthIndicator bindersHealthIndicator =
|
||||
context.getBean("bindersHealthIndicator", CompositeHealthIndicator.class);
|
||||
CompositeHealthIndicator bindersHealthIndicator = context.getBean("bindersHealthIndicator",
|
||||
CompositeHealthIndicator.class);
|
||||
assertNotNull(bindersHealthIndicator);
|
||||
DirectFieldAccessor directFieldAccessor = new DirectFieldAccessor(bindersHealthIndicator);
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String,HealthIndicator> healthIndicators =
|
||||
(Map<String, HealthIndicator>) directFieldAccessor.getPropertyValue("indicators");
|
||||
Map<String, HealthIndicator> healthIndicators = (Map<String, HealthIndicator>) directFieldAccessor
|
||||
.getPropertyValue("indicators");
|
||||
assertThat(healthIndicators, hasKey("custom"));
|
||||
assertThat(healthIndicators.get("custom").health().getStatus(), equalTo(Status.UP));
|
||||
}
|
||||
|
||||
@@ -73,7 +73,7 @@ public abstract class AbstractBinderTests<B extends AbstractTestBinder<? extends
|
||||
* Allows accomodating tests which are slower than normal (e.g. retry).
|
||||
*/
|
||||
protected Message<?> receive(PollableChannel channel, int additionalMultiplier) {
|
||||
return channel.receive((int)(1000 * timeoutMultiplier * additionalMultiplier));
|
||||
return channel.receive((int) (1000 * timeoutMultiplier * additionalMultiplier));
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -143,7 +143,7 @@ public abstract class AbstractBinderTests<B extends AbstractTestBinder<? extends
|
||||
moduleOutputChannel2.send(message2);
|
||||
|
||||
|
||||
Message<?> messages[] = new Message[2];
|
||||
Message<?>[] messages = new Message[2];
|
||||
messages[0] = receive(moduleInputChannel);
|
||||
messages[1] = receive(moduleInputChannel);
|
||||
|
||||
|
||||
@@ -53,7 +53,7 @@ public abstract class AbstractTestBinder<C extends AbstractBinder<MessageChannel
|
||||
@Override
|
||||
public Binding<MessageChannel> bindProducer(String name, MessageChannel moduleOutputChannel, PP properties) {
|
||||
queues.add(name);
|
||||
return binder.bindProducer(name, moduleOutputChannel, properties);
|
||||
return binder.bindProducer(name, moduleOutputChannel, properties);
|
||||
}
|
||||
|
||||
public C getCoreBinder() {
|
||||
|
||||
@@ -30,7 +30,7 @@ import static org.mockito.Mockito.when;
|
||||
*
|
||||
* @author Gary Russell
|
||||
*/
|
||||
public class BinderTestUtils {
|
||||
public abstract class BinderTestUtils {
|
||||
|
||||
private static final MessageBuilderFactory mbf = new DefaultMessageBuilderFactory();
|
||||
|
||||
|
||||
@@ -23,12 +23,13 @@ import org.springframework.messaging.MessageChannel;
|
||||
*
|
||||
* @author Gary Russell
|
||||
*/
|
||||
public abstract class BrokerBinderTests<B extends AbstractTestBinder<? extends AbstractBinder<MessageChannel, CP, PP>, CP, PP>, CP extends ConsumerProperties, PP extends ProducerProperties> extends AbstractBinderTests<B,CP,PP> {
|
||||
public abstract class BrokerBinderTests<B extends AbstractTestBinder<? extends AbstractBinder<MessageChannel, CP, PP>, CP, PP>, CP extends ConsumerProperties, PP extends ProducerProperties>
|
||||
extends AbstractBinderTests<B, CP, PP> {
|
||||
|
||||
/**
|
||||
* Create a new spy on the given 'queue'. This allows de-correlating the creation of
|
||||
* the 'connection' from its actual usage, which may be needed by some implementations to
|
||||
* see messages sent after connection creation.
|
||||
* the 'connection' from its actual usage, which may be needed by some implementations
|
||||
* to see messages sent after connection creation.
|
||||
*/
|
||||
public abstract Spy spyOn(final String name);
|
||||
|
||||
|
||||
@@ -51,7 +51,8 @@ import static org.junit.Assert.assertThat;
|
||||
* @author Mark Fisher
|
||||
* @author Marius Bogoevici
|
||||
*/
|
||||
abstract public class PartitionCapableBinderTests<B extends AbstractTestBinder<? extends AbstractBinder<MessageChannel, CP, PP>, CP, PP>, CP extends ConsumerProperties, PP extends ProducerProperties> extends BrokerBinderTests<B,CP,PP> {
|
||||
abstract public class PartitionCapableBinderTests<B extends AbstractTestBinder<? extends AbstractBinder<MessageChannel, CP, PP>, CP, PP>, CP extends ConsumerProperties, PP extends ProducerProperties>
|
||||
extends BrokerBinderTests<B, CP, PP> {
|
||||
|
||||
protected static final SpelExpressionParser spelExpressionParser = new SpelExpressionParser();
|
||||
|
||||
@@ -60,13 +61,16 @@ abstract public class PartitionCapableBinderTests<B extends AbstractTestBinder<?
|
||||
public void testAnonymousGroup() throws Exception {
|
||||
B binder = getBinder();
|
||||
DirectChannel output = new DirectChannel();
|
||||
Binding<MessageChannel> producerBinding = binder.bindProducer("defaultGroup.0", output, createProducerProperties());
|
||||
Binding<MessageChannel> producerBinding = binder.bindProducer("defaultGroup.0", output,
|
||||
createProducerProperties());
|
||||
|
||||
QueueChannel input1 = new QueueChannel();
|
||||
Binding<MessageChannel> binding1 = binder.bindConsumer("defaultGroup.0", null, input1, createConsumerProperties());
|
||||
Binding<MessageChannel> binding1 = binder.bindConsumer("defaultGroup.0", null, input1,
|
||||
createConsumerProperties());
|
||||
|
||||
QueueChannel input2 = new QueueChannel();
|
||||
Binding<MessageChannel> binding2 = binder.bindConsumer("defaultGroup.0", null, input2, createConsumerProperties());
|
||||
Binding<MessageChannel> binding2 = binder.bindConsumer("defaultGroup.0", null, input2,
|
||||
createConsumerProperties());
|
||||
|
||||
String testPayload1 = "foo-" + UUID.randomUUID().toString();
|
||||
output.send(new GenericMessage<>(testPayload1.getBytes()));
|
||||
@@ -120,7 +124,8 @@ abstract public class PartitionCapableBinderTests<B extends AbstractTestBinder<?
|
||||
output.send(new GenericMessage<>(testPayload.getBytes()));
|
||||
|
||||
QueueChannel inbound1 = new QueueChannel();
|
||||
Binding<MessageChannel> consumerBinding = binder.bindConsumer(testDestination, "test1", inbound1, createConsumerProperties());
|
||||
Binding<MessageChannel> consumerBinding = binder.bindConsumer(testDestination, "test1", inbound1,
|
||||
createConsumerProperties());
|
||||
|
||||
Message<?> receivedMessage1 = receive(inbound1);
|
||||
assertThat(receivedMessage1, not(nullValue()));
|
||||
@@ -138,16 +143,18 @@ abstract public class PartitionCapableBinderTests<B extends AbstractTestBinder<?
|
||||
String testDestination = "testDestination" + UUID.randomUUID().toString().replace("-", "");
|
||||
|
||||
PP producerProperties = createProducerProperties();
|
||||
producerProperties.setRequiredGroups("test1","test2");
|
||||
producerProperties.setRequiredGroups("test1", "test2");
|
||||
Binding<MessageChannel> producerBinding = binder.bindProducer(testDestination, output, producerProperties);
|
||||
|
||||
String testPayload = "foo-" + UUID.randomUUID().toString();
|
||||
output.send(new GenericMessage<>(testPayload.getBytes()));
|
||||
|
||||
QueueChannel inbound1 = new QueueChannel();
|
||||
Binding<MessageChannel> consumerBinding1 = binder.bindConsumer(testDestination, "test1", inbound1, createConsumerProperties());
|
||||
Binding<MessageChannel> consumerBinding1 = binder.bindConsumer(testDestination, "test1", inbound1,
|
||||
createConsumerProperties());
|
||||
QueueChannel inbound2 = new QueueChannel();
|
||||
Binding<MessageChannel> consumerBinding2 = binder.bindConsumer(testDestination, "test2", inbound2, createConsumerProperties());
|
||||
Binding<MessageChannel> consumerBinding2 = binder.bindConsumer(testDestination, "test2", inbound2,
|
||||
createConsumerProperties());
|
||||
|
||||
Message<?> receivedMessage1 = receive(inbound1);
|
||||
assertThat(receivedMessage1, not(nullValue()));
|
||||
@@ -192,8 +199,8 @@ abstract public class PartitionCapableBinderTests<B extends AbstractTestBinder<?
|
||||
Binding<MessageChannel> outputBinding = binder.bindProducer("part.0", output, producerProperties);
|
||||
try {
|
||||
AbstractEndpoint endpoint = extractEndpoint(outputBinding);
|
||||
assertThat(getEndpointRouting(endpoint), containsString(
|
||||
getExpectedRoutingBaseDestination("part.0", "test") + "-' + headers['partition']"));
|
||||
assertThat(getEndpointRouting(endpoint),
|
||||
containsString(getExpectedRoutingBaseDestination("part.0", "test") + "-' + headers['partition']"));
|
||||
}
|
||||
catch (UnsupportedOperationException ignored) {
|
||||
}
|
||||
@@ -201,8 +208,7 @@ abstract public class PartitionCapableBinderTests<B extends AbstractTestBinder<?
|
||||
Message<Integer> message2 = MessageBuilder.withPayload(2)
|
||||
.setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, "foo")
|
||||
.setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, 42)
|
||||
.setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, 43)
|
||||
.build();
|
||||
.setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, 43).build();
|
||||
output.send(message2);
|
||||
output.send(new GenericMessage<>(1));
|
||||
output.send(new GenericMessage<>(0));
|
||||
@@ -219,9 +225,8 @@ abstract public class PartitionCapableBinderTests<B extends AbstractTestBinder<?
|
||||
@Override
|
||||
public boolean matches(Object item) {
|
||||
IntegrationMessageHeaderAccessor accessor = new IntegrationMessageHeaderAccessor((Message<?>) item);
|
||||
boolean result = "foo".equals(accessor.getCorrelationId()) &&
|
||||
42 == accessor.getSequenceNumber() &&
|
||||
43 == accessor.getSequenceSize();
|
||||
boolean result = "foo".equals(accessor.getCorrelationId()) && 42 == accessor.getSequenceNumber()
|
||||
&& 43 == accessor.getSequenceSize();
|
||||
return result;
|
||||
}
|
||||
};
|
||||
@@ -232,21 +237,13 @@ abstract public class PartitionCapableBinderTests<B extends AbstractTestBinder<?
|
||||
assertThat(receive2, fooMatcher);
|
||||
}
|
||||
else {
|
||||
assertThat(Arrays.asList(
|
||||
(Integer) receive0.getPayload(),
|
||||
(Integer) receive1.getPayload(),
|
||||
(Integer) receive2.getPayload()),
|
||||
containsInAnyOrder(0, 1, 2));
|
||||
assertThat(Arrays.asList((Integer) receive0.getPayload(), (Integer) receive1.getPayload(),
|
||||
(Integer) receive2.getPayload()), containsInAnyOrder(0, 1, 2));
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
Matcher<Iterable<? extends Message<?>>> containsOur3Messages = containsInAnyOrder(
|
||||
fooMatcher,
|
||||
hasProperty("payload", equalTo(0)),
|
||||
hasProperty("payload", equalTo(1))
|
||||
);
|
||||
assertThat(
|
||||
Arrays.asList(receive0, receive1, receive2),
|
||||
containsOur3Messages);
|
||||
Matcher<Iterable<? extends Message<?>>> containsOur3Messages = containsInAnyOrder(fooMatcher,
|
||||
hasProperty("payload", equalTo(0)), hasProperty("payload", equalTo(1)));
|
||||
assertThat(Arrays.asList(receive0, receive1, receive2), containsOur3Messages);
|
||||
|
||||
}
|
||||
input0Binding.unbind();
|
||||
@@ -285,8 +282,8 @@ abstract public class PartitionCapableBinderTests<B extends AbstractTestBinder<?
|
||||
Binding<MessageChannel> outputBinding = binder.bindProducer("partJ.0", output, producerProperties);
|
||||
if (usesExplicitRouting()) {
|
||||
AbstractEndpoint endpoint = extractEndpoint(outputBinding);
|
||||
assertThat(getEndpointRouting(endpoint), containsString(
|
||||
getExpectedRoutingBaseDestination("partJ.0", "test") + "-' + headers['partition']"));
|
||||
assertThat(getEndpointRouting(endpoint),
|
||||
containsString(getExpectedRoutingBaseDestination("partJ.0", "test") + "-' + headers['partition']"));
|
||||
}
|
||||
|
||||
output.send(new GenericMessage<>(2));
|
||||
@@ -307,11 +304,8 @@ abstract public class PartitionCapableBinderTests<B extends AbstractTestBinder<?
|
||||
}
|
||||
else {
|
||||
|
||||
assertThat(Arrays.asList(
|
||||
(Integer) receive0.getPayload(),
|
||||
(Integer) receive1.getPayload(),
|
||||
(Integer) receive2.getPayload()),
|
||||
containsInAnyOrder(0, 1, 2));
|
||||
assertThat(Arrays.asList((Integer) receive0.getPayload(), (Integer) receive1.getPayload(),
|
||||
(Integer) receive2.getPayload()), containsInAnyOrder(0, 1, 2));
|
||||
}
|
||||
|
||||
input0Binding.unbind();
|
||||
@@ -321,10 +315,12 @@ abstract public class PartitionCapableBinderTests<B extends AbstractTestBinder<?
|
||||
}
|
||||
|
||||
/**
|
||||
* Implementations should return whether the binder under test uses "explicit" routing (e.g. Rabbit)
|
||||
* whereby Spring Cloud Stream is responsible for assigning a partition and knows which exact consumer will receive the
|
||||
* message (i.e. honor "partitionIndex") or "implicit" routing (e.g. Kafka) whereby the only guarantee
|
||||
* is that messages will be spread, but we don't control exactly which consumer gets which message.
|
||||
* Implementations should return whether the binder under test uses "explicit" routing
|
||||
* (e.g. Rabbit) whereby Spring Cloud Stream is responsible for assigning a partition
|
||||
* and knows which exact consumer will receive the message (i.e. honor
|
||||
* "partitionIndex") or "implicit" routing (e.g. Kafka) whereby the only guarantee is
|
||||
* that messages will be spread, but we don't control exactly which consumer gets
|
||||
* which message.
|
||||
*/
|
||||
protected abstract boolean usesExplicitRouting();
|
||||
|
||||
@@ -336,8 +332,8 @@ abstract public class PartitionCapableBinderTests<B extends AbstractTestBinder<?
|
||||
}
|
||||
|
||||
/**
|
||||
* For implementations that rely on explicit routing, return the expected base destination
|
||||
* (the part that precedes '-partition' within the expression).
|
||||
* For implementations that rely on explicit routing, return the expected base
|
||||
* destination (the part that precedes '-partition' within the expression).
|
||||
*/
|
||||
protected String getExpectedRoutingBaseDestination(String name, String group) {
|
||||
throw new UnsupportedOperationException();
|
||||
|
||||
@@ -20,7 +20,6 @@ import org.springframework.messaging.Message;
|
||||
|
||||
|
||||
/**
|
||||
*
|
||||
* @author Gary Russell
|
||||
*/
|
||||
public class PartitionTestSupport implements PartitionKeyExtractorStrategy, PartitionSelectorStrategy {
|
||||
|
||||
@@ -25,5 +25,5 @@ package org.springframework.cloud.stream.binder;
|
||||
*/
|
||||
public interface Spy {
|
||||
|
||||
public Object receive(boolean expectNull) throws Exception;
|
||||
Object receive(boolean expectNull) throws Exception;
|
||||
}
|
||||
|
||||
@@ -22,7 +22,7 @@ import org.springframework.util.Assert;
|
||||
/**
|
||||
* Copy of class in org.springframework.amqp.utils.test to avoid dependency on spring-amqp
|
||||
*/
|
||||
public class TestUtils {
|
||||
public abstract class TestUtils {
|
||||
|
||||
/**
|
||||
* Uses nested {@link DirectFieldAccessor}s to obtain a property using dotted notation
|
||||
|
||||
@@ -47,7 +47,8 @@ public class KryoCodecAutoConfiguration {
|
||||
@Autowired
|
||||
ApplicationContext applicationContext;
|
||||
|
||||
@Autowired KryoCodecProperties kryoCodecProperties;
|
||||
@Autowired
|
||||
KryoCodecProperties kryoCodecProperties;
|
||||
|
||||
@Bean
|
||||
public PojoCodec codec() {
|
||||
|
||||
@@ -27,7 +27,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
|
||||
public class KryoCodecProperties {
|
||||
private boolean references = true;
|
||||
|
||||
|
||||
public boolean isReferences() {
|
||||
return references;
|
||||
}
|
||||
@@ -35,5 +35,5 @@ public class KryoCodecProperties {
|
||||
public void setReferences(boolean references) {
|
||||
this.references = references;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.config;
|
||||
|
||||
import org.junit.Test;
|
||||
@@ -42,16 +43,19 @@ import static org.junit.Assert.assertThat;
|
||||
@SpringApplicationConfiguration({ContentTypeOutboundSourceTests.TestSource.class})
|
||||
public class ContentTypeOutboundSourceTests {
|
||||
|
||||
@Autowired @Bindings(TestSource.class)
|
||||
@Autowired
|
||||
@Bindings(TestSource.class)
|
||||
private Source testSource;
|
||||
|
||||
@Autowired
|
||||
private BinderFactory binderFactory;
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testMessageHeaderWhenNoExplicitContentTypeOnMessage() throws Exception {
|
||||
testSource.output().send(MessageBuilder.withPayload("{\"message\":\"Hi\"}").build());
|
||||
Message<String> received = (Message<String>) ((TestSupportBinder) binderFactory.getBinder(null)).messageCollector().forChannel(testSource.output()).poll();
|
||||
Message<String> received = (Message<String>) ((TestSupportBinder) binderFactory.getBinder(null))
|
||||
.messageCollector().forChannel(testSource.output()).poll();
|
||||
assertThat(received.getHeaders().get(MessageHeaders.CONTENT_TYPE).toString(), equalTo("application/json"));
|
||||
assertThat(received.getPayload(), equalTo("{\"message\":\"Hi\"}"));
|
||||
}
|
||||
|
||||
@@ -1,3 +1,19 @@
|
||||
/*
|
||||
* Copyright 2015 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.config;
|
||||
|
||||
import java.util.List;
|
||||
@@ -39,7 +55,8 @@ import static org.junit.Assert.assertTrue;
|
||||
@SpringApplicationConfiguration(CustomMessageConverterTests.TestSource.class)
|
||||
public class CustomMessageConverterTests {
|
||||
|
||||
@Autowired @Bindings(TestSource.class)
|
||||
@Autowired
|
||||
@Bindings(TestSource.class)
|
||||
private Source testSource;
|
||||
|
||||
@Autowired
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.config;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
@@ -54,7 +55,7 @@ public class ErrorChannelTests {
|
||||
|
||||
@Test
|
||||
public void testErrorChannelBinding() throws Exception {
|
||||
Message<?> message = (Message<?>) ((TestSupportBinder) binderFactory.getBinder(null)).messageCollector().forChannel(errorChannel).poll(10, TimeUnit.SECONDS);
|
||||
Message<?> message = ((TestSupportBinder) binderFactory.getBinder(null)).messageCollector().forChannel(errorChannel).poll(10, TimeUnit.SECONDS);
|
||||
Assert.isTrue(message instanceof ErrorMessage, "Message should be an instance of ErrorMessage");
|
||||
Assert.isTrue(message.getPayload() instanceof MessagingException, "Message payload should be an instance" +
|
||||
"of MessagingException");
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.config;
|
||||
|
||||
import java.util.List;
|
||||
@@ -54,7 +55,8 @@ import static org.junit.Assert.assertTrue;
|
||||
@SpringApplicationConfiguration({MessageChannelConfigurerTests.TestSink.class})
|
||||
public class MessageChannelConfigurerTests {
|
||||
|
||||
@Autowired @Bindings(TestSink.class)
|
||||
@Autowired
|
||||
@Bindings(TestSink.class)
|
||||
private Sink testSink;
|
||||
|
||||
@Autowired
|
||||
@@ -95,7 +97,7 @@ public class MessageChannelConfigurerTests {
|
||||
assertTrue(objectMapper == this.objectMapper);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@EnableBinding(Sink.class)
|
||||
@EnableAutoConfiguration
|
||||
@PropertySource("classpath:/org/springframework/cloud/stream/config/channel/sink-channel-configurers.properties")
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.annotation.rxjava;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
@@ -73,7 +73,7 @@ public class SubjectMessageHandler extends AbstractMessageProducingHandler imple
|
||||
|
||||
private volatile Subscription subscription;
|
||||
|
||||
private volatile boolean running = false;
|
||||
private volatile boolean running;
|
||||
|
||||
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||
public SubjectMessageHandler(RxJavaProcessor processor) {
|
||||
|
||||
@@ -66,7 +66,7 @@ public class KafkaTestSupport extends AbstractExternalResourceTestSupport<String
|
||||
private final Properties brokerConfig = TestUtils.createBrokerConfig(0, TestUtils.choosePort(), false);
|
||||
|
||||
// caches previous failures to reach the external server - preventing repeated retries
|
||||
private static boolean hasFailedAlready = false;
|
||||
private static boolean hasFailedAlready;
|
||||
|
||||
static {
|
||||
// check if either the environment or Java property is set to use embedded tests
|
||||
@@ -122,7 +122,8 @@ public class KafkaTestSupport extends AbstractExternalResourceTestSupport<String
|
||||
try {
|
||||
int zkConnectionTimeout = 10000;
|
||||
int zkSessionTimeout = 10000;
|
||||
zkClient = new ZkClient(getZkConnectString(), zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer$.MODULE$);
|
||||
zkClient = new ZkClient(getZkConnectString(), zkSessionTimeout, zkConnectionTimeout,
|
||||
ZKStringSerializer$.MODULE$);
|
||||
}
|
||||
catch (Exception e) {
|
||||
zookeeper.shutdown();
|
||||
@@ -134,15 +135,18 @@ public class KafkaTestSupport extends AbstractExternalResourceTestSupport<String
|
||||
brokerConfig.put("zookeeper.connect", zookeeper.getConnectString());
|
||||
brokerConfig.put("auto.create.topics.enable", "false");
|
||||
brokerConfig.put("delete.topic.enable", "true");
|
||||
kafkaServer = TestUtils.createServer(new KafkaConfig(brokerConfigProperties), SystemTime$.MODULE$);
|
||||
log.debug("Created Kafka server at " + kafkaServer.config().hostName() + ":" + kafkaServer.config().port());
|
||||
kafkaServer = TestUtils.createServer(new KafkaConfig(brokerConfigProperties),
|
||||
SystemTime$.MODULE$);
|
||||
log.debug("Created Kafka server at " + kafkaServer.config().hostName() + ":"
|
||||
+ kafkaServer.config().port());
|
||||
}
|
||||
catch (Exception e) {
|
||||
zookeeper.shutdown();
|
||||
zkClient.close();
|
||||
throw e;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
}
|
||||
catch (Exception e) {
|
||||
hasFailedAlready = true;
|
||||
throw e;
|
||||
}
|
||||
@@ -154,7 +158,8 @@ public class KafkaTestSupport extends AbstractExternalResourceTestSupport<String
|
||||
throw new RuntimeException("Kafka server not available");
|
||||
}
|
||||
}
|
||||
} else {
|
||||
}
|
||||
else {
|
||||
throw new RuntimeException("Kafka server not available");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -32,5 +32,5 @@ public interface MessageCollector {
|
||||
/**
|
||||
* Obtain a queue that will receive messages sent to the given channel.
|
||||
*/
|
||||
public BlockingQueue<Message<?>> forChannel(MessageChannel channel);
|
||||
BlockingQueue<Message<?>> forChannel(MessageChannel channel);
|
||||
}
|
||||
|
||||
@@ -61,9 +61,10 @@ public class TestSupportBinder implements Binder<MessageChannel, ConsumerPropert
|
||||
* Registers a single subscriber to the channel, that enqueues messages for later retrieval and assertion in tests.
|
||||
*/
|
||||
@Override
|
||||
public Binding<MessageChannel> bindProducer(String name, MessageChannel outboundBindTarget, ProducerProperties properties) {
|
||||
public Binding<MessageChannel> bindProducer(String name, MessageChannel outboundBindTarget,
|
||||
ProducerProperties properties) {
|
||||
final BlockingQueue<Message<?>> queue = messageCollector.register(outboundBindTarget);
|
||||
((SubscribableChannel)outboundBindTarget).subscribe(new MessageHandler() {
|
||||
((SubscribableChannel) outboundBindTarget).subscribe(new MessageHandler() {
|
||||
@Override
|
||||
public void handleMessage(Message<?> message) throws MessagingException {
|
||||
queue.add(message);
|
||||
@@ -112,7 +113,7 @@ public class TestSupportBinder implements Binder<MessageChannel, ConsumerPropert
|
||||
/**
|
||||
* @author Marius Bogoevici
|
||||
*/
|
||||
private static class TestBinding implements Binding<MessageChannel> {
|
||||
private static final class TestBinding implements Binding<MessageChannel> {
|
||||
|
||||
private final MessageChannel target;
|
||||
|
||||
|
||||
@@ -35,7 +35,7 @@ import org.springframework.messaging.SubscribableChannel;
|
||||
* @author Ilayaperumal Gopinathan
|
||||
* @author Venil Noronha
|
||||
*/
|
||||
class AggregateApplication {
|
||||
abstract class AggregateApplication {
|
||||
|
||||
private static final String SPRING_CLOUD_STREAM_INTERNAL_PREFIX = "spring.cloud.stream.internal";
|
||||
|
||||
@@ -92,12 +92,10 @@ class AggregateApplication {
|
||||
return aggregatorParentConfiguration.run(args);
|
||||
}
|
||||
|
||||
static void createChildContexts(ConfigurableApplicationContext parentContext,
|
||||
Class<?>[] apps, String args[][]) {
|
||||
static void createChildContexts(ConfigurableApplicationContext parentContext, Class<?>[] apps, String[][] args) {
|
||||
for (int i = apps.length - 1; i >= 0; i--) {
|
||||
String appClassName = apps[i].getName();
|
||||
embedApp(parentContext, getNamespace(appClassName, i), apps[i])
|
||||
.run(args != null ? args[i] : new String[0]);
|
||||
embedApp(parentContext, getNamespace(appClassName, i), apps[i]).run(args != null ? args[i] : new String[0]);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -57,7 +57,7 @@ public class AggregateApplicationBuilder {
|
||||
}
|
||||
|
||||
public AggregateApplicationBuilder(Object[] sources, String[] args) {
|
||||
this(SpringApplication.run(addAggregatorParentIfMissing(sources),args));
|
||||
this(SpringApplication.run(addAggregatorParentIfMissing(sources), args));
|
||||
}
|
||||
|
||||
public AggregateApplicationBuilder(ConfigurableApplicationContext parentContext) {
|
||||
@@ -135,6 +135,13 @@ public class AggregateApplicationBuilder {
|
||||
return parentContext;
|
||||
}
|
||||
|
||||
private ChildContextBuilder childContext(Class<?> app,
|
||||
ConfigurableApplicationContext parentContext, String namespace) {
|
||||
return new ChildContextBuilder(
|
||||
AggregateApplication.embedApp(parentContext, namespace, app));
|
||||
}
|
||||
|
||||
|
||||
public class SourceConfigurer extends AppConfigurer<SourceConfigurer> {
|
||||
|
||||
public SourceConfigurer(Class<?> app) {
|
||||
@@ -184,9 +191,9 @@ public class AggregateApplicationBuilder {
|
||||
|
||||
String[] args;
|
||||
|
||||
String[] names = null;
|
||||
String[] names;
|
||||
|
||||
String[] profiles = null;
|
||||
String[] profiles;
|
||||
|
||||
String namespace;
|
||||
|
||||
@@ -226,17 +233,11 @@ public class AggregateApplicationBuilder {
|
||||
void embed() {
|
||||
childContext(this.app, AggregateApplicationBuilder.this.parentContext,
|
||||
this.namespace).args(this.args).config(this.names)
|
||||
.profiles(this.profiles).run();
|
||||
.profiles(this.profiles).run();
|
||||
}
|
||||
}
|
||||
|
||||
private ChildContextBuilder childContext(Class<?> app,
|
||||
ConfigurableApplicationContext parentContext, String namespace) {
|
||||
return new ChildContextBuilder(
|
||||
AggregateApplication.embedApp(parentContext, namespace, app));
|
||||
}
|
||||
|
||||
private class ChildContextBuilder {
|
||||
private final class ChildContextBuilder {
|
||||
|
||||
private SpringApplicationBuilder builder;
|
||||
|
||||
@@ -244,7 +245,7 @@ public class AggregateApplicationBuilder {
|
||||
|
||||
private String[] args;
|
||||
|
||||
public ChildContextBuilder(SpringApplicationBuilder builder) {
|
||||
private ChildContextBuilder(SpringApplicationBuilder builder) {
|
||||
this.builder = builder;
|
||||
}
|
||||
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.aggregate;
|
||||
|
||||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
||||
|
||||
@@ -192,10 +192,7 @@ public abstract class AbstractBinder<T, C extends ConsumerProperties, P extends
|
||||
* @return the constructed name.
|
||||
*/
|
||||
protected final String groupedName(String name, String group) {
|
||||
if (!StringUtils.hasText(group)) {
|
||||
group = "default";
|
||||
}
|
||||
return name + GROUP_INDEX_DELIMITER + group;
|
||||
return name + GROUP_INDEX_DELIMITER + (StringUtils.hasText(group) ? group : "default");
|
||||
}
|
||||
|
||||
protected final MessageValues serializePayloadIfNecessary(Message<?> message) {
|
||||
@@ -290,7 +287,7 @@ public abstract class AbstractBinder<T, C extends ConsumerProperties, P extends
|
||||
}
|
||||
catch (ClassNotFoundException e) {
|
||||
throw new SerializationFailedException("unable to deserialize [" + className + "]. Class not found.",
|
||||
e);//NOSONAR
|
||||
e); //NOSONAR
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw new SerializationFailedException("unable to deserialize [" + className + "]", e);
|
||||
@@ -326,6 +323,11 @@ public abstract class AbstractBinder<T, C extends ConsumerProperties, P extends
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Perform manual acknowledgement based on the metadata stored in the binder.
|
||||
*/
|
||||
public void doManualAck(LinkedList<MessageHeaders> messageHeaders) {
|
||||
}
|
||||
|
||||
/**
|
||||
* Handles representing any java class as a {@link MimeType}.
|
||||
@@ -382,10 +384,4 @@ public abstract class AbstractBinder<T, C extends ConsumerProperties, P extends
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Perform manual acknowledgement based on the metadata stored in the binder.
|
||||
*/
|
||||
public void doManualAck(LinkedList<MessageHeaders> messageHeaders) {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
|
||||
@@ -26,7 +26,7 @@ import org.springframework.messaging.MessageHeaders;
|
||||
* @author David Turanski
|
||||
*/
|
||||
public final class BinderHeaders {
|
||||
|
||||
|
||||
public static final String BINDER_ORIGINAL_CONTENT_TYPE = "originalContentType";
|
||||
|
||||
/**
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
@@ -13,6 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
@@ -31,11 +31,11 @@ public class ConsumerProperties {
|
||||
|
||||
private int concurrency = 1;
|
||||
|
||||
private boolean partitioned = false;
|
||||
private boolean partitioned;
|
||||
|
||||
private int instanceCount = 1;
|
||||
|
||||
private int instanceIndex = 0;
|
||||
private int instanceIndex;
|
||||
|
||||
private int maxAttempts = 3;
|
||||
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
@@ -87,12 +87,13 @@ public class DefaultBinderFactory<T> implements BinderFactory<T>, DisposableBean
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized Binder<T,?,?> getBinder(String name) {
|
||||
public synchronized Binder<T, ?, ?> getBinder(String name) {
|
||||
String configurationName;
|
||||
// Fall back to a default if no argument is provided
|
||||
if (StringUtils.isEmpty(name)) {
|
||||
if (this.binderConfigurations.size() == 0) {
|
||||
throw new IllegalStateException("A default binder has been requested, but there there is no binder available");
|
||||
throw new IllegalStateException(
|
||||
"A default binder has been requested, but there there is no binder available");
|
||||
}
|
||||
else if (this.binderConfigurations.size() == 1) {
|
||||
configurationName = this.binderConfigurations.keySet().iterator().next();
|
||||
@@ -104,11 +105,12 @@ public class DefaultBinderFactory<T> implements BinderFactory<T>, DisposableBean
|
||||
else {
|
||||
throw new IllegalStateException(
|
||||
"A default binder has been requested, but there is more than one binder available: "
|
||||
+ StringUtils.collectionToCommaDelimitedString(this.binderConfigurations.keySet()) + ", and"
|
||||
+ " no default binder has been set.");
|
||||
+ StringUtils.collectionToCommaDelimitedString(this.binderConfigurations.keySet())
|
||||
+ ", and" + " no default binder has been set.");
|
||||
}
|
||||
}
|
||||
} else {
|
||||
}
|
||||
else {
|
||||
configurationName = name;
|
||||
}
|
||||
if (!this.binderInstanceCache.containsKey(configurationName)) {
|
||||
@@ -120,7 +122,7 @@ public class DefaultBinderFactory<T> implements BinderFactory<T>, DisposableBean
|
||||
// Convert all properties to arguments, so that they receive maximum precedence
|
||||
ArrayList<String> args = new ArrayList<>();
|
||||
for (Map.Entry<Object, Object> property : binderProperties.entrySet()) {
|
||||
args.add(String.format("--%s=%s",property.getKey(),property.getValue()));
|
||||
args.add(String.format("--%s=%s", property.getKey(), property.getValue()));
|
||||
}
|
||||
// Initialize the domain with a unique name based on the bootstrapping context setting
|
||||
ConfigurableEnvironment environment = context != null ? context.getEnvironment() : null;
|
||||
@@ -155,7 +157,7 @@ public class DefaultBinderFactory<T> implements BinderFactory<T>, DisposableBean
|
||||
ConfigurableApplicationContext binderProducingContext =
|
||||
springApplicationBuilder.run(args.toArray(new String[args.size()]));
|
||||
@SuppressWarnings("unchecked")
|
||||
Binder<T,?,?> binder = binderProducingContext.getBean(Binder.class);
|
||||
Binder<T, ?, ?> binder = binderProducingContext.getBean(Binder.class);
|
||||
if (bindersHealthIndicator != null) {
|
||||
OrderedHealthAggregator healthAggregator = new OrderedHealthAggregator();
|
||||
Map<String, HealthIndicator> indicators = binderProducingContext.getBeansOfType(HealthIndicator.class);
|
||||
@@ -175,18 +177,18 @@ public class DefaultBinderFactory<T> implements BinderFactory<T>, DisposableBean
|
||||
*
|
||||
* @param <T>
|
||||
*/
|
||||
private static class BinderInstanceHolder<T> {
|
||||
private static final class BinderInstanceHolder<T> {
|
||||
|
||||
private final Binder<T,?,?> binderInstance;
|
||||
private final Binder<T, ?, ?> binderInstance;
|
||||
|
||||
private final ConfigurableApplicationContext binderContext;
|
||||
|
||||
public BinderInstanceHolder(Binder<T,?,?> binderInstance, ConfigurableApplicationContext binderContext) {
|
||||
private BinderInstanceHolder(Binder<T, ?, ?> binderInstance, ConfigurableApplicationContext binderContext) {
|
||||
this.binderInstance = binderInstance;
|
||||
this.binderContext = binderContext;
|
||||
}
|
||||
|
||||
public Binder<T,?,?> getBinderInstance() {
|
||||
public Binder<T, ?, ?> getBinderInstance() {
|
||||
return this.binderInstance;
|
||||
}
|
||||
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder;
|
||||
|
||||
import java.util.Collections;
|
||||
|
||||
@@ -59,8 +59,7 @@ public class EmbeddedHeadersMessageConverter {
|
||||
int headerCount = 0;
|
||||
int headersLength = 0;
|
||||
for (String header : headers) {
|
||||
Object value = original.get(header) == null ? null
|
||||
: original.get(header);
|
||||
Object value = original.get(header) == null ? null : original.get(header);
|
||||
if (value != null) {
|
||||
String json = this.objectMapper.toJson(value);
|
||||
headerValues[n] = json.getBytes("UTF-8");
|
||||
@@ -72,7 +71,7 @@ public class EmbeddedHeadersMessageConverter {
|
||||
}
|
||||
}
|
||||
// 0xff, n(1), [ [lenHdr(1), hdr, lenValue(4), value] ... ]
|
||||
byte[] newPayload = new byte[((byte[])original.getPayload()).length + headersLength + headerCount * 5 + 2];
|
||||
byte[] newPayload = new byte[((byte[]) original.getPayload()).length + headersLength + headerCount * 5 + 2];
|
||||
ByteBuffer byteBuffer = ByteBuffer.wrap(newPayload);
|
||||
byteBuffer.put((byte) 0xff); // signal new format
|
||||
byteBuffer.put((byte) headerCount);
|
||||
@@ -85,16 +84,18 @@ public class EmbeddedHeadersMessageConverter {
|
||||
}
|
||||
}
|
||||
|
||||
byteBuffer.put((byte[])original.getPayload());
|
||||
byteBuffer.put((byte[]) original.getPayload());
|
||||
return byteBuffer.array();
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a message where headers, that were originally embedded into the payload, have been promoted
|
||||
* back to actual headers. The new payload is now the original payload.
|
||||
* Return a message where headers, that were originally embedded into the payload,
|
||||
* have been promoted back to actual headers. The new payload is now the original
|
||||
* payload.
|
||||
*
|
||||
* @param message the message to extract headers
|
||||
* @param copyRequestHeaders boolean value to specify if original headers should be copied
|
||||
* @param copyRequestHeaders boolean value to specify if original headers should be
|
||||
* copied
|
||||
*/
|
||||
public MessageValues extractHeaders(Message<byte[]> message, boolean copyRequestHeaders) throws Exception {
|
||||
byte[] bytes = message.getPayload();
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder;
|
||||
|
||||
/**
|
||||
|
||||
@@ -129,33 +129,36 @@ public class PartitionHandler {
|
||||
return strategy.selectPartition(key, producerProperties.getPartitionCount());
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private <T> T getBean(String className, Class<T> type) {
|
||||
if (this.beanFactory.containsBean(className)) {
|
||||
return this.beanFactory.getBean(className, type);
|
||||
}
|
||||
else {
|
||||
synchronized (this) {
|
||||
T bean;
|
||||
if (this.beanFactory.containsBean(className)) {
|
||||
return this.beanFactory.getBean(className, type);
|
||||
bean = this.beanFactory.getBean(className, type);
|
||||
}
|
||||
Class<?> clazz;
|
||||
try {
|
||||
clazz = ClassUtils.forName(className, this.beanFactory.getBeanClassLoader());
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new BinderException("Failed to load class: " + className, e);
|
||||
}
|
||||
try {
|
||||
@SuppressWarnings("unchecked")
|
||||
T object = (T) clazz.newInstance();
|
||||
Assert.isInstanceOf(type, object);
|
||||
this.beanFactory.registerSingleton(className, object);
|
||||
this.beanFactory.initializeBean(object, className);
|
||||
return object;
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new BinderException("Failed to instantiate class: " + className, e);
|
||||
else {
|
||||
Class<?> clazz;
|
||||
try {
|
||||
clazz = ClassUtils.forName(className, this.beanFactory.getBeanClassLoader());
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new BinderException("Failed to load class: " + className, e);
|
||||
}
|
||||
try {
|
||||
bean = (T) clazz.newInstance();
|
||||
Assert.isInstanceOf(type, bean);
|
||||
this.beanFactory.registerSingleton(className, bean);
|
||||
this.beanFactory.initializeBean(bean, className);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new BinderException("Failed to instantiate class: " + className, e);
|
||||
}
|
||||
}
|
||||
return bean;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,10 +18,9 @@ package org.springframework.cloud.stream.binder;
|
||||
|
||||
import org.springframework.messaging.Message;
|
||||
|
||||
|
||||
/**
|
||||
* Strategy for extracting a partition key from a Message.
|
||||
*
|
||||
*
|
||||
* @author Gary Russell
|
||||
*/
|
||||
public interface PartitionKeyExtractorStrategy {
|
||||
|
||||
@@ -35,14 +35,14 @@ import org.springframework.expression.Expression;
|
||||
public class ProducerProperties {
|
||||
|
||||
@JsonSerialize(using = ExpressionSerializer.class)
|
||||
private Expression partitionKeyExpression = null;
|
||||
private Expression partitionKeyExpression;
|
||||
|
||||
private Class<?> partitionKeyExtractorClass = null;
|
||||
private Class<?> partitionKeyExtractorClass;
|
||||
|
||||
private Class<?> partitionSelectorClass = null;
|
||||
private Class<?> partitionSelectorClass;
|
||||
|
||||
@JsonSerialize(using = ExpressionSerializer.class)
|
||||
private Expression partitionSelectorExpression = null;
|
||||
private Expression partitionSelectorExpression;
|
||||
|
||||
private int partitionCount = 1;
|
||||
|
||||
|
||||
@@ -26,32 +26,32 @@ import org.springframework.util.MimeType;
|
||||
|
||||
/**
|
||||
* A {@link DefaultContentTypeResolver} that can parse String values.
|
||||
*
|
||||
*
|
||||
* @author David Turanski
|
||||
*/
|
||||
public class StringConvertingContentTypeResolver extends DefaultContentTypeResolver {
|
||||
|
||||
private ConcurrentMap<String,MimeType> mimeTypeCache = new ConcurrentHashMap<>();
|
||||
private ConcurrentMap<String, MimeType> mimeTypeCache = new ConcurrentHashMap<>();
|
||||
|
||||
@Override
|
||||
public MimeType resolve(MessageHeaders headers) {
|
||||
return resolve((Map<String, Object>) headers);
|
||||
}
|
||||
|
||||
public MimeType resolve(Map<String,Object> headers) {
|
||||
public MimeType resolve(Map<String, Object> headers) {
|
||||
MimeType mimeType = null;
|
||||
Object value = headers.get(MessageHeaders.CONTENT_TYPE);
|
||||
if (value instanceof MimeType) {
|
||||
return (MimeType) value;
|
||||
mimeType = (MimeType) value;
|
||||
}
|
||||
else if (value instanceof String) {
|
||||
MimeType mimeType = mimeTypeCache.get(value);
|
||||
mimeType = mimeTypeCache.get(value);
|
||||
if (mimeType == null) {
|
||||
String valueAsString = (String) value;
|
||||
mimeType = MimeType.valueOf(valueAsString);
|
||||
mimeTypeCache.put(valueAsString,mimeType);
|
||||
mimeTypeCache.put(valueAsString, mimeType);
|
||||
}
|
||||
return mimeType;
|
||||
}
|
||||
return getDefaultMimeType();
|
||||
return mimeType != null ? mimeType : getDefaultMimeType();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binding;
|
||||
|
||||
import java.util.Collections;
|
||||
|
||||
@@ -70,7 +70,7 @@ public class BindableProxyFactory implements MethodInterceptor, FactoryBean<Obje
|
||||
|
||||
private Class<?> type;
|
||||
|
||||
private Object proxy = null;
|
||||
private Object proxy;
|
||||
|
||||
private Map<String, ChannelHolder> inputHolders = new HashMap<>();
|
||||
|
||||
@@ -82,21 +82,22 @@ public class BindableProxyFactory implements MethodInterceptor, FactoryBean<Obje
|
||||
|
||||
@Override
|
||||
public synchronized Object invoke(MethodInvocation invocation) throws Throwable {
|
||||
MessageChannel messageChannel = null;
|
||||
Method method = invocation.getMethod();
|
||||
if (MessageChannel.class.isAssignableFrom(method.getReturnType())) {
|
||||
Input input = AnnotationUtils.findAnnotation(method, Input.class);
|
||||
if (input != null) {
|
||||
String name = BindingBeanDefinitionRegistryUtils.getChannelName(input, method);
|
||||
return this.inputHolders.get(name).getMessageChannel();
|
||||
messageChannel = this.inputHolders.get(name).getMessageChannel();
|
||||
}
|
||||
Output output = AnnotationUtils.findAnnotation(method, Output.class);
|
||||
if (output != null) {
|
||||
String name = BindingBeanDefinitionRegistryUtils.getChannelName(output, method);
|
||||
return this.outputHolders.get(name).getMessageChannel();
|
||||
messageChannel = this.outputHolders.get(name).getMessageChannel();
|
||||
}
|
||||
}
|
||||
//ignore
|
||||
return null;
|
||||
return messageChannel;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -251,13 +252,13 @@ public class BindableProxyFactory implements MethodInterceptor, FactoryBean<Obje
|
||||
* Holds information about the channels exposed by the interface proxy, as well as
|
||||
* their status.
|
||||
*/
|
||||
class ChannelHolder {
|
||||
private final class ChannelHolder {
|
||||
|
||||
private MessageChannel messageChannel;
|
||||
|
||||
private boolean bindable;
|
||||
|
||||
public ChannelHolder(MessageChannel messageChannel, boolean bindable) {
|
||||
private ChannelHolder(MessageChannel messageChannel, boolean bindable) {
|
||||
this.messageChannel = messageChannel;
|
||||
this.bindable = bindable;
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
@@ -13,6 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binding;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binding;
|
||||
|
||||
import org.springframework.integration.channel.DirectChannel;
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binding;
|
||||
|
||||
import java.util.Collections;
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
@@ -32,7 +32,7 @@ import org.springframework.context.SmartLifecycle;
|
||||
*/
|
||||
public class InputBindingLifecycle implements SmartLifecycle, ApplicationContextAware {
|
||||
|
||||
private volatile boolean running = false;
|
||||
private volatile boolean running;
|
||||
|
||||
private ConfigurableApplicationContext applicationContext;
|
||||
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binding;
|
||||
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binding;
|
||||
|
||||
|
||||
@@ -115,13 +116,13 @@ public class MessageConverterConfigurer implements MessageChannelConfigurer, Bea
|
||||
* Will wrap the returning result of the conversion into a {@link Message} if it is not a {@link Message}
|
||||
* instance already.
|
||||
*/
|
||||
class MessageWrappingMessageConverter implements SmartMessageConverter {
|
||||
private final class MessageWrappingMessageConverter implements SmartMessageConverter {
|
||||
|
||||
private final MimeType contentType;
|
||||
|
||||
private final SmartMessageConverter delegate;
|
||||
|
||||
public MessageWrappingMessageConverter(SmartMessageConverter delegate, MimeType contentType) {
|
||||
private MessageWrappingMessageConverter(SmartMessageConverter delegate, MimeType contentType) {
|
||||
Assert.notNull(delegate, "Delegate converter cannot be null");
|
||||
Assert.notNull(contentType, "Content type cannot be null");
|
||||
this.delegate = delegate;
|
||||
@@ -166,7 +167,7 @@ public class MessageConverterConfigurer implements MessageChannelConfigurer, Bea
|
||||
* @param headers the existing message headers
|
||||
* @return the converted message
|
||||
*/
|
||||
protected final Object build(Object payload, MessageHeaders headers) {
|
||||
protected Object build(Object payload, MessageHeaders headers) {
|
||||
MimeType messageContentType = MessageConverterUtils.X_JAVA_OBJECT.equals(contentType) ?
|
||||
MessageConverterUtils.javaObjectMimeType(payload.getClass()) : contentType;
|
||||
return messageBuilderFactory.withPayload(payload).copyHeaders(headers)
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
@@ -33,7 +33,7 @@ import org.springframework.context.SmartLifecycle;
|
||||
*/
|
||||
public class OutputBindingLifecycle implements SmartLifecycle, ApplicationContextAware {
|
||||
|
||||
private volatile boolean running = false;
|
||||
private volatile boolean running;
|
||||
|
||||
private ConfigurableApplicationContext applicationContext;
|
||||
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binding;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
||||
@@ -172,11 +172,11 @@ public class StreamListenerAnnotationBeanPostProcessor implements BeanPostProces
|
||||
return method;
|
||||
}
|
||||
|
||||
private class StreamListenerMessageHandler extends AbstractReplyProducingMessageHandler {
|
||||
private final class StreamListenerMessageHandler extends AbstractReplyProducingMessageHandler {
|
||||
|
||||
private final InvocableHandlerMethod invocableHandlerMethod;
|
||||
|
||||
public StreamListenerMessageHandler(InvocableHandlerMethod invocableHandlerMethod) {
|
||||
private StreamListenerMessageHandler(InvocableHandlerMethod invocableHandlerMethod) {
|
||||
this.invocableHandlerMethod = invocableHandlerMethod;
|
||||
}
|
||||
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.config;
|
||||
|
||||
import java.util.Properties;
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
@@ -31,7 +31,7 @@ import org.springframework.validation.annotation.Validated;
|
||||
* @author Ilayaperumal Gopinathan
|
||||
* @author Gary Russell
|
||||
*/
|
||||
@JsonInclude(value = Include.NON_DEFAULT)
|
||||
@JsonInclude(Include.NON_DEFAULT)
|
||||
@Validated
|
||||
public class BindingProperties {
|
||||
|
||||
@@ -56,9 +56,9 @@ public class BindingProperties {
|
||||
|
||||
private String binder;
|
||||
|
||||
private ConsumerProperties consumer = null;
|
||||
private ConsumerProperties consumer;
|
||||
|
||||
private ProducerProperties producer = null;
|
||||
private ProducerProperties producer;
|
||||
|
||||
public String getDestination() {
|
||||
return this.destination;
|
||||
|
||||
@@ -119,8 +119,8 @@ public class ChannelBindingServiceConfiguration {
|
||||
}
|
||||
|
||||
@Bean
|
||||
public CompositeMessageChannelConfigurer compositeMessageChannelConfigurer
|
||||
(MessageConverterConfigurer messageConverterConfigurer) {
|
||||
public CompositeMessageChannelConfigurer compositeMessageChannelConfigurer(
|
||||
MessageConverterConfigurer messageConverterConfigurer) {
|
||||
List<MessageChannelConfigurer> configurerList = new ArrayList<>();
|
||||
configurerList.add(messageConverterConfigurer);
|
||||
return new CompositeMessageChannelConfigurer(configurerList);
|
||||
@@ -171,6 +171,21 @@ public class ChannelBindingServiceConfiguration {
|
||||
return new CompositeMessageConverterFactory(messageConverters, objectMapper);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public static MessageHandlerMethodFactory messageHandlerMethodFactory(CompositeMessageConverterFactory compositeMessageConverterFactory) {
|
||||
DefaultMessageHandlerMethodFactory messageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory();
|
||||
messageHandlerMethodFactory.setMessageConverter(compositeMessageConverterFactory.getMessageConverterForAllRegistered());
|
||||
return messageHandlerMethodFactory;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public static StreamListenerAnnotationBeanPostProcessor bindToAnnotationBeanPostProcessor(
|
||||
@Lazy BinderAwareChannelResolver binderAwareChannelResolver,
|
||||
@Lazy MessageHandlerMethodFactory messageHandlerMethodFactory) {
|
||||
return new StreamListenerAnnotationBeanPostProcessor(binderAwareChannelResolver,
|
||||
messageHandlerMethodFactory);
|
||||
}
|
||||
|
||||
// IMPORTANT: Nested class to avoid instantiating all of the above early
|
||||
@Configuration
|
||||
protected static class PostProcessorConfiguration {
|
||||
@@ -232,18 +247,4 @@ public class ChannelBindingServiceConfiguration {
|
||||
}
|
||||
}
|
||||
|
||||
@Bean
|
||||
public static MessageHandlerMethodFactory messageHandlerMethodFactory(CompositeMessageConverterFactory compositeMessageConverterFactory) {
|
||||
DefaultMessageHandlerMethodFactory messageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory();
|
||||
messageHandlerMethodFactory.setMessageConverter(compositeMessageConverterFactory.getMessageConverterForAllRegistered());
|
||||
return messageHandlerMethodFactory;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public static StreamListenerAnnotationBeanPostProcessor bindToAnnotationBeanPostProcessor(
|
||||
@Lazy BinderAwareChannelResolver binderAwareChannelResolver,
|
||||
@Lazy MessageHandlerMethodFactory messageHandlerMethodFactory) {
|
||||
return new StreamListenerAnnotationBeanPostProcessor(binderAwareChannelResolver,
|
||||
messageHandlerMethodFactory);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -51,7 +51,7 @@ public class ChannelBindingServiceProperties implements ApplicationContextAware,
|
||||
private ConversionService conversionService;
|
||||
|
||||
@Value("${INSTANCE_INDEX:${CF_INSTANCE_INDEX:0}}")
|
||||
private int instanceIndex = 0;
|
||||
private int instanceIndex;
|
||||
|
||||
private int instanceCount = 1;
|
||||
|
||||
|
||||
@@ -40,7 +40,8 @@ import org.springframework.integration.context.IntegrationContextUtils;
|
||||
public class SpelExpressionConverterConfiguration {
|
||||
|
||||
@Bean
|
||||
@ConfigurationPropertiesBinding @IntegrationConverter
|
||||
@ConfigurationPropertiesBinding
|
||||
@IntegrationConverter
|
||||
public Converter<String, Expression> spelConverter() {
|
||||
return new SpelConverter();
|
||||
}
|
||||
@@ -69,7 +70,8 @@ public class SpelExpressionConverterConfiguration {
|
||||
return expression;
|
||||
}
|
||||
catch (ParseException e) {
|
||||
throw new IllegalArgumentException(String.format("Could not convert '%s' into a SpEL expression", source), e);
|
||||
throw new IllegalArgumentException(
|
||||
String.format("Could not convert '%s' into a SpEL expression", source), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,7 +18,7 @@ package org.springframework.cloud.stream.converter;
|
||||
|
||||
/**
|
||||
* Exception thrown when an error is encountered during message conversion.
|
||||
*
|
||||
*
|
||||
* @author David Turanski
|
||||
*/
|
||||
@SuppressWarnings("serial")
|
||||
|
||||
@@ -23,8 +23,8 @@ import org.springframework.util.MimeType;
|
||||
/**
|
||||
* A custom converter for {@link MediaType} that accepts a plain java class name as a shorthand for
|
||||
* {@code application/x-java-object;type=the.qualified.ClassName}.
|
||||
*
|
||||
*
|
||||
*
|
||||
*
|
||||
* @author Eric Bottard
|
||||
* @author David Turanski
|
||||
*/
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.converter;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
|
||||
@@ -30,7 +30,7 @@ import org.springframework.util.StringUtils;
|
||||
* @author Gary Russell
|
||||
* @author Ilayaperumal Gopinathan
|
||||
*/
|
||||
public class MessageConverterUtils {
|
||||
public abstract class MessageConverterUtils {
|
||||
|
||||
/**
|
||||
* An MimeType specifying a {@link Tuple}.
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.converter;
|
||||
|
||||
import org.springframework.messaging.Message;
|
||||
@@ -49,7 +50,7 @@ public class PojoToStringMessageConverter extends AbstractFromMessageConverter {
|
||||
String payloadString = null;
|
||||
if (message.getPayload() instanceof Tuple) {
|
||||
TupleBuilder builder = TupleBuilder.tuple();
|
||||
builder.putAll((Tuple)message.getPayload());
|
||||
builder.putAll((Tuple) message.getPayload());
|
||||
payloadString = builder.build().toString();
|
||||
}
|
||||
else {
|
||||
|
||||
@@ -66,7 +66,7 @@ public class ChannelsEndpoint extends AbstractEndpoint<Map<String, Object>> {
|
||||
});
|
||||
}
|
||||
|
||||
@JsonInclude(value = Include.NON_DEFAULT)
|
||||
@JsonInclude(Include.NON_DEFAULT)
|
||||
public static class ChannelsMetaData {
|
||||
|
||||
private Map<String, BindingProperties> inputs = new LinkedHashMap<>();
|
||||
|
||||
@@ -27,9 +27,9 @@ import org.springframework.messaging.MessageChannel;
|
||||
* @author Marius Bogoevici
|
||||
*/
|
||||
public interface Source {
|
||||
|
||||
|
||||
String OUTPUT = "output";
|
||||
|
||||
|
||||
@Output(Source.OUTPUT)
|
||||
MessageChannel output();
|
||||
|
||||
|
||||
@@ -241,7 +241,7 @@ public class BinderAwareChannelResolverTests {
|
||||
return binding;
|
||||
}
|
||||
|
||||
private class TestBinding implements Binding<MessageChannel> {
|
||||
private final class TestBinding implements Binding<MessageChannel> {
|
||||
|
||||
private final String name;
|
||||
|
||||
|
||||
@@ -190,7 +190,7 @@ public class ExtendedPropertiesBinderAwareChannelResolverTests extends BinderAwa
|
||||
return new ExtendedProducerProperties(new ProducerProperties());
|
||||
}
|
||||
|
||||
private class TestBinding implements Binding<MessageChannel> {
|
||||
private final class TestBinding implements Binding<MessageChannel> {
|
||||
|
||||
private final String name;
|
||||
|
||||
|
||||
@@ -70,7 +70,7 @@ public class InputOutputBindingOrderTest {
|
||||
|
||||
public static class SomeLifecycle implements SmartLifecycle {
|
||||
|
||||
private boolean running = false;
|
||||
private boolean running;
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
@Autowired
|
||||
|
||||
@@ -58,7 +58,7 @@ public class LifecycleBinderTests {
|
||||
|
||||
public static class SimpleLifecycle implements Lifecycle {
|
||||
|
||||
private boolean running = false;
|
||||
private boolean running;
|
||||
|
||||
@Override
|
||||
public synchronized void start() {
|
||||
|
||||
@@ -38,17 +38,15 @@ public class MessageConverterTests {
|
||||
@Test
|
||||
public void testHeaderEmbedding() throws Exception {
|
||||
EmbeddedHeadersMessageConverter converter = new EmbeddedHeadersMessageConverter();
|
||||
Message<byte[]> message = MessageBuilder.withPayload("Hello".getBytes())
|
||||
.setHeader("foo", "bar")
|
||||
.setHeader("baz", "quxx")
|
||||
.build();
|
||||
Message<byte[]> message = MessageBuilder.withPayload("Hello".getBytes()).setHeader("foo", "bar")
|
||||
.setHeader("baz", "quxx").build();
|
||||
byte[] embedded = converter.embedHeaders(new MessageValues(message), "foo", "baz");
|
||||
assertEquals(0xff, embedded[0] & 0xff);
|
||||
assertEquals("\u0002\u0003foo\u0000\u0000\u0000\u0005\"bar\"\u0003baz\u0000\u0000\u0000\u0006\"quxx\"Hello",
|
||||
new String(embedded).substring(1));
|
||||
|
||||
MessageValues extracted = converter.extractHeaders(MessageBuilder.withPayload(embedded).build(), false);
|
||||
assertEquals("Hello", new String((byte[])extracted.getPayload()));
|
||||
assertEquals("Hello", new String((byte[]) extracted.getPayload()));
|
||||
assertEquals("bar", extracted.get("foo"));
|
||||
assertEquals("quxx", extracted.get("baz"));
|
||||
}
|
||||
@@ -56,17 +54,15 @@ public class MessageConverterTests {
|
||||
@Test
|
||||
public void testUnicodeHeader() throws Exception {
|
||||
EmbeddedHeadersMessageConverter converter = new EmbeddedHeadersMessageConverter();
|
||||
Message<byte[]> message = MessageBuilder.withPayload("Hello".getBytes())
|
||||
.setHeader("foo", "bar")
|
||||
.setHeader("baz", "ØØØØØØØØ")
|
||||
.build();
|
||||
Message<byte[]> message = MessageBuilder.withPayload("Hello".getBytes()).setHeader("foo", "bar")
|
||||
.setHeader("baz", "ØØØØØØØØ").build();
|
||||
byte[] embedded = converter.embedHeaders(new MessageValues(message), "foo", "baz");
|
||||
assertEquals(0xff, embedded[0] & 0xff);
|
||||
assertEquals("\u0002\u0003foo\u0000\u0000\u0000\u0005\"bar\"\u0003baz\u0000\u0000\u0000\u0012\"ØØØØØØØØ\"Hello",
|
||||
new String(embedded, "UTF-8").substring(1));
|
||||
|
||||
MessageValues extracted = converter.extractHeaders(MessageBuilder.withPayload(embedded).build(), false);
|
||||
assertEquals("Hello", new String((byte[])extracted.getPayload()));
|
||||
assertEquals("Hello", new String((byte[]) extracted.getPayload()));
|
||||
assertEquals("bar", extracted.get("foo"));
|
||||
assertEquals("ØØØØØØØØ", extracted.get("baz"));
|
||||
}
|
||||
@@ -74,13 +70,10 @@ public class MessageConverterTests {
|
||||
@Test
|
||||
public void testHeaderEmbeddingMissingHeader() throws Exception {
|
||||
EmbeddedHeadersMessageConverter converter = new EmbeddedHeadersMessageConverter();
|
||||
Message<byte[]> message = MessageBuilder.withPayload("Hello".getBytes())
|
||||
.setHeader("foo", "bar")
|
||||
.build();
|
||||
Message<byte[]> message = MessageBuilder.withPayload("Hello".getBytes()).setHeader("foo", "bar").build();
|
||||
byte[] embedded = converter.embedHeaders(new MessageValues(message), "foo", "baz");
|
||||
assertEquals(0xff, embedded[0] & 0xff);
|
||||
assertEquals("\u0001\u0003foo\u0000\u0000\u0000\u0005\"bar\"Hello",
|
||||
new String(embedded).substring(1));
|
||||
assertEquals("\u0001\u0003foo\u0000\u0000\u0000\u0005\"bar\"Hello", new String(embedded).substring(1));
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -88,8 +81,8 @@ public class MessageConverterTests {
|
||||
EmbeddedHeadersMessageConverter converter = new EmbeddedHeadersMessageConverter();
|
||||
byte[] bytes = "\u0002\u0003foo\u0003bar\u0003baz\u0004quxxHello".getBytes("UTF-8");
|
||||
Message<byte[]> message = new GenericMessage<byte[]>(bytes);
|
||||
MessageValues extracted = converter.extractHeaders(message,false);
|
||||
assertEquals("Hello", new String((byte[])extracted.getPayload()));
|
||||
MessageValues extracted = converter.extractHeaders(message, false);
|
||||
assertEquals("Hello", new String((byte[]) extracted.getPayload()));
|
||||
assertEquals("bar", extracted.get("foo"));
|
||||
assertEquals("quxx", extracted.get("baz"));
|
||||
}
|
||||
@@ -100,7 +93,7 @@ public class MessageConverterTests {
|
||||
byte[] bytes = "\u0002\u0003foo\u0020bar\u0003baz\u0004quxxHello".getBytes("UTF-8");
|
||||
Message<byte[]> message = new GenericMessage<byte[]>(bytes);
|
||||
try {
|
||||
converter.extractHeaders(message,false);
|
||||
converter.extractHeaders(message, false);
|
||||
Assert.fail("Exception expected");
|
||||
}
|
||||
catch (Exception e) {
|
||||
|
||||
@@ -45,7 +45,8 @@ public class ProcessorBindingTestsWithDefaults {
|
||||
@Autowired
|
||||
private Binder binder;
|
||||
|
||||
@Autowired @Bindings(TestProcessor.class)
|
||||
@Autowired
|
||||
@Bindings(TestProcessor.class)
|
||||
private Processor processor;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
|
||||
@@ -46,7 +46,8 @@ public class SinkBindingTestsWithDefaults {
|
||||
@Autowired
|
||||
private Binder binder;
|
||||
|
||||
@Autowired @Bindings(TestSink.class)
|
||||
@Autowired
|
||||
@Bindings(TestSink.class)
|
||||
private Sink testSink;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
|
||||
@@ -50,7 +50,8 @@ public class SourceBindingTestsWithBindingTargets {
|
||||
@Autowired
|
||||
private Binder binder;
|
||||
|
||||
@Autowired @Bindings(TestSource.class)
|
||||
@Autowired
|
||||
@Bindings(TestSource.class)
|
||||
private Source testSource;
|
||||
|
||||
@Autowired
|
||||
|
||||
@@ -45,7 +45,8 @@ public class SourceBindingTestsWithDefaults {
|
||||
@Autowired
|
||||
private Binder binder;
|
||||
|
||||
@Autowired @Bindings(TestSource.class)
|
||||
@Autowired
|
||||
@Bindings(TestSource.class)
|
||||
private Source testSource;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
|
||||
@@ -74,7 +74,8 @@ public class BoundChannelsInterceptedTest {
|
||||
public void fooSink(Message<?> message) {
|
||||
}
|
||||
|
||||
@GlobalChannelInterceptor @Bean
|
||||
@Bean
|
||||
@GlobalChannelInterceptor
|
||||
public ChannelInterceptor globalChannelInterceptor() {
|
||||
return mock(ChannelInterceptor.class);
|
||||
}
|
||||
|
||||
@@ -46,14 +46,14 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||
*/
|
||||
@RunWith(SpringJUnit4ClassRunner.class)
|
||||
@SpringApplicationConfiguration(PartitionedConsumerTest.TestSink.class)
|
||||
|
||||
public class PartitionedConsumerTest {
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
@Autowired
|
||||
private Binder binder;
|
||||
|
||||
@Autowired @Bindings(TestSink.class)
|
||||
@Autowired
|
||||
@Bindings(TestSink.class)
|
||||
private Sink testSink;
|
||||
|
||||
@Test
|
||||
@@ -66,7 +66,6 @@ public class PartitionedConsumerTest {
|
||||
verifyNoMoreInteractions(binder);
|
||||
}
|
||||
|
||||
|
||||
@EnableBinding(Sink.class)
|
||||
@EnableAutoConfiguration
|
||||
@Import(MockBinderRegistryConfiguration.class)
|
||||
|
||||
@@ -44,14 +44,14 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||
*/
|
||||
@RunWith(SpringJUnit4ClassRunner.class)
|
||||
@SpringApplicationConfiguration(PartitionedProducerTest.TestSource.class)
|
||||
|
||||
public class PartitionedProducerTest {
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
@Autowired
|
||||
private Binder binder;
|
||||
|
||||
@Autowired @Bindings(TestSource.class)
|
||||
@Autowired
|
||||
@Bindings(TestSource.class)
|
||||
private Source testSource;
|
||||
|
||||
@Test
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.utils;
|
||||
|
||||
import org.mockito.Mockito;
|
||||
|
||||
@@ -37,11 +37,11 @@ public class MockBinderRegistryConfiguration {
|
||||
@Bean
|
||||
public BinderTypeRegistry binderTypeRegistry() {
|
||||
return new DefaultBinderTypeRegistry(
|
||||
Collections.singletonMap("mock", new BinderType("", new Class[]{MockBinderConfiguration.class})));
|
||||
Collections.singletonMap("mock", new BinderType("", new Class[] { MockBinderConfiguration.class })));
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Binder<?,?,?> defaultBinder(BinderFactory<MessageChannel> binderFactory) {
|
||||
public Binder<?, ?, ?> defaultBinder(BinderFactory<MessageChannel> binderFactory) {
|
||||
return binderFactory.getBinder(null);
|
||||
}
|
||||
}
|
||||
|
||||
17
src/checkstyle/checkstyle-header.txt
Normal file
17
src/checkstyle/checkstyle-header.txt
Normal file
@@ -0,0 +1,17 @@
|
||||
^\Q/*\E$
|
||||
^\Q * Copyright \E(20\d\d\-)?20\d\d\Q the original author or authors.\E$
|
||||
^\Q *\E$
|
||||
^\Q * Licensed under the Apache License, Version 2.0 (the "License");\E$
|
||||
^\Q * you may not use this file except in compliance with the License.\E$
|
||||
^\Q * You may obtain a copy of the License at\E$
|
||||
^\Q *\E$
|
||||
^\Q * http://www.apache.org/licenses/LICENSE-2.0\E$
|
||||
^\Q *\E$
|
||||
^\Q * Unless required by applicable law or agreed to in writing, software\E$
|
||||
^\Q * distributed under the License is distributed on an "AS IS" BASIS,\E$
|
||||
^\Q * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\E$
|
||||
^\Q * See the License for the specific language governing permissions and\E$
|
||||
^\Q * limitations under the License.\E$
|
||||
^\Q */\E$
|
||||
^$
|
||||
^.*$
|
||||
@@ -3,21 +3,85 @@
|
||||
"-//Puppy Crawl//DTD Check Configuration 1.3//EN"
|
||||
"http://www.puppycrawl.com/dtds/configuration_1_3.dtd">
|
||||
<module name="Checker">
|
||||
<!-- Root Checks -->
|
||||
<module name="RegexpHeader">
|
||||
<property name="headerFile" value="src/checkstyle/checkstyle-header.txt" />
|
||||
<property name="fileExtensions" value="java" />
|
||||
</module>
|
||||
<module name="NewlineAtEndOfFile">
|
||||
<property name="lineSeparator" value="lf"/>
|
||||
</module>
|
||||
|
||||
<module name="TreeWalker">
|
||||
<!-- this. in front of fields -->
|
||||
<module name="RequireThis">
|
||||
<property name="checkMethods" value="false"/>
|
||||
|
||||
<!-- Annotations -->
|
||||
<module name="AnnotationUseStyle">
|
||||
<property name="elementStyle" value="compact" />
|
||||
</module>
|
||||
<module name="MissingOverride" />
|
||||
<module name="PackageAnnotation" />
|
||||
<module name="AnnotationLocation">
|
||||
<property name="allowSamelineSingleParameterlessAnnotation"
|
||||
value="false" />
|
||||
</module>
|
||||
|
||||
<!-- Block Checks -->
|
||||
<module name="EmptyBlock">
|
||||
<property name="option" value="text" />
|
||||
</module>
|
||||
<module name="LeftCurly" />
|
||||
<module name="RightCurly">
|
||||
<property name="option" value="alone" />
|
||||
</module>
|
||||
<module name="NeedBraces" />
|
||||
<module name="AvoidNestedBlocks" />
|
||||
|
||||
<!-- tabs instead of spaces -->
|
||||
<module name="RegexpSinglelineJava">
|
||||
<property name="format" value="^\t* "/>
|
||||
<property name="message" value="Indent must use tab characters"/>
|
||||
<property name="ignoreComments" value="true"/>
|
||||
</module>
|
||||
|
||||
<!-- Class Design -->
|
||||
<module name="FinalClass" />
|
||||
<module name="InterfaceIsType" />
|
||||
<module name="HideUtilityClassConstructor" />
|
||||
<module name="MutableException" />
|
||||
<module name="InnerTypeLast" />
|
||||
<module name="OneTopLevelClass" />
|
||||
|
||||
<!-- Coding -->
|
||||
<module name="CovariantEquals" />
|
||||
<module name="EmptyStatement" />
|
||||
<module name="EqualsHashCode" />
|
||||
<module name="InnerAssignment" />
|
||||
<module name="SimplifyBooleanExpression" />
|
||||
<module name="SimplifyBooleanReturn" />
|
||||
<module name="StringLiteralEquality" />
|
||||
<module name="NestedForDepth">
|
||||
<property name="max" value="3" />
|
||||
</module>
|
||||
<module name="NestedIfDepth">
|
||||
<property name="max" value="3" />
|
||||
</module>
|
||||
<module name="NestedTryDepth">
|
||||
<property name="max" value="3" />
|
||||
</module>
|
||||
<module name="MultipleVariableDeclarations" />
|
||||
<module name="RequireThis">
|
||||
<property name="checkMethods" value="false" />
|
||||
</module>
|
||||
<module name="OneStatementPerLine" />
|
||||
<module name="ExplicitInitialization"/>
|
||||
<module name="ParameterAssignment"/>
|
||||
|
||||
<!-- Imports -->
|
||||
<module name="AvoidStarImport"/>
|
||||
<module name="AvoidStaticImport">
|
||||
<property name="excludes" value="org.junit.Assert.*,org.mockito.Mockito.*,org.mockito.Matchers.*,org.hamcrest.Matchers.*"/>
|
||||
</module>
|
||||
<module name="FallThrough"/>
|
||||
<module name="ImportOrder">
|
||||
<property name="groups" value="java,/^javax?\./,*,org.springframework" />
|
||||
<property name="ordered" value="true" />
|
||||
@@ -25,11 +89,56 @@
|
||||
<property name="option" value="bottom" />
|
||||
<property name="sortStaticImportsAlphabetically" value="true" />
|
||||
</module>
|
||||
<module name="UnusedImports"/>
|
||||
<module name="RedundantImport"/>
|
||||
<module name="IllegalImport">
|
||||
<property name="illegalPkgs" value="org.slf4j"/>
|
||||
</module>
|
||||
<module name="NeedBraces"/>
|
||||
<module name="RedundantImport"/>
|
||||
<module name="ReturnCount">
|
||||
<property name="max" value="0"/>
|
||||
<property name="tokens" value="CTOR_DEF"/>
|
||||
</module>
|
||||
<module name="ReturnCount">
|
||||
<property name="max" value="1"/>
|
||||
<property name="tokens" value="LAMBDA"/>
|
||||
</module>
|
||||
<module name="ReturnCount">
|
||||
<property name="max" value="3"/>
|
||||
<property name="tokens" value="METHOD_DEF"/>
|
||||
</module>
|
||||
<module name="UnusedImports"/>
|
||||
|
||||
<!-- Miscellaneous -->
|
||||
<module name="CommentsIndentation" />
|
||||
<module name="UpperEll" />
|
||||
<module name="ArrayTypeStyle" />
|
||||
<module name="OuterTypeFilename" />
|
||||
|
||||
<!-- Modifiers -->
|
||||
<module name="RedundantModifier" />
|
||||
|
||||
<!-- Regexp -->
|
||||
<module name="RegexpSinglelineJava">
|
||||
<property name="format" value="^\t* +\t*\S" />
|
||||
<property name="message"
|
||||
value="Line has leading space characters; indentation should be performed with tabs only." />
|
||||
<property name="ignoreComments" value="true" />
|
||||
</module>
|
||||
<module name="Regexp">
|
||||
<property name="format" value="[ \t]+$" />
|
||||
<property name="illegalPattern" value="true" />
|
||||
<property name="message" value="Trailing whitespace" />
|
||||
</module>
|
||||
|
||||
<!-- Whitespace -->
|
||||
<module name="GenericWhitespace" />
|
||||
<module name="MethodParamPad" />
|
||||
<module name="NoWhitespaceAfter" >
|
||||
<property name="tokens" value="BNOT, DEC, DOT, INC, LNOT, UNARY_MINUS, UNARY_PLUS, ARRAY_DECLARATOR"/>
|
||||
</module>
|
||||
<module name="NoWhitespaceBefore" />
|
||||
<module name="ParenPad" />
|
||||
<module name="TypecastParenPad" />
|
||||
<module name="WhitespaceAfter" />
|
||||
<module name="WhitespaceAround" />
|
||||
</module>
|
||||
</module>
|
||||
|
||||
Reference in New Issue
Block a user