AMQP-160, AMQP-161: Custom exchanges and exchange bindings

This commit is contained in:
Dave Syer
2011-04-18 14:46:17 +01:00
parent a708c6ce36
commit 0cab85b927
5 changed files with 44 additions and 37 deletions

View File

@@ -16,9 +16,9 @@ package org.springframework.amqp.core;
import java.util.Map;
/**
* Simple container collecting information to describe a binding. Takes String destination and Exchange source instances
* as arguments to facilitate wiring using code based configuration. Can be used in conjunction with {@link AmqpAdmin},
* or created via a {@link BindingBuilder}.
* Simple container collecting information to describe a binding. Takes String destination and exchange names as
* arguments to facilitate wiring using code based configuration. Can be used in conjunction with {@link AmqpAdmin}, or
* created via a {@link BindingBuilder}.
*
* @author Mark Pollack
* @author Mark Fisher
@@ -28,7 +28,9 @@ import java.util.Map;
*/
public class Binding {
public static final String QUEUE_TYPE = "queue";
public static enum DestinationType {
QUEUE, EXCHANGE;
}
private final String destination;
@@ -38,13 +40,10 @@ public class Binding {
private final Map<String, Object> arguments;
private final String destinationType;
private final DestinationType destinationType;
// public Binding(String destination, String exchange, String routingKey, Map<String, Object> arguments) {
// this(destination, QUEUE_TYPE, exchange, routingKey, arguments);
// }
//
public Binding(String destination, String destinationType, String exchange, String routingKey, Map<String, Object> arguments) {
public Binding(String destination, DestinationType destinationType, String exchange, String routingKey,
Map<String, Object> arguments) {
this.destination = destination;
this.destinationType = destinationType;
this.exchange = exchange;
@@ -56,7 +55,7 @@ public class Binding {
return this.destination;
}
public String getDestinationType() {
public DestinationType getDestinationType() {
return this.destinationType;
}
@@ -72,4 +71,8 @@ public class Binding {
return this.arguments;
}
public boolean isDestinationQueue() {
return DestinationType.QUEUE.equals(destinationType);
}
}

View File

@@ -17,6 +17,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.springframework.amqp.core.Binding.DestinationType;
import org.springframework.util.Assert;
/**
@@ -29,19 +30,19 @@ import org.springframework.util.Assert;
public final class BindingBuilder {
public static DestinationConfigurer bind(Queue queue) {
return new DestinationConfigurer(queue.getName(), "queue");
return new DestinationConfigurer(queue.getName(), DestinationType.QUEUE);
}
public static DestinationConfigurer bind(Exchange exchange) {
return new DestinationConfigurer(exchange.getName(), "exchange");
return new DestinationConfigurer(exchange.getName(), DestinationType.EXCHANGE);
}
public static class DestinationConfigurer {
protected final String name;
protected final String type;
protected final DestinationType type;
private DestinationConfigurer(String name, String type) {
private DestinationConfigurer(String name, DestinationType type) {
this.name = name;
this.type = type;
}

View File

@@ -60,26 +60,17 @@ public class BindingBuilderTests {
return "x-custom";
}
}
;
Binding binding = BindingBuilder.bind(new Queue("q")).to(new CustomExchange("f")).with("r")
.and(Collections.<String, Object> singletonMap("k", new Object()));
Binding binding = BindingBuilder.//
bind(new Queue("q")).//
to(new CustomExchange("f")).//
with("r").//
and(Collections.<String, Object> singletonMap("k", new Object()));
assertNotNull(binding);
}
@Test
public void exchangeBinding() {
class CustomExchange extends AbstractExchange {
public CustomExchange(String name) {
super(name);
}
@Override
public String getType() {
return "x-custom";
}
}
;
Binding binding = BindingBuilder.bind(new CustomExchange("q")).to(new FanoutExchange("f"));
Binding binding = BindingBuilder.bind(new DirectExchange("q")).to(new FanoutExchange("f"));
assertNotNull(binding);
}

View File

@@ -177,8 +177,13 @@ public class RabbitAdmin implements AmqpAdmin, ApplicationContextAware, Initiali
public void removeBinding(final Binding binding) {
rabbitTemplate.execute(new ChannelCallback<Object>() {
public Object doInRabbit(Channel channel) throws Exception {
channel.queueUnbind(binding.getDestination(), binding.getExchange(), binding.getRoutingKey(),
binding.getArguments());
if (binding.isDestinationQueue()) {
channel.queueUnbind(binding.getDestination(), binding.getExchange(), binding.getRoutingKey(),
binding.getArguments());
} else {
channel.exchangeUnbind(binding.getDestination(), binding.getExchange(), binding.getRoutingKey(),
binding.getArguments());
}
return null;
}
});
@@ -331,11 +336,17 @@ public class RabbitAdmin implements AmqpAdmin, ApplicationContextAware, Initiali
private void declareBindings(final Channel channel, final Binding... bindings) throws IOException {
for (Binding binding : bindings) {
if (logger.isDebugEnabled()) {
logger.debug("Binding queue [" + binding.getDestination() + "] to exchange [" + binding.getExchange()
+ "] with routing key [" + binding.getRoutingKey() + "]");
logger.debug("Binding destination [" + binding.getDestination() + " (" + binding.getDestinationType()
+ ")] to exchange [" + binding.getExchange() + "] with routing key [" + binding.getRoutingKey()
+ "]");
}
if (binding.isDestinationQueue()) {
channel.queueBind(binding.getDestination(), binding.getExchange(), binding.getRoutingKey(),
binding.getArguments());
} else {
channel.exchangeBind(binding.getDestination(), binding.getExchange(), binding.getRoutingKey(),
binding.getArguments());
}
channel.queueBind(binding.getDestination(), binding.getExchange(), binding.getRoutingKey(),
binding.getArguments());
}
}

View File

@@ -18,6 +18,7 @@ package org.springframework.amqp.rabbit.log4j;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.SingleConnectionFactory;
@@ -59,7 +60,7 @@ public class AmqpAppenderConfiguration {
@Bean
public Binding testBinding() {
return new Binding(testQueue(), testExchange(), ROUTING_KEY);
return BindingBuilder.bind(testQueue()).to(testExchange()).with(ROUTING_KEY);
}
@Bean