merged r2368 changes for AbstractDispatcher's handler set
This commit is contained in:
@@ -16,7 +16,7 @@
|
||||
|
||||
package org.springframework.integration.dispatcher;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
@@ -25,17 +25,15 @@ import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import org.springframework.integration.message.MessageHandler;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
/**
|
||||
* Base class for {@link MessageDispatcher} implementations.
|
||||
* <p>
|
||||
* The subclasses implement the actual dispatching strategy, but this base
|
||||
* class manages the registration of {@link MessageHandler}s. Although the
|
||||
* implemented dispatching strategies may invoke handles in different ways
|
||||
* (e.g. round-robin vs. failover), this class does maintain the order of the
|
||||
* underlying collection. See the {@link OrderedAwareLinkedHashSet} for more
|
||||
* detail.
|
||||
* The subclasses implement the actual dispatching strategy, but this base class
|
||||
* manages the registration of {@link MessageHandler}s. Although the implemented
|
||||
* dispatching strategies may invoke handles in different ways (e.g. round-robin
|
||||
* vs. failover), this class does maintain the order of the underlying
|
||||
* collection. See the {@link OrderedAwareLinkedHashSet} for more detail.
|
||||
*
|
||||
* @author Mark Fisher
|
||||
* @author Iwein Fuld
|
||||
@@ -49,25 +47,34 @@ public abstract class AbstractDispatcher implements MessageDispatcher {
|
||||
|
||||
|
||||
/**
|
||||
* Returns a copied, unmodifiable List of this dispatcher's handlers.
|
||||
* This is provided for access by subclasses.
|
||||
* Returns a copied, unmodifiable List of this dispatcher's handlers. This
|
||||
* is provided for access by subclasses.
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
protected List<MessageHandler> getHandlers() {
|
||||
return Collections.unmodifiableList(new ArrayList(this.handlers));
|
||||
return Collections.<MessageHandler>unmodifiableList(Arrays.<MessageHandler>asList(
|
||||
this.handlers.toArray(new MessageHandler[this.handlers.size()])));
|
||||
}
|
||||
|
||||
/**
|
||||
* Add the handler to the internal Set.
|
||||
*
|
||||
* @return the result of {@link Set#add(Object)}
|
||||
*/
|
||||
public boolean addHandler(MessageHandler handler) {
|
||||
return this.handlers.add(handler);
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove the handler from the internal handler Set.
|
||||
*
|
||||
* @return the result of {@link Set#remove(Object)}
|
||||
*/
|
||||
public boolean removeHandler(MessageHandler handler) {
|
||||
return this.handlers.remove(handler);
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
String handlerList = StringUtils.collectionToCommaDelimitedString(this.handlers);
|
||||
return this.getClass().getSimpleName() + " with handlers: " + handlerList;
|
||||
return this.getClass().getSimpleName() + " with handlers: " + this.handlers.toString();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -20,14 +20,16 @@ import java.util.Collection;
|
||||
import java.util.Comparator;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
||||
|
||||
import org.springframework.core.OrderComparator;
|
||||
import org.springframework.core.Ordered;
|
||||
import org.springframework.core.annotation.Order;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
/**
|
||||
* Special Set that maintains the following semantics:
|
||||
@@ -39,8 +41,12 @@ import org.springframework.util.CollectionUtils;
|
||||
* order but themselves do not equal to one another the more recent addition will be placed to the
|
||||
* right of (appended next to) the existing element with the same order, thus preserving the order
|
||||
* of the insertion and maintaining {@link LinkedHashSet} semantics for the un-ordered elements.
|
||||
* <p>
|
||||
* The class is package-protected and only intended for use by the AbstractDispatcher. It
|
||||
* <emphasis>must</emphasis> enforce safe concurrent access for all usage by the dispatcher.
|
||||
*
|
||||
* @author Oleg Zhurakousky
|
||||
* @author Mark Fisher
|
||||
* @since 1.0.3
|
||||
*/
|
||||
@SuppressWarnings({"unchecked", "serial"})
|
||||
@@ -48,7 +54,11 @@ class OrderedAwareLinkedHashSet<E> extends LinkedHashSet<E> {
|
||||
|
||||
private final OrderComparator comparator = new OrderComparator();
|
||||
|
||||
private final Lock lock = new ReentrantLock();
|
||||
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
|
||||
|
||||
private final ReadLock readLock = rwl.readLock();
|
||||
|
||||
private final WriteLock writeLock = rwl.writeLock();
|
||||
|
||||
|
||||
/**
|
||||
@@ -58,17 +68,19 @@ class OrderedAwareLinkedHashSet<E> extends LinkedHashSet<E> {
|
||||
*/
|
||||
public boolean add(E o) {
|
||||
Assert.notNull(o,"Can not add NULL object");
|
||||
lock.lock();
|
||||
writeLock.lock();
|
||||
try {
|
||||
boolean present = false;
|
||||
if (o instanceof Ordered){
|
||||
present = this.addOrderedElement((Ordered) o);
|
||||
} else {
|
||||
}
|
||||
else {
|
||||
present = super.add(o);
|
||||
}
|
||||
return present;
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -77,14 +89,15 @@ class OrderedAwareLinkedHashSet<E> extends LinkedHashSet<E> {
|
||||
*/
|
||||
public boolean addAll(Collection<? extends E> c) {
|
||||
Assert.notNull(c,"Can not merge with NULL set");
|
||||
lock.lock();
|
||||
writeLock.lock();
|
||||
try {
|
||||
for (E object : c) {
|
||||
this.add(object);
|
||||
}
|
||||
return true;
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -92,11 +105,12 @@ class OrderedAwareLinkedHashSet<E> extends LinkedHashSet<E> {
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
public boolean remove(Object o) {
|
||||
lock.lock();
|
||||
writeLock.lock();
|
||||
try {
|
||||
return super.remove(o);
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -107,39 +121,65 @@ class OrderedAwareLinkedHashSet<E> extends LinkedHashSet<E> {
|
||||
if (CollectionUtils.isEmpty(c)){
|
||||
return false;
|
||||
}
|
||||
lock.lock();
|
||||
writeLock.lock();
|
||||
try {
|
||||
return super.removeAll(c);
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T[] toArray(T[] a) {
|
||||
readLock.lock();
|
||||
try {
|
||||
return super.toArray(a);
|
||||
}
|
||||
finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
readLock.lock();
|
||||
try {
|
||||
return StringUtils.collectionToCommaDelimitedString(this);
|
||||
}
|
||||
finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private boolean addOrderedElement(Ordered adding) {
|
||||
boolean added = false;
|
||||
E[] tempUnorderedElements = (E[]) this.toArray();
|
||||
if (super.contains(adding)){
|
||||
if (super.contains(adding)) {
|
||||
return false;
|
||||
}
|
||||
super.clear();
|
||||
|
||||
if (tempUnorderedElements.length == 0){
|
||||
if (tempUnorderedElements.length == 0) {
|
||||
added = super.add((E) adding);
|
||||
} else {
|
||||
}
|
||||
else {
|
||||
Set tempSet = new LinkedHashSet();
|
||||
for (E current : tempUnorderedElements) {
|
||||
if (current instanceof Ordered) {
|
||||
if (this.comparator.compare(adding, current) < 0) {
|
||||
added = super.add((E) adding);
|
||||
super.add(current);
|
||||
} else {
|
||||
}
|
||||
else {
|
||||
super.add(current);
|
||||
}
|
||||
} else {
|
||||
}
|
||||
else {
|
||||
tempSet.add(current);
|
||||
}
|
||||
}
|
||||
if (!added){
|
||||
if (!added) {
|
||||
added = super.add((E) adding);
|
||||
}
|
||||
for (Object object : tempSet) {
|
||||
|
||||
Reference in New Issue
Block a user