diff --git a/spring-jms/src/main/java/org/springframework/jms/listener/adapter/AbstractAdaptableMessageListener.java b/spring-jms/src/main/java/org/springframework/jms/listener/adapter/AbstractAdaptableMessageListener.java index cd0516de6d..883101848b 100644 --- a/spring-jms/src/main/java/org/springframework/jms/listener/adapter/AbstractAdaptableMessageListener.java +++ b/spring-jms/src/main/java/org/springframework/jms/listener/adapter/AbstractAdaptableMessageListener.java @@ -242,7 +242,7 @@ public abstract class AbstractAdaptableMessageListener try { Message response = buildMessage(session, result); postProcessResponse(request, response); - Destination destination = getResponseDestination(request, response, session); + Destination destination = getResponseDestination(request, response, session, result); sendResponse(session, destination, response); } catch (Exception ex) { @@ -266,21 +266,24 @@ public abstract class AbstractAdaptableMessageListener * @see #setMessageConverter */ protected Message buildMessage(Session session, Object result) throws JMSException { + Object content = (result instanceof JmsResponse + ? ((JmsResponse) result).getResponse() : result); + MessageConverter converter = getMessageConverter(); if (converter != null) { - if (result instanceof org.springframework.messaging.Message) { - return this.messagingMessageConverter.toMessage(result, session); + if (content instanceof org.springframework.messaging.Message) { + return this.messagingMessageConverter.toMessage(content, session); } else { - return converter.toMessage(result, session); + return converter.toMessage(content, session); } } else { - if (!(result instanceof Message)) { + if (!(content instanceof Message)) { throw new MessageConversionException( - "No MessageConverter specified - cannot handle message [" + result + "]"); + "No MessageConverter specified - cannot handle message [" + content + "]"); } - return (Message) result; + return (Message) content; } } @@ -302,6 +305,18 @@ public abstract class AbstractAdaptableMessageListener response.setJMSCorrelationID(correlation); } + private Destination getResponseDestination(Message request, Message response, Session session, Object result) + throws JMSException { + if (result instanceof JmsResponse) { + JmsResponse jmsResponse = (JmsResponse) result; + Destination destination = jmsResponse.resolveDestination(getDestinationResolver(), session); + if (destination != null) { + return destination; + } + } + return getResponseDestination(request, response, session); + } + /** * Determine a response destination for the given message. *

The default implementation first checks the JMS Reply-To diff --git a/spring-jms/src/main/java/org/springframework/jms/listener/adapter/JmsResponse.java b/spring-jms/src/main/java/org/springframework/jms/listener/adapter/JmsResponse.java new file mode 100644 index 0000000000..e6a91858f2 --- /dev/null +++ b/spring-jms/src/main/java/org/springframework/jms/listener/adapter/JmsResponse.java @@ -0,0 +1,140 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.jms.listener.adapter; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Session; + +import org.springframework.jms.support.destination.DestinationResolver; +import org.springframework.util.Assert; + +/** + * Return type of any JMS listener method used to indicate the actual response destination + * alongside the response itself. Typically used when said destination needs to be + * computed at runtime. + *

+ * The example below sends a response with the content of the {@code result} argument to + * the {@code queueOut Queue}: + * + *

+ * package com.acme.foo;
+ *
+ * public class MyService {
+ *     @JmsListener
+ *     public JmsResponse process(String msg) {
+ *         // process incoming message
+ *         return JmsResponse.forQueue(result, "queueOut");
+ *     }
+ * }
+ * + * If the destination does not need to be computed at runtime, + * {@link org.springframework.messaging.handler.annotation.SendTo @SendTo} is the + * recommended declarative approach. + * + * @author Stephane Nicoll + * @since 4.2 + * @see org.springframework.jms.annotation.JmsListener + * @see org.springframework.messaging.handler.annotation.SendTo + */ +public class JmsResponse { + + private final Object response; + + private final Object destination; + + /** + * Create a new instance + * @param response the content of the result + * @param destination the destination + */ + protected JmsResponse(Object response, Object destination) { + Assert.notNull(response, "Result must not be null"); + this.response = response; + this.destination = destination; + } + + /** + * Create a {@link JmsResponse} targeting the queue with the specified name. + */ + public static JmsResponse forQueue(Object result, String queueName) { + Assert.notNull(queueName, "Queue name must not be null"); + return new JmsResponse(result, new DestinationNameHolder(queueName, false)); + } + + /** + * Create a {@link JmsResponse} targeting the topic with the specified name. + */ + public static JmsResponse forTopic(Object result, String topicName) { + Assert.notNull(topicName, "Topic name must not be null"); + return new JmsResponse(result, new DestinationNameHolder(topicName, true)); + } + + /** + * Create a {@link JmsResponse} targeting the specified {@link Destination}. + */ + public static JmsResponse forDestination(Object result, Destination destination) { + Assert.notNull(destination, "Destination must not be null"); + return new JmsResponse(result, destination); + } + + + public Object getResponse() { + return response; + } + + public Destination resolveDestination(DestinationResolver destinationResolver, Session session) + throws JMSException { + + if (this.destination instanceof Destination) { + return (Destination) this.destination; + } + if (this.destination instanceof DestinationNameHolder) { + DestinationNameHolder nameHolder = (DestinationNameHolder) this.destination; + return destinationResolver.resolveDestinationName(session, + nameHolder.destinationName, nameHolder.pubSubDomain); + } + return null; + } + + @Override + public String toString() { + return "JmsResponse{" + "response=" + this.response + ", destination=" + this.destination + '}'; + } + + + /** + * Internal class combining a destination name + * and its target destination type (queue or topic). + */ + protected static class DestinationNameHolder { + private final String destinationName; + + private final boolean pubSubDomain; + + public DestinationNameHolder(String destinationName, boolean pubSubDomain) { + this.destinationName = destinationName; + this.pubSubDomain = pubSubDomain; + } + + @Override + public String toString() { + return this.destinationName + "{" + "pubSubDomain=" + this.pubSubDomain + '}'; + } + } + +} diff --git a/spring-jms/src/test/java/org/springframework/jms/listener/adapter/JmsResponseTests.java b/spring-jms/src/test/java/org/springframework/jms/listener/adapter/JmsResponseTests.java new file mode 100644 index 0000000000..bc7d15f653 --- /dev/null +++ b/spring-jms/src/test/java/org/springframework/jms/listener/adapter/JmsResponseTests.java @@ -0,0 +1,84 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.jms.listener.adapter; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Session; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import org.springframework.jms.support.destination.DestinationResolver; + +import static org.junit.Assert.assertSame; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.mock; + +/** + * @author Stephane Nicoll + */ +public class JmsResponseTests { + + @Rule + public final ExpectedException thrown = ExpectedException.none(); + + @Test + public void destinationDoesNotUseDestinationResolver() throws JMSException { + Destination destination = mock(Destination.class); + Destination actual = JmsResponse.forDestination("foo", destination).resolveDestination(null, null); + assertSame(destination, actual); + } + + @Test + public void resolveDestinationForQueue() throws JMSException { + Session session = mock(Session.class); + DestinationResolver destinationResolver = mock(DestinationResolver.class); + Destination destination = mock(Destination.class); + + given(destinationResolver.resolveDestinationName(session, "myQueue", false)).willReturn(destination); + JmsResponse jmsResponse = JmsResponse.forQueue("foo", "myQueue"); + Destination actual = jmsResponse.resolveDestination(destinationResolver, session); + assertSame(destination, actual); + } + + @Test + public void createWithNulResponse() { + thrown.expect(IllegalArgumentException.class); + JmsResponse.forQueue(null, "myQueue"); + } + + @Test + public void createWithNullQueueName() { + thrown.expect(IllegalArgumentException.class); + JmsResponse.forQueue("foo", null); + } + + @Test + public void createWithNullTopicName() { + thrown.expect(IllegalArgumentException.class); + JmsResponse.forTopic("foo", null); + } + + @Test + public void createWithNulDestination() { + thrown.expect(IllegalArgumentException.class); + JmsResponse.forDestination("foo", null); + } + +} diff --git a/spring-jms/src/test/java/org/springframework/jms/listener/adapter/MessagingMessageListenerAdapterTests.java b/spring-jms/src/test/java/org/springframework/jms/listener/adapter/MessagingMessageListenerAdapterTests.java index f5f38d787d..80377824a7 100644 --- a/spring-jms/src/test/java/org/springframework/jms/listener/adapter/MessagingMessageListenerAdapterTests.java +++ b/spring-jms/src/test/java/org/springframework/jms/listener/adapter/MessagingMessageListenerAdapterTests.java @@ -21,8 +21,11 @@ import java.util.ArrayList; import java.util.List; import javax.jms.Destination; import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; +import javax.jms.Topic; import org.junit.Before; import org.junit.Test; @@ -146,6 +149,97 @@ public class MessagingMessageListenerAdapterTests { assertEquals("Response", ((TextMessage) replyMessage).getText()); } + @Test + public void replyPayloadToQueue() throws JMSException { + Message request = MessageBuilder.withPayload("Response").build(); + + Session session = mock(Session.class); + Queue replyDestination = mock(Queue.class); + given(session.createQueue("queueOut")).willReturn(replyDestination); + + + MessageProducer messageProducer = mock(MessageProducer.class); + TextMessage responseMessage = mock(TextMessage.class); + given(session.createTextMessage("Response")).willReturn(responseMessage); + given(session.createProducer(replyDestination)).willReturn(messageProducer); + + MessagingMessageListenerAdapter listener = getPayloadInstance(request, "replyPayloadToQueue", Message.class); + listener.onMessage(mock(javax.jms.Message.class), session); + + + verify(session).createQueue("queueOut"); + verify(session).createTextMessage("Response"); + verify(messageProducer).send(responseMessage); + verify(messageProducer).close(); + } + + @Test + public void replyPayloadToTopic() throws JMSException { + Message request = MessageBuilder.withPayload("Response").build(); + + Session session = mock(Session.class); + Topic replyDestination = mock(Topic.class); + given(session.createTopic("topicOut")).willReturn(replyDestination); + + + MessageProducer messageProducer = mock(MessageProducer.class); + TextMessage responseMessage = mock(TextMessage.class); + given(session.createTextMessage("Response")).willReturn(responseMessage); + given(session.createProducer(replyDestination)).willReturn(messageProducer); + + MessagingMessageListenerAdapter listener = getPayloadInstance(request, "replyPayloadToTopic", Message.class); + listener.onMessage(mock(javax.jms.Message.class), session); + + + verify(session).createTopic("topicOut"); + verify(session).createTextMessage("Response"); + verify(messageProducer).send(responseMessage); + verify(messageProducer).close(); + } + + @Test + public void replyPayloadToDestination() throws JMSException { + Queue replyDestination = mock(Queue.class); + Message request = MessageBuilder.withPayload("Response") + .setHeader("destination", replyDestination).build(); + + Session session = mock(Session.class); + MessageProducer messageProducer = mock(MessageProducer.class); + TextMessage responseMessage = mock(TextMessage.class); + given(session.createTextMessage("Response")).willReturn(responseMessage); + given(session.createProducer(replyDestination)).willReturn(messageProducer); + + MessagingMessageListenerAdapter listener = getPayloadInstance(request, "replyPayloadToDestination", Message.class); + listener.onMessage(mock(javax.jms.Message.class), session); + + verify(session, times(0)).createQueue(anyString()); + verify(session).createTextMessage("Response"); + verify(messageProducer).send(responseMessage); + verify(messageProducer).close(); + } + + @Test + public void replyPayloadNoDestination() throws JMSException { + Queue replyDestination = mock(Queue.class); + Message request = MessageBuilder.withPayload("Response").build(); + + Session session = mock(Session.class); + MessageProducer messageProducer = mock(MessageProducer.class); + TextMessage responseMessage = mock(TextMessage.class); + given(session.createTextMessage("Response")).willReturn(responseMessage); + given(session.createProducer(replyDestination)).willReturn(messageProducer); + + MessagingMessageListenerAdapter listener = + getPayloadInstance(request, "replyPayloadNoDestination", Message.class); + listener.setDefaultResponseDestination(replyDestination); + listener.onMessage(mock(javax.jms.Message.class), session); + + verify(session, times(0)).createQueue(anyString()); + verify(session).createTextMessage("Response"); + verify(messageProducer).send(responseMessage); + verify(messageProducer).close(); + } + protected MessagingMessageListenerAdapter getSimpleInstance(String methodName, Class... parameterTypes) { Method m = ReflectionUtils.findMethod(SampleBean.class, methodName, parameterTypes); return createInstance(m); @@ -157,6 +251,19 @@ public class MessagingMessageListenerAdapterTests { return adapter; } + protected MessagingMessageListenerAdapter getPayloadInstance(final Object payload, + String methodName, Class... parameterTypes) { + Method m = ReflectionUtils.findMethod(SampleBean.class, methodName, parameterTypes); + MessagingMessageListenerAdapter adapter = new MessagingMessageListenerAdapter() { + @Override + protected Object extractMessage(javax.jms.Message message) { + return payload; + } + }; + adapter.setHandlerMethod(factory.createInvocableHandlerMethod(sample, m)); + return adapter; + } + private void initializeFactory(DefaultMessageHandlerMethodFactory factory) { factory.setBeanFactory(new StaticListableBeanFactory()); factory.afterPropertiesSet(); @@ -178,6 +285,23 @@ public class MessagingMessageListenerAdapterTests { .build(); } + public JmsResponse replyPayloadToQueue(Message input) { + return JmsResponse.forQueue(input.getPayload(), "queueOut"); + } + + public JmsResponse replyPayloadToTopic(Message input) { + return JmsResponse.forTopic(input.getPayload(), "topicOut"); + } + + public JmsResponse replyPayloadToDestination(Message input) { + return JmsResponse.forDestination(input.getPayload(), + input.getHeaders().get("destination", Destination.class)); + } + + public JmsResponse replyPayloadNoDestination(Message input) { + return new JmsResponse(input.getPayload(), null); + } + public void fail(String input) { throw new IllegalArgumentException("Expected test exception"); }