371 lines
13 KiB
HTML
371 lines
13 KiB
HTML
<!DOCTYPE html>
|
|
<html lang="en">
|
|
<head>
|
|
<meta charset="UTF-8">
|
|
<!--[if IE]><meta http-equiv="X-UA-Compatible" content="IE=edge"><![endif]-->
|
|
<meta name="viewport" content="width=device-width, initial-scale=1.0">
|
|
<meta name="generator" content="Asciidoctor 1.5.8">
|
|
<title>Dead-Letter Queue Processing</title>
|
|
<link rel="stylesheet" href="css/spring.css">
|
|
<link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/4.7.0/css/font-awesome.min.css">
|
|
|
|
<style>
|
|
/* Applied by the block-switch script to content panes that are not selected. */
.hidden {
    display: none;
}

/* Tab bar that the script prepends to each primary block; it has no bottom
   border. */
.switch {
    border-width: 1px 1px 0 1px;
    border-style: solid;
    border-color: #7a2518;
    display: inline-block;
}

/* One clickable tab inside the bar. */
.switch--item {
    padding: 10px;
    background-color: #ffffff;
    color: #7a2518;
    display: inline-block;
    cursor: pointer;
}

/* Vertical separator between neighbouring tabs. */
.switch--item:not(:first-child) {
    border-width: 0 0 0 1px;
    border-style: solid;
    border-color: #7a2518;
}

/* Selected tab: inverted colours. The background uses the same #7a2518 as the
   borders and the unselected text (it was previously #7a2519). */
.switch--item.selected {
    background-color: #7a2518;
    color: #ffffff;
}
|
|
</style>
|
|
<script src="https://cdnjs.cloudflare.com/ajax/libs/zepto/1.2.0/zepto.min.js"></script>
|
|
<script type="text/javascript">
|
|
/**
 * Merges each ".primary" block and the ".secondary" blocks that follow it
 * into one tabbed block.
 *
 * For every ".primary" element a ".switch" bar is prepended, a switch item is
 * created from the block's ".title" and marked "selected", and the title
 * element is removed. For every ".secondary" element another switch item is
 * added to the bar of the nearest preceding ".primary" sibling; the
 * secondary's content is hidden, moved into that primary, and the secondary
 * element itself is removed.
 */
function addBlockSwitches() {
    $('.primary').each(function() {
        // Declared with var so the working variables do not leak as globals.
        var primary = $(this);
        createSwitchItem(primary, createBlockSwitch(primary)).item.addClass("selected");
        primary.children('.title').remove();
    });
    $('.secondary').each(function(idx, node) {
        var secondary = $(node);
        var primary = findPrimary(secondary);
        if (primary.length === 0) {
            // No preceding ".primary" sibling was found: leave this block as it
            // is instead of detaching its content with nowhere to put it.
            return;
        }
        var switchItem = createSwitchItem(secondary, primary.children('.switch'));
        switchItem.content.addClass('hidden');
        // Reuse the primary looked up above rather than searching a second time.
        primary.append(switchItem.content);
        secondary.remove();
    });
}
|
|
|
|
/**
 * Creates an empty ".switch" container and inserts it as the first child of
 * the given block.
 *
 * @param primary collection wrapping the block that receives the switch bar
 * @return the newly created ".switch" element, wrapped in a collection
 */
function createBlockSwitch(primary) {
    // Declared with var; the bare assignment used before created a global.
    var blockSwitch = $('<div class="switch"></div>');
    primary.prepend(blockSwitch);
    return blockSwitch;
}
|
|
|
|
/**
 * Walks backwards through the previous siblings of a block until one that
 * matches ".primary" is found.
 *
 * @param secondary collection wrapping the block whose primary is wanted
 * @return the nearest preceding ".primary" sibling, or an empty collection
 *         (length 0) when the siblings run out without a match
 */
function findPrimary(secondary) {
    var candidate = secondary.prev();
    // The length check ends the walk once there are no more siblings. Without
    // it, prev() on an empty collection keeps yielding an empty collection
    // that never matches ".primary", and the loop never terminates.
    while (candidate.length > 0 && !candidate.is('.primary')) {
        candidate = candidate.prev();
    }
    return candidate;
}
|
|
|
|
/**
 * Builds one clickable switch item for a block and appends it to a switch bar.
 *
 * The item's label is the text of the block's ".title" child. The block's
 * first ".content" child becomes the pane the item controls; a ".colist"
 * element immediately following the block, if there is one, is moved into
 * that pane.
 *
 * Clicking the item marks it "selected", clears "selected" from its sibling
 * items, hides the sibling ".content" panes and shows this item's pane.
 *
 * @param block       collection wrapping the primary or secondary block
 * @param blockSwitch collection wrapping the ".switch" bar to append to
 * @return an object with the created 'item' and the 'content' it controls
 */
