From 12e557c79ad7c923d538a044dfe67297ead167da Mon Sep 17 00:00:00 2001 From: Chris Bono Date: Sat, 13 May 2023 23:22:58 -0500 Subject: [PATCH] Add tests for PulsarProperties toCustomizer API (#405) --- .../autoconfigure/PulsarPropertiesTests.java | 361 +++++++++++------- 1 file changed, 224 insertions(+), 137 deletions(-) diff --git a/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarPropertiesTests.java b/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarPropertiesTests.java index 786e4179..2fe01dff 100644 --- a/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarPropertiesTests.java +++ b/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarPropertiesTests.java @@ -20,9 +20,9 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; import static org.assertj.core.api.Assertions.entry; +import static org.mockito.BDDMockito.then; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; import java.time.Duration; import java.util.HashMap; @@ -30,13 +30,17 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; import org.apache.pulsar.client.admin.PulsarAdminBuilder; import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; +import org.apache.pulsar.client.api.DeadLetterPolicy; import org.apache.pulsar.client.api.HashingScheme; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.ProducerAccessMode; +import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.ProducerCryptoFailureAction; import org.apache.pulsar.client.api.ProxyProtocol; import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; @@ -49,11 +53,11 @@ import org.apache.pulsar.common.schema.SchemaType; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; import org.springframework.boot.context.properties.bind.BindException; import org.springframework.boot.context.properties.bind.Bindable; import org.springframework.boot.context.properties.bind.Binder; -import org.springframework.boot.context.properties.source.ConfigurationPropertySource; import org.springframework.boot.context.properties.source.MapConfigurationPropertySource; import org.springframework.pulsar.autoconfigure.PulsarProperties.SchemaInfo; import org.springframework.pulsar.autoconfigure.PulsarProperties.TypeMapping; @@ -68,22 +72,16 @@ import org.springframework.util.unit.DataSize; */ public class PulsarPropertiesTests { - private final PulsarProperties properties = new PulsarProperties(); - - private void bind(Map map) { - ConfigurationPropertySource source = new MapConfigurationPropertySource(map); - new Binder(source).bind("spring.pulsar", Bindable.ofInstance(this.properties)); + private PulsarProperties newConfigPropsFromUserProps(Map map) { + var targetProps = new PulsarProperties(); + var source = new MapConfigurationPropertySource(map); + new Binder(source).bind("spring.pulsar", Bindable.ofInstance(targetProps)); + return targetProps; } @Nested class ClientPropertiesTests { - private final String authPluginClassName = "org.apache.pulsar.client.impl.auth.AuthenticationToken"; - - private final String authParamsStr = "{\"token\":\"1234\"}"; - - private final String authToken = "1234"; - @Test void clientProperties() { var props = new HashMap(); @@ -128,10 +126,8 @@ public class PulsarPropertiesTests { props.put("spring.pulsar.client.socks5-proxy-address", "my-socks5-proxy-address"); props.put("spring.pulsar.client.socks5-proxy-username", "my-socks5-proxy-username"); props.put("spring.pulsar.client.socks5-proxy-password", "my-socks5-proxy-password"); + var clientProps = newConfigPropsFromUserProps(props).getClient(); - bind(props); - - PulsarProperties.Client clientProps = PulsarPropertiesTests.this.properties.getClient(); assertThat(clientProps.getServiceUrl()).isEqualTo("my-service-url"); assertThat(clientProps.getListenerName()).isEqualTo("my-listener"); assertThat(clientProps.getOperationTimeout()).isEqualTo(Duration.ofMillis(1000)); @@ -177,23 +173,25 @@ public class PulsarPropertiesTests { @Test void authenticationUsingAuthParamsString() { + var authPluginClassName = "org.apache.pulsar.client.impl.auth.AuthenticationToken"; + var authParamsStr = "{\"token\":\"1234\"}"; var props = new HashMap(); - props.put("spring.pulsar.client.auth-plugin-class-name", - "org.apache.pulsar.client.impl.auth.AuthenticationToken"); + props.put("spring.pulsar.client.auth-plugin-class-name", authPluginClassName); props.put("spring.pulsar.client.auth-params", authParamsStr); - bind(props); - var clientProps = PulsarPropertiesTests.this.properties.getClient(); + var clientProps = newConfigPropsFromUserProps(props).getClient(); assertThat(clientProps.getAuthPluginClassName()).isEqualTo(authPluginClassName); assertThat(clientProps.getAuthParams()).isEqualTo(authParamsStr); } @Test void authenticationUsingAuthenticationMap() { + var authPluginClassName = "org.apache.pulsar.client.impl.auth.AuthenticationToken"; + var authToken = "1234"; var props = new HashMap(); props.put("spring.pulsar.client.auth-plugin-class-name", authPluginClassName); props.put("spring.pulsar.client.authentication.token", authToken); - bind(props); - var clientProps = PulsarPropertiesTests.this.properties.getClient(); + var configProps = newConfigPropsFromUserProps(props); + var clientProps = configProps.getClient(); assertThat(clientProps.getAuthPluginClassName()).isEqualTo(authPluginClassName); assertThat(clientProps.getAuthentication()).containsEntry("token", authToken); } @@ -211,7 +209,7 @@ public class PulsarPropertiesTests { @Test void adminProperties() { - Map props = new HashMap<>(); + var props = new HashMap(); props.put("spring.pulsar.administration.service-url", "my-service-url"); props.put("spring.pulsar.administration.connection-timeout", "12s"); props.put("spring.pulsar.administration.read-timeout", "13s"); @@ -229,9 +227,7 @@ public class PulsarPropertiesTests { props.put("spring.pulsar.administration.tls-trust-store-password", "my-trust-store-password"); props.put("spring.pulsar.administration.tls-ciphers[0]", "my-tls-cipher"); props.put("spring.pulsar.administration.tls-protocols[0]", "my-tls-protocol"); - bind(props); - - var adminProps = properties.getAdministration(); + var adminProps = newConfigPropsFromUserProps(props).getAdministration(); // Verify properties assertThat(adminProps.getServiceUrl()).isEqualTo("my-service-url"); @@ -256,77 +252,75 @@ public class PulsarPropertiesTests { var adminBuilder = mock(PulsarAdminBuilder.class); var adminCustomizer = adminProps.toPulsarAdminBuilderCustomizer(); adminCustomizer.customize(adminBuilder); - verify(adminBuilder).serviceHttpUrl("my-service-url"); - verify(adminBuilder).connectionTimeout(12_000, TimeUnit.MILLISECONDS); - verify(adminBuilder).readTimeout(13_000, TimeUnit.MILLISECONDS); - verify(adminBuilder).requestTimeout(14_000, TimeUnit.MILLISECONDS); - verify(adminBuilder).autoCertRefreshTime(15_000, TimeUnit.MILLISECONDS); - verify(adminBuilder).enableTlsHostnameVerification(true); - verify(adminBuilder).tlsTrustCertsFilePath("my-trust-certs-file-path"); - verify(adminBuilder).tlsCertificateFilePath("my-certificate-file-path"); - verify(adminBuilder).tlsKeyFilePath("my-key-file-path"); - verify(adminBuilder).allowTlsInsecureConnection(true); - verify(adminBuilder).useKeyStoreTls(true); - verify(adminBuilder).sslProvider("my-ssl-provider"); - verify(adminBuilder).tlsTrustStoreType("my-trust-store-type"); - verify(adminBuilder).tlsTrustStorePath("my-trust-store-path"); - verify(adminBuilder).tlsTrustStorePassword("my-trust-store-password"); - verify(adminBuilder).tlsCiphers(Set.of("my-tls-cipher")); - verify(adminBuilder).tlsProtocols(Set.of("my-tls-protocol")); + + then(adminBuilder).should().serviceHttpUrl("my-service-url"); + then(adminBuilder).should().connectionTimeout(12_000, TimeUnit.MILLISECONDS); + then(adminBuilder).should().readTimeout(13_000, TimeUnit.MILLISECONDS); + then(adminBuilder).should().requestTimeout(14_000, TimeUnit.MILLISECONDS); + then(adminBuilder).should().autoCertRefreshTime(15_000, TimeUnit.MILLISECONDS); + then(adminBuilder).should().enableTlsHostnameVerification(true); + then(adminBuilder).should().tlsTrustCertsFilePath("my-trust-certs-file-path"); + then(adminBuilder).should().tlsCertificateFilePath("my-certificate-file-path"); + then(adminBuilder).should().tlsKeyFilePath("my-key-file-path"); + then(adminBuilder).should().allowTlsInsecureConnection(true); + then(adminBuilder).should().useKeyStoreTls(true); + then(adminBuilder).should().sslProvider("my-ssl-provider"); + then(adminBuilder).should().tlsTrustStoreType("my-trust-store-type"); + then(adminBuilder).should().tlsTrustStorePath("my-trust-store-path"); + then(adminBuilder).should().tlsTrustStorePassword("my-trust-store-password"); + then(adminBuilder).should().tlsCiphers(Set.of("my-tls-cipher")); + then(adminBuilder).should().tlsProtocols(Set.of("my-tls-protocol")); + } @Test void authenticationUsingAuthParamsString() throws UnsupportedAuthenticationException { - Map props = new HashMap<>(); - props.put("spring.pulsar.administration.auth-plugin-class-name", - "org.apache.pulsar.client.impl.auth.AuthenticationToken"); - props.put("spring.pulsar.administration.auth-params", authParamsStr); - bind(props); + var props = new HashMap(); + props.put("spring.pulsar.administration.auth-plugin-class-name", this.authPluginClassName); + props.put("spring.pulsar.administration.auth-params", this.authParamsStr); + var adminProps = newConfigPropsFromUserProps(props).getAdministration(); // Verify properties - var adminProps = properties.getAdministration(); - assertThat(adminProps.getAuthPluginClassName()).isEqualTo(authPluginClassName); - assertThat(adminProps.getAuthParams()).isEqualTo(authParamsStr); + assertThat(adminProps.getAuthPluginClassName()).isEqualTo(this.authPluginClassName); + assertThat(adminProps.getAuthParams()).isEqualTo(this.authParamsStr); // Verify customizer var adminBuilder = mock(PulsarAdminBuilder.class); var adminCustomizer = adminProps.toPulsarAdminBuilderCustomizer(); adminCustomizer.customize(adminBuilder); - verify(adminBuilder).authentication(authPluginClassName, authParamsStr); + then(adminBuilder).should().authentication(this.authPluginClassName, this.authParamsStr); } @Test void authenticationUsingAuthenticationMap() throws UnsupportedAuthenticationException { - Map props = new HashMap<>(); - props.put("spring.pulsar.administration.auth-plugin-class-name", authPluginClassName); - props.put("spring.pulsar.administration.authentication.token", authToken); - bind(props); + var props = new HashMap(); + props.put("spring.pulsar.administration.auth-plugin-class-name", this.authPluginClassName); + props.put("spring.pulsar.administration.authentication.token", this.authToken); + var adminProps = newConfigPropsFromUserProps(props).getAdministration(); // Verify properties - var adminProps = properties.getAdministration(); - assertThat(adminProps.getAuthPluginClassName()).isEqualTo(authPluginClassName); - assertThat(adminProps.getAuthentication()).containsEntry("token", authToken); + assertThat(adminProps.getAuthPluginClassName()).isEqualTo(this.authPluginClassName); + assertThat(adminProps.getAuthentication()).containsEntry("token", this.authToken); // Verify customizer var adminBuilder = mock(PulsarAdminBuilder.class); var adminCustomizer = adminProps.toPulsarAdminBuilderCustomizer(); adminCustomizer.customize(adminBuilder); - verify(adminBuilder).authentication(authPluginClassName, authParamsStr); + then(adminBuilder).should().authentication(this.authPluginClassName, this.authParamsStr); } @Test void authenticationNotAllowedUsingBothAuthParamsStringAndAuthenticationMap() { - Map props = new HashMap<>(); - props.put("spring.pulsar.administration.auth-plugin-class-name", authPluginClassName); - props.put("spring.pulsar.administration.auth-params", authParamsStr); - props.put("spring.pulsar.administration.authentication.token", authToken); - bind(props); + var props = new HashMap(); + props.put("spring.pulsar.administration.auth-plugin-class-name", this.authPluginClassName); + props.put("spring.pulsar.administration.auth-params", this.authParamsStr); + props.put("spring.pulsar.administration.authentication.token", this.authToken); + var adminProps = newConfigPropsFromUserProps(props).getAdministration(); // Verify properties - var adminProps = properties.getAdministration(); - assertThat(adminProps.getAuthPluginClassName()).isEqualTo(authPluginClassName); - assertThat(adminProps.getAuthentication()).containsEntry("token", authToken); - assertThat(adminProps.getAuthParams()).isEqualTo(authParamsStr); + assertThat(adminProps.getAuthPluginClassName()).isEqualTo(this.authPluginClassName); + assertThat(adminProps.getAuthentication()).containsEntry("token", this.authToken); + assertThat(adminProps.getAuthParams()).isEqualTo(this.authParamsStr); // Verify customizer var adminCustomizer = adminProps.toPulsarAdminBuilderCustomizer(); @@ -342,78 +336,79 @@ public class PulsarPropertiesTests { @Test void emptyByDefault() { - assertThat(properties.getDefaults().getTypeMappings()).isEmpty(); + assertThat(new PulsarProperties().getDefaults().getTypeMappings()).isEmpty(); } @Test void withTopicsOnly() { - Map props = new HashMap<>(); + var props = new HashMap(); props.put("spring.pulsar.defaults.type-mappings[0].message-type", Foo.class.getName()); props.put("spring.pulsar.defaults.type-mappings[0].topic-name", "foo-topic"); props.put("spring.pulsar.defaults.type-mappings[1].message-type", String.class.getName()); props.put("spring.pulsar.defaults.type-mappings[1].topic-name", "string-topic"); - bind(props); - assertThat(properties.getDefaults().getTypeMappings()).containsExactly( + var configProps = newConfigPropsFromUserProps(props); + assertThat(configProps.getDefaults().getTypeMappings()).containsExactly( new TypeMapping(Foo.class, "foo-topic", null), new TypeMapping(String.class, "string-topic", null)); } @Test void withSchemaOnly() { - Map props = new HashMap<>(); + var props = new HashMap(); props.put("spring.pulsar.defaults.type-mappings[0].message-type", Foo.class.getName()); props.put("spring.pulsar.defaults.type-mappings[0].schema-info.schema-type", "JSON"); - bind(props); - assertThat(properties.getDefaults().getTypeMappings()) + var configProps = newConfigPropsFromUserProps(props); + assertThat(configProps.getDefaults().getTypeMappings()) .containsExactly(new TypeMapping(Foo.class, null, new SchemaInfo(SchemaType.JSON, null))); } @Test void withTopicAndSchema() { - Map props = new HashMap<>(); + var props = new HashMap(); props.put("spring.pulsar.defaults.type-mappings[0].message-type", Foo.class.getName()); props.put("spring.pulsar.defaults.type-mappings[0].topic-name", "foo-topic"); props.put("spring.pulsar.defaults.type-mappings[0].schema-info.schema-type", "JSON"); - bind(props); - assertThat(properties.getDefaults().getTypeMappings()) + var configProps = newConfigPropsFromUserProps(props); + assertThat(configProps.getDefaults().getTypeMappings()) .containsExactly(new TypeMapping(Foo.class, "foo-topic", new SchemaInfo(SchemaType.JSON, null))); } @Test void withKeyValueSchema() { - Map props = new HashMap<>(); + var props = new HashMap(); props.put("spring.pulsar.defaults.type-mappings[0].message-type", Foo.class.getName()); props.put("spring.pulsar.defaults.type-mappings[0].schema-info.schema-type", "KEY_VALUE"); props.put("spring.pulsar.defaults.type-mappings[0].schema-info.message-key-type", String.class.getName()); - bind(props); - assertThat(properties.getDefaults().getTypeMappings()).containsExactly( + var configProps = newConfigPropsFromUserProps(props); + assertThat(configProps.getDefaults().getTypeMappings()).containsExactly( new TypeMapping(Foo.class, null, new SchemaInfo(SchemaType.KEY_VALUE, String.class))); } @Test void schemaTypeRequired() { - Map props = new HashMap<>(); + var props = new HashMap(); props.put("spring.pulsar.defaults.type-mappings[0].message-type", Foo.class.getName()); props.put("spring.pulsar.defaults.type-mappings[0].schema-info.message-key-type", String.class.getName()); - assertThatExceptionOfType(BindException.class).isThrownBy(() -> bind(props)).havingRootCause() - .withMessageContaining("schemaType must not be null"); + assertThatExceptionOfType(BindException.class).isThrownBy(() -> newConfigPropsFromUserProps(props)) + .havingRootCause().withMessageContaining("schemaType must not be null"); } @Test void schemaTypeNoneNotAllowed() { - Map props = new HashMap<>(); + var props = new HashMap(); props.put("spring.pulsar.defaults.type-mappings[0].message-type", Foo.class.getName()); props.put("spring.pulsar.defaults.type-mappings[0].schema-info.schema-type", "NONE"); - assertThatExceptionOfType(BindException.class).isThrownBy(() -> bind(props)).havingRootCause() - .withMessageContaining("schemaType NONE not supported"); + assertThatExceptionOfType(BindException.class).isThrownBy(() -> newConfigPropsFromUserProps(props)) + .havingRootCause().withMessageContaining("schemaType NONE not supported"); } @Test void messageKeyTypeOnlyAllowedForKeyValueSchemaType() { - Map props = new HashMap<>(); + var props = new HashMap(); props.put("spring.pulsar.defaults.type-mappings[0].message-type", Foo.class.getName()); props.put("spring.pulsar.defaults.type-mappings[0].schema-info.schema-type", "JSON"); props.put("spring.pulsar.defaults.type-mappings[0].schema-info.message-key-type", String.class.getName()); - assertThatExceptionOfType(BindException.class).isThrownBy(() -> bind(props)).havingRootCause() + assertThatExceptionOfType(BindException.class).isThrownBy(() -> newConfigPropsFromUserProps(props)) + .havingRootCause() .withMessageContaining("messageKeyType can only be set when schemaType is KEY_VALUE"); } @@ -425,8 +420,10 @@ public class PulsarPropertiesTests { @Nested class ProducerPropertiesTests { - @Test - void producerProperties() { + private ProducerConfigProperties producerProps; + + @BeforeEach + void producerTestProps() { var props = new HashMap(); props.put("spring.pulsar.producer.topic-name", "my-topic"); props.put("spring.pulsar.producer.producer-name", "my-producer"); @@ -449,10 +446,11 @@ public class PulsarPropertiesTests { props.put("spring.pulsar.producer.producer-access-mode", "exclusive"); props.put("spring.pulsar.producer.lazy-start=partitioned-producers", "true"); props.put("spring.pulsar.producer.properties[my-prop]", "my-prop-value"); + this.producerProps = newConfigPropsFromUserProps(props).getProducer(); + } - bind(props); - - var producerProps = properties.getProducer(); + @Test + void producerProperties() { assertThat(producerProps.getTopicName()).isEqualTo("my-topic"); assertThat(producerProps.getProducerName()).isEqualTo("my-producer"); assertThat(producerProps.getSendTimeout()).isEqualTo(Duration.ofMillis(2000)); @@ -476,13 +474,44 @@ public class PulsarPropertiesTests { assertThat(producerProps.getProperties()).containsExactly(entry("my-prop", "my-prop-value")); } + @SuppressWarnings({ "unchecked", "deprecation" }) + @Test + void toProducerCustomizer() { + var producerBuilder = mock(ProducerBuilder.class); + var customizer = this.producerProps.toProducerBuilderCustomizer(); + customizer.customize(producerBuilder); + then(producerBuilder).should().topic("my-topic"); + then(producerBuilder).should().producerName("my-producer"); + then(producerBuilder).should().sendTimeout(2_000, TimeUnit.MILLISECONDS); + then(producerBuilder).should().blockIfQueueFull(true); + then(producerBuilder).should().maxPendingMessages(3); + then(producerBuilder).should().maxPendingMessagesAcrossPartitions(4); + then(producerBuilder).should().messageRoutingMode(MessageRoutingMode.CustomPartition); + then(producerBuilder).should().hashingScheme(HashingScheme.Murmur3_32Hash); + then(producerBuilder).should().cryptoFailureAction(ProducerCryptoFailureAction.SEND); + then(producerBuilder).should().batchingMaxPublishDelay(5_000, TimeUnit.MILLISECONDS); + then(producerBuilder).should().roundRobinRouterBatchingPartitionSwitchFrequency(6); + then(producerBuilder).should().batchingMaxMessages(7); + then(producerBuilder).should().batchingMaxBytes(8); + then(producerBuilder).should().enableBatching(false); + then(producerBuilder).should().enableChunking(true); + then(producerBuilder).should().addEncryptionKey("my-key"); + then(producerBuilder).should().compressionType(CompressionType.LZ4); + then(producerBuilder).should().initialSequenceId(9); + then(producerBuilder).should().accessMode(ProducerAccessMode.Exclusive); + then(producerBuilder).should().enableLazyStartPartitionedProducers(true); + then(producerBuilder).should().properties(Map.of("my-prop", "my-prop-value")); + } + } @Nested class ConsumerPropertiesTests { - @Test - void consumerProperties() { + private ConsumerConfigProperties consumerProps; + + @BeforeEach + void consumerTestProps() { var props = new HashMap(); props.put("spring.pulsar.consumer.topics[0]", "my-topic"); props.put("spring.pulsar.consumer.topics-pattern", "my-pattern"); @@ -520,10 +549,11 @@ public class PulsarPropertiesTests { props.put("spring.pulsar.consumer.auto-ack-oldest-chunked-message-on-queue-full", "false"); props.put("spring.pulsar.consumer.max-pending-chunked-message", "11"); props.put("spring.pulsar.consumer.expire-time-of-incomplete-chunked-message", "12s"); + this.consumerProps = newConfigPropsFromUserProps(props).getConsumer(); + } - bind(props); - - var consumerProps = properties.getConsumer(); + @Test + void consumerProperties() { assertThat(consumerProps.getTopics()).containsExactly("my-topic"); assertThat(consumerProps.getTopicsPattern().toString()).isEqualTo("my-pattern"); assertThat(consumerProps.getSubscriptionName()).isEqualTo("my-subscription"); @@ -545,7 +575,7 @@ public class PulsarPropertiesTests { assertThat(consumerProps.getSubscriptionInitialPosition()).isEqualTo(SubscriptionInitialPosition.Earliest); assertThat(consumerProps.getPatternAutoDiscoveryPeriod()).isEqualTo(9); assertThat(consumerProps.getRegexSubscriptionMode()).isEqualTo(RegexSubscriptionMode.AllTopics); - assertThat(consumerProps.getDeadLetterPolicy()).satisfies(dlp -> { + assertThat(consumerProps.getDeadLetterPolicy()).satisfies((dlp) -> { assertThat(dlp.getMaxRedeliverCount()).isEqualTo(4); assertThat(dlp.getRetryLetterTopic()).isEqualTo("my-retry-topic"); assertThat(dlp.getDeadLetterTopic()).isEqualTo("my-dlt-topic"); @@ -565,30 +595,86 @@ public class PulsarPropertiesTests { assertThat(consumerProps.getExpireTimeOfIncompleteChunkedMessage()).isEqualTo(Duration.ofMillis(12_000)); } + @SuppressWarnings("unchecked") + @Test + void toConsumerCustomizer() { + var consumerBuilder = mock(ConsumerBuilder.class); + var customizer = this.consumerProps.toConsumerBuilderCustomizer(); + customizer.customize(consumerBuilder); + then(consumerBuilder).should().topics(List.of("my-topic")); + var argCaptor = ArgumentCaptor.forClass(Pattern.class); + then(consumerBuilder).should().topicsPattern(argCaptor.capture()); + assertThat(argCaptor.getValue().pattern()).isEqualTo("my-pattern"); + then(consumerBuilder).should().subscriptionName("my-subscription"); + then(consumerBuilder).should().subscriptionType(SubscriptionType.Shared); + then(consumerBuilder).should().subscriptionProperties(Map.of("my-sub-prop", "my-sub-prop-value")); + then(consumerBuilder).should().subscriptionMode(SubscriptionMode.NonDurable); + then(consumerBuilder).should().receiverQueueSize(1); + then(consumerBuilder).should().acknowledgmentGroupTime(2_000, TimeUnit.MILLISECONDS); + then(consumerBuilder).should().negativeAckRedeliveryDelay(3_000, TimeUnit.MILLISECONDS); + then(consumerBuilder).should().maxTotalReceiverQueueSizeAcrossPartitions(5); + then(consumerBuilder).should().consumerName("my-consumer"); + then(consumerBuilder).should().ackTimeout(6_000, TimeUnit.MILLISECONDS); + then(consumerBuilder).should().ackTimeoutTickTime(7_000, TimeUnit.MILLISECONDS); + then(consumerBuilder).should().priorityLevel(8); + then(consumerBuilder).should().cryptoFailureAction(ConsumerCryptoFailureAction.DISCARD); + then(consumerBuilder).should().properties(Map.of("my-prop", "my-prop-value")); + then(consumerBuilder).should().readCompacted(true); + then(consumerBuilder).should().subscriptionInitialPosition(SubscriptionInitialPosition.Earliest); + then(consumerBuilder).should().patternAutoDiscoveryPeriod(9); + then(consumerBuilder).should().subscriptionTopicsMode(RegexSubscriptionMode.AllTopics); + then(consumerBuilder).should() + .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(4).retryLetterTopic("my-retry-topic") + .deadLetterTopic("my-dlt-topic").initialSubscriptionName("my-initial-subscription") + .build()); + then(consumerBuilder).should().enableRetry(true); + then(consumerBuilder).should().autoUpdatePartitions(false); + then(consumerBuilder).should().autoUpdatePartitionsInterval(10_000, TimeUnit.MILLISECONDS); + then(consumerBuilder).should().replicateSubscriptionState(true); + then(consumerBuilder).should().startMessageIdInclusive(); + then(consumerBuilder).should().enableBatchIndexAcknowledgment(true); + then(consumerBuilder).should().isAckReceiptEnabled(true); + then(consumerBuilder).should().poolMessages(true); + then(consumerBuilder).should().startPaused(true); + then(consumerBuilder).should().autoAckOldestChunkedMessageOnQueueFull(false); + then(consumerBuilder).should().maxPendingChunkedMessage(11); + then(consumerBuilder).should().expireTimeOfIncompleteChunkedMessage(12_000, TimeUnit.MILLISECONDS); + } + + @SuppressWarnings("unchecked") + @Test + void toConsumerCustomizerResetDoesNotIncludeHead() { + var consumerBuilder = mock(ConsumerBuilder.class); + this.consumerProps.setResetIncludeHead(false); + var customizer = this.consumerProps.toConsumerBuilderCustomizer(); + customizer.customize(consumerBuilder); + then(consumerBuilder).should(never()).startMessageIdInclusive(); + } + } @Nested class FunctionPropertiesTests { @Test - void functionProperties() { - Map props = new HashMap<>(); - bind(props); + void functionPropertiesWithDefaults() { + var props = new HashMap(); + var functionProps = newConfigPropsFromUserProps(props).getFunction(); + assertThat(functionProps.getFailFast()).isTrue(); + assertThat(functionProps.getPropagateFailures()).isTrue(); + assertThat(functionProps.getPropagateStopFailures()).isFalse(); + } - // check defaults - assertThat(properties.getFunction().getFailFast()).isTrue(); - assertThat(properties.getFunction().getPropagateFailures()).isTrue(); - assertThat(properties.getFunction().getPropagateStopFailures()).isFalse(); - - // set values and verify + @Test + void functionPropertiesWitValues() { + var props = new HashMap(); props.put("spring.pulsar.function.fail-fast", "false"); props.put("spring.pulsar.function.propagate-failures", "false"); props.put("spring.pulsar.function.propagate-stop-failures", "true"); - bind(props); - - assertThat(properties.getFunction().getFailFast()).isFalse(); - assertThat(properties.getFunction().getPropagateFailures()).isFalse(); - assertThat(properties.getFunction().getPropagateStopFailures()).isTrue(); + var functionProps = newConfigPropsFromUserProps(props).getFunction(); + assertThat(functionProps.getFailFast()).isFalse(); + assertThat(functionProps.getPropagateFailures()).isFalse(); + assertThat(functionProps.getPropagateStopFailures()).isTrue(); } } @@ -596,9 +682,11 @@ public class PulsarPropertiesTests { @Nested class ReaderPropertiesTests { + private PulsarProperties.Reader readerProps; + @BeforeEach - void bindProperties() { - Map props = new HashMap<>(); + void readerTestProps() { + var props = new HashMap(); props.put("spring.pulsar.reader.topic-names", "my-topic"); props.put("spring.pulsar.reader.receiver-queue-size", "100"); props.put("spring.pulsar.reader.reader-name", "my-reader"); @@ -606,44 +694,43 @@ public class PulsarPropertiesTests { props.put("spring.pulsar.reader.subscription-role-prefix", "sub-role"); props.put("spring.pulsar.reader.read-compacted", "true"); props.put("spring.pulsar.reader.reset-include-head", "true"); - bind(props); + this.readerProps = newConfigPropsFromUserProps(props).getReader(); } @Test void readerProperties() { - var readerProps = properties.getReader(); - assertThat(readerProps.getTopicNames()).containsExactly("my-topic"); - assertThat(readerProps.getReceiverQueueSize()).isEqualTo(100); - assertThat(readerProps.getReaderName()).isEqualTo("my-reader"); - assertThat(readerProps.getSubscriptionName()).isEqualTo("my-subscription"); - assertThat(readerProps.getSubscriptionRolePrefix()).isEqualTo("sub-role"); - assertThat(readerProps.getReadCompacted()).isTrue(); - assertThat(readerProps.getResetIncludeHead()).isTrue(); + assertThat(this.readerProps.getTopicNames()).containsExactly("my-topic"); + assertThat(this.readerProps.getReceiverQueueSize()).isEqualTo(100); + assertThat(this.readerProps.getReaderName()).isEqualTo("my-reader"); + assertThat(this.readerProps.getSubscriptionName()).isEqualTo("my-subscription"); + assertThat(this.readerProps.getSubscriptionRolePrefix()).isEqualTo("sub-role"); + assertThat(this.readerProps.getReadCompacted()).isTrue(); + assertThat(this.readerProps.getResetIncludeHead()).isTrue(); } @SuppressWarnings("unchecked") @Test void toReaderCustomizer() { var readerBuilder = mock(ReaderBuilder.class); - var customizer = properties.getReader().toReaderBuilderCustomizer(); + var customizer = this.readerProps.toReaderBuilderCustomizer(); customizer.customize(readerBuilder); - verify(readerBuilder).topics(List.of("my-topic")); - verify(readerBuilder).receiverQueueSize(100); - verify(readerBuilder).readerName("my-reader"); - verify(readerBuilder).subscriptionName("my-subscription"); - verify(readerBuilder).subscriptionRolePrefix("sub-role"); - verify(readerBuilder).readCompacted(true); - verify(readerBuilder).startMessageIdInclusive(); + then(readerBuilder).should().topics(List.of("my-topic")); + then(readerBuilder).should().receiverQueueSize(100); + then(readerBuilder).should().readerName("my-reader"); + then(readerBuilder).should().subscriptionName("my-subscription"); + then(readerBuilder).should().subscriptionRolePrefix("sub-role"); + then(readerBuilder).should().readCompacted(true); + then(readerBuilder).should().startMessageIdInclusive(); } @SuppressWarnings("unchecked") @Test void toReaderCustomizerResetDoesNotIncludeHead() { - properties.getReader().setResetIncludeHead(false); + this.readerProps.setResetIncludeHead(false); var readerBuilder = mock(ReaderBuilder.class); - var customizer = properties.getReader().toReaderBuilderCustomizer(); + var customizer = this.readerProps.toReaderBuilderCustomizer(); customizer.customize(readerBuilder); - verify(readerBuilder, never()).startMessageIdInclusive(); + then(readerBuilder).should(never()).startMessageIdInclusive(); } }