Consolidate invokeHandler invocation

This commit is contained in:
Soby Chacko
2023-02-17 15:26:54 -05:00
parent 0621c540d6
commit ba51c23e7d
7 changed files with 11 additions and 92 deletions

View File

@@ -34,6 +34,7 @@ import reactor.core.publisher.Mono;
*
* @param <V> payload type.
* @author Christophe Bornet
* @author Soby Chacko
*/
public class PulsarReactiveOneByOneMessagingMessageListenerAdapter<V>
extends PulsarReactiveMessagingMessageListenerAdapter<V> implements ReactivePulsarOneByOneMessageHandler<V> {
@@ -58,7 +59,7 @@ public class PulsarReactiveOneByOneMessagingMessageListenerAdapter<V>
this.logger.debug("Processing [" + message + "]");
}
try {
return (Mono<Void>) invokeHandler(theRecord, message, null, null);
return (Mono<Void>) invokeHandler(message, theRecord, null, null);
}
catch (Exception e) {
return Mono.error(e);

View File

@@ -34,6 +34,7 @@ import reactor.core.publisher.Flux;
*
* @param <V> payload type.
* @author Christophe Bornet
* @author Soby Chacko
*/
public class PulsarReactiveStreamingMessagingMessageListenerAdapter<V>
extends PulsarReactiveMessagingMessageListenerAdapter<V> implements ReactivePulsarStreamingHandler<V> {
@@ -50,7 +51,7 @@ public class PulsarReactiveStreamingMessagingMessageListenerAdapter<V>
theRecords = records.map(record -> toMessagingMessage(record, null));
}
try {
return (Flux<MessageResult<Void>>) invokeHandler(theRecords, null, null, null);
return (Flux<MessageResult<Void>>) invokeHandler(null, theRecords, null, null);
}
catch (Exception e) {
return Flux.error(e);

View File

@@ -1,69 +0,0 @@
/*
* Copyright 2022-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.pulsar.reactive.listener.adapter;
import java.lang.reflect.Method;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.springframework.lang.Nullable;
import org.springframework.pulsar.listener.Acknowledgement;
import org.springframework.pulsar.listener.PulsarAcknowledgingMessageListener;
import org.springframework.pulsar.listener.adapter.HandlerAdapter;
import org.springframework.pulsar.listener.adapter.PulsarMessagingMessageListenerAdapter;
/**
* A {@link MessageListener MessageListener} adapter that invokes a configurable
* {@link HandlerAdapter}; used when the factory is configured for the listener to receive
* individual messages.
*
* @param <V> payload type.
* @author Soby Chacko
*/
@SuppressWarnings("serial")
public class PulsarRecordMessagingMessageListenerAdapter<V> extends PulsarMessagingMessageListenerAdapter<V>
implements PulsarAcknowledgingMessageListener<V> {
public PulsarRecordMessagingMessageListenerAdapter(Object bean, Method method) {
super(bean, method);
}
@Override
public void received(Consumer<V> consumer, Message<V> record, @Nullable Acknowledgement acknowledgement) {
org.springframework.messaging.Message<?> message = null;
Object theRecord = record;
if (isHeaderFound() || isSpringMessage()) {
message = toMessagingMessage(record, consumer);
}
else if (isSimpleExtraction()) {
theRecord = record.getValue();
}
if (logger.isDebugEnabled()) {
this.logger.debug("Processing [" + message + "]");
}
try {
invokeHandler(theRecord, message, consumer, acknowledgement);
}
catch (Exception e) {
throw e;
}
}
}

View File

@@ -140,7 +140,7 @@ public class PulsarBatchMessagingMessageListenerAdapter<V> extends PulsarMessagi
protected void invoke(Object records, Consumer<V> consumer, Message<?> message, Acknowledgement acknowledgement) {
try {
invokeHandler(records, message, consumer, acknowledgement);
invokeHandler(message, records, consumer, acknowledgement);
}
catch (Exception e) {
throw e;

View File

@@ -38,7 +38,6 @@ import org.springframework.messaging.converter.MessageConversionException;
import org.springframework.messaging.converter.SmartMessageConverter;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.pulsar.listener.Acknowledgement;
import org.springframework.pulsar.support.DefaultPulsarMessageHeaderMapper;
import org.springframework.pulsar.support.converter.PulsarMessagingMessageConverter;
import org.springframework.pulsar.support.converter.PulsarRecordMessageConverter;
@@ -146,22 +145,9 @@ public abstract class PulsarMessagingMessageListenerAdapter<V> {
return getMessageConverter().toMessageFromReader(record, reader, getType());
}
protected final Object invokeHandler(Object data, org.springframework.messaging.Message<?> message,
Consumer<V> consumer, Acknowledgement acknowledgement) {
protected final Object invokeHandler(org.springframework.messaging.Message<?> message, Object... providedArgs) {
try {
return this.handlerMethod.invoke(message, data, consumer, acknowledgement);
}
catch (Exception ex) {
throw new MessageConversionException("Cannot handle message", ex);
}
}
protected final Object invokeHandler(Object data, org.springframework.messaging.Message<?> message,
Reader<V> reader) {
try {
return this.handlerMethod.invoke(message, data, reader);
return this.handlerMethod.invoke(message, providedArgs);
}
catch (Exception ex) {
throw new MessageConversionException("Cannot handle message", ex);

View File

@@ -56,7 +56,7 @@ public class PulsarRecordMessagingMessageListenerAdapter<V> extends PulsarMessag
this.logger.debug("Processing [" + message + "]");
}
try {
invokeHandler(theRecord, message, consumer, acknowledgement);
invokeHandler(message, theRecord, consumer, acknowledgement);
}
catch (Exception e) {
throw e;

View File

@@ -39,11 +39,11 @@ public class PulsarRecordMessagingReaderListenerAdapter<V> extends PulsarMessagi
}
@Override
public void received(Reader<V> consumer, Message<V> record) {
public void received(Reader<V> reader, Message<V> record) {
org.springframework.messaging.Message<?> message = null;
Object theRecord = record;
if (isHeaderFound() || isSpringMessage()) {
message = toMessagingMessageFromReader(record, consumer);
message = toMessagingMessageFromReader(record, reader);
}
else if (isSimpleExtraction()) {
theRecord = record.getValue();
@@ -53,7 +53,7 @@ public class PulsarRecordMessagingReaderListenerAdapter<V> extends PulsarMessagi
this.logger.debug("Processing [" + message + "]");
}
try {
invokeHandler(theRecord, message, consumer);
invokeHandler(message, theRecord, reader);
}
catch (Exception e) {
throw e;