From 18eac190125bf96d331604e8e109184d1ce98bbc Mon Sep 17 00:00:00 2001 From: Chris Bono Date: Mon, 27 Mar 2023 19:13:05 -0500 Subject: [PATCH] Add cache provider / shade caffeine (#386) --- gradle/publish-artifactory.gradle | 61 ---------------- settings.gradle | 2 + .../build.gradle | 59 +++++++++++++++ .../pulsar/core/CaffeineCacheProvider.java | 58 +++++++++++++++ .../core/CaffeineCacheProviderFactory.java | 46 ++++++++++++ ...framework.pulsar.core.CacheProviderFactory | 1 + spring-pulsar-cache-provider/build.gradle | 15 ++++ .../pulsar/core/CacheProvider.java | 59 +++++++++++++++ .../pulsar/core/CacheProviderFactory.java | 73 +++++++++++++++++++ .../core/CacheProviderFactoryTests.java | 47 ++++++++++++ ...framework.pulsar.core.CacheProviderFactory | 1 + .../build.gradle | 1 - .../PulsarAutoConfiguration.java | 2 - .../PulsarAutoConfigurationTests.java | 12 +-- .../build.gradle | 1 - spring-pulsar/build.gradle | 3 +- .../core/CachingPulsarProducerFactory.java | 30 +++----- .../CachingPulsarProducerFactoryTests.java | 10 +-- 18 files changed, 380 insertions(+), 101 deletions(-) delete mode 100644 gradle/publish-artifactory.gradle create mode 100644 spring-pulsar-cache-provider-caffeine/build.gradle create mode 100644 spring-pulsar-cache-provider-caffeine/src/main/java/org/springframework/pulsar/core/CaffeineCacheProvider.java create mode 100644 spring-pulsar-cache-provider-caffeine/src/main/java/org/springframework/pulsar/core/CaffeineCacheProviderFactory.java create mode 100644 spring-pulsar-cache-provider-caffeine/src/main/resources/META-INF/services/org.springframework.pulsar.core.CacheProviderFactory create mode 100644 spring-pulsar-cache-provider/build.gradle create mode 100644 spring-pulsar-cache-provider/src/main/java/org/springframework/pulsar/core/CacheProvider.java create mode 100644 spring-pulsar-cache-provider/src/main/java/org/springframework/pulsar/core/CacheProviderFactory.java create mode 100644 spring-pulsar-cache-provider/src/test/java/org/springframework/pulsar/core/CacheProviderFactoryTests.java create mode 100644 spring-pulsar-cache-provider/src/test/resources/META-INF/services/org.springframework.pulsar.core.CacheProviderFactory diff --git a/gradle/publish-artifactory.gradle b/gradle/publish-artifactory.gradle deleted file mode 100644 index a2bbdf08..00000000 --- a/gradle/publish-artifactory.gradle +++ /dev/null @@ -1,61 +0,0 @@ -apply plugin: 'maven-publish' -apply plugin: 'com.jfrog.artifactory' - -publishing { - publications { - mavenJava(MavenPublication) { - pom { - afterEvaluate { - name = project.description - description = project.description - } - url = linkScmUrl -// organization { -// name = 'Spring IO' -// url = 'https://spring.io/projects/spring-pulsar' -// } - licenses { - license { - name = 'Apache License, Version 2.0' - url = 'https://www.apache.org/licenses/LICENSE-2.0.txt' - distribution = 'repo' - } - } - scm { - url = linkScmUrl - connection = linkScmConnection - developerConnection = linkScmDevConnection - } - developers { - developer { - id = "schacko" - name = "Soby Chacko" - email = "chackos@vmware.com" - } - developer { - id = "onobc" - name = "Chris Bono" - email = "cbono@vmware.com" - } - } - issueManagement { - system = 'GitHub' - url = linkIssue - } - } - versionMapping { - usage('java-api') { - fromResolutionResult() - } - usage('java-runtime') { - fromResolutionResult() - } - } - } - } -} - -artifactoryPublish { - dependsOn build - publications(publishing.publications.mavenJava) -} diff --git a/settings.gradle b/settings.gradle index 1ce3b914..a660bfc1 100644 --- a/settings.gradle +++ b/settings.gradle @@ -24,6 +24,8 @@ settings.gradle.projectsLoaded { rootProject.name = 'spring-pulsar-dist' include 'spring-pulsar' +include 'spring-pulsar-cache-provider' +include 'spring-pulsar-cache-provider-caffeine' include 'spring-pulsar-reactive' include 'spring-pulsar-dependencies' include 'spring-pulsar-spring-boot-autoconfigure' diff --git a/spring-pulsar-cache-provider-caffeine/build.gradle b/spring-pulsar-cache-provider-caffeine/build.gradle new file mode 100644 index 00000000..a519bf97 --- /dev/null +++ b/spring-pulsar-cache-provider-caffeine/build.gradle @@ -0,0 +1,59 @@ +plugins { + id 'org.springframework.pulsar.spring-module' + id 'com.github.johnrengelman.shadow' version '7.1.2' +} + +description = 'Spring Pulsar Caffeine Cache Provider' + +dependencies { + api project (':spring-pulsar-cache-provider') + implementation 'com.github.ben-manes.caffeine:caffeine' + shadow project(':spring-pulsar-cache-provider') +} + +jar { + archiveClassifier.set('original') +} + +shadowJar { + archiveClassifier.set(null) + dependsOn(project.tasks.jar) + manifest { + inheritFrom project.tasks.jar.manifest + } + relocate 'com.github.benmanes.caffeine', 'org.springframework.pulsar.shade.com.github.benmanes.caffeine' + relocate 'com.google', 'org.springframework.pulsar.shade.com.google' + relocate 'org.checkerframework', 'org.springframework.pulsar.shade.org.checkerframework' + dependencies { + exclude(dependency { + !['com.github.ben-manes.caffeine', 'org.checkerframework', 'com.google.errorprone'].contains(it.moduleGroup) + }) + } +} + +tasks.build.dependsOn tasks.shadowJar + +// delay the maven publishing - instead add shadowJar to the publication +components.java.withVariantsFromConfiguration(configurations.shadowRuntimeElements) { + skip() +} + +publishing { + publications { + mavenJava { + artifact(shadowJar) + pom.withXml { + Node pomNode = asNode() + pomNode.dependencies.'*'.findAll() { + it.artifactId.text() == 'caffeine' + }.each() { + it.parent().remove(it) + } + } + } + } +} + +test { + testLogging.showStandardStreams = true +} diff --git a/spring-pulsar-cache-provider-caffeine/src/main/java/org/springframework/pulsar/core/CaffeineCacheProvider.java b/spring-pulsar-cache-provider-caffeine/src/main/java/org/springframework/pulsar/core/CaffeineCacheProvider.java new file mode 100644 index 00000000..f706ec3c --- /dev/null +++ b/spring-pulsar-cache-provider-caffeine/src/main/java/org/springframework/pulsar/core/CaffeineCacheProvider.java @@ -0,0 +1,58 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.core; + +import java.util.Map; +import java.util.function.BiConsumer; +import java.util.function.Function; + +import com.github.benmanes.caffeine.cache.Cache; + +/** + * Cache provider implementation backed by a {@code Caffeine} cache. + * + * @param the type of cache key + * @param the type of cache entries + * @author Chris Bono + */ +public class CaffeineCacheProvider implements CacheProvider { + + private final Cache cache; + + public CaffeineCacheProvider(Cache cache) { + this.cache = cache; + } + + @Override + public V getOrCreateIfAbsent(K cacheKey, Function createEntryFunction) { + return this.cache.get(cacheKey, createEntryFunction); + } + + @Override + public Map asMap() { + return this.cache.asMap(); + } + + @Override + public void invalidateAll(BiConsumer onInvalidateEntry) { + this.cache.asMap().forEach((cacheKey, cacheEntry) -> { + this.cache.invalidate(cacheKey); + onInvalidateEntry.accept(cacheKey, cacheEntry); + }); + } + +} diff --git a/spring-pulsar-cache-provider-caffeine/src/main/java/org/springframework/pulsar/core/CaffeineCacheProviderFactory.java b/spring-pulsar-cache-provider-caffeine/src/main/java/org/springframework/pulsar/core/CaffeineCacheProviderFactory.java new file mode 100644 index 00000000..13d08e85 --- /dev/null +++ b/spring-pulsar-cache-provider-caffeine/src/main/java/org/springframework/pulsar/core/CaffeineCacheProviderFactory.java @@ -0,0 +1,46 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.core; + +import java.time.Duration; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalListener; +import com.github.benmanes.caffeine.cache.Scheduler; + +/** + * Factory to create instances of {@link CaffeineCacheProvider}. + * + * @param the type of cache key + * @param the type of cache entries + * @author Chris Bono + */ +public class CaffeineCacheProviderFactory implements CacheProviderFactory { + + @Override + public CacheProvider create(Duration cacheExpireAfterAccess, Long cacheMaximumSize, + Integer cacheInitialCapacity, EvictionListener evictionListener) { + Cache cache = Caffeine.newBuilder().expireAfterAccess(cacheExpireAfterAccess) + .maximumSize(cacheMaximumSize).initialCapacity(cacheInitialCapacity) + .scheduler(Scheduler.systemScheduler()).evictionListener((RemovalListener) (key, value, + cause) -> evictionListener.onEviction(key, value, cause.toString())) + .build(); + return new CaffeineCacheProvider<>(cache); + } + +} diff --git a/spring-pulsar-cache-provider-caffeine/src/main/resources/META-INF/services/org.springframework.pulsar.core.CacheProviderFactory b/spring-pulsar-cache-provider-caffeine/src/main/resources/META-INF/services/org.springframework.pulsar.core.CacheProviderFactory new file mode 100644 index 00000000..cf01003c --- /dev/null +++ b/spring-pulsar-cache-provider-caffeine/src/main/resources/META-INF/services/org.springframework.pulsar.core.CacheProviderFactory @@ -0,0 +1 @@ +org.springframework.pulsar.core.CaffeineCacheProviderFactory diff --git a/spring-pulsar-cache-provider/build.gradle b/spring-pulsar-cache-provider/build.gradle new file mode 100644 index 00000000..15f42e2b --- /dev/null +++ b/spring-pulsar-cache-provider/build.gradle @@ -0,0 +1,15 @@ +plugins { + id 'org.springframework.pulsar.spring-module' +} + +description = 'Spring Pulsar Cache Provider API' + +dependencies { + testImplementation 'org.assertj:assertj-core' + testImplementation 'org.junit.jupiter:junit-jupiter' + testRuntimeOnly 'org.junit.platform:junit-platform-launcher' +} + +test { + testLogging.showStandardStreams = true +} diff --git a/spring-pulsar-cache-provider/src/main/java/org/springframework/pulsar/core/CacheProvider.java b/spring-pulsar-cache-provider/src/main/java/org/springframework/pulsar/core/CacheProvider.java new file mode 100644 index 00000000..9b07c68d --- /dev/null +++ b/spring-pulsar-cache-provider/src/main/java/org/springframework/pulsar/core/CacheProvider.java @@ -0,0 +1,59 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.core; + +import java.util.Map; +import java.util.function.BiConsumer; +import java.util.function.Function; + +/** + * Defines the contract for a cache provider. + * + * @param the type of cache key + * @param the type of cache entries + * @author Chris Bono + */ +public interface CacheProvider { + + /** + * Returns the value associated with the {@code key} in the cache. + *

+ * If the key is not already associated with an entry in the cache, the + * {@code createEntryFunction} is used to compute the value to cache and return. + * @param key the cache key + * @param createEntryFunction the function to compute a value + * @return the current (existing or computed) value associated with the specified key, + * or null if the computed value is null + */ + V getOrCreateIfAbsent(K key, Function createEntryFunction); + + /** + * Returns a view of the entries stored in the cache as a thread-safe map. + * Modifications made to the map directly affect the cache. + * @return a thread-safe view of the cache supporting all optional {@link Map} + * operations + */ + Map asMap(); + + /** + * Discards all entries in the cache and calls the {@code onInvalidateEntry} callback + * (if provided) for each entry. + * @param onInvalidateEntry callback invoked for each invalidated entry + */ + void invalidateAll(BiConsumer onInvalidateEntry); + +} diff --git a/spring-pulsar-cache-provider/src/main/java/org/springframework/pulsar/core/CacheProviderFactory.java b/spring-pulsar-cache-provider/src/main/java/org/springframework/pulsar/core/CacheProviderFactory.java new file mode 100644 index 00000000..6003351a --- /dev/null +++ b/spring-pulsar-cache-provider/src/main/java/org/springframework/pulsar/core/CacheProviderFactory.java @@ -0,0 +1,73 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.core; + +import java.time.Duration; +import java.util.ServiceLoader; + +/** + * Interface to create instances of {@link CacheProvider}. + * + * @param the type of cache key + * @param the type of cache entries + * @author Chris Bono + */ +public interface CacheProviderFactory { + + /** + * Create a cache provider instance with the specified options. + * @param cacheExpireAfterAccess time period to expire unused entries in the cache + * @param cacheMaximumSize maximum size of cache (entries) + * @param cacheInitialCapacity the initial size of cache + * @param evictionListener listener called when an entry is evicted from the cache + * @return cache provider instance + */ + CacheProvider create(Duration cacheExpireAfterAccess, Long cacheMaximumSize, Integer cacheInitialCapacity, + EvictionListener evictionListener); + + /** + * Uses the Java ServiceLoader API to find the first available cache provider factory. + * @param the type of cache key + * @param the type of cache entries + * @return the cache provider factory service + * @throws IllegalStateException if no factory was found + */ + @SuppressWarnings("unchecked") + static CacheProviderFactory load() { + return ServiceLoader.load(CacheProviderFactory.class).findFirst() + .orElseThrow(() -> new IllegalStateException("No ProducerCacheFactory available")); + } + + /** + * Interface for a cache eviction listener. + * + * @param the type of cache key + * @param the type of cache entries + */ + interface EvictionListener { + + /** + * Called when an entry is evicted from the cache. + * @param key the cache key + * @param value the cached value + * @param reason the reason for the eviction + */ + void onEviction(K key, V value, String reason); + + } + +} diff --git a/spring-pulsar-cache-provider/src/test/java/org/springframework/pulsar/core/CacheProviderFactoryTests.java b/spring-pulsar-cache-provider/src/test/java/org/springframework/pulsar/core/CacheProviderFactoryTests.java new file mode 100644 index 00000000..b67f0983 --- /dev/null +++ b/spring-pulsar-cache-provider/src/test/java/org/springframework/pulsar/core/CacheProviderFactoryTests.java @@ -0,0 +1,47 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.core; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.time.Duration; + +import org.junit.jupiter.api.Test; + +/** + * Tests for {@link CacheProviderFactory}. + * + * @author Chris Bono + */ +public class CacheProviderFactoryTests { + + @Test + void loadFactory() { + assertThat(CacheProviderFactory.load()).isInstanceOf(TestCacheProviderFactory.class); + } + + public static class TestCacheProviderFactory implements CacheProviderFactory { + + @Override + public CacheProvider create(Duration cacheExpireAfterAccess, Long cacheMaximumSize, + Integer cacheInitialCapacity, EvictionListener evictionListener) { + return null; + } + + } + +} diff --git a/spring-pulsar-cache-provider/src/test/resources/META-INF/services/org.springframework.pulsar.core.CacheProviderFactory b/spring-pulsar-cache-provider/src/test/resources/META-INF/services/org.springframework.pulsar.core.CacheProviderFactory new file mode 100644 index 00000000..74fa5bd4 --- /dev/null +++ b/spring-pulsar-cache-provider/src/test/resources/META-INF/services/org.springframework.pulsar.core.CacheProviderFactory @@ -0,0 +1 @@ +org.springframework.pulsar.core.CacheProviderFactoryTests$TestCacheProviderFactory diff --git a/spring-pulsar-spring-boot-autoconfigure/build.gradle b/spring-pulsar-spring-boot-autoconfigure/build.gradle index 02a1fd99..2ed2f534 100644 --- a/spring-pulsar-spring-boot-autoconfigure/build.gradle +++ b/spring-pulsar-spring-boot-autoconfigure/build.gradle @@ -12,7 +12,6 @@ dependencies { optional project (':spring-pulsar') optional project (':spring-pulsar-reactive') - optional 'com.github.ben-manes.caffeine:caffeine' optional 'org.apache.pulsar:pulsar-client-reactive-producer-cache-caffeine' implementation 'org.springframework.boot:spring-boot-starter' implementation 'com.google.code.findbugs:jsr305' diff --git a/spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarAutoConfiguration.java b/spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarAutoConfiguration.java index eddc2a20..2bf071ba 100644 --- a/spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarAutoConfiguration.java +++ b/spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarAutoConfiguration.java @@ -51,7 +51,6 @@ import org.springframework.pulsar.function.PulsarSink; import org.springframework.pulsar.function.PulsarSource; import org.springframework.pulsar.observation.PulsarTemplateObservationConvention; -import com.github.benmanes.caffeine.cache.Caffeine; import io.micrometer.observation.ObservationRegistry; /** @@ -89,7 +88,6 @@ public class PulsarAutoConfiguration { @Bean @ConditionalOnMissingBean - @ConditionalOnClass(Caffeine.class) @ConditionalOnProperty(name = "spring.pulsar.producer.cache.enabled", havingValue = "true", matchIfMissing = true) public PulsarProducerFactory cachingPulsarProducerFactory(PulsarClient pulsarClient, TopicResolver topicResolver) { diff --git a/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarAutoConfigurationTests.java b/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarAutoConfigurationTests.java index 55fc61b2..ba5bdaa0 100644 --- a/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarAutoConfigurationTests.java +++ b/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarAutoConfigurationTests.java @@ -35,7 +35,6 @@ import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.NoSuchBeanDefinitionException; import org.springframework.boot.autoconfigure.AutoConfigurations; import org.springframework.boot.test.context.FilteredClassLoader; import org.springframework.boot.test.context.assertj.AssertableApplicationContext; @@ -501,14 +500,10 @@ class PulsarAutoConfigurationTests { } @Test - void cachingEnabledButCaffeineNotOnClasspath() { + void cachingEnabledAndCaffeineNotOnClasspath() { contextRunner.withClassLoader(new FilteredClassLoader(Caffeine.class)) .withPropertyValues("spring.pulsar.producer.cache.enabled=true") - .run((context -> assertThat(context).hasFailed().getFailure().cause() - .isInstanceOf(NoSuchBeanDefinitionException.class) - .asInstanceOf(InstanceOfAssertFactories.type(NoSuchBeanDefinitionException.class)) - .extracting(NoSuchBeanDefinitionException::getBeanType) - .isEqualTo(PulsarProducerFactory.class))); + .run((context -> assertHasProducerFactoryOfType(CachingPulsarProducerFactory.class, context))); } @Test @@ -518,8 +513,7 @@ class PulsarAutoConfigurationTests { "spring.pulsar.producer.cache.maximum-size=5150", "spring.pulsar.producer.cache.initial-capacity=200") .run((context -> assertThat(context).hasNotFailed().getBean(PulsarProducerFactory.class) - .extracting("producerCache").extracting("cache") - .hasFieldOrPropertyWithValue("maximum", 5150L) + .extracting("producerCache.cache.cache").hasFieldOrPropertyWithValue("maximum", 5150L) .hasFieldOrPropertyWithValue("expiresAfterAccessNanos", TimeUnit.SECONDS.toNanos(100)))); } diff --git a/spring-pulsar-spring-boot-starter/build.gradle b/spring-pulsar-spring-boot-starter/build.gradle index 50b051c6..422c14de 100644 --- a/spring-pulsar-spring-boot-starter/build.gradle +++ b/spring-pulsar-spring-boot-starter/build.gradle @@ -7,6 +7,5 @@ description = 'Spring Pulsar Spring Boot Starter' dependencies { api project (':spring-pulsar') api project (':spring-pulsar-spring-boot-autoconfigure') - api 'com.github.ben-manes.caffeine:caffeine' api 'org.springframework.boot:spring-boot-starter' } diff --git a/spring-pulsar/build.gradle b/spring-pulsar/build.gradle index 1cc1a975..2097dbb9 100644 --- a/spring-pulsar/build.gradle +++ b/spring-pulsar/build.gradle @@ -17,13 +17,14 @@ dependencies { api ('org.springframework.retry:spring-retry') { exclude group: 'org.springframework' } + api project(':spring-pulsar-cache-provider') + implementation project(path: ':spring-pulsar-cache-provider-caffeine', configuration: 'shadow') implementation 'com.fasterxml.jackson.core:jackson-core' implementation 'com.fasterxml.jackson.core:jackson-databind' implementation 'com.google.code.findbugs:jsr305' optional 'com.fasterxml.jackson.datatype:jackson-datatype-jdk8' optional 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310' optional 'com.fasterxml.jackson.datatype:jackson-datatype-joda' - optional 'com.github.ben-manes.caffeine:caffeine' optional 'com.google.protobuf:protobuf-java' optional 'com.jayway.jsonpath:json-path' diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/CachingPulsarProducerFactory.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/CachingPulsarProducerFactory.java index 3af80101..9131eb5e 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/core/CachingPulsarProducerFactory.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/CachingPulsarProducerFactory.java @@ -41,11 +41,6 @@ import org.springframework.core.log.LogAccessor; import org.springframework.lang.Nullable; import org.springframework.util.Assert; -import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; -import com.github.benmanes.caffeine.cache.RemovalListener; -import com.github.benmanes.caffeine.cache.Scheduler; - /** * A {@link PulsarProducerFactory} that extends the {@link DefaultPulsarProducerFactory * default implementation} by caching the created producers. @@ -66,7 +61,7 @@ public class CachingPulsarProducerFactory extends DefaultPulsarProducerFactor private final LogAccessor logger = new LogAccessor(this.getClass()); - private final Cache, Producer> producerCache; + private final CacheProvider, Producer> producerCache; /** * Construct a caching producer factory with the specified values for the cache @@ -82,15 +77,13 @@ public class CachingPulsarProducerFactory extends DefaultPulsarProducerFactor TopicResolver topicResolver, Duration cacheExpireAfterAccess, Long cacheMaximumSize, Integer cacheInitialCapacity) { super(pulsarClient, producerConfig, topicResolver); - this.producerCache = Caffeine.newBuilder().expireAfterAccess(cacheExpireAfterAccess) - .maximumSize(cacheMaximumSize).initialCapacity(cacheInitialCapacity) - .scheduler(Scheduler.systemScheduler()).evictionListener( - (RemovalListener, Producer>) (producerCacheKey, producer, cause) -> { - this.logger.debug(() -> "Producer %s evicted from cache due to %s" - .formatted(ProducerUtils.formatProducer(producer), cause)); - closeProducer(producer); - }) - .build(); + var cacheFactory = CacheProviderFactory., Producer>load(); + this.producerCache = cacheFactory.create(cacheExpireAfterAccess, cacheMaximumSize, cacheInitialCapacity, + (key, producer, cause) -> { + this.logger.debug(() -> "Producer %s evicted from cache due to %s" + .formatted(ProducerUtils.formatProducer(producer), cause)); + closeProducer(producer); + }); } @Override @@ -100,7 +93,7 @@ public class CachingPulsarProducerFactory extends DefaultPulsarProducerFactor String resolveTopicName = resolveTopicName(topic); ProducerCacheKey producerCacheKey = new ProducerCacheKey<>(schema, resolveTopicName, encryptionKeys == null ? null : new HashSet<>(encryptionKeys), customizers); - return this.producerCache.get(producerCacheKey, + return this.producerCache.getOrCreateIfAbsent(producerCacheKey, (st) -> createCacheableProducer(st.schema, st.topic, st.encryptionKeys, customizers)); } @@ -119,10 +112,7 @@ public class CachingPulsarProducerFactory extends DefaultPulsarProducerFactor @Override public void destroy() { - this.producerCache.asMap().forEach((producerCacheKey, producer) -> { - this.producerCache.invalidate(producerCacheKey); - closeProducer(producer); - }); + this.producerCache.invalidateAll((key, producer) -> closeProducer(producer)); } private void closeProducer(Producer producer) { diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/core/CachingPulsarProducerFactoryTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/core/CachingPulsarProducerFactoryTests.java index b34b63d0..5c443c93 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/core/CachingPulsarProducerFactoryTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/core/CachingPulsarProducerFactoryTests.java @@ -51,8 +51,6 @@ import org.springframework.pulsar.core.CachingPulsarProducerFactory.ProducerWith import org.springframework.test.util.ReflectionTestUtils; import org.springframework.util.ObjectUtils; -import com.github.benmanes.caffeine.cache.Cache; - /** * Tests for {@link CachingPulsarProducerFactory}. * @@ -83,8 +81,8 @@ class CachingPulsarProducerFactoryTests extends PulsarProducerFactoryTests { var producer3 = producerFactory.createProducer(new StringSchema(), "topic1"); assertThat(producer1).isSameAs(producer2).isSameAs(producer3); - Cache, Producer> producerCache = getAssertedProducerCache(producerFactory, - Collections.singletonList(cacheKey)); + CacheProvider, Producer> producerCache = getAssertedProducerCache( + producerFactory, Collections.singletonList(cacheKey)); Producer cachedProducerWrapper = producerCache.asMap().get(cacheKey); assertThat(cachedProducerWrapper).isSameAs(producer1); } @@ -207,9 +205,9 @@ class CachingPulsarProducerFactoryTests extends PulsarProducerFactoryTests { } @SuppressWarnings("unchecked") - private Cache, Producer> getAssertedProducerCache( + private CacheProvider, Producer> getAssertedProducerCache( PulsarProducerFactory producerFactory, List> expectedCacheKeys) { - Cache, Producer> producerCache = (Cache, Producer>) ReflectionTestUtils + CacheProvider, Producer> producerCache = (CacheProvider, Producer>) ReflectionTestUtils .getField(producerFactory, "producerCache"); assertThat(producerCache).isNotNull(); if (ObjectUtils.isEmpty(expectedCacheKeys)) {