function createSwitchItem(block, blockSwitch) {
    // Locals are declared with var so they do not leak as globals.
    var blockName = block.children('.title').text();
    var content = block.children('.content').first().append(block.next('.colist'));
    // The label is set with text() rather than concatenated into the markup
    // string, so a title containing "<" or "&" is shown literally instead of
    // being parsed as HTML.
    var item = $('<div class="switch--item"></div>').text(blockName);
    // The pane is passed as event data and read back through e.data.
    item.on('click', '', content, function(e) {
        $(this).addClass('selected');
        $(this).siblings().removeClass('selected');
        e.data.siblings('.content').addClass('hidden');
        e.data.removeClass('hidden');
    });
    blockSwitch.append(item);
    return {'item': item, 'content': content};
}
|
|
|
|
// Hand addBlockSwitches to Zepto's $(function) form, which runs the callback
// once the DOM is ready.
$(addBlockSwitches);
|
|
</script>
|
|
|
|
</head>
|
|
<body class="book toc2 toc-left">
|
|
<div id="header">
|
|
<div id="toc" class="toc2">
|
|
<div id="toctitle">Table of Contents</div>
|
|
<ul class="sectlevel1">
|
|
<li><a href="#rabbit-dlq-processing">Dead-Letter Queue Processing</a>
|
|
<ul class="sectlevel2">
|
|
<li><a href="#_non_partitioned_destinations">Non-Partitioned Destinations</a></li>
|
|
<li><a href="#_partitioned_destinations">Partitioned Destinations</a></li>
|
|
</ul>
|
|
</li>
|
|
</ul>
|
|
</div>
|
|
</div>
|
|
<div id="content">
|
|
<div class="sect1">
|
|
<h2 id="rabbit-dlq-processing"><a class="link" href="#rabbit-dlq-processing">Dead-Letter Queue Processing</a></h2>
|
|
<div class="sectionbody">
|
|
<div class="paragraph">
|
|
<p>Because you cannot anticipate how users would want to dispose of dead-lettered messages, the framework does not provide any standard mechanism to handle them.
|
|
If the reason for the dead-lettering is transient, you may wish to route the messages back to the original queue.
|
|
However, if the problem is a permanent issue, that could cause an infinite loop.
|
|
The following Spring Boot application shows an example of how to route those messages back to the original queue but moves them to a third “parking lot” queue after three attempts.
|
|
The second example uses the <a href="https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq/">RabbitMQ Delayed Message Exchange</a> to introduce a delay to the re-queued message.
|
|
In this example, the delay increases for each attempt.
|
|
These examples use a <code>@RabbitListener</code> to receive messages from the DLQ.
|
|
You could also use <code>RabbitTemplate.receive()</code> in a batch process.</p>
|
|
</div>
|
|
<div class="paragraph">
|
|
<p>The examples assume the original destination is <code>so8400in</code> and the consumer group is <code>so8400</code>.</p>
|
|
</div>
|
|
<div class="sect2">
|
|
<h3 id="_non_partitioned_destinations"><a class="link" href="#_non_partitioned_destinations">Non-Partitioned Destinations</a></h3>
|
|
<div class="paragraph">
|
|
<p>The first two examples are for when the destination is <strong>not</strong> partitioned:</p>
|
|
</div>
|
|
<div class="listingblock">
|
|
<div class="content">
|
|
<pre class="highlightjs highlight"><code class="language-java hljs" data-lang="java">@SpringBootApplication
|
|
public class ReRouteDlqApplication {
|
|
|
|
private static final String ORIGINAL_QUEUE = "so8400in.so8400";
|
|
|
|
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
|
|
|
|
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
|
|
|
|
private static final String X_RETRIES_HEADER = "x-retries";
|
|
|
|
public static void main(String[] args) throws Exception {
|
|
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
|
|
System.out.println("Hit enter to terminate");
|
|
System.in.read();
|
|
context.close();
|
|
}
|
|
|
|
@Autowired
|
|
private RabbitTemplate rabbitTemplate;
|
|
|
|
@RabbitListener(queues = DLQ)
|
|
public void rePublish(Message failedMessage) {
|
|
Integer retriesHeader = (Integer) failedMessage.getMessageProperties().getHeaders().get(X_RETRIES_HEADER);
|
|
if (retriesHeader == null) {
|
|
retriesHeader = Integer.valueOf(0);
|
|
}
|
|
if (retriesHeader &lt; 3) {
|
|
failedMessage.getMessageProperties().getHeaders().put(X_RETRIES_HEADER, retriesHeader + 1);
|
|
this.rabbitTemplate.send(ORIGINAL_QUEUE, failedMessage);
|
|
}
|
|
else {
|
|
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
|
|
}
|
|
}
|
|
|
|
@Bean
|
|
public Queue parkingLot() {
|
|
return new Queue(PARKING_LOT);
|
|
}
|
|
|
|
}</code></pre>
|
|
</div>
|
|
</div>
|
|
<div class="listingblock">
|
|
<div class="content">
|
|
<pre class="highlightjs highlight"><code class="language-java hljs" data-lang="java">@SpringBootApplication
|
|
public class ReRouteDlqApplication {
|
|
|
|
private static final String ORIGINAL_QUEUE = "so8400in.so8400";
|
|
|
|
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
|
|
|
|
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
|
|
|
|
private static final String X_RETRIES_HEADER = "x-retries";
|
|
|
|
private static final String DELAY_EXCHANGE = "dlqReRouter";
|
|
|
|
public static void main(String[] args) throws Exception {
|
|
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
|
|
System.out.println("Hit enter to terminate");
|
|
System.in.read();
|
|
context.close();
|
|
}
|
|
|
|
@Autowired
|
|
private RabbitTemplate rabbitTemplate;
|
|
|
|
@RabbitListener(queues = DLQ)
|
|
public void rePublish(Message failedMessage) {
|
|
Map&lt;String, Object&gt; headers = failedMessage.getMessageProperties().getHeaders();
|
|
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
|
|
if (retriesHeader == null) {
|
|
retriesHeader = Integer.valueOf(0);
|
|
}
|
|
if (retriesHeader &lt; 3) {
|
|
headers.put(X_RETRIES_HEADER, retriesHeader + 1);
|
|
headers.put("x-delay", 5000 * retriesHeader);
|
|
this.rabbitTemplate.send(DELAY_EXCHANGE, ORIGINAL_QUEUE, failedMessage);
|
|
}
|
|
else {
|
|
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
|
|
}
|
|
}
|
|
|
|
@Bean
|
|
public DirectExchange delayExchange() {
|
|
DirectExchange exchange = new DirectExchange(DELAY_EXCHANGE);
|
|
exchange.setDelayed(true);
|
|
return exchange;
|
|
}
|
|
|
|
@Bean
|
|
public Binding bindOriginalToDelay() {
|
|
return BindingBuilder.bind(new Queue(ORIGINAL_QUEUE)).to(delayExchange()).with(ORIGINAL_QUEUE);
|
|
}
|
|
|
|
@Bean
|
|
public Queue parkingLot() {
|
|
return new Queue(PARKING_LOT);
|
|
}
|
|
|
|
}</code></pre>
|
|
</div>
|
|
</div>
|
|
</div>
|
|
<div class="sect2">
|
|
<h3 id="_partitioned_destinations"><a class="link" href="#_partitioned_destinations">Partitioned Destinations</a></h3>
|
|
<div class="paragraph">
|
|
<p>With partitioned destinations, there is one DLQ for all partitions. We determine the original queue from the headers.</p>
|
|
</div>
|
|
<div class="sect3">
|
|
<h4 id="_republishtodlqfalse"><a class="link" href="#_republishtodlqfalse"><code>republishToDlq=false</code></a></h4>
|
|
<div class="paragraph">
|
|
<p>When <code>republishToDlq</code> is <code>false</code>, RabbitMQ publishes the message to the DLX/DLQ with an <code>x-death</code> header containing information about the original destination, as shown in the following example:</p>
|
|
</div>
|
|
<div class="listingblock">
|
|
<div class="content">
|
|
<pre class="highlightjs highlight"><code class="language-java hljs" data-lang="java">@SpringBootApplication
|
|
public class ReRouteDlqApplication {
|
|
|
|
private static final String ORIGINAL_QUEUE = "so8400in.so8400";
|
|
|
|
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
|
|
|
|
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
|
|
|
|
private static final String X_DEATH_HEADER = "x-death";
|
|
|
|
private static final String X_RETRIES_HEADER = "x-retries";
|
|
|
|
public static void main(String[] args) throws Exception {
|
|
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
|
|
System.out.println("Hit enter to terminate");
|
|
System.in.read();
|
|
context.close();
|
|
}
|
|
|
|
@Autowired
|
|
private RabbitTemplate rabbitTemplate;
|
|
|
|
@SuppressWarnings("unchecked")
|
|
@RabbitListener(queues = DLQ)
|
|
public void rePublish(Message failedMessage) {
|
|
Map&lt;String, Object&gt; headers = failedMessage.getMessageProperties().getHeaders();
|
|
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
|
|
if (retriesHeader == null) {
|
|
retriesHeader = Integer.valueOf(0);
|
|
}
|
|
if (retriesHeader &lt; 3) {
|
|
headers.put(X_RETRIES_HEADER, retriesHeader + 1);
|
|
List&lt;Map&lt;String, ?&gt;&gt; xDeath = (List&lt;Map&lt;String, ?&gt;&gt;) headers.get(X_DEATH_HEADER);
|
|
String exchange = (String) xDeath.get(0).get("exchange");
|
|
List&lt;String&gt; routingKeys = (List&lt;String&gt;) xDeath.get(0).get("routing-keys");
|
|
this.rabbitTemplate.send(exchange, routingKeys.get(0), failedMessage);
|
|
}
|
|
else {
|
|
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
|
|
}
|
|
}
|
|
|
|
@Bean
|
|
public Queue parkingLot() {
|
|
return new Queue(PARKING_LOT);
|
|
}
|
|
|
|
}</code></pre>
|
|
</div>
|
|
</div>
|
|
</div>
|
|
<div class="sect3">
|
|
<h4 id="_republishtodlqtrue"><a class="link" href="#_republishtodlqtrue"><code>republishToDlq=true</code></a></h4>
|
|
<div class="paragraph">
|
|
<p>When <code>republishToDlq</code> is <code>true</code>, the republishing recoverer adds the original exchange and routing key to headers, as shown in the following example:</p>
|
|
</div>
|
|
<div class="listingblock">
|
|
<div class="content">
|
|
<pre class="highlightjs highlight"><code class="language-java hljs" data-lang="java">@SpringBootApplication
|
|
public class ReRouteDlqApplication {
|
|
|
|
private static final String ORIGINAL_QUEUE = "so8400in.so8400";
|
|
|
|
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
|
|
|
|
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
|
|
|
|
private static final String X_RETRIES_HEADER = "x-retries";
|
|
|
|
private static final String X_ORIGINAL_EXCHANGE_HEADER = RepublishMessageRecoverer.X_ORIGINAL_EXCHANGE;
|
|
|
|
private static final String X_ORIGINAL_ROUTING_KEY_HEADER = RepublishMessageRecoverer.X_ORIGINAL_ROUTING_KEY;
|
|
|
|
public static void main(String[] args) throws Exception {
|
|
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
|
|
System.out.println("Hit enter to terminate");
|
|
System.in.read();
|
|
context.close();
|
|
}
|
|
|
|
@Autowired
|
|
private RabbitTemplate rabbitTemplate;
|
|
|
|
@RabbitListener(queues = DLQ)
|
|
public void rePublish(Message failedMessage) {
|
|
Map&lt;String, Object&gt; headers = failedMessage.getMessageProperties().getHeaders();
|
|
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
|
|
if (retriesHeader == null) {
|
|
retriesHeader = Integer.valueOf(0);
|
|
}
|
|
if (retriesHeader &lt; 3) {
|
|
headers.put(X_RETRIES_HEADER, retriesHeader + 1);
|
|
String exchange = (String) headers.get(X_ORIGINAL_EXCHANGE_HEADER);
|
|
String originalRoutingKey = (String) headers.get(X_ORIGINAL_ROUTING_KEY_HEADER);
|
|
this.rabbitTemplate.send(exchange, originalRoutingKey, failedMessage);
|
|
}
|
|
else {
|
|
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
|
|
}
|
|
}
|
|
|
|
@Bean
|
|
public Queue parkingLot() {
|
|
return new Queue(PARKING_LOT);
|
|
}
|
|
|
|
}</code></pre>
|
|
</div>
|
|
</div>
|
|
</div>
|
|
</div>
|
|
</div>
|
|
</div>
|
|
</div>
|
|
<script type="text/javascript" src="js/tocbot/tocbot.min.js"></script>
|
|
<script type="text/javascript" src="js/toc.js"></script>
|
|
<link rel="stylesheet" href="js/highlight/styles/atom-one-dark-reasonable.min.css">
|
|
<script src="js/highlight/highlight.min.js"></script>
|
|
<script>hljs.initHighlighting()</script>
|
|
</body>
|
|
</html> |