From c060e68fbce65a080f252fd55a4dba61d578402b Mon Sep 17 00:00:00 2001 From: Mark Fisher Date: Tue, 19 Aug 2008 02:02:57 +0000 Subject: [PATCH] Added initial implementation of messaging bridge. --- .../integration/endpoint/MessagingBridge.java | 41 ++++++++++++ .../endpoint/MessagingBridgeTests.java | 66 +++++++++++++++++++ 2 files changed, 107 insertions(+) create mode 100644 org.springframework.integration/src/main/java/org/springframework/integration/endpoint/MessagingBridge.java create mode 100644 org.springframework.integration/src/test/java/org/springframework/integration/endpoint/MessagingBridgeTests.java diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/MessagingBridge.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/MessagingBridge.java new file mode 100644 index 0000000000..3172b2bab6 --- /dev/null +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/MessagingBridge.java @@ -0,0 +1,41 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.endpoint; + +import org.springframework.integration.message.Message; + +/** + * @author Mark Fisher + */ +public class MessagingBridge extends AbstractRequestReplyEndpoint { + + @Override + protected Message handleRequestMessage(Message requestMessage) { + return requestMessage; + } + + @Override + protected boolean isValidReplyMessage(Message replyMessage) { + return replyMessage != null && replyMessage.getPayload() != null; + } + + @Override + protected void sendReplyMessage(Message replyMessage, Message requestMessage) { + this.getMessageExchangeTemplate().send(replyMessage, this.getTarget()); + } + +} diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/MessagingBridgeTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/MessagingBridgeTests.java new file mode 100644 index 0000000000..51d64b1ce5 --- /dev/null +++ b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/MessagingBridgeTests.java @@ -0,0 +1,66 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.endpoint; + +import static org.junit.Assert.assertEquals; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +import org.springframework.integration.bus.DefaultMessageBus; +import org.springframework.integration.dispatcher.PollingDispatcher; +import org.springframework.integration.message.Message; +import org.springframework.integration.message.MessageTarget; +import org.springframework.integration.message.PollableSource; +import org.springframework.integration.message.StringMessage; +import org.springframework.integration.scheduling.PollingSchedule; + +/** + * @author Mark Fisher + */ +public class MessagingBridgeTests { + + @Test + public void simplePassThrough() throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + DefaultMessageBus bus = new DefaultMessageBus(); + MessagingBridge bridge = new MessagingBridge(); + bridge.setBeanName("bridge"); + PollableSource source = new PollableSource() { + public Message receive() { + return new StringMessage("test"); + } + }; + PollingDispatcher poller = new PollingDispatcher(source, new PollingSchedule(1000)); + poller.setMaxMessagesPerPoll(1); + bridge.setSource(poller); + bridge.setTarget(new MessageTarget() { + public boolean send(Message message) { + latch.countDown(); + return true; + } + }); + bus.registerEndpoint(bridge); + bus.start(); + latch.await(1, TimeUnit.SECONDS); + bus.stop(); + assertEquals(0, latch.getCount()); + } + +}