Add cache provider / shade caffeine (#386)

This commit is contained in:
Chris Bono
2023-03-27 19:13:05 -05:00
committed by GitHub
parent 634f60df96
commit 18eac19012
18 changed files with 380 additions and 101 deletions

View File

@@ -1,61 +0,0 @@
apply plugin: 'maven-publish'
apply plugin: 'com.jfrog.artifactory'
publishing {
publications {
mavenJava(MavenPublication) {
pom {
afterEvaluate {
name = project.description
description = project.description
}
url = linkScmUrl
// organization {
// name = 'Spring IO'
// url = 'https://spring.io/projects/spring-pulsar'
// }
licenses {
license {
name = 'Apache License, Version 2.0'
url = 'https://www.apache.org/licenses/LICENSE-2.0.txt'
distribution = 'repo'
}
}
scm {
url = linkScmUrl
connection = linkScmConnection
developerConnection = linkScmDevConnection
}
developers {
developer {
id = "schacko"
name = "Soby Chacko"
email = "chackos@vmware.com"
}
developer {
id = "onobc"
name = "Chris Bono"
email = "cbono@vmware.com"
}
}
issueManagement {
system = 'GitHub'
url = linkIssue
}
}
versionMapping {
usage('java-api') {
fromResolutionResult()
}
usage('java-runtime') {
fromResolutionResult()
}
}
}
}
}
artifactoryPublish {
dependsOn build
publications(publishing.publications.mavenJava)
}

View File

@@ -24,6 +24,8 @@ settings.gradle.projectsLoaded {
rootProject.name = 'spring-pulsar-dist' rootProject.name = 'spring-pulsar-dist'
include 'spring-pulsar' include 'spring-pulsar'
include 'spring-pulsar-cache-provider'
include 'spring-pulsar-cache-provider-caffeine'
include 'spring-pulsar-reactive' include 'spring-pulsar-reactive'
include 'spring-pulsar-dependencies' include 'spring-pulsar-dependencies'
include 'spring-pulsar-spring-boot-autoconfigure' include 'spring-pulsar-spring-boot-autoconfigure'

View File

@@ -0,0 +1,59 @@
plugins {
id 'org.springframework.pulsar.spring-module'
id 'com.github.johnrengelman.shadow' version '7.1.2'
}
description = 'Spring Pulsar Caffeine Cache Provider'
dependencies {
api project (':spring-pulsar-cache-provider')
implementation 'com.github.ben-manes.caffeine:caffeine'
shadow project(':spring-pulsar-cache-provider')
}
jar {
archiveClassifier.set('original')
}
shadowJar {
archiveClassifier.set(null)
dependsOn(project.tasks.jar)
manifest {
inheritFrom project.tasks.jar.manifest
}
relocate 'com.github.benmanes.caffeine', 'org.springframework.pulsar.shade.com.github.benmanes.caffeine'
relocate 'com.google', 'org.springframework.pulsar.shade.com.google'
relocate 'org.checkerframework', 'org.springframework.pulsar.shade.org.checkerframework'
dependencies {
exclude(dependency {
!['com.github.ben-manes.caffeine', 'org.checkerframework', 'com.google.errorprone'].contains(it.moduleGroup)
})
}
}
tasks.build.dependsOn tasks.shadowJar
// delay the maven publishing - instead add shadowJar to the publication
components.java.withVariantsFromConfiguration(configurations.shadowRuntimeElements) {
skip()
}
publishing {
publications {
mavenJava {
artifact(shadowJar)
pom.withXml {
Node pomNode = asNode()
pomNode.dependencies.'*'.findAll() {
it.artifactId.text() == 'caffeine'
}.each() {
it.parent().remove(it)
}
}
}
}
}
test {
testLogging.showStandardStreams = true
}

View File

@@ -0,0 +1,58 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.pulsar.core;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;
import com.github.benmanes.caffeine.cache.Cache;
/**
* Cache provider implementation backed by a {@code Caffeine} cache.
*
* @param <K> the type of cache key
* @param <V> the type of cache entries
* @author Chris Bono
*/
public class CaffeineCacheProvider<K, V> implements CacheProvider<K, V> {
private final Cache<K, V> cache;
public CaffeineCacheProvider(Cache<K, V> cache) {
this.cache = cache;
}
@Override
public V getOrCreateIfAbsent(K cacheKey, Function<K, V> createEntryFunction) {
return this.cache.get(cacheKey, createEntryFunction);
}
@Override
public Map<K, V> asMap() {
return this.cache.asMap();
}
@Override
public void invalidateAll(BiConsumer<K, V> onInvalidateEntry) {
this.cache.asMap().forEach((cacheKey, cacheEntry) -> {
this.cache.invalidate(cacheKey);
onInvalidateEntry.accept(cacheKey, cacheEntry);
});
}
}

View File

@@ -0,0 +1,46 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.pulsar.core;
import java.time.Duration;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalListener;
import com.github.benmanes.caffeine.cache.Scheduler;
/**
* Factory to create instances of {@link CaffeineCacheProvider}.
*
* @param <K> the type of cache key
* @param <V> the type of cache entries
* @author Chris Bono
*/
public class CaffeineCacheProviderFactory<K, V> implements CacheProviderFactory<K, V> {
@Override
public CacheProvider<K, V> create(Duration cacheExpireAfterAccess, Long cacheMaximumSize,
Integer cacheInitialCapacity, EvictionListener<K, V> evictionListener) {
Cache<K, V> cache = Caffeine.newBuilder().expireAfterAccess(cacheExpireAfterAccess)
.maximumSize(cacheMaximumSize).initialCapacity(cacheInitialCapacity)
.scheduler(Scheduler.systemScheduler()).evictionListener((RemovalListener<K, V>) (key, value,
cause) -> evictionListener.onEviction(key, value, cause.toString()))
.build();
return new CaffeineCacheProvider<>(cache);
}
}

View File

@@ -0,0 +1 @@
org.springframework.pulsar.core.CaffeineCacheProviderFactory

View File

@@ -0,0 +1,15 @@
plugins {
id 'org.springframework.pulsar.spring-module'
}
description = 'Spring Pulsar Cache Provider API'
dependencies {
testImplementation 'org.assertj:assertj-core'
testImplementation 'org.junit.jupiter:junit-jupiter'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}
test {
testLogging.showStandardStreams = true
}

View File

@@ -0,0 +1,59 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.pulsar.core;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;
/**
* Defines the contract for a cache provider.
*
* @param <K> the type of cache key
* @param <V> the type of cache entries
* @author Chris Bono
*/
public interface CacheProvider<K, V> {
/**
* Returns the value associated with the {@code key} in the cache.
* <p>
* If the key is not already associated with an entry in the cache, the
* {@code createEntryFunction} is used to compute the value to cache and return.
* @param key the cache key
* @param createEntryFunction the function to compute a value
* @return the current (existing or computed) value associated with the specified key,
* or null if the computed value is null
*/
V getOrCreateIfAbsent(K key, Function<K, V> createEntryFunction);
/**
* Returns a view of the entries stored in the cache as a thread-safe map.
* Modifications made to the map directly affect the cache.
* @return a thread-safe view of the cache supporting all optional {@link Map}
* operations
*/
Map<K, V> asMap();
/**
* Discards all entries in the cache and calls the {@code onInvalidateEntry} callback
* (if provided) for each entry.
* @param onInvalidateEntry callback invoked for each invalidated entry
*/
void invalidateAll(BiConsumer<K, V> onInvalidateEntry);
}

View File

@@ -0,0 +1,73 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.pulsar.core;
import java.time.Duration;
import java.util.ServiceLoader;
/**
* Interface to create instances of {@link CacheProvider}.
*
* @param <K> the type of cache key
* @param <V> the type of cache entries
* @author Chris Bono
*/
public interface CacheProviderFactory<K, V> {
/**
* Create a cache provider instance with the specified options.
* @param cacheExpireAfterAccess time period to expire unused entries in the cache
* @param cacheMaximumSize maximum size of cache (entries)
* @param cacheInitialCapacity the initial size of cache
* @param evictionListener listener called when an entry is evicted from the cache
* @return cache provider instance
*/
CacheProvider<K, V> create(Duration cacheExpireAfterAccess, Long cacheMaximumSize, Integer cacheInitialCapacity,
EvictionListener<K, V> evictionListener);
/**
* Uses the Java ServiceLoader API to find the first available cache provider factory.
* @param <K> the type of cache key
* @param <V> the type of cache entries
* @return the cache provider factory service
* @throws IllegalStateException if no factory was found
*/
@SuppressWarnings("unchecked")
static <K, V> CacheProviderFactory<K, V> load() {
return ServiceLoader.load(CacheProviderFactory.class).findFirst()
.orElseThrow(() -> new IllegalStateException("No ProducerCacheFactory available"));
}
/**
* Interface for a cache eviction listener.
*
* @param <K> the type of cache key
* @param <V> the type of cache entries
*/
interface EvictionListener<K, V> {
/**
* Called when an entry is evicted from the cache.
* @param key the cache key
* @param value the cached value
* @param reason the reason for the eviction
*/
void onEviction(K key, V value, String reason);
}
}

View File

@@ -0,0 +1,47 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.pulsar.core;
import static org.assertj.core.api.Assertions.assertThat;
import java.time.Duration;
import org.junit.jupiter.api.Test;
/**
* Tests for {@link CacheProviderFactory}.
*
* @author Chris Bono
*/
public class CacheProviderFactoryTests {
@Test
void loadFactory() {
assertThat(CacheProviderFactory.load()).isInstanceOf(TestCacheProviderFactory.class);
}
public static class TestCacheProviderFactory implements CacheProviderFactory<String, Object> {
@Override
public CacheProvider<String, Object> create(Duration cacheExpireAfterAccess, Long cacheMaximumSize,
Integer cacheInitialCapacity, EvictionListener<String, Object> evictionListener) {
return null;
}
}
}

View File

@@ -0,0 +1 @@
org.springframework.pulsar.core.CacheProviderFactoryTests$TestCacheProviderFactory

View File

@@ -12,7 +12,6 @@ dependencies {
optional project (':spring-pulsar') optional project (':spring-pulsar')
optional project (':spring-pulsar-reactive') optional project (':spring-pulsar-reactive')
optional 'com.github.ben-manes.caffeine:caffeine'
optional 'org.apache.pulsar:pulsar-client-reactive-producer-cache-caffeine' optional 'org.apache.pulsar:pulsar-client-reactive-producer-cache-caffeine'
implementation 'org.springframework.boot:spring-boot-starter' implementation 'org.springframework.boot:spring-boot-starter'
implementation 'com.google.code.findbugs:jsr305' implementation 'com.google.code.findbugs:jsr305'

View File

@@ -51,7 +51,6 @@ import org.springframework.pulsar.function.PulsarSink;
import org.springframework.pulsar.function.PulsarSource; import org.springframework.pulsar.function.PulsarSource;
import org.springframework.pulsar.observation.PulsarTemplateObservationConvention; import org.springframework.pulsar.observation.PulsarTemplateObservationConvention;
import com.github.benmanes.caffeine.cache.Caffeine;
import io.micrometer.observation.ObservationRegistry; import io.micrometer.observation.ObservationRegistry;
/** /**
@@ -89,7 +88,6 @@ public class PulsarAutoConfiguration {
@Bean @Bean
@ConditionalOnMissingBean @ConditionalOnMissingBean
@ConditionalOnClass(Caffeine.class)
@ConditionalOnProperty(name = "spring.pulsar.producer.cache.enabled", havingValue = "true", matchIfMissing = true) @ConditionalOnProperty(name = "spring.pulsar.producer.cache.enabled", havingValue = "true", matchIfMissing = true)
public PulsarProducerFactory<?> cachingPulsarProducerFactory(PulsarClient pulsarClient, public PulsarProducerFactory<?> cachingPulsarProducerFactory(PulsarClient pulsarClient,
TopicResolver topicResolver) { TopicResolver topicResolver) {

View File

@@ -35,7 +35,6 @@ import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.boot.autoconfigure.AutoConfigurations; import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.test.context.FilteredClassLoader; import org.springframework.boot.test.context.FilteredClassLoader;
import org.springframework.boot.test.context.assertj.AssertableApplicationContext; import org.springframework.boot.test.context.assertj.AssertableApplicationContext;
@@ -501,14 +500,10 @@ class PulsarAutoConfigurationTests {
} }
@Test @Test
void cachingEnabledButCaffeineNotOnClasspath() { void cachingEnabledAndCaffeineNotOnClasspath() {
contextRunner.withClassLoader(new FilteredClassLoader(Caffeine.class)) contextRunner.withClassLoader(new FilteredClassLoader(Caffeine.class))
.withPropertyValues("spring.pulsar.producer.cache.enabled=true") .withPropertyValues("spring.pulsar.producer.cache.enabled=true")
.run((context -> assertThat(context).hasFailed().getFailure().cause() .run((context -> assertHasProducerFactoryOfType(CachingPulsarProducerFactory.class, context)));
.isInstanceOf(NoSuchBeanDefinitionException.class)
.asInstanceOf(InstanceOfAssertFactories.type(NoSuchBeanDefinitionException.class))
.extracting(NoSuchBeanDefinitionException::getBeanType)
.isEqualTo(PulsarProducerFactory.class)));
} }
@Test @Test
@@ -518,8 +513,7 @@ class PulsarAutoConfigurationTests {
"spring.pulsar.producer.cache.maximum-size=5150", "spring.pulsar.producer.cache.maximum-size=5150",
"spring.pulsar.producer.cache.initial-capacity=200") "spring.pulsar.producer.cache.initial-capacity=200")
.run((context -> assertThat(context).hasNotFailed().getBean(PulsarProducerFactory.class) .run((context -> assertThat(context).hasNotFailed().getBean(PulsarProducerFactory.class)
.extracting("producerCache").extracting("cache") .extracting("producerCache.cache.cache").hasFieldOrPropertyWithValue("maximum", 5150L)
.hasFieldOrPropertyWithValue("maximum", 5150L)
.hasFieldOrPropertyWithValue("expiresAfterAccessNanos", TimeUnit.SECONDS.toNanos(100)))); .hasFieldOrPropertyWithValue("expiresAfterAccessNanos", TimeUnit.SECONDS.toNanos(100))));
} }

View File

@@ -7,6 +7,5 @@ description = 'Spring Pulsar Spring Boot Starter'
dependencies { dependencies {
api project (':spring-pulsar') api project (':spring-pulsar')
api project (':spring-pulsar-spring-boot-autoconfigure') api project (':spring-pulsar-spring-boot-autoconfigure')
api 'com.github.ben-manes.caffeine:caffeine'
api 'org.springframework.boot:spring-boot-starter' api 'org.springframework.boot:spring-boot-starter'
} }

View File

@@ -17,13 +17,14 @@ dependencies {
api ('org.springframework.retry:spring-retry') { api ('org.springframework.retry:spring-retry') {
exclude group: 'org.springframework' exclude group: 'org.springframework'
} }
api project(':spring-pulsar-cache-provider')
implementation project(path: ':spring-pulsar-cache-provider-caffeine', configuration: 'shadow')
implementation 'com.fasterxml.jackson.core:jackson-core' implementation 'com.fasterxml.jackson.core:jackson-core'
implementation 'com.fasterxml.jackson.core:jackson-databind' implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.google.code.findbugs:jsr305' implementation 'com.google.code.findbugs:jsr305'
optional 'com.fasterxml.jackson.datatype:jackson-datatype-jdk8' optional 'com.fasterxml.jackson.datatype:jackson-datatype-jdk8'
optional 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310' optional 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
optional 'com.fasterxml.jackson.datatype:jackson-datatype-joda' optional 'com.fasterxml.jackson.datatype:jackson-datatype-joda'
optional 'com.github.ben-manes.caffeine:caffeine'
optional 'com.google.protobuf:protobuf-java' optional 'com.google.protobuf:protobuf-java'
optional 'com.jayway.jsonpath:json-path' optional 'com.jayway.jsonpath:json-path'

View File

@@ -41,11 +41,6 @@ import org.springframework.core.log.LogAccessor;
import org.springframework.lang.Nullable; import org.springframework.lang.Nullable;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalListener;
import com.github.benmanes.caffeine.cache.Scheduler;
/** /**
* A {@link PulsarProducerFactory} that extends the {@link DefaultPulsarProducerFactory * A {@link PulsarProducerFactory} that extends the {@link DefaultPulsarProducerFactory
* default implementation} by caching the created producers. * default implementation} by caching the created producers.
@@ -66,7 +61,7 @@ public class CachingPulsarProducerFactory<T> extends DefaultPulsarProducerFactor
private final LogAccessor logger = new LogAccessor(this.getClass()); private final LogAccessor logger = new LogAccessor(this.getClass());
private final Cache<ProducerCacheKey<T>, Producer<T>> producerCache; private final CacheProvider<ProducerCacheKey<T>, Producer<T>> producerCache;
/** /**
* Construct a caching producer factory with the specified values for the cache * Construct a caching producer factory with the specified values for the cache
@@ -82,15 +77,13 @@ public class CachingPulsarProducerFactory<T> extends DefaultPulsarProducerFactor
TopicResolver topicResolver, Duration cacheExpireAfterAccess, Long cacheMaximumSize, TopicResolver topicResolver, Duration cacheExpireAfterAccess, Long cacheMaximumSize,
Integer cacheInitialCapacity) { Integer cacheInitialCapacity) {
super(pulsarClient, producerConfig, topicResolver); super(pulsarClient, producerConfig, topicResolver);
this.producerCache = Caffeine.newBuilder().expireAfterAccess(cacheExpireAfterAccess) var cacheFactory = CacheProviderFactory.<ProducerCacheKey<T>, Producer<T>>load();
.maximumSize(cacheMaximumSize).initialCapacity(cacheInitialCapacity) this.producerCache = cacheFactory.create(cacheExpireAfterAccess, cacheMaximumSize, cacheInitialCapacity,
.scheduler(Scheduler.systemScheduler()).evictionListener( (key, producer, cause) -> {
(RemovalListener<ProducerCacheKey<T>, Producer<T>>) (producerCacheKey, producer, cause) -> { this.logger.debug(() -> "Producer %s evicted from cache due to %s"
this.logger.debug(() -> "Producer %s evicted from cache due to %s" .formatted(ProducerUtils.formatProducer(producer), cause));
.formatted(ProducerUtils.formatProducer(producer), cause)); closeProducer(producer);
closeProducer(producer); });
})
.build();
} }
@Override @Override
@@ -100,7 +93,7 @@ public class CachingPulsarProducerFactory<T> extends DefaultPulsarProducerFactor
String resolveTopicName = resolveTopicName(topic); String resolveTopicName = resolveTopicName(topic);
ProducerCacheKey<T> producerCacheKey = new ProducerCacheKey<>(schema, resolveTopicName, ProducerCacheKey<T> producerCacheKey = new ProducerCacheKey<>(schema, resolveTopicName,
encryptionKeys == null ? null : new HashSet<>(encryptionKeys), customizers); encryptionKeys == null ? null : new HashSet<>(encryptionKeys), customizers);
return this.producerCache.get(producerCacheKey, return this.producerCache.getOrCreateIfAbsent(producerCacheKey,
(st) -> createCacheableProducer(st.schema, st.topic, st.encryptionKeys, customizers)); (st) -> createCacheableProducer(st.schema, st.topic, st.encryptionKeys, customizers));
} }
@@ -119,10 +112,7 @@ public class CachingPulsarProducerFactory<T> extends DefaultPulsarProducerFactor
@Override @Override
public void destroy() { public void destroy() {
this.producerCache.asMap().forEach((producerCacheKey, producer) -> { this.producerCache.invalidateAll((key, producer) -> closeProducer(producer));
this.producerCache.invalidate(producerCacheKey);
closeProducer(producer);
});
} }
private void closeProducer(Producer<T> producer) { private void closeProducer(Producer<T> producer) {

View File

@@ -51,8 +51,6 @@ import org.springframework.pulsar.core.CachingPulsarProducerFactory.ProducerWith
import org.springframework.test.util.ReflectionTestUtils; import org.springframework.test.util.ReflectionTestUtils;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
import com.github.benmanes.caffeine.cache.Cache;
/** /**
* Tests for {@link CachingPulsarProducerFactory}. * Tests for {@link CachingPulsarProducerFactory}.
* *
@@ -83,8 +81,8 @@ class CachingPulsarProducerFactoryTests extends PulsarProducerFactoryTests {
var producer3 = producerFactory.createProducer(new StringSchema(), "topic1"); var producer3 = producerFactory.createProducer(new StringSchema(), "topic1");
assertThat(producer1).isSameAs(producer2).isSameAs(producer3); assertThat(producer1).isSameAs(producer2).isSameAs(producer3);
Cache<ProducerCacheKey<String>, Producer<String>> producerCache = getAssertedProducerCache(producerFactory, CacheProvider<ProducerCacheKey<String>, Producer<String>> producerCache = getAssertedProducerCache(
Collections.singletonList(cacheKey)); producerFactory, Collections.singletonList(cacheKey));
Producer<String> cachedProducerWrapper = producerCache.asMap().get(cacheKey); Producer<String> cachedProducerWrapper = producerCache.asMap().get(cacheKey);
assertThat(cachedProducerWrapper).isSameAs(producer1); assertThat(cachedProducerWrapper).isSameAs(producer1);
} }
@@ -207,9 +205,9 @@ class CachingPulsarProducerFactoryTests extends PulsarProducerFactoryTests {
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private Cache<ProducerCacheKey<String>, Producer<String>> getAssertedProducerCache( private CacheProvider<ProducerCacheKey<String>, Producer<String>> getAssertedProducerCache(
PulsarProducerFactory<String> producerFactory, List<ProducerCacheKey<String>> expectedCacheKeys) { PulsarProducerFactory<String> producerFactory, List<ProducerCacheKey<String>> expectedCacheKeys) {
Cache<ProducerCacheKey<String>, Producer<String>> producerCache = (Cache<ProducerCacheKey<String>, Producer<String>>) ReflectionTestUtils CacheProvider<ProducerCacheKey<String>, Producer<String>> producerCache = (CacheProvider<ProducerCacheKey<String>, Producer<String>>) ReflectionTestUtils
.getField(producerFactory, "producerCache"); .getField(producerFactory, "producerCache");
assertThat(producerCache).isNotNull(); assertThat(producerCache).isNotNull();
if (ObjectUtils.isEmpty(expectedCacheKeys)) { if (ObjectUtils.isEmpty(expectedCacheKeys)) {