GH-9428: Emit MQTT delivery events even if share client instance

Fixes: #9428

Issue link: https://github.com/spring-projects/spring-integration/issues/9428

When `ClientManager` is used for MQTT channel adapters, a `MqttMessageDeliveredEvent`
is not emitted since callback for the `ClientManager` is not aware about `deliveryComplete`

* Use a `MqttActionListener` abstraction for the `publish` operation instead of a `deliveryComplete`
from a common callback
* Make some other refactoring into the `MqttPahoMessageHandler` and `Mqttv5PahoMessageHandler`
extracting a common logic into their `AbstractMqttMessageHandler` superclass
* Introduce an `MqttMessageNotDeliveredEvent` to be emitted from the `MqttActionListener.onFailure()` callback
* Adapt mocks in the `MqttAdapterTests` for a new code flow
* Add delivery events verification into the `ClientManagerBackToBackTests`

* Fix race condition in the `ClientManagerBackToBackTests`

Looks like the message can be consumed even before we just emit that `MqttMessageSentEvent`
This commit is contained in:
Artem Bilan
2024-09-04 15:31:01 -04:00
committed by GitHub
parent 49a0aaa793
commit e9a577d6bd
8 changed files with 206 additions and 96 deletions

View File

@@ -0,0 +1,48 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.mqtt.event;
import java.io.Serial;
/**
* An event emitted (when using aysnc) when the client indicates the message
* was not delivered on publish operation.
*
* @author Artem Bilan
*
* @since 6.4
*
*/
public class MqttMessageNotDeliveredEvent extends MqttMessageDeliveryEvent {
@Serial
private static final long serialVersionUID = 8983514811627569920L;
private final Throwable exception;
public MqttMessageNotDeliveredEvent(Object source, int messageId, String clientId,
int clientInstance, Throwable exception) {
super(source, messageId, clientId, clientInstance);
this.exception = exception;
}
public Throwable getException() {
return this.exception;
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -29,6 +29,9 @@ import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.handler.ExpressionEvaluatingMessageProcessor;
import org.springframework.integration.handler.MessageProcessor;
import org.springframework.integration.mqtt.core.ClientManager;
import org.springframework.integration.mqtt.event.MqttMessageDeliveredEvent;
import org.springframework.integration.mqtt.event.MqttMessageNotDeliveredEvent;
import org.springframework.integration.mqtt.event.MqttMessageSentEvent;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.integration.mqtt.support.MqttMessageConverter;
import org.springframework.integration.support.management.ManageableLifecycle;
@@ -76,6 +79,10 @@ public abstract class AbstractMqttMessageHandler<T, C> extends AbstractMessageHa
private final ClientManager<T, C> clientManager;
private boolean async;
private boolean asyncEvents;
private long completionTimeout = DEFAULT_COMPLETION_TIMEOUT;
private long disconnectCompletionTimeout = DISCONNECT_COMPLETION_TIMEOUT;
@@ -319,6 +326,32 @@ public abstract class AbstractMqttMessageHandler<T, C> extends AbstractMessageHa
return this.clientManager;
}
/**
* Set to true if you don't want to block when sending messages. Default false.
* When true, message sent/delivered events will be published for reception
* by a suitably configured 'ApplicationListener' or an event
* inbound-channel-adapter.
* @param async true for async.
* @see #setAsyncEvents(boolean)
*/
public void setAsync(boolean async) {
this.async = async;
}
protected boolean isAsync() {
return this.async;
}
/**
* When {@link #setAsync(boolean)} is true, setting this to true enables
* publication of {@link MqttMessageSentEvent} and {@link MqttMessageDeliveredEvent}
* to be emitted. Default false.
* @param asyncEvents the asyncEvents.
*/
public void setAsyncEvents(boolean asyncEvents) {
this.asyncEvents = asyncEvents;
}
@Override
protected void onInit() {
super.onInit();
@@ -372,6 +405,31 @@ public abstract class AbstractMqttMessageHandler<T, C> extends AbstractMessageHa
publish(topic, mqttMessage, message);
}
protected void messageSentEvent(Message<?> message, String topic, int messageId) {
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
if (this.async && this.asyncEvents && applicationEventPublisher != null) {
applicationEventPublisher.publishEvent(
new MqttMessageSentEvent(this, message, topic, messageId, getClientId(),
getClientInstance()));
}
}
protected void sendDeliveryCompleteEvent(int messageId) {
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
if (this.async && this.asyncEvents && applicationEventPublisher != null) {
applicationEventPublisher.publishEvent(
new MqttMessageDeliveredEvent(this, messageId, getClientId(), getClientInstance()));
}
}
protected void sendFailedDeliveryEvent(int messageId, Throwable exception) {
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
if (this.async && this.asyncEvents && applicationEventPublisher != null) {
applicationEventPublisher.publishEvent(
new MqttMessageNotDeliveredEvent(this, messageId, getClientId(), getClientInstance(), exception));
}
}
protected abstract void publish(String topic, Object mqttMessage, Message<?> message);
}

