Add tests for PulsarProperties toCustomizer API (#405)

This commit is contained in:
Chris Bono
2023-05-13 23:22:58 -05:00
committed by GitHub
parent d38cc6cb8d
commit 12e557c79a

View File

@@ -20,9 +20,9 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
import static org.assertj.core.api.Assertions.entry;
import static org.mockito.BDDMockito.then;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import java.time.Duration;
import java.util.HashMap;
@@ -30,13 +30,17 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
import org.apache.pulsar.client.api.ProxyProtocol;
import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
@@ -49,11 +53,11 @@ import org.apache.pulsar.common.schema.SchemaType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.springframework.boot.context.properties.bind.BindException;
import org.springframework.boot.context.properties.bind.Bindable;
import org.springframework.boot.context.properties.bind.Binder;
import org.springframework.boot.context.properties.source.ConfigurationPropertySource;
import org.springframework.boot.context.properties.source.MapConfigurationPropertySource;
import org.springframework.pulsar.autoconfigure.PulsarProperties.SchemaInfo;
import org.springframework.pulsar.autoconfigure.PulsarProperties.TypeMapping;
@@ -68,22 +72,16 @@ import org.springframework.util.unit.DataSize;
*/
public class PulsarPropertiesTests {
private final PulsarProperties properties = new PulsarProperties();
private void bind(Map<String, String> map) {
ConfigurationPropertySource source = new MapConfigurationPropertySource(map);
new Binder(source).bind("spring.pulsar", Bindable.ofInstance(this.properties));
private PulsarProperties newConfigPropsFromUserProps(Map<String, String> map) {
var targetProps = new PulsarProperties();
var source = new MapConfigurationPropertySource(map);
new Binder(source).bind("spring.pulsar", Bindable.ofInstance(targetProps));
return targetProps;
}
@Nested
class ClientPropertiesTests {
private final String authPluginClassName = "org.apache.pulsar.client.impl.auth.AuthenticationToken";
private final String authParamsStr = "{\"token\":\"1234\"}";
private final String authToken = "1234";
@Test
void clientProperties() {
var props = new HashMap<String, String>();
@@ -128,10 +126,8 @@ public class PulsarPropertiesTests {
props.put("spring.pulsar.client.socks5-proxy-address", "my-socks5-proxy-address");
props.put("spring.pulsar.client.socks5-proxy-username", "my-socks5-proxy-username");
props.put("spring.pulsar.client.socks5-proxy-password", "my-socks5-proxy-password");
var clientProps = newConfigPropsFromUserProps(props).getClient();
bind(props);
PulsarProperties.Client clientProps = PulsarPropertiesTests.this.properties.getClient();
assertThat(clientProps.getServiceUrl()).isEqualTo("my-service-url");
assertThat(clientProps.getListenerName()).isEqualTo("my-listener");
assertThat(clientProps.getOperationTimeout()).isEqualTo(Duration.ofMillis(1000));
@@ -177,23 +173,25 @@ public class PulsarPropertiesTests {
@Test
void authenticationUsingAuthParamsString() {
var authPluginClassName = "org.apache.pulsar.client.impl.auth.AuthenticationToken";
var authParamsStr = "{\"token\":\"1234\"}";
var props = new HashMap<String, String>();
props.put("spring.pulsar.client.auth-plugin-class-name",
"org.apache.pulsar.client.impl.auth.AuthenticationToken");
props.put("spring.pulsar.client.auth-plugin-class-name", authPluginClassName);
props.put("spring.pulsar.client.auth-params", authParamsStr);
bind(props);
var clientProps = PulsarPropertiesTests.this.properties.getClient();
var clientProps = newConfigPropsFromUserProps(props).getClient();
assertThat(clientProps.getAuthPluginClassName()).isEqualTo(authPluginClassName);
assertThat(clientProps.getAuthParams()).isEqualTo(authParamsStr);
}
@Test
void authenticationUsingAuthenticationMap() {
var authPluginClassName = "org.apache.pulsar.client.impl.auth.AuthenticationToken";
var authToken = "1234";
var props = new HashMap<String, String>();
props.put("spring.pulsar.client.auth-plugin-class-name", authPluginClassName);
props.put("spring.pulsar.client.authentication.token", authToken);
bind(props);
var clientProps = PulsarPropertiesTests.this.properties.getClient();
var configProps = newConfigPropsFromUserProps(props);
var clientProps = configProps.getClient();
assertThat(clientProps.getAuthPluginClassName()).isEqualTo(authPluginClassName);
assertThat(clientProps.getAuthentication()).containsEntry("token", authToken);
}
@@ -211,7 +209,7 @@ public class PulsarPropertiesTests {
@Test
void adminProperties() {
Map<String, String> props = new HashMap<>();
var props = new HashMap<String, String>();
props.put("spring.pulsar.administration.service-url", "my-service-url");
props.put("spring.pulsar.administration.connection-timeout", "12s");
props.put("spring.pulsar.administration.read-timeout", "13s");
@@ -229,9 +227,7 @@ public class PulsarPropertiesTests {
props.put("spring.pulsar.administration.tls-trust-store-password", "my-trust-store-password");
props.put("spring.pulsar.administration.tls-ciphers[0]", "my-tls-cipher");
props.put("spring.pulsar.administration.tls-protocols[0]", "my-tls-protocol");
bind(props);
var adminProps = properties.getAdministration();
var adminProps = newConfigPropsFromUserProps(props).getAdministration();
// Verify properties
assertThat(adminProps.getServiceUrl()).isEqualTo("my-service-url");
@@ -256,77 +252,75 @@ public class PulsarPropertiesTests {
var adminBuilder = mock(PulsarAdminBuilder.class);
var adminCustomizer = adminProps.toPulsarAdminBuilderCustomizer();
adminCustomizer.customize(adminBuilder);
verify(adminBuilder).serviceHttpUrl("my-service-url");
verify(adminBuilder).connectionTimeout(12_000, TimeUnit.MILLISECONDS);
verify(adminBuilder).readTimeout(13_000, TimeUnit.MILLISECONDS);
verify(adminBuilder).requestTimeout(14_000, TimeUnit.MILLISECONDS);
verify(adminBuilder).autoCertRefreshTime(15_000, TimeUnit.MILLISECONDS);
verify(adminBuilder).enableTlsHostnameVerification(true);
verify(adminBuilder).tlsTrustCertsFilePath("my-trust-certs-file-path");
verify(adminBuilder).tlsCertificateFilePath("my-certificate-file-path");
verify(adminBuilder).tlsKeyFilePath("my-key-file-path");
verify(adminBuilder).allowTlsInsecureConnection(true);
verify(adminBuilder).useKeyStoreTls(true);
verify(adminBuilder).sslProvider("my-ssl-provider");
verify(adminBuilder).tlsTrustStoreType("my-trust-store-type");
verify(adminBuilder).tlsTrustStorePath("my-trust-store-path");
verify(adminBuilder).tlsTrustStorePassword("my-trust-store-password");
verify(adminBuilder).tlsCiphers(Set.of("my-tls-cipher"));
verify(adminBuilder).tlsProtocols(Set.of("my-tls-protocol"));
then(adminBuilder).should().serviceHttpUrl("my-service-url");
then(adminBuilder).should().connectionTimeout(12_000, TimeUnit.MILLISECONDS);
then(adminBuilder).should().readTimeout(13_000, TimeUnit.MILLISECONDS);
then(adminBuilder).should().requestTimeout(14_000, TimeUnit.MILLISECONDS);
then(adminBuilder).should().autoCertRefreshTime(15_000, TimeUnit.MILLISECONDS);
then(adminBuilder).should().enableTlsHostnameVerification(true);
then(adminBuilder).should().tlsTrustCertsFilePath("my-trust-certs-file-path");
then(adminBuilder).should().tlsCertificateFilePath("my-certificate-file-path");
then(adminBuilder).should().tlsKeyFilePath("my-key-file-path");
then(adminBuilder).should().allowTlsInsecureConnection(true);
then(adminBuilder).should().useKeyStoreTls(true);
then(adminBuilder).should().sslProvider("my-ssl-provider");
then(adminBuilder).should().tlsTrustStoreType("my-trust-store-type");
then(adminBuilder).should().tlsTrustStorePath("my-trust-store-path");
then(adminBuilder).should().tlsTrustStorePassword("my-trust-store-password");
then(adminBuilder).should().tlsCiphers(Set.of("my-tls-cipher"));
then(adminBuilder).should().tlsProtocols(Set.of("my-tls-protocol"));
}
@Test
void authenticationUsingAuthParamsString() throws UnsupportedAuthenticationException {
Map<String, String> props = new HashMap<>();
props.put("spring.pulsar.administration.auth-plugin-class-name",
"org.apache.pulsar.client.impl.auth.AuthenticationToken");
props.put("spring.pulsar.administration.auth-params", authParamsStr);
bind(props);
var props = new HashMap<String, String>();
props.put("spring.pulsar.administration.auth-plugin-class-name", this.authPluginClassName);
props.put("spring.pulsar.administration.auth-params", this.authParamsStr);
var adminProps = newConfigPropsFromUserProps(props).getAdministration();
// Verify properties
var adminProps = properties.getAdministration();
assertThat(adminProps.getAuthPluginClassName()).isEqualTo(authPluginClassName);
assertThat(adminProps.getAuthParams()).isEqualTo(authParamsStr);
assertThat(adminProps.getAuthPluginClassName()).isEqualTo(this.authPluginClassName);
assertThat(adminProps.getAuthParams()).isEqualTo(this.authParamsStr);
// Verify customizer
var adminBuilder = mock(PulsarAdminBuilder.class);
var adminCustomizer = adminProps.toPulsarAdminBuilderCustomizer();
adminCustomizer.customize(adminBuilder);
verify(adminBuilder).authentication(authPluginClassName, authParamsStr);
then(adminBuilder).should().authentication(this.authPluginClassName, this.authParamsStr);
}
@Test
void authenticationUsingAuthenticationMap() throws UnsupportedAuthenticationException {
Map<String, String> props = new HashMap<>();
props.put("spring.pulsar.administration.auth-plugin-class-name", authPluginClassName);
props.put("spring.pulsar.administration.authentication.token", authToken);
bind(props);
var props = new HashMap<String, String>();
props.put("spring.pulsar.administration.auth-plugin-class-name", this.authPluginClassName);
props.put("spring.pulsar.administration.authentication.token", this.authToken);
var adminProps = newConfigPropsFromUserProps(props).getAdministration();
// Verify properties
var adminProps = properties.getAdministration();
assertThat(adminProps.getAuthPluginClassName()).isEqualTo(authPluginClassName);
assertThat(adminProps.getAuthentication()).containsEntry("token", authToken);
assertThat(adminProps.getAuthPluginClassName()).isEqualTo(this.authPluginClassName);
assertThat(adminProps.getAuthentication()).containsEntry("token", this.authToken);
// Verify customizer
var adminBuilder = mock(PulsarAdminBuilder.class);
var adminCustomizer = adminProps.toPulsarAdminBuilderCustomizer();
adminCustomizer.customize(adminBuilder);
verify(adminBuilder).authentication(authPluginClassName, authParamsStr);
then(adminBuilder).should().authentication(this.authPluginClassName, this.authParamsStr);
}
@Test
void authenticationNotAllowedUsingBothAuthParamsStringAndAuthenticationMap() {
Map<String, String> props = new HashMap<>();
props.put("spring.pulsar.administration.auth-plugin-class-name", authPluginClassName);
props.put("spring.pulsar.administration.auth-params", authParamsStr);
props.put("spring.pulsar.administration.authentication.token", authToken);
bind(props);
var props = new HashMap<String, String>();
props.put("spring.pulsar.administration.auth-plugin-class-name", this.authPluginClassName);
props.put("spring.pulsar.administration.auth-params", this.authParamsStr);
props.put("spring.pulsar.administration.authentication.token", this.authToken);
var adminProps = newConfigPropsFromUserProps(props).getAdministration();
// Verify properties
var adminProps = properties.getAdministration();
assertThat(adminProps.getAuthPluginClassName()).isEqualTo(authPluginClassName);
assertThat(adminProps.getAuthentication()).containsEntry("token", authToken);
assertThat(adminProps.getAuthParams()).isEqualTo(authParamsStr);
assertThat(adminProps.getAuthPluginClassName()).isEqualTo(this.authPluginClassName);
assertThat(adminProps.getAuthentication()).containsEntry("token", this.authToken);
assertThat(adminProps.getAuthParams()).isEqualTo(this.authParamsStr);
// Verify customizer
var adminCustomizer = adminProps.toPulsarAdminBuilderCustomizer();
@@ -342,78 +336,79 @@ public class PulsarPropertiesTests {
@Test
void emptyByDefault() {
assertThat(properties.getDefaults().getTypeMappings()).isEmpty();
assertThat(new PulsarProperties().getDefaults().getTypeMappings()).isEmpty();
}
@Test
void withTopicsOnly() {
Map<String, String> props = new HashMap<>();
var props = new HashMap<String, String>();
props.put("spring.pulsar.defaults.type-mappings[0].message-type", Foo.class.getName());
props.put("spring.pulsar.defaults.type-mappings[0].topic-name", "foo-topic");
props.put("spring.pulsar.defaults.type-mappings[1].message-type", String.class.getName());
props.put("spring.pulsar.defaults.type-mappings[1].topic-name", "string-topic");
bind(props);
assertThat(properties.getDefaults().getTypeMappings()).containsExactly(
var configProps = newConfigPropsFromUserProps(props);
assertThat(configProps.getDefaults().getTypeMappings()).containsExactly(
new TypeMapping(Foo.class, "foo-topic", null), new TypeMapping(String.class, "string-topic", null));
}
@Test
void withSchemaOnly() {
Map<String, String> props = new HashMap<>();
var props = new HashMap<String, String>();
props.put("spring.pulsar.defaults.type-mappings[0].message-type", Foo.class.getName());
props.put("spring.pulsar.defaults.type-mappings[0].schema-info.schema-type", "JSON");
bind(props);
assertThat(properties.getDefaults().getTypeMappings())
var configProps = newConfigPropsFromUserProps(props);
assertThat(configProps.getDefaults().getTypeMappings())
.containsExactly(new TypeMapping(Foo.class, null, new SchemaInfo(SchemaType.JSON, null)));
}
@Test
void withTopicAndSchema() {
Map<String, String> props = new HashMap<>();
var props = new HashMap<String, String>();
props.put("spring.pulsar.defaults.type-mappings[0].message-type", Foo.class.getName());
props.put("spring.pulsar.defaults.type-mappings[0].topic-name", "foo-topic");
props.put("spring.pulsar.defaults.type-mappings[0].schema-info.schema-type", "JSON");
bind(props);
assertThat(properties.getDefaults().getTypeMappings())
var configProps = newConfigPropsFromUserProps(props);
assertThat(configProps.getDefaults().getTypeMappings())
.containsExactly(new TypeMapping(Foo.class, "foo-topic", new SchemaInfo(SchemaType.JSON, null)));
}
@Test
void withKeyValueSchema() {
Map<String, String> props = new HashMap<>();
var props = new HashMap<String, String>();
props.put("spring.pulsar.defaults.type-mappings[0].message-type", Foo.class.getName());
props.put("spring.pulsar.defaults.type-mappings[0].schema-info.schema-type", "KEY_VALUE");
props.put("spring.pulsar.defaults.type-mappings[0].schema-info.message-key-type", String.class.getName());
bind(props);
assertThat(properties.getDefaults().getTypeMappings()).containsExactly(
var configProps = newConfigPropsFromUserProps(props);
assertThat(configProps.getDefaults().getTypeMappings()).containsExactly(
new TypeMapping(Foo.class, null, new SchemaInfo(SchemaType.KEY_VALUE, String.class)));
}
@Test
void schemaTypeRequired() {
Map<String, String> props = new HashMap<>();
var props = new HashMap<String, String>();
props.put("spring.pulsar.defaults.type-mappings[0].message-type", Foo.class.getName());
props.put("spring.pulsar.defaults.type-mappings[0].schema-info.message-key-type", String.class.getName());
assertThatExceptionOfType(BindException.class).isThrownBy(() -> bind(props)).havingRootCause()
.withMessageContaining("schemaType must not be null");
assertThatExceptionOfType(BindException.class).isThrownBy(() -> newConfigPropsFromUserProps(props))
.havingRootCause().withMessageContaining("schemaType must not be null");
}
@Test
void schemaTypeNoneNotAllowed() {
Map<String, String> props = new HashMap<>();
var props = new HashMap<String, String>();
props.put("spring.pulsar.defaults.type-mappings[0].message-type", Foo.class.getName());
props.put("spring.pulsar.defaults.type-mappings[0].schema-info.schema-type", "NONE");
assertThatExceptionOfType(BindException.class).isThrownBy(() -> bind(props)).havingRootCause()
.withMessageContaining("schemaType NONE not supported");
assertThatExceptionOfType(BindException.class).isThrownBy(() -> newConfigPropsFromUserProps(props))
.havingRootCause().withMessageContaining("schemaType NONE not supported");
}
@Test
void messageKeyTypeOnlyAllowedForKeyValueSchemaType() {
Map<String, String> props = new HashMap<>();
var props = new HashMap<String, String>();
props.put("spring.pulsar.defaults.type-mappings[0].message-type", Foo.class.getName());
props.put("spring.pulsar.defaults.type-mappings[0].schema-info.schema-type", "JSON");
props.put("spring.pulsar.defaults.type-mappings[0].schema-info.message-key-type", String.class.getName());
assertThatExceptionOfType(BindException.class).isThrownBy(() -> bind(props)).havingRootCause()
assertThatExceptionOfType(BindException.class).isThrownBy(() -> newConfigPropsFromUserProps(props))
.havingRootCause()
.withMessageContaining("messageKeyType can only be set when schemaType is KEY_VALUE");
}
@@ -425,8 +420,10 @@ public class PulsarPropertiesTests {
@Nested
class ProducerPropertiesTests {
@Test
void producerProperties() {
private ProducerConfigProperties producerProps;
@BeforeEach
void producerTestProps() {
var props = new HashMap<String, String>();
props.put("spring.pulsar.producer.topic-name", "my-topic");
props.put("spring.pulsar.producer.producer-name", "my-producer");
@@ -449,10 +446,11 @@ public class PulsarPropertiesTests {
props.put("spring.pulsar.producer.producer-access-mode", "exclusive");
props.put("spring.pulsar.producer.lazy-start=partitioned-producers", "true");
props.put("spring.pulsar.producer.properties[my-prop]", "my-prop-value");
this.producerProps = newConfigPropsFromUserProps(props).getProducer();
}
bind(props);
var producerProps = properties.getProducer();
@Test
void producerProperties() {
assertThat(producerProps.getTopicName()).isEqualTo("my-topic");
assertThat(producerProps.getProducerName()).isEqualTo("my-producer");
assertThat(producerProps.getSendTimeout()).isEqualTo(Duration.ofMillis(2000));
@@ -476,13 +474,44 @@ public class PulsarPropertiesTests {
assertThat(producerProps.getProperties()).containsExactly(entry("my-prop", "my-prop-value"));
}
@SuppressWarnings({ "unchecked", "deprecation" })
@Test
void toProducerCustomizer() {
var producerBuilder = mock(ProducerBuilder.class);
var customizer = this.producerProps.toProducerBuilderCustomizer();
customizer.customize(producerBuilder);
then(producerBuilder).should().topic("my-topic");
then(producerBuilder).should().producerName("my-producer");
then(producerBuilder).should().sendTimeout(2_000, TimeUnit.MILLISECONDS);
then(producerBuilder).should().blockIfQueueFull(true);
then(producerBuilder).should().maxPendingMessages(3);
then(producerBuilder).should().maxPendingMessagesAcrossPartitions(4);
then(producerBuilder).should().messageRoutingMode(MessageRoutingMode.CustomPartition);
then(producerBuilder).should().hashingScheme(HashingScheme.Murmur3_32Hash);
then(producerBuilder).should().cryptoFailureAction(ProducerCryptoFailureAction.SEND);
then(producerBuilder).should().batchingMaxPublishDelay(5_000, TimeUnit.MILLISECONDS);
then(producerBuilder).should().roundRobinRouterBatchingPartitionSwitchFrequency(6);
then(producerBuilder).should().batchingMaxMessages(7);
then(producerBuilder).should().batchingMaxBytes(8);
then(producerBuilder).should().enableBatching(false);
then(producerBuilder).should().enableChunking(true);
then(producerBuilder).should().addEncryptionKey("my-key");
then(producerBuilder).should().compressionType(CompressionType.LZ4);
then(producerBuilder).should().initialSequenceId(9);
then(producerBuilder).should().accessMode(ProducerAccessMode.Exclusive);
then(producerBuilder).should().enableLazyStartPartitionedProducers(true);
then(producerBuilder).should().properties(Map.of("my-prop", "my-prop-value"));
}
}
@Nested
class ConsumerPropertiesTests {
@Test
void consumerProperties() {
private ConsumerConfigProperties consumerProps;
@BeforeEach
void consumerTestProps() {
var props = new HashMap<String, String>();
props.put("spring.pulsar.consumer.topics[0]", "my-topic");
props.put("spring.pulsar.consumer.topics-pattern", "my-pattern");
@@ -520,10 +549,11 @@ public class PulsarPropertiesTests {
props.put("spring.pulsar.consumer.auto-ack-oldest-chunked-message-on-queue-full", "false");
props.put("spring.pulsar.consumer.max-pending-chunked-message", "11");
props.put("spring.pulsar.consumer.expire-time-of-incomplete-chunked-message", "12s");
this.consumerProps = newConfigPropsFromUserProps(props).getConsumer();
}
bind(props);
var consumerProps = properties.getConsumer();
@Test
void consumerProperties() {
assertThat(consumerProps.getTopics()).containsExactly("my-topic");
assertThat(consumerProps.getTopicsPattern().toString()).isEqualTo("my-pattern");
assertThat(consumerProps.getSubscriptionName()).isEqualTo("my-subscription");
@@ -545,7 +575,7 @@ public class PulsarPropertiesTests {
assertThat(consumerProps.getSubscriptionInitialPosition()).isEqualTo(SubscriptionInitialPosition.Earliest);
assertThat(consumerProps.getPatternAutoDiscoveryPeriod()).isEqualTo(9);
assertThat(consumerProps.getRegexSubscriptionMode()).isEqualTo(RegexSubscriptionMode.AllTopics);
assertThat(consumerProps.getDeadLetterPolicy()).satisfies(dlp -> {
assertThat(consumerProps.getDeadLetterPolicy()).satisfies((dlp) -> {
assertThat(dlp.getMaxRedeliverCount()).isEqualTo(4);
assertThat(dlp.getRetryLetterTopic()).isEqualTo("my-retry-topic");
assertThat(dlp.getDeadLetterTopic()).isEqualTo("my-dlt-topic");
@@ -565,30 +595,86 @@ public class PulsarPropertiesTests {
assertThat(consumerProps.getExpireTimeOfIncompleteChunkedMessage()).isEqualTo(Duration.ofMillis(12_000));
}
@SuppressWarnings("unchecked")
@Test
void toConsumerCustomizer() {
var consumerBuilder = mock(ConsumerBuilder.class);
var customizer = this.consumerProps.toConsumerBuilderCustomizer();
customizer.customize(consumerBuilder);
then(consumerBuilder).should().topics(List.of("my-topic"));
var argCaptor = ArgumentCaptor.forClass(Pattern.class);
then(consumerBuilder).should().topicsPattern(argCaptor.capture());
assertThat(argCaptor.getValue().pattern()).isEqualTo("my-pattern");
then(consumerBuilder).should().subscriptionName("my-subscription");
then(consumerBuilder).should().subscriptionType(SubscriptionType.Shared);
then(consumerBuilder).should().subscriptionProperties(Map.of("my-sub-prop", "my-sub-prop-value"));
then(consumerBuilder).should().subscriptionMode(SubscriptionMode.NonDurable);
then(consumerBuilder).should().receiverQueueSize(1);
then(consumerBuilder).should().acknowledgmentGroupTime(2_000, TimeUnit.MILLISECONDS);
then(consumerBuilder).should().negativeAckRedeliveryDelay(3_000, TimeUnit.MILLISECONDS);
then(consumerBuilder).should().maxTotalReceiverQueueSizeAcrossPartitions(5);
then(consumerBuilder).should().consumerName("my-consumer");
then(consumerBuilder).should().ackTimeout(6_000, TimeUnit.MILLISECONDS);
then(consumerBuilder).should().ackTimeoutTickTime(7_000, TimeUnit.MILLISECONDS);
then(consumerBuilder).should().priorityLevel(8);
then(consumerBuilder).should().cryptoFailureAction(ConsumerCryptoFailureAction.DISCARD);
then(consumerBuilder).should().properties(Map.of("my-prop", "my-prop-value"));
then(consumerBuilder).should().readCompacted(true);
then(consumerBuilder).should().subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
then(consumerBuilder).should().patternAutoDiscoveryPeriod(9);
then(consumerBuilder).should().subscriptionTopicsMode(RegexSubscriptionMode.AllTopics);
then(consumerBuilder).should()
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(4).retryLetterTopic("my-retry-topic")
.deadLetterTopic("my-dlt-topic").initialSubscriptionName("my-initial-subscription")
.build());
then(consumerBuilder).should().enableRetry(true);
then(consumerBuilder).should().autoUpdatePartitions(false);
then(consumerBuilder).should().autoUpdatePartitionsInterval(10_000, TimeUnit.MILLISECONDS);
then(consumerBuilder).should().replicateSubscriptionState(true);
then(consumerBuilder).should().startMessageIdInclusive();
then(consumerBuilder).should().enableBatchIndexAcknowledgment(true);
then(consumerBuilder).should().isAckReceiptEnabled(true);
then(consumerBuilder).should().poolMessages(true);
then(consumerBuilder).should().startPaused(true);
then(consumerBuilder).should().autoAckOldestChunkedMessageOnQueueFull(false);
then(consumerBuilder).should().maxPendingChunkedMessage(11);
then(consumerBuilder).should().expireTimeOfIncompleteChunkedMessage(12_000, TimeUnit.MILLISECONDS);
}
@SuppressWarnings("unchecked")
@Test
void toConsumerCustomizerResetDoesNotIncludeHead() {
var consumerBuilder = mock(ConsumerBuilder.class);
this.consumerProps.setResetIncludeHead(false);
var customizer = this.consumerProps.toConsumerBuilderCustomizer();
customizer.customize(consumerBuilder);
then(consumerBuilder).should(never()).startMessageIdInclusive();
}
}
@Nested
class FunctionPropertiesTests {
@Test
void functionProperties() {
Map<String, String> props = new HashMap<>();
bind(props);
void functionPropertiesWithDefaults() {
var props = new HashMap<String, String>();
var functionProps = newConfigPropsFromUserProps(props).getFunction();
assertThat(functionProps.getFailFast()).isTrue();
assertThat(functionProps.getPropagateFailures()).isTrue();
assertThat(functionProps.getPropagateStopFailures()).isFalse();
}
// check defaults
assertThat(properties.getFunction().getFailFast()).isTrue();
assertThat(properties.getFunction().getPropagateFailures()).isTrue();
assertThat(properties.getFunction().getPropagateStopFailures()).isFalse();
// set values and verify
@Test
void functionPropertiesWitValues() {
var props = new HashMap<String, String>();
props.put("spring.pulsar.function.fail-fast", "false");
props.put("spring.pulsar.function.propagate-failures", "false");
props.put("spring.pulsar.function.propagate-stop-failures", "true");
bind(props);
assertThat(properties.getFunction().getFailFast()).isFalse();
assertThat(properties.getFunction().getPropagateFailures()).isFalse();
assertThat(properties.getFunction().getPropagateStopFailures()).isTrue();
var functionProps = newConfigPropsFromUserProps(props).getFunction();
assertThat(functionProps.getFailFast()).isFalse();
assertThat(functionProps.getPropagateFailures()).isFalse();
assertThat(functionProps.getPropagateStopFailures()).isTrue();
}
}
@@ -596,9 +682,11 @@ public class PulsarPropertiesTests {
@Nested
class ReaderPropertiesTests {
private PulsarProperties.Reader readerProps;
@BeforeEach
void bindProperties() {
Map<String, String> props = new HashMap<>();
void readerTestProps() {
var props = new HashMap<String, String>();
props.put("spring.pulsar.reader.topic-names", "my-topic");
props.put("spring.pulsar.reader.receiver-queue-size", "100");
props.put("spring.pulsar.reader.reader-name", "my-reader");
@@ -606,44 +694,43 @@ public class PulsarPropertiesTests {
props.put("spring.pulsar.reader.subscription-role-prefix", "sub-role");
props.put("spring.pulsar.reader.read-compacted", "true");
props.put("spring.pulsar.reader.reset-include-head", "true");
bind(props);
this.readerProps = newConfigPropsFromUserProps(props).getReader();
}
@Test
void readerProperties() {
var readerProps = properties.getReader();
assertThat(readerProps.getTopicNames()).containsExactly("my-topic");
assertThat(readerProps.getReceiverQueueSize()).isEqualTo(100);
assertThat(readerProps.getReaderName()).isEqualTo("my-reader");
assertThat(readerProps.getSubscriptionName()).isEqualTo("my-subscription");
assertThat(readerProps.getSubscriptionRolePrefix()).isEqualTo("sub-role");
assertThat(readerProps.getReadCompacted()).isTrue();
assertThat(readerProps.getResetIncludeHead()).isTrue();
assertThat(this.readerProps.getTopicNames()).containsExactly("my-topic");
assertThat(this.readerProps.getReceiverQueueSize()).isEqualTo(100);
assertThat(this.readerProps.getReaderName()).isEqualTo("my-reader");
assertThat(this.readerProps.getSubscriptionName()).isEqualTo("my-subscription");
assertThat(this.readerProps.getSubscriptionRolePrefix()).isEqualTo("sub-role");
assertThat(this.readerProps.getReadCompacted()).isTrue();
assertThat(this.readerProps.getResetIncludeHead()).isTrue();
}
@SuppressWarnings("unchecked")
@Test
void toReaderCustomizer() {
var readerBuilder = mock(ReaderBuilder.class);
var customizer = properties.getReader().toReaderBuilderCustomizer();
var customizer = this.readerProps.toReaderBuilderCustomizer();
customizer.customize(readerBuilder);
verify(readerBuilder).topics(List.of("my-topic"));
verify(readerBuilder).receiverQueueSize(100);
verify(readerBuilder).readerName("my-reader");
verify(readerBuilder).subscriptionName("my-subscription");
verify(readerBuilder).subscriptionRolePrefix("sub-role");
verify(readerBuilder).readCompacted(true);
verify(readerBuilder).startMessageIdInclusive();
then(readerBuilder).should().topics(List.of("my-topic"));
then(readerBuilder).should().receiverQueueSize(100);
then(readerBuilder).should().readerName("my-reader");
then(readerBuilder).should().subscriptionName("my-subscription");
then(readerBuilder).should().subscriptionRolePrefix("sub-role");
then(readerBuilder).should().readCompacted(true);
then(readerBuilder).should().startMessageIdInclusive();
}
@SuppressWarnings("unchecked")
@Test
void toReaderCustomizerResetDoesNotIncludeHead() {
properties.getReader().setResetIncludeHead(false);
this.readerProps.setResetIncludeHead(false);
var readerBuilder = mock(ReaderBuilder.class);
var customizer = properties.getReader().toReaderBuilderCustomizer();
var customizer = this.readerProps.toReaderBuilderCustomizer();
customizer.customize(readerBuilder);
verify(readerBuilder, never()).startMessageIdInclusive();
then(readerBuilder).should(never()).startMessageIdInclusive();
}
}