From e9a577d6bdc9fbf06d408a60b0da2f88b7ee6157 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Wed, 4 Sep 2024 15:31:01 -0400 Subject: [PATCH] GH-9428: Emit MQTT delivery events even if share client instance Fixes: #9428 Issue link: https://github.com/spring-projects/spring-integration/issues/9428 When `ClientManager` is used for MQTT channel adapters, a `MqttMessageDeliveredEvent` is not emitted since callback for the `ClientManager` is not aware about `deliveryComplete` * Use a `MqttActionListener` abstraction for the `publish` operation instead of a `deliveryComplete` from a common callback * Make some other refactoring into the `MqttPahoMessageHandler` and `Mqttv5PahoMessageHandler` extracting a common logic into their `AbstractMqttMessageHandler` superclass * Introduce an `MqttMessageNotDeliveredEvent` to be emitted from the `MqttActionListener.onFailure()` callback * Adapt mocks in the `MqttAdapterTests` for a new code flow * Add delivery events verification into the `ClientManagerBackToBackTests` * Fix race condition in the `ClientManagerBackToBackTests` Looks like the message can be consumed even before we just emit that `MqttMessageSentEvent` --- .../event/MqttMessageNotDeliveredEvent.java | 48 +++++++++++++ .../outbound/AbstractMqttMessageHandler.java | 60 +++++++++++++++- .../mqtt/outbound/MqttPahoMessageHandler.java | 70 +++++++----------- .../outbound/Mqttv5PahoMessageHandler.java | 71 +++++++------------ .../mqtt/ClientManagerBackToBackTests.java | 45 +++++++++++- .../integration/mqtt/MqttAdapterTests.java | 6 +- .../antora/modules/ROOT/pages/mqtt.adoc | 1 + .../antora/modules/ROOT/pages/whats-new.adoc | 1 + 8 files changed, 206 insertions(+), 96 deletions(-) create mode 100644 spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/event/MqttMessageNotDeliveredEvent.java diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/event/MqttMessageNotDeliveredEvent.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/event/MqttMessageNotDeliveredEvent.java new file mode 100644 index 0000000000..4b1320b66e --- /dev/null +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/event/MqttMessageNotDeliveredEvent.java @@ -0,0 +1,48 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.mqtt.event; + +import java.io.Serial; + +/** + * An event emitted (when using aysnc) when the client indicates the message + * was not delivered on publish operation. + * + * @author Artem Bilan + * + * @since 6.4 + * + */ +public class MqttMessageNotDeliveredEvent extends MqttMessageDeliveryEvent { + + @Serial + private static final long serialVersionUID = 8983514811627569920L; + + private final Throwable exception; + + public MqttMessageNotDeliveredEvent(Object source, int messageId, String clientId, + int clientInstance, Throwable exception) { + + super(source, messageId, clientId, clientInstance); + this.exception = exception; + } + + public Throwable getException() { + return this.exception; + } + +} diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/AbstractMqttMessageHandler.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/AbstractMqttMessageHandler.java index 4b43a6722c..fc1a1e3644 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/AbstractMqttMessageHandler.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/AbstractMqttMessageHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2023 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,6 +29,9 @@ import org.springframework.integration.handler.AbstractMessageHandler; import org.springframework.integration.handler.ExpressionEvaluatingMessageProcessor; import org.springframework.integration.handler.MessageProcessor; import org.springframework.integration.mqtt.core.ClientManager; +import org.springframework.integration.mqtt.event.MqttMessageDeliveredEvent; +import org.springframework.integration.mqtt.event.MqttMessageNotDeliveredEvent; +import org.springframework.integration.mqtt.event.MqttMessageSentEvent; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.integration.mqtt.support.MqttMessageConverter; import org.springframework.integration.support.management.ManageableLifecycle; @@ -76,6 +79,10 @@ public abstract class AbstractMqttMessageHandler extends AbstractMessageHa private final ClientManager clientManager; + private boolean async; + + private boolean asyncEvents; + private long completionTimeout = DEFAULT_COMPLETION_TIMEOUT; private long disconnectCompletionTimeout = DISCONNECT_COMPLETION_TIMEOUT; @@ -319,6 +326,32 @@ public abstract class AbstractMqttMessageHandler extends AbstractMessageHa return this.clientManager; } + /** + * Set to true if you don't want to block when sending messages. Default false. + * When true, message sent/delivered events will be published for reception + * by a suitably configured 'ApplicationListener' or an event + * inbound-channel-adapter. + * @param async true for async. + * @see #setAsyncEvents(boolean) + */ + public void setAsync(boolean async) { + this.async = async; + } + + protected boolean isAsync() { + return this.async; + } + + /** + * When {@link #setAsync(boolean)} is true, setting this to true enables + * publication of {@link MqttMessageSentEvent} and {@link MqttMessageDeliveredEvent} + * to be emitted. Default false. + * @param asyncEvents the asyncEvents. + */ + public void setAsyncEvents(boolean asyncEvents) { + this.asyncEvents = asyncEvents; + } + @Override protected void onInit() { super.onInit(); @@ -372,6 +405,31 @@ public abstract class AbstractMqttMessageHandler extends AbstractMessageHa publish(topic, mqttMessage, message); } + protected void messageSentEvent(Message message, String topic, int messageId) { + ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher(); + if (this.async && this.asyncEvents && applicationEventPublisher != null) { + applicationEventPublisher.publishEvent( + new MqttMessageSentEvent(this, message, topic, messageId, getClientId(), + getClientInstance())); + } + } + + protected void sendDeliveryCompleteEvent(int messageId) { + ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher(); + if (this.async && this.asyncEvents && applicationEventPublisher != null) { + applicationEventPublisher.publishEvent( + new MqttMessageDeliveredEvent(this, messageId, getClientId(), getClientInstance())); + } + } + + protected void sendFailedDeliveryEvent(int messageId, Throwable exception) { + ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher(); + if (this.async && this.asyncEvents && applicationEventPublisher != null) { + applicationEventPublisher.publishEvent( + new MqttMessageNotDeliveredEvent(this, messageId, getClientId(), getClientInstance(), exception)); + } + } + protected abstract void publish(String topic, Object mqttMessage, Message message); } diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/MqttPahoMessageHandler.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/MqttPahoMessageHandler.java index 3a565d76cc..0e33d996ce 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/MqttPahoMessageHandler.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/MqttPahoMessageHandler.java @@ -16,8 +16,10 @@ package org.springframework.integration.mqtt.outbound; +import org.eclipse.paho.client.mqttv3.IMqttActionListener; import org.eclipse.paho.client.mqttv3.IMqttAsyncClient; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.IMqttToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; @@ -29,8 +31,6 @@ import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoComponent; import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent; -import org.springframework.integration.mqtt.event.MqttMessageDeliveredEvent; -import org.springframework.integration.mqtt.event.MqttMessageSentEvent; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.integration.mqtt.support.MqttMessageConverter; import org.springframework.integration.mqtt.support.MqttUtils; @@ -60,9 +60,7 @@ public class MqttPahoMessageHandler extends AbstractMqttMessageHandler headerMapper = new MqttHeaderMapper(); public Mqttv5PahoMessageHandler(String url, String clientId) { @@ -118,28 +115,6 @@ public class Mqttv5PahoMessageHandler extends AbstractMqttMessageHandler configClass, String topicName, CountDownLatch subscribedLatch) throws Exception { @@ -115,6 +120,12 @@ class ClientManagerBackToBackTests implements MosquittoContainerTest { else { assertThat(payload).isEqualTo(testPayload.getBytes(StandardCharsets.UTF_8)); } + + if (ctx.containsBean("deliveryEvents")) { + List deliveryEvents = ctx.getBean("deliveryEvents", List.class); + // MqttMessageSentEvent and MqttMessageDeliveredEvent + await().untilAsserted(() -> assertThat(deliveryEvents).hasSize(2)); + } } } @@ -164,6 +175,16 @@ class ClientManagerBackToBackTests implements MosquittoContainerTest { subscribedLatch.countDown(); } + @EventListener + void mqttEvents(MqttMessageDeliveryEvent event) { + deliveryEvents().add(event); + } + + @Bean + List deliveryEvents() { + return new ArrayList<>(); + } + @Bean public Mqttv3ClientManager mqttv3ClientManager() { MqttConnectOptions connectionOptions = new MqttConnectOptions(); @@ -174,7 +195,10 @@ class ClientManagerBackToBackTests implements MosquittoContainerTest { @Bean public IntegrationFlow mqttOutFlow(Mqttv3ClientManager mqttv3ClientManager) { - return f -> f.handle(new MqttPahoMessageHandler(mqttv3ClientManager)); + MqttPahoMessageHandler mqttPahoMessageHandler = new MqttPahoMessageHandler(mqttv3ClientManager); + mqttPahoMessageHandler.setAsync(true); + mqttPahoMessageHandler.setAsyncEvents(true); + return f -> f.handle(mqttPahoMessageHandler); } @Bean @@ -257,6 +281,7 @@ class ClientManagerBackToBackTests implements MosquittoContainerTest { var clientManager = ctx.getBean(Mqttv3ClientManager.class); return new MqttPahoMessageDrivenChannelAdapter(clientManager, TOPIC_NAME); } + } @Configuration @@ -272,6 +297,16 @@ class ClientManagerBackToBackTests implements MosquittoContainerTest { subscribedLatch.countDown(); } + @EventListener + void mqttEvents(MqttMessageDeliveryEvent event) { + deliveryEvents().add(event); + } + + @Bean + List deliveryEvents() { + return new ArrayList<>(); + } + @Bean public Mqttv5ClientManager mqttv5ClientManager() { return new Mqttv5ClientManager(MosquittoContainerTest.mqttUrl(), "client-manager-client-id-v5"); @@ -280,7 +315,10 @@ class ClientManagerBackToBackTests implements MosquittoContainerTest { @Bean @ServiceActivator(inputChannel = "mqttOutFlow.input") public Mqttv5PahoMessageHandler mqttv5PahoMessageHandler(Mqttv5ClientManager mqttv5ClientManager) { - return new Mqttv5PahoMessageHandler(mqttv5ClientManager); + Mqttv5PahoMessageHandler mqttPahoMessageHandler = new Mqttv5PahoMessageHandler(mqttv5ClientManager); + mqttPahoMessageHandler.setAsync(true); + mqttPahoMessageHandler.setAsyncEvents(true); + return mqttPahoMessageHandler; } @Bean @@ -358,10 +396,13 @@ class ClientManagerBackToBackTests implements MosquittoContainerTest { var clientManager = ctx.getBean(Mqttv5ClientManager.class); return new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, TOPIC_NAME); } + } interface MessageDrivenChannelAdapterFactory { + MessageProducerSupport createMessageDrivenAdapter(ApplicationContext ctx); + } record ClientV3Disconnector(Mqttv3ClientManager clientManager) { diff --git a/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MqttAdapterTests.java b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MqttAdapterTests.java index 8d14687a0c..2a3588b67b 100644 --- a/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MqttAdapterTests.java +++ b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MqttAdapterTests.java @@ -182,7 +182,7 @@ public class MqttAdapterTests { assertThat(new String(message.getPayload())).isEqualTo("Hello, world!"); publishCalled.set(true); return deliveryToken; - }).given(client).publish(anyString(), any(MqttMessage.class)); + }).given(client).publish(anyString(), any(), any(), any()); handler.handleMessage(new GenericMessage<>("Hello, world!")); @@ -204,7 +204,7 @@ public class MqttAdapterTests { given(clientManager.getClient()).willReturn(client); var deliveryToken = mock(MqttDeliveryToken.class); - given(client.publish(anyString(), any(MqttMessage.class))).willReturn(deliveryToken); + given(client.publish(anyString(), any(), any(), any())).willReturn(deliveryToken); var handler = new MqttPahoMessageHandler(clientManager); handler.setDefaultTopic("mqtt-foo"); @@ -218,7 +218,7 @@ public class MqttAdapterTests { // then verify(client, never()).connect(any(MqttConnectOptions.class)); - verify(client).publish(anyString(), any(MqttMessage.class)); + verify(client).publish(anyString(), any(), any(), any()); verify(client, never()).disconnect(); verify(client, never()).disconnect(anyLong()); verify(client, never()).close(); diff --git a/src/reference/antora/modules/ROOT/pages/mqtt.adoc b/src/reference/antora/modules/ROOT/pages/mqtt.adoc index 3e0154a626..ce0ba68ea1 100644 --- a/src/reference/antora/modules/ROOT/pages/mqtt.adoc +++ b/src/reference/antora/modules/ROOT/pages/mqtt.adoc @@ -399,6 +399,7 @@ Certain application events are published by the adapters. For the MQTT v5 Paho client, this event is also emitted when the server performs a normal disconnection, in which case the `cause` of the lost connection is `null`. * `MqttMessageSentEvent` - published by the outbound adapter when a message has been sent, if running in asynchronous mode. * `MqttMessageDeliveredEvent` - published by the outbound adapter when the client indicates that a message has been delivered, if running in asynchronous mode. +* `MqttMessageNotDeliveredEvent` - published by the outbound adapter when the client indicates that a message has not been delivered, if running in asynchronous mode. * `MqttSubscribedEvent` - published by the inbound adapter after subscribing to the topics. These events can be received by an `ApplicationListener` or with an `@EventListener` method. diff --git a/src/reference/antora/modules/ROOT/pages/whats-new.adoc b/src/reference/antora/modules/ROOT/pages/whats-new.adoc index c3f3c50a62..d92bd74f9f 100644 --- a/src/reference/antora/modules/ROOT/pages/whats-new.adoc +++ b/src/reference/antora/modules/ROOT/pages/whats-new.adoc @@ -72,5 +72,6 @@ See xref:sftp/session-factory.adoc[SFTP Session Factory] for more information. === MQTT Support Changes Multiple instances of `MqttPahoMessageDrivenChannelAdapter` and `Mqttv5PahoMessageDrivenChannelAdapter` can now be added at runtime using corresponding `ClientManager` through `IntegrationFlowContext` +Also a `MqttMessageNotDeliveredEvent` event has been introduced to emit when action callback reacts to the delivery failure. See xref:mqtt.adoc[MQTT Support] for more information.