From 06b2d2b89e83f98bb5d50bfd4099e94b6db15700 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Tue, 26 Apr 2016 16:44:58 -0400 Subject: [PATCH] STOMP client session supports sending ack/nack Issue: SPR-14208 --- .../simp/stomp/DefaultStompSession.java | 19 +++++++++- .../messaging/simp/stomp/StompSession.java | 14 ++++++- .../simp/stomp/DefaultStompSessionTests.java | 38 ++++++++++++++++++- 3 files changed, 68 insertions(+), 3 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/DefaultStompSession.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/DefaultStompSession.java index 711607801e..fc0d3fe4ac 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/DefaultStompSession.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/DefaultStompSession.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2015 the original author or authors. + * Copyright 2002-2016 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -310,6 +310,23 @@ public class DefaultStompSession implements ConnectionHandlingStompSession { return subscription; } + @Override + public Receiptable acknowledge(String messageId, boolean consumed) { + StompHeaders stompHeaders = new StompHeaders(); + stompHeaders.setId(messageId); + + String receiptId = checkOrAddReceipt(stompHeaders); + Receiptable receiptable = new ReceiptHandler(receiptId); + + StompCommand command = (consumed ? StompCommand.ACK : StompCommand.NACK); + StompHeaderAccessor accessor = createHeaderAccessor(command); + accessor.addNativeHeaders(stompHeaders); + Message message = createMessage(accessor, null); + execute(message); + + return receiptable; + } + private void unsubscribe(String id) { StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.UNSUBSCRIBE); accessor.setSubscriptionId(id); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompSession.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompSession.java index ec46f518da..d908a2ab74 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompSession.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompSession.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2015 the original author or authors. + * Copyright 2002-2016 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -86,6 +86,18 @@ public interface StompSession { */ Subscription subscribe(StompHeaders headers, StompFrameHandler handler); + /** + * Send an acknowledgement whether a message was consumed or not resulting + * in an ACK or NACK frame respectively. + *

Note: to use this when subscribing you must set the + * {@link StompHeaders#setAck(String) ack} header to "client" or + * "client-individual" in order ot use this. + * @param messageId the id of the message + * @param consumed whether the message was consumed or not + * @return a Receiptable for tracking events + */ + Receiptable acknowledge(String messageId, boolean consumed); + /** * Disconnect the session by sending a DISCONNECT frame. */ diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/DefaultStompSessionTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/DefaultStompSessionTests.java index ae7fd9d149..d4000748c2 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/DefaultStompSessionTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/DefaultStompSessionTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2015 the original author or authors. + * Copyright 2002-2016 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -491,6 +491,42 @@ public class DefaultStompSessionTests { assertEquals(subscription.getSubscriptionId(), stompHeaders.getId()); } + @Test + public void ack() throws Exception { + + this.session.afterConnected(this.connection); + assertTrue(this.session.isConnected()); + + String messageId = "123"; + this.session.acknowledge(messageId, true); + + Message message = this.messageCaptor.getValue(); + StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); + assertEquals(StompCommand.ACK, accessor.getCommand()); + + StompHeaders stompHeaders = StompHeaders.readOnlyStompHeaders(accessor.getNativeHeaders()); + assertEquals(stompHeaders.toString(), 1, stompHeaders.size()); + assertEquals(messageId, stompHeaders.getId()); + } + + @Test + public void nack() throws Exception { + + this.session.afterConnected(this.connection); + assertTrue(this.session.isConnected()); + + String messageId = "123"; + this.session.acknowledge(messageId, false); + + Message message = this.messageCaptor.getValue(); + StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); + assertEquals(StompCommand.NACK, accessor.getCommand()); + + StompHeaders stompHeaders = StompHeaders.readOnlyStompHeaders(accessor.getNativeHeaders()); + assertEquals(stompHeaders.toString(), 1, stompHeaders.size()); + assertEquals(messageId, stompHeaders.getId()); + } + @Test public void receiptReceived() throws Exception {