GH-3083 Fix lifecycle of reactive producer

Ensure that the corresponding FluxMessageChannel gets properly destroyed during application shutdown

Resolves #3083
This commit is contained in:
Oleg Zhurakousky
2025-04-30 08:59:18 +02:00
parent 4786fd4405
commit 2656bf7f4f
3 changed files with 29 additions and 2 deletions

View File

@@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.cloud.stream.messaging.DirectWithAttributesChannel;
import org.springframework.context.Lifecycle;
import org.springframework.integration.context.IntegrationObjectSupport;
import org.springframework.integration.core.Pausable;
@@ -152,6 +153,10 @@ public class DefaultBinding<T> implements Binding<T> {
if (this.isRunning()) {
this.lifecycle.stop();
}
// See https://github.com/spring-cloud/spring-cloud-stream/issues/3083 for more details
if (target instanceof DirectWithAttributesChannel attributeChannel) {
attributeChannel.destroy();
}
}
@Override

View File

@@ -265,7 +265,9 @@ public class FunctionConfiguration {
.get();
IntegrationFlow postProcessedFlow = (IntegrationFlow) context.getAutowireCapableBeanFactory()
.initializeBean(integrationFlow, integrationFlowName);
context.registerBean(integrationFlowName, IntegrationFlow.class, () -> postProcessedFlow);
context.registerBean(integrationFlowName, IntegrationFlow.class, () -> {
return postProcessedFlow;
});
}
else {
IntegrationFlow integrationFlow = integrationFlowFromProvidedSupplier(new PartitionAwareFunctionWrapper(supplier, context, producerProperties),
@@ -311,7 +313,12 @@ public class FunctionConfiguration {
? ((Mono) publisher).map(this::wrapToMessageIfNecessary)
: ((Flux) publisher).map(this::wrapToMessageIfNecessary);
integrationFlowBuilder = IntegrationFlow.from(publisher);
// See https://github.com/spring-cloud/spring-cloud-stream/issues/3083 for more details
DirectWithAttributesChannel messageChannel = context.getBean(bindingName, DirectWithAttributesChannel.class);
FluxMessageChannel reactiveChannel = new FluxMessageChannel();
reactiveChannel.subscribeTo(publisher);
messageChannel.setAttribute(DirectWithAttributesChannel.COMPANION_ATTR, reactiveChannel);
integrationFlowBuilder = IntegrationFlow.from((MessageChannel) reactiveChannel);
// see https://github.com/spring-cloud/spring-cloud-stream/issues/1863 for details about the following code
taskScheduler.schedule(() -> { }, Instant.now()); // will keep AC alive

View File

@@ -19,6 +19,7 @@ package org.springframework.cloud.stream.messaging;
import java.util.HashMap;
import java.util.Map;
import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageHandler;
@@ -28,6 +29,11 @@ import org.springframework.messaging.MessageHandler;
*/
public class DirectWithAttributesChannel extends DirectChannel {
/**
* Name of the attribute that is considered to be a companion of this channel.
*/
public static String COMPANION_ATTR = "companion";
private final Map<String, Object> attributes = new HashMap<>();
public void setAttribute(String key, Object value) {
@@ -43,6 +49,15 @@ public class DirectWithAttributesChannel extends DirectChannel {
return this.getComponentName();
}
@Override
public void destroy() {
super.destroy();
Object companion = this.attributes.get(COMPANION_ATTR);
if (companion != null && companion instanceof AbstractMessageChannel companionChannel) {
companionChannel.destroy();
}
}
@Override
public boolean subscribe(MessageHandler handler) {
return this.getDispatcher().getHandlerCount() == 1 ? false : super.subscribe(handler);