Changed FlowHandler to use correlationId headers

This commit is contained in:
David Turanski
2011-12-07 12:16:07 -05:00
parent c8dcca547c
commit 6792bfb911
4 changed files with 31 additions and 30 deletions

View File

@@ -23,11 +23,6 @@ public interface FlowConstants {
*/
public static final String FLOW_OUTPUT_PORT_HEADER = "flow.output.port";
/**
* Message header used to correlate port input and output messages
*/
public static final String FLOW_CONVERSATION_ID_HEADER = "flow.conversation.id";
/**
* FLOW_OUTPUT_PORT_HEADER value if FlowHandler catches an exception
*/

View File

@@ -16,7 +16,6 @@
package org.springframework.integration.flow.handler;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import org.apache.commons.logging.Log;
@@ -85,16 +84,11 @@ public class FlowMessageHandler extends AbstractReplyProducingMessageHandler {
@Override
protected Object handleRequestMessage(Message<?> requestMessage) {
Message<?> message = requestMessage;
UUID conversationId = (UUID) message.getHeaders().get(FlowConstants.FLOW_CONVERSATION_ID_HEADER);
if (conversationId == null) {
conversationId = requestMessage.getHeaders().getId();
Map<String, Object> flowConversationIdHeader = Collections.singletonMap(
FlowConstants.FLOW_CONVERSATION_ID_HEADER, (Object) conversationId);
message = MessageBuilder.fromMessage(requestMessage).copyHeaders(flowConversationIdHeader).build();
}
UUID conversationId = requestMessage.getHeaders().getId();
Message<?> message = MessageBuilder
.fromMessage(requestMessage)
.pushSequenceDetails(conversationId, 0, 0)
.build();
try {
ResponseMessageHandler responseMessageHandler = new ResponseMessageHandler(conversationId);
@@ -105,8 +99,9 @@ public class FlowMessageHandler extends AbstractReplyProducingMessageHandler {
}
catch (MessagingException me) {
log.error(me.getMessage(), me);
if (conversationId
.equals(me.getFailedMessage().getHeaders().get(FlowConstants.FLOW_CONVERSATION_ID_HEADER))) {
.equals(me.getFailedMessage().getHeaders().getCorrelationId())) {
if (errorChannel != null) {
errorChannel.send(new ErrorMessage(me, Collections.singletonMap(
FlowConstants.FLOW_OUTPUT_PORT_HEADER,
@@ -141,9 +136,18 @@ public class FlowMessageHandler extends AbstractReplyProducingMessageHandler {
* (org.springframework.integration.Message)
*/
public void handleMessage(Message<?> message) throws MessagingException {
if (conversationId.equals(message.getHeaders().get(FlowConstants.FLOW_CONVERSATION_ID_HEADER))) {
this.response = message;
if(log.isDebugEnabled()){
log.debug("handling flow response message with conversation Id " + message.getHeaders().getCorrelationId() +
". Target conversation Id = " + this.conversationId + " match = " +
conversationId.equals(message.getHeaders().getCorrelationId())
);
}
if (conversationId.equals(message.getHeaders().getCorrelationId())) {
this.response = MessageBuilder.fromMessage(message).popSequenceDetails().build();
if(log.isDebugEnabled()){
log.debug("set flow response message " + this.response
);
}
}
else {
@@ -154,7 +158,7 @@ public class FlowMessageHandler extends AbstractReplyProducingMessageHandler {
if (message instanceof ErrorMessage) {
MessagingException me = (MessagingException) message.getPayload();
if (conversationId.equals(me.getFailedMessage().getHeaders()
.get(FlowConstants.FLOW_CONVERSATION_ID_HEADER))) {
.getCorrelationId())) {
this.response = message;
}
}

View File

@@ -30,7 +30,7 @@ import org.springframework.integration.support.MessageBuilder;
/**
* A ChannelInterceptor to set the Flow output port header
* @see FlowUtils
*
*
* @author David Turanski
*
*/
@@ -39,7 +39,7 @@ public class FlowInterceptor extends ChannelInterceptorAdapter {
private final String portName;
/**
/**
* @param portName the value of the message header
*/
public FlowInterceptor(String portName) {
@@ -48,11 +48,13 @@ public class FlowInterceptor extends ChannelInterceptorAdapter {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
log.debug("flow interceptor " + this.hashCode() + " received a message from port " + portName + " on channel "
+ channel);
Map<String, Object> headersToCopy = Collections.singletonMap(FlowConstants.FLOW_OUTPUT_PORT_HEADER, (Object) portName);
return MessageBuilder.fromMessage(message).copyHeadersIfAbsent(headersToCopy).build();
if (log.isDebugEnabled()) {
log.debug(this + " received a message from port " + portName + " on channel " + channel);
}
Map<String, Object> headersToCopy = Collections.singletonMap(FlowConstants.FLOW_OUTPUT_PORT_HEADER,
(Object) portName);
return MessageBuilder.fromMessage(message).copyHeaders(headersToCopy).build();
}
}

View File

@@ -20,7 +20,7 @@
</logger>
<logger name="org.springframework.integration.flow">
<level value="debug" />
<level value="info" />
</logger>
<!-- Root Logger -->