diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/message/RetrievalBlockingMessageStore.java b/org.springframework.integration/src/main/java/org/springframework/integration/message/RetrievalBlockingMessageStore.java
deleted file mode 100644
index 738a09cb7e..0000000000
--- a/org.springframework.integration/src/main/java/org/springframework/integration/message/RetrievalBlockingMessageStore.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Copyright 2002-2008 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.springframework.integration.message;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.springframework.util.Assert;
-
-/**
- * A {@link MessageStore} implementation whose get and
- * remove methods block until a message is available.
- *
- * Alternative methods that accept an explicit timeout value are also available.
- *
- * @author Mark Fisher
- */
-public class RetrievalBlockingMessageStore implements MessageStore {
-
- private final MessageStore targetMessageStore;
-
- private final ConcurrentMap>> listeners =
- new ConcurrentHashMap>>();
-
- private final Object listenerMonitor = new Object();
-
-
- /**
- * Create a wrapper for the provided {@link MessageStore} so that its
- * retrieval methods will block.
- *
- * @param messageStore the MessageStore instance to wrap
- */
- public RetrievalBlockingMessageStore(MessageStore messageStore) {
- Assert.notNull(messageStore, "messageStore must not be null");
- Assert.isTrue(!(messageStore instanceof RetrievalBlockingMessageStore),
- "target MessageStore must not be an instance of '" + this.getClass().getName() + "'");
- this.targetMessageStore = messageStore;
- }
-
- /**
- * Create a wrapper for a {@link SimpleMessageStore} so that its
- * retrieval methods will block.
- *
- * @param capacity the capacity of the MessageStore
- */
- public RetrievalBlockingMessageStore(int capacity) {
- this.targetMessageStore = new SimpleMessageStore(capacity);
- }
-
-
- public int size() {
- return this.targetMessageStore.size();
- }
-
- public Message> put(Object key, Message> message) {
- Message> previousMessage = this.targetMessageStore.put(key, message);
- boolean sentReply = false;
- List> listenerList = null;
- synchronized (this.listenerMonitor) {
- listenerList = this.listeners.remove(key);
- }
- if (listenerList != null) {
- for (int i = 0; i < listenerList.size(); i++) {
- Queue queue = listenerList.get(i);
- if (!sentReply) {
- sentReply = queue.offer(new MessageHolder(message));
- }
- else {
- queue.offer(new MessageHolder(null));
- }
- }
- }
- return previousMessage;
- }
-
- public Message> get(Object key) {
- return this.get(key, -1);
- }
-
- public Message> get(Object key, long timeout) {
- Message> message = this.targetMessageStore.get(key);
- return (message != null) ? message : waitForMessage(key, timeout, false);
- }
-
- public List> list() {
- return this.targetMessageStore.list();
- }
-
- public Message> remove(Object key) {
- return this.remove(key, -1);
- }
-
- public Message> remove(Object key, long timeout) {
- Message> message = this.targetMessageStore.remove(key);
- return (message != null) ? message : waitForMessage(key, timeout, true);
- }
-
- private Message> waitForMessage(Object key, long timeout, boolean shouldRemove) {
- Message> message = null;
- SynchronousQueue queue = new SynchronousQueue();
- synchronized (this.listenerMonitor) {
- List> listenerList = this.listeners.get(key);
- if (listenerList == null) {
- listenerList = new LinkedList>();
- this.listeners.put(key, listenerList);
- }
- listenerList.add(queue);
- }
- try {
- MessageHolder holder = (timeout < 0) ? queue.take() : queue.poll(timeout, TimeUnit.MILLISECONDS);
- if (holder != null) {
- message = holder.getMessage();
- if (message != null && shouldRemove) {
- return this.targetMessageStore.remove(key);
- }
- }
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- finally {
- synchronized (this.listenerMonitor) {
- List> listenerList = this.listeners.get(key);
- if (listenerList != null) {
- listenerList.remove(queue);
- if (listenerList.size() == 0) {
- this.listeners.remove(key);
- }
- }
- }
- }
- return message;
- }
-
-
- /**
- * A wrapper class to enable null messages in the queue.
- */
- private static class MessageHolder {
-
- private final Message> message;
-
- MessageHolder(Message> message) {
- this.message = message;
- }
-
- Message> getMessage() {
- return this.message;
- }
- }
-
-}
diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/message/RetrievalBlockingMessageStoreTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/message/RetrievalBlockingMessageStoreTests.java
deleted file mode 100644
index ac26e11051..0000000000
--- a/org.springframework.integration/src/test/java/org/springframework/integration/message/RetrievalBlockingMessageStoreTests.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Copyright 2002-2008 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.springframework.integration.message;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-
-import java.util.concurrent.Executors;
-
-import org.junit.Test;
-
-/**
- * @author Mark Fisher
- */
-public class RetrievalBlockingMessageStoreTests {
-
- @Test
- public void testGetWithElapsedTimeout() {
- final RetrievalBlockingMessageStore store = new RetrievalBlockingMessageStore(10);
- publishWithDelay(store, "foo", "bar", 2000);
- Message> message = store.get("foo", 5);
- assertNull(message);
- }
-
- @Test
- public void testWrappedTargetGetWithElapsedTimeout() {
- MessageStore target = new SimpleMessageStore(10);
- final RetrievalBlockingMessageStore store = new RetrievalBlockingMessageStore(target);
- publishWithDelay(store, "foo", "bar", 2000);
- Message> message = store.get("foo", 5);
- assertNull(message);
- }
-
- @Test
- public void testGetWithinTimeout() {
- final RetrievalBlockingMessageStore store = new RetrievalBlockingMessageStore(10);
- publishWithDelay(store, "foo", "bar", 50);
- Message> message = store.get("foo", 1000);
- assertNotNull(message);
- assertEquals("bar", message.getPayload());
- assertNotNull(store.get("foo", 0));
- }
-
- @Test
- public void testWrappedTargetGetWithinTimeout() {
- MessageStore target = new SimpleMessageStore(10);
- final RetrievalBlockingMessageStore store = new RetrievalBlockingMessageStore(target);
- publishWithDelay(store, "foo", "bar", 50);
- Message> message = store.get("foo", 1000);
- assertNotNull(message);
- assertEquals("bar", message.getPayload());
- assertNotNull(store.get("foo", 0));
- }
-
- @Test
- public void testRemoveWithElapsedTimeout() {
- final RetrievalBlockingMessageStore store = new RetrievalBlockingMessageStore(10);
- publishWithDelay(store, "foo", "bar", 1000);
- Message> message = store.remove("foo", 5);
- assertNull(message);
- }
-
- @Test
- public void testWrappedMessageStoreRemoveWithElapsedTimeout() {
- MessageStore target = new SimpleMessageStore(10);
- final RetrievalBlockingMessageStore store = new RetrievalBlockingMessageStore(target);
- publishWithDelay(store, "foo", "bar", 1000);
- Message> message = store.remove("foo", 5);
- assertNull(message);
- }
-
- @Test
- public void testRemoveWithinTimeout() {
- final RetrievalBlockingMessageStore store = new RetrievalBlockingMessageStore(10);
- publishWithDelay(store, "foo", "bar", 50);
- Message> message = store.remove("foo", 1000);
- assertNotNull(message);
- assertEquals("bar", message.getPayload());
- assertNull(store.get("foo", 0));
- }
-
- @Test
- public void testWrappedMessageStoreRemoveWithinTimeout() {
- MessageStore target = new SimpleMessageStore(10);
- final RetrievalBlockingMessageStore store = new RetrievalBlockingMessageStore(target);
- publishWithDelay(store, "foo", "bar", 50);
- Message> message = store.remove("foo", 1000);
- assertNotNull(message);
- assertEquals("bar", message.getPayload());
- assertNull(store.get("foo", 0));
- }
-
-
- private static void publishWithDelay(final MessageStore store, final String key, final String value, final long delay) {
- Executors.newSingleThreadExecutor().execute(new Runnable() {
- public void run() {
- try {
- Thread.sleep(delay);
- }
- catch (InterruptedException e) {}
- store.put(key, new StringMessage(value));
- }
- });
- }
-
-}