From ba51c23e7d0f0babca80d3010a3ce46191d4e2bc Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Fri, 17 Feb 2023 15:26:54 -0500 Subject: [PATCH] Consolidate invokeHandler invocation --- ...eByOneMessagingMessageListenerAdapter.java | 3 +- ...eamingMessagingMessageListenerAdapter.java | 3 +- ...RecordMessagingMessageListenerAdapter.java | 69 ------------------- ...rBatchMessagingMessageListenerAdapter.java | 2 +- ...PulsarMessagingMessageListenerAdapter.java | 18 +---- ...RecordMessagingMessageListenerAdapter.java | 2 +- ...rRecordMessagingReaderListenerAdapter.java | 6 +- 7 files changed, 11 insertions(+), 92 deletions(-) delete mode 100644 spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/listener/adapter/PulsarRecordMessagingMessageListenerAdapter.java diff --git a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/listener/adapter/PulsarReactiveOneByOneMessagingMessageListenerAdapter.java b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/listener/adapter/PulsarReactiveOneByOneMessagingMessageListenerAdapter.java index 7cdfe5b7..a3b8cd27 100644 --- a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/listener/adapter/PulsarReactiveOneByOneMessagingMessageListenerAdapter.java +++ b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/listener/adapter/PulsarReactiveOneByOneMessagingMessageListenerAdapter.java @@ -34,6 +34,7 @@ import reactor.core.publisher.Mono; * * @param payload type. * @author Christophe Bornet + * @author Soby Chacko */ public class PulsarReactiveOneByOneMessagingMessageListenerAdapter extends PulsarReactiveMessagingMessageListenerAdapter implements ReactivePulsarOneByOneMessageHandler { @@ -58,7 +59,7 @@ public class PulsarReactiveOneByOneMessagingMessageListenerAdapter this.logger.debug("Processing [" + message + "]"); } try { - return (Mono) invokeHandler(theRecord, message, null, null); + return (Mono) invokeHandler(message, theRecord, null, null); } catch (Exception e) { return Mono.error(e); diff --git a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/listener/adapter/PulsarReactiveStreamingMessagingMessageListenerAdapter.java b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/listener/adapter/PulsarReactiveStreamingMessagingMessageListenerAdapter.java index 14c5878b..a31b94af 100644 --- a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/listener/adapter/PulsarReactiveStreamingMessagingMessageListenerAdapter.java +++ b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/listener/adapter/PulsarReactiveStreamingMessagingMessageListenerAdapter.java @@ -34,6 +34,7 @@ import reactor.core.publisher.Flux; * * @param payload type. * @author Christophe Bornet + * @author Soby Chacko */ public class PulsarReactiveStreamingMessagingMessageListenerAdapter extends PulsarReactiveMessagingMessageListenerAdapter implements ReactivePulsarStreamingHandler { @@ -50,7 +51,7 @@ public class PulsarReactiveStreamingMessagingMessageListenerAdapter theRecords = records.map(record -> toMessagingMessage(record, null)); } try { - return (Flux>) invokeHandler(theRecords, null, null, null); + return (Flux>) invokeHandler(null, theRecords, null, null); } catch (Exception e) { return Flux.error(e); diff --git a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/listener/adapter/PulsarRecordMessagingMessageListenerAdapter.java b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/listener/adapter/PulsarRecordMessagingMessageListenerAdapter.java deleted file mode 100644 index f7896916..00000000 --- a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/listener/adapter/PulsarRecordMessagingMessageListenerAdapter.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright 2022-2023 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.pulsar.reactive.listener.adapter; - -import java.lang.reflect.Method; - -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.MessageListener; - -import org.springframework.lang.Nullable; -import org.springframework.pulsar.listener.Acknowledgement; -import org.springframework.pulsar.listener.PulsarAcknowledgingMessageListener; -import org.springframework.pulsar.listener.adapter.HandlerAdapter; -import org.springframework.pulsar.listener.adapter.PulsarMessagingMessageListenerAdapter; - -/** - * A {@link MessageListener MessageListener} adapter that invokes a configurable - * {@link HandlerAdapter}; used when the factory is configured for the listener to receive - * individual messages. - * - * @param payload type. - * @author Soby Chacko - */ -@SuppressWarnings("serial") -public class PulsarRecordMessagingMessageListenerAdapter extends PulsarMessagingMessageListenerAdapter - implements PulsarAcknowledgingMessageListener { - - public PulsarRecordMessagingMessageListenerAdapter(Object bean, Method method) { - super(bean, method); - } - - @Override - public void received(Consumer consumer, Message record, @Nullable Acknowledgement acknowledgement) { - org.springframework.messaging.Message message = null; - Object theRecord = record; - if (isHeaderFound() || isSpringMessage()) { - message = toMessagingMessage(record, consumer); - } - else if (isSimpleExtraction()) { - theRecord = record.getValue(); - } - - if (logger.isDebugEnabled()) { - this.logger.debug("Processing [" + message + "]"); - } - try { - invokeHandler(theRecord, message, consumer, acknowledgement); - } - catch (Exception e) { - throw e; - } - } - -} diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/adapter/PulsarBatchMessagingMessageListenerAdapter.java b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/adapter/PulsarBatchMessagingMessageListenerAdapter.java index 1cf30b5a..272349dc 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/adapter/PulsarBatchMessagingMessageListenerAdapter.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/adapter/PulsarBatchMessagingMessageListenerAdapter.java @@ -140,7 +140,7 @@ public class PulsarBatchMessagingMessageListenerAdapter extends PulsarMessagi protected void invoke(Object records, Consumer consumer, Message message, Acknowledgement acknowledgement) { try { - invokeHandler(records, message, consumer, acknowledgement); + invokeHandler(message, records, consumer, acknowledgement); } catch (Exception e) { throw e; diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/adapter/PulsarMessagingMessageListenerAdapter.java b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/adapter/PulsarMessagingMessageListenerAdapter.java index 5770dc65..7c0ede91 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/adapter/PulsarMessagingMessageListenerAdapter.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/adapter/PulsarMessagingMessageListenerAdapter.java @@ -38,7 +38,6 @@ import org.springframework.messaging.converter.MessageConversionException; import org.springframework.messaging.converter.SmartMessageConverter; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Payload; -import org.springframework.pulsar.listener.Acknowledgement; import org.springframework.pulsar.support.DefaultPulsarMessageHeaderMapper; import org.springframework.pulsar.support.converter.PulsarMessagingMessageConverter; import org.springframework.pulsar.support.converter.PulsarRecordMessageConverter; @@ -146,22 +145,9 @@ public abstract class PulsarMessagingMessageListenerAdapter { return getMessageConverter().toMessageFromReader(record, reader, getType()); } - protected final Object invokeHandler(Object data, org.springframework.messaging.Message message, - Consumer consumer, Acknowledgement acknowledgement) { - + protected final Object invokeHandler(org.springframework.messaging.Message message, Object... providedArgs) { try { - return this.handlerMethod.invoke(message, data, consumer, acknowledgement); - } - catch (Exception ex) { - throw new MessageConversionException("Cannot handle message", ex); - } - } - - protected final Object invokeHandler(Object data, org.springframework.messaging.Message message, - Reader reader) { - - try { - return this.handlerMethod.invoke(message, data, reader); + return this.handlerMethod.invoke(message, providedArgs); } catch (Exception ex) { throw new MessageConversionException("Cannot handle message", ex); diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/adapter/PulsarRecordMessagingMessageListenerAdapter.java b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/adapter/PulsarRecordMessagingMessageListenerAdapter.java index b9b27e1c..fb7b2302 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/adapter/PulsarRecordMessagingMessageListenerAdapter.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/adapter/PulsarRecordMessagingMessageListenerAdapter.java @@ -56,7 +56,7 @@ public class PulsarRecordMessagingMessageListenerAdapter extends PulsarMessag this.logger.debug("Processing [" + message + "]"); } try { - invokeHandler(theRecord, message, consumer, acknowledgement); + invokeHandler(message, theRecord, consumer, acknowledgement); } catch (Exception e) { throw e; diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/adapter/PulsarRecordMessagingReaderListenerAdapter.java b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/adapter/PulsarRecordMessagingReaderListenerAdapter.java index 520826ab..7d37c753 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/adapter/PulsarRecordMessagingReaderListenerAdapter.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/adapter/PulsarRecordMessagingReaderListenerAdapter.java @@ -39,11 +39,11 @@ public class PulsarRecordMessagingReaderListenerAdapter extends PulsarMessagi } @Override - public void received(Reader consumer, Message record) { + public void received(Reader reader, Message record) { org.springframework.messaging.Message message = null; Object theRecord = record; if (isHeaderFound() || isSpringMessage()) { - message = toMessagingMessageFromReader(record, consumer); + message = toMessagingMessageFromReader(record, reader); } else if (isSimpleExtraction()) { theRecord = record.getValue(); @@ -53,7 +53,7 @@ public class PulsarRecordMessagingReaderListenerAdapter extends PulsarMessagi this.logger.debug("Processing [" + message + "]"); } try { - invokeHandler(theRecord, message, consumer); + invokeHandler(message, theRecord, reader); } catch (Exception e) { throw e;