Harmonize tests between PulsarTemplate and ReactivePulsarTemplate (#332)

This commit is contained in:
Christophe Bornet
2023-02-08 15:39:23 +01:00
committed by GitHub
parent b432e3da67
commit d13ae9a94d
2 changed files with 146 additions and 76 deletions

View File

@@ -22,6 +22,7 @@ import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.params.provider.Arguments.arguments;
import java.time.Duration;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@@ -36,11 +37,14 @@ import org.apache.pulsar.reactive.client.api.MutableReactiveMessageSenderSpec;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.springframework.lang.Nullable;
import org.springframework.pulsar.core.DefaultSchemaResolver;
import org.springframework.pulsar.core.DefaultTopicResolver;
import org.springframework.pulsar.test.support.PulsarTestContainerSupport;
@@ -160,70 +164,38 @@ class ReactivePulsarTemplateTests implements PulsarTestContainerSupport {
assertThat(msg.getProducerName()).isEqualTo("test-producer");
}
@Test
void sendMessageWithCustomTopicMapping() throws Exception {
String topic = "sendMessageWithCustomTopicMapping";
ReactivePulsarSenderFactory<String> senderFactory = new DefaultReactivePulsarSenderFactory<>(client,
new MutableReactiveMessageSenderSpec(), null);
@ParameterizedTest
@ValueSource(booleans = { true, false })
void sendMessageWithTopicInferredByTypeMappings(boolean producerFactoryHasDefaultTopic) throws Exception {
String topic = "ptt-topicInferred-" + producerFactoryHasDefaultTopic + "-topic";
MutableReactiveMessageSenderSpec spec = new MutableReactiveMessageSenderSpec();
if (producerFactoryHasDefaultTopic) {
spec.setTopicName("fake-topic");
}
ReactivePulsarSenderFactory<Foo> producerFactory = new DefaultReactivePulsarSenderFactory<>(client, spec, null);
// Topic mappings allows not specifying the topic when sending (nor having
// default on producer)
DefaultTopicResolver topicResolver = new DefaultTopicResolver();
topicResolver.addCustomTopicMapping(String.class, topic);
ReactivePulsarTemplate<String> pulsarTemplate = new ReactivePulsarTemplate<>(senderFactory,
topicResolver.addCustomTopicMapping(Foo.class, topic);
ReactivePulsarTemplate<Foo> pulsarTemplate = new ReactivePulsarTemplate<>(producerFactory,
new DefaultSchemaResolver(), topicResolver);
Consumer<ReactivePulsarTemplate<String>> sendFunction = (template) -> template.send("test-message").subscribe();
sendAndConsume(pulsarTemplate, sendFunction, topic, Schema.STRING, "test-message");
}
@Test
void sendMessageWithCustomSchemaMapping() throws Exception {
String topic = "sendMessageWithCustomSchemaMapping";
ReactivePulsarSenderFactory<Foo> senderFactory = new DefaultReactivePulsarSenderFactory<>(client,
new MutableReactiveMessageSenderSpec(), null);
DefaultSchemaResolver schemaResolver = new DefaultSchemaResolver();
schemaResolver.addCustomSchemaMapping(Foo.class, Schema.JSON(Foo.class));
ReactivePulsarTemplate<Foo> pulsarTemplate = new ReactivePulsarTemplate<>(senderFactory, schemaResolver,
new DefaultTopicResolver());
Foo foo = new Foo("Foo-" + UUID.randomUUID(), "Bar-" + UUID.randomUUID());
Consumer<ReactivePulsarTemplate<Foo>> sendFunction = (template) -> template.send(topic, foo).subscribe();
ThrowingConsumer<ReactivePulsarTemplate<Foo>> sendFunction = (template) -> template
.send(foo, Schema.JSON(Foo.class)).subscribe();
sendAndConsume(pulsarTemplate, sendFunction, topic, Schema.JSON(Foo.class), foo);
}
@ParameterizedTest(name = "{0}")
@MethodSource("sendMessageFailedTestProvider")
void sendMessageFailed(String testName, ThrowingConsumer<ReactivePulsarTemplate<String>> sendFunction) {
@Test
void sendMessageWithoutTopicFails() {
ReactivePulsarSenderFactory<String> senderFactory = new DefaultReactivePulsarSenderFactory<>(client,
new MutableReactiveMessageSenderSpec(), null);
ReactivePulsarTemplate<String> pulsarTemplate = new ReactivePulsarTemplate<>(senderFactory);
assertThatIllegalArgumentException().isThrownBy(() -> sendFunction.accept(pulsarTemplate));
}
static Stream<Arguments> sendMessageFailedTestProvider() {
String message = "test-message";
return Stream.of(
arguments("sendWithoutTopic",
(ThrowingConsumer<ReactivePulsarTemplate<String>>) (template) -> template.send(message)),
arguments("sendNullWithoutSchema",
(ThrowingConsumer<ReactivePulsarTemplate<String>>) (template) -> template
.send("sendNullWithoutSchema", (String) null)));
}
@Test
void sendNullWithDefaultTopicFails() {
MutableReactiveMessageSenderSpec spec = new MutableReactiveMessageSenderSpec();
spec.setTopicName("sendNullWithDefaultTopicFails");
ReactivePulsarSenderFactory<String> senderFactory = new DefaultReactivePulsarSenderFactory<>(client, spec,
null);
ReactivePulsarTemplate<String> pulsarTemplate = new ReactivePulsarTemplate<>(senderFactory);
assertThatIllegalArgumentException().isThrownBy(() -> pulsarTemplate.send((String) null, Schema.STRING));
assertThatIllegalArgumentException().isThrownBy(() -> pulsarTemplate.send("test-message").subscribe())
.withMessage("Topic must be specified when no default topic is configured");
}
private <T> Message<T> sendAndConsume(Consumer<ReactivePulsarTemplate<T>> sendFunction, String topic,
Schema<T> schema, T expectedValue, Boolean withDefaultTopic) throws Exception {
Schema<T> schema, @Nullable T expectedValue, Boolean withDefaultTopic) throws Exception {
MutableReactiveMessageSenderSpec senderSpec = new MutableReactiveMessageSenderSpec();
if (withDefaultTopic) {
senderSpec.setTopicName(topic);
@@ -237,7 +209,7 @@ class ReactivePulsarTemplateTests implements PulsarTestContainerSupport {
}
private <T> Message<T> sendAndConsume(ReactivePulsarTemplate<T> template,
Consumer<ReactivePulsarTemplate<T>> sendFunction, String topic, Schema<T> schema, T expectedValue)
Consumer<ReactivePulsarTemplate<T>> sendFunction, String topic, Schema<T> schema, @Nullable T expectedValue)
throws Exception {
try (org.apache.pulsar.client.api.Consumer<T> consumer = client.newConsumer(schema).topic(topic)
.subscriptionName(topic + "-sub").subscribe()) {
@@ -250,7 +222,121 @@ class ReactivePulsarTemplateTests implements PulsarTestContainerSupport {
}
}
record Foo(String foo, String bar) {
@Nested
class SendNonPrimitiveSchemaTests {
@Test
void withSpecifiedSchema() throws Exception {
String topic = "ptt-specificSchema-topic";
Foo foo = new Foo("Foo-" + UUID.randomUUID(), "Bar-" + UUID.randomUUID());
ThrowingConsumer<ReactivePulsarTemplate<Foo>> sendFunction = (template) -> template
.send(foo, Schema.AVRO(Foo.class)).subscribe();
sendAndConsume(sendFunction, topic, Schema.AVRO(Foo.class), foo, true);
}
@Test
void withSchemaInferredByMessageType() throws Exception {
String topic = "ptt-nospecificSchema-topic";
Foo foo = new Foo("Foo-" + UUID.randomUUID(), "Bar-" + UUID.randomUUID());
ThrowingConsumer<ReactivePulsarTemplate<Foo>> sendFunction = (template) -> template.send(foo).subscribe();
sendAndConsume(sendFunction, topic, Schema.JSON(Foo.class), foo, true);
}
@Test
void withSchemaInferredByTypeMappings() throws Exception {
String topic = "ptt-schemaInferred-topic";
MutableReactiveMessageSenderSpec spec = new MutableReactiveMessageSenderSpec();
spec.setTopicName(topic);
ReactivePulsarSenderFactory<Foo> producerFactory = new DefaultReactivePulsarSenderFactory<>(client, spec,
null);
// Custom schema resolver allows not specifying the schema when sending
DefaultSchemaResolver schemaResolver = new DefaultSchemaResolver();
schemaResolver.addCustomSchemaMapping(Foo.class, Schema.JSON(Foo.class));
ReactivePulsarTemplate<Foo> pulsarTemplate = new ReactivePulsarTemplate<>(producerFactory, schemaResolver,
new DefaultTopicResolver());
Foo foo = new Foo("Foo-" + UUID.randomUUID(), "Bar-" + UUID.randomUUID());
ThrowingConsumer<ReactivePulsarTemplate<Foo>> sendFunction = (template) -> template.newMessage(foo).send()
.subscribe();
sendAndConsume(pulsarTemplate, sendFunction, topic, Schema.JSON(Foo.class), foo);
}
}
@Nested
class SendNullTests {
@Test
void sendNullWithDefaultTopicFails() {
MutableReactiveMessageSenderSpec spec = new MutableReactiveMessageSenderSpec();
spec.setTopicName("sendNullWithDefaultTopicFails");
ReactivePulsarSenderFactory<String> senderFactory = new DefaultReactivePulsarSenderFactory<>(client, spec,
null);
ReactivePulsarTemplate<String> pulsarTemplate = new ReactivePulsarTemplate<>(senderFactory);
assertThatIllegalArgumentException()
.isThrownBy(() -> pulsarTemplate.send((String) null, Schema.STRING).subscribe())
.withMessage("Topic must be specified when the message is null");
}
@Test
void sendNullWithoutSchemaFails() {
ReactivePulsarSenderFactory<String> senderFactory = new DefaultReactivePulsarSenderFactory<>(client,
new MutableReactiveMessageSenderSpec(), null);
ReactivePulsarTemplate<String> pulsarTemplate = new ReactivePulsarTemplate<>(senderFactory);
assertThatIllegalArgumentException()
.isThrownBy(
() -> pulsarTemplate.send("sendNullWithoutSchemaFails", (String) null, null).subscribe())
.withMessage("Schema must be specified when the message is null");
}
}
public static class Foo {
private String foo;
private String bar;
Foo() {
}
Foo(String foo, String bar) {
this.foo = foo;
this.bar = bar;
}
public String getFoo() {
return foo;
}
public void setFoo(String foo) {
this.foo = foo;
}
public String getBar() {
return bar;
}
public void setBar(String bar) {
this.bar = bar;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Foo foo1 = (Foo) o;
return foo.equals(foo1.foo) && bar.equals(foo1.bar);
}
@Override
public int hashCode() {
return Objects.hash(foo, bar);
}
}
}

View File

@@ -227,18 +227,13 @@ class PulsarTemplateTests implements PulsarTestContainerSupport {
sendAndConsume(pulsarTemplate, sendFunction, topic, Schema.JSON(Foo.class), foo);
}
@ParameterizedTest(name = "{0}")
@MethodSource("sendMessageFailedTestProvider")
void sendMessageFailed(String testName, ThrowingConsumer<PulsarTemplate<String>> sendFunction) {
PulsarProducerFactory<String> senderFactory = new DefaultPulsarProducerFactory<>(client, new HashMap<>());
@Test
void sendMessageWithoutTopicFails() {
PulsarProducerFactory<String> senderFactory = new DefaultPulsarProducerFactory<>(client,
Collections.emptyMap());
PulsarTemplate<String> pulsarTemplate = new PulsarTemplate<>(senderFactory);
assertThatIllegalArgumentException().isThrownBy(() -> sendFunction.accept(pulsarTemplate));
}
static Stream<Arguments> sendMessageFailedTestProvider() {
String message = "test-message";
return Stream.of(arguments("sendWithoutTopic",
(ThrowingConsumer<PulsarTemplate<String>>) (template) -> template.send(message)));
assertThatIllegalArgumentException().isThrownBy(() -> pulsarTemplate.send("test-message"))
.withMessage("Topic must be specified when no default topic is configured");
}
private <T> Message<T> sendAndConsume(ThrowingConsumer<PulsarTemplate<T>> sendFunction, String topic,
@@ -324,17 +319,6 @@ class PulsarTemplateTests implements PulsarTestContainerSupport {
.withMessage("Schema must be specified when the message is null");
}
@Test
void sendNullWithTopicAndSchema() throws Exception {
String topic = "sendNullWithTopicAndSchema";
PulsarProducerFactory<String> senderFactory = new DefaultPulsarProducerFactory<>(client,
Collections.emptyMap());
PulsarTemplate<String> pulsarTemplate = new PulsarTemplate<>(senderFactory);
ThrowingConsumer<PulsarTemplate<String>> sendFunction = (template) -> template.send(topic, null,
Schema.STRING);
sendAndConsume(pulsarTemplate, sendFunction, topic, Schema.STRING, null);
}
}
public static class Foo {