GH-124: Fix auto-wiring ambiguity in the ZeroMqSupplierConfiguration

Fixes: https://github.com/spring-cloud/spring-functions-catalog/issues/124

Also use `toReactivePublisher(true)` to avoid manual lifecycle management.
This commit is contained in:
Artem Bilan
2024-12-19 16:55:17 -05:00
parent 34892ae762
commit f531bf26d4

View File

@@ -19,6 +19,7 @@ package org.springframework.cloud.fn.supplier.zeromq;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Supplier; import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.zeromq.SocketType; import org.zeromq.SocketType;
import org.zeromq.ZContext; import org.zeromq.ZContext;
import org.zeromq.ZMQ; import org.zeromq.ZMQ;
@@ -28,7 +29,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.integration.channel.FluxMessageChannel; import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.zeromq.inbound.ZeroMqMessageProducer; import org.springframework.integration.zeromq.inbound.ZeroMqMessageProducer;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage; import org.springframework.messaging.support.GenericMessage;
@@ -37,21 +38,19 @@ import org.springframework.messaging.support.GenericMessage;
* A supplier auto-configuration that receives data from ZeroMQ. * A supplier auto-configuration that receives data from ZeroMQ.
* *
* @author Daniel Frey * @author Daniel Frey
* @since 3.1.0 * @author Artem Bilan
*/ */
@AutoConfiguration @AutoConfiguration
@EnableConfigurationProperties(ZeroMqSupplierProperties.class) @EnableConfigurationProperties(ZeroMqSupplierProperties.class)
public class ZeroMqSupplierConfiguration { public class ZeroMqSupplierConfiguration {
private FluxMessageChannel output = new FluxMessageChannel();
@Bean @Bean
public ZContext zContext() { public ZContext zContext() {
return new ZContext(); return new ZContext();
} }
@Bean @Bean
public ZeroMqMessageProducer adapter(ZeroMqSupplierProperties properties, ZContext zContext, public ZeroMqMessageProducer zeroMqSupplierMessageProducer(ZeroMqSupplierProperties properties, ZContext zContext,
@Autowired(required = false) Consumer<ZMQ.Socket> socketConfigurer) { @Autowired(required = false) Consumer<ZMQ.Socket> socketConfigurer) {
ZeroMqMessageProducer zeroMqMessageProducer = new ZeroMqMessageProducer(zContext, properties.getSocketType()); ZeroMqMessageProducer zeroMqMessageProducer = new ZeroMqMessageProducer(zContext, properties.getSocketType());
@@ -70,15 +69,17 @@ public class ZeroMqSupplierConfiguration {
if (socketConfigurer != null) { if (socketConfigurer != null) {
zeroMqMessageProducer.setSocketConfigurer(socketConfigurer); zeroMqMessageProducer.setSocketConfigurer(socketConfigurer);
} }
zeroMqMessageProducer.setOutputChannel(this.output);
zeroMqMessageProducer.setAutoStartup(false);
return zeroMqMessageProducer; return zeroMqMessageProducer;
} }
@Bean @Bean
public Supplier<Flux<Message<?>>> zeromqSupplier(ZeroMqMessageProducer adapter) { Publisher<Message<Object>> zeroMqSupplierFlow(ZeroMqMessageProducer zeroMqSupplierMessageProducer) {
return () -> Flux.from(this.output).doOnSubscribe((subscription) -> adapter.start()); return IntegrationFlow.from(zeroMqSupplierMessageProducer).toReactivePublisher(true);
}
@Bean
public Supplier<Flux<Message<?>>> zeromqSupplier(Publisher<Message<Object>> zeroMqSupplierFlow) {
return () -> Flux.from(zeroMqSupplierFlow);
} }
} }