From 0576b6c6ce736415cb3aa3bd85aaceb5b1320e2d Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Mon, 30 May 2022 12:10:58 +0200 Subject: [PATCH] GH-878 Fix concurrency issue during registration and lookup of functions Resolves #878 --- .../BeanFactoryAwareFunctionRegistry.java | 78 ++++++++++--------- .../catalog/SimpleFunctionRegistry.java | 2 +- ...BeanFactoryAwareFunctionRegistryTests.java | 22 ++++++ .../catalog/SimpleFunctionRegistryTests.java | 26 +++++++ 4 files changed, 89 insertions(+), 39 deletions(-) diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java index 83f1867dc..e8f7c8cfa 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java @@ -112,50 +112,52 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp return null; } FunctionInvocationWrapper function = this.doLookup(type, functionDefinition, expectedOutputMimeTypes); - - if (function == null) { - Set functionRegistratioinNames = super.getNames(null); - String[] functionNames = StringUtils.delimitedListToStringArray(functionDefinition.replaceAll(",", "|").trim(), "|"); - for (String functionName : functionNames) { - if (functionRegistratioinNames.contains(functionName) && logger.isDebugEnabled()) { - logger.debug("Skipping function '" + functionName + "' since it is already present"); - } - else { - Object functionCandidate = this.discoverFunctionInBeanFactory(functionName); - if (functionCandidate != null) { - Type functionType = null; - FunctionRegistration functionRegistration = null; - if (functionCandidate instanceof FunctionRegistration) { - functionRegistration = (FunctionRegistration) functionCandidate; - } - else if (this.isFunctionPojo(functionCandidate, functionName)) { - Method functionalMethod = FunctionTypeUtils.discoverFunctionalMethod(functionCandidate.getClass()); - functionCandidate = this.proxyTarget(functionCandidate, functionalMethod); - functionType = FunctionTypeUtils.fromFunctionMethod(functionalMethod); - } - else if (this.isSpecialFunctionRegistration(functionNames, functionName)) { - functionRegistration = this.applicationContext - .getBean(functionName + FunctionRegistration.REGISTRATION_NAME_SUFFIX, FunctionRegistration.class); - } - else { - functionType = FunctionTypeUtils.discoverFunctionType(functionCandidate, functionName, this.applicationContext); - } - if (functionRegistration == null) { - functionRegistration = new FunctionRegistration(functionCandidate, functionName).type(functionType); - } - // Certain Kafka Streams functions such as KStream[] return types could be null (esp when using Kotlin). - if (functionRegistration != null) { - this.register(functionRegistration); - } + Object syncInstance = functionDefinition == null ? this : functionDefinition; + synchronized (syncInstance) { + if (function == null) { + Set functionRegistratioinNames = super.getNames(null); + String[] functionNames = StringUtils.delimitedListToStringArray(functionDefinition.replaceAll(",", "|").trim(), "|"); + for (String functionName : functionNames) { + if (functionRegistratioinNames.contains(functionName) && logger.isDebugEnabled()) { + logger.debug("Skipping function '" + functionName + "' since it is already present"); } else { - if (logger.isDebugEnabled()) { - logger.debug("Function '" + functionName + "' is not available in FunctionCatalog or BeanFactory"); + Object functionCandidate = this.discoverFunctionInBeanFactory(functionName); + if (functionCandidate != null) { + Type functionType = null; + FunctionRegistration functionRegistration = null; + if (functionCandidate instanceof FunctionRegistration) { + functionRegistration = (FunctionRegistration) functionCandidate; + } + else if (this.isFunctionPojo(functionCandidate, functionName)) { + Method functionalMethod = FunctionTypeUtils.discoverFunctionalMethod(functionCandidate.getClass()); + functionCandidate = this.proxyTarget(functionCandidate, functionalMethod); + functionType = FunctionTypeUtils.fromFunctionMethod(functionalMethod); + } + else if (this.isSpecialFunctionRegistration(functionNames, functionName)) { + functionRegistration = this.applicationContext + .getBean(functionName + FunctionRegistration.REGISTRATION_NAME_SUFFIX, FunctionRegistration.class); + } + else { + functionType = FunctionTypeUtils.discoverFunctionType(functionCandidate, functionName, this.applicationContext); + } + if (functionRegistration == null) { + functionRegistration = new FunctionRegistration(functionCandidate, functionName).type(functionType); + } + // Certain Kafka Streams functions such as KStream[] return types could be null (esp when using Kotlin). + if (functionRegistration != null) { + this.register(functionRegistration); + } + } + else { + if (logger.isDebugEnabled()) { + logger.debug("Function '" + functionName + "' is not available in FunctionCatalog or BeanFactory"); + } } } } + function = super.doLookup(type, functionDefinition, expectedOutputMimeTypes); } - function = super.doLookup(type, functionDefinition, expectedOutputMimeTypes); } return (T) function; diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java index 1091a471b..bfabe304f 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java @@ -286,7 +286,7 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect /* * */ - private synchronized FunctionInvocationWrapper compose(Class type, String functionDefinition) { + private FunctionInvocationWrapper compose(Class type, String functionDefinition) { String[] functionNames = StringUtils.delimitedListToStringArray(functionDefinition.replaceAll(",", "|").trim(), "|"); FunctionInvocationWrapper composedFunction = null; diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistryTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistryTests.java index 80fa546bf..c5c7344d9 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistryTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistryTests.java @@ -22,6 +22,7 @@ import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.HashMap; @@ -103,6 +104,27 @@ public class BeanFactoryAwareFunctionRegistryTests { System.clearProperty("spring.cloud.function.definition"); } + + @SuppressWarnings({ "rawtypes" }) + @Test + public void concurrencyLookupTest() throws Exception { + FunctionCatalog catalog = this.configureCatalog(); + ExecutorService executor = Executors.newCachedThreadPool(); + for (int i = 0; i < 100; i++) { + executor.execute(() -> { + catalog.lookup("uppercase", "application/json"); + }); + executor.execute(() -> { + catalog.lookup("numberword", "application/json"); + }); + } + Thread.sleep(1000); + Field frField = ReflectionUtils.findField(catalog.getClass(), "functionRegistrations"); + frField.setAccessible(true); + Collection c = (Collection) frField.get(catalog); + assertThat(c.size()).isEqualTo(2); + } + @SuppressWarnings("unchecked") @Test public void testDefaultLookup() throws Exception { diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistryTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistryTests.java index 175de78e5..2e2f9fb0d 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistryTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistryTests.java @@ -16,13 +16,17 @@ package org.springframework.cloud.function.context.catalog; +import java.lang.reflect.Field; import java.lang.reflect.Type; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.Function; @@ -68,6 +72,7 @@ import org.springframework.messaging.converter.MessageConverter; import org.springframework.messaging.converter.StringMessageConverter; import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.MimeType; +import org.springframework.util.ReflectionUtils; import static org.assertj.core.api.Assertions.assertThat; @@ -93,6 +98,27 @@ public class SimpleFunctionRegistryTests { this.conversionService = new DefaultConversionService(); } + @SuppressWarnings("rawtypes") + @Test + public void concurrencyRegistrationTest() throws Exception { + Echo function = new Echo(); + FunctionRegistration registration = new FunctionRegistration<>( + function, "echo").type(FunctionType.of(Echo.class)); + SimpleFunctionRegistry catalog = new SimpleFunctionRegistry(this.conversionService, this.messageConverter, + new JacksonMapper(new ObjectMapper())); + ExecutorService executor = Executors.newCachedThreadPool(); + for (int i = 0; i < 1000; i++) { + executor.execute(() -> { + catalog.register(registration); + }); + } + Thread.sleep(1000); + Field frField = ReflectionUtils.findField(catalog.getClass(), "functionRegistrations"); + frField.setAccessible(true); + Collection c = (Collection) frField.get(catalog); + assertThat(c.size()).isEqualTo(1); + } + @Test public void testCachingOfFunction() { Echo function = new Echo();