Use JmsListener in JmsMessageSenderIntegrationTests
This commit replaces the use of a synchronous receive operation using JmsTemplate by the use of JmsListener. The latter makes sure that the handling of the request and sending the response happens in the same session and that should hopefully fix the flakiness of this test. Closes gh-1539
This commit is contained in:
@@ -17,22 +17,34 @@
|
||||
package org.springframework.ws.transport.jms;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.time.Duration;
|
||||
|
||||
import jakarta.jms.BytesMessage;
|
||||
import jakarta.jms.ConnectionFactory;
|
||||
import jakarta.jms.JMSException;
|
||||
import jakarta.jms.Message;
|
||||
import jakarta.jms.TextMessage;
|
||||
import jakarta.xml.soap.MessageFactory;
|
||||
import jakarta.xml.soap.SOAPConstants;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import jakarta.xml.soap.SOAPException;
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.jms.core.JmsTemplate;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.Import;
|
||||
import org.springframework.jms.annotation.EnableJms;
|
||||
import org.springframework.jms.annotation.JmsListener;
|
||||
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
|
||||
import org.springframework.jms.core.MessagePostProcessor;
|
||||
import org.springframework.test.context.ContextConfiguration;
|
||||
import org.springframework.test.context.junit.jupiter.SpringExtension;
|
||||
import org.springframework.messaging.handler.annotation.SendTo;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
|
||||
import org.springframework.util.function.ThrowingFunction;
|
||||
import org.springframework.ws.soap.SoapMessage;
|
||||
import org.springframework.ws.soap.SoapVersion;
|
||||
import org.springframework.ws.soap.saaj.SaajSoapMessage;
|
||||
@@ -41,57 +53,33 @@ import org.springframework.ws.transport.WebServiceConnection;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@ExtendWith(SpringExtension.class)
|
||||
@ContextConfiguration("jms-sender-applicationContext.xml")
|
||||
@SpringJUnitConfig
|
||||
class JmsMessageSenderIntegrationTests {
|
||||
|
||||
private static final String SOAP_ACTION = "\"http://springframework.org/DoIt\"";
|
||||
|
||||
private static final MessageFactory messageFactory = createMessageFactory();
|
||||
|
||||
@Autowired
|
||||
private JmsMessageSender messageSender;
|
||||
|
||||
@Autowired
|
||||
private JmsTemplate jmsTemplate;
|
||||
|
||||
private MessageFactory messageFactory;
|
||||
|
||||
private static final String SOAP_ACTION = "\"http://springframework.org/DoIt\"";
|
||||
|
||||
@BeforeEach
|
||||
void createMessageFactory() throws Exception {
|
||||
this.messageFactory = MessageFactory.newInstance(SOAPConstants.SOAP_1_1_PROTOCOL);
|
||||
}
|
||||
private TestJmsListener testJmsListener;
|
||||
|
||||
@Test
|
||||
void testSendAndReceiveQueueBytesMessageTemporaryQueue() throws Exception {
|
||||
|
||||
URI uri = new URI("jms:SenderRequestQueue?deliveryMode=NON_PERSISTENT");
|
||||
|
||||
try (WebServiceConnection connection = this.messageSender.createConnection(uri)) {
|
||||
|
||||
SoapMessage soapRequest = new SaajSoapMessage(this.messageFactory.createMessage());
|
||||
SoapMessage soapRequest = new SaajSoapMessage(messageFactory.createMessage());
|
||||
soapRequest.setSoapAction(SOAP_ACTION);
|
||||
connection.send(soapRequest);
|
||||
|
||||
BytesMessage request = (BytesMessage) this.jmsTemplate.receive();
|
||||
|
||||
assertThat(request).isNotNull();
|
||||
assertThat(request.readByte()).isNotEqualTo(-1);
|
||||
|
||||
ByteArrayOutputStream bos = new ByteArrayOutputStream();
|
||||
this.messageFactory.createMessage().writeTo(bos);
|
||||
final byte[] buf = bos.toByteArray();
|
||||
|
||||
this.jmsTemplate.send(request.getJMSReplyTo(), session -> {
|
||||
|
||||
BytesMessage response = session.createBytesMessage();
|
||||
response.setStringProperty(JmsTransportConstants.PROPERTY_SOAP_ACTION, SOAP_ACTION);
|
||||
response.setStringProperty(JmsTransportConstants.PROPERTY_CONTENT_TYPE,
|
||||
SoapVersion.SOAP_11.getContentType());
|
||||
response.writeBytes(buf);
|
||||
return response;
|
||||
this.testJmsListener.handleMessage((message) -> {
|
||||
assertNonEmptyByteMessage(message);
|
||||
return createEmptySoapMessage();
|
||||
});
|
||||
|
||||
SoapMessage response = (SoapMessage) connection.receive(new SaajSoapMessageFactory(this.messageFactory));
|
||||
|
||||
SoapMessage response = (SoapMessage) connection.receive(new SaajSoapMessageFactory(messageFactory));
|
||||
assertThat(response).isNotNull();
|
||||
assertThat(response.getSoapAction()).isEqualTo(SOAP_ACTION);
|
||||
assertThat(response.hasFault()).isFalse();
|
||||
@@ -100,38 +88,20 @@ class JmsMessageSenderIntegrationTests {
|
||||
|
||||
@Test
|
||||
void testSendAndReceiveQueueBytesMessagePermanentQueue() throws Exception {
|
||||
|
||||
String responseQueueName = "SenderResponseQueue";
|
||||
URI uri = new URI("jms:SenderRequestQueue?replyToName=" + responseQueueName + "&deliveryMode=NON_PERSISTENT");
|
||||
|
||||
try (WebServiceConnection connection = this.messageSender.createConnection(uri)) {
|
||||
|
||||
SoapMessage soapRequest = new SaajSoapMessage(this.messageFactory.createMessage());
|
||||
SoapMessage soapRequest = new SaajSoapMessage(messageFactory.createMessage());
|
||||
soapRequest.setSoapAction(SOAP_ACTION);
|
||||
connection.send(soapRequest);
|
||||
|
||||
final BytesMessage request = (BytesMessage) this.jmsTemplate.receive();
|
||||
|
||||
assertThat(request).isNotNull();
|
||||
assertThat(request.readByte()).isNotEqualTo(-1);
|
||||
|
||||
ByteArrayOutputStream bos = new ByteArrayOutputStream();
|
||||
this.messageFactory.createMessage().writeTo(bos);
|
||||
final byte[] buf = bos.toByteArray();
|
||||
|
||||
this.jmsTemplate.send(responseQueueName, session -> {
|
||||
|
||||
BytesMessage response = session.createBytesMessage();
|
||||
response.setJMSCorrelationID(request.getJMSMessageID());
|
||||
response.setStringProperty(JmsTransportConstants.PROPERTY_SOAP_ACTION, SOAP_ACTION);
|
||||
response.setStringProperty(JmsTransportConstants.PROPERTY_CONTENT_TYPE,
|
||||
SoapVersion.SOAP_11.getContentType());
|
||||
response.writeBytes(buf);
|
||||
return response;
|
||||
this.testJmsListener.handleMessage((message) -> {
|
||||
assertNonEmptyByteMessage(message);
|
||||
return createEmptySoapMessage();
|
||||
});
|
||||
|
||||
SoapMessage response = (SoapMessage) connection.receive(new SaajSoapMessageFactory(this.messageFactory));
|
||||
|
||||
SoapMessage response = (SoapMessage) connection.receive(new SaajSoapMessageFactory(messageFactory));
|
||||
assertThat(response).isNotNull();
|
||||
assertThat(response.getSoapAction()).isEqualTo(SOAP_ACTION);
|
||||
assertThat(response.hasFault()).isFalse();
|
||||
@@ -140,36 +110,18 @@ class JmsMessageSenderIntegrationTests {
|
||||
|
||||
@Test
|
||||
void testSendAndReceiveQueueTextMessage() throws Exception {
|
||||
|
||||
URI uri = new URI("jms:SenderRequestQueue?deliveryMode=NON_PERSISTENT&messageType=TEXT_MESSAGE");
|
||||
|
||||
try (WebServiceConnection connection = this.messageSender.createConnection(uri)) {
|
||||
|
||||
SoapMessage soapRequest = new SaajSoapMessage(this.messageFactory.createMessage());
|
||||
SoapMessage soapRequest = new SaajSoapMessage(messageFactory.createMessage());
|
||||
soapRequest.setSoapAction(SOAP_ACTION);
|
||||
connection.send(soapRequest);
|
||||
|
||||
TextMessage request = (TextMessage) this.jmsTemplate.receive();
|
||||
|
||||
assertThat(request).isNotNull();
|
||||
assertThat(request.getText()).isNotNull();
|
||||
|
||||
ByteArrayOutputStream bos = new ByteArrayOutputStream();
|
||||
this.messageFactory.createMessage().writeTo(bos);
|
||||
final String text = bos.toString(StandardCharsets.UTF_8);
|
||||
|
||||
this.jmsTemplate.send(request.getJMSReplyTo(), session -> {
|
||||
|
||||
TextMessage response = session.createTextMessage();
|
||||
response.setStringProperty(JmsTransportConstants.PROPERTY_SOAP_ACTION, SOAP_ACTION);
|
||||
response.setStringProperty(JmsTransportConstants.PROPERTY_CONTENT_TYPE,
|
||||
SoapVersion.SOAP_11.getContentType());
|
||||
response.setText(text);
|
||||
return response;
|
||||
this.testJmsListener.handleMessage((message) -> {
|
||||
assertNonEmptyTextMessage(message);
|
||||
return createEmptySoapMessage();
|
||||
});
|
||||
|
||||
SoapMessage response = (SoapMessage) connection.receive(new SaajSoapMessageFactory(this.messageFactory));
|
||||
|
||||
SoapMessage response = (SoapMessage) connection.receive(new SaajSoapMessageFactory(messageFactory));
|
||||
assertThat(response).isNotNull();
|
||||
assertThat(response.getSoapAction()).isEqualTo(SOAP_ACTION);
|
||||
assertThat(response.hasFault()).isFalse();
|
||||
@@ -178,49 +130,119 @@ class JmsMessageSenderIntegrationTests {
|
||||
|
||||
@Test
|
||||
void testSendNoResponse() throws Exception {
|
||||
|
||||
URI uri = new URI("jms:SenderRequestQueue?deliveryMode=NON_PERSISTENT");
|
||||
|
||||
try (WebServiceConnection connection = this.messageSender.createConnection(uri)) {
|
||||
|
||||
SoapMessage soapRequest = new SaajSoapMessage(this.messageFactory.createMessage());
|
||||
SoapMessage soapRequest = new SaajSoapMessage(messageFactory.createMessage());
|
||||
soapRequest.setSoapAction(SOAP_ACTION);
|
||||
connection.send(soapRequest);
|
||||
|
||||
BytesMessage request = (BytesMessage) this.jmsTemplate.receive();
|
||||
|
||||
assertThat(request).isNotNull();
|
||||
|
||||
ByteArrayOutputStream bos = new ByteArrayOutputStream();
|
||||
this.messageFactory.createMessage().writeTo(bos);
|
||||
SoapMessage response = (SoapMessage) connection.receive(new SaajSoapMessageFactory(this.messageFactory));
|
||||
this.testJmsListener.handleMessage((message) -> {
|
||||
assertNonEmptyByteMessage(message);
|
||||
return null;
|
||||
});
|
||||
|
||||
SoapMessage response = (SoapMessage) connection.receive(new SaajSoapMessageFactory(messageFactory));
|
||||
assertThat(response).isNull();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void testPostProcessor() throws Exception {
|
||||
|
||||
MessagePostProcessor processor = message -> {
|
||||
|
||||
message.setBooleanProperty("processed", true);
|
||||
return message;
|
||||
};
|
||||
|
||||
URI uri = new URI("jms:SenderRequestQueue?deliveryMode=NON_PERSISTENT");
|
||||
|
||||
try (JmsSenderConnection connection = (JmsSenderConnection) this.messageSender.createConnection(uri)) {
|
||||
|
||||
connection.setPostProcessor(processor);
|
||||
SoapMessage soapRequest = new SaajSoapMessage(this.messageFactory.createMessage());
|
||||
SoapMessage soapRequest = new SaajSoapMessage(messageFactory.createMessage());
|
||||
connection.send(soapRequest);
|
||||
|
||||
BytesMessage request = (BytesMessage) this.jmsTemplate.receive();
|
||||
|
||||
assertThat(request).isNotNull();
|
||||
assertThat(request.getBooleanProperty("processed")).isTrue();
|
||||
this.testJmsListener.handleMessage((message) -> {
|
||||
assertNonEmptyByteMessage(message);
|
||||
assertThat(message.getBooleanProperty("processed")).isTrue();
|
||||
return null;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private Object createEmptySoapMessage() throws SOAPException, IOException {
|
||||
ByteArrayOutputStream bos = new ByteArrayOutputStream();
|
||||
messageFactory.createMessage().writeTo(bos);
|
||||
String text = bos.toString(StandardCharsets.UTF_8);
|
||||
return MessageBuilder.withPayload(text)
|
||||
.setHeader(JmsTransportConstants.PROPERTY_SOAP_ACTION, SOAP_ACTION)
|
||||
.setHeader(JmsTransportConstants.PROPERTY_CONTENT_TYPE, SoapVersion.SOAP_11.getContentType())
|
||||
.build();
|
||||
}
|
||||
|
||||
private static void assertNonEmptyByteMessage(jakarta.jms.Message message) throws JMSException {
|
||||
if (message instanceof BytesMessage bytesMessage) {
|
||||
assertThat(bytesMessage.readByte()).isNotEqualTo(-1);
|
||||
}
|
||||
else {
|
||||
throw new IllegalStateException("Unexpected message type: " + message.getClass().getName());
|
||||
}
|
||||
}
|
||||
|
||||
private static void assertNonEmptyTextMessage(jakarta.jms.Message message) throws JMSException {
|
||||
if (message instanceof TextMessage textMessage) {
|
||||
assertThat(textMessage.getText()).isNotEmpty();
|
||||
}
|
||||
else {
|
||||
throw new IllegalStateException("Unexpected message type: " + message.getClass().getName());
|
||||
}
|
||||
}
|
||||
|
||||
private static MessageFactory createMessageFactory() {
|
||||
try {
|
||||
return MessageFactory.newInstance(SOAPConstants.SOAP_1_1_PROTOCOL);
|
||||
}
|
||||
catch (SOAPException ex) {
|
||||
throw new IllegalStateException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
static class TestJmsListener {
|
||||
|
||||
private ThrowingFunction<jakarta.jms.Message, Object> messageHandler;
|
||||
|
||||
public void handleMessage(ThrowingFunction<Message, Object> messageHandler) {
|
||||
this.messageHandler = messageHandler;
|
||||
}
|
||||
|
||||
@JmsListener(destination = "SenderRequestQueue")
|
||||
@SendTo("SenderResponseQueue")
|
||||
Object handleRequest(jakarta.jms.Message message) {
|
||||
return this.messageHandler.apply(message);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Configuration(proxyBeanMethods = false)
|
||||
@EnableJms
|
||||
@Import(TestJmsListener.class)
|
||||
static class JmsBrokerConfig {
|
||||
|
||||
@Bean
|
||||
ActiveMQConnectionFactory connectionFactory() {
|
||||
return new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
|
||||
}
|
||||
|
||||
@Bean
|
||||
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory) {
|
||||
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
|
||||
factory.setConnectionFactory(connectionFactory);
|
||||
return factory;
|
||||
}
|
||||
|
||||
@Bean
|
||||
JmsMessageSender messageSender(ConnectionFactory connectionFactory) {
|
||||
JmsMessageSender messageSender = new JmsMessageSender(connectionFactory);
|
||||
messageSender.setReceiveTimeout(Duration.ofSeconds(2).toMillis());
|
||||
return messageSender;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user