GH-878 Fix concurrency issue during registration and lookup of functions
Resolves #878
This commit is contained in:
@@ -22,6 +22,7 @@ import java.lang.reflect.Field;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
@@ -103,6 +104,27 @@ public class BeanFactoryAwareFunctionRegistryTests {
|
||||
System.clearProperty("spring.cloud.function.definition");
|
||||
}
|
||||
|
||||
|
||||
@SuppressWarnings({ "rawtypes" })
|
||||
@Test
|
||||
public void concurrencyLookupTest() throws Exception {
|
||||
FunctionCatalog catalog = this.configureCatalog();
|
||||
ExecutorService executor = Executors.newCachedThreadPool();
|
||||
for (int i = 0; i < 100; i++) {
|
||||
executor.execute(() -> {
|
||||
catalog.lookup("uppercase", "application/json");
|
||||
});
|
||||
executor.execute(() -> {
|
||||
catalog.lookup("numberword", "application/json");
|
||||
});
|
||||
}
|
||||
Thread.sleep(1000);
|
||||
Field frField = ReflectionUtils.findField(catalog.getClass(), "functionRegistrations");
|
||||
frField.setAccessible(true);
|
||||
Collection c = (Collection) frField.get(catalog);
|
||||
assertThat(c.size()).isEqualTo(2);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
public void testDefaultLookup() throws Exception {
|
||||
|
||||
@@ -16,13 +16,17 @@
|
||||
|
||||
package org.springframework.cloud.function.context.catalog;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import java.lang.reflect.Type;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
@@ -68,6 +72,7 @@ import org.springframework.messaging.converter.MessageConverter;
|
||||
import org.springframework.messaging.converter.StringMessageConverter;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.util.MimeType;
|
||||
import org.springframework.util.ReflectionUtils;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@@ -93,6 +98,27 @@ public class SimpleFunctionRegistryTests {
|
||||
this.conversionService = new DefaultConversionService();
|
||||
}
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
@Test
|
||||
public void concurrencyRegistrationTest() throws Exception {
|
||||
Echo function = new Echo();
|
||||
FunctionRegistration<Echo> registration = new FunctionRegistration<>(
|
||||
function, "echo").type(FunctionType.of(Echo.class));
|
||||
SimpleFunctionRegistry catalog = new SimpleFunctionRegistry(this.conversionService, this.messageConverter,
|
||||
new JacksonMapper(new ObjectMapper()));
|
||||
ExecutorService executor = Executors.newCachedThreadPool();
|
||||
for (int i = 0; i < 1000; i++) {
|
||||
executor.execute(() -> {
|
||||
catalog.register(registration);
|
||||
});
|
||||
}
|
||||
Thread.sleep(1000);
|
||||
Field frField = ReflectionUtils.findField(catalog.getClass(), "functionRegistrations");
|
||||
frField.setAccessible(true);
|
||||
Collection c = (Collection) frField.get(catalog);
|
||||
assertThat(c.size()).isEqualTo(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCachingOfFunction() {
|
||||
Echo function = new Echo();
|
||||
|
||||
Reference in New Issue
Block a user