From 0ef08a42babd74bfc8874ca7d0fb5770f5fd3e4a Mon Sep 17 00:00:00 2001 From: Mark Fisher Date: Wed, 12 Nov 2008 16:56:09 +0000 Subject: [PATCH] PriorityChannel now uses a null semaphore to indicate an unbounded queue rather than relying on Integer.MAX_VALUE. --- .../integration/channel/PriorityChannel.java | 64 ++++++++++++------- .../channel/PriorityChannelTests.java | 14 ++++ 2 files changed, 54 insertions(+), 24 deletions(-) diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/channel/PriorityChannel.java b/org.springframework.integration/src/main/java/org/springframework/integration/channel/PriorityChannel.java index 228ca2454f..9eaea8ae29 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/channel/PriorityChannel.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/channel/PriorityChannel.java @@ -32,55 +32,52 @@ import org.springframework.integration.core.MessagePriority; */ public class PriorityChannel extends QueueChannel { - private static final int DEFAULT_MAX_CAPACITY = Integer.MAX_VALUE; - - private final Semaphore semaphore; /** - * Create a channel with the specified queue capacity. - * Priority will be determined by the provided {@link Comparator}. + * Create a channel with the specified queue capacity. If the capacity + * is a non-positive value, the queue will be unbounded. Message priority + * will be determined by the provided {@link Comparator}. If the comparator + * is null, the priority will be based upon the value of + * {@link MessageHeader#getPriority()}. */ public PriorityChannel(int capacity, Comparator> comparator) { - super(new PriorityBlockingQueue>(11, comparator)); - this.semaphore = new Semaphore(capacity, true); + super(new PriorityBlockingQueue>(11, + (comparator != null) ? comparator : new MessagePriorityComparator())); + this.semaphore = (capacity > 0) ? new Semaphore(capacity, true) : null; } /** - * Create a channel with the specified queue capacity. - * Priority will be based upon the value of {@link MessageHeader#getPriority()}. + * Create a channel with the specified queue capacity. Message priority + * will be based upon the value of {@link MessageHeader#getPriority()}. */ public PriorityChannel(int capacity) { - this(capacity, new MessagePriorityComparator()); + this(capacity, null); } /** - * Create a channel with the default queue capacity of {@link Integer#MAX_VALUE}. - * Priority will be determined by the provided {@link Comparator}. + * Create a channel with an unbounded queue. Message priority will be + * determined by the provided {@link Comparator}. If the comparator + * is null, the priority will be based upon the value of + * {@link MessageHeader#getPriority()}. */ public PriorityChannel(Comparator> comparator) { - this(DEFAULT_MAX_CAPACITY, comparator); + this(0, comparator); } /** - * Create a channel with the default queue capacity of {@link Integer#MAX_VALUE}. - * Priority will be based on the value of {@link MessageHeader#getPriority()}. + * Create a channel with an unbounded queue. Message priority will be + * based on the value of {@link MessageHeader#getPriority()}. */ public PriorityChannel() { - this(DEFAULT_MAX_CAPACITY, new MessagePriorityComparator()); + this(0, null); } @Override protected boolean doSend(Message message, long timeout) { - try { - if (!this.semaphore.tryAcquire(timeout, TimeUnit.MILLISECONDS)) { - return false; - } - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); + if (!acquirePermitIfNecessary(timeout)) { return false; } return super.doSend(message, 0); @@ -90,12 +87,31 @@ public class PriorityChannel extends QueueChannel { protected Message doReceive(long timeout) { Message message = super.doReceive(timeout); if (message != null) { - this.semaphore.release(); + this.releasePermitIfNecessary(); return message; } return null; } + private boolean acquirePermitIfNecessary(long timeoutInMilliseconds) { + if (this.semaphore != null) { + try { + return this.semaphore.tryAcquire(timeoutInMilliseconds, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return false; + } + } + return true; + } + + private void releasePermitIfNecessary() { + if (this.semaphore != null) { + this.semaphore.release(); + } + } + private static class MessagePriorityComparator implements Comparator> { diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/channel/PriorityChannelTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/channel/PriorityChannelTests.java index e52a0cbcb6..ce275cf147 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/channel/PriorityChannelTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/channel/PriorityChannelTests.java @@ -99,6 +99,20 @@ public class PriorityChannelTests { assertEquals("test-LOW", channel.receive(0).getPayload()); } + @Test + public void testUnboundedCapacity() { + PriorityChannel channel = new PriorityChannel(); + Message highPriority = createPriorityMessage(MessagePriority.HIGH); + Message lowPriority = createPriorityMessage(MessagePriority.LOW); + Message nullPriority = new StringMessage("test-NULL"); + channel.send(lowPriority); + channel.send(highPriority); + channel.send(nullPriority); + assertEquals("test-HIGH", channel.receive(0).getPayload()); + assertEquals("test-NULL", channel.receive(0).getPayload()); + assertEquals("test-LOW", channel.receive(0).getPayload()); + } + private static Message createPriorityMessage(MessagePriority priority) { return MessageBuilder.withPayload("test-" + priority).setPriority(priority).build();