Make MQTT modules as auto-config

* Fix all the Checkstyle violations for those modules
* Introduce a `MosquittoContainerTest` contract into the `spring-function-test-support`
and use it in the MQTT modules
* Fix READMEs for previously migrated modules to auto-configuration
This commit is contained in:
Artem Bilan
2024-01-05 13:57:55 -05:00
parent 3f349ca451
commit fbe0d220f7
24 changed files with 163 additions and 145 deletions

View File

@@ -0,0 +1,48 @@
/*
* Copyright 2021-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.fn.test.support.mqtt;
import org.junit.jupiter.api.BeforeAll;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.junit.jupiter.Testcontainers;
/**
* The base contract for JUnit tests based on the container for MQTT Mosquitto broker. The
* Testcontainers 'reuse' option must be disabled,so, Ryuk container is started and will
* clean all the containers up from this test suite after JVM exit. Since the Mosquitto
* container instance is shared via static property, it is going to be started only once
* per JVM, therefore the target Docker container is reused automatically.
*
* @author Artem Bilan
*/
@Testcontainers(disabledWithoutDocker = true)
public interface MosquittoContainerTest {
GenericContainer<?> MOSQUITTO_CONTAINER = new GenericContainer<>("eclipse-mosquitto:2.0.13")
.withCommand("mosquitto -c /mosquitto-no-auth.conf")
.withExposedPorts(1883);
@BeforeAll
static void startContainer() {
MOSQUITTO_CONTAINER.start();
}
static String mqttUrl() {
return "tcp://localhost:" + MOSQUITTO_CONTAINER.getFirstMappedPort();
}
}

View File

