INTSAMPLES-132 AMQP Publisher Confirms/Returns

JIRA: https://jira.spring.io/browse/INTSAMPLES-132

Polishing - PR Comments
This commit is contained in:
Gary Russell
2014-08-07 11:02:27 +03:00
committed by Artem Bilan
parent 90d5ee9282
commit 28116776c2
8 changed files with 229 additions and 17 deletions

View File

@@ -11,13 +11,51 @@ Once the application is started, you enter some text on the command prompt and a
# How to Run the Sample
If you imported the example into your IDE, you can just run class **org.springframework.integration.samples.amqp.Main**. For example in [SpringSource Tool Suite](http://www.springsource.com/developer/sts) (STS) do:
If you imported the example into your IDE, you can just run class **org.springframework.integration.samples.amqp.SampleSimple**. For example in [SpringSource Tool Suite](http://www.springsource.com/developer/sts) (STS) do:
* Right-click on Main class --> Run As --> Java Application
* Right-click on SampleSimple class --> Run As --> Java Application
Alternatively, you can start the sample from the command line ([Gradle](http://www.gradle.org) required):
Alternatively, you can start the sample from the command line:
* gradlew :amqp:run
* ./gradlew :amqp:runSimple
Enter some data (e.g. 'foo') on the console; you will see a [tapInbound] log and 'Received: foo'.
Ctrl-C to terminate.
The __SamplePubConfirmsReturns__ class is similar, but demonstrates publisher confirms and returns.
* Right-click on SamplePubConfirmsReturns class --> Run As --> Java Application
Or:
* ./gradlew :amqp:runPubConfirmsReturns
When you enter a message in the console you will see the message received, together with a send confirmation:
````
foo
Received: foo
foo sent ok
````
When you enter 'fail', the message is sent with a bad routing key; you will see the message is sent ok, but returned because it is not routable:
````
fail
fail returned:NO_ROUTE
fail sent ok
````
When you enter 'nack', the message is sent to a non-existent exchange; the broker reacts to this by closing the channel with an error and Spring AMQP generates a Nack:
````
nack
nack send failed (nack)
11:54:00.818 ERROR [pool-1-thread-1][org.springframework.amqp.rabbit.connection.CachingConnectionFactory] Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'badExchange' in vhost '/', class-id=60, method-id=40)
````
Ctrl-C to terminate.
# Used Spring Integration components

View File

@@ -0,0 +1,75 @@
/*
* Copyright 2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.samples.amqp;
import org.apache.log4j.Logger;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
* Starts the Spring Context and will initialize the Spring Integration message flow.
*
* @author Gary Russell.
* @since 4.0
*
*/
public final class SamplePubConfirmsReturns {
private static final Logger LOGGER = Logger.getLogger(SamplePubConfirmsReturns.class);
private SamplePubConfirmsReturns() { }
/**
* Load the Spring Integration Application Context
*
* @param args - command line arguments
*/
public static void main(final String... args) {
LOGGER.info("\n========================================================="
+ "\n "
+ "\n Welcome to Spring Integration! "
+ "\n "
+ "\n For more information please visit: "
+ "\n http://www.springsource.org/spring-integration "
+ "\n "
+ "\n=========================================================" );
@SuppressWarnings("resource")
final AbstractApplicationContext context =
new ClassPathXmlApplicationContext("classpath:META-INF/spring/integration/spring-integration-confirms-context.xml");
context.registerShutdownHook();
LOGGER.info("\n========================================================="
+ "\n "
+ "\n This is the AMQP Sample with confirms/returns - "
+ "\n "
+ "\n Please enter some text and press return. The entered "
+ "\n Message will be sent to the configured RabbitMQ Queue,"
+ "\n then again immediately retrieved from the Message "
+ "\n Broker and ultimately printed to the command line. "
+ "\n Send 'fail' to demonstrate a return because the "
+ "\n message couldn't be routed to a queue. "
+ "\n Send 'nack' to demonstrate a NACK because the "
+ "\n exchange doesn't exist, causing the channel to be "
+ "\n closed in error by the broker. "
+ "\n "
+ "\n=========================================================" );
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2012 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,14 +24,14 @@ import org.springframework.context.support.ClassPathXmlApplicationContext;
* Starts the Spring Context and will initialize the Spring Integration message flow.
*
* @author Gunnar Hillert
* @version 1.0
* @since 1.0
*
*/
public final class Main {
public final class SampleSimple {
private static final Logger LOGGER = Logger.getLogger(Main.class);
private static final Logger LOGGER = Logger.getLogger(SampleSimple.class);
private Main() { }
private SampleSimple() { }
/**
* Load the Spring Integration Application Context
@@ -49,8 +49,9 @@ public final class Main {
+ "\n "
+ "\n=========================================================" );
@SuppressWarnings("resource")
final AbstractApplicationContext context =
new ClassPathXmlApplicationContext("classpath:META-INF/spring/integration/*-context.xml");
new ClassPathXmlApplicationContext("classpath:META-INF/spring/integration/spring-integration-context.xml");
context.registerShutdownHook();

View File

@@ -0,0 +1,77 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
xsi:schemaLocation="http://www.springframework.org/schema/integration/amqp http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd
http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<!-- From STDIN To RabbitMQ -->
<int-stream:stdin-channel-adapter id="consoleIn"
channel="toRabbit">
<int:poller fixed-delay="1000" max-messages-per-poll="1" />
</int-stream:stdin-channel-adapter>
<int:channel id="toRabbit" />
<int-amqp:outbound-channel-adapter
channel="toRabbit" amqp-template="amqpTemplate"
exchange-name-expression="payload.toLowerCase() == 'nack' ? 'badExchange' : 'si.test.exchange'"
routing-key-expression="payload.toLowerCase() == 'fail' ? 'badKey' : 'si.test.binding'"
confirm-correlation-expression="payload"
confirm-ack-channel="good"
confirm-nack-channel="errors"
return-channel="returns" />
<!--Confirms are correlated with the entire payload; for rich objects, we might just use 'payload.invoiceId' -->
<int:transformer input-channel="good" output-channel="stdOut" expression="payload + ' sent ok'"/>
<int:transformer input-channel="errors" output-channel="stdErr" expression="payload + ' send failed (nack)'"/>
<int:transformer input-channel="returns" output-channel="stdErr" expression="payload + ' returned:' + headers['amqp_returnReplyText']"/>
<!-- From RabbitMQ To STDOUT -->
<int-amqp:inbound-channel-adapter channel="fromRabbit"
queue-names="si.test.queue" connection-factory="connectionFactory" />
<int:channel id="fromRabbit">
<int:interceptors>
<int:wire-tap channel="loggingChannel" />
</int:interceptors>
</int:channel>
<int:transformer input-channel="fromRabbit" output-channel="stdOut" expression="'Received: ' + payload" />
<int-stream:stdout-channel-adapter id="stdOut"
append-newline="true" />
<int-stream:stderr-channel-adapter id="stdErr"
append-newline="true" />
<int:logging-channel-adapter id="loggingChannel" log-full-message="true" logger-name="tapInbound"
level="INFO" />
<!-- Infrastructure -->
<rabbit:connection-factory id="connectionFactory" host="localhost" publisher-confirms="true" publisher-returns="true" />
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" mandatory="true" /> <!-- for nacks -->
<rabbit:admin connection-factory="connectionFactory" />
<rabbit:queue name="si.test.queue" />
<rabbit:direct-exchange name="si.test.exchange">
<rabbit:bindings>
<rabbit:binding queue="si.test.queue" key="si.test.binding" />
</rabbit:bindings>
</rabbit:direct-exchange>
</beans>

View File

@@ -34,10 +34,12 @@
</int:interceptors>
</int:channel>
<int-stream:stdout-channel-adapter id="consoleOut" channel="fromRabbit"
<int:transformer input-channel="fromRabbit" output-channel="consoleOut" expression="'Received: ' + payload" />
<int-stream:stdout-channel-adapter id="consoleOut"
append-newline="true" />
<int:logging-channel-adapter id="loggingChannel" log-full-message="true"
<int:logging-channel-adapter id="loggingChannel" log-full-message="true" logger-name="tapInbound"
level="INFO" />
<!-- Infrastructure -->

View File

@@ -19,6 +19,10 @@
<level value="info" />
</logger>
<logger name="tapInbound">
<level value="info" />
</logger>
<!-- Root Logger -->
<root>
<priority value="warn" />

View File

@@ -388,14 +388,29 @@ project('loanshark') {
project('amqp') {
description = 'AMQP Basic Sample'
apply plugin: 'application'
mainClassName = 'org.springframework.integration.samples.amqp.Main'
dependencies {
compile "org.springframework.integration:spring-integration-stream:$springIntegrationVersion"
compile "org.springframework.integration:spring-integration-amqp:$springIntegrationVersion"
}
task runSimple(type: JavaExec) {
main 'org.springframework.integration.samples.amqp.SampleSimple'
classpath = sourceSets.main.runtimeClasspath
standardInput = System.in
// useful for debugging the GradleWorkerMain
// jvmArgs "-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=8000"
}
task runPubConfirmsReturns(type: JavaExec) {
main 'org.springframework.integration.samples.amqp.SamplePubConfirmsReturns'
classpath = sourceSets.main.runtimeClasspath
standardInput = System.in
// useful for debugging the GradleWorkerMain
// jvmArgs "-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=8000"
}
}
project('control-bus') {

View File

@@ -1,2 +1,2 @@
version=2.2.0.BUILD-SNAPSHOT
version=3.0.0.BUILD-SNAPSHOT
springBootVersion=1.1.0.RELEASE