Polishing

This commit is contained in:
Marius Bogoevici
2016-11-18 16:44:42 -05:00
parent 453a9b0c10
commit c6e0c74626
16 changed files with 257 additions and 224 deletions

View File

@@ -40,18 +40,22 @@ import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Marius Bogoevici
*/
@SuppressWarnings("unchecked")
public class MessageChannelToInputFluxParameterAdapterTests {
@Test
public void testWrapperFluxSupportsMultipleSubscriptions() throws Exception {
List<String> results = Collections.synchronizedList(new ArrayList<>());
CountDownLatch latch = new CountDownLatch(4);
final MessageChannelToInputFluxParameterAdapter messageChannelToInputFluxParameterAdapter
= new MessageChannelToInputFluxParameterAdapter(new CompositeMessageConverter(Collections.singleton(new MappingJackson2MessageConverter())));
final Method processMethod = ReflectionUtils.findMethod(MessageChannelToInputFluxParameterAdapterTests.class, "process", Flux.class);
final MessageChannelToInputFluxParameterAdapter messageChannelToInputFluxParameterAdapter = new MessageChannelToInputFluxParameterAdapter(
new CompositeMessageConverter(
Collections.singleton(new MappingJackson2MessageConverter())));
final Method processMethod = ReflectionUtils.findMethod(
MessageChannelToInputFluxParameterAdapterTests.class, "process",
Flux.class);
final DirectChannel adaptedChannel = new DirectChannel();
final Flux<Message<?>> adapterFlux = (Flux<Message<?>>) messageChannelToInputFluxParameterAdapter.adapt(adaptedChannel, new MethodParameter(processMethod, 0));
@SuppressWarnings("unchecked")
final Flux<Message<?>> adapterFlux = (Flux<Message<?>>) messageChannelToInputFluxParameterAdapter
.adapt(adaptedChannel, new MethodParameter(processMethod, 0));
String uuid1 = UUID.randomUUID().toString();
String uuid2 = UUID.randomUUID().toString();
adapterFlux.map(m -> m.getPayload() + uuid1).subscribe(s -> {
@@ -67,11 +71,12 @@ public class MessageChannelToInputFluxParameterAdapterTests {
adaptedChannel.send(MessageBuilder.withPayload("B").build());
assertThat(latch.await(5000, TimeUnit.MILLISECONDS)).isTrue();
assertThat(results).containsExactlyInAnyOrder("A" + uuid1, "B" + uuid1, "A" + uuid2, "B" + uuid2);
assertThat(results).containsExactlyInAnyOrder("A" + uuid1, "B" + uuid1,
"A" + uuid2, "B" + uuid2);
}
public void process(Flux<Message<?>> message) {
// do nothing
// do nothing - we just reference this method from the test
}
}