Remove PulsarClientConfiguration (#350)
* replace w/ map of config props
This commit is contained in:
@@ -57,7 +57,6 @@ import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.messaging.handler.annotation.Header;
|
||||
import org.springframework.pulsar.annotation.EnablePulsar;
|
||||
import org.springframework.pulsar.config.PulsarClientConfiguration;
|
||||
import org.springframework.pulsar.config.PulsarClientFactoryBean;
|
||||
import org.springframework.pulsar.core.DefaultPulsarProducerFactory;
|
||||
import org.springframework.pulsar.core.DefaultSchemaResolver;
|
||||
@@ -112,13 +111,8 @@ public class ReactivePulsarListenerTests implements PulsarTestContainerSupport {
|
||||
}
|
||||
|
||||
@Bean
|
||||
public PulsarClientFactoryBean pulsarClientFactoryBean(PulsarClientConfiguration pulsarClientConfiguration) {
|
||||
return new PulsarClientFactoryBean(pulsarClientConfiguration);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public PulsarClientConfiguration pulsarClientConfiguration() {
|
||||
return new PulsarClientConfiguration(Map.of("serviceUrl", PulsarTestContainerSupport.getPulsarBrokerUrl()));
|
||||
public PulsarClientFactoryBean pulsarClientFactoryBean() {
|
||||
return new PulsarClientFactoryBean(Map.of("serviceUrl", PulsarTestContainerSupport.getPulsarBrokerUrl()));
|
||||
}
|
||||
|
||||
@Bean
|
||||
|
||||
@@ -30,7 +30,6 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Import;
|
||||
import org.springframework.pulsar.config.PulsarClientConfiguration;
|
||||
import org.springframework.pulsar.config.PulsarClientFactoryBean;
|
||||
import org.springframework.pulsar.core.CachingPulsarProducerFactory;
|
||||
import org.springframework.pulsar.core.DefaultPulsarConsumerFactory;
|
||||
@@ -75,14 +74,8 @@ public class PulsarAutoConfiguration {
|
||||
|
||||
@Bean
|
||||
@ConditionalOnMissingBean
|
||||
public PulsarClientConfiguration pulsarClientConfiguration() {
|
||||
return new PulsarClientConfiguration(this.properties.buildClientProperties());
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnMissingBean
|
||||
public PulsarClientFactoryBean pulsarClientFactoryBean(PulsarClientConfiguration pulsarClientConfiguration) {
|
||||
return new PulsarClientFactoryBean(pulsarClientConfiguration);
|
||||
public PulsarClientFactoryBean pulsarClientFactoryBean() {
|
||||
return new PulsarClientFactoryBean(this.properties.buildClientProperties());
|
||||
}
|
||||
|
||||
@Bean
|
||||
|
||||
@@ -44,7 +44,6 @@ import org.springframework.pulsar.annotation.EnablePulsar;
|
||||
import org.springframework.pulsar.annotation.PulsarBootstrapConfiguration;
|
||||
import org.springframework.pulsar.annotation.PulsarListenerAnnotationBeanPostProcessor;
|
||||
import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactory;
|
||||
import org.springframework.pulsar.config.PulsarClientConfiguration;
|
||||
import org.springframework.pulsar.config.PulsarClientFactoryBean;
|
||||
import org.springframework.pulsar.config.PulsarListenerContainerFactory;
|
||||
import org.springframework.pulsar.config.PulsarListenerEndpointRegistry;
|
||||
@@ -106,31 +105,19 @@ class PulsarAutoConfigurationTests {
|
||||
|
||||
@Test
|
||||
void defaultBeansAreAutoConfigured() {
|
||||
this.contextRunner
|
||||
.run((context) -> assertThat(context).hasNotFailed().hasSingleBean(PulsarClientConfiguration.class)
|
||||
.hasSingleBean(PulsarClientFactoryBean.class).hasSingleBean(PulsarProducerFactory.class)
|
||||
.hasSingleBean(PulsarTemplate.class).hasSingleBean(PulsarConsumerFactory.class)
|
||||
.hasSingleBean(ConcurrentPulsarListenerContainerFactory.class)
|
||||
.hasSingleBean(PulsarListenerAnnotationBeanPostProcessor.class)
|
||||
.hasSingleBean(PulsarListenerEndpointRegistry.class).hasSingleBean(PulsarAdministration.class)
|
||||
.hasSingleBean(DefaultSchemaResolver.class).hasSingleBean(DefaultTopicResolver.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
void customPulsarClientConfigurationIsRespected() {
|
||||
PulsarClientConfiguration clientConfig = new PulsarClientConfiguration(
|
||||
new PulsarProperties().buildClientProperties());
|
||||
this.contextRunner
|
||||
.withBean("customPulsarClientConfiguration", PulsarClientConfiguration.class, () -> clientConfig)
|
||||
.run((context) -> assertThat(context).hasNotFailed().getBean(PulsarClientConfiguration.class)
|
||||
.isSameAs(clientConfig));
|
||||
this.contextRunner.run((context) -> assertThat(context).hasNotFailed()
|
||||
.hasSingleBean(PulsarClientFactoryBean.class).hasSingleBean(PulsarProducerFactory.class)
|
||||
.hasSingleBean(PulsarTemplate.class).hasSingleBean(PulsarConsumerFactory.class)
|
||||
.hasSingleBean(ConcurrentPulsarListenerContainerFactory.class)
|
||||
.hasSingleBean(PulsarListenerAnnotationBeanPostProcessor.class)
|
||||
.hasSingleBean(PulsarListenerEndpointRegistry.class).hasSingleBean(PulsarAdministration.class)
|
||||
.hasSingleBean(DefaultSchemaResolver.class).hasSingleBean(DefaultTopicResolver.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
void customPulsarClientFactoryBeanIsRespected() {
|
||||
PulsarClientConfiguration clientConfig = new PulsarClientConfiguration(
|
||||
PulsarClientFactoryBean clientFactoryBean = new PulsarClientFactoryBean(
|
||||
new PulsarProperties().buildClientProperties());
|
||||
PulsarClientFactoryBean clientFactoryBean = new PulsarClientFactoryBean(clientConfig);
|
||||
this.contextRunner
|
||||
.withBean("customPulsarClientFactoryBean", PulsarClientFactoryBean.class, () -> clientFactoryBean)
|
||||
.run((context) -> assertThat(context)
|
||||
@@ -323,8 +310,8 @@ class PulsarAutoConfigurationTests {
|
||||
"spring.pulsar.client.auth-plugin-class-name=org.apache.pulsar.client.impl.auth.AuthenticationBasic",
|
||||
"spring.pulsar.client.authentication.userId=username",
|
||||
"spring.pulsar.client.authentication.password=topsecret")
|
||||
.run((context -> assertThat(context).hasNotFailed().getBean(PulsarClientConfiguration.class)
|
||||
.extracting("configs", InstanceOfAssertFactories.map(String.class, Object.class))
|
||||
.run((context -> assertThat(context).hasNotFailed().getBean(PulsarClientFactoryBean.class)
|
||||
.extracting("config", InstanceOfAssertFactories.map(String.class, Object.class))
|
||||
.doesNotContainKey("authParamMap").doesNotContainKey("userId").doesNotContainKey("password")
|
||||
.containsEntry("authParams", "{\"password\":\"topsecret\",\"userId\":\"username\"}")));
|
||||
}
|
||||
|
||||
@@ -21,6 +21,7 @@ import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
@@ -262,7 +263,7 @@ class PulsarReactiveAutoConfigurationTests {
|
||||
@SuppressWarnings("rawtypes")
|
||||
void beansAreInjectedInReactivePulsarClient() throws Exception {
|
||||
try (PulsarClient client = mock(PulsarClient.class)) {
|
||||
PulsarClientFactoryBean factoryBean = new PulsarClientFactoryBean(null) {
|
||||
PulsarClientFactoryBean factoryBean = new PulsarClientFactoryBean(Collections.emptyMap()) {
|
||||
@Override
|
||||
protected PulsarClient createInstance() {
|
||||
return client;
|
||||
@@ -318,7 +319,7 @@ class PulsarReactiveAutoConfigurationTests {
|
||||
ReactiveMessageSenderCache cache = AdaptedReactivePulsarClientFactory.createCache();
|
||||
try (MockedStatic<AdaptedReactivePulsarClientFactory> mockedClientFactory = Mockito
|
||||
.mockStatic(AdaptedReactivePulsarClientFactory.class)) {
|
||||
mockedClientFactory.when(() -> AdaptedReactivePulsarClientFactory.createCache()).thenReturn(cache);
|
||||
mockedClientFactory.when(AdaptedReactivePulsarClientFactory::createCache).thenReturn(cache);
|
||||
mockedClientFactory.when(() -> AdaptedReactivePulsarClientFactory.create(any(PulsarClient.class)))
|
||||
.thenReturn(mock(ReactivePulsarClient.class));
|
||||
contextRunner.withClassLoader(new FilteredClassLoader(CaffeineProducerCacheProvider.class))
|
||||
@@ -332,10 +333,9 @@ class PulsarReactiveAutoConfigurationTests {
|
||||
|
||||
@Test
|
||||
void cacheCanBeDisabled() {
|
||||
contextRunner.withPropertyValues("spring.pulsar.reactive.sender.cache.enabled=false").run((context -> {
|
||||
assertThat(context).hasNotFailed().doesNotHaveBean(ProducerCacheProvider.class)
|
||||
.doesNotHaveBean(ReactiveMessageSenderCache.class);
|
||||
}));
|
||||
contextRunner.withPropertyValues("spring.pulsar.reactive.sender.cache.enabled=false")
|
||||
.run((context -> assertThat(context).hasNotFailed().doesNotHaveBean(ProducerCacheProvider.class)
|
||||
.doesNotHaveBean(ReactiveMessageSenderCache.class)));
|
||||
}
|
||||
|
||||
private AbstractObjectAssert<?, ProducerCacheProvider> assertCaffeineProducerCacheProvider(
|
||||
|
||||
@@ -1,42 +0,0 @@
|
||||
/*
|
||||
* Copyright 2022-2023 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.pulsar.config;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* Configuration for the Pulsar client.
|
||||
*
|
||||
* @author Soby Chacko
|
||||
* @author Chris Bono
|
||||
*/
|
||||
public class PulsarClientConfiguration {
|
||||
|
||||
private final Map<String, Object> configs = new HashMap<>();
|
||||
|
||||
public PulsarClientConfiguration(Map<String, Object> configs) {
|
||||
Objects.requireNonNull(configs, "Configuration map cannot be null");
|
||||
this.configs.putAll(configs);
|
||||
}
|
||||
|
||||
public Map<String, Object> getConfigs() {
|
||||
return this.configs;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -16,6 +16,10 @@
|
||||
|
||||
package org.springframework.pulsar.config;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
import org.apache.pulsar.client.api.PulsarClient;
|
||||
|
||||
import org.springframework.beans.factory.FactoryBean;
|
||||
@@ -33,10 +37,11 @@ public class PulsarClientFactoryBean extends AbstractFactoryBean<PulsarClient> {
|
||||
|
||||
private final LogAccessor logger = new LogAccessor(this.getClass());
|
||||
|
||||
private final PulsarClientConfiguration pulsarClientConfiguration;
|
||||
private final Map<String, Object> config = new HashMap<>();
|
||||
|
||||
public PulsarClientFactoryBean(PulsarClientConfiguration pulsarClientConfiguration) {
|
||||
this.pulsarClientConfiguration = pulsarClientConfiguration;
|
||||
public PulsarClientFactoryBean(Map<String, Object> config) {
|
||||
Objects.requireNonNull(config, "Config map cannot be null");
|
||||
this.config.putAll(config);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -46,7 +51,7 @@ public class PulsarClientFactoryBean extends AbstractFactoryBean<PulsarClient> {
|
||||
|
||||
@Override
|
||||
protected PulsarClient createInstance() throws Exception {
|
||||
return PulsarClient.builder().loadConf(this.pulsarClientConfiguration.getConfigs()).build();
|
||||
return PulsarClient.builder().loadConf(this.config).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -60,7 +60,6 @@ import org.springframework.messaging.handler.annotation.Header;
|
||||
import org.springframework.pulsar.annotation.EnablePulsar;
|
||||
import org.springframework.pulsar.annotation.PulsarListener;
|
||||
import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactory;
|
||||
import org.springframework.pulsar.config.PulsarClientConfiguration;
|
||||
import org.springframework.pulsar.config.PulsarClientFactoryBean;
|
||||
import org.springframework.pulsar.config.PulsarListenerContainerFactory;
|
||||
import org.springframework.pulsar.config.PulsarListenerEndpointRegistry;
|
||||
@@ -108,13 +107,8 @@ public class PulsarListenerTests implements PulsarTestContainerSupport {
|
||||
}
|
||||
|
||||
@Bean
|
||||
public PulsarClientFactoryBean pulsarClientFactoryBean(PulsarClientConfiguration pulsarClientConfiguration) {
|
||||
return new PulsarClientFactoryBean(pulsarClientConfiguration);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public PulsarClientConfiguration pulsarClientConfiguration() {
|
||||
return new PulsarClientConfiguration(Map.of("serviceUrl", PulsarTestContainerSupport.getPulsarBrokerUrl()));
|
||||
public PulsarClientFactoryBean pulsarClientFactoryBean() {
|
||||
return new PulsarClientFactoryBean(Map.of("serviceUrl", PulsarTestContainerSupport.getPulsarBrokerUrl()));
|
||||
}
|
||||
|
||||
@Bean
|
||||
|
||||
@@ -35,7 +35,6 @@ import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.pulsar.annotation.EnablePulsar;
|
||||
import org.springframework.pulsar.annotation.PulsarListener;
|
||||
import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactory;
|
||||
import org.springframework.pulsar.config.PulsarClientConfiguration;
|
||||
import org.springframework.pulsar.config.PulsarClientFactoryBean;
|
||||
import org.springframework.pulsar.config.PulsarListenerContainerFactory;
|
||||
import org.springframework.pulsar.core.DefaultPulsarConsumerFactory;
|
||||
@@ -136,13 +135,8 @@ public class ObservationIntegrationTests extends SampleTestRunner implements Pul
|
||||
}
|
||||
|
||||
@Bean
|
||||
public PulsarClientFactoryBean pulsarClientFactoryBean(PulsarClientConfiguration pulsarClientConfiguration) {
|
||||
return new PulsarClientFactoryBean(pulsarClientConfiguration);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public PulsarClientConfiguration pulsarClientConfiguration() {
|
||||
return new PulsarClientConfiguration(Map.of("serviceUrl", PulsarTestContainerSupport.getPulsarBrokerUrl()));
|
||||
public PulsarClientFactoryBean pulsarClientFactoryBean() {
|
||||
return new PulsarClientFactoryBean(Map.of("serviceUrl", PulsarTestContainerSupport.getPulsarBrokerUrl()));
|
||||
}
|
||||
|
||||
@Bean
|
||||
|
||||
@@ -40,7 +40,6 @@ import org.springframework.lang.Nullable;
|
||||
import org.springframework.pulsar.annotation.EnablePulsar;
|
||||
import org.springframework.pulsar.annotation.PulsarListener;
|
||||
import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactory;
|
||||
import org.springframework.pulsar.config.PulsarClientConfiguration;
|
||||
import org.springframework.pulsar.config.PulsarClientFactoryBean;
|
||||
import org.springframework.pulsar.config.PulsarListenerContainerFactory;
|
||||
import org.springframework.pulsar.core.DefaultPulsarConsumerFactory;
|
||||
@@ -176,13 +175,8 @@ public class ObservationTests implements PulsarTestContainerSupport {
|
||||
}
|
||||
|
||||
@Bean
|
||||
PulsarClientFactoryBean pulsarClientFactoryBean(PulsarClientConfiguration pulsarClientConfiguration) {
|
||||
return new PulsarClientFactoryBean(pulsarClientConfiguration);
|
||||
}
|
||||
|
||||
@Bean
|
||||
PulsarClientConfiguration pulsarClientConfiguration() {
|
||||
return new PulsarClientConfiguration(Map.of("serviceUrl", PulsarTestContainerSupport.getPulsarBrokerUrl()));
|
||||
PulsarClientFactoryBean pulsarClientFactoryBean() {
|
||||
return new PulsarClientFactoryBean(Map.of("serviceUrl", PulsarTestContainerSupport.getPulsarBrokerUrl()));
|
||||
}
|
||||
|
||||
@Bean(name = "observationTestsTemplate")
|
||||
|
||||
Reference in New Issue
Block a user