GH-878 Fix concurrency issue during registration and lookup of functions
Resolves #878
This commit is contained in:
@@ -112,50 +112,52 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp
|
||||
return null;
|
||||
}
|
||||
FunctionInvocationWrapper function = this.doLookup(type, functionDefinition, expectedOutputMimeTypes);
|
||||
|
||||
if (function == null) {
|
||||
Set<String> functionRegistratioinNames = super.getNames(null);
|
||||
String[] functionNames = StringUtils.delimitedListToStringArray(functionDefinition.replaceAll(",", "|").trim(), "|");
|
||||
for (String functionName : functionNames) {
|
||||
if (functionRegistratioinNames.contains(functionName) && logger.isDebugEnabled()) {
|
||||
logger.debug("Skipping function '" + functionName + "' since it is already present");
|
||||
}
|
||||
else {
|
||||
Object functionCandidate = this.discoverFunctionInBeanFactory(functionName);
|
||||
if (functionCandidate != null) {
|
||||
Type functionType = null;
|
||||
FunctionRegistration functionRegistration = null;
|
||||
if (functionCandidate instanceof FunctionRegistration) {
|
||||
functionRegistration = (FunctionRegistration) functionCandidate;
|
||||
}
|
||||
else if (this.isFunctionPojo(functionCandidate, functionName)) {
|
||||
Method functionalMethod = FunctionTypeUtils.discoverFunctionalMethod(functionCandidate.getClass());
|
||||
functionCandidate = this.proxyTarget(functionCandidate, functionalMethod);
|
||||
functionType = FunctionTypeUtils.fromFunctionMethod(functionalMethod);
|
||||
}
|
||||
else if (this.isSpecialFunctionRegistration(functionNames, functionName)) {
|
||||
functionRegistration = this.applicationContext
|
||||
.getBean(functionName + FunctionRegistration.REGISTRATION_NAME_SUFFIX, FunctionRegistration.class);
|
||||
}
|
||||
else {
|
||||
functionType = FunctionTypeUtils.discoverFunctionType(functionCandidate, functionName, this.applicationContext);
|
||||
}
|
||||
if (functionRegistration == null) {
|
||||
functionRegistration = new FunctionRegistration(functionCandidate, functionName).type(functionType);
|
||||
}
|
||||
// Certain Kafka Streams functions such as KStream[] return types could be null (esp when using Kotlin).
|
||||
if (functionRegistration != null) {
|
||||
this.register(functionRegistration);
|
||||
}
|
||||
Object syncInstance = functionDefinition == null ? this : functionDefinition;
|
||||
synchronized (syncInstance) {
|
||||
if (function == null) {
|
||||
Set<String> functionRegistratioinNames = super.getNames(null);
|
||||
String[] functionNames = StringUtils.delimitedListToStringArray(functionDefinition.replaceAll(",", "|").trim(), "|");
|
||||
for (String functionName : functionNames) {
|
||||
if (functionRegistratioinNames.contains(functionName) && logger.isDebugEnabled()) {
|
||||
logger.debug("Skipping function '" + functionName + "' since it is already present");
|
||||
}
|
||||
else {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Function '" + functionName + "' is not available in FunctionCatalog or BeanFactory");
|
||||
Object functionCandidate = this.discoverFunctionInBeanFactory(functionName);
|
||||
if (functionCandidate != null) {
|
||||
Type functionType = null;
|
||||
FunctionRegistration functionRegistration = null;
|
||||
if (functionCandidate instanceof FunctionRegistration) {
|
||||
functionRegistration = (FunctionRegistration) functionCandidate;
|
||||
}
|
||||
else if (this.isFunctionPojo(functionCandidate, functionName)) {
|
||||
Method functionalMethod = FunctionTypeUtils.discoverFunctionalMethod(functionCandidate.getClass());
|
||||
functionCandidate = this.proxyTarget(functionCandidate, functionalMethod);
|
||||
functionType = FunctionTypeUtils.fromFunctionMethod(functionalMethod);
|
||||
}
|
||||
else if (this.isSpecialFunctionRegistration(functionNames, functionName)) {
|
||||
functionRegistration = this.applicationContext
|
||||
.getBean(functionName + FunctionRegistration.REGISTRATION_NAME_SUFFIX, FunctionRegistration.class);
|
||||
}
|
||||
else {
|
||||
functionType = FunctionTypeUtils.discoverFunctionType(functionCandidate, functionName, this.applicationContext);
|
||||
}
|
||||
if (functionRegistration == null) {
|
||||
functionRegistration = new FunctionRegistration(functionCandidate, functionName).type(functionType);
|
||||
}
|
||||
// Certain Kafka Streams functions such as KStream[] return types could be null (esp when using Kotlin).
|
||||
if (functionRegistration != null) {
|
||||
this.register(functionRegistration);
|
||||
}
|
||||
}
|
||||
else {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Function '" + functionName + "' is not available in FunctionCatalog or BeanFactory");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
function = super.doLookup(type, functionDefinition, expectedOutputMimeTypes);
|
||||
}
|
||||
function = super.doLookup(type, functionDefinition, expectedOutputMimeTypes);
|
||||
}
|
||||
|
||||
return (T) function;
|
||||
|
||||
@@ -286,7 +286,7 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
||||
/*
|
||||
*
|
||||
*/
|
||||
private synchronized FunctionInvocationWrapper compose(Class<?> type, String functionDefinition) {
|
||||
private FunctionInvocationWrapper compose(Class<?> type, String functionDefinition) {
|
||||
String[] functionNames = StringUtils.delimitedListToStringArray(functionDefinition.replaceAll(",", "|").trim(), "|");
|
||||
FunctionInvocationWrapper composedFunction = null;
|
||||
|
||||
|
||||
@@ -22,6 +22,7 @@ import java.lang.reflect.Field;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
@@ -103,6 +104,27 @@ public class BeanFactoryAwareFunctionRegistryTests {
|
||||
System.clearProperty("spring.cloud.function.definition");
|
||||
}
|
||||
|
||||
|
||||
@SuppressWarnings({ "rawtypes" })
|
||||
@Test
|
||||
public void concurrencyLookupTest() throws Exception {
|
||||
FunctionCatalog catalog = this.configureCatalog();
|
||||
ExecutorService executor = Executors.newCachedThreadPool();
|
||||
for (int i = 0; i < 100; i++) {
|
||||
executor.execute(() -> {
|
||||
catalog.lookup("uppercase", "application/json");
|
||||
});
|
||||
executor.execute(() -> {
|
||||
catalog.lookup("numberword", "application/json");
|
||||
});
|
||||
}
|
||||
Thread.sleep(1000);
|
||||
Field frField = ReflectionUtils.findField(catalog.getClass(), "functionRegistrations");
|
||||
frField.setAccessible(true);
|
||||
Collection c = (Collection) frField.get(catalog);
|
||||
assertThat(c.size()).isEqualTo(2);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
public void testDefaultLookup() throws Exception {
|
||||
|
||||
@@ -16,13 +16,17 @@
|
||||
|
||||
package org.springframework.cloud.function.context.catalog;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import java.lang.reflect.Type;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
@@ -68,6 +72,7 @@ import org.springframework.messaging.converter.MessageConverter;
|
||||
import org.springframework.messaging.converter.StringMessageConverter;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.util.MimeType;
|
||||
import org.springframework.util.ReflectionUtils;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@@ -93,6 +98,27 @@ public class SimpleFunctionRegistryTests {
|
||||
this.conversionService = new DefaultConversionService();
|
||||
}
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
@Test
|
||||
public void concurrencyRegistrationTest() throws Exception {
|
||||
Echo function = new Echo();
|
||||
FunctionRegistration<Echo> registration = new FunctionRegistration<>(
|
||||
function, "echo").type(FunctionType.of(Echo.class));
|
||||
SimpleFunctionRegistry catalog = new SimpleFunctionRegistry(this.conversionService, this.messageConverter,
|
||||
new JacksonMapper(new ObjectMapper()));
|
||||
ExecutorService executor = Executors.newCachedThreadPool();
|
||||
for (int i = 0; i < 1000; i++) {
|
||||
executor.execute(() -> {
|
||||
catalog.register(registration);
|
||||
});
|
||||
}
|
||||
Thread.sleep(1000);
|
||||
Field frField = ReflectionUtils.findField(catalog.getClass(), "functionRegistrations");
|
||||
frField.setAccessible(true);
|
||||
Collection c = (Collection) frField.get(catalog);
|
||||
assertThat(c.size()).isEqualTo(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCachingOfFunction() {
|
||||
Echo function = new Echo();
|
||||
|
||||
Reference in New Issue
Block a user