Fixes #913: Support for ErrorMessageSendingRecoverer
Polishing - PR Comments; Move some classes to top-level; fix FinalRethrowingErrorMessageHandler so it's always the last subscriber. Polishing - fix save handler Polishing Change type of `consumerProperties` to C. Polishing Polishing - PR Comments - Add Tests
This commit is contained in:
committed by
Vinicius Carvalho
parent
31ce03a0d7
commit
3c2c7da508
@@ -55,6 +55,11 @@
|
||||
<artifactId>spring-boot-configuration-processor</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.integration</groupId>
|
||||
<artifactId>spring-integration-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-test</artifactId>
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2016 the original author or authors.
|
||||
* Copyright 2016-2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -16,8 +16,11 @@
|
||||
|
||||
package org.springframework.cloud.stream.binder;
|
||||
|
||||
|
||||
import org.springframework.beans.factory.DisposableBean;
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
|
||||
import org.springframework.beans.factory.support.DefaultSingletonBeanRegistry;
|
||||
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
|
||||
import org.springframework.cloud.stream.provisioning.ProducerDestination;
|
||||
import org.springframework.cloud.stream.provisioning.ProvisioningException;
|
||||
@@ -26,10 +29,14 @@ import org.springframework.context.Lifecycle;
|
||||
import org.springframework.expression.ExpressionParser;
|
||||
import org.springframework.expression.spel.standard.SpelExpressionParser;
|
||||
import org.springframework.integration.channel.FixedSubscriberChannel;
|
||||
import org.springframework.integration.context.IntegrationContextUtils;
|
||||
import org.springframework.integration.core.MessageProducer;
|
||||
import org.springframework.integration.endpoint.EventDrivenConsumer;
|
||||
import org.springframework.integration.handler.AbstractMessageHandler;
|
||||
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
|
||||
import org.springframework.integration.handler.BridgeHandler;
|
||||
import org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer;
|
||||
import org.springframework.integration.support.ErrorMessageStrategy;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.messaging.MessageHandler;
|
||||
@@ -240,8 +247,10 @@ public abstract class AbstractMessageChannelBinder<C extends ConsumerProperties,
|
||||
AbstractMessageChannelBinder.this.logger
|
||||
.error("Exception thrown while unbinding " + this.toString(), e);
|
||||
}
|
||||
AbstractMessageChannelBinder.this.afterUnbindConsumer(destination, this.group, properties);
|
||||
afterUnbindConsumer(destination, this.group, properties);
|
||||
destroyErrorInfrastructure(destination, group, properties);
|
||||
}
|
||||
|
||||
};
|
||||
}
|
||||
catch (Exception e) {
|
||||
@@ -283,6 +292,163 @@ public abstract class AbstractMessageChannelBinder<C extends ConsumerProperties,
|
||||
protected void afterUnbindConsumer(ConsumerDestination destination, String group, C consumerProperties) {
|
||||
}
|
||||
|
||||
/**
|
||||
* Build an errorChannelRecoverer that writes to a pub/sub channel for the destination.
|
||||
* @param destination the destination.
|
||||
* @param group the group.
|
||||
* @param consumerProperties the properties.
|
||||
* @return the ErrorInfrastructure which is a holder for the error channel, the recoverer and the
|
||||
* message handler that is subscribed to the channel.
|
||||
*/
|
||||
protected final ErrorInfrastructure registerErrorInfrastructure(ConsumerDestination destination, String group,
|
||||
C consumerProperties) {
|
||||
ErrorMessageStrategy errorMessageStrategy = getErrorMessageStrategy();
|
||||
ConfigurableListableBeanFactory beanFactory = getApplicationContext().getBeanFactory();
|
||||
String errorChannelName = errorsBaseName(destination, group, consumerProperties);
|
||||
SubscribableChannel errorChannel = null;
|
||||
if (getApplicationContext().containsBean(errorChannelName)) {
|
||||
Object errorChannelObject = getApplicationContext().getBean(errorChannelName);
|
||||
if (!(errorChannelObject instanceof SubscribableChannel)) {
|
||||
throw new IllegalStateException(
|
||||
"Error channel '" + errorChannelName + "' must be a SubscribableChannel");
|
||||
}
|
||||
errorChannel = (SubscribableChannel) errorChannelObject;
|
||||
}
|
||||
else {
|
||||
errorChannel = new BinderErrorChannel();
|
||||
beanFactory.registerSingleton(errorChannelName, errorChannel);
|
||||
errorChannel = (LastSubscriberAwareChannel) beanFactory.initializeBean(errorChannel, errorChannelName);
|
||||
}
|
||||
ErrorMessageSendingRecoverer recoverer;
|
||||
if (errorMessageStrategy == null) {
|
||||
recoverer = new ErrorMessageSendingRecoverer(errorChannel);
|
||||
}
|
||||
else {
|
||||
recoverer = new ErrorMessageSendingRecoverer(errorChannel, errorMessageStrategy);
|
||||
}
|
||||
String recovererBeanName = getErrorRecovererName(destination, group, consumerProperties);
|
||||
beanFactory.registerSingleton(recovererBeanName, recoverer);
|
||||
beanFactory.initializeBean(recoverer, recovererBeanName);
|
||||
MessageHandler handler = getErrorMessageHandler(destination, group, consumerProperties);
|
||||
MessageChannel defaultErrorChannel = null;
|
||||
if (getApplicationContext().containsBean(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)) {
|
||||
defaultErrorChannel = getApplicationContext().getBean(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME,
|
||||
MessageChannel.class);
|
||||
}
|
||||
if (handler == null && errorChannel instanceof LastSubscriberAwareChannel) {
|
||||
handler = getDefaultErrorMessageHandler((LastSubscriberAwareChannel) errorChannel, defaultErrorChannel != null);
|
||||
}
|
||||
String errorMessageHandlerName = getErrorMessageHandlerName(destination, group, consumerProperties);
|
||||
if (handler != null) {
|
||||
beanFactory.registerSingleton(errorMessageHandlerName, handler);
|
||||
beanFactory.initializeBean(handler, errorMessageHandlerName);
|
||||
errorChannel.subscribe(handler);
|
||||
}
|
||||
if (defaultErrorChannel != null) {
|
||||
BridgeHandler errorBridge = new BridgeHandler();
|
||||
errorBridge.setOutputChannel(defaultErrorChannel);
|
||||
errorChannel.subscribe(errorBridge);
|
||||
String errorBridgeHandlerName = getErrorBridgeName(destination, group, consumerProperties);
|
||||
beanFactory.registerSingleton(errorBridgeHandlerName, errorBridge);
|
||||
beanFactory.initializeBean(errorBridge, errorBridgeHandlerName);
|
||||
}
|
||||
return new ErrorInfrastructure(errorChannel, recoverer, handler);
|
||||
}
|
||||
|
||||
private void destroyErrorInfrastructure(ConsumerDestination destination, String group, C properties) {
|
||||
try {
|
||||
String recoverer = getErrorRecovererName(destination, group, properties);
|
||||
if (getApplicationContext().containsBean(recoverer)) {
|
||||
((DefaultSingletonBeanRegistry) getApplicationContext().getBeanFactory()).destroySingleton(recoverer);
|
||||
}
|
||||
String errorChannelName = errorsBaseName(destination, group, properties);
|
||||
String errorMessageHandlerName = getErrorMessageHandlerName(destination, group, properties);
|
||||
String errorBridgeHandlerName = getErrorBridgeName(destination, group, properties);
|
||||
MessageHandler bridgeHandler = null;
|
||||
if (getApplicationContext().containsBean(errorBridgeHandlerName)) {
|
||||
bridgeHandler = getApplicationContext().getBean(errorBridgeHandlerName, MessageHandler.class);
|
||||
}
|
||||
MessageHandler handler = null;
|
||||
if (getApplicationContext().containsBean(errorMessageHandlerName)) {
|
||||
handler = getApplicationContext().getBean(errorMessageHandlerName, MessageHandler.class);
|
||||
}
|
||||
if (getApplicationContext().containsBean(errorChannelName)) {
|
||||
SubscribableChannel channel = getApplicationContext().getBean(errorChannelName, SubscribableChannel.class);
|
||||
if (bridgeHandler != null) {
|
||||
channel.unsubscribe(bridgeHandler);
|
||||
((DefaultSingletonBeanRegistry) getApplicationContext().getBeanFactory())
|
||||
.destroySingleton(errorBridgeHandlerName);
|
||||
}
|
||||
if (handler != null) {
|
||||
channel.unsubscribe(handler);
|
||||
((DefaultSingletonBeanRegistry) getApplicationContext().getBeanFactory())
|
||||
.destroySingleton(errorMessageHandlerName);
|
||||
}
|
||||
((DefaultSingletonBeanRegistry) getApplicationContext().getBeanFactory())
|
||||
.destroySingleton(errorChannelName);
|
||||
}
|
||||
}
|
||||
catch (IllegalStateException e) {
|
||||
// context is shutting down.
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Binders can return a message handler to be subscribed to the error channel.
|
||||
* Examples might be if the user wishes to (re)publish messages to a DLQ.
|
||||
* @param destination the destination.
|
||||
* @param group the group.
|
||||
* @param consumerProperties the properties.
|
||||
* @return the handler (may be null, which is the default, causing the exception to be
|
||||
* rethrown).
|
||||
*/
|
||||
protected MessageHandler getErrorMessageHandler(final ConsumerDestination destination, final String group,
|
||||
final C consumerProperties) {
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the default error message handler, which throws the error message payload to
|
||||
* the caller if there are no user handlers subscribed. The handler is ordered so it
|
||||
* runs after any user-defined handlers that are subscribed.
|
||||
* @param errorChannel the error channel.
|
||||
* @param defaultErrorChannelPresent true if the context has a default 'errorChannel'.
|
||||
* @return the handler.
|
||||
*/
|
||||
protected MessageHandler getDefaultErrorMessageHandler(LastSubscriberAwareChannel errorChannel,
|
||||
boolean defaultErrorChannelPresent) {
|
||||
return new FinalRethrowingErrorMessageHandler(errorChannel, defaultErrorChannelPresent);
|
||||
}
|
||||
|
||||
/**
|
||||
* Binders can return an {@link ErrorMessageStrategy} for building error messages; binder
|
||||
* implementations typically might add extra headers to the error message.
|
||||
* @return the implementation - may be null.
|
||||
*/
|
||||
protected ErrorMessageStrategy getErrorMessageStrategy() {
|
||||
return null;
|
||||
}
|
||||
|
||||
protected String getErrorRecovererName(ConsumerDestination destination, String group,
|
||||
C consumerProperties) {
|
||||
return errorsBaseName(destination, group, consumerProperties) + ".recoverer";
|
||||
}
|
||||
|
||||
protected String getErrorMessageHandlerName(ConsumerDestination destination, String group,
|
||||
C consumerProperties) {
|
||||
return errorsBaseName(destination, group, consumerProperties) + ".handler";
|
||||
}
|
||||
|
||||
protected String getErrorBridgeName(ConsumerDestination destination, String group,
|
||||
C consumerProperties) {
|
||||
return errorsBaseName(destination, group, consumerProperties) + ".bridge";
|
||||
}
|
||||
|
||||
protected String errorsBaseName(ConsumerDestination destination, String group,
|
||||
C consumerProperties) {
|
||||
return destination.getName() + "." + group + ".errors";
|
||||
}
|
||||
|
||||
private final class ReceivingHandler extends AbstractReplyProducingMessageHandler {
|
||||
|
||||
private final boolean extractEmbeddedHeaders;
|
||||
@@ -393,4 +559,34 @@ public abstract class AbstractMessageChannelBinder<C extends ConsumerProperties,
|
||||
return this.delegate instanceof Lifecycle && ((Lifecycle) this.delegate).isRunning();
|
||||
}
|
||||
}
|
||||
|
||||
protected static class ErrorInfrastructure {
|
||||
|
||||
private final SubscribableChannel errorChannel;
|
||||
|
||||
private final ErrorMessageSendingRecoverer recoverer;
|
||||
|
||||
private final MessageHandler handler;
|
||||
|
||||
ErrorInfrastructure(SubscribableChannel errorChannel, ErrorMessageSendingRecoverer recoverer,
|
||||
MessageHandler handler) {
|
||||
this.errorChannel = errorChannel;
|
||||
this.recoverer = recoverer;
|
||||
this.handler = handler;
|
||||
}
|
||||
|
||||
public SubscribableChannel getErrorChannel() {
|
||||
return this.errorChannel;
|
||||
}
|
||||
|
||||
public ErrorMessageSendingRecoverer getRecoverer() {
|
||||
return this.recoverer;
|
||||
}
|
||||
|
||||
public MessageHandler getHandler() {
|
||||
return this.handler;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,68 @@
|
||||
/*
|
||||
* Copyright 2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.springframework.integration.channel.PublishSubscribeChannel;
|
||||
import org.springframework.messaging.MessageHandler;
|
||||
|
||||
/**
|
||||
* A channel for errors. If it has a {@link LastSubscriberMessageHandler}
|
||||
* subscriber, it can only have one and it will always be the last subscriber.
|
||||
*
|
||||
* @author Gary Russell
|
||||
* @since 1.3
|
||||
*
|
||||
*/
|
||||
class BinderErrorChannel extends PublishSubscribeChannel implements LastSubscriberAwareChannel {
|
||||
|
||||
private final AtomicInteger subscribers = new AtomicInteger();
|
||||
|
||||
private volatile LastSubscriberMessageHandler finalHandler;
|
||||
|
||||
@Override
|
||||
public boolean subscribe(MessageHandler handler) {
|
||||
this.subscribers.incrementAndGet();
|
||||
if (handler instanceof LastSubscriberMessageHandler && this.finalHandler != null) {
|
||||
throw new IllegalStateException("Only one LastSubscriberMessageHandler is allowed");
|
||||
}
|
||||
if (this.finalHandler != null) {
|
||||
unsubscribe(this.finalHandler);
|
||||
}
|
||||
boolean result = super.subscribe(handler);
|
||||
if (this.finalHandler != null) {
|
||||
super.subscribe(finalHandler);
|
||||
}
|
||||
if (handler instanceof LastSubscriberMessageHandler && this.finalHandler == null) {
|
||||
this.finalHandler = (LastSubscriberMessageHandler) handler;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean unsubscribe(MessageHandler handler) {
|
||||
this.subscribers.decrementAndGet();
|
||||
return super.unsubscribe(handler);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int subscribers() {
|
||||
return this.subscribers.get();
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,59 @@
|
||||
/*
|
||||
* Copyright 2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder;
|
||||
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageHandler;
|
||||
import org.springframework.messaging.MessagingException;
|
||||
|
||||
/**
|
||||
* A MessageHandler that always is the last subscriber (on a {@link BinderErrorChannel})
|
||||
* that throws an exception if it the only subscriber (aside from the bridge to the global
|
||||
* error channel). It is typically only used if a binder implementation does not return
|
||||
* a handled from {@code getErrorMessageHandler()}.
|
||||
*
|
||||
* @author Gary Russell
|
||||
* @since 1.3
|
||||
*
|
||||
*/
|
||||
class FinalRethrowingErrorMessageHandler implements MessageHandler, LastSubscriberMessageHandler {
|
||||
|
||||
private final LastSubscriberAwareChannel errorChannel;
|
||||
|
||||
private final boolean defaultErrorChannelPresent;
|
||||
|
||||
FinalRethrowingErrorMessageHandler(LastSubscriberAwareChannel errorChannel, boolean defaultErrorChannelPresent) {
|
||||
this.errorChannel = errorChannel;
|
||||
this.defaultErrorChannelPresent = defaultErrorChannelPresent;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleMessage(Message<?> message) throws MessagingException {
|
||||
if (this.errorChannel.subscribers() > (this.defaultErrorChannelPresent ? 2 : 1)) {
|
||||
// user has subscribed; default is 2, this and the bridge to the
|
||||
// errorChannel
|
||||
return;
|
||||
}
|
||||
if (message.getPayload() instanceof MessagingException) {
|
||||
throw (MessagingException) message.getPayload();
|
||||
}
|
||||
else {
|
||||
throw new MessagingException((Message<?>) null, (Throwable) message.getPayload());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,37 @@
|
||||
/*
|
||||
* Copyright 2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder;
|
||||
|
||||
import org.springframework.messaging.SubscribableChannel;
|
||||
|
||||
/**
|
||||
* A channel that can ensure a {@code LastSubscriberMessageHandler} is always
|
||||
* the last subscriber.
|
||||
*
|
||||
* @author Gary Russell
|
||||
* @since 1.3
|
||||
*
|
||||
*/
|
||||
interface LastSubscriberAwareChannel extends SubscribableChannel {
|
||||
|
||||
/**
|
||||
* Return the current subscribers.
|
||||
* @return the subscribers.
|
||||
*/
|
||||
int subscribers();
|
||||
|
||||
}
|
||||
@@ -0,0 +1,30 @@
|
||||
/*
|
||||
* Copyright 2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder;
|
||||
|
||||
import org.springframework.messaging.MessageHandler;
|
||||
|
||||
/**
|
||||
* A marker interface designating a subscriber that must be the last.
|
||||
*
|
||||
* @author Gary Russell
|
||||
* @since 1.3
|
||||
*
|
||||
*/
|
||||
public interface LastSubscriberMessageHandler extends MessageHandler {
|
||||
|
||||
}
|
||||
@@ -16,24 +16,41 @@
|
||||
|
||||
package org.springframework.cloud.stream.binder;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.Set;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.mockito.Matchers;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import org.springframework.beans.DirectFieldAccessor;
|
||||
import org.springframework.beans.factory.DisposableBean;
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.ErrorInfrastructure;
|
||||
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
|
||||
import org.springframework.cloud.stream.provisioning.ProducerDestination;
|
||||
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
|
||||
import org.springframework.context.Lifecycle;
|
||||
import org.springframework.context.support.GenericApplicationContext;
|
||||
import org.springframework.integration.channel.DirectChannel;
|
||||
import org.springframework.integration.channel.PublishSubscribeChannel;
|
||||
import org.springframework.integration.core.MessageProducer;
|
||||
import org.springframework.integration.handler.BridgeHandler;
|
||||
import org.springframework.integration.test.util.TestUtils;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.messaging.MessageHandler;
|
||||
import org.springframework.messaging.SubscribableChannel;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.BDDMockito.willAnswer;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
/**
|
||||
* @author Marius Bogoevici
|
||||
* @author Gary Russell
|
||||
* @since 1.2.2
|
||||
*/
|
||||
public class AbstractMessageChannelBinderTests {
|
||||
@@ -41,7 +58,10 @@ public class AbstractMessageChannelBinderTests {
|
||||
@Test
|
||||
public void testEndpointLifecycle() throws Exception {
|
||||
StubMessageChannelBinder binder = new StubMessageChannelBinder();
|
||||
binder.setApplicationContext(new GenericApplicationContext());
|
||||
GenericApplicationContext context = new GenericApplicationContext();
|
||||
context.refresh();
|
||||
context.getBeanFactory().registerSingleton("errorChannel", new PublishSubscribeChannel());
|
||||
binder.setApplicationContext(context);
|
||||
|
||||
Binding<MessageChannel> consumerBinding = binder.bindConsumer("foo", "fooGroup", new DirectChannel(),
|
||||
new ConsumerProperties());
|
||||
@@ -51,7 +71,25 @@ public class AbstractMessageChannelBinderTests {
|
||||
Mockito.verify((InitializingBean) messageProducer).afterPropertiesSet();
|
||||
Mockito.verify((MessageProducer) messageProducer).setOutputChannel(Mockito.any(MessageChannel.class));
|
||||
Mockito.verifyNoMoreInteractions(messageProducer);
|
||||
ErrorInfrastructure errorInfra = binder.errorInfrastructure;
|
||||
SubscribableChannel errorChannel = errorInfra.getErrorChannel();
|
||||
assertThat(errorChannel).isNotNull();
|
||||
@SuppressWarnings("unchecked")
|
||||
Set<MessageHandler> handlers = TestUtils.getPropertyValue(errorChannel, "dispatcher.handlers", Set.class);
|
||||
assertThat(handlers.size()).isEqualTo(2);
|
||||
Iterator<MessageHandler> iterator = handlers.iterator();
|
||||
assertThat(iterator.next()).isInstanceOf(BridgeHandler.class);
|
||||
assertThat(iterator.next()).isInstanceOf(LastSubscriberMessageHandler.class);
|
||||
assertThat(context.containsBean("foo.fooGroup.errors")).isTrue();
|
||||
assertThat(context.containsBean("foo.fooGroup.errors.recoverer")).isTrue();
|
||||
assertThat(context.containsBean("foo.fooGroup.errors.handler")).isTrue();
|
||||
assertThat(context.containsBean("foo.fooGroup.errors.bridge")).isTrue();
|
||||
consumerBinding.unbind();
|
||||
assertThat(context.containsBean("foo.fooGroup.errors")).isFalse();
|
||||
assertThat(context.containsBean("foo.fooGroup.errors.recoverer")).isFalse();
|
||||
assertThat(context.containsBean("foo.fooGroup.errors.handler")).isFalse();
|
||||
assertThat(context.containsBean("foo.fooGroup.errors.bridge")).isFalse();
|
||||
|
||||
Mockito.verify((Lifecycle) messageProducer).stop();
|
||||
Mockito.verify((DisposableBean) messageProducer).destroy();
|
||||
Mockito.verifyNoMoreInteractions(messageProducer);
|
||||
@@ -69,12 +107,65 @@ public class AbstractMessageChannelBinderTests {
|
||||
Mockito.verifyNoMoreInteractions(messageHandler);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEndpointBinderHasRecoverer() throws Exception {
|
||||
StubMessageChannelBinder binder = new StubMessageChannelBinder(true);
|
||||
GenericApplicationContext context = new GenericApplicationContext();
|
||||
context.refresh();
|
||||
context.getBeanFactory().registerSingleton("errorChannel", new PublishSubscribeChannel());
|
||||
binder.setApplicationContext(context);
|
||||
|
||||
Binding<MessageChannel> consumerBinding = binder.bindConsumer("foo", "fooGroup", new DirectChannel(),
|
||||
new ConsumerProperties());
|
||||
ErrorInfrastructure errorInfra = binder.errorInfrastructure;
|
||||
SubscribableChannel errorChannel = errorInfra.getErrorChannel();
|
||||
assertThat(errorChannel).isNotNull();
|
||||
@SuppressWarnings("unchecked")
|
||||
Set<MessageHandler> handlers = TestUtils.getPropertyValue(errorChannel, "dispatcher.handlers", Set.class);
|
||||
assertThat(handlers.size()).isEqualTo(2);
|
||||
Iterator<MessageHandler> iterator = handlers.iterator();
|
||||
assertThat(iterator.next()).isInstanceOf(BridgeHandler.class);
|
||||
assertThat(iterator.next()).isNotInstanceOf(LastSubscriberMessageHandler.class);
|
||||
assertThat(context.containsBean("foo.fooGroup.errors")).isTrue();
|
||||
assertThat(context.containsBean("foo.fooGroup.errors.recoverer")).isTrue();
|
||||
assertThat(context.containsBean("foo.fooGroup.errors.handler")).isTrue();
|
||||
assertThat(context.containsBean("foo.fooGroup.errors.bridge")).isTrue();
|
||||
consumerBinding.unbind();
|
||||
assertThat(context.containsBean("foo.fooGroup.errors")).isFalse();
|
||||
assertThat(context.containsBean("foo.fooGroup.errors.recoverer")).isFalse();
|
||||
assertThat(context.containsBean("foo.fooGroup.errors.handler")).isFalse();
|
||||
assertThat(context.containsBean("foo.fooGroup.errors.bridge")).isFalse();
|
||||
}
|
||||
|
||||
private static class StubMessageChannelBinder extends
|
||||
AbstractMessageChannelBinder<ConsumerProperties, ProducerProperties, ProvisioningProvider<ConsumerProperties, ProducerProperties>> {
|
||||
AbstractMessageChannelBinder<ConsumerProperties, ProducerProperties,
|
||||
ProvisioningProvider<ConsumerProperties, ProducerProperties>> {
|
||||
|
||||
private final boolean hasRecoverer;
|
||||
|
||||
private ErrorInfrastructure errorInfrastructure;
|
||||
|
||||
StubMessageChannelBinder() {
|
||||
this(false);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private StubMessageChannelBinder() {
|
||||
StubMessageChannelBinder(boolean hasRecoverer) {
|
||||
super(true, null, Mockito.mock(ProvisioningProvider.class));
|
||||
mockProvisioner();
|
||||
this.hasRecoverer = hasRecoverer;
|
||||
}
|
||||
|
||||
private void mockProvisioner() {
|
||||
willAnswer(new Answer<SimpleConsumerDestination>() {
|
||||
|
||||
@Override
|
||||
public SimpleConsumerDestination answer(final InvocationOnMock invocation) throws Throwable {
|
||||
return new SimpleConsumerDestination(invocation.getArgumentAt(0, String.class));
|
||||
}
|
||||
|
||||
}).given(this.provisioningProvider).provisionConsumerDestination(anyString(), anyString(),
|
||||
Matchers.any(ConsumerProperties.class));
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -88,10 +179,39 @@ public class AbstractMessageChannelBinderTests {
|
||||
@Override
|
||||
protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group,
|
||||
ConsumerProperties properties) throws Exception {
|
||||
return Mockito.mock(MessageProducer.class,
|
||||
this.errorInfrastructure = registerErrorInfrastructure(destination, group, properties);
|
||||
MessageProducer adapter = Mockito.mock(MessageProducer.class,
|
||||
Mockito.withSettings().extraInterfaces(Lifecycle.class, InitializingBean.class,
|
||||
DisposableBean.class));
|
||||
return adapter;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected MessageHandler getErrorMessageHandler(ConsumerDestination destination, String group,
|
||||
ConsumerProperties consumerProperties) {
|
||||
if (this.hasRecoverer) {
|
||||
return mock(MessageHandler.class);
|
||||
}
|
||||
else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private static class SimpleConsumerDestination implements ConsumerDestination {
|
||||
|
||||
private final String name;
|
||||
|
||||
SimpleConsumerDestination(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return this.name;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user