@@ -0,0 +1,4 @@
/**
* The MQTT protocol testing support.
*/
package org.springframework.cloud.fn.test.support.mqtt;

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2017-2020 the original author or authors.
* Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,28 +23,25 @@ import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.util.ObjectUtils;
/**
* Generic mqtt configuration.
* The MQTT client auto-configuration.
*
* @author Janne Valkealahti
* @author Artem Bilan
*/
@Configuration
@AutoConfiguration
@EnableConfigurationProperties(MqttProperties.class)
public class MqttConfiguration {
@Autowired
private MqttProperties mqttProperties;
@Bean
public MqttPahoClientFactory mqttClientFactory() {
public MqttPahoClientFactory mqttClientFactory(MqttProperties mqttProperties) {
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setServerURIs(mqttProperties.getUrl());
mqttConnectOptions.setUserName(mqttProperties.getUsername());

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2017-2020 the original author or authors.
* Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,7 +25,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.validation.annotation.Validated;
/**
* Generic mqtt connection properties.
* The MQTT client properties.
*
* @author Janne Valkealahti
* @author Artem Bilan
@@ -81,7 +81,7 @@ public class MqttProperties {
@Size(min = 1)
public String[] getUrl() {
return url;
return this.url;
}
public void setUrl(String[] url) {
@@ -89,7 +89,7 @@ public class MqttProperties {
}
public String getUsername() {
return username;
return this.username;
}
public void setUsername(String username) {
@@ -97,7 +97,7 @@ public class MqttProperties {
}
public String getPassword() {
return password;
return this.password;
}
public void setPassword(String password) {
@@ -105,7 +105,7 @@ public class MqttProperties {
}
public boolean isCleanSession() {
return cleanSession;
return this.cleanSession;
}
public void setCleanSession(boolean cleanSession) {
@@ -113,7 +113,7 @@ public class MqttProperties {
}
public int getKeepAliveInterval() {
return keepAliveInterval;
return this.keepAliveInterval;
}
public void setKeepAliveInterval(int keepAliveInterval) {
@@ -121,7 +121,7 @@ public class MqttProperties {
}
public int getConnectionTimeout() {
return connectionTimeout;
return this.connectionTimeout;
}
public void setConnectionTimeout(int connectionTimeout) {
@@ -129,7 +129,7 @@ public class MqttProperties {
}
public String getPersistence() {
return persistence;
return this.persistence;
}
public void setPersistence(String persistence) {
@@ -137,7 +137,7 @@ public class MqttProperties {
}
public String getPersistenceDirectory() {
return persistenceDirectory;
return this.persistenceDirectory;
}
public void setPersistenceDirectory(String persistenceDirectory) {

View File

@@ -0,0 +1,4 @@
/**
* The MQTT client auto-configuration support.
*/
package org.springframework.cloud.fn.common.mqtt;

View File

@@ -0,0 +1 @@
org.springframework.cloud.fn.common.mqtt.MqttConfiguration

View File

@@ -5,7 +5,7 @@ The consumer uses the `FileWritingMessageHandler` from Spring Integration.
## Beans for injection
You can import `FileConsumerConfiguration` in the application and then inject the following bean.
The `FileConsumerConfiguration` auto-configuration provides the following bean:
`Consumer<Message<?>> fileConsumer`

View File

@@ -5,7 +5,7 @@ The consumer uses the `FtpMessageHandler` from Spring Integration.
## Beans for injection
You can import `FtpConsumerConfiguration` in the application and then inject the following bean.
The `FtpConsumerConfiguration` auto-configuration provides the following bean:
`Consumer<Message<?>> ftpConsumer`

View File

@@ -4,7 +4,7 @@ A consumer that allows you to send messages using the MQTT protocol.
## Beans for injection
You can import `MqttConsumerConfiguration` in the application and then inject the following bean.
The `MqttConsumerConfiguration` auto-configuration provides the following bean:
`Consumer<Message<?>> mqttConsumer`

View File

@@ -1,3 +1,5 @@
dependencies {
api project(':spring-mqtt-common')
testImplementation project(':spring-function-test-support')
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2017-2022 the original author or authors.
* Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,14 +19,11 @@ package org.springframework.cloud.fn.consumer.mqtt;
import java.util.function.Consumer;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.fn.common.config.ComponentCustomizer;
import org.springframework.cloud.fn.common.mqtt.MqttConfiguration;
import org.springframework.cloud.fn.common.mqtt.MqttProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
@@ -35,45 +32,37 @@ import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
/**
* A consumer that sends data to Mqtt.
* A consumer that sends data to MQTT.
*
* @author Janne Valkealahti
*
* @author Artem Bilan
*/
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties({ MqttProperties.class, MqttConsumerProperties.class })
@Import(MqttConfiguration.class)
@EnableConfigurationProperties(MqttConsumerProperties.class)
@AutoConfiguration(after = MqttConfiguration.class)
public class MqttConsumerConfiguration {
@Autowired
private MqttConsumerProperties properties;
@Autowired
private MqttPahoClientFactory mqttClientFactory;
@Autowired
private BeanFactory beanFactory;
@Bean
public Consumer<Message<?>> mqttConsumer(MessageHandler mqttOutbound) {
return mqttOutbound::handleMessage;
}
@Bean
public MessageHandler mqttOutbound(
public MessageHandler mqttOutbound(MqttConsumerProperties properties, MqttPahoClientFactory mqttClientFactory,
BeanFactory beanFactory,
@Nullable ComponentCustomizer<MqttPahoMessageHandler> mqttMessageHandlerCustomizer) {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(properties.getClientId(), mqttClientFactory);
messageHandler.setAsync(properties.isAsync());
messageHandler.setDefaultTopic(properties.getTopic());
messageHandler.setConverter(pahoMessageConverter());
messageHandler.setConverter(pahoMessageConverter(properties, beanFactory));
if (mqttMessageHandlerCustomizer != null) {
mqttMessageHandlerCustomizer.customize(messageHandler);
}
return messageHandler;
}
public DefaultPahoMessageConverter pahoMessageConverter() {
private DefaultPahoMessageConverter pahoMessageConverter(MqttConsumerProperties properties,
BeanFactory beanFactory) {
DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(properties.getQos(),
properties.isRetained(), properties.getCharset());
converter.setBeanFactory(beanFactory);

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2017-2020 the original author or authors.
* Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,7 +24,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.validation.annotation.Validated;
/**
* Properties for the Mqtt Consumer.
* Properties for the MQTT Consumer.
*
* @author Janne Valkealahti
*
@@ -34,32 +34,32 @@ import org.springframework.validation.annotation.Validated;
public class MqttConsumerProperties {
/**
* identifies the client.
* Identifies the client.
*/
private String clientId = "stream.client.id.sink";
/**
* the topic to which the sink will publish.
* The topic to which the sink will publish.
*/
private String topic = "stream.mqtt";
/**
* the quality of service to use.
* The quality of service to use.
*/
private int qos = 1;
/**
* whether to set the 'retained' flag.
* Whether to set the 'retained' flag.
*/
private boolean retained = false;
/**
* the charset used to convert a String payload to byte[].
* The charset used to convert a String payload to byte[].
*/
private String charset = "UTF-8";
/**
* whether or not to use async sends.
* Whether to use async sends.
*/
private boolean async = false;

View File

@@ -0,0 +1,4 @@
/**
* The MQTT consumer auto-configuration support.
*/
package org.springframework.cloud.fn.consumer.mqtt;

View File

@@ -0,0 +1 @@
org.springframework.cloud.fn.consumer.mqtt.MqttConsumerConfiguration

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2017-2021 the original author or authors.
* Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,21 +16,19 @@
package org.springframework.cloud.fn.consumer.mqtt;
import java.time.Duration;
import java.util.Properties;
import java.util.function.Consumer;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.internal.security.SSLSocketFactoryFactory;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.GenericContainer;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.fn.test.support.mqtt.MosquittoContainerTest;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
@@ -39,6 +37,8 @@ import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import static org.assertj.core.api.Assertions.assertThat;
@@ -46,19 +46,7 @@ import static org.assertj.core.api.Assertions.assertThat;
"mqtt.ssl-properties.com.ibm.ssl.keyStoreType=TEST" })
@DirtiesContext
@Tag("integration")
public class MqttConsumerTests {
static {
GenericContainer<?> mosquitto = new GenericContainer<>("eclipse-mosquitto:2.0.13")
.withCommand("mosquitto -c /mosquitto-no-auth.conf")
.withReuse(true)
.withExposedPorts(1883)
.withStartupTimeout(Duration.ofSeconds(120))
.withStartupAttempts(3);
mosquitto.start();
final Integer mappedPort = mosquitto.getMappedPort(1883);
System.setProperty("mqtt.url", "tcp://localhost:" + mappedPort);
}
public class MqttConsumerTests implements MosquittoContainerTest {
@Autowired
private MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter;
@@ -69,9 +57,9 @@ public class MqttConsumerTests {
@Autowired
protected QueueChannel queue;
@AfterAll
public static void cleanup() {
System.clearProperty("mqtt.url");
@DynamicPropertySource
static void mongoDbProperties(DynamicPropertyRegistry registry) {
registry.add("mqtt.url", () -> "tcp://localhost:" + MOSQUITTO_CONTAINER.getMappedPort(1883));
}
@Test
@@ -91,11 +79,10 @@ public class MqttConsumerTests {
@SpringBootApplication
static class MqttConsumerTestApplication {
@Autowired
private MqttPahoClientFactory mqttClientFactory;
@Bean
public MqttPahoMessageDrivenChannelAdapter mqttInbound(BeanFactory beanFactory) {
MqttPahoMessageDrivenChannelAdapter mqttInbound(MqttPahoClientFactory mqttClientFactory,
BeanFactory beanFactory) {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("test",
mqttClientFactory, "test");
adapter.setQos(0);
@@ -104,7 +91,7 @@ public class MqttConsumerTests {
return adapter;
}
public DefaultPahoMessageConverter pahoMessageConverter(BeanFactory beanFactory) {
DefaultPahoMessageConverter pahoMessageConverter(BeanFactory beanFactory) {
DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(1, true, "UTF-8");
converter.setPayloadAsBytes(false);
converter.setBeanFactory(beanFactory);
@@ -112,7 +99,7 @@ public class MqttConsumerTests {
}
@Bean
public QueueChannel queue() {
QueueChannel queue() {
return new QueueChannel();
}

View File

@@ -8,7 +8,7 @@ Users have to subscribe to this `Flux` and receive the data.
## Beans for injection
You can import the `FileSupplierConfiguration` in the application and then inject the following bean.
The `FileSupplierConfiguration` auto-configuration provides the following bean:
`fileSupplier`
@@ -24,7 +24,7 @@ All configuration properties are prefixed with `file.supplier`.
There are also properties that need to be used with the prefix `file.consumer`.
For more information on the various options available, please see link:src/main/java/org/springframework/cloud/fn/supplier/file/FileSupplierProperties.java[FileSupplierProperties].
See link:src/main/java/org/springframework/cloud/fn/supplier/file/FileConsumerProperties.java[this] also.
See `FileConsumerProperties` also.
A `ComponentCustomizer<FileInboundChannelAdapterSpec>` bean can be added in the target project to provide any custom options for the `FileInboundChannelAdapterSpec` configuration used by the `fileSupplier`.

View File

@@ -1,6 +1,6 @@
# FTP Supplier
This module provides a FTP supplier that can be reused and composed in other applications.
This module provides an FTP supplier that can be reused and composed in other applications.
The `Supplier` uses the `FtpInboundChannelAdapter` from Spring Integration.
`FtpSupplier` is implemented as a `java.util.function.Supplier`.
This supplier gives you a reactive stream of files from the provided directory as the supplier has a signature of `Supplier<Flux<Message<?>>>`.
@@ -8,7 +8,7 @@ Users have to subscribe to this `Flux` and receive the data.
## Beans for injection
You can import the `FtpSupplierConfiguration` in the application and then inject the following bean.
The `FtpSupplierConfiguration` auto-configuration provides the following bean:
`ftpSupplier`
@@ -24,7 +24,7 @@ All configuration properties are prefixed with `ftp.supplier`.
There are also properties that need to be used with the prefix `file.consumer`.
For more information on the various options available, please see link:src/main/java/org/springframework/cloud/fn/supplier/ftp/FtpSupplierProperties.java[FtpSupplierProperties].
See link:src/main/java/org/springframework/cloud/fn/supplier/file/FileConsumerProperties.java[this] also.
See `FileConsumerProperties` also.
A `ComponentCustomizer<FtpInboundChannelAdapterSpec>` bean can be added in the target project to provide any custom options for the `FtpInboundChannelAdapterSpec` configuration used by the `ftpSupplier`.

View File

@@ -8,7 +8,7 @@ Users have to subscribe to this `Flux` and then receive the data.
## Beans for injection
You can import the `MqttSupplierConfiguration` in the application and then inject the following bean.
The `MqttSupplierConfiguration` auto-configuration provides the following bean:
`mqttSupplier`

View File

@@ -1,3 +1,5 @@
dependencies {
api project(':spring-mqtt-common')
testImplementation project(':spring-function-test-support')
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2017-2023 the original author or authors.
* Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,14 +22,11 @@ import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.fn.common.config.ComponentCustomizer;
import org.springframework.cloud.fn.common.mqtt.MqttConfiguration;
import org.springframework.cloud.fn.common.mqtt.MqttProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
@@ -38,38 +35,29 @@ import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
/**
* A source module that receives data from Mqtt.
* A supplier that receives data from MQTT.
*
* @author Janne Valkealahti
* @author Soby Chacko
*/
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties({ MqttProperties.class, MqttSupplierProperties.class })
@Import(MqttConfiguration.class)
@EnableConfigurationProperties(MqttSupplierProperties.class)
@AutoConfiguration(after = MqttConfiguration.class)
public class MqttSupplierConfiguration {
@Autowired
private MqttSupplierProperties properties;
@Autowired
private MqttPahoClientFactory mqttClientFactory;
@Autowired
private BeanFactory beanFactory;
@Bean
public Supplier<Flux<Message<?>>> mqttSupplier(Publisher<Message<?>> mqttPublisher) {
return () -> Flux.from(mqttPublisher);
}
@Bean
public MqttPahoMessageDrivenChannelAdapter mqttInbound(
public MqttPahoMessageDrivenChannelAdapter mqttInbound(MqttSupplierProperties properties,
MqttPahoClientFactory mqttClientFactory, BeanFactory beanFactory,
@Nullable ComponentCustomizer<MqttPahoMessageDrivenChannelAdapter> mqttMessageProducerCustomizer) {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
this.properties.getClientId(), this.mqttClientFactory, this.properties.getTopics());
adapter.setQos(this.properties.getQos());
adapter.setConverter(pahoMessageConverter(this.beanFactory));
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(properties.getClientId(),
mqttClientFactory, properties.getTopics());
adapter.setQos(properties.getQos());
adapter.setConverter(pahoMessageConverter(properties, beanFactory));
adapter.setAutoStartup(false);
if (mqttMessageProducerCustomizer != null) {
@@ -84,7 +72,8 @@ public class MqttSupplierConfiguration {
return IntegrationFlow.from(mqttInbound).toReactivePublisher(true);
}
private DefaultPahoMessageConverter pahoMessageConverter(BeanFactory beanFactory) {
private DefaultPahoMessageConverter pahoMessageConverter(MqttSupplierProperties properties,
BeanFactory beanFactory) {
DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(properties.getCharset());
converter.setPayloadAsBytes(properties.isBinary());
converter.setBeanFactory(beanFactory);

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2017-2020 the original author or authors.
* Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,7 +23,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.validation.annotation.Validated;
/**
* Properties for the Mqtt Source.
* Properties for the MQTT supplier.
*
* @author Janne Valkealahti
* @author Soby Chacko
@@ -34,28 +34,28 @@ import org.springframework.validation.annotation.Validated;
public class MqttSupplierProperties {
/**
* identifies the client.
* Identifies the client.
*/
private String clientId = "stream.client.id.source";
/**
* the topic(s) (comma-delimited) to which the source will subscribe.
* The topic(s) (comma-delimited) to which the source will subscribe.
*/
private String[] topics = new String[] { "stream.mqtt" };
/**
* the qos; a single value for all topics or a comma-delimited list to match the
* The qos; a single value for all topics or a comma-delimited list to match the
* topics.
*/
private int[] qos = new int[] { 0 };
/**
* true to leave the payload as bytes.
* True to leave the payload as bytes.
*/
private boolean binary = false;
/**
* the charset used to convert bytes to String (when binary is false).
* The charset used to convert bytes to String (when binary is false).
*/
private String charset = "UTF-8";

View File

@@ -0,0 +1,4 @@
/**
* The MQTT supplier auto-configuration support.
*/
package org.springframework.cloud.fn.supplier.mqtt;

View File

@@ -0,0 +1 @@
org.springframework.cloud.fn.supplier.mqtt.MqttSupplierConfiguration

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2017-2021 the original author or authors.
* Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,22 +16,19 @@
package org.springframework.cloud.fn.supplier.mqtt;
import java.time.Duration;
import java.util.Properties;
import java.util.function.Supplier;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.internal.security.SSLSocketFactoryFactory;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.GenericContainer;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.fn.test.support.mqtt.MosquittoContainerTest;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
@@ -40,6 +37,8 @@ import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import static org.assertj.core.api.Assertions.assertThat;
@@ -55,23 +54,11 @@ import static org.assertj.core.api.Assertions.assertThat;
@SpringBootTest(properties = { "mqtt.supplier.topics=test,fake", "mqtt.supplier.qos=0,0",
"mqtt.ssl-properties.com.ibm.ssl.protocol=TLS", "mqtt.ssl-properties.com.ibm.ssl.keyStoreType=TEST" })
@DirtiesContext
public class MqttSupplierTests {
public class MqttSupplierTests implements MosquittoContainerTest {
static {
GenericContainer<?> mosquitto = new GenericContainer<>("eclipse-mosquitto:2.0.13")
.withCommand("mosquitto -c /mosquitto-no-auth.conf")
.withReuse(true)
.withExposedPorts(1883)
.withStartupTimeout(Duration.ofSeconds(120))
.withStartupAttempts(3);
mosquitto.start();
final Integer mappedPort = mosquitto.getMappedPort(1883);
System.setProperty("mqtt.url", "tcp://localhost:" + mappedPort);
}
@AfterAll
public static void cleanup() {
System.clearProperty("mqtt.url");
@DynamicPropertySource
static void mongoDbProperties(DynamicPropertyRegistry registry) {
registry.add("mqtt.url", () -> "tcp://localhost:" + MOSQUITTO_CONTAINER.getMappedPort(1883));
}
@Autowired
@@ -91,19 +78,17 @@ public class MqttSupplierTests {
final Flux<Message<?>> messageFlux = mqttSupplier.get();
StepVerifier.create(messageFlux).assertNext((message) -> {
assertThat(message.getPayload()).isEqualTo("hello");
}).thenCancel().verify();
StepVerifier.create(messageFlux)
.assertNext((message) -> assertThat(message.getPayload()).isEqualTo("hello"))
.thenCancel()
.verify();
}
@SpringBootApplication
static class MqttSupplierTestApplication {
@Autowired
private MqttPahoClientFactory mqttClientFactory;
@Bean
public MessageHandler mqttOutbound() {
MessageHandler mqttOutbound(MqttPahoClientFactory mqttClientFactory) {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("test", mqttClientFactory);
messageHandler.setAsync(true);
messageHandler.setDefaultTopic("test");
@@ -112,7 +97,7 @@ public class MqttSupplierTests {
}
@Bean
public DefaultPahoMessageConverter producerConverter() {
DefaultPahoMessageConverter producerConverter() {
return new DefaultPahoMessageConverter(1, true, "UTF-8");
}