Use builder to autoconfigure PulsarAdministration (#401)

This commit is contained in:
Chris Bono
2023-05-06 21:03:01 -05:00
committed by GitHub
parent a88dc541c7
commit d38cc6cb8d
12 changed files with 187 additions and 175 deletions

View File

@@ -28,7 +28,6 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
@@ -137,8 +136,7 @@ public class ReactivePulsarListenerTests implements PulsarTestContainerSupport {
@Bean
PulsarAdministration pulsarAdministration() {
return new PulsarAdministration(
PulsarAdmin.builder().serviceHttpUrl(PulsarTestContainerSupport.getHttpServiceUrl()));
return new PulsarAdministration(PulsarTestContainerSupport.getHttpServiceUrl());
}
@Bean

View File

@@ -157,7 +157,7 @@ public class PulsarAutoConfiguration {
@Bean
@ConditionalOnMissingBean
public PulsarAdministration pulsarAdministration() {
return new PulsarAdministration(this.properties.buildAdminProperties());
return new PulsarAdministration(this.properties.getAdministration().toPulsarAdminBuilderCustomizer());
}
@Bean

View File

@@ -18,19 +18,22 @@ package org.springframework.pulsar.autoconfigure;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.api.ProxyProtocol;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.schema.SchemaType;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.NestedConfigurationProperty;
import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.lang.Nullable;
import org.springframework.pulsar.core.PulsarAdminBuilderCustomizer;
import org.springframework.pulsar.core.ReaderBuilderCustomizer;
import org.springframework.pulsar.listener.AckMode;
import org.springframework.util.CollectionUtils;
@@ -107,10 +110,6 @@ public class PulsarProperties {
return this.defaults;
}
public Map<String, Object> buildAdminProperties() {
return new HashMap<>(this.admin.buildProperties());
}
public static class Template {
/**
@@ -1157,38 +1156,52 @@ public class PulsarProperties {
this.autoCertRefreshTime = autoCertRefreshTime;
}
public Map<String, Object> buildProperties() {
public PulsarAdminBuilderCustomizer toPulsarAdminBuilderCustomizer() {
return (adminBuilder) -> {
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(this::getServiceUrl).to(adminBuilder::serviceHttpUrl);
applyAuthentication(adminBuilder);
map.from(this::getTlsTrustCertsFilePath).to(adminBuilder::tlsTrustCertsFilePath);
map.from(this::getTlsCertificateFilePath).to(adminBuilder::tlsCertificateFilePath);
map.from(this::getTlsKeyFilePath).to(adminBuilder::tlsKeyFilePath);
map.from(this::isTlsAllowInsecureConnection).to(adminBuilder::allowTlsInsecureConnection);
map.from(this::isTlsHostnameVerificationEnable).to(adminBuilder::enableTlsHostnameVerification);
map.from(this::isUseKeyStoreTls).to(adminBuilder::useKeyStoreTls);
map.from(this::getSslProvider).to(adminBuilder::sslProvider);
map.from(this::getTlsTrustStoreType).to(adminBuilder::tlsTrustStoreType);
map.from(this::getTlsTrustStorePath).to(adminBuilder::tlsTrustStorePath);
map.from(this::getTlsTrustStorePassword).to(adminBuilder::tlsTrustStorePassword);
map.from(this::getTlsCiphers).to(adminBuilder::tlsCiphers);
map.from(this::getTlsProtocols).to(adminBuilder::tlsProtocols);
map.from(this::getConnectionTimeout).asInt(Duration::toMillis).to(adminBuilder,
(ab, val) -> ab.connectionTimeout(val, TimeUnit.MILLISECONDS));
map.from(this::getReadTimeout).asInt(Duration::toMillis).to(adminBuilder,
(ab, val) -> ab.readTimeout(val, TimeUnit.MILLISECONDS));
map.from(this::getRequestTimeout).asInt(Duration::toMillis).to(adminBuilder,
(ab, val) -> ab.requestTimeout(val, TimeUnit.MILLISECONDS));
map.from(this::getAutoCertRefreshTime).asInt(Duration::toMillis).to(adminBuilder,
(ab, val) -> ab.autoCertRefreshTime(val, TimeUnit.MILLISECONDS));
};
}
private void applyAuthentication(PulsarAdminBuilder adminBuilder) {
if (StringUtils.hasText(this.getAuthParams()) && !CollectionUtils.isEmpty(this.getAuthentication())) {
throw new IllegalArgumentException(
"Cannot set both spring.pulsar.administration.authParams and spring.pulsar.administration.authentication.*");
}
PulsarProperties.Properties properties = new Properties();
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(this::getServiceUrl).to(properties.in("serviceUrl"));
map.from(this::getAuthPluginClassName).to(properties.in("authPluginClassName"));
map.from(this::getAuthParams).to(properties.in("authParams"));
map.from(this::getAuthentication).as(AuthParameterUtils::maybeConvertToEncodedParamString)
.to(properties.in("authParams"));
map.from(this::getTlsTrustCertsFilePath).to(properties.in("tlsTrustCertsFilePath"));
map.from(this::getTlsCertificateFilePath).to(properties.in("tlsCertificateFilePath"));
map.from(this::getTlsKeyFilePath).to(properties.in("tlsKeyFilePath"));
map.from(this::isTlsAllowInsecureConnection).to(properties.in("tlsAllowInsecureConnection"));
map.from(this::isTlsHostnameVerificationEnable).to(properties.in("tlsHostnameVerificationEnable"));
map.from(this::isUseKeyStoreTls).to(properties.in("useKeyStoreTls"));
map.from(this::getSslProvider).to(properties.in("sslProvider"));
map.from(this::getTlsTrustStoreType).to(properties.in("tlsTrustStoreType"));
map.from(this::getTlsTrustStorePath).to(properties.in("tlsTrustStorePath"));
map.from(this::getTlsTrustStorePassword).to(properties.in("tlsTrustStorePassword"));
map.from(this::getTlsCiphers).to(properties.in("tlsCiphers"));
map.from(this::getTlsProtocols).to(properties.in("tlsProtocols"));
map.from(this::getConnectionTimeout).asInt(Duration::toMillis).to(properties.in("connectionTimeoutMs"));
map.from(this::getReadTimeout).asInt(Duration::toMillis).to(properties.in("readTimeoutMs"));
map.from(this::getRequestTimeout).asInt(Duration::toMillis).to(properties.in("requestTimeoutMs"));
map.from(this::getAutoCertRefreshTime).asInt(Duration::toSeconds)
.to(properties.in("autoCertRefreshSeconds"));
return properties;
var authPluginClass = this.getAuthPluginClassName();
if (StringUtils.hasText(authPluginClass)) {
var authParams = this.getAuthParams();
if (this.getAuthentication() != null) {
authParams = AuthParameterUtils.maybeConvertToEncodedParamString(this.getAuthentication());
}
try {
adminBuilder.authentication(authPluginClass, authParams);
}
catch (PulsarClientException.UnsupportedAuthenticationException ex) {
throw new IllegalArgumentException("Unable to configure authentication: " + ex.getMessage(), ex);
}
}
}
}
@@ -1355,12 +1368,4 @@ public class PulsarProperties {
}
}
static class Properties extends HashMap<String, Object> {
<V> java.util.function.Consumer<V> in(String key) {
return (value) -> put(key, value);
}
}
}

View File

@@ -19,7 +19,6 @@ package org.springframework.pulsar.autoconfigure;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
import static org.assertj.core.api.Assertions.assertThatRuntimeException;
import static org.assertj.core.api.Assertions.entry;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@@ -29,8 +28,10 @@ import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.HashingScheme;
@@ -38,13 +39,13 @@ import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
import org.apache.pulsar.client.api.ProxyProtocol;
import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.schema.SchemaType;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
@@ -228,63 +229,89 @@ public class PulsarPropertiesTests {
props.put("spring.pulsar.administration.tls-trust-store-password", "my-trust-store-password");
props.put("spring.pulsar.administration.tls-ciphers[0]", "my-tls-cipher");
props.put("spring.pulsar.administration.tls-protocols[0]", "my-tls-protocol");
bind(props);
Map<String, Object> adminProps = properties.buildAdminProperties();
// Verify that the props can NOT be loaded directly via a ClientBuilder due to
// the
// unknown readTimeout and autoCertRefreshTime properties
assertThatRuntimeException().isThrownBy(() -> PulsarAdmin.builder().loadConf(adminProps)).havingCause()
.withMessageContaining("Unrecognized field \"autoCertRefreshSeconds\"");
var adminProps = properties.getAdministration();
assertThat(adminProps).containsEntry("serviceUrl", "my-service-url")
.containsEntry("connectionTimeoutMs", 12_000).containsEntry("readTimeoutMs", 13_000)
.containsEntry("requestTimeoutMs", 14_000).containsEntry("autoCertRefreshSeconds", 15)
.containsEntry("tlsHostnameVerificationEnable", true)
.containsEntry("tlsTrustCertsFilePath", "my-trust-certs-file-path")
.containsEntry("tlsCertificateFilePath", "my-certificate-file-path")
.containsEntry("tlsKeyFilePath", "my-key-file-path")
.containsEntry("tlsAllowInsecureConnection", true).containsEntry("useKeyStoreTls", true)
.containsEntry("sslProvider", "my-ssl-provider")
.containsEntry("tlsTrustStoreType", "my-trust-store-type")
.containsEntry("tlsTrustStorePath", "my-trust-store-path")
.containsEntry("tlsTrustStorePassword", "my-trust-store-password")
.hasEntrySatisfying("tlsCiphers",
ciphers -> assertThat(ciphers)
.asInstanceOf(InstanceOfAssertFactories.collection(String.class))
.containsExactly("my-tls-cipher"))
.hasEntrySatisfying("tlsProtocols",
protocols -> assertThat(protocols)
.asInstanceOf(InstanceOfAssertFactories.collection(String.class))
.containsExactly("my-tls-protocol"));
// Verify properties
assertThat(adminProps.getServiceUrl()).isEqualTo("my-service-url");
assertThat(adminProps.getConnectionTimeout()).isEqualTo(Duration.ofMillis(12_000));
assertThat(adminProps.getReadTimeout()).isEqualTo(Duration.ofMillis(13_000));
assertThat(adminProps.getRequestTimeout()).isEqualTo(Duration.ofMillis(14_000));
assertThat(adminProps.getAutoCertRefreshTime()).isEqualTo(Duration.ofMillis(15_000));
assertThat(adminProps.isTlsHostnameVerificationEnable()).isTrue();
assertThat(adminProps.getTlsTrustCertsFilePath()).isEqualTo("my-trust-certs-file-path");
assertThat(adminProps.getTlsCertificateFilePath()).isEqualTo("my-certificate-file-path");
assertThat(adminProps.getTlsKeyFilePath()).isEqualTo("my-key-file-path");
assertThat(adminProps.isTlsAllowInsecureConnection()).isTrue();
assertThat(adminProps.isUseKeyStoreTls()).isTrue();
assertThat(adminProps.getSslProvider()).isEqualTo("my-ssl-provider");
assertThat(adminProps.getTlsTrustStoreType()).isEqualTo("my-trust-store-type");
assertThat(adminProps.getTlsTrustStorePath()).isEqualTo("my-trust-store-path");
assertThat(adminProps.getTlsTrustStorePassword()).isEqualTo("my-trust-store-password");
assertThat(adminProps.getTlsCiphers()).containsExactly("my-tls-cipher");
assertThat(adminProps.getTlsProtocols()).containsExactly("my-tls-protocol");
// Verify customizer
var adminBuilder = mock(PulsarAdminBuilder.class);
var adminCustomizer = adminProps.toPulsarAdminBuilderCustomizer();
adminCustomizer.customize(adminBuilder);
verify(adminBuilder).serviceHttpUrl("my-service-url");
verify(adminBuilder).connectionTimeout(12_000, TimeUnit.MILLISECONDS);
verify(adminBuilder).readTimeout(13_000, TimeUnit.MILLISECONDS);
verify(adminBuilder).requestTimeout(14_000, TimeUnit.MILLISECONDS);
verify(adminBuilder).autoCertRefreshTime(15_000, TimeUnit.MILLISECONDS);
verify(adminBuilder).enableTlsHostnameVerification(true);
verify(adminBuilder).tlsTrustCertsFilePath("my-trust-certs-file-path");
verify(adminBuilder).tlsCertificateFilePath("my-certificate-file-path");
verify(adminBuilder).tlsKeyFilePath("my-key-file-path");
verify(adminBuilder).allowTlsInsecureConnection(true);
verify(adminBuilder).useKeyStoreTls(true);
verify(adminBuilder).sslProvider("my-ssl-provider");
verify(adminBuilder).tlsTrustStoreType("my-trust-store-type");
verify(adminBuilder).tlsTrustStorePath("my-trust-store-path");
verify(adminBuilder).tlsTrustStorePassword("my-trust-store-password");
verify(adminBuilder).tlsCiphers(Set.of("my-tls-cipher"));
verify(adminBuilder).tlsProtocols(Set.of("my-tls-protocol"));
}
@Test
void authenticationUsingAuthParamsString() {
void authenticationUsingAuthParamsString() throws UnsupportedAuthenticationException {
Map<String, String> props = new HashMap<>();
props.put("spring.pulsar.administration.auth-plugin-class-name",
"org.apache.pulsar.client.impl.auth.AuthenticationToken");
props.put("spring.pulsar.administration.auth-params", authParamsStr);
bind(props);
assertThat(properties.getAdministration().getAuthParams()).isEqualTo(authParamsStr);
assertThat(properties.getAdministration().getAuthPluginClassName()).isEqualTo(authPluginClassName);
Map<String, Object> adminProps = properties.buildAdminProperties();
assertThat(adminProps).containsEntry("authPluginClassName", authPluginClassName).containsEntry("authParams",
authParamsStr);
// Verify properties
var adminProps = properties.getAdministration();
assertThat(adminProps.getAuthPluginClassName()).isEqualTo(authPluginClassName);
assertThat(adminProps.getAuthParams()).isEqualTo(authParamsStr);
// Verify customizer
var adminBuilder = mock(PulsarAdminBuilder.class);
var adminCustomizer = adminProps.toPulsarAdminBuilderCustomizer();
adminCustomizer.customize(adminBuilder);
verify(adminBuilder).authentication(authPluginClassName, authParamsStr);
}
@Test
void authenticationUsingAuthenticationMap() {
void authenticationUsingAuthenticationMap() throws UnsupportedAuthenticationException {
Map<String, String> props = new HashMap<>();
props.put("spring.pulsar.administration.auth-plugin-class-name", authPluginClassName);
props.put("spring.pulsar.administration.authentication.token", authToken);
bind(props);
assertThat(properties.getAdministration().getAuthentication()).containsEntry("token", authToken);
assertThat(properties.getAdministration().getAuthPluginClassName()).isEqualTo(authPluginClassName);
Map<String, Object> adminProps = properties.buildAdminProperties();
assertThat(adminProps).containsEntry("authPluginClassName", authPluginClassName).containsEntry("authParams",
authParamsStr);
// Verify properties
var adminProps = properties.getAdministration();
assertThat(adminProps.getAuthPluginClassName()).isEqualTo(authPluginClassName);
assertThat(adminProps.getAuthentication()).containsEntry("token", authToken);
// Verify customizer
var adminBuilder = mock(PulsarAdminBuilder.class);
var adminCustomizer = adminProps.toPulsarAdminBuilderCustomizer();
adminCustomizer.customize(adminBuilder);
verify(adminBuilder).authentication(authPluginClassName, authParamsStr);
}
@Test
@@ -294,8 +321,18 @@ public class PulsarPropertiesTests {
props.put("spring.pulsar.administration.auth-params", authParamsStr);
props.put("spring.pulsar.administration.authentication.token", authToken);
bind(props);
assertThatIllegalArgumentException().isThrownBy(properties::buildAdminProperties).withMessageContaining(
"Cannot set both spring.pulsar.administration.authParams and spring.pulsar.administration.authentication.*");
// Verify properties
var adminProps = properties.getAdministration();
assertThat(adminProps.getAuthPluginClassName()).isEqualTo(authPluginClassName);
assertThat(adminProps.getAuthentication()).containsEntry("token", authToken);
assertThat(adminProps.getAuthParams()).isEqualTo(authParamsStr);
// Verify customizer
var adminCustomizer = adminProps.toPulsarAdminBuilderCustomizer();
assertThatIllegalArgumentException()
.isThrownBy(() -> adminCustomizer.customize(mock(PulsarAdminBuilder.class))).withMessageContaining(
"Cannot set both spring.pulsar.administration.authParams and spring.pulsar.administration.authentication.*");
}
}

View File

@@ -19,7 +19,6 @@ package org.springframework.pulsar.spring.cloud.stream.binder;
import static org.assertj.core.api.Assertions.assertThat;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -100,8 +99,7 @@ public class PulsarBinderTests extends
@Override
protected PulsarTestBinder getBinder() {
var pulsarAdministration = new PulsarAdministration(
Map.of("serviceUrl", PulsarTestContainerSupport.getHttpServiceUrl()));
var pulsarAdministration = new PulsarAdministration(PulsarTestContainerSupport.getHttpServiceUrl());
var configProps = new PulsarBinderConfigurationProperties();
var provisioner = new PulsarTopicProvisioner(pulsarAdministration, configProps);
var producerFactory = new DefaultPulsarProducerFactory<>(pulsarClient);

View File

@@ -37,7 +37,7 @@ public class DefaultPulsarClientFactory implements PulsarClientFactory {
* @param serviceUrl the service url
*/
public DefaultPulsarClientFactory(String serviceUrl) {
this((clientBuilder -> clientBuilder.serviceUrl(serviceUrl)));
this((clientBuilder) -> clientBuilder.serviceUrl(serviceUrl));
}
/**

View File

@@ -0,0 +1,35 @@
/*
* Copyright 2023-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.pulsar.core;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
/**
* The interface to customize a {@link PulsarAdminBuilder}.
*
* @author Chris Bono
*/
@FunctionalInterface
public interface PulsarAdminBuilderCustomizer {
/**
* Customizes a {@link PulsarAdminBuilder}.
* @param adminBuilder the builder to customize
*/
void customize(PulsarAdminBuilder adminBuilder);
}

View File

@@ -18,20 +18,16 @@ package org.springframework.pulsar.core;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.SmartInitializingSingleton;
@@ -40,7 +36,6 @@ import org.springframework.context.ApplicationContextAware;
import org.springframework.core.log.LogAccessor;
import org.springframework.lang.Nullable;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
/**
* An administration class that delegates to {@link PulsarAdmin} to create and manage
@@ -55,28 +50,27 @@ public class PulsarAdministration
private final LogAccessor logger = new LogAccessor(this.getClass());
private final PulsarAdminBuilder adminBuilder;
@Nullable
private ApplicationContext applicationContext;
@Nullable
private final PulsarAdminBuilderCustomizer adminCustomizer;
/**
* Construct a {@code PulsarAdministration} instance using the given configuration for
* the underlying {@link PulsarAdmin}.
* @param adminConfig the {@link PulsarAdmin} configuration
* Construct a default instance using the specified service url.
* @param serviceHttpUrl the admin http service url
*/
public PulsarAdministration(Map<String, Object> adminConfig) {
this.adminBuilder = PulsarAdmin.builder();
loadConf(this.adminBuilder, adminConfig);
public PulsarAdministration(String serviceHttpUrl) {
this((adminBuilder) -> adminBuilder.serviceHttpUrl(serviceHttpUrl));
}
/**
* Construct a {@code PulsarAdministration} instance using the given builder for the
* underlying {@link PulsarAdmin}.
* @param adminBuilder the {@link PulsarAdminBuilder}
* Construct an instance with the specified customizations.
* @param adminCustomizer the customizer to apply to the builder or null to use the
* default admin builder without modifications
*/
public PulsarAdministration(PulsarAdminBuilder adminBuilder) {
this.adminBuilder = adminBuilder;
public PulsarAdministration(@Nullable PulsarAdminBuilderCustomizer adminCustomizer) {
this.adminCustomizer = adminCustomizer;
}
@Override
@@ -89,39 +83,6 @@ public class PulsarAdministration
this.applicationContext = applicationContext;
}
private void loadConf(PulsarAdminBuilder builder, Map<String, Object> adminConfig) {
var conf = new HashMap<>(adminConfig);
// Workaround the fact that the PulsarAdminImpl does not attempt to construct the
// timeout settings from the config props
if (conf.remove("connectionTimeoutMs") instanceof Integer connectTimeout) {
builder.connectionTimeout(connectTimeout, TimeUnit.MILLISECONDS);
}
if (conf.remove("readTimeoutMs") instanceof Integer readTimeout) {
builder.readTimeout(readTimeout, TimeUnit.MILLISECONDS);
}
if (conf.remove("requestTimeoutMs") instanceof Integer requestTimeout) {
builder.requestTimeout(requestTimeout, TimeUnit.MILLISECONDS);
}
if (conf.remove("autoCertRefreshSeconds") instanceof Integer autoCertRefreshTime) {
builder.autoCertRefreshTime(autoCertRefreshTime, TimeUnit.SECONDS);
}
builder.loadConf(conf);
// Workaround the fact that the PulsarAdminImpl does not attempt to construct the
// authentication from the config props
var authPluginClassName = (String) conf.get("authPluginClassName");
var authParams = (String) conf.get("authParams");
if (StringUtils.hasText(authPluginClassName) && StringUtils.hasText(authParams)) {
try {
builder.authentication(authPluginClassName, authParams);
}
catch (UnsupportedAuthenticationException ex) {
throw new RuntimeException("Unable to create admin auth: " + ex.getMessage(), ex);
}
}
}
private void initialize() {
var topics = Objects.requireNonNull(this.applicationContext, "Application context was not set")
.getBeansOfType(PulsarTopic.class, false, false).values();
@@ -129,7 +90,11 @@ public class PulsarAdministration
}
public PulsarAdmin createAdminClient() throws PulsarClientException {
return this.adminBuilder.build();
var adminBuilder = PulsarAdmin.builder();
if (this.adminCustomizer != null) {
this.adminCustomizer.customize(adminBuilder);
}
return adminBuilder.build();
}
@Override

View File

@@ -18,19 +18,13 @@ package org.springframework.pulsar.core;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
import static org.assertj.core.api.InstanceOfAssertFactories.type;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.auth.AuthenticationBasic;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -63,19 +57,6 @@ public class PulsarAdministrationTests implements PulsarTestContainerSupport {
@Autowired
private PulsarAdministration pulsarAdministration;
@Test
void constructorRespectsAuthenticationProps() {
Map<String, Object> props = new HashMap<>();
props.put("authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationBasic");
props.put("authParams", "{\"userId\":\"foo\", \"password\":\"bar\"}");
PulsarAdministration admin = new PulsarAdministration(props);
assertThat(admin).extracting("adminBuilder").asInstanceOf(type(PulsarAdminBuilder.class)).extracting("conf")
.asInstanceOf(type(ClientConfigurationData.class))
.extracting(ClientConfigurationData::getAuthentication).isInstanceOf(AuthenticationBasic.class)
.hasFieldOrPropertyWithValue("userId", "foo").hasFieldOrPropertyWithValue("password", "bar");
}
private void assertThatTopicsExist(List<PulsarTopic> expected) throws PulsarAdminException {
assertThatTopicsExistIn(expected, NAMESPACE);
}
@@ -106,8 +87,7 @@ public class PulsarAdministrationTests implements PulsarTestContainerSupport {
@Bean
PulsarAdministration pulsarAdministration() {
return new PulsarAdministration(
PulsarAdmin.builder().serviceHttpUrl(PulsarTestContainerSupport.getHttpServiceUrl()));
return new PulsarAdministration(PulsarTestContainerSupport.getHttpServiceUrl());
}
}

View File

@@ -29,7 +29,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.Message;
@@ -129,8 +128,7 @@ public class PulsarListenerTests implements PulsarTestContainerSupport {
@Bean
PulsarAdministration pulsarAdministration() {
return new PulsarAdministration(
PulsarAdmin.builder().serviceHttpUrl(PulsarTestContainerSupport.getHttpServiceUrl()));
return new PulsarAdministration(PulsarTestContainerSupport.getHttpServiceUrl());
}
@Bean

View File

@@ -24,7 +24,6 @@ import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -159,8 +158,7 @@ public class ObservationIntegrationTests extends SampleTestRunner implements Pul
@Bean
PulsarAdministration pulsarAdministration() {
return new PulsarAdministration(
PulsarAdmin.builder().serviceHttpUrl(PulsarTestContainerSupport.getHttpServiceUrl()));
return new PulsarAdministration(PulsarTestContainerSupport.getHttpServiceUrl());
}
@Bean

View File

@@ -25,7 +25,6 @@ import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -223,8 +222,7 @@ public class ObservationTests implements PulsarTestContainerSupport {
@Bean
PulsarAdministration pulsarAdministration() {
return new PulsarAdministration(
PulsarAdmin.builder().serviceHttpUrl(PulsarTestContainerSupport.getHttpServiceUrl()));
return new PulsarAdministration(PulsarTestContainerSupport.getHttpServiceUrl());
}
@Bean