From bfa6645c7d8e401ae2d1ea2587dbdd0eaa943265 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Wed, 23 Oct 2013 13:24:25 -0400 Subject: [PATCH] Make changes for timing related test failures --- .../simp/BrokerAvailabilityEvent.java | 6 + ...erRelayMessageHandlerIntegrationTests.java | 289 +++++++++--------- 2 files changed, 151 insertions(+), 144 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/BrokerAvailabilityEvent.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/BrokerAvailabilityEvent.java index e2df82f77a..119021a883 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/BrokerAvailabilityEvent.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/BrokerAvailabilityEvent.java @@ -45,4 +45,10 @@ public class BrokerAvailabilityEvent extends ApplicationEvent { public boolean isBrokerAvailable() { return this.brokerAvailable; } + + @Override + public String toString() { + return "BrokerAvailabilityEvent=" + this.brokerAvailable; + } + } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java index 5b531505ae..736f5d0686 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java @@ -68,25 +68,24 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { private int port; + @Before public void setUp() throws Exception { this.port = SocketUtils.findAvailableTcpPort(61613); - createAndStartBroker(); - this.responseChannel = new ExecutorSubscribableChannel(); this.responseHandler = new ExpectationMatchingMessageHandler(); this.responseChannel.subscribe(this.responseHandler); - this.eventPublisher = new ExpectationMatchingEventPublisher(); + startActiveMqBroker(); createAndStartRelay(); } - private void createAndStartBroker() throws Exception { + private void startActiveMqBroker() throws Exception { this.activeMQBroker = new BrokerService(); - this.activeMQBroker.addConnector("stomp://localhost:" + port); + this.activeMQBroker.addConnector("stomp://localhost:" + this.port); this.activeMQBroker.setStartAsync(false); this.activeMQBroker.setDeleteAllMessagesOnStartup(true); this.activeMQBroker.start(); @@ -94,7 +93,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { private void createAndStartRelay() throws InterruptedException { this.relay = new StompBrokerRelayMessageHandler(this.responseChannel, Arrays.asList("/queue/", "/topic/")); - this.relay.setRelayPort(port); + this.relay.setRelayPort(this.port); this.relay.setApplicationEventPublisher(this.eventPublisher); this.relay.setSystemHeartbeatReceiveInterval(0); this.relay.setSystemHeartbeatSendInterval(0); @@ -110,145 +109,11 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { this.relay.stop(); } finally { - stopBrokerAndAwait(); + stopActiveMqBrokerAndAwait(); } } - // When TCP client is behind interface and configurable: - // test "host" header (virtualHost property) - // test "/user/.." destination is excluded - - @Test - public void publishSubscribe() throws Exception { - - String sess1 = "sess1"; - MessageExchange conn1 = MessageExchangeBuilder.connect(sess1).build(); - this.relay.handleMessage(conn1.message); - this.responseHandler.expect(conn1); - - String sess2 = "sess2"; - MessageExchange conn2 = MessageExchangeBuilder.connect(sess2).build(); - this.relay.handleMessage(conn2.message); - this.responseHandler.expect(conn2); - - this.responseHandler.awaitAndAssert(); - - String subs1 = "subs1"; - String destination = "/topic/test"; - - MessageExchange subscribe = MessageExchangeBuilder.subscribeWithReceipt(sess1, subs1, destination, "r1").build(); - this.relay.handleMessage(subscribe.message); - this.responseHandler.expect(subscribe); - this.responseHandler.awaitAndAssert(); - - MessageExchange send = MessageExchangeBuilder.send(destination, "foo").andExpectMessage(sess1, subs1).build(); - this.responseHandler.expect(send); - - this.relay.handleMessage(send.message); - this.responseHandler.awaitAndAssert(); - } - - @Test - public void brokerUnvailableErrorFrameOnConnect() throws Exception { - - stopBrokerAndAwait(); - - MessageExchange connect = MessageExchangeBuilder.connectWithError("sess1").build(); - this.responseHandler.expect(connect); - - this.relay.handleMessage(connect.message); - this.responseHandler.awaitAndAssert(); - } - - @Test(expected=MessageDeliveryException.class) - public void messageDeliverExceptionIfSystemSessionForwardFails() throws Exception { - stopBrokerAndAwait(); - StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND); - this.relay.handleMessage(MessageBuilder.withPayload("test".getBytes()).setHeaders(headers).build()); - } - - @Test - public void brokerBecomingUnvailableTriggersErrorFrame() throws Exception { - - String sess1 = "sess1"; - MessageExchange connect = MessageExchangeBuilder.connect(sess1).build(); - this.responseHandler.expect(connect); - - this.relay.handleMessage(connect.message); - - this.responseHandler.awaitAndAssert(); - - this.responseHandler.expect(MessageExchangeBuilder.error(sess1).build()); - - stopBrokerAndAwait(); - - this.responseHandler.awaitAndAssert(); - } - - @Test - public void brokerAvailabilityEventWhenStopped() throws Exception { - this.eventPublisher.expectAvailabilityStatusChanges(false); - stopBrokerAndAwait(); - this.eventPublisher.awaitAndAssert(); - } - - @Test - public void relayReconnectsIfBrokerComesBackUp() throws Exception { - - String sess1 = "sess1"; - MessageExchange conn1 = MessageExchangeBuilder.connect(sess1).build(); - this.responseHandler.expect(conn1); - this.relay.handleMessage(conn1.message); - this.responseHandler.awaitAndAssert(); - - String subs1 = "subs1"; - String destination = "/topic/test"; - MessageExchange subscribe = - MessageExchangeBuilder.subscribeWithReceipt(sess1, subs1, destination, "r1").build(); - this.responseHandler.expect(subscribe); - - this.relay.handleMessage(subscribe.message); - this.responseHandler.awaitAndAssert(); - - this.responseHandler.expect(MessageExchangeBuilder.error(sess1).build()); - - stopBrokerAndAwait(); - - this.responseHandler.awaitAndAssert(); - - this.eventPublisher.expectAvailabilityStatusChanges(false); - this.eventPublisher.awaitAndAssert(); - - this.eventPublisher.expectAvailabilityStatusChanges(true); - createAndStartBroker(); - this.eventPublisher.awaitAndAssert(); - - // TODO The event publisher assertions show that the broker's back up and the system relay session - // has reconnected. We need to decide what we want the reconnect behaviour to be for client relay - // sessions and add further message sending and assertions as appropriate. At the moment any client - // sessions will be closed and an ERROR from will be sent. - } - - @Test - public void disconnectClosesRelaySessionCleanly() throws Exception { - MessageExchange connect = MessageExchangeBuilder.connect("sess1").build(); - this.responseHandler.expect(connect); - this.relay.handleMessage(connect.message); - this.responseHandler.awaitAndAssert(); - - StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT); - headers.setSessionId("sess1"); - - this.relay.handleMessage(MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build()); - - Thread.sleep(2000); - - // Check that we have not received an ERROR as a result of the connection closing - this.responseHandler.awaitAndAssert(); - } - - - private void stopBrokerAndAwait() throws Exception { + private void stopActiveMqBrokerAndAwait() throws Exception { logger.debug("Stopping ActiveMQ broker and will await shutdown"); if (!this.activeMQBroker.isStarted()) { logger.debug("Broker not running"); @@ -266,6 +131,140 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { } + // When TCP client is behind interface and configurable: + // test "host" header (virtualHost property) + // test "/user/.." destination is excluded + + @Test + public void publishSubscribe() throws Exception { + + String sess1 = "sess1"; + String sess2 = "sess2"; + MessageExchange conn1 = MessageExchangeBuilder.connect(sess1).build(); + MessageExchange conn2 = MessageExchangeBuilder.connect(sess2).build(); + this.responseHandler.expect(conn1, conn2); + + this.relay.handleMessage(conn1.message); + this.relay.handleMessage(conn2.message); + this.responseHandler.awaitAndAssert(); + + String subs1 = "subs1"; + String destination = "/topic/test"; + + MessageExchange subscribe = MessageExchangeBuilder.subscribeWithReceipt(sess1, subs1, destination, "r1").build(); + this.responseHandler.expect(subscribe); + + this.relay.handleMessage(subscribe.message); + this.responseHandler.awaitAndAssert(); + + MessageExchange send = MessageExchangeBuilder.send(destination, "foo").andExpectMessage(sess1, subs1).build(); + this.responseHandler.expect(send); + + this.relay.handleMessage(send.message); + this.responseHandler.awaitAndAssert(); + } + + @Test + public void brokerUnvailableErrorFrameOnConnect() throws Exception { + + stopActiveMqBrokerAndAwait(); + + MessageExchange connect = MessageExchangeBuilder.connectWithError("sess1").build(); + this.responseHandler.expect(connect); + + this.relay.handleMessage(connect.message); + this.responseHandler.awaitAndAssert(); + } + + @Test(expected=MessageDeliveryException.class) + public void messageDeliverExceptionIfSystemSessionForwardFails() throws Exception { + stopActiveMqBrokerAndAwait(); + StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND); + this.relay.handleMessage(MessageBuilder.withPayload("test".getBytes()).setHeaders(headers).build()); + } + + @Test + public void brokerBecomingUnvailableTriggersErrorFrame() throws Exception { + + String sess1 = "sess1"; + MessageExchange connect = MessageExchangeBuilder.connect(sess1).build(); + this.responseHandler.expect(connect); + + this.relay.handleMessage(connect.message); + this.responseHandler.awaitAndAssert(); + + this.responseHandler.expect(MessageExchangeBuilder.error(sess1).build()); + + stopActiveMqBrokerAndAwait(); + + this.responseHandler.awaitAndAssert(); + } + + @Test + public void brokerAvailabilityEventWhenStopped() throws Exception { + this.eventPublisher.expectAvailabilityStatusChanges(false); + stopActiveMqBrokerAndAwait(); + this.eventPublisher.awaitAndAssert(); + } + + @Test + public void relayReconnectsIfBrokerComesBackUp() throws Exception { + + String sess1 = "sess1"; + MessageExchange conn1 = MessageExchangeBuilder.connect(sess1).build(); + this.responseHandler.expect(conn1); + + this.relay.handleMessage(conn1.message); + this.responseHandler.awaitAndAssert(); + + String subs1 = "subs1"; + String destination = "/topic/test"; + MessageExchange subscribe = + MessageExchangeBuilder.subscribeWithReceipt(sess1, subs1, destination, "r1").build(); + this.responseHandler.expect(subscribe); + + this.relay.handleMessage(subscribe.message); + this.responseHandler.awaitAndAssert(); + + this.responseHandler.expect(MessageExchangeBuilder.error(sess1).build()); + + stopActiveMqBrokerAndAwait(); + + this.responseHandler.awaitAndAssert(); + + this.eventPublisher.expectAvailabilityStatusChanges(false); + this.eventPublisher.awaitAndAssert(); + + this.eventPublisher.expectAvailabilityStatusChanges(true); + startActiveMqBroker(); + this.eventPublisher.awaitAndAssert(); + + // TODO The event publisher assertions show that the broker's back up and the system relay session + // has reconnected. We need to decide what we want the reconnect behaviour to be for client relay + // sessions and add further message sending and assertions as appropriate. At the moment any client + // sessions will be closed and an ERROR from will be sent. + } + + @Test + public void disconnectClosesRelaySessionCleanly() throws Exception { + + MessageExchange connect = MessageExchangeBuilder.connect("sess1").build(); + this.responseHandler.expect(connect); + + this.relay.handleMessage(connect.message); + this.responseHandler.awaitAndAssert(); + + StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT); + headers.setSessionId("sess1"); + this.relay.handleMessage(MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build()); + + Thread.sleep(2000); + + // Check that we have not received an ERROR as a result of the connection closing + this.responseHandler.awaitAndAssert(); + } + + /** * Handles messages by matching them to expectations including a latch to wait for * the completion of expected messages. @@ -408,6 +407,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT); headers.setSessionId(sessionId); headers.setAcceptVersion("1.1,1.2"); + headers.setHeartbeat(0, 0); Message message = MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build(); MessageExchangeBuilder builder = new MessageExchangeBuilder(message); @@ -595,8 +595,8 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { public void awaitAndAssert() throws InterruptedException { synchronized(this.monitor) { - long endTime = System.currentTimeMillis() + 6000; - while (this.expected.size() != this.actual.size() && System.currentTimeMillis() < endTime) { + long endTime = System.currentTimeMillis() + 10000; + while ((this.expected.size() != this.actual.size()) && (System.currentTimeMillis() < endTime)) { this.monitor.wait(500); } assertEquals(this.expected, this.actual); @@ -605,6 +605,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { @Override public void publishEvent(ApplicationEvent event) { + logger.debug("Processing ApplicationEvent " + event); if (event instanceof BrokerAvailabilityEvent) { synchronized(this.monitor) { this.actual.add(((BrokerAvailabilityEvent) event).isBrokerAvailable());