From 0e4a4fad3892a5f5ed5b65ebae627b8de62f00ec Mon Sep 17 00:00:00 2001 From: Chris Bono Date: Fri, 18 Nov 2022 13:43:48 -0600 Subject: [PATCH] Extract Flux from base messaging adapter PulsarMessagingMessageListenerAdapter was still referencing Flux. This removes all reactive usage from imperative spring-pulsar lib. --- ...activeMessagingMessageListenerAdapter.java | 63 +++++++++++++++++++ ...eByOneMessagingMessageListenerAdapter.java | 5 +- ...eamingMessagingMessageListenerAdapter.java | 5 +- spring-pulsar/build.gradle | 1 - ...PulsarMessagingMessageListenerAdapter.java | 31 ++++++--- 5 files changed, 89 insertions(+), 16 deletions(-) create mode 100644 spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/listener/adapter/PulsarReactiveMessagingMessageListenerAdapter.java diff --git a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/listener/adapter/PulsarReactiveMessagingMessageListenerAdapter.java b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/listener/adapter/PulsarReactiveMessagingMessageListenerAdapter.java new file mode 100644 index 00000000..4d28aa35 --- /dev/null +++ b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/listener/adapter/PulsarReactiveMessagingMessageListenerAdapter.java @@ -0,0 +1,63 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.reactive.listener.adapter; + +import java.lang.reflect.Method; +import java.lang.reflect.Type; +import java.util.List; + +import org.apache.pulsar.client.api.Messages; + +import org.springframework.pulsar.listener.adapter.PulsarMessagingMessageListenerAdapter; +import org.springframework.pulsar.reactive.listener.ReactivePulsarMessageHandler; + +import reactor.core.publisher.Flux; + +/** + * An abstract base for {@link ReactivePulsarMessageHandler MessageListener} adapters. + * + * @param payload type. + * @author Chris Bono + */ +public abstract class PulsarReactiveMessagingMessageListenerAdapter + extends PulsarMessagingMessageListenerAdapter { + + public PulsarReactiveMessagingMessageListenerAdapter(Object bean, Method method) { + super(bean, method); + } + + /** + * Determines if a type is one that holds multiple messages. + * @param type the type to check + * @return true if the type is a {@link List}, {@link Messages} or {@link Flux}, false + * otherwise + */ + protected boolean isMultipleMessageType(Type type) { + return super.isMultipleMessageType(type) || parameterIsType(type, Flux.class); + } + + /** + * Determine if the type is a reactive Flux. + * @param type type to check + * @return true if the type is a reactive Flux, false otherwise + */ + @Override + protected boolean isFlux(Type type) { + return Flux.class.equals(type); + } + +} diff --git a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/listener/adapter/PulsarReactiveOneByOneMessagingMessageListenerAdapter.java b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/listener/adapter/PulsarReactiveOneByOneMessagingMessageListenerAdapter.java index 789c9d97..8901d86c 100644 --- a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/listener/adapter/PulsarReactiveOneByOneMessagingMessageListenerAdapter.java +++ b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/listener/adapter/PulsarReactiveOneByOneMessagingMessageListenerAdapter.java @@ -22,7 +22,6 @@ import org.apache.pulsar.client.api.Message; import org.reactivestreams.Publisher; import org.springframework.pulsar.listener.adapter.HandlerAdapter; -import org.springframework.pulsar.listener.adapter.PulsarMessagingMessageListenerAdapter; import org.springframework.pulsar.reactive.listener.ReactivePulsarMessageHandler; import org.springframework.pulsar.reactive.listener.ReactivePulsarOneByOneMessageHandler; @@ -36,8 +35,8 @@ import reactor.core.publisher.Mono; * @param payload type. * @author Christophe Bornet */ -public class PulsarReactiveOneByOneMessagingMessageListenerAdapter extends PulsarMessagingMessageListenerAdapter - implements ReactivePulsarOneByOneMessageHandler { +public class PulsarReactiveOneByOneMessagingMessageListenerAdapter + extends PulsarReactiveMessagingMessageListenerAdapter implements ReactivePulsarOneByOneMessageHandler { public PulsarReactiveOneByOneMessagingMessageListenerAdapter(Object bean, Method method) { super(bean, method); diff --git a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/listener/adapter/PulsarReactiveStreamingMessagingMessageListenerAdapter.java b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/listener/adapter/PulsarReactiveStreamingMessagingMessageListenerAdapter.java index 6c6ece43..9f383b4e 100644 --- a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/listener/adapter/PulsarReactiveStreamingMessagingMessageListenerAdapter.java +++ b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/listener/adapter/PulsarReactiveStreamingMessagingMessageListenerAdapter.java @@ -22,7 +22,6 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.reactive.client.api.MessageResult; import org.springframework.pulsar.listener.adapter.HandlerAdapter; -import org.springframework.pulsar.listener.adapter.PulsarMessagingMessageListenerAdapter; import org.springframework.pulsar.reactive.listener.ReactivePulsarMessageHandler; import org.springframework.pulsar.reactive.listener.ReactivePulsarStreamingHandler; @@ -36,8 +35,8 @@ import reactor.core.publisher.Flux; * @param payload type. * @author Christophe Bornet */ -public class PulsarReactiveStreamingMessagingMessageListenerAdapter extends PulsarMessagingMessageListenerAdapter - implements ReactivePulsarStreamingHandler { +public class PulsarReactiveStreamingMessagingMessageListenerAdapter + extends PulsarReactiveMessagingMessageListenerAdapter implements ReactivePulsarStreamingHandler { public PulsarReactiveStreamingMessagingMessageListenerAdapter(Object bean, Method method) { super(bean, method); diff --git a/spring-pulsar/build.gradle b/spring-pulsar/build.gradle index 177c0938..ae85d6cd 100644 --- a/spring-pulsar/build.gradle +++ b/spring-pulsar/build.gradle @@ -22,7 +22,6 @@ dependencies { optional 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310' optional 'com.fasterxml.jackson.datatype:jackson-datatype-joda' optional 'com.jayway.jsonpath:json-path' - optional 'io.projectreactor:reactor-core' testImplementation 'org.junit.jupiter:junit-jupiter' testRuntimeOnly 'org.junit.platform:junit-platform-launcher' testRuntimeOnly 'ch.qos.logback:logback-classic' diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/adapter/PulsarMessagingMessageListenerAdapter.java b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/adapter/PulsarMessagingMessageListenerAdapter.java index 9ac19aa1..2733e9ce 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/adapter/PulsarMessagingMessageListenerAdapter.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/adapter/PulsarMessagingMessageListenerAdapter.java @@ -22,7 +22,6 @@ import java.lang.reflect.Type; import java.lang.reflect.WildcardType; import java.util.List; -import org.apache.commons.logging.LogFactory; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Messages; @@ -44,8 +43,6 @@ import org.springframework.pulsar.support.converter.PulsarMessagingMessageConver import org.springframework.pulsar.support.converter.PulsarRecordMessageConverter; import org.springframework.util.Assert; -import reactor.core.publisher.Flux; - /** * An abstract {@link org.apache.pulsar.client.api.MessageListener} adapter providing the * necessary infrastructure to extract the payload from a Pulsar message. @@ -60,7 +57,7 @@ public abstract class PulsarMessagingMessageListenerAdapter { private final Object bean; - protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); + protected final LogAccessor logger = new LogAccessor(getClass()); private final Type inferredType; @@ -176,8 +173,7 @@ public abstract class PulsarMessagingMessageListenerAdapter { else if (parameterIsType(parameterType, Message.class)) { pulsarMessageFound = true; } - else if (parameterIsType(parameterType, List.class) || parameterIsType(parameterType, Messages.class) - || parameterIsType(parameterType, Flux.class)) { + else if (isMultipleMessageType(parameterType)) { collectionFound = true; } } @@ -212,6 +208,15 @@ public abstract class PulsarMessagingMessageListenerAdapter { return genericParameterType; } + /** + * Determines if a type is one that holds multiple messages. + * @param type the type to check + * @return true if the type is a {@link List} or a {@link Messages}, false otherwise + */ + protected boolean isMultipleMessageType(Type type) { + return parameterIsType(type, List.class) || parameterIsType(type, Messages.class); + } + private Type extractGenericParameterTypFromMethodParameter(MethodParameter methodParameter) { Type genericParameterType = methodParameter.getGenericParameterType(); if (genericParameterType instanceof ParameterizedType parameterizedType) { @@ -236,8 +241,7 @@ public abstract class PulsarMessagingMessageListenerAdapter { this.simpleExtraction = true; } } - else if (parameterizedType.getRawType().equals(Flux.class) - && parameterizedType.getActualTypeArguments().length == 1) { + else if (isFlux(parameterizedType.getRawType()) && parameterizedType.getActualTypeArguments().length == 1) { Type paramType = parameterizedType.getActualTypeArguments()[0]; boolean messageHasGeneric = paramType instanceof ParameterizedType && ((ParameterizedType) paramType) @@ -255,7 +259,16 @@ public abstract class PulsarMessagingMessageListenerAdapter { return genericParameterType; } - private boolean parameterIsType(Type parameterType, Type type) { + /** + * Determine if the type is a reactive Flux. + * @param type type to check + * @return false as the imperative side does not know about Flux + */ + protected boolean isFlux(Type type) { + return false; + } + + protected boolean parameterIsType(Type parameterType, Type type) { if (parameterType instanceof ParameterizedType parameterizedType) { Type rawType = parameterizedType.getRawType(); if (rawType.equals(type)) {