From d38cc6cb8d1a1d12499e302cbee75c00211e3537 Mon Sep 17 00:00:00 2001 From: Chris Bono Date: Sat, 6 May 2023 21:03:01 -0500 Subject: [PATCH] Use builder to autoconfigure PulsarAdministration (#401) --- .../listener/ReactivePulsarListenerTests.java | 4 +- .../PulsarAutoConfiguration.java | 2 +- .../autoconfigure/PulsarProperties.java | 87 ++++++------ .../autoconfigure/PulsarPropertiesTests.java | 125 ++++++++++++------ .../stream/binder/PulsarBinderTests.java | 4 +- .../core/DefaultPulsarClientFactory.java | 2 +- .../core/PulsarAdminBuilderCustomizer.java | 35 +++++ .../pulsar/core/PulsarAdministration.java | 69 +++------- .../core/PulsarAdministrationTests.java | 22 +-- .../pulsar/listener/PulsarListenerTests.java | 4 +- .../ObservationIntegrationTests.java | 4 +- .../pulsar/observation/ObservationTests.java | 4 +- 12 files changed, 187 insertions(+), 175 deletions(-) create mode 100644 spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarAdminBuilderCustomizer.java diff --git a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerTests.java b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerTests.java index a9d5b02c..1db5ec0f 100644 --- a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerTests.java +++ b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerTests.java @@ -28,7 +28,6 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.DeadLetterPolicy; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; @@ -137,8 +136,7 @@ public class ReactivePulsarListenerTests implements PulsarTestContainerSupport { @Bean PulsarAdministration pulsarAdministration() { - return new PulsarAdministration( - PulsarAdmin.builder().serviceHttpUrl(PulsarTestContainerSupport.getHttpServiceUrl())); + return new PulsarAdministration(PulsarTestContainerSupport.getHttpServiceUrl()); } @Bean diff --git a/spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarAutoConfiguration.java b/spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarAutoConfiguration.java index 6111ce17..91c6085c 100644 --- a/spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarAutoConfiguration.java +++ b/spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarAutoConfiguration.java @@ -157,7 +157,7 @@ public class PulsarAutoConfiguration { @Bean @ConditionalOnMissingBean public PulsarAdministration pulsarAdministration() { - return new PulsarAdministration(this.properties.buildAdminProperties()); + return new PulsarAdministration(this.properties.getAdministration().toPulsarAdminBuilderCustomizer()); } @Bean diff --git a/spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarProperties.java b/spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarProperties.java index c9decd8e..f4fbd4b5 100644 --- a/spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarProperties.java +++ b/spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarProperties.java @@ -18,19 +18,22 @@ package org.springframework.pulsar.autoconfigure; import java.time.Duration; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.TimeUnit; +import org.apache.pulsar.client.admin.PulsarAdminBuilder; import org.apache.pulsar.client.api.ProxyProtocol; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.schema.SchemaType; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.NestedConfigurationProperty; import org.springframework.boot.context.properties.PropertyMapper; import org.springframework.lang.Nullable; +import org.springframework.pulsar.core.PulsarAdminBuilderCustomizer; import org.springframework.pulsar.core.ReaderBuilderCustomizer; import org.springframework.pulsar.listener.AckMode; import org.springframework.util.CollectionUtils; @@ -107,10 +110,6 @@ public class PulsarProperties { return this.defaults; } - public Map buildAdminProperties() { - return new HashMap<>(this.admin.buildProperties()); - } - public static class Template { /** @@ -1157,38 +1156,52 @@ public class PulsarProperties { this.autoCertRefreshTime = autoCertRefreshTime; } - public Map buildProperties() { + public PulsarAdminBuilderCustomizer toPulsarAdminBuilderCustomizer() { + return (adminBuilder) -> { + PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); + map.from(this::getServiceUrl).to(adminBuilder::serviceHttpUrl); + applyAuthentication(adminBuilder); + map.from(this::getTlsTrustCertsFilePath).to(adminBuilder::tlsTrustCertsFilePath); + map.from(this::getTlsCertificateFilePath).to(adminBuilder::tlsCertificateFilePath); + map.from(this::getTlsKeyFilePath).to(adminBuilder::tlsKeyFilePath); + map.from(this::isTlsAllowInsecureConnection).to(adminBuilder::allowTlsInsecureConnection); + map.from(this::isTlsHostnameVerificationEnable).to(adminBuilder::enableTlsHostnameVerification); + map.from(this::isUseKeyStoreTls).to(adminBuilder::useKeyStoreTls); + map.from(this::getSslProvider).to(adminBuilder::sslProvider); + map.from(this::getTlsTrustStoreType).to(adminBuilder::tlsTrustStoreType); + map.from(this::getTlsTrustStorePath).to(adminBuilder::tlsTrustStorePath); + map.from(this::getTlsTrustStorePassword).to(adminBuilder::tlsTrustStorePassword); + map.from(this::getTlsCiphers).to(adminBuilder::tlsCiphers); + map.from(this::getTlsProtocols).to(adminBuilder::tlsProtocols); + map.from(this::getConnectionTimeout).asInt(Duration::toMillis).to(adminBuilder, + (ab, val) -> ab.connectionTimeout(val, TimeUnit.MILLISECONDS)); + map.from(this::getReadTimeout).asInt(Duration::toMillis).to(adminBuilder, + (ab, val) -> ab.readTimeout(val, TimeUnit.MILLISECONDS)); + map.from(this::getRequestTimeout).asInt(Duration::toMillis).to(adminBuilder, + (ab, val) -> ab.requestTimeout(val, TimeUnit.MILLISECONDS)); + map.from(this::getAutoCertRefreshTime).asInt(Duration::toMillis).to(adminBuilder, + (ab, val) -> ab.autoCertRefreshTime(val, TimeUnit.MILLISECONDS)); + }; + } + + private void applyAuthentication(PulsarAdminBuilder adminBuilder) { if (StringUtils.hasText(this.getAuthParams()) && !CollectionUtils.isEmpty(this.getAuthentication())) { throw new IllegalArgumentException( "Cannot set both spring.pulsar.administration.authParams and spring.pulsar.administration.authentication.*"); } - PulsarProperties.Properties properties = new Properties(); - - PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); - map.from(this::getServiceUrl).to(properties.in("serviceUrl")); - map.from(this::getAuthPluginClassName).to(properties.in("authPluginClassName")); - map.from(this::getAuthParams).to(properties.in("authParams")); - map.from(this::getAuthentication).as(AuthParameterUtils::maybeConvertToEncodedParamString) - .to(properties.in("authParams")); - map.from(this::getTlsTrustCertsFilePath).to(properties.in("tlsTrustCertsFilePath")); - map.from(this::getTlsCertificateFilePath).to(properties.in("tlsCertificateFilePath")); - map.from(this::getTlsKeyFilePath).to(properties.in("tlsKeyFilePath")); - map.from(this::isTlsAllowInsecureConnection).to(properties.in("tlsAllowInsecureConnection")); - map.from(this::isTlsHostnameVerificationEnable).to(properties.in("tlsHostnameVerificationEnable")); - map.from(this::isUseKeyStoreTls).to(properties.in("useKeyStoreTls")); - map.from(this::getSslProvider).to(properties.in("sslProvider")); - map.from(this::getTlsTrustStoreType).to(properties.in("tlsTrustStoreType")); - map.from(this::getTlsTrustStorePath).to(properties.in("tlsTrustStorePath")); - map.from(this::getTlsTrustStorePassword).to(properties.in("tlsTrustStorePassword")); - map.from(this::getTlsCiphers).to(properties.in("tlsCiphers")); - map.from(this::getTlsProtocols).to(properties.in("tlsProtocols")); - map.from(this::getConnectionTimeout).asInt(Duration::toMillis).to(properties.in("connectionTimeoutMs")); - map.from(this::getReadTimeout).asInt(Duration::toMillis).to(properties.in("readTimeoutMs")); - map.from(this::getRequestTimeout).asInt(Duration::toMillis).to(properties.in("requestTimeoutMs")); - map.from(this::getAutoCertRefreshTime).asInt(Duration::toSeconds) - .to(properties.in("autoCertRefreshSeconds")); - - return properties; + var authPluginClass = this.getAuthPluginClassName(); + if (StringUtils.hasText(authPluginClass)) { + var authParams = this.getAuthParams(); + if (this.getAuthentication() != null) { + authParams = AuthParameterUtils.maybeConvertToEncodedParamString(this.getAuthentication()); + } + try { + adminBuilder.authentication(authPluginClass, authParams); + } + catch (PulsarClientException.UnsupportedAuthenticationException ex) { + throw new IllegalArgumentException("Unable to configure authentication: " + ex.getMessage(), ex); + } + } } } @@ -1355,12 +1368,4 @@ public class PulsarProperties { } } - static class Properties extends HashMap { - - java.util.function.Consumer in(String key) { - return (value) -> put(key, value); - } - - } - } diff --git a/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarPropertiesTests.java b/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarPropertiesTests.java index b1f56147..786e4179 100644 --- a/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarPropertiesTests.java +++ b/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarPropertiesTests.java @@ -19,7 +19,6 @@ package org.springframework.pulsar.autoconfigure; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; -import static org.assertj.core.api.Assertions.assertThatRuntimeException; import static org.assertj.core.api.Assertions.entry; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -29,8 +28,10 @@ import java.time.Duration; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; -import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminBuilder; import org.apache.pulsar.client.api.CompressionType; import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; import org.apache.pulsar.client.api.HashingScheme; @@ -38,13 +39,13 @@ import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.ProducerAccessMode; import org.apache.pulsar.client.api.ProducerCryptoFailureAction; import org.apache.pulsar.client.api.ProxyProtocol; +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; import org.apache.pulsar.client.api.ReaderBuilder; import org.apache.pulsar.client.api.RegexSubscriptionMode; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionMode; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.schema.SchemaType; -import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; @@ -228,63 +229,89 @@ public class PulsarPropertiesTests { props.put("spring.pulsar.administration.tls-trust-store-password", "my-trust-store-password"); props.put("spring.pulsar.administration.tls-ciphers[0]", "my-tls-cipher"); props.put("spring.pulsar.administration.tls-protocols[0]", "my-tls-protocol"); - bind(props); - Map adminProps = properties.buildAdminProperties(); - // Verify that the props can NOT be loaded directly via a ClientBuilder due to - // the - // unknown readTimeout and autoCertRefreshTime properties - assertThatRuntimeException().isThrownBy(() -> PulsarAdmin.builder().loadConf(adminProps)).havingCause() - .withMessageContaining("Unrecognized field \"autoCertRefreshSeconds\""); + var adminProps = properties.getAdministration(); - assertThat(adminProps).containsEntry("serviceUrl", "my-service-url") - .containsEntry("connectionTimeoutMs", 12_000).containsEntry("readTimeoutMs", 13_000) - .containsEntry("requestTimeoutMs", 14_000).containsEntry("autoCertRefreshSeconds", 15) - .containsEntry("tlsHostnameVerificationEnable", true) - .containsEntry("tlsTrustCertsFilePath", "my-trust-certs-file-path") - .containsEntry("tlsCertificateFilePath", "my-certificate-file-path") - .containsEntry("tlsKeyFilePath", "my-key-file-path") - .containsEntry("tlsAllowInsecureConnection", true).containsEntry("useKeyStoreTls", true) - .containsEntry("sslProvider", "my-ssl-provider") - .containsEntry("tlsTrustStoreType", "my-trust-store-type") - .containsEntry("tlsTrustStorePath", "my-trust-store-path") - .containsEntry("tlsTrustStorePassword", "my-trust-store-password") - .hasEntrySatisfying("tlsCiphers", - ciphers -> assertThat(ciphers) - .asInstanceOf(InstanceOfAssertFactories.collection(String.class)) - .containsExactly("my-tls-cipher")) - .hasEntrySatisfying("tlsProtocols", - protocols -> assertThat(protocols) - .asInstanceOf(InstanceOfAssertFactories.collection(String.class)) - .containsExactly("my-tls-protocol")); + // Verify properties + assertThat(adminProps.getServiceUrl()).isEqualTo("my-service-url"); + assertThat(adminProps.getConnectionTimeout()).isEqualTo(Duration.ofMillis(12_000)); + assertThat(adminProps.getReadTimeout()).isEqualTo(Duration.ofMillis(13_000)); + assertThat(adminProps.getRequestTimeout()).isEqualTo(Duration.ofMillis(14_000)); + assertThat(adminProps.getAutoCertRefreshTime()).isEqualTo(Duration.ofMillis(15_000)); + assertThat(adminProps.isTlsHostnameVerificationEnable()).isTrue(); + assertThat(adminProps.getTlsTrustCertsFilePath()).isEqualTo("my-trust-certs-file-path"); + assertThat(adminProps.getTlsCertificateFilePath()).isEqualTo("my-certificate-file-path"); + assertThat(adminProps.getTlsKeyFilePath()).isEqualTo("my-key-file-path"); + assertThat(adminProps.isTlsAllowInsecureConnection()).isTrue(); + assertThat(adminProps.isUseKeyStoreTls()).isTrue(); + assertThat(adminProps.getSslProvider()).isEqualTo("my-ssl-provider"); + assertThat(adminProps.getTlsTrustStoreType()).isEqualTo("my-trust-store-type"); + assertThat(adminProps.getTlsTrustStorePath()).isEqualTo("my-trust-store-path"); + assertThat(adminProps.getTlsTrustStorePassword()).isEqualTo("my-trust-store-password"); + assertThat(adminProps.getTlsCiphers()).containsExactly("my-tls-cipher"); + assertThat(adminProps.getTlsProtocols()).containsExactly("my-tls-protocol"); + + // Verify customizer + var adminBuilder = mock(PulsarAdminBuilder.class); + var adminCustomizer = adminProps.toPulsarAdminBuilderCustomizer(); + adminCustomizer.customize(adminBuilder); + verify(adminBuilder).serviceHttpUrl("my-service-url"); + verify(adminBuilder).connectionTimeout(12_000, TimeUnit.MILLISECONDS); + verify(adminBuilder).readTimeout(13_000, TimeUnit.MILLISECONDS); + verify(adminBuilder).requestTimeout(14_000, TimeUnit.MILLISECONDS); + verify(adminBuilder).autoCertRefreshTime(15_000, TimeUnit.MILLISECONDS); + verify(adminBuilder).enableTlsHostnameVerification(true); + verify(adminBuilder).tlsTrustCertsFilePath("my-trust-certs-file-path"); + verify(adminBuilder).tlsCertificateFilePath("my-certificate-file-path"); + verify(adminBuilder).tlsKeyFilePath("my-key-file-path"); + verify(adminBuilder).allowTlsInsecureConnection(true); + verify(adminBuilder).useKeyStoreTls(true); + verify(adminBuilder).sslProvider("my-ssl-provider"); + verify(adminBuilder).tlsTrustStoreType("my-trust-store-type"); + verify(adminBuilder).tlsTrustStorePath("my-trust-store-path"); + verify(adminBuilder).tlsTrustStorePassword("my-trust-store-password"); + verify(adminBuilder).tlsCiphers(Set.of("my-tls-cipher")); + verify(adminBuilder).tlsProtocols(Set.of("my-tls-protocol")); } @Test - void authenticationUsingAuthParamsString() { + void authenticationUsingAuthParamsString() throws UnsupportedAuthenticationException { Map props = new HashMap<>(); props.put("spring.pulsar.administration.auth-plugin-class-name", "org.apache.pulsar.client.impl.auth.AuthenticationToken"); props.put("spring.pulsar.administration.auth-params", authParamsStr); bind(props); - assertThat(properties.getAdministration().getAuthParams()).isEqualTo(authParamsStr); - assertThat(properties.getAdministration().getAuthPluginClassName()).isEqualTo(authPluginClassName); - Map adminProps = properties.buildAdminProperties(); - assertThat(adminProps).containsEntry("authPluginClassName", authPluginClassName).containsEntry("authParams", - authParamsStr); + + // Verify properties + var adminProps = properties.getAdministration(); + assertThat(adminProps.getAuthPluginClassName()).isEqualTo(authPluginClassName); + assertThat(adminProps.getAuthParams()).isEqualTo(authParamsStr); + + // Verify customizer + var adminBuilder = mock(PulsarAdminBuilder.class); + var adminCustomizer = adminProps.toPulsarAdminBuilderCustomizer(); + adminCustomizer.customize(adminBuilder); + verify(adminBuilder).authentication(authPluginClassName, authParamsStr); } @Test - void authenticationUsingAuthenticationMap() { + void authenticationUsingAuthenticationMap() throws UnsupportedAuthenticationException { Map props = new HashMap<>(); props.put("spring.pulsar.administration.auth-plugin-class-name", authPluginClassName); props.put("spring.pulsar.administration.authentication.token", authToken); bind(props); - assertThat(properties.getAdministration().getAuthentication()).containsEntry("token", authToken); - assertThat(properties.getAdministration().getAuthPluginClassName()).isEqualTo(authPluginClassName); - Map adminProps = properties.buildAdminProperties(); - assertThat(adminProps).containsEntry("authPluginClassName", authPluginClassName).containsEntry("authParams", - authParamsStr); + + // Verify properties + var adminProps = properties.getAdministration(); + assertThat(adminProps.getAuthPluginClassName()).isEqualTo(authPluginClassName); + assertThat(adminProps.getAuthentication()).containsEntry("token", authToken); + + // Verify customizer + var adminBuilder = mock(PulsarAdminBuilder.class); + var adminCustomizer = adminProps.toPulsarAdminBuilderCustomizer(); + adminCustomizer.customize(adminBuilder); + verify(adminBuilder).authentication(authPluginClassName, authParamsStr); } @Test @@ -294,8 +321,18 @@ public class PulsarPropertiesTests { props.put("spring.pulsar.administration.auth-params", authParamsStr); props.put("spring.pulsar.administration.authentication.token", authToken); bind(props); - assertThatIllegalArgumentException().isThrownBy(properties::buildAdminProperties).withMessageContaining( - "Cannot set both spring.pulsar.administration.authParams and spring.pulsar.administration.authentication.*"); + + // Verify properties + var adminProps = properties.getAdministration(); + assertThat(adminProps.getAuthPluginClassName()).isEqualTo(authPluginClassName); + assertThat(adminProps.getAuthentication()).containsEntry("token", authToken); + assertThat(adminProps.getAuthParams()).isEqualTo(authParamsStr); + + // Verify customizer + var adminCustomizer = adminProps.toPulsarAdminBuilderCustomizer(); + assertThatIllegalArgumentException() + .isThrownBy(() -> adminCustomizer.customize(mock(PulsarAdminBuilder.class))).withMessageContaining( + "Cannot set both spring.pulsar.administration.authParams and spring.pulsar.administration.authentication.*"); } } diff --git a/spring-pulsar-spring-cloud-stream-binder/src/test/java/org/springframework/pulsar/spring/cloud/stream/binder/PulsarBinderTests.java b/spring-pulsar-spring-cloud-stream-binder/src/test/java/org/springframework/pulsar/spring/cloud/stream/binder/PulsarBinderTests.java index 8313daac..5b6bd56a 100644 --- a/spring-pulsar-spring-cloud-stream-binder/src/test/java/org/springframework/pulsar/spring/cloud/stream/binder/PulsarBinderTests.java +++ b/spring-pulsar-spring-cloud-stream-binder/src/test/java/org/springframework/pulsar/spring/cloud/stream/binder/PulsarBinderTests.java @@ -19,7 +19,6 @@ package org.springframework.pulsar.spring.cloud.stream.binder; import static org.assertj.core.api.Assertions.assertThat; import java.nio.charset.StandardCharsets; -import java.util.Map; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -100,8 +99,7 @@ public class PulsarBinderTests extends @Override protected PulsarTestBinder getBinder() { - var pulsarAdministration = new PulsarAdministration( - Map.of("serviceUrl", PulsarTestContainerSupport.getHttpServiceUrl())); + var pulsarAdministration = new PulsarAdministration(PulsarTestContainerSupport.getHttpServiceUrl()); var configProps = new PulsarBinderConfigurationProperties(); var provisioner = new PulsarTopicProvisioner(pulsarAdministration, configProps); var producerFactory = new DefaultPulsarProducerFactory<>(pulsarClient); diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarClientFactory.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarClientFactory.java index b6eab137..77921700 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarClientFactory.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarClientFactory.java @@ -37,7 +37,7 @@ public class DefaultPulsarClientFactory implements PulsarClientFactory { * @param serviceUrl the service url */ public DefaultPulsarClientFactory(String serviceUrl) { - this((clientBuilder -> clientBuilder.serviceUrl(serviceUrl))); + this((clientBuilder) -> clientBuilder.serviceUrl(serviceUrl)); } /** diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarAdminBuilderCustomizer.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarAdminBuilderCustomizer.java new file mode 100644 index 00000000..6315159e --- /dev/null +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarAdminBuilderCustomizer.java @@ -0,0 +1,35 @@ +/* + * Copyright 2023-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.core; + +import org.apache.pulsar.client.admin.PulsarAdminBuilder; + +/** + * The interface to customize a {@link PulsarAdminBuilder}. + * + * @author Chris Bono + */ +@FunctionalInterface +public interface PulsarAdminBuilderCustomizer { + + /** + * Customizes a {@link PulsarAdminBuilder}. + * @param adminBuilder the builder to customize + */ + void customize(PulsarAdminBuilder adminBuilder); + +} diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarAdministration.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarAdministration.java index 777096ce..2f7f65dc 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarAdministration.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarAdministration.java @@ -18,20 +18,16 @@ package org.springframework.pulsar.core; import java.util.Arrays; import java.util.Collection; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.pulsar.client.admin.PulsarAdmin; -import org.apache.pulsar.client.admin.PulsarAdminBuilder; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; import org.springframework.beans.BeansException; import org.springframework.beans.factory.SmartInitializingSingleton; @@ -40,7 +36,6 @@ import org.springframework.context.ApplicationContextAware; import org.springframework.core.log.LogAccessor; import org.springframework.lang.Nullable; import org.springframework.util.CollectionUtils; -import org.springframework.util.StringUtils; /** * An administration class that delegates to {@link PulsarAdmin} to create and manage @@ -55,28 +50,27 @@ public class PulsarAdministration private final LogAccessor logger = new LogAccessor(this.getClass()); - private final PulsarAdminBuilder adminBuilder; - @Nullable private ApplicationContext applicationContext; + @Nullable + private final PulsarAdminBuilderCustomizer adminCustomizer; + /** - * Construct a {@code PulsarAdministration} instance using the given configuration for - * the underlying {@link PulsarAdmin}. - * @param adminConfig the {@link PulsarAdmin} configuration + * Construct a default instance using the specified service url. + * @param serviceHttpUrl the admin http service url */ - public PulsarAdministration(Map adminConfig) { - this.adminBuilder = PulsarAdmin.builder(); - loadConf(this.adminBuilder, adminConfig); + public PulsarAdministration(String serviceHttpUrl) { + this((adminBuilder) -> adminBuilder.serviceHttpUrl(serviceHttpUrl)); } /** - * Construct a {@code PulsarAdministration} instance using the given builder for the - * underlying {@link PulsarAdmin}. - * @param adminBuilder the {@link PulsarAdminBuilder} + * Construct an instance with the specified customizations. + * @param adminCustomizer the customizer to apply to the builder or null to use the + * default admin builder without modifications */ - public PulsarAdministration(PulsarAdminBuilder adminBuilder) { - this.adminBuilder = adminBuilder; + public PulsarAdministration(@Nullable PulsarAdminBuilderCustomizer adminCustomizer) { + this.adminCustomizer = adminCustomizer; } @Override @@ -89,39 +83,6 @@ public class PulsarAdministration this.applicationContext = applicationContext; } - private void loadConf(PulsarAdminBuilder builder, Map adminConfig) { - var conf = new HashMap<>(adminConfig); - - // Workaround the fact that the PulsarAdminImpl does not attempt to construct the - // timeout settings from the config props - if (conf.remove("connectionTimeoutMs") instanceof Integer connectTimeout) { - builder.connectionTimeout(connectTimeout, TimeUnit.MILLISECONDS); - } - if (conf.remove("readTimeoutMs") instanceof Integer readTimeout) { - builder.readTimeout(readTimeout, TimeUnit.MILLISECONDS); - } - if (conf.remove("requestTimeoutMs") instanceof Integer requestTimeout) { - builder.requestTimeout(requestTimeout, TimeUnit.MILLISECONDS); - } - if (conf.remove("autoCertRefreshSeconds") instanceof Integer autoCertRefreshTime) { - builder.autoCertRefreshTime(autoCertRefreshTime, TimeUnit.SECONDS); - } - builder.loadConf(conf); - - // Workaround the fact that the PulsarAdminImpl does not attempt to construct the - // authentication from the config props - var authPluginClassName = (String) conf.get("authPluginClassName"); - var authParams = (String) conf.get("authParams"); - if (StringUtils.hasText(authPluginClassName) && StringUtils.hasText(authParams)) { - try { - builder.authentication(authPluginClassName, authParams); - } - catch (UnsupportedAuthenticationException ex) { - throw new RuntimeException("Unable to create admin auth: " + ex.getMessage(), ex); - } - } - } - private void initialize() { var topics = Objects.requireNonNull(this.applicationContext, "Application context was not set") .getBeansOfType(PulsarTopic.class, false, false).values(); @@ -129,7 +90,11 @@ public class PulsarAdministration } public PulsarAdmin createAdminClient() throws PulsarClientException { - return this.adminBuilder.build(); + var adminBuilder = PulsarAdmin.builder(); + if (this.adminCustomizer != null) { + this.adminCustomizer.customize(adminBuilder); + } + return adminBuilder.build(); } @Override diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/core/PulsarAdministrationTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/core/PulsarAdministrationTests.java index 0126f5bf..1598fd33 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/core/PulsarAdministrationTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/core/PulsarAdministrationTests.java @@ -18,19 +18,13 @@ package org.springframework.pulsar.core; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatIllegalStateException; -import static org.assertj.core.api.InstanceOfAssertFactories.type; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; import org.apache.pulsar.client.admin.PulsarAdmin; -import org.apache.pulsar.client.admin.PulsarAdminBuilder; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.impl.auth.AuthenticationBasic; -import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -63,19 +57,6 @@ public class PulsarAdministrationTests implements PulsarTestContainerSupport { @Autowired private PulsarAdministration pulsarAdministration; - @Test - void constructorRespectsAuthenticationProps() { - Map props = new HashMap<>(); - props.put("authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationBasic"); - props.put("authParams", "{\"userId\":\"foo\", \"password\":\"bar\"}"); - PulsarAdministration admin = new PulsarAdministration(props); - - assertThat(admin).extracting("adminBuilder").asInstanceOf(type(PulsarAdminBuilder.class)).extracting("conf") - .asInstanceOf(type(ClientConfigurationData.class)) - .extracting(ClientConfigurationData::getAuthentication).isInstanceOf(AuthenticationBasic.class) - .hasFieldOrPropertyWithValue("userId", "foo").hasFieldOrPropertyWithValue("password", "bar"); - } - private void assertThatTopicsExist(List expected) throws PulsarAdminException { assertThatTopicsExistIn(expected, NAMESPACE); } @@ -106,8 +87,7 @@ public class PulsarAdministrationTests implements PulsarTestContainerSupport { @Bean PulsarAdministration pulsarAdministration() { - return new PulsarAdministration( - PulsarAdmin.builder().serviceHttpUrl(PulsarTestContainerSupport.getHttpServiceUrl())); + return new PulsarAdministration(PulsarTestContainerSupport.getHttpServiceUrl()); } } diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerTests.java index 472d73f1..405e25a2 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerTests.java @@ -29,7 +29,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.DeadLetterPolicy; import org.apache.pulsar.client.api.Message; @@ -129,8 +128,7 @@ public class PulsarListenerTests implements PulsarTestContainerSupport { @Bean PulsarAdministration pulsarAdministration() { - return new PulsarAdministration( - PulsarAdmin.builder().serviceHttpUrl(PulsarTestContainerSupport.getHttpServiceUrl())); + return new PulsarAdministration(PulsarTestContainerSupport.getHttpServiceUrl()); } @Bean diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/observation/ObservationIntegrationTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/observation/ObservationIntegrationTests.java index c74ed8fd..5cacd885 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/observation/ObservationIntegrationTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/observation/ObservationIntegrationTests.java @@ -24,7 +24,6 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; @@ -159,8 +158,7 @@ public class ObservationIntegrationTests extends SampleTestRunner implements Pul @Bean PulsarAdministration pulsarAdministration() { - return new PulsarAdministration( - PulsarAdmin.builder().serviceHttpUrl(PulsarTestContainerSupport.getHttpServiceUrl())); + return new PulsarAdministration(PulsarTestContainerSupport.getHttpServiceUrl()); } @Bean diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/observation/ObservationTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/observation/ObservationTests.java index dd19949f..a63aba68 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/observation/ObservationTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/observation/ObservationTests.java @@ -25,7 +25,6 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; @@ -223,8 +222,7 @@ public class ObservationTests implements PulsarTestContainerSupport { @Bean PulsarAdministration pulsarAdministration() { - return new PulsarAdministration( - PulsarAdmin.builder().serviceHttpUrl(PulsarTestContainerSupport.getHttpServiceUrl())); + return new PulsarAdministration(PulsarTestContainerSupport.getHttpServiceUrl()); } @Bean