diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/channel/PriorityChannel.java b/org.springframework.integration/src/main/java/org/springframework/integration/channel/PriorityChannel.java
index 228ca2454f..9eaea8ae29 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/channel/PriorityChannel.java
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/channel/PriorityChannel.java
@@ -32,55 +32,52 @@ import org.springframework.integration.core.MessagePriority;
*/
public class PriorityChannel extends QueueChannel {
- private static final int DEFAULT_MAX_CAPACITY = Integer.MAX_VALUE;
-
-
private final Semaphore semaphore;
/**
- * Create a channel with the specified queue capacity.
- * Priority will be determined by the provided {@link Comparator}.
+ * Create a channel with the specified queue capacity. If the capacity
+ * is a non-positive value, the queue will be unbounded. Message priority
+ * will be determined by the provided {@link Comparator}. If the comparator
+ * is null, the priority will be based upon the value of
+ * {@link MessageHeader#getPriority()}.
*/
public PriorityChannel(int capacity, Comparator> comparator) {
- super(new PriorityBlockingQueue>(11, comparator));
- this.semaphore = new Semaphore(capacity, true);
+ super(new PriorityBlockingQueue>(11,
+ (comparator != null) ? comparator : new MessagePriorityComparator()));
+ this.semaphore = (capacity > 0) ? new Semaphore(capacity, true) : null;
}
/**
- * Create a channel with the specified queue capacity.
- * Priority will be based upon the value of {@link MessageHeader#getPriority()}.
+ * Create a channel with the specified queue capacity. Message priority
+ * will be based upon the value of {@link MessageHeader#getPriority()}.
*/
public PriorityChannel(int capacity) {
- this(capacity, new MessagePriorityComparator());
+ this(capacity, null);
}
/**
- * Create a channel with the default queue capacity of {@link Integer#MAX_VALUE}.
- * Priority will be determined by the provided {@link Comparator}.
+ * Create a channel with an unbounded queue. Message priority will be
+ * determined by the provided {@link Comparator}. If the comparator
+ * is null, the priority will be based upon the value of
+ * {@link MessageHeader#getPriority()}.
*/
public PriorityChannel(Comparator> comparator) {
- this(DEFAULT_MAX_CAPACITY, comparator);
+ this(0, comparator);
}
/**
- * Create a channel with the default queue capacity of {@link Integer#MAX_VALUE}.
- * Priority will be based on the value of {@link MessageHeader#getPriority()}.
+ * Create a channel with an unbounded queue. Message priority will be
+ * based on the value of {@link MessageHeader#getPriority()}.
*/
public PriorityChannel() {
- this(DEFAULT_MAX_CAPACITY, new MessagePriorityComparator());
+ this(0, null);
}
@Override
protected boolean doSend(Message> message, long timeout) {
- try {
- if (!this.semaphore.tryAcquire(timeout, TimeUnit.MILLISECONDS)) {
- return false;
- }
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ if (!acquirePermitIfNecessary(timeout)) {
return false;
}
return super.doSend(message, 0);
@@ -90,12 +87,31 @@ public class PriorityChannel extends QueueChannel {
protected Message> doReceive(long timeout) {
Message> message = super.doReceive(timeout);
if (message != null) {
- this.semaphore.release();
+ this.releasePermitIfNecessary();
return message;
}
return null;
}
+ private boolean acquirePermitIfNecessary(long timeoutInMilliseconds) {
+ if (this.semaphore != null) {
+ try {
+ return this.semaphore.tryAcquire(timeoutInMilliseconds, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private void releasePermitIfNecessary() {
+ if (this.semaphore != null) {
+ this.semaphore.release();
+ }
+ }
+
private static class MessagePriorityComparator implements Comparator> {
diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/channel/PriorityChannelTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/channel/PriorityChannelTests.java
index e52a0cbcb6..ce275cf147 100644
--- a/org.springframework.integration/src/test/java/org/springframework/integration/channel/PriorityChannelTests.java
+++ b/org.springframework.integration/src/test/java/org/springframework/integration/channel/PriorityChannelTests.java
@@ -99,6 +99,20 @@ public class PriorityChannelTests {
assertEquals("test-LOW", channel.receive(0).getPayload());
}
+ @Test
+ public void testUnboundedCapacity() {
+ PriorityChannel channel = new PriorityChannel();
+ Message> highPriority = createPriorityMessage(MessagePriority.HIGH);
+ Message> lowPriority = createPriorityMessage(MessagePriority.LOW);
+ Message> nullPriority = new StringMessage("test-NULL");
+ channel.send(lowPriority);
+ channel.send(highPriority);
+ channel.send(nullPriority);
+ assertEquals("test-HIGH", channel.receive(0).getPayload());
+ assertEquals("test-NULL", channel.receive(0).getPayload());
+ assertEquals("test-LOW", channel.receive(0).getPayload());
+ }
+
private static Message createPriorityMessage(MessagePriority priority) {
return MessageBuilder.withPayload("test-" + priority).setPriority(priority).build();