Misc cleanups (#317)
* Replace `String.format` w/ `String.formatted` * Simplify LogAccessor usage * Replace `new LogAccessor(LogFactory.getLog(Class))` calls w/ `new LogAccessor(Class)` * Remove fully qualified refs in javadoc + code
This commit is contained in:
@@ -37,7 +37,6 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.BiFunction;
|
||||
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.pulsar.client.api.DeadLetterPolicy;
|
||||
|
||||
import org.springframework.aop.framework.Advised;
|
||||
@@ -120,7 +119,7 @@ import org.springframework.util.StringUtils;
|
||||
public class ReactivePulsarListenerAnnotationBeanPostProcessor<V>
|
||||
implements BeanPostProcessor, Ordered, ApplicationContextAware, InitializingBean, SmartInitializingSingleton {
|
||||
|
||||
private final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass()));
|
||||
private final LogAccessor logger = new LogAccessor(this.getClass());
|
||||
|
||||
/**
|
||||
* The bean name of the default {@link ReactivePulsarListenerContainerFactory}.
|
||||
@@ -508,7 +507,7 @@ public class ReactivePulsarListenerAnnotationBeanPostProcessor<V>
|
||||
}
|
||||
else {
|
||||
throw new IllegalArgumentException(
|
||||
String.format("@ReactivePulsarListener can't resolve '%s' as a String", resolvedValue));
|
||||
"@ReactivePulsarListener can't resolve '%s' as a String".formatted(resolvedValue));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -536,12 +535,12 @@ public class ReactivePulsarListenerAnnotationBeanPostProcessor<V>
|
||||
ReflectionUtils.handleReflectionException(ex);
|
||||
}
|
||||
catch (NoSuchMethodException ex) {
|
||||
throw new IllegalStateException(String.format(
|
||||
"@ReactivePulsarListener method '%s' found on bean target class '%s', "
|
||||
+ "but not found in any interface(s) for bean JDK proxy. Either "
|
||||
+ "pull the method up to an interface or switch to subclass (CGLIB) "
|
||||
+ "proxies by setting proxy-target-class/proxyTargetClass " + "attribute to 'true'",
|
||||
method.getName(), method.getDeclaringClass().getSimpleName()), ex);
|
||||
throw new IllegalStateException("@ReactivePulsarListener method '%s' found on bean target class '%s', "
|
||||
+ "but not found in any interface(s) for bean JDK proxy. Either "
|
||||
+ "pull the method up to an interface or switch to subclass (CGLIB) "
|
||||
+ "proxies by setting proxy-target-class/proxyTargetClass "
|
||||
+ "attribute to 'true'".formatted(method.getName(), method.getDeclaringClass().getSimpleName()),
|
||||
ex);
|
||||
}
|
||||
}
|
||||
return method;
|
||||
|
||||
@@ -95,7 +95,7 @@ public class DefaultReactivePulsarSenderFactory<T> implements ReactivePulsarSend
|
||||
@Nullable List<ReactiveMessageSenderBuilderCustomizer<T>> customizers) {
|
||||
|
||||
String resolvedTopic = ReactiveMessageSenderUtils.resolveTopicName(topic, this);
|
||||
this.logger.trace(() -> String.format("Creating reactive message sender for '%s' topic", resolvedTopic));
|
||||
this.logger.trace(() -> "Creating reactive message sender for '%s' topic".formatted(resolvedTopic));
|
||||
ReactiveMessageSenderBuilder<T> sender = this.reactivePulsarClient.messageSender(schema);
|
||||
sender.applySpec(this.reactiveMessageSenderSpec);
|
||||
sender.topic(resolvedTopic);
|
||||
|
||||
@@ -128,12 +128,12 @@ public class ReactivePulsarTemplate<T> implements ReactivePulsarOperations<T> {
|
||||
@Nullable MessageSpecBuilderCustomizer<T> messageSpecBuilderCustomizer,
|
||||
@Nullable ReactiveMessageSenderBuilderCustomizer<T> customizer) {
|
||||
String topicName = resolveTopic(topic, message.getClass());
|
||||
this.logger.trace(() -> String.format("Sending reactive msg to '%s' topic", topicName));
|
||||
this.logger.trace(() -> "Sending reactive msg to '%s' topic".formatted(topicName));
|
||||
ReactiveMessageSender<T> sender = createMessageSender(topicName, message, schema, customizer);
|
||||
// @formatter:off
|
||||
return sender.sendOne(getMessageSpec(messageSpecBuilderCustomizer, message))
|
||||
.doOnError(ex -> this.logger.error(ex, () -> String.format("Failed to send message to '%s' topic", topicName)))
|
||||
.doOnSuccess(msgId -> this.logger.trace(() -> String.format("Sent message to '%s' topic", topicName)));
|
||||
.doOnError(ex -> this.logger.error(ex, () -> "Failed to send message to '%s' topic".formatted(topicName)))
|
||||
.doOnSuccess(msgId -> this.logger.trace(() -> "Sent message to '%s' topic".formatted(topicName)));
|
||||
// @formatter:on
|
||||
}
|
||||
|
||||
@@ -144,11 +144,9 @@ public class ReactivePulsarTemplate<T> implements ReactivePulsarOperations<T> {
|
||||
if (firstMessage != null && firstSignal.isOnNext()) {
|
||||
String topicName = resolveTopic(topic, firstMessage.getClass());
|
||||
ReactiveMessageSender<T> sender = createMessageSender(topicName, firstMessage, schema, customizer);
|
||||
return messageFlux.map(MessageSpec::of).as(sender::sendMany)
|
||||
.doOnError(ex -> this.logger.error(ex,
|
||||
() -> String.format("Failed to send messages to '%s' topic", topicName)))
|
||||
.doOnNext(msgId -> this.logger
|
||||
.trace(() -> String.format("Sent messages to '%s' topic", topicName)));
|
||||
return messageFlux.map(MessageSpec::of).as(sender::sendMany).doOnError(
|
||||
ex -> this.logger.error(ex, () -> "Failed to send messages to '%s' topic".formatted(topicName)))
|
||||
.doOnNext(msgId -> this.logger.trace(() -> "Sent messages to '%s' topic".formatted(topicName)));
|
||||
}
|
||||
// The flux has errored or is completed
|
||||
return messageFlux.thenMany(Flux.empty());
|
||||
|
||||
@@ -32,8 +32,7 @@ import org.junit.jupiter.api.Nested;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
/**
|
||||
* Tests for
|
||||
* {@link org.springframework.pulsar.reactive.core.DefaultReactivePulsarConsumerFactory}
|
||||
* Tests for {@link DefaultReactivePulsarConsumerFactory}.
|
||||
*
|
||||
* @author Christophe Bornet
|
||||
* @author Chris Bono
|
||||
|
||||
@@ -30,8 +30,7 @@ import org.assertj.core.api.InstanceOfAssertFactories;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
/**
|
||||
* Tests for
|
||||
* {@link org.springframework.pulsar.reactive.core.DefaultReactivePulsarReaderFactory}
|
||||
* Tests for {@link DefaultReactivePulsarReaderFactory}.
|
||||
*
|
||||
* @author Christophe Bornet
|
||||
*/
|
||||
@@ -43,7 +42,7 @@ class DefaultReactiveMessageReaderFactoryTests {
|
||||
void createReader() {
|
||||
MutableReactiveMessageReaderSpec spec = new MutableReactiveMessageReaderSpec();
|
||||
spec.setReaderName("test-reader");
|
||||
org.springframework.pulsar.reactive.core.DefaultReactivePulsarReaderFactory<String> readerFactory = new org.springframework.pulsar.reactive.core.DefaultReactivePulsarReaderFactory<>(
|
||||
DefaultReactivePulsarReaderFactory<String> readerFactory = new DefaultReactivePulsarReaderFactory<>(
|
||||
AdaptedReactivePulsarClientFactory.create((PulsarClient) null), spec);
|
||||
|
||||
ReactiveMessageReader<String> reader = readerFactory.createReader(schema);
|
||||
@@ -56,7 +55,7 @@ class DefaultReactiveMessageReaderFactoryTests {
|
||||
void createReaderWithCustomizer() {
|
||||
MutableReactiveMessageReaderSpec spec = new MutableReactiveMessageReaderSpec();
|
||||
spec.setReaderName("test-reader");
|
||||
org.springframework.pulsar.reactive.core.DefaultReactivePulsarReaderFactory<String> readerFactory = new DefaultReactivePulsarReaderFactory<>(
|
||||
DefaultReactivePulsarReaderFactory<String> readerFactory = new DefaultReactivePulsarReaderFactory<>(
|
||||
AdaptedReactivePulsarClientFactory.create((PulsarClient) null), spec);
|
||||
|
||||
ReactiveMessageReader<String> reader = readerFactory.createReader(schema,
|
||||
|
||||
@@ -50,7 +50,7 @@ import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
/**
|
||||
* Tests for {@link org.springframework.pulsar.reactive.core.ReactivePulsarTemplate}.
|
||||
* Tests for {@link ReactivePulsarTemplate}.
|
||||
*
|
||||
* @author Christophe Bornet
|
||||
* @author Chris Bono
|
||||
|
||||
Reference in New Issue
Block a user