View File

@@ -16,8 +16,10 @@
package org.springframework.integration.mqtt.outbound;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
@@ -29,8 +31,6 @@ import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoComponent;
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
import org.springframework.integration.mqtt.event.MqttMessageDeliveredEvent;
import org.springframework.integration.mqtt.event.MqttMessageSentEvent;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.mqtt.support.MqttMessageConverter;
import org.springframework.integration.mqtt.support.MqttUtils;
@@ -60,9 +60,7 @@ public class MqttPahoMessageHandler extends AbstractMqttMessageHandler<IMqttAsyn
private final MqttPahoClientFactory clientFactory;
private boolean async;
private boolean asyncEvents;
private final IMqttActionListener mqttPublishActionListener = new MqttPublishActionListener();
private volatile IMqttAsyncClient client;
@@ -113,29 +111,6 @@ public class MqttPahoMessageHandler extends AbstractMqttMessageHandler<IMqttAsyn
this.clientFactory = factory;
}
/**
* Set to true if you don't want to block when sending messages. Default false.
* When true, message sent/delivered events will be published for reception
* by a suitably configured 'ApplicationListener' or an event
* inbound-channel-adapter.
* @param async true for async.
* @since 4.1
*/
public void setAsync(boolean async) {
this.async = async;
}
/**
* When {@link #setAsync(boolean)} is true, setting this to true enables
* publication of {@link MqttMessageSentEvent} and {@link MqttMessageDeliveredEvent}
* to be emitted. Default false.
* @param asyncEvents the asyncEvents.
* @since 4.1
*/
public void setAsyncEvents(boolean asyncEvents) {
this.asyncEvents = asyncEvents;
}
@Override
public MqttConnectOptions getConnectionInfo() {
MqttConnectOptions options = this.clientFactory.getConnectionOptions();
@@ -236,15 +211,12 @@ public class MqttPahoMessageHandler extends AbstractMqttMessageHandler<IMqttAsyn
Assert.isInstanceOf(MqttMessage.class, mqttMessage, "The 'mqttMessage' must be an instance of 'MqttMessage'");
try {
IMqttDeliveryToken token = checkConnection()
.publish(topic, (MqttMessage) mqttMessage);
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
if (!this.async) {
.publish(topic, (MqttMessage) mqttMessage, null, this.mqttPublishActionListener);
if (!isAsync()) {
token.waitForCompletion(getCompletionTimeout()); // NOSONAR (sync)
}
else if (this.asyncEvents && applicationEventPublisher != null) {
applicationEventPublisher.publishEvent(
new MqttMessageSentEvent(this, message, topic, token.getMessageId(), getClientId(),
getClientInstance()));
else {
messageSentEvent(message, topic, token.getMessageId());
}
}
catch (MqttException e) {
@@ -252,15 +224,6 @@ public class MqttPahoMessageHandler extends AbstractMqttMessageHandler<IMqttAsyn
}
}
private void sendDeliveryComplete(IMqttDeliveryToken token) {
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
if (this.async && this.asyncEvents && applicationEventPublisher != null) {
applicationEventPublisher.publishEvent(
new MqttMessageDeliveredEvent(this, token.getMessageId(), getClientId(),
getClientInstance()));
}
}
@Override
public void connectionLost(Throwable cause) {
this.lock.lock();
@@ -293,7 +256,24 @@ public class MqttPahoMessageHandler extends AbstractMqttMessageHandler<IMqttAsyn
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
sendDeliveryComplete(token);
}
private final class MqttPublishActionListener implements IMqttActionListener {
MqttPublishActionListener() {
}
@Override
public void onSuccess(IMqttToken asyncActionToken) {
sendDeliveryCompleteEvent(asyncActionToken.getMessageId());
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
sendFailedDeliveryEvent(asyncActionToken.getMessageId(), exception);
}
}
}

View File

@@ -20,6 +20,7 @@ import java.nio.charset.StandardCharsets;
import org.eclipse.paho.mqttv5.client.IMqttAsyncClient;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttActionListener;
import org.eclipse.paho.mqttv5.client.MqttAsyncClient;
import org.eclipse.paho.mqttv5.client.MqttCallback;
import org.eclipse.paho.mqttv5.client.MqttClientPersistence;
@@ -36,8 +37,6 @@ import org.springframework.integration.mapping.HeaderMapper;
import org.springframework.integration.mqtt.core.ClientManager;
import org.springframework.integration.mqtt.core.MqttComponent;
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
import org.springframework.integration.mqtt.event.MqttMessageDeliveredEvent;
import org.springframework.integration.mqtt.event.MqttMessageSentEvent;
import org.springframework.integration.mqtt.event.MqttProtocolErrorEvent;
import org.springframework.integration.mqtt.support.MqttHeaderMapper;
import org.springframework.integration.mqtt.support.MqttMessageConverter;
@@ -62,15 +61,13 @@ public class Mqttv5PahoMessageHandler extends AbstractMqttMessageHandler<IMqttAs
private final MqttConnectionOptions connectionOptions;
private final MqttActionListener mqttPublishActionListener = new MqttPublishActionListener();
private IMqttAsyncClient mqttClient;
@Nullable
private MqttClientPersistence persistence;
private boolean async;
private boolean asyncEvents;
private HeaderMapper<MqttProperties> headerMapper = new MqttHeaderMapper();
public Mqttv5PahoMessageHandler(String url, String clientId) {
@@ -118,28 +115,6 @@ public class Mqttv5PahoMessageHandler extends AbstractMqttMessageHandler<IMqttAs
this.headerMapper = headerMapper;
}
/**
* Set to true if you don't want to block when sending messages. Default false.
* When true, message sent/delivered events will be published for reception
* by a suitably configured 'ApplicationListener' or an event
* inbound-channel-adapter.
* @param async true for async.
* @see #setAsyncEvents(boolean)
*/
public void setAsync(boolean async) {
this.async = async;
}
/**
* When {@link #setAsync(boolean)} is true, setting this to true enables
* publication of {@link MqttMessageSentEvent} and {@link MqttMessageDeliveredEvent}
* to be emitted. Default false.
* @param asyncEvents the asyncEvents.
*/
public void setAsyncEvents(boolean asyncEvents) {
this.asyncEvents = asyncEvents;
}
@Override
protected void onInit() {
super.onInit();
@@ -268,15 +243,13 @@ public class Mqttv5PahoMessageHandler extends AbstractMqttMessageHandler<IMqttAs
if (!this.mqttClient.isConnected()) {
this.mqttClient.connect(this.connectionOptions).waitForCompletion(completionTimeout);
}
IMqttToken token = this.mqttClient.publish(topic, (MqttMessage) mqttMessage);
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
if (!this.async) {
IMqttToken token =
this.mqttClient.publish(topic, (MqttMessage) mqttMessage, null, this.mqttPublishActionListener);
if (!isAsync()) {
token.waitForCompletion(completionTimeout); // NOSONAR (sync)
}
else if (this.asyncEvents && applicationEventPublisher != null) {
applicationEventPublisher.publishEvent(
new MqttMessageSentEvent(this, message, topic, token.getMessageId(), getClientId(),
getClientInstance()));
else {
messageSentEvent(message, topic, token.getMessageId());
}
}
catch (MqttException ex) {
@@ -284,18 +257,9 @@ public class Mqttv5PahoMessageHandler extends AbstractMqttMessageHandler<IMqttAs
}
}
private void sendDeliveryComplete(IMqttToken token) {
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
if (this.async && this.asyncEvents && applicationEventPublisher != null) {
applicationEventPublisher.publishEvent(
new MqttMessageDeliveredEvent(this, token.getMessageId(), getClientId(),
getClientInstance()));
}
}
@Override
public void deliveryComplete(IMqttToken token) {
sendDeliveryComplete(token);
}
@Override
@@ -330,4 +294,21 @@ public class Mqttv5PahoMessageHandler extends AbstractMqttMessageHandler<IMqttAs
}
private final class MqttPublishActionListener implements MqttActionListener {
MqttPublishActionListener() {
}
@Override
public void onSuccess(IMqttToken asyncActionToken) {
sendDeliveryCompleteEvent(asyncActionToken.getMessageId());
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
sendFailedDeliveryEvent(asyncActionToken.getMessageId(), exception);
}
}
}

