Moved input/output channel configuration to Method-level annotations. Also, the @Poller annotation is now expected at Method-level instead of Class-level. The @MessageEndpoint is now strictly a stereotype. Removed the @MessageTarget and @Pollable annotations. The @ChannelAdapter annotation post-processor now handles both inbound and outbound channel adapters based on the Method signature.
This commit is contained in:
@@ -122,20 +122,20 @@ public class DirectChannelSubscriptionTests {
|
||||
}
|
||||
|
||||
|
||||
@MessageEndpoint(input="sourceChannel", output="targetChannel")
|
||||
@MessageEndpoint
|
||||
public static class TestEndpoint {
|
||||
|
||||
@Handler
|
||||
@Handler(inputChannel="sourceChannel", outputChannel="targetChannel")
|
||||
public Message<?> handle(Message<?> message) {
|
||||
return new StringMessage(message.getPayload() + "-from-annotated-endpoint");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@MessageEndpoint(input="sourceChannel", output="targetChannel")
|
||||
@MessageEndpoint
|
||||
public static class FailingTestEndpoint {
|
||||
|
||||
@Handler
|
||||
@Handler(inputChannel="sourceChannel", outputChannel="targetChannel")
|
||||
public Message<?> handle(Message<?> message) {
|
||||
throw new RuntimeException("intentional test failure");
|
||||
}
|
||||
|
||||
@@ -25,7 +25,6 @@ import org.springframework.aop.framework.Advised;
|
||||
import org.springframework.aop.support.AopUtils;
|
||||
import org.springframework.aop.support.DelegatingIntroductionInterceptor;
|
||||
import org.springframework.beans.DirectFieldAccessor;
|
||||
import org.springframework.beans.factory.BeanCreationException;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.support.ClassPathXmlApplicationContext;
|
||||
import org.springframework.integration.aggregator.AggregatingMessageHandler;
|
||||
@@ -91,27 +90,23 @@ public class AggregatorAnnotationTests {
|
||||
ApplicationContext context = new ClassPathXmlApplicationContext(
|
||||
new String[] { "classpath:/org/springframework/integration/config/annotation/testAnnotatedAggregator.xml" });
|
||||
final String endpointName = "endpointWithDefaultAnnotationAndCustomCompletionStrategy";
|
||||
DirectFieldAccessor aggregatingMessageHandlerAccessor = getDirectFieldAccessorForAggregatingHandler(context,
|
||||
endpointName);
|
||||
Assert.assertTrue(aggregatingMessageHandlerAccessor.getPropertyValue("completionStrategy") instanceof CompletionStrategyAdapter);
|
||||
DirectFieldAccessor invokerAccessor = new DirectFieldAccessor(new DirectFieldAccessor(
|
||||
aggregatingMessageHandlerAccessor.getPropertyValue("completionStrategy")).getPropertyValue("invoker"));
|
||||
Assert.assertSame(((Advised) context.getBean(endpointName)).getTargetSource().getTarget(), invokerAccessor.getPropertyValue("object"));
|
||||
DirectFieldAccessor aggregatingMessageHandlerAccessor = getDirectFieldAccessorForAggregatingHandler(context, endpointName);
|
||||
Object completionStrategy = aggregatingMessageHandlerAccessor.getPropertyValue("completionStrategy");
|
||||
Assert.assertTrue(completionStrategy instanceof CompletionStrategyAdapter);
|
||||
CompletionStrategyAdapter completionStrategyAdapter = (CompletionStrategyAdapter) completionStrategy;
|
||||
DirectFieldAccessor invokerAccessor = new DirectFieldAccessor(
|
||||
new DirectFieldAccessor(completionStrategyAdapter).getPropertyValue("invoker"));
|
||||
Object targetObject = invokerAccessor.getPropertyValue("object");
|
||||
Assert.assertSame(context.getBean(endpointName), targetObject);
|
||||
Method completionCheckerMethod = (Method) invokerAccessor.getPropertyValue("method");
|
||||
Assert.assertEquals("completionChecker", completionCheckerMethod.getName());
|
||||
}
|
||||
|
||||
@Test(expected=BeanCreationException.class)
|
||||
public void testInvalidCompletionStrategyAnnotation() {
|
||||
new ClassPathXmlApplicationContext(new String[] {
|
||||
"classpath:/org/springframework/integration/config/annotation/testInvalidCompletionStrategyAnnotation.xml" });
|
||||
}
|
||||
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private DirectFieldAccessor getDirectFieldAccessorForAggregatingHandler(ApplicationContext context, final String endpointName) {
|
||||
MessageBus messageBus = this.getMessageBus(context);
|
||||
DefaultEndpoint<?> endpoint = (DefaultEndpoint<?>) messageBus.lookupEndpoint(endpointName + ".MessageHandler.endpoint");
|
||||
DefaultEndpoint<?> endpoint = (DefaultEndpoint<?>) messageBus.lookupEndpoint(endpointName + ".aggregator");
|
||||
MessageHandler handler = (MessageHandler) new DirectFieldAccessor(endpoint).getPropertyValue("handler");
|
||||
try {
|
||||
if (AopUtils.isAopProxy(handler)) {
|
||||
|
||||
@@ -21,10 +21,6 @@ import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.lang.annotation.ElementType;
|
||||
import java.lang.annotation.Retention;
|
||||
import java.lang.annotation.RetentionPolicy;
|
||||
import java.lang.annotation.Target;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
@@ -35,15 +31,10 @@ import org.springframework.aop.framework.ProxyFactory;
|
||||
import org.springframework.beans.DirectFieldAccessor;
|
||||
import org.springframework.context.support.AbstractApplicationContext;
|
||||
import org.springframework.context.support.ClassPathXmlApplicationContext;
|
||||
import org.springframework.core.annotation.Order;
|
||||
import org.springframework.integration.ConfigurationException;
|
||||
import org.springframework.integration.annotation.ChannelAdapter;
|
||||
import org.springframework.integration.annotation.Handler;
|
||||
import org.springframework.integration.annotation.MessageEndpoint;
|
||||
import org.springframework.integration.annotation.MessageTarget;
|
||||
import org.springframework.integration.annotation.Pollable;
|
||||
import org.springframework.integration.annotation.Poller;
|
||||
import org.springframework.integration.annotation.Splitter;
|
||||
import org.springframework.integration.annotation.Transformer;
|
||||
import org.springframework.integration.bus.DefaultMessageBus;
|
||||
import org.springframework.integration.bus.MessageBus;
|
||||
@@ -53,8 +44,8 @@ import org.springframework.integration.channel.DirectChannel;
|
||||
import org.springframework.integration.channel.MessageChannel;
|
||||
import org.springframework.integration.channel.PollableChannel;
|
||||
import org.springframework.integration.channel.QueueChannel;
|
||||
import org.springframework.integration.endpoint.DefaultEndpoint;
|
||||
import org.springframework.integration.handler.MessageHandler;
|
||||
import org.springframework.integration.endpoint.ServiceActivatorEndpoint;
|
||||
import org.springframework.integration.endpoint.ServiceInvoker;
|
||||
import org.springframework.integration.message.Message;
|
||||
import org.springframework.integration.message.MessageSource;
|
||||
import org.springframework.integration.message.StringMessage;
|
||||
@@ -74,26 +65,16 @@ public class MessagingAnnotationPostProcessorTests {
|
||||
postProcessor.afterPropertiesSet();
|
||||
HandlerAnnotatedBean bean = new HandlerAnnotatedBean();
|
||||
Object result = postProcessor.postProcessAfterInitialization(bean, "testBean");
|
||||
assertTrue(result instanceof MessageHandler);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCustomHandlerAnnotation() {
|
||||
MessageBus messageBus = new DefaultMessageBus();
|
||||
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);
|
||||
postProcessor.afterPropertiesSet();
|
||||
CustomHandlerAnnotatedBean bean = new CustomHandlerAnnotatedBean();
|
||||
Object result = postProcessor.postProcessAfterInitialization(bean, "testBean");
|
||||
assertTrue(result instanceof MessageHandler);
|
||||
assertTrue(result instanceof ServiceInvoker);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleHandlerWithContext() {
|
||||
AbstractApplicationContext context = new ClassPathXmlApplicationContext(
|
||||
"handlerAnnotationPostProcessorTests.xml", this.getClass());
|
||||
MessageHandler handler = (MessageHandler) context.getBean("simpleHandler");
|
||||
Message<?> reply = handler.handle(new StringMessage("world"));
|
||||
assertEquals("hello world", reply.getPayload());
|
||||
ServiceInvoker invoker = (ServiceInvoker) context.getBean("simpleHandler");
|
||||
String reply = (String) invoker.invoke(new StringMessage("world"));
|
||||
assertEquals("hello world", reply);
|
||||
context.stop();
|
||||
}
|
||||
|
||||
@@ -303,49 +284,6 @@ public class MessagingAnnotationPostProcessorTests {
|
||||
messageBus.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSplitterAnnotation() throws InterruptedException {
|
||||
MessageBus messageBus = new DefaultMessageBus();
|
||||
QueueChannel input = new QueueChannel();
|
||||
QueueChannel output = new QueueChannel();
|
||||
input.setBeanName("input");
|
||||
output.setBeanName("output");
|
||||
messageBus.registerChannel(input);
|
||||
messageBus.registerChannel(output);
|
||||
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);
|
||||
postProcessor.afterPropertiesSet();
|
||||
SplitterAnnotationTestEndpoint endpoint = new SplitterAnnotationTestEndpoint();
|
||||
postProcessor.postProcessAfterInitialization(endpoint, "endpoint");
|
||||
messageBus.start();
|
||||
input.send(new StringMessage("this.is.a.test"));
|
||||
Message<?> message1 = output.receive(500);
|
||||
assertNotNull(message1);
|
||||
assertEquals("this", message1.getPayload());
|
||||
Message<?> message2 = output.receive(500);
|
||||
assertNotNull(message2);
|
||||
assertEquals("is", message2.getPayload());
|
||||
Message<?> message3 = output.receive(500);
|
||||
assertNotNull(message3);
|
||||
assertEquals("a", message3.getPayload());
|
||||
Message<?> message4 = output.receive(500);
|
||||
assertNotNull(message4);
|
||||
assertEquals("test", message4.getPayload());
|
||||
assertNull(output.receive(500));
|
||||
messageBus.stop();
|
||||
}
|
||||
|
||||
@Test(expected=ConfigurationException.class)
|
||||
public void testEndpointWithNoHandlerMethod() {
|
||||
MessageBus messageBus = new DefaultMessageBus();
|
||||
QueueChannel testChannel = new QueueChannel();
|
||||
testChannel.setBeanName("testChannel");
|
||||
messageBus.registerChannel(testChannel);
|
||||
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);
|
||||
postProcessor.afterPropertiesSet();
|
||||
AnnotatedEndpointWithNoHandlerMethod endpoint = new AnnotatedEndpointWithNoHandlerMethod();
|
||||
postProcessor.postProcessAfterInitialization(endpoint, "endpoint");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEndpointWithPollerAnnotation() {
|
||||
MessageBus messageBus = new DefaultMessageBus();
|
||||
@@ -356,7 +294,7 @@ public class MessagingAnnotationPostProcessorTests {
|
||||
postProcessor.afterPropertiesSet();
|
||||
AnnotatedEndpointWithPolledAnnotation endpoint = new AnnotatedEndpointWithPolledAnnotation();
|
||||
postProcessor.postProcessAfterInitialization(endpoint, "testBean");
|
||||
DefaultEndpoint<?> processedEndpoint = (DefaultEndpoint<?>) messageBus.lookupEndpoint("testBean.MessageHandler.endpoint");
|
||||
ServiceActivatorEndpoint processedEndpoint = (ServiceActivatorEndpoint) messageBus.lookupEndpoint("testBean.handler");
|
||||
DirectFieldAccessor accessor = new DirectFieldAccessor(processedEndpoint);
|
||||
MessageSource<?> source = (MessageSource<?>) accessor.getPropertyValue("source");
|
||||
assertTrue(source instanceof SubscribableSource);
|
||||
@@ -395,19 +333,19 @@ public class MessagingAnnotationPostProcessorTests {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHandlerWithTransformers() {
|
||||
public void testTransformer() {
|
||||
MessageBus messageBus = new DefaultMessageBus();
|
||||
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);
|
||||
postProcessor.afterPropertiesSet();
|
||||
HandlerWithTransformers testBean = new HandlerWithTransformers();
|
||||
MessageHandler handler = (MessageHandler) postProcessor.postProcessAfterInitialization(testBean, "testBean");
|
||||
Message<?> reply = handler.handle(new StringMessage("foo"));
|
||||
assertEquals("PRE.FOO.post", reply.getPayload());
|
||||
TransformerAnnotationTestBean testBean = new TransformerAnnotationTestBean();
|
||||
org.springframework.integration.transformer.Transformer transformer =
|
||||
(org.springframework.integration.transformer.Transformer) postProcessor.postProcessAfterInitialization(testBean, "testBean");
|
||||
Message<?> reply = transformer.transform(new StringMessage("foo"));
|
||||
assertEquals("FOO", reply.getPayload());
|
||||
}
|
||||
|
||||
|
||||
@MessageEndpoint
|
||||
@ChannelAdapter("testChannel")
|
||||
private static class TargetAnnotationTestBean {
|
||||
|
||||
private String messageText;
|
||||
@@ -423,7 +361,7 @@ public class MessagingAnnotationPostProcessorTests {
|
||||
return this.messageText;
|
||||
}
|
||||
|
||||
@MessageTarget
|
||||
@ChannelAdapter("testChannel")
|
||||
public void countdown(String input) {
|
||||
this.messageText = input;
|
||||
latch.countDown();
|
||||
@@ -431,7 +369,7 @@ public class MessagingAnnotationPostProcessorTests {
|
||||
}
|
||||
|
||||
|
||||
@MessageEndpoint(input="inputChannel")
|
||||
@MessageEndpoint
|
||||
private static class ChannelRegistryAwareTestBean implements ChannelRegistryAware {
|
||||
|
||||
private ChannelRegistry channelRegistry;
|
||||
@@ -444,7 +382,7 @@ public class MessagingAnnotationPostProcessorTests {
|
||||
return this.channelRegistry;
|
||||
}
|
||||
|
||||
@Handler
|
||||
@Handler(inputChannel="inputChannel")
|
||||
public Message<?> handle(Message<?> message) {
|
||||
return null;
|
||||
}
|
||||
@@ -455,7 +393,7 @@ public class MessagingAnnotationPostProcessorTests {
|
||||
}
|
||||
|
||||
|
||||
@MessageEndpoint(input="inputChannel", output="outputChannel")
|
||||
@MessageEndpoint
|
||||
private static interface SimpleAnnotatedEndpointInterface {
|
||||
String test(String input);
|
||||
}
|
||||
@@ -463,33 +401,18 @@ public class MessagingAnnotationPostProcessorTests {
|
||||
|
||||
private static class SimpleAnnotatedEndpointImplementation implements SimpleAnnotatedEndpointInterface {
|
||||
|
||||
@Handler
|
||||
@Handler(inputChannel="inputChannel", outputChannel="outputChannel")
|
||||
public String test(String input) {
|
||||
return "test-" + input;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@MessageEndpoint(input="input", output="output")
|
||||
private static class SplitterAnnotationTestEndpoint {
|
||||
|
||||
@Splitter
|
||||
public String[] split(String input) {
|
||||
return input.split("\\.");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@MessageEndpoint(input="testChannel")
|
||||
private static class AnnotatedEndpointWithNoHandlerMethod {
|
||||
}
|
||||
|
||||
|
||||
@MessageEndpoint(input="testChannel")
|
||||
@Poller(period=1234, initialDelay=5678, fixedRate=true, timeUnit=TimeUnit.SECONDS)
|
||||
@MessageEndpoint
|
||||
private static class AnnotatedEndpointWithPolledAnnotation {
|
||||
|
||||
@Handler
|
||||
@Handler(inputChannel="testChannel")
|
||||
@Poller(period=1234, initialDelay=5678, fixedRate=true, timeUnit=TimeUnit.SECONDS)
|
||||
public String prependFoo(String s) {
|
||||
return "foo" + s;
|
||||
}
|
||||
@@ -507,30 +430,11 @@ public class MessagingAnnotationPostProcessorTests {
|
||||
}
|
||||
|
||||
|
||||
@Target(ElementType.METHOD)
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
@Handler
|
||||
private static @interface CustomHandler {
|
||||
}
|
||||
|
||||
|
||||
@MessageEndpoint
|
||||
private static class CustomHandlerAnnotatedBean {
|
||||
|
||||
@CustomHandler
|
||||
public String test(String s) {
|
||||
return s + s;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
@MessageEndpoint
|
||||
@ChannelAdapter("testChannel")
|
||||
@Poller(period = 1000, initialDelay = 0, maxMessagesPerPoll = 1)
|
||||
private static class ChannelAdapterAnnotationTestBean {
|
||||
|
||||
@Pollable
|
||||
@ChannelAdapter("testChannel")
|
||||
@Poller(period = 1000, initialDelay = 0, maxMessagesPerPoll = 1)
|
||||
public String test() {
|
||||
return "test";
|
||||
}
|
||||
@@ -538,25 +442,12 @@ public class MessagingAnnotationPostProcessorTests {
|
||||
|
||||
|
||||
@MessageEndpoint
|
||||
private static class HandlerWithTransformers {
|
||||
private static class TransformerAnnotationTestBean {
|
||||
|
||||
@Transformer
|
||||
@Order(-1)
|
||||
public String transformBefore(String input) {
|
||||
return "pre." + input;
|
||||
}
|
||||
|
||||
@Handler
|
||||
@Order(0)
|
||||
public String handle(String input) {
|
||||
return input.toUpperCase();
|
||||
}
|
||||
|
||||
@Transformer
|
||||
@Order(1)
|
||||
public String transformAfter(String input) {
|
||||
return input + ".post";
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,79 @@
|
||||
/*
|
||||
* Copyright 2002-2008 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.integration.config.annotation;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.springframework.integration.annotation.MessageEndpoint;
|
||||
import org.springframework.integration.annotation.Router;
|
||||
import org.springframework.integration.bus.DefaultMessageBus;
|
||||
import org.springframework.integration.bus.MessageBus;
|
||||
import org.springframework.integration.channel.DirectChannel;
|
||||
import org.springframework.integration.channel.QueueChannel;
|
||||
import org.springframework.integration.message.Message;
|
||||
import org.springframework.integration.message.StringMessage;
|
||||
|
||||
/**
|
||||
* @author Mark Fisher
|
||||
*/
|
||||
public class RouterAnnotationPostProcessorTests {
|
||||
|
||||
private MessageBus messageBus;
|
||||
|
||||
private DirectChannel inputChannel;
|
||||
|
||||
private QueueChannel outputChannel;
|
||||
|
||||
|
||||
@Before
|
||||
public void init() {
|
||||
inputChannel = new DirectChannel();
|
||||
outputChannel = new QueueChannel();
|
||||
inputChannel.setBeanName("input");
|
||||
outputChannel.setBeanName("output");
|
||||
messageBus = new DefaultMessageBus();
|
||||
messageBus.registerChannel(inputChannel);
|
||||
messageBus.registerChannel(outputChannel);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testRouter() {
|
||||
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);
|
||||
postProcessor.afterPropertiesSet();
|
||||
messageBus.start();
|
||||
TestRouter testRouter = new TestRouter();
|
||||
postProcessor.postProcessAfterInitialization(testRouter, "test");
|
||||
inputChannel.send(new StringMessage("foo"));
|
||||
Message<?> replyMessage = outputChannel.receive(0);
|
||||
assertEquals("foo", replyMessage.getPayload());
|
||||
}
|
||||
|
||||
|
||||
@MessageEndpoint
|
||||
public static class TestRouter {
|
||||
|
||||
@Router(inputChannel="input", defaultOutputChannel="output")
|
||||
public String test(String s) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -23,10 +23,10 @@ import org.springframework.integration.endpoint.annotation.ITestEndpoint;
|
||||
/**
|
||||
* @author Mark Fisher
|
||||
*/
|
||||
@MessageEndpoint(input="inputChannel", output="outputChannel")
|
||||
@MessageEndpoint
|
||||
public class SimpleAnnotatedEndpoint implements ITestEndpoint {
|
||||
|
||||
@Handler
|
||||
@Handler(inputChannel="inputChannel", outputChannel="outputChannel")
|
||||
public String sayHello(String name) {
|
||||
return "hello " + name;
|
||||
}
|
||||
|
||||
@@ -0,0 +1,93 @@
|
||||
/*
|
||||
* Copyright 2002-2008 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.integration.config.annotation;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.springframework.integration.annotation.MessageEndpoint;
|
||||
import org.springframework.integration.annotation.Splitter;
|
||||
import org.springframework.integration.bus.DefaultMessageBus;
|
||||
import org.springframework.integration.bus.MessageBus;
|
||||
import org.springframework.integration.channel.DirectChannel;
|
||||
import org.springframework.integration.channel.QueueChannel;
|
||||
import org.springframework.integration.message.Message;
|
||||
import org.springframework.integration.message.StringMessage;
|
||||
|
||||
/**
|
||||
* @author Mark Fisher
|
||||
*/
|
||||
public class SplitterAnnotationPostProcessorTests {
|
||||
|
||||
private MessageBus messageBus;
|
||||
|
||||
private DirectChannel inputChannel;
|
||||
|
||||
private QueueChannel outputChannel;
|
||||
|
||||
|
||||
@Before
|
||||
public void init() {
|
||||
inputChannel = new DirectChannel();
|
||||
outputChannel = new QueueChannel();
|
||||
inputChannel.setBeanName("input");
|
||||
outputChannel.setBeanName("output");
|
||||
messageBus = new DefaultMessageBus();
|
||||
messageBus.registerChannel(inputChannel);
|
||||
messageBus.registerChannel(outputChannel);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testSplitterAnnotation() throws InterruptedException {
|
||||
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);
|
||||
postProcessor.afterPropertiesSet();
|
||||
TestSplitter splitter = new TestSplitter();
|
||||
postProcessor.postProcessAfterInitialization(splitter, "testSplitter");
|
||||
messageBus.start();
|
||||
inputChannel.send(new StringMessage("this.is.a.test"));
|
||||
Message<?> message1 = outputChannel.receive(500);
|
||||
assertNotNull(message1);
|
||||
assertEquals("this", message1.getPayload());
|
||||
Message<?> message2 = outputChannel.receive(500);
|
||||
assertNotNull(message2);
|
||||
assertEquals("is", message2.getPayload());
|
||||
Message<?> message3 = outputChannel.receive(500);
|
||||
assertNotNull(message3);
|
||||
assertEquals("a", message3.getPayload());
|
||||
Message<?> message4 = outputChannel.receive(500);
|
||||
assertNotNull(message4);
|
||||
assertEquals("test", message4.getPayload());
|
||||
assertNull(outputChannel.receive(0));
|
||||
messageBus.stop();
|
||||
}
|
||||
|
||||
|
||||
@MessageEndpoint
|
||||
public static class TestSplitter {
|
||||
|
||||
@Splitter(inputChannel="input", outputChannel="output")
|
||||
public String[] split(String s) {
|
||||
return s.split("\\.");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -28,18 +28,16 @@ import org.springframework.integration.annotation.CompletionStrategy;
|
||||
import org.springframework.integration.annotation.MessageEndpoint;
|
||||
import org.springframework.integration.message.Message;
|
||||
import org.springframework.integration.message.StringMessage;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @author Marius Bogoevici
|
||||
*/
|
||||
@MessageEndpoint(input="inputChannel")
|
||||
@Component("endpointWithDefaultAnnotationAndCustomCompletionStrategy")
|
||||
@MessageEndpoint("endpointWithDefaultAnnotationAndCustomCompletionStrategy")
|
||||
public class TestAnnotatedEndpointWithCompletionStrategy {
|
||||
|
||||
private final ConcurrentMap<Object, Message<?>> aggregatedMessages = new ConcurrentHashMap<Object, Message<?>>();
|
||||
|
||||
@Aggregator
|
||||
@Aggregator(inputChannel = "inputChannel")
|
||||
public Message<?> aggregatingMethod(List<Message<?>> messages) {
|
||||
List<Message<?>> sortableList = new ArrayList<Message<?>>(messages);
|
||||
Collections.sort(sortableList, new MessageSequenceComparator());
|
||||
|
||||
@@ -1,38 +0,0 @@
|
||||
/*
|
||||
* Copyright 2002-2008 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.integration.config.annotation;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import org.springframework.integration.annotation.CompletionStrategy;
|
||||
import org.springframework.integration.annotation.MessageEndpoint;
|
||||
import org.springframework.integration.message.Message;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @author Marius Bogoevici
|
||||
*/
|
||||
@MessageEndpoint(input="inputChannel")
|
||||
@Component("endpointWithoutAggregatorAndWithCompletionStrategy")
|
||||
public class TestAnnotatedEndpointWithCompletionStrategyOnly {
|
||||
|
||||
@CompletionStrategy
|
||||
public boolean checkCompleteness(List<Message<?>> messages) {
|
||||
throw new UnsupportedOperationException("Not intended to be called");
|
||||
}
|
||||
|
||||
}
|
||||
@@ -24,7 +24,6 @@ import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import org.springframework.integration.aggregator.MessageSequenceComparator;
|
||||
import org.springframework.integration.annotation.Aggregator;
|
||||
import org.springframework.integration.annotation.MessageEndpoint;
|
||||
import org.springframework.integration.message.Message;
|
||||
import org.springframework.integration.message.StringMessage;
|
||||
import org.springframework.stereotype.Component;
|
||||
@@ -32,13 +31,15 @@ import org.springframework.stereotype.Component;
|
||||
/**
|
||||
* @author Marius Bogoevici
|
||||
*/
|
||||
@MessageEndpoint(input = "inputChannel", output = "outputChannel")
|
||||
@Component("endpointWithCustomizedAnnotation")
|
||||
public class TestAnnotatedEndpointWithCustomizedAggregator {
|
||||
|
||||
private final ConcurrentMap<Object, Message<?>> aggregatedMessages = new ConcurrentHashMap<Object, Message<?>>();
|
||||
|
||||
@Aggregator(discardChannel = "discardChannel",
|
||||
@Aggregator(
|
||||
inputChannel = "inputChannel",
|
||||
outputChannel = "outputChannel",
|
||||
discardChannel = "discardChannel",
|
||||
reaperInterval = 1234, sendPartialResultsOnTimeout = true,
|
||||
sendTimeout = 98765432, timeout = 4567890, trackedCorrelationIdCapacity = 42)
|
||||
public Message<?> aggregatingMethod(List<Message<?>> messages) {
|
||||
|
||||
@@ -27,18 +27,16 @@ import org.springframework.integration.annotation.Aggregator;
|
||||
import org.springframework.integration.annotation.MessageEndpoint;
|
||||
import org.springframework.integration.message.Message;
|
||||
import org.springframework.integration.message.StringMessage;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* @author Marius Bogoevici
|
||||
*/
|
||||
@MessageEndpoint(input="inputChannel")
|
||||
@Component("endpointWithDefaultAnnotation")
|
||||
@MessageEndpoint("endpointWithDefaultAnnotation")
|
||||
public class TestAnnotatedEndpointWithDefaultAggregator {
|
||||
|
||||
private final ConcurrentMap<Object, Message<?>> aggregatedMessages = new ConcurrentHashMap<Object, Message<?>>();
|
||||
|
||||
@Aggregator
|
||||
@Aggregator(inputChannel="inputChannel")
|
||||
public Message<?> aggregatingMethod(List<Message<?>> messages) {
|
||||
List<Message<?>> sortableList = new ArrayList<Message<?>>(messages);
|
||||
Collections.sort(sortableList, new MessageSequenceComparator());
|
||||
|
||||
@@ -22,10 +22,10 @@ import org.springframework.integration.annotation.MessageEndpoint;
|
||||
/**
|
||||
* @author Mark Fisher
|
||||
*/
|
||||
@MessageEndpoint(input="inputChannel", output="outputChannel")
|
||||
@MessageEndpoint
|
||||
public class TypeConvertingTestEndpoint {
|
||||
|
||||
@Handler
|
||||
@Handler(inputChannel="inputChannel", outputChannel="outputChannel")
|
||||
public int multiplyByTwo(int number) {
|
||||
return number * 2;
|
||||
}
|
||||
|
||||
@@ -1,27 +0,0 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<beans:beans xmlns="http://www.springframework.org/schema/integration"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:beans="http://www.springframework.org/schema/beans"
|
||||
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context"
|
||||
xsi:schemaLocation="http://www.springframework.org/schema/beans
|
||||
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
|
||||
http://www.springframework.org/schema/integration
|
||||
http://www.springframework.org/schema/integration/spring-integration-core-1.0.xsd
|
||||
http://www.springframework.org/schema/context
|
||||
http://www.springframework.org/schema/context/spring-context-2.5.xsd">
|
||||
|
||||
<message-bus/>
|
||||
|
||||
<annotation-driven/>
|
||||
|
||||
<channel id="inputChannel"/>
|
||||
|
||||
<channel id="replyChannel"/>
|
||||
|
||||
<channel id="discardChannel"/>
|
||||
|
||||
<context:component-scan base-package="org.springframework.integration.config" use-default-filters="false">
|
||||
<context:include-filter type="regex"
|
||||
expression="org\.springframework\.integration\.config\.annotation\.TestAnnotatedEndpointWithCompletionStrategyOnly"/>
|
||||
</context:component-scan>
|
||||
|
||||
</beans:beans>
|
||||
@@ -24,10 +24,10 @@ import org.springframework.integration.message.StringMessage;
|
||||
/**
|
||||
* @author Mark Fisher
|
||||
*/
|
||||
@MessageEndpoint(input="inputChannel", output="outputChannel")
|
||||
@MessageEndpoint
|
||||
public class MessageParameterAnnotatedEndpoint {
|
||||
|
||||
@Handler
|
||||
@Handler(inputChannel="inputChannel", outputChannel="outputChannel")
|
||||
public StringMessage sayHello(Message<?> message) {
|
||||
return new StringMessage("hello " + message.getPayload());
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user