diff --git a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerTests.java b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerTests.java index 8b11ea08..f2f4829c 100644 --- a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerTests.java +++ b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerTests.java @@ -23,7 +23,6 @@ import java.time.Duration; import java.util.Collections; import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; @@ -36,6 +35,7 @@ import org.apache.pulsar.client.api.DeadLetterPolicy; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; @@ -57,7 +57,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.handler.annotation.Header; import org.springframework.pulsar.annotation.EnablePulsar; -import org.springframework.pulsar.config.PulsarClientFactoryBean; +import org.springframework.pulsar.core.DefaultPulsarClientFactory; import org.springframework.pulsar.core.DefaultPulsarProducerFactory; import org.springframework.pulsar.core.DefaultSchemaResolver; import org.springframework.pulsar.core.DefaultTopicResolver; @@ -111,8 +111,8 @@ public class ReactivePulsarListenerTests implements PulsarTestContainerSupport { } @Bean - public PulsarClientFactoryBean pulsarClientFactoryBean() { - return new PulsarClientFactoryBean(Map.of("serviceUrl", PulsarTestContainerSupport.getPulsarBrokerUrl())); + public PulsarClient pulsarClient() throws PulsarClientException { + return new DefaultPulsarClientFactory(PulsarTestContainerSupport.getPulsarBrokerUrl()).createClient(); } @Bean diff --git a/spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarAutoConfiguration.java b/spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarAutoConfiguration.java index 1a5af4f9..e7be6bf7 100644 --- a/spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarAutoConfiguration.java +++ b/spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarAutoConfiguration.java @@ -19,6 +19,7 @@ package org.springframework.pulsar.autoconfigure; import java.util.Optional; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.interceptor.ProducerInterceptor; import org.springframework.beans.factory.ObjectProvider; @@ -30,14 +31,15 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Import; -import org.springframework.pulsar.config.PulsarClientFactoryBean; import org.springframework.pulsar.core.CachingPulsarProducerFactory; +import org.springframework.pulsar.core.DefaultPulsarClientFactory; import org.springframework.pulsar.core.DefaultPulsarConsumerFactory; import org.springframework.pulsar.core.DefaultPulsarProducerFactory; import org.springframework.pulsar.core.DefaultPulsarReaderFactory; import org.springframework.pulsar.core.DefaultSchemaResolver; import org.springframework.pulsar.core.DefaultTopicResolver; import org.springframework.pulsar.core.PulsarAdministration; +import org.springframework.pulsar.core.PulsarClientBuilderCustomizer; import org.springframework.pulsar.core.PulsarConsumerFactory; import org.springframework.pulsar.core.PulsarProducerFactory; import org.springframework.pulsar.core.PulsarReaderFactory; @@ -71,8 +73,21 @@ public class PulsarAutoConfiguration { @Bean @ConditionalOnMissingBean - public PulsarClientFactoryBean pulsarClientFactoryBean() { - return new PulsarClientFactoryBean(this.properties.buildClientProperties()); + public PulsarClientBuilderConfigurer pulsarClientBuilderConfigurer(PulsarProperties pulsarProperties, + ObjectProvider customizers) { + return new PulsarClientBuilderConfigurer(pulsarProperties, customizers.orderedStream().toList()); + } + + @Bean + @ConditionalOnMissingBean + public PulsarClient pulsarClient(PulsarClientBuilderConfigurer configurer) { + var clientFactory = new DefaultPulsarClientFactory(configurer::configure); + try { + return clientFactory.createClient(); + } + catch (PulsarClientException ex) { + throw new IllegalArgumentException("Failed to create client: " + ex.getMessage(), ex); + } } @Bean diff --git a/spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarClientBuilderConfigurer.java b/spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarClientBuilderConfigurer.java new file mode 100644 index 00000000..c6a10f5a --- /dev/null +++ b/spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarClientBuilderConfigurer.java @@ -0,0 +1,172 @@ +/* + * Copyright 2023-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.autoconfigure; + +import java.net.InetSocketAddress; +import java.net.URI; +import java.time.Duration; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SizeUnit; + +import org.springframework.boot.context.properties.PropertyMapper; +import org.springframework.boot.util.LambdaSafe; +import org.springframework.pulsar.autoconfigure.PulsarProperties.Client; +import org.springframework.pulsar.core.PulsarClientBuilderCustomizer; +import org.springframework.util.Assert; +import org.springframework.util.CollectionUtils; +import org.springframework.util.StringUtils; +import org.springframework.util.unit.DataSize; + +/** + * Configure Pulsar {@link ClientBuilder} with sensible defaults and apply a list of + * optional {@link PulsarClientBuilderCustomizer customizers}. + * + * @author Chris Bono + */ +public class PulsarClientBuilderConfigurer { + + private final PulsarProperties properties; + + private final List customizers; + + /** + * Creates a new configurer that will use the given properties for configuration. + * @param properties properties to use + * @param customizers list of customizers to apply or empty list if no customizers + */ + public PulsarClientBuilderConfigurer(PulsarProperties properties, List customizers) { + Assert.notNull(properties, "properties must not be null"); + Assert.notNull(customizers, "customizers must not be null"); + this.properties = properties; + this.customizers = customizers; + } + + /** + * Configure the specified {@link ClientBuilder}. The builder can be further tuned and + * default settings can be overridden. + * @param clientBuilder the {@link ClientBuilder} instance to configure + */ + public void configure(ClientBuilder clientBuilder) { + applyProperties(this.properties, clientBuilder); + applyCustomizers(this.customizers, clientBuilder); + } + + @SuppressWarnings("deprecation") + protected void applyProperties(PulsarProperties pulsarProperties, ClientBuilder clientBuilder) { + var map = PropertyMapper.get().alwaysApplyingWhenNonNull(); + var clientProperties = pulsarProperties.getClient(); + map.from(clientProperties::getServiceUrl).to(clientBuilder::serviceUrl); + map.from(clientProperties::getListenerName).to(clientBuilder::listenerName); + map.from(clientProperties::getNumIoThreads).to(clientBuilder::ioThreads); + map.from(clientProperties::getNumListenerThreads).to(clientBuilder::listenerThreads); + map.from(clientProperties::getNumConnectionsPerBroker).to(clientBuilder::connectionsPerBroker); + map.from(clientProperties::getMaxConcurrentLookupRequest).to(clientBuilder::maxConcurrentLookupRequests); + map.from(clientProperties::getMaxLookupRequest).to(clientBuilder::maxLookupRequests); + map.from(clientProperties::getMaxLookupRedirects).to(clientBuilder::maxLookupRedirects); + map.from(clientProperties::getMaxNumberOfRejectedRequestPerConnection) + .to(clientBuilder::maxNumberOfRejectedRequestPerConnection); + map.from(clientProperties::getUseTcpNoDelay).to(clientBuilder::enableTcpNoDelay); + + // Authentication properties + applyAuthentication(clientProperties, clientBuilder); + + // TLS properties + map.from(clientProperties::getUseTls).to(clientBuilder::enableTls); + map.from(clientProperties::getTlsHostnameVerificationEnable).to(clientBuilder::enableTlsHostnameVerification); + map.from(clientProperties::getTlsTrustCertsFilePath).to(clientBuilder::tlsTrustCertsFilePath); + map.from(clientProperties::getTlsCertificateFilePath).to(clientBuilder::tlsCertificateFilePath); + map.from(clientProperties::getTlsKeyFilePath).to(clientBuilder::tlsKeyFilePath); + map.from(clientProperties::getTlsAllowInsecureConnection).to(clientBuilder::allowTlsInsecureConnection); + map.from(clientProperties::getUseKeyStoreTls).to(clientBuilder::useKeyStoreTls); + map.from(clientProperties::getSslProvider).to(clientBuilder::sslProvider); + map.from(clientProperties::getTlsTrustStoreType).to(clientBuilder::tlsTrustStoreType); + map.from(clientProperties::getTlsTrustStorePath).to(clientBuilder::tlsTrustStorePath); + map.from(clientProperties::getTlsTrustStorePassword).to(clientBuilder::tlsTrustStorePassword); + map.from(clientProperties::getTlsCiphers).to(clientBuilder::tlsCiphers); + map.from(clientProperties::getTlsProtocols).to(clientBuilder::tlsProtocols); + + map.from(clientProperties::getStatsInterval).as(Duration::toSeconds).to(clientBuilder, + (cb, val) -> cb.statsInterval(val, TimeUnit.SECONDS)); + map.from(clientProperties::getKeepAliveInterval).asInt(Duration::toMillis).to(clientBuilder, + (cb, val) -> cb.keepAliveInterval(val, TimeUnit.MILLISECONDS)); + map.from(clientProperties::getConnectionTimeout).asInt(Duration::toMillis).to(clientBuilder, + (cb, val) -> cb.connectionTimeout(val, TimeUnit.MILLISECONDS)); + map.from(clientProperties::getOperationTimeout).asInt(Duration::toMillis).to(clientBuilder, + (cb, val) -> cb.operationTimeout(val, TimeUnit.MILLISECONDS)); + map.from(clientProperties::getLookupTimeout).asInt(Duration::toMillis).to(clientBuilder, + (cb, val) -> cb.lookupTimeout(val, TimeUnit.MILLISECONDS)); + map.from(clientProperties::getInitialBackoffInterval).as(Duration::toMillis).to(clientBuilder, + (cb, val) -> cb.startingBackoffInterval(val, TimeUnit.MILLISECONDS)); + map.from(clientProperties::getMaxBackoffInterval).as(Duration::toMillis).to(clientBuilder, + (cb, val) -> cb.maxBackoffInterval(val, TimeUnit.MILLISECONDS)); + map.from(clientProperties::getEnableBusyWait).to(clientBuilder::enableBusyWait); + map.from(clientProperties::getMemoryLimit).as(DataSize::toBytes).to(clientBuilder, + (cb, val) -> cb.memoryLimit(val, SizeUnit.BYTES)); + map.from(clientProperties::getEnableTransaction).to(clientBuilder::enableTransaction); + map.from(clientProperties::getProxyServiceUrl) + .to((proxyUrl) -> clientBuilder.proxyServiceUrl(proxyUrl, clientProperties.getProxyProtocol())); + map.from(clientProperties::getDnsLookupBindAddress) + .to((bindAddr) -> clientBuilder.dnsLookupBind(bindAddr, clientProperties.getDnsLookupBindPort())); + map.from(clientProperties::getSocks5ProxyAddress).as(this::parseSocketAddress) + .to(clientBuilder::socks5ProxyAddress); + map.from(clientProperties::getSocks5ProxyUsername).to(clientBuilder::socks5ProxyUsername); + map.from(clientProperties::getSocks5ProxyPassword).to(clientBuilder::socks5ProxyPassword); + } + + private void applyAuthentication(Client clientProperties, ClientBuilder clientBuilder) { + if (StringUtils.hasText(clientProperties.getAuthParams()) + && !CollectionUtils.isEmpty(clientProperties.getAuthentication())) { + throw new IllegalArgumentException( + "Cannot set both spring.pulsar.client.authParams and spring.pulsar.client.authentication.*"); + } + var authPluginClass = clientProperties.getAuthPluginClassName(); + if (StringUtils.hasText(authPluginClass)) { + var authParams = clientProperties.getAuthParams(); + if (clientProperties.getAuthentication() != null) { + authParams = AuthParameterUtils.maybeConvertToEncodedParamString(clientProperties.getAuthentication()); + } + try { + clientBuilder.authentication(authPluginClass, authParams); + } + catch (PulsarClientException.UnsupportedAuthenticationException ex) { + throw new IllegalArgumentException("Unable to configure authentication: " + ex.getMessage(), ex); + } + } + } + + private InetSocketAddress parseSocketAddress(String address) { + try { + var uri = URI.create(address); + return new InetSocketAddress(uri.getHost(), uri.getPort()); + } + catch (Exception e) { + throw new IllegalArgumentException("Invalid address: " + e.getMessage(), e); + } + } + + protected void applyCustomizers(List clientBuilderCustomizers, + ClientBuilder clientBuilder) { + LambdaSafe.callbacks(PulsarClientBuilderCustomizer.class, clientBuilderCustomizers, clientBuilder) + .withLogger(PulsarClientBuilderConfigurer.class) + .invoke((customizer) -> customizer.customize(clientBuilder)); + } + +} diff --git a/spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarProperties.java b/spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarProperties.java index b79b7f3a..8123c3dc 100644 --- a/spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarProperties.java +++ b/spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarProperties.java @@ -110,10 +110,6 @@ public class PulsarProperties { return new HashMap<>(this.consumer.buildProperties()); } - public Map buildClientProperties() { - return new HashMap<>(this.client.buildProperties()); - } - public Map buildProducerProperties() { return new HashMap<>(this.producer.buildProperties()); } @@ -773,67 +769,6 @@ public class PulsarProperties { this.socks5ProxyPassword = socks5ProxyPassword; } - public Map buildProperties() { - if (StringUtils.hasText(this.getAuthParams()) && !CollectionUtils.isEmpty(this.getAuthentication())) { - throw new IllegalArgumentException( - "Cannot set both spring.pulsar.client.authParams and spring.pulsar.client.authentication.*"); - } - - PulsarProperties.Properties properties = new Properties(); - - PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); - map.from(this::getServiceUrl).to(properties.in("serviceUrl")); - map.from(this::getListenerName).to(properties.in("listenerName")); - map.from(this::getAuthPluginClassName).to(properties.in("authPluginClassName")); - map.from(this::getAuthParams).to(properties.in("authParams")); - map.from(this::getAuthentication).as(AuthParameterUtils::maybeConvertToEncodedParamString) - .to(properties.in("authParams")); - map.from(this::getOperationTimeout).as(Duration::toMillis).to(properties.in("operationTimeoutMs")); - map.from(this::getLookupTimeout).as(Duration::toMillis).to(properties.in("lookupTimeoutMs")); - map.from(this::getNumIoThreads).to(properties.in("numIoThreads")); - map.from(this::getNumListenerThreads).to(properties.in("numListenerThreads")); - map.from(this::getNumConnectionsPerBroker).to(properties.in("connectionsPerBroker")); - map.from(this::getUseTcpNoDelay).to(properties.in("useTcpNoDelay")); - map.from(this::getUseTls).to(properties.in("useTls")); - map.from(this::getTlsHostnameVerificationEnable).to(properties.in("tlsHostnameVerificationEnable")); - map.from(this::getTlsTrustCertsFilePath).to(properties.in("tlsTrustCertsFilePath")); - map.from(this::getTlsCertificateFilePath).to(properties.in("tlsCertificateFilePath")); - map.from(this::getTlsKeyFilePath).to(properties.in("tlsKeyFilePath")); - map.from(this::getTlsAllowInsecureConnection).to(properties.in("tlsAllowInsecureConnection")); - map.from(this::getUseKeyStoreTls).to(properties.in("useKeyStoreTls")); - map.from(this::getSslProvider).to(properties.in("sslProvider")); - map.from(this::getTlsTrustStoreType).to(properties.in("tlsTrustStoreType")); - map.from(this::getTlsTrustStorePath).to(properties.in("tlsTrustStorePath")); - map.from(this::getTlsTrustStorePassword).to(properties.in("tlsTrustStorePassword")); - map.from(this::getTlsCiphers).to(properties.in("tlsCiphers")); - map.from(this::getTlsProtocols).to(properties.in("tlsProtocols")); - map.from(this::getStatsInterval).as(Duration::toSeconds).to(properties.in("statsIntervalSeconds")); - map.from(this::getMaxConcurrentLookupRequest).to(properties.in("concurrentLookupRequest")); - map.from(this::getMaxLookupRequest).to(properties.in("maxLookupRequest")); - map.from(this::getMaxLookupRedirects).to(properties.in("maxLookupRedirects")); - map.from(this::getMaxNumberOfRejectedRequestPerConnection) - .to(properties.in("maxNumberOfRejectedRequestPerConnection")); - map.from(this::getKeepAliveInterval).asInt(Duration::toSeconds) - .to(properties.in("keepAliveIntervalSeconds")); - map.from(this::getConnectionTimeout).asInt(Duration::toMillis).to(properties.in("connectionTimeoutMs")); - map.from(this::getRequestTimeout).asInt(Duration::toMillis).to(properties.in("requestTimeoutMs")); - map.from(this::getInitialBackoffInterval).as(Duration::toNanos) - .to(properties.in("initialBackoffIntervalNanos")); - map.from(this::getMaxBackoffInterval).as(Duration::toNanos).to(properties.in("maxBackoffIntervalNanos")); - map.from(this::getEnableBusyWait).to(properties.in("enableBusyWait")); - map.from(this::getMemoryLimit).as(DataSize::toBytes).to(properties.in("memoryLimitBytes")); - map.from(this::getProxyServiceUrl).to(properties.in("proxyServiceUrl")); - map.from(this::getProxyProtocol).to(properties.in("proxyProtocol")); - map.from(this::getEnableTransaction).to(properties.in("enableTransaction")); - map.from(this::getDnsLookupBindAddress).to(properties.in("dnsLookupBindAddress")); - map.from(this::getDnsLookupBindPort).to(properties.in("dnsLookupBindPort")); - map.from(this::getSocks5ProxyAddress).to(properties.in("socks5ProxyAddress")); - map.from(this::getSocks5ProxyUsername).to(properties.in("socks5ProxyUsername")); - map.from(this::getSocks5ProxyPassword).to(properties.in("socks5ProxyPassword")); - - return properties; - } - } public static class Function { diff --git a/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarAutoConfigurationTests.java b/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarAutoConfigurationTests.java index 1951e298..a69500ee 100644 --- a/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarAutoConfigurationTests.java +++ b/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarAutoConfigurationTests.java @@ -18,11 +18,16 @@ package org.springframework.pulsar.autoconfigure; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.entry; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; +import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; @@ -46,7 +51,6 @@ import org.springframework.pulsar.annotation.EnablePulsar; import org.springframework.pulsar.annotation.PulsarBootstrapConfiguration; import org.springframework.pulsar.annotation.PulsarListenerAnnotationBeanPostProcessor; import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactory; -import org.springframework.pulsar.config.PulsarClientFactoryBean; import org.springframework.pulsar.config.PulsarListenerContainerFactory; import org.springframework.pulsar.config.PulsarListenerEndpointRegistry; import org.springframework.pulsar.core.CachingPulsarProducerFactory; @@ -55,6 +59,7 @@ import org.springframework.pulsar.core.DefaultPulsarReaderFactory; import org.springframework.pulsar.core.DefaultSchemaResolver; import org.springframework.pulsar.core.DefaultTopicResolver; import org.springframework.pulsar.core.PulsarAdministration; +import org.springframework.pulsar.core.PulsarClientBuilderCustomizer; import org.springframework.pulsar.core.PulsarConsumerFactory; import org.springframework.pulsar.core.PulsarProducerFactory; import org.springframework.pulsar.core.PulsarReaderFactory; @@ -105,8 +110,8 @@ class PulsarAutoConfigurationTests { @Test void defaultBeansAreAutoConfigured() { - this.contextRunner.run((context) -> assertThat(context).hasNotFailed() - .hasSingleBean(PulsarClientFactoryBean.class).hasSingleBean(PulsarProducerFactory.class) + this.contextRunner.run((context) -> assertThat(context).hasSingleBean(PulsarClientBuilderConfigurer.class) + .hasSingleBean(PulsarClient.class).hasSingleBean(PulsarProducerFactory.class) .hasSingleBean(PulsarTemplate.class).hasSingleBean(PulsarConsumerFactory.class) .hasSingleBean(ConcurrentPulsarListenerContainerFactory.class) .hasSingleBean(PulsarListenerAnnotationBeanPostProcessor.class) @@ -115,14 +120,19 @@ class PulsarAutoConfigurationTests { } @Test - void customPulsarClientFactoryBeanIsRespected() { - PulsarClientFactoryBean clientFactoryBean = new PulsarClientFactoryBean( - new PulsarProperties().buildClientProperties()); + void customPulsarClientBuilderConfigurerIsRespected() { + var customConfigurer = new PulsarClientBuilderConfigurer(new PulsarProperties(), Collections.emptyList()); this.contextRunner - .withBean("customPulsarClientFactoryBean", PulsarClientFactoryBean.class, () -> clientFactoryBean) - .run((context) -> assertThat(context) - .getBean("&customPulsarClientFactoryBean", PulsarClientFactoryBean.class) - .isSameAs(clientFactoryBean)); + .withBean("customPulsarClientConfigurer", PulsarClientBuilderConfigurer.class, () -> customConfigurer) + .run((context) -> assertThat(context).getBean(PulsarClientBuilderConfigurer.class) + .isSameAs(customConfigurer)); + } + + @Test + void customPulsarClientIsRespected() { + var customClient = mock(PulsarClient.class); + this.contextRunner.withBean("customPulsarClient", PulsarClient.class, () -> customClient) + .run((context) -> assertThat(context).getBean(PulsarClient.class).isSameAs(customClient)); } @Test @@ -358,15 +368,28 @@ class PulsarAutoConfigurationTests { class ClientAutoConfigurationTests { @Test - void authParamMapConvertedToEncodedParamString() { - contextRunner.withPropertyValues( - "spring.pulsar.client.auth-plugin-class-name=org.apache.pulsar.client.impl.auth.AuthenticationBasic", - "spring.pulsar.client.authentication.userId=username", - "spring.pulsar.client.authentication.password=topsecret") - .run((context -> assertThat(context).hasNotFailed().getBean(PulsarClientFactoryBean.class) - .extracting("config", InstanceOfAssertFactories.map(String.class, Object.class)) - .doesNotContainKey("authParamMap").doesNotContainKey("userId").doesNotContainKey("password") - .containsEntry("authParams", "{\"password\":\"topsecret\",\"userId\":\"username\"}"))); + void clientConfigurerWithNoUserDefinedCustomizers() { + contextRunner.run((context) -> { + assertThat(context).getBean(PulsarClientBuilderConfigurer.class) + .hasFieldOrPropertyWithValue("customizers", Collections.emptyList()); + }); + } + + @Test + void clientConfigurerWithUserDefinedCustomizers() { + contextRunner.withUserConfiguration(ClientCustomizersTestConfiguration.class) + .run((context) -> assertThat(context).getBean(PulsarClientBuilderConfigurer.class) + .extracting("customizers", InstanceOfAssertFactories.LIST) + .containsExactly(ClientCustomizersTestConfiguration.clientCustomizerBar, + ClientCustomizersTestConfiguration.clientCustomizerFoo)); + } + + @Test + void clientConfigurerIsApplied() { + var clientConfigurer = spy( + new PulsarClientBuilderConfigurer(new PulsarProperties(), Collections.emptyList())); + contextRunner.withBean("clientConfigurer", PulsarClientBuilderConfigurer.class, () -> clientConfigurer) + .run((context) -> verify(clientConfigurer).configure(any(ClientBuilder.class))); } } @@ -567,4 +590,24 @@ class PulsarAutoConfigurationTests { } + @Configuration(proxyBeanMethods = false) + static class ClientCustomizersTestConfiguration { + + static PulsarClientBuilderCustomizer clientCustomizerFoo = mock(PulsarClientBuilderCustomizer.class); + static PulsarClientBuilderCustomizer clientCustomizerBar = mock(PulsarClientBuilderCustomizer.class); + + @Bean + @Order(200) + PulsarClientBuilderCustomizer clientCustomizerFoo() { + return clientCustomizerFoo; + } + + @Bean + @Order(100) + PulsarClientBuilderCustomizer clientCustomizerBar() { + return clientCustomizerBar; + } + + } + } diff --git a/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarClientBuilderConfigurerTests.java b/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarClientBuilderConfigurerTests.java new file mode 100644 index 00000000..14f624d8 --- /dev/null +++ b/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarClientBuilderConfigurerTests.java @@ -0,0 +1,226 @@ +/* + * Copyright 2023-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.autoconfigure; + +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import java.net.InetSocketAddress; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.ProxyProtocol; +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; +import org.apache.pulsar.client.api.SizeUnit; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.mockito.InOrder; + +import org.springframework.pulsar.core.PulsarClientBuilderCustomizer; +import org.springframework.util.unit.DataSize; + +/** + * Tests for {@link PulsarClientBuilderConfigurer}. + * + * @author Chris Bono + */ +public class PulsarClientBuilderConfigurerTests { + + @Test + void singleCustomizerIsApplied() { + var customizer = mock(PulsarClientBuilderCustomizer.class); + var configurer = new PulsarClientBuilderConfigurer(new PulsarProperties(), List.of(customizer)); + var clientBuilder = mock(ClientBuilder.class); + configurer.configure(clientBuilder); + verify(customizer).customize(clientBuilder); + } + + @Test + void multipleCustomizersAreAppliedInOrder() { + var customizer1 = mock(PulsarClientBuilderCustomizer.class); + var customizer2 = mock(PulsarClientBuilderCustomizer.class); + var configurer = new PulsarClientBuilderConfigurer(new PulsarProperties(), List.of(customizer2, customizer1)); + var clientBuilder = mock(ClientBuilder.class); + configurer.configure(clientBuilder); + InOrder inOrder = inOrder(customizer1, customizer2); + inOrder.verify(customizer2).customize(clientBuilder); + inOrder.verify(customizer1).customize(clientBuilder); + } + + @SuppressWarnings("deprecation") + @Test + void standardPropertiesAreApplied() { + var pulsarProps = new PulsarProperties(); + var clientProps = pulsarProps.getClient(); + clientProps.setServiceUrl("my-service-url"); + clientProps.setListenerName("my-listener"); + clientProps.setOperationTimeout(Duration.ofSeconds(1)); + clientProps.setLookupTimeout(Duration.ofSeconds(2)); + clientProps.setNumIoThreads(3); + clientProps.setNumListenerThreads(4); + clientProps.setNumConnectionsPerBroker(5); + clientProps.setUseTcpNoDelay(false); + clientProps.setUseTls(true); + clientProps.setTlsHostnameVerificationEnable(true); + clientProps.setTlsTrustCertsFilePath("my-trust-certs-file-path"); + clientProps.setTlsCertificateFilePath("my-certificate-file-path"); + clientProps.setTlsKeyFilePath("my-key-file-path"); + clientProps.setTlsAllowInsecureConnection(true); + clientProps.setUseKeyStoreTls(true); + clientProps.setSslProvider("my-ssl-provider"); + clientProps.setTlsTrustStoreType("my-trust-store-type"); + clientProps.setTlsTrustStorePath("my-trust-store-path"); + clientProps.setTlsTrustStorePassword("my-trust-store-password"); + clientProps.setTlsCiphers(Set.of("my-tls-cipher")); + clientProps.setTlsProtocols(Set.of("my-tls-protocol")); + clientProps.setStatsInterval(Duration.ofSeconds(6)); + clientProps.setMaxConcurrentLookupRequest(7); + clientProps.setMaxLookupRequest(8); + clientProps.setMaxLookupRedirects(9); + clientProps.setMaxNumberOfRejectedRequestPerConnection(10); + clientProps.setKeepAliveInterval(Duration.ofSeconds(11)); + clientProps.setConnectionTimeout(Duration.ofSeconds(12)); + clientProps.setInitialBackoffInterval(Duration.ofSeconds(13)); + clientProps.setMaxBackoffInterval(Duration.ofSeconds(14)); + clientProps.setEnableBusyWait(true); + clientProps.setMemoryLimit(DataSize.ofBytes(15)); + clientProps.setProxyServiceUrl("my-proxy-service-url"); + clientProps.setProxyProtocol(ProxyProtocol.SNI); + clientProps.setEnableTransaction(true); + clientProps.setDnsLookupBindAddress("my-dns-lookup-bind-address"); + clientProps.setDnsLookupBindPort(16); + clientProps.setSocks5ProxyAddress("socks5://my-socks5-proxy-address:5150"); + clientProps.setSocks5ProxyUsername("my-socks5-proxy-username"); + clientProps.setSocks5ProxyPassword("my-socks5-proxy-password"); + + var configurer = new PulsarClientBuilderConfigurer(pulsarProps, Collections.emptyList()); + var clientBuilder = mock(ClientBuilder.class); + configurer.configure(clientBuilder); + + verify(clientBuilder).serviceUrl(clientProps.getServiceUrl()); + verify(clientBuilder).listenerName("my-listener"); + verify(clientBuilder).operationTimeout(1000, TimeUnit.MILLISECONDS); + verify(clientBuilder).lookupTimeout(2000, TimeUnit.MILLISECONDS); + verify(clientBuilder).ioThreads(3); + verify(clientBuilder).listenerThreads(4); + verify(clientBuilder).connectionsPerBroker(5); + verify(clientBuilder).enableTcpNoDelay(false); + verify(clientBuilder).enableTls(true); + verify(clientBuilder).enableTlsHostnameVerification(true); + verify(clientBuilder).tlsTrustCertsFilePath("my-trust-certs-file-path"); + verify(clientBuilder).tlsCertificateFilePath("my-certificate-file-path"); + verify(clientBuilder).tlsKeyFilePath("my-key-file-path"); + verify(clientBuilder).allowTlsInsecureConnection(true); + verify(clientBuilder).useKeyStoreTls(true); + verify(clientBuilder).sslProvider("my-ssl-provider"); + verify(clientBuilder).tlsTrustStoreType("my-trust-store-type"); + verify(clientBuilder).tlsTrustStorePath("my-trust-store-path"); + verify(clientBuilder).tlsTrustStorePassword("my-trust-store-password"); + verify(clientBuilder).tlsCiphers(Set.of("my-tls-cipher")); + verify(clientBuilder).tlsProtocols(Set.of("my-tls-protocol")); + verify(clientBuilder).statsInterval(6, TimeUnit.SECONDS); + verify(clientBuilder).maxConcurrentLookupRequests(7); + verify(clientBuilder).maxLookupRequests(8); + verify(clientBuilder).maxLookupRedirects(9); + verify(clientBuilder).maxNumberOfRejectedRequestPerConnection(10); + verify(clientBuilder).keepAliveInterval(11000, TimeUnit.MILLISECONDS); + verify(clientBuilder).connectionTimeout(12000, TimeUnit.MILLISECONDS); + verify(clientBuilder).startingBackoffInterval(13000, TimeUnit.MILLISECONDS); + verify(clientBuilder).maxBackoffInterval(14000, TimeUnit.MILLISECONDS); + verify(clientBuilder).enableBusyWait(true); + verify(clientBuilder).memoryLimit(15, SizeUnit.BYTES); + verify(clientBuilder).proxyServiceUrl("my-proxy-service-url", ProxyProtocol.SNI); + verify(clientBuilder).enableTransaction(true); + verify(clientBuilder).dnsLookupBind("my-dns-lookup-bind-address", 16); + verify(clientBuilder).socks5ProxyAddress(new InetSocketAddress("my-socks5-proxy-address", 5150)); + verify(clientBuilder).socks5ProxyUsername("my-socks5-proxy-username"); + verify(clientBuilder).socks5ProxyPassword("my-socks5-proxy-password"); + } + + @Test + void customizerAppliedAfterProperties() { + var pulsarProps = new PulsarProperties(); + var clientProps = pulsarProps.getClient(); + clientProps.setServiceUrl("foo"); + + PulsarClientBuilderCustomizer customizer = (clientBuilder) -> clientBuilder.serviceUrl("bar"); + var configurer = new PulsarClientBuilderConfigurer(pulsarProps, List.of(customizer)); + var clientBuilder = mock(ClientBuilder.class); + configurer.configure(clientBuilder); + + InOrder inOrder = inOrder(clientBuilder); + inOrder.verify(clientBuilder).serviceUrl("foo"); + inOrder.verify(clientBuilder).serviceUrl("bar"); + } + + @Nested + class AuthenticationProperties { + + private final String authPluginClassName = "org.apache.pulsar.client.impl.auth.AuthenticationToken"; + + private final String authParamsStr = "{\"token\":\"1234\"}"; + + private final String authToken = "1234"; + + @Test + void usingAuthParamsString() throws UnsupportedAuthenticationException { + var pulsarProps = new PulsarProperties(); + var clientProps = pulsarProps.getClient(); + clientProps.setAuthPluginClassName(authPluginClassName); + clientProps.setAuthParams(authParamsStr); + var configurer = new PulsarClientBuilderConfigurer(pulsarProps, Collections.emptyList()); + var clientBuilder = mock(ClientBuilder.class); + configurer.configure(clientBuilder); + verify(clientBuilder).authentication(authPluginClassName, authParamsStr); + } + + @Test + void usingAuthenticationMap() throws UnsupportedAuthenticationException { + var pulsarProps = new PulsarProperties(); + var clientProps = pulsarProps.getClient(); + clientProps.setAuthPluginClassName(authPluginClassName); + clientProps.setAuthentication(Map.of("token", authToken)); + var configurer = new PulsarClientBuilderConfigurer(pulsarProps, Collections.emptyList()); + var clientBuilder = mock(ClientBuilder.class); + configurer.configure(clientBuilder); + verify(clientBuilder).authentication(authPluginClassName, authParamsStr); + } + + @Test + void notAllowedToUseBothAuthParamsStringAndAuthenticationMap() { + var pulsarProps = new PulsarProperties(); + var clientProps = pulsarProps.getClient(); + clientProps.setAuthPluginClassName(authPluginClassName); + clientProps.setAuthParams(authParamsStr); + clientProps.setAuthentication(Map.of("token", authToken)); + var configurer = new PulsarClientBuilderConfigurer(pulsarProps, Collections.emptyList()); + var clientBuilder = mock(ClientBuilder.class); + assertThatIllegalArgumentException().isThrownBy(() -> configurer.configure(clientBuilder)) + .withMessageContaining( + "Cannot set both spring.pulsar.client.authParams and spring.pulsar.client.authentication.*"); + } + + } + +} diff --git a/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarPropertiesTests.java b/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarPropertiesTests.java index ae0b1897..06eba955 100644 --- a/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarPropertiesTests.java +++ b/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarPropertiesTests.java @@ -22,6 +22,7 @@ import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException import static org.assertj.core.api.Assertions.assertThatNoException; import static org.assertj.core.api.Assertions.assertThatRuntimeException; +import java.time.Duration; import java.util.HashMap; import java.util.Map; @@ -34,7 +35,6 @@ import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.ProducerAccessMode; import org.apache.pulsar.client.api.ProducerCryptoFailureAction; import org.apache.pulsar.client.api.ProxyProtocol; -import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.RegexSubscriptionMode; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionMode; @@ -55,6 +55,7 @@ import org.springframework.boot.context.properties.source.ConfigurationPropertyS import org.springframework.boot.context.properties.source.MapConfigurationPropertySource; import org.springframework.pulsar.autoconfigure.PulsarProperties.SchemaInfo; import org.springframework.pulsar.autoconfigure.PulsarProperties.TypeMapping; +import org.springframework.util.unit.DataSize; /** * Unit tests for {@link PulsarProperties}. @@ -83,7 +84,7 @@ public class PulsarPropertiesTests { @Test void clientProperties() { - Map props = new HashMap<>(); + var props = new HashMap(); props.put("spring.pulsar.client.service-url", "my-service-url"); props.put("spring.pulsar.client.listener-name", "my-listener"); props.put("spring.pulsar.client.operation-timeout", "1s"); @@ -127,86 +128,72 @@ public class PulsarPropertiesTests { props.put("spring.pulsar.client.socks5-proxy-password", "my-socks5-proxy-password"); bind(props); - Map clientProps = properties.buildClientProperties(); - // Verify that the props can be loaded in a ClientBuilder - assertThatNoException().isThrownBy(() -> PulsarClient.builder().loadConf(clientProps)); - - assertThat(clientProps).containsEntry("serviceUrl", "my-service-url") - .containsEntry("listenerName", "my-listener").containsEntry("operationTimeoutMs", 1_000L) - .containsEntry("lookupTimeoutMs", 2_000L).containsEntry("numIoThreads", 3) - .containsEntry("numListenerThreads", 4).containsEntry("connectionsPerBroker", 5) - .containsEntry("useTcpNoDelay", false).containsEntry("useTls", true) - .containsEntry("tlsHostnameVerificationEnable", true) - .containsEntry("tlsTrustCertsFilePath", "my-trust-certs-file-path") - .containsEntry("tlsCertificateFilePath", "my-certificate-file-path") - .containsEntry("tlsKeyFilePath", "my-key-file-path") - .containsEntry("tlsAllowInsecureConnection", true).containsEntry("useKeyStoreTls", true) - .containsEntry("sslProvider", "my-ssl-provider") - .containsEntry("tlsTrustStoreType", "my-trust-store-type") - .containsEntry("tlsTrustStorePath", "my-trust-store-path") - .containsEntry("tlsTrustStorePassword", "my-trust-store-password") - .hasEntrySatisfying("tlsCiphers", - ciphers -> assertThat(ciphers) - .asInstanceOf(InstanceOfAssertFactories.collection(String.class)) - .containsExactly("my-tls-cipher")) - .hasEntrySatisfying("tlsProtocols", - protocols -> assertThat(protocols) - .asInstanceOf(InstanceOfAssertFactories.collection(String.class)) - .containsExactly("my-tls-protocol")) - .containsEntry("statsIntervalSeconds", 6L).containsEntry("concurrentLookupRequest", 7) - .containsEntry("maxLookupRequest", 8).containsEntry("maxLookupRedirects", 9) - .containsEntry("maxNumberOfRejectedRequestPerConnection", 10) - .containsEntry("keepAliveIntervalSeconds", 11).containsEntry("connectionTimeoutMs", 12_000) - .containsEntry("requestTimeoutMs", 13_000) - .containsEntry("initialBackoffIntervalNanos", 14_000_000_000L) - .containsEntry("maxBackoffIntervalNanos", 15_000_000_000L).containsEntry("enableBusyWait", true) - .containsEntry("memoryLimitBytes", 16L).containsEntry("proxyServiceUrl", "my-proxy-service-url") - .containsEntry("proxyProtocol", ProxyProtocol.SNI).containsEntry("enableTransaction", true) - .containsEntry("dnsLookupBindAddress", "my-dns-lookup-bind-address") - .containsEntry("dnsLookupBindPort", 17) - .containsEntry("socks5ProxyAddress", "my-socks5-proxy-address") - .containsEntry("socks5ProxyUsername", "my-socks5-proxy-username") - .containsEntry("socks5ProxyPassword", "my-socks5-proxy-password"); + PulsarProperties.Client clientProps = PulsarPropertiesTests.this.properties.getClient(); + assertThat(clientProps.getServiceUrl()).isEqualTo("my-service-url"); + assertThat(clientProps.getListenerName()).isEqualTo("my-listener"); + assertThat(clientProps.getOperationTimeout()).isEqualTo(Duration.ofMillis(1000)); + assertThat(clientProps.getLookupTimeout()).isEqualTo(Duration.ofMillis(2000)); + assertThat(clientProps.getNumIoThreads()).isEqualTo(3); + assertThat(clientProps.getNumListenerThreads()).isEqualTo(4); + assertThat(clientProps.getNumConnectionsPerBroker()).isEqualTo(5); + assertThat(clientProps.getUseTcpNoDelay()).isFalse(); + assertThat(clientProps.getUseTls()).isTrue(); + assertThat(clientProps.getTlsHostnameVerificationEnable()).isTrue(); + assertThat(clientProps.getTlsTrustCertsFilePath()).isEqualTo("my-trust-certs-file-path"); + assertThat(clientProps.getTlsCertificateFilePath()).isEqualTo("my-certificate-file-path"); + assertThat(clientProps.getTlsKeyFilePath()).isEqualTo("my-key-file-path"); + assertThat(clientProps.getTlsAllowInsecureConnection()).isTrue(); + assertThat(clientProps.getUseKeyStoreTls()).isTrue(); + assertThat(clientProps.getSslProvider()).isEqualTo("my-ssl-provider"); + assertThat(clientProps.getTlsTrustStoreType()).isEqualTo("my-trust-store-type"); + assertThat(clientProps.getTlsTrustStorePath()).isEqualTo("my-trust-store-path"); + assertThat(clientProps.getTlsTrustStorePassword()).isEqualTo("my-trust-store-password"); + assertThat(clientProps.getTlsCiphers()).containsExactly("my-tls-cipher"); + assertThat(clientProps.getTlsProtocols()).containsExactly("my-tls-protocol"); + assertThat(clientProps.getStatsInterval()).isEqualTo(Duration.ofSeconds(6)); + assertThat(clientProps.getMaxConcurrentLookupRequest()).isEqualTo(7); + assertThat(clientProps.getMaxLookupRequest()).isEqualTo(8); + assertThat(clientProps.getMaxLookupRedirects()).isEqualTo(9); + assertThat(clientProps.getMaxNumberOfRejectedRequestPerConnection()).isEqualTo(10); + assertThat(clientProps.getKeepAliveInterval()).isEqualTo(Duration.ofSeconds(11)); + assertThat(clientProps.getConnectionTimeout()).isEqualTo(Duration.ofMillis(12000)); + assertThat(clientProps.getRequestTimeout()).isEqualTo(Duration.ofMillis(13_000)); + assertThat(clientProps.getInitialBackoffInterval()).isEqualTo(Duration.ofMillis(14000)); + assertThat(clientProps.getMaxBackoffInterval()).isEqualTo(Duration.ofMillis(15000)); + assertThat(clientProps.getEnableBusyWait()).isTrue(); + assertThat(clientProps.getMemoryLimit()).isEqualTo(DataSize.ofBytes(16)); + assertThat(clientProps.getProxyServiceUrl()).isEqualTo("my-proxy-service-url"); + assertThat(clientProps.getProxyProtocol()).isEqualTo(ProxyProtocol.SNI); + assertThat(clientProps.getEnableTransaction()).isTrue(); + assertThat(clientProps.getDnsLookupBindAddress()).isEqualTo("my-dns-lookup-bind-address"); + assertThat(clientProps.getDnsLookupBindPort()).isEqualTo(17); + assertThat(clientProps.getSocks5ProxyAddress()).isEqualTo("my-socks5-proxy-address"); + assertThat(clientProps.getSocks5ProxyUsername()).isEqualTo("my-socks5-proxy-username"); + assertThat(clientProps.getSocks5ProxyPassword()).isEqualTo("my-socks5-proxy-password"); } @Test void authenticationUsingAuthParamsString() { - Map props = new HashMap<>(); + var props = new HashMap(); props.put("spring.pulsar.client.auth-plugin-class-name", "org.apache.pulsar.client.impl.auth.AuthenticationToken"); props.put("spring.pulsar.client.auth-params", authParamsStr); bind(props); - assertThat(properties.getClient().getAuthParams()).isEqualTo(authParamsStr); - assertThat(properties.getClient().getAuthPluginClassName()).isEqualTo(authPluginClassName); - Map clientProps = properties.buildClientProperties(); - - assertThat(clientProps).containsEntry("authPluginClassName", authPluginClassName) - .containsEntry("authParams", authParamsStr); + var clientProps = PulsarPropertiesTests.this.properties.getClient(); + assertThat(clientProps.getAuthPluginClassName()).isEqualTo(authPluginClassName); + assertThat(clientProps.getAuthParams()).isEqualTo(authParamsStr); } @Test void authenticationUsingAuthenticationMap() { - Map props = new HashMap<>(); + var props = new HashMap(); props.put("spring.pulsar.client.auth-plugin-class-name", authPluginClassName); props.put("spring.pulsar.client.authentication.token", authToken); bind(props); - assertThat(properties.getClient().getAuthentication()).containsEntry("token", authToken); - assertThat(properties.getClient().getAuthPluginClassName()).isEqualTo(authPluginClassName); - Map clientProps = properties.buildClientProperties(); - assertThat(clientProps).containsEntry("authPluginClassName", authPluginClassName) - .containsEntry("authParams", authParamsStr); - } - - @Test - void authenticationNotAllowedUsingBothAuthParamsStringAndAuthenticationMap() { - Map props = new HashMap<>(); - props.put("spring.pulsar.client.auth-plugin-class-name", authPluginClassName); - props.put("spring.pulsar.client.auth-params", authParamsStr); - props.put("spring.pulsar.client.authentication.token", authToken); - bind(props); - assertThatIllegalArgumentException().isThrownBy(properties::buildClientProperties).withMessageContaining( - "Cannot set both spring.pulsar.client.authParams and spring.pulsar.client.authentication.*"); + var clientProps = PulsarPropertiesTests.this.properties.getClient(); + assertThat(clientProps.getAuthPluginClassName()).isEqualTo(authPluginClassName); + assertThat(clientProps.getAuthentication()).containsEntry("token", authToken); } } diff --git a/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarReactiveAutoConfigurationTests.java b/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarReactiveAutoConfigurationTests.java index d1416a6c..9788f32c 100644 --- a/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarReactiveAutoConfigurationTests.java +++ b/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarReactiveAutoConfigurationTests.java @@ -21,7 +21,6 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import java.time.Duration; -import java.util.Collections; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@ -49,7 +48,6 @@ import org.springframework.boot.autoconfigure.AutoConfigurations; import org.springframework.boot.test.context.FilteredClassLoader; import org.springframework.boot.test.context.assertj.AssertableApplicationContext; import org.springframework.boot.test.context.runner.ApplicationContextRunner; -import org.springframework.pulsar.config.PulsarClientFactoryBean; import org.springframework.pulsar.core.SchemaResolver; import org.springframework.pulsar.core.TopicResolver; import org.springframework.pulsar.reactive.config.DefaultReactivePulsarListenerContainerFactory; @@ -261,20 +259,14 @@ class PulsarReactiveAutoConfigurationTests { @Test @SuppressWarnings("rawtypes") - void beansAreInjectedInReactivePulsarClient() throws Exception { - try (PulsarClient client = mock(PulsarClient.class)) { - PulsarClientFactoryBean factoryBean = new PulsarClientFactoryBean(Collections.emptyMap()) { - @Override - protected PulsarClient createInstance() { - return client; - } - }; - this.contextRunner.withBean("customPulsarClient", PulsarClientFactoryBean.class, () -> factoryBean) - .run((context -> assertThat(context).hasNotFailed().getBean(ReactivePulsarClient.class) - .extracting("reactivePulsarResourceAdapter") - .extracting("pulsarClientSupplier", InstanceOfAssertFactories.type(Supplier.class)) - .extracting(Supplier::get).isSameAs(client))); - } + void beansAreInjectedInReactivePulsarClient() { + this.contextRunner.run((context -> { + PulsarClient pulsarClient = context.getBean(PulsarClient.class); + assertThat(context).hasNotFailed().getBean(ReactivePulsarClient.class) + .extracting("reactivePulsarResourceAdapter") + .extracting("pulsarClientSupplier", InstanceOfAssertFactories.type(Supplier.class)) + .extracting(Supplier::get).isSameAs(pulsarClient); + })); } @Test diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/config/PulsarClientFactoryBean.java b/spring-pulsar/src/main/java/org/springframework/pulsar/config/PulsarClientFactoryBean.java deleted file mode 100644 index 823b1cff..00000000 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/config/PulsarClientFactoryBean.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright 2022-2023 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.pulsar.config; - -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; - -import org.apache.pulsar.client.api.PulsarClient; - -import org.springframework.beans.factory.FactoryBean; -import org.springframework.beans.factory.config.AbstractFactoryBean; -import org.springframework.core.log.LogAccessor; -import org.springframework.lang.Nullable; - -/** - * {@link FactoryBean} implementation for the {@link PulsarClient}. - * - * @author Soby Chacko - * @author Chris Bono - */ -public class PulsarClientFactoryBean extends AbstractFactoryBean { - - private final LogAccessor logger = new LogAccessor(this.getClass()); - - private final Map config = new HashMap<>(); - - public PulsarClientFactoryBean(Map config) { - Objects.requireNonNull(config, "Config map cannot be null"); - this.config.putAll(config); - } - - @Override - public Class getObjectType() { - return PulsarClient.class; - } - - @Override - protected PulsarClient createInstance() throws Exception { - return PulsarClient.builder().loadConf(this.config).build(); - } - - @Override - protected void destroyInstance(@Nullable PulsarClient instance) throws Exception { - if (instance != null) { - this.logger.info(() -> "Closing client " + instance); - instance.close(); - } - } - -} diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarClientFactory.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarClientFactory.java new file mode 100644 index 00000000..b6eab137 --- /dev/null +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarClientFactory.java @@ -0,0 +1,59 @@ +/* + * Copyright 2022-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.core; + +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; + +import org.springframework.util.Assert; + +/** + * Default implementation for {@link PulsarClientFactory}. + * + * @author Soby Chacko + * @author Chris Bono + */ +public class DefaultPulsarClientFactory implements PulsarClientFactory { + + private final PulsarClientBuilderCustomizer customizer; + + /** + * Construct a factory that creates clients using a default Pulsar client builder with + * no modifications other than the specified service url. + * @param serviceUrl the service url + */ + public DefaultPulsarClientFactory(String serviceUrl) { + this((clientBuilder -> clientBuilder.serviceUrl(serviceUrl))); + } + + /** + * Construct a factory that creates clients using a customized Pulsar client builder. + * @param customizer the customizer to apply to the builder + */ + public DefaultPulsarClientFactory(PulsarClientBuilderCustomizer customizer) { + Assert.notNull(customizer, "customizer must not be null"); + this.customizer = customizer; + } + + @Override + public PulsarClient createClient() throws PulsarClientException { + var clientBuilder = PulsarClient.builder(); + this.customizer.customize(clientBuilder); + return clientBuilder.build(); + } + +} diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarClientBuilderCustomizer.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarClientBuilderCustomizer.java new file mode 100644 index 00000000..77a6020d --- /dev/null +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarClientBuilderCustomizer.java @@ -0,0 +1,35 @@ +/* + * Copyright 2023-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.core; + +import org.apache.pulsar.client.api.ClientBuilder; + +/** + * The interface to customize a {@link ClientBuilder}. + * + * @author Chris Bono + */ +@FunctionalInterface +public interface PulsarClientBuilderCustomizer { + + /** + * Customizes a {@link ClientBuilder}. + * @param clientBuilder the builder to customize + */ + void customize(ClientBuilder clientBuilder); + +} diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarClientFactory.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarClientFactory.java new file mode 100644 index 00000000..7d675bed --- /dev/null +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarClientFactory.java @@ -0,0 +1,37 @@ +/* + * Copyright 2023-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.core; + +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; + +/** + * Pulsar client factory interface. + * + * @author Soby Chacko + * @author Chris Bono + */ +public interface PulsarClientFactory { + + /** + * Create a client. + * @return the created client instance + * @throws PulsarClientException if an error occurs creating the client + */ + PulsarClient createClient() throws PulsarClientException; + +} diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultPulsarClientFactoryTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultPulsarClientFactoryTests.java new file mode 100644 index 00000000..7d220b4d --- /dev/null +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultPulsarClientFactoryTests.java @@ -0,0 +1,63 @@ +/* + * Copyright 2023-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.core; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; +import static org.assertj.core.api.Assertions.assertThatRuntimeException; + +import org.apache.pulsar.client.api.PulsarClientException; +import org.junit.jupiter.api.Test; + +/** + * Tests for {@link DefaultPulsarClientFactory}. + * + * @author Chris Bono + */ +class DefaultPulsarClientFactoryTests { + + @Test + void constructWithServiceUrl() throws PulsarClientException { + var clientFactory = new DefaultPulsarClientFactory("pulsar://localhost:5150"); + assertThat(clientFactory.createClient()).hasFieldOrPropertyWithValue("conf.serviceUrl", + "pulsar://localhost:5150"); + } + + @Test + void constructWithCustomizer() throws PulsarClientException { + var clientFactory = new DefaultPulsarClientFactory( + (clientBuilder) -> clientBuilder.serviceUrl("pulsar://localhost:5150")); + assertThat(clientFactory.createClient()).hasFieldOrPropertyWithValue("conf.serviceUrl", + "pulsar://localhost:5150"); + } + + @Test + void constructWithNullCustomizer() { + assertThatIllegalArgumentException() + .isThrownBy(() -> new DefaultPulsarClientFactory((PulsarClientBuilderCustomizer) null)) + .withMessage("customizer must not be null"); + } + + @Test + void customizerThrowsException() { + var clientFactory = new DefaultPulsarClientFactory((clientBuilder) -> { + throw new RuntimeException("Who turned out the lights?"); + }); + assertThatRuntimeException().isThrownBy(clientFactory::createClient).withMessage("Who turned out the lights?"); + } + +} diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerTests.java index 01cf3011..106413db 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerTests.java @@ -39,6 +39,7 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Messages; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.RedeliveryBackoff; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; @@ -61,10 +62,10 @@ import org.springframework.messaging.handler.annotation.Header; import org.springframework.pulsar.annotation.EnablePulsar; import org.springframework.pulsar.annotation.PulsarListener; import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactory; -import org.springframework.pulsar.config.PulsarClientFactoryBean; import org.springframework.pulsar.config.PulsarListenerContainerFactory; import org.springframework.pulsar.config.PulsarListenerEndpointRegistry; import org.springframework.pulsar.core.ConsumerBuilderCustomizer; +import org.springframework.pulsar.core.DefaultPulsarClientFactory; import org.springframework.pulsar.core.DefaultPulsarConsumerFactory; import org.springframework.pulsar.core.DefaultPulsarProducerFactory; import org.springframework.pulsar.core.DefaultSchemaResolver; @@ -109,8 +110,8 @@ public class PulsarListenerTests implements PulsarTestContainerSupport { } @Bean - public PulsarClientFactoryBean pulsarClientFactoryBean() { - return new PulsarClientFactoryBean(Map.of("serviceUrl", PulsarTestContainerSupport.getPulsarBrokerUrl())); + public PulsarClient pulsarClient() throws PulsarClientException { + return new DefaultPulsarClientFactory(PulsarTestContainerSupport.getPulsarBrokerUrl()).createClient(); } @Bean diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/observation/ObservationIntegrationTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/observation/ObservationIntegrationTests.java index 4806e3e5..cae76088 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/observation/ObservationIntegrationTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/observation/ObservationIntegrationTests.java @@ -35,8 +35,8 @@ import org.springframework.context.annotation.Configuration; import org.springframework.pulsar.annotation.EnablePulsar; import org.springframework.pulsar.annotation.PulsarListener; import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactory; -import org.springframework.pulsar.config.PulsarClientFactoryBean; import org.springframework.pulsar.config.PulsarListenerContainerFactory; +import org.springframework.pulsar.core.DefaultPulsarClientFactory; import org.springframework.pulsar.core.DefaultPulsarConsumerFactory; import org.springframework.pulsar.core.DefaultPulsarProducerFactory; import org.springframework.pulsar.core.DefaultSchemaResolver; @@ -135,8 +135,8 @@ public class ObservationIntegrationTests extends SampleTestRunner implements Pul } @Bean - public PulsarClientFactoryBean pulsarClientFactoryBean() { - return new PulsarClientFactoryBean(Map.of("serviceUrl", PulsarTestContainerSupport.getPulsarBrokerUrl())); + public PulsarClient pulsarClient() throws PulsarClientException { + return new DefaultPulsarClientFactory(PulsarTestContainerSupport.getPulsarBrokerUrl()).createClient(); } @Bean diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/observation/ObservationTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/observation/ObservationTests.java index 6a442389..0182a23a 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/observation/ObservationTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/observation/ObservationTests.java @@ -23,7 +23,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.Deque; import java.util.List; -import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -40,8 +39,8 @@ import org.springframework.lang.Nullable; import org.springframework.pulsar.annotation.EnablePulsar; import org.springframework.pulsar.annotation.PulsarListener; import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactory; -import org.springframework.pulsar.config.PulsarClientFactoryBean; import org.springframework.pulsar.config.PulsarListenerContainerFactory; +import org.springframework.pulsar.core.DefaultPulsarClientFactory; import org.springframework.pulsar.core.DefaultPulsarConsumerFactory; import org.springframework.pulsar.core.DefaultPulsarProducerFactory; import org.springframework.pulsar.core.DefaultSchemaResolver; @@ -175,8 +174,8 @@ public class ObservationTests implements PulsarTestContainerSupport { } @Bean - PulsarClientFactoryBean pulsarClientFactoryBean() { - return new PulsarClientFactoryBean(Map.of("serviceUrl", PulsarTestContainerSupport.getPulsarBrokerUrl())); + public PulsarClient pulsarClient() throws PulsarClientException { + return new DefaultPulsarClientFactory(PulsarTestContainerSupport.getPulsarBrokerUrl()).createClient(); } @Bean(name = "observationTestsTemplate") diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/reader/PulsarReaderTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/reader/PulsarReaderTests.java index a8475500..535ed90f 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/reader/PulsarReaderTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/reader/PulsarReaderTests.java @@ -39,8 +39,8 @@ import org.springframework.context.annotation.Configuration; import org.springframework.pulsar.annotation.EnablePulsar; import org.springframework.pulsar.annotation.PulsarReader; import org.springframework.pulsar.config.DefaultPulsarReaderContainerFactory; -import org.springframework.pulsar.config.PulsarClientFactoryBean; import org.springframework.pulsar.config.PulsarReaderContainerFactory; +import org.springframework.pulsar.core.DefaultPulsarClientFactory; import org.springframework.pulsar.core.DefaultPulsarProducerFactory; import org.springframework.pulsar.core.DefaultPulsarReaderFactory; import org.springframework.pulsar.core.PulsarProducerFactory; @@ -78,8 +78,8 @@ public class PulsarReaderTests implements PulsarTestContainerSupport { } @Bean - public PulsarClientFactoryBean pulsarClientFactoryBean() { - return new PulsarClientFactoryBean(Map.of("serviceUrl", PulsarTestContainerSupport.getPulsarBrokerUrl())); + public PulsarClient pulsarClient() throws PulsarClientException { + return new DefaultPulsarClientFactory(PulsarTestContainerSupport.getPulsarBrokerUrl()).createClient(); } @Bean