View File

@@ -17,6 +17,8 @@
package org.springframework.integration.mqtt;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -37,6 +39,7 @@ import org.springframework.integration.dsl.context.IntegrationFlowContext;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.mqtt.core.Mqttv3ClientManager;
import org.springframework.integration.mqtt.core.Mqttv5ClientManager;
import org.springframework.integration.mqtt.event.MqttMessageDeliveryEvent;
import org.springframework.integration.mqtt.event.MqttSubscribedEvent;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.inbound.Mqttv5PahoMessageDrivenChannelAdapter;
@@ -49,6 +52,7 @@ import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.PollableChannel;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
/**
* @author Artem Vozhdayenko
@@ -92,6 +96,7 @@ class ClientManagerBackToBackTests implements MosquittoContainerTest {
Mqttv5ConfigRuntime.subscribedLatch);
}
@SuppressWarnings("unchecked")
private void testSubscribeAndPublish(Class<?> configClass, String topicName, CountDownLatch subscribedLatch)
throws Exception {
@@ -115,6 +120,12 @@ class ClientManagerBackToBackTests implements MosquittoContainerTest {
else {
assertThat(payload).isEqualTo(testPayload.getBytes(StandardCharsets.UTF_8));
}
if (ctx.containsBean("deliveryEvents")) {
List<MqttMessageDeliveryEvent> deliveryEvents = ctx.getBean("deliveryEvents", List.class);
// MqttMessageSentEvent and MqttMessageDeliveredEvent
await().untilAsserted(() -> assertThat(deliveryEvents).hasSize(2));
}
}
}
@@ -164,6 +175,16 @@ class ClientManagerBackToBackTests implements MosquittoContainerTest {
subscribedLatch.countDown();
}
@EventListener
void mqttEvents(MqttMessageDeliveryEvent event) {
deliveryEvents().add(event);
}
@Bean
List<MqttMessageDeliveryEvent> deliveryEvents() {
return new ArrayList<>();
}
@Bean
public Mqttv3ClientManager mqttv3ClientManager() {
MqttConnectOptions connectionOptions = new MqttConnectOptions();
@@ -174,7 +195,10 @@ class ClientManagerBackToBackTests implements MosquittoContainerTest {
@Bean
public IntegrationFlow mqttOutFlow(Mqttv3ClientManager mqttv3ClientManager) {
return f -> f.handle(new MqttPahoMessageHandler(mqttv3ClientManager));
MqttPahoMessageHandler mqttPahoMessageHandler = new MqttPahoMessageHandler(mqttv3ClientManager);
mqttPahoMessageHandler.setAsync(true);
mqttPahoMessageHandler.setAsyncEvents(true);
return f -> f.handle(mqttPahoMessageHandler);
}
@Bean
@@ -257,6 +281,7 @@ class ClientManagerBackToBackTests implements MosquittoContainerTest {
var clientManager = ctx.getBean(Mqttv3ClientManager.class);
return new MqttPahoMessageDrivenChannelAdapter(clientManager, TOPIC_NAME);
}
}
@Configuration
@@ -272,6 +297,16 @@ class ClientManagerBackToBackTests implements MosquittoContainerTest {
subscribedLatch.countDown();
}
@EventListener
void mqttEvents(MqttMessageDeliveryEvent event) {
deliveryEvents().add(event);
}
@Bean
List<MqttMessageDeliveryEvent> deliveryEvents() {
return new ArrayList<>();
}
@Bean
public Mqttv5ClientManager mqttv5ClientManager() {
return new Mqttv5ClientManager(MosquittoContainerTest.mqttUrl(), "client-manager-client-id-v5");
@@ -280,7 +315,10 @@ class ClientManagerBackToBackTests implements MosquittoContainerTest {
@Bean
@ServiceActivator(inputChannel = "mqttOutFlow.input")
public Mqttv5PahoMessageHandler mqttv5PahoMessageHandler(Mqttv5ClientManager mqttv5ClientManager) {
return new Mqttv5PahoMessageHandler(mqttv5ClientManager);
Mqttv5PahoMessageHandler mqttPahoMessageHandler = new Mqttv5PahoMessageHandler(mqttv5ClientManager);
mqttPahoMessageHandler.setAsync(true);
mqttPahoMessageHandler.setAsyncEvents(true);
return mqttPahoMessageHandler;
}
@Bean
@@ -358,10 +396,13 @@ class ClientManagerBackToBackTests implements MosquittoContainerTest {
var clientManager = ctx.getBean(Mqttv5ClientManager.class);
return new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, TOPIC_NAME);
}
}
interface MessageDrivenChannelAdapterFactory {
MessageProducerSupport createMessageDrivenAdapter(ApplicationContext ctx);
}
record ClientV3Disconnector(Mqttv3ClientManager clientManager) {

View File

@@ -182,7 +182,7 @@ public class MqttAdapterTests {
assertThat(new String(message.getPayload())).isEqualTo("Hello, world!");
publishCalled.set(true);
return deliveryToken;
}).given(client).publish(anyString(), any(MqttMessage.class));
}).given(client).publish(anyString(), any(), any(), any());
handler.handleMessage(new GenericMessage<>("Hello, world!"));
@@ -204,7 +204,7 @@ public class MqttAdapterTests {
given(clientManager.getClient()).willReturn(client);
var deliveryToken = mock(MqttDeliveryToken.class);
given(client.publish(anyString(), any(MqttMessage.class))).willReturn(deliveryToken);
given(client.publish(anyString(), any(), any(), any())).willReturn(deliveryToken);
var handler = new MqttPahoMessageHandler(clientManager);
handler.setDefaultTopic("mqtt-foo");
@@ -218,7 +218,7 @@ public class MqttAdapterTests {
// then
verify(client, never()).connect(any(MqttConnectOptions.class));
verify(client).publish(anyString(), any(MqttMessage.class));
verify(client).publish(anyString(), any(), any(), any());
verify(client, never()).disconnect();
verify(client, never()).disconnect(anyLong());
verify(client, never()).close();

View File

@@ -399,6 +399,7 @@ Certain application events are published by the adapters.
For the MQTT v5 Paho client, this event is also emitted when the server performs a normal disconnection, in which case the `cause` of the lost connection is `null`.
* `MqttMessageSentEvent` - published by the outbound adapter when a message has been sent, if running in asynchronous mode.
* `MqttMessageDeliveredEvent` - published by the outbound adapter when the client indicates that a message has been delivered, if running in asynchronous mode.
* `MqttMessageNotDeliveredEvent` - published by the outbound adapter when the client indicates that a message has not been delivered, if running in asynchronous mode.
* `MqttSubscribedEvent` - published by the inbound adapter after subscribing to the topics.
These events can be received by an `ApplicationListener<MqttIntegrationEvent>` or with an `@EventListener` method.

View File

@@ -72,5 +72,6 @@ See xref:sftp/session-factory.adoc[SFTP Session Factory] for more information.
=== MQTT Support Changes
Multiple instances of `MqttPahoMessageDrivenChannelAdapter` and `Mqttv5PahoMessageDrivenChannelAdapter` can now be added at runtime using corresponding `ClientManager` through `IntegrationFlowContext`
Also a `MqttMessageNotDeliveredEvent` event has been introduced to emit when action callback reacts to the delivery failure.
See xref:mqtt.adoc[MQTT Support] for more information.