|
|
|
|
@@ -1,5 +1,5 @@
|
|
|
|
|
/*
|
|
|
|
|
* Copyright 2016-2019 the original author or authors.
|
|
|
|
|
* Copyright 2016-2020 the original author or authors.
|
|
|
|
|
*
|
|
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
|
* you may not use this file except in compliance with the License.
|
|
|
|
|
@@ -35,6 +35,7 @@ import org.springframework.beans.DirectFieldAccessor;
|
|
|
|
|
import org.springframework.boot.SpringApplication;
|
|
|
|
|
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
|
|
|
|
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
|
|
|
|
import org.springframework.boot.web.client.RestTemplateBuilder;
|
|
|
|
|
import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;
|
|
|
|
|
import org.springframework.boot.web.servlet.server.ServletWebServerFactory;
|
|
|
|
|
import org.springframework.cache.CacheManager;
|
|
|
|
|
@@ -59,6 +60,7 @@ import org.springframework.context.annotation.Configuration;
|
|
|
|
|
import org.springframework.messaging.Message;
|
|
|
|
|
import org.springframework.messaging.support.MessageBuilder;
|
|
|
|
|
import org.springframework.test.util.ReflectionTestUtils;
|
|
|
|
|
import org.springframework.web.client.RestTemplate;
|
|
|
|
|
|
|
|
|
|
import static org.assertj.core.api.Assertions.assertThat;
|
|
|
|
|
import static org.mockito.ArgumentMatchers.any;
|
|
|
|
|
@@ -79,10 +81,7 @@ public class AvroSchemaRegistryClientMessageConverterTests {
|
|
|
|
|
private String propertyPrefix;
|
|
|
|
|
|
|
|
|
|
private ConfigurableApplicationContext schemaRegistryServerContext;
|
|
|
|
|
|
|
|
|
|
public AvroSchemaRegistryClientMessageConverterTests(String propertyPrefix) {
|
|
|
|
|
this.propertyPrefix = propertyPrefix;
|
|
|
|
|
}
|
|
|
|
|
private RestTemplateBuilder restTemplateBuilder;
|
|
|
|
|
|
|
|
|
|
// Use parametrization to test the deprecated prefix (spring.cloud.stream) is handled as the new (spring.cloud)
|
|
|
|
|
// prefix.
|
|
|
|
|
@@ -91,11 +90,15 @@ public class AvroSchemaRegistryClientMessageConverterTests {
|
|
|
|
|
return Arrays.asList("spring.cloud.stream", "spring.cloud");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public AvroSchemaRegistryClientMessageConverterTests(String propertyPrefix) {
|
|
|
|
|
this.propertyPrefix = propertyPrefix;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Before
|
|
|
|
|
public void setup() {
|
|
|
|
|
this.schemaRegistryServerContext = SpringApplication.run(
|
|
|
|
|
ServerApplication.class,
|
|
|
|
|
"--spring.main.allow-bean-definition-overriding=true");
|
|
|
|
|
ServerApplication.class, "--spring.main.allow-bean-definition-overriding=true");
|
|
|
|
|
this.restTemplateBuilder = this.schemaRegistryServerContext.getBean(RestTemplateBuilder.class);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@After
|
|
|
|
|
@@ -116,8 +119,7 @@ public class AvroSchemaRegistryClientMessageConverterTests {
|
|
|
|
|
firstOutboundFoo.setFavoriteColor("foo" + UUID.randomUUID().toString());
|
|
|
|
|
firstOutboundFoo.setName("foo" + UUID.randomUUID().toString());
|
|
|
|
|
source.output().send(MessageBuilder.withPayload(firstOutboundFoo).build());
|
|
|
|
|
MessageCollector sourceMessageCollector = sourceContext
|
|
|
|
|
.getBean(MessageCollector.class);
|
|
|
|
|
MessageCollector sourceMessageCollector = sourceContext.getBean(MessageCollector.class);
|
|
|
|
|
Message<?> outboundMessage = sourceMessageCollector.forChannel(source.output())
|
|
|
|
|
.poll(1000, TimeUnit.MILLISECONDS);
|
|
|
|
|
|
|
|
|
|
@@ -131,8 +133,7 @@ public class AvroSchemaRegistryClientMessageConverterTests {
|
|
|
|
|
firstOutboundUser2.setFavoriteColor("foo" + UUID.randomUUID().toString());
|
|
|
|
|
firstOutboundUser2.setName("foo" + UUID.randomUUID().toString());
|
|
|
|
|
barSource.output().send(MessageBuilder.withPayload(firstOutboundUser2).build());
|
|
|
|
|
MessageCollector barSourceMessageCollector = barSourceContext
|
|
|
|
|
.getBean(MessageCollector.class);
|
|
|
|
|
MessageCollector barSourceMessageCollector = barSourceContext.getBean(MessageCollector.class);
|
|
|
|
|
Message<?> barOutboundMessage = barSourceMessageCollector
|
|
|
|
|
.forChannel(barSource.output()).poll(1000, TimeUnit.MILLISECONDS);
|
|
|
|
|
|
|
|
|
|
@@ -146,35 +147,27 @@ public class AvroSchemaRegistryClientMessageConverterTests {
|
|
|
|
|
.forChannel(source.output()).poll(1000, TimeUnit.MILLISECONDS);
|
|
|
|
|
|
|
|
|
|
ConfigurableApplicationContext sinkContext = SpringApplication.run(
|
|
|
|
|
AvroSinkApplication.class, "--server.port=0",
|
|
|
|
|
"--spring.jmx.enabled=false");
|
|
|
|
|
AvroSinkApplication.class, "--server.port=0", "--spring.jmx.enabled=false");
|
|
|
|
|
Sink sink = sinkContext.getBean(Sink.class);
|
|
|
|
|
sink.input().send(outboundMessage);
|
|
|
|
|
sink.input().send(barOutboundMessage);
|
|
|
|
|
sink.input().send(secondBarOutboundMessage);
|
|
|
|
|
List<User2> receivedPojos = sinkContext
|
|
|
|
|
.getBean(AvroSinkApplication.class).receivedPojos;
|
|
|
|
|
List<User2> receivedPojos = sinkContext.getBean(AvroSinkApplication.class).receivedPojos;
|
|
|
|
|
assertThat(receivedPojos).hasSize(3);
|
|
|
|
|
assertThat(receivedPojos.get(0)).isNotSameAs(firstOutboundFoo);
|
|
|
|
|
assertThat(receivedPojos.get(0).getFavoriteColor())
|
|
|
|
|
.isEqualTo(firstOutboundFoo.getFavoriteColor());
|
|
|
|
|
assertThat(receivedPojos.get(0).getFavoriteColor()).isEqualTo(firstOutboundFoo.getFavoriteColor());
|
|
|
|
|
assertThat(receivedPojos.get(0).getName()).isEqualTo(firstOutboundFoo.getName());
|
|
|
|
|
assertThat(receivedPojos.get(0).getFavoritePlace()).isEqualTo("NYC");
|
|
|
|
|
|
|
|
|
|
assertThat(receivedPojos.get(1)).isNotSameAs(firstOutboundUser2);
|
|
|
|
|
assertThat(receivedPojos.get(1).getFavoriteColor())
|
|
|
|
|
.isEqualTo(firstOutboundUser2.getFavoriteColor());
|
|
|
|
|
assertThat(receivedPojos.get(1).getName())
|
|
|
|
|
.isEqualTo(firstOutboundUser2.getName());
|
|
|
|
|
assertThat(receivedPojos.get(1).getFavoriteColor()).isEqualTo(firstOutboundUser2.getFavoriteColor());
|
|
|
|
|
assertThat(receivedPojos.get(1).getName()).isEqualTo(firstOutboundUser2.getName());
|
|
|
|
|
assertThat(receivedPojos.get(1).getFavoritePlace()).isEqualTo("Boston");
|
|
|
|
|
|
|
|
|
|
assertThat(receivedPojos.get(2)).isNotSameAs(secondBarOutboundPojo);
|
|
|
|
|
assertThat(receivedPojos.get(2).getFavoriteColor())
|
|
|
|
|
.isEqualTo(secondBarOutboundPojo.getFavoriteColor());
|
|
|
|
|
assertThat(receivedPojos.get(2).getName())
|
|
|
|
|
.isEqualTo(secondBarOutboundPojo.getName());
|
|
|
|
|
assertThat(receivedPojos.get(2).getFavoritePlace())
|
|
|
|
|
.isEqualTo(secondBarOutboundPojo.getFavoritePlace());
|
|
|
|
|
assertThat(receivedPojos.get(2).getFavoriteColor()).isEqualTo(secondBarOutboundPojo.getFavoriteColor());
|
|
|
|
|
assertThat(receivedPojos.get(2).getName()).isEqualTo(secondBarOutboundPojo.getName());
|
|
|
|
|
assertThat(receivedPojos.get(2).getFavoritePlace()).isEqualTo(secondBarOutboundPojo.getFavoritePlace());
|
|
|
|
|
|
|
|
|
|
sinkContext.close();
|
|
|
|
|
barSourceContext.close();
|
|
|
|
|
@@ -193,22 +186,18 @@ public class AvroSchemaRegistryClientMessageConverterTests {
|
|
|
|
|
"--" + propertyPrefix + ".schema.avro.schema-imports=classpath:schemas/imports/Sms.avsc,"
|
|
|
|
|
+ " classpath:schemas/imports/Email.avsc, classpath:schemas/imports/PushNotification.avsc" };
|
|
|
|
|
|
|
|
|
|
final ConfigurableApplicationContext sourceContext = SpringApplication
|
|
|
|
|
.run(AvroSourceApplication.class, args);
|
|
|
|
|
final ConfigurableApplicationContext sinkContext = SpringApplication
|
|
|
|
|
.run(CommandSinkApplication.class, args);
|
|
|
|
|
final ConfigurableApplicationContext sourceContext = SpringApplication.run(AvroSourceApplication.class, args);
|
|
|
|
|
final ConfigurableApplicationContext sinkContext = SpringApplication.run(CommandSinkApplication.class, args);
|
|
|
|
|
final Source barSource = sourceContext.getBean(Source.class);
|
|
|
|
|
final Command notification = notification();
|
|
|
|
|
barSource.output().send(MessageBuilder.withPayload(notification).build());
|
|
|
|
|
final MessageCollector barSourceMessageCollector = sourceContext
|
|
|
|
|
.getBean(MessageCollector.class);
|
|
|
|
|
final MessageCollector barSourceMessageCollector = sourceContext.getBean(MessageCollector.class);
|
|
|
|
|
final Message<?> outboundMessage = barSourceMessageCollector
|
|
|
|
|
.forChannel(barSource.output()).poll(1000, TimeUnit.MILLISECONDS);
|
|
|
|
|
assertThat(outboundMessage).isNotNull();
|
|
|
|
|
Sink sink = sinkContext.getBean(Sink.class);
|
|
|
|
|
sink.input().send(outboundMessage);
|
|
|
|
|
List<Command> receivedPojos = sinkContext
|
|
|
|
|
.getBean(CommandSinkApplication.class).receivedPojos;
|
|
|
|
|
List<Command> receivedPojos = sinkContext.getBean(CommandSinkApplication.class).receivedPojos;
|
|
|
|
|
|
|
|
|
|
assertThat(receivedPojos).hasSize(1);
|
|
|
|
|
assertThat(receivedPojos.get(0)).isEqualTo(notification);
|
|
|
|
|
@@ -217,14 +206,13 @@ public class AvroSchemaRegistryClientMessageConverterTests {
|
|
|
|
|
|
|
|
|
|
@Test
|
|
|
|
|
public void testNoCacheConfiguration() {
|
|
|
|
|
if (propertyPrefix.equalsIgnoreCase("spring.cloud.stream")) {
|
|
|
|
|
if (propertyPrefix.equalsIgnoreCase("spring.cloud")) {
|
|
|
|
|
ConfigurableApplicationContext sourceContext = SpringApplication
|
|
|
|
|
.run(NoCacheConfiguration.class, "--spring.main.web-environment=false");
|
|
|
|
|
AvroSchemaRegistryClientMessageConverter converter = sourceContext
|
|
|
|
|
.getBean(AvroSchemaRegistryClientMessageConverter.class);
|
|
|
|
|
DirectFieldAccessor accessor = new DirectFieldAccessor(converter);
|
|
|
|
|
assertThat(accessor.getPropertyValue("cacheManager"))
|
|
|
|
|
.isInstanceOf(NoOpCacheManager.class);
|
|
|
|
|
assertThat(accessor.getPropertyValue("cacheManager")).isInstanceOf(NoOpCacheManager.class);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -233,7 +221,8 @@ public class AvroSchemaRegistryClientMessageConverterTests {
|
|
|
|
|
CacheManager mockCache = Mockito.mock(CacheManager.class);
|
|
|
|
|
when(mockCache.getCache(any())).thenReturn(new NoOpCache(""));
|
|
|
|
|
AvroSchemaServiceManager manager = new AvroSchemaServiceManagerImpl();
|
|
|
|
|
AvroSchemaRegistryClientMessageConverter converter = new AvroSchemaRegistryClientMessageConverter(new DefaultSchemaRegistryClient(), mockCache, manager);
|
|
|
|
|
AvroSchemaRegistryClientMessageConverter converter = new AvroSchemaRegistryClientMessageConverter(
|
|
|
|
|
new DefaultSchemaRegistryClient(restTemplateBuilder), mockCache, manager);
|
|
|
|
|
ReflectionTestUtils.invokeMethod(converter, "getCache", "TEST_CACHE");
|
|
|
|
|
verify(mockCache).getCache("TEST_CACHE");
|
|
|
|
|
}
|
|
|
|
|
@@ -280,7 +269,7 @@ public class AvroSchemaRegistryClientMessageConverterTests {
|
|
|
|
|
AvroSchemaRegistryClientMessageConverter avroSchemaRegistryClientMessageConverter() {
|
|
|
|
|
AvroSchemaServiceManager manager = new AvroSchemaServiceManagerImpl();
|
|
|
|
|
return new AvroSchemaRegistryClientMessageConverter(
|
|
|
|
|
new DefaultSchemaRegistryClient(), new NoOpCacheManager(), manager);
|
|
|
|
|
new DefaultSchemaRegistryClient(new RestTemplate()), new NoOpCacheManager(), manager);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Bean
|
|
|
|
|
|