Create spring-messaging module

Consolidates new, messaging-related classes from spring-context and
spring-websocket into one module.
This commit is contained in:
Rossen Stoyanchev
2013-07-12 09:02:51 -04:00
parent 2803845151
commit d3cecfc6cc
81 changed files with 404 additions and 315 deletions

View File

@@ -1,39 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging;
/**
* A generic message representation with headers and body.
*
* @author Mark Fisher
* @author Arjen Poutsma
* @since 4.0
* @see org.springframework.messaging.support.MessageBuilder
*/
public interface Message<T> {
/**
* Returns message headers for the message (never {@code null}).
*/
MessageHeaders getHeaders();
/**
* Returns the message payload.
*/
T getPayload();
}

View File

@@ -1,56 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging;
/**
* Base channel interface defining common behavior for sending messages.
*
* @author Mark Fisher
* @since 4.0
*/
public interface MessageChannel {
/**
* Constant for sending a message without a prescribed timeout.
*/
public static final long INDEFINITE_TIMEOUT = -1;
/**
* Send a {@link Message} to this channel. May throw a RuntimeException for
* non-recoverable errors. Otherwise, if the Message cannot be sent for a non-fatal
* reason this method will return 'false', and if the Message is sent successfully, it
* will return 'true'.
*
* <p>Depending on the implementation, this method may block indefinitely. To provide a
* maximum wait time, use {@link #send(Message, long)}.
* @param message the {@link Message} to send
* @return whether or not the Message has been sent successfully
*/
boolean send(Message<?> message);
/**
* Send a message, blocking until either the message is accepted or the specified
* timeout period elapses.
* @param message the {@link Message} to send
* @param timeout the timeout in milliseconds or #INDEFINITE_TIMEOUT
* @return {@code true} if the message is sent successfully, {@code false} if the
* specified timeout period elapses or the send is interrupted
*/
boolean send(Message<?> message, long timeout);
}

View File

@@ -1,44 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging;
/**
* Exception that indicates an error occurred during message delivery.
*
* @author Mark Fisher
* @since 4.0
*/
@SuppressWarnings("serial")
public class MessageDeliveryException extends MessagingException {
public MessageDeliveryException(String description) {
super(description);
}
public MessageDeliveryException(Message<?> undeliveredMessage) {
super(undeliveredMessage);
}
public MessageDeliveryException(Message<?> undeliveredMessage, String description) {
super(undeliveredMessage, description);
}
public MessageDeliveryException(Message<?> undeliveredMessage, String description, Throwable cause) {
super(undeliveredMessage, description, cause);
}
}

View File

@@ -1,52 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging;
/**
* Base interface for any component that handles Messages.
*
* @author Mark Fisher
* @author Iwein Fuld
* @since 4.0
*/
public interface MessageHandler {
/**
* Handles the message if possible. If the handler cannot deal with the
* message this will result in a {@code MessageRejectedException} e.g.
* in case of a Selective Consumer. When a consumer tries to handle a
* message, but fails to do so, a {@code MessageHandlingException} is
* thrown. In the last case it is recommended to treat the message as tainted
* and go into an error scenario.
* <p>
* When the handling results in a failure of another message being sent
* (e.g. a "reply" message), that failure will trigger a
* {@code MessageDeliveryException}.
*
* @param message the message to be handled
* reply related to the handling of the message
*/
void handleMessage(Message<?> message) throws MessagingException;
/*
* TODO: exceptions
* @throws org.springframework.integration.MessageRejectedException if the handler doesn't accept the message
* @throws org.springframework.integration.MessageHandlingException when something fails during the handling
* @throws org.springframework.integration.MessageDeliveryException when this handler failed to deliver the
*/
}

View File

@@ -1,246 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* The headers for a {@link Message}
*
* <p><b>IMPORTANT</b>: This class is immutable. Any mutating operation
* (e.g., put(..), putAll(..) etc.) will throw {@link UnsupportedOperationException}.
*
* <p>To create MessageHeaders instance use fluent
* {@link org.springframework.messaging.support.MessageBuilder MessageBuilder} API
* <pre class="code">
* MessageBuilder.withPayload("foo").setHeader("key1", "value1").setHeader("key2", "value2");
* </pre>
* or create an instance of GenericMessage passing payload as {@link Object} and headers as a regular {@link Map}
* <pre class="code">
* Map headers = new HashMap();
* headers.put("key1", "value1");
* headers.put("key2", "value2");
* new GenericMessage("foo", headers);
* </pre>
*
* @author Arjen Poutsma
* @author Mark Fisher
* @author Gary Russell
* @since 4.0
* @see org.springframework.messaging.support.MessageBuilder
*/
public final class MessageHeaders implements Map<String, Object>, Serializable {
private static final long serialVersionUID = -4615750558355702881L;
private static final Log logger = LogFactory.getLog(MessageHeaders.class);
private static volatile IdGenerator idGenerator = null;
/**
* The key for the Message ID. This is an automatically generated UUID and
* should never be explicitly set in the header map <b>except</b> in the
* case of Message deserialization where the serialized Message's generated
* UUID is being restored.
*/
public static final String ID = "id";
public static final String TIMESTAMP = "timestamp";
public static final String REPLY_CHANNEL = "replyChannel";
public static final String ERROR_CHANNEL = "errorChannel";
public static final String CONTENT_TYPE = "contentType";
public static final List<String> HEADER_NAMES = Arrays.asList(ID, TIMESTAMP);
private final Map<String, Object> headers;
public MessageHeaders(Map<String, Object> headers) {
this.headers = (headers != null) ? new HashMap<String, Object>(headers) : new HashMap<String, Object>();
if (MessageHeaders.idGenerator == null){
this.headers.put(ID, UUID.randomUUID());
}
else {
this.headers.put(ID, MessageHeaders.idGenerator.generateId());
}
this.headers.put(TIMESTAMP, new Long(System.currentTimeMillis()));
}
public UUID getId() {
return this.get(ID, UUID.class);
}
public Long getTimestamp() {
return this.get(TIMESTAMP, Long.class);
}
public Object getReplyChannel() {
return this.get(REPLY_CHANNEL);
}
public Object getErrorChannel() {
return this.get(ERROR_CHANNEL);
}
@SuppressWarnings("unchecked")
public <T> T get(Object key, Class<T> type) {
Object value = this.headers.get(key);
if (value == null) {
return null;
}
if (!type.isAssignableFrom(value.getClass())) {
throw new IllegalArgumentException("Incorrect type specified for header '" + key + "'. Expected [" + type
+ "] but actual type is [" + value.getClass() + "]");
}
return (T) value;
}
@Override
public int hashCode() {
return this.headers.hashCode();
}
@Override
public boolean equals(Object object) {
if (this == object) {
return true;
}
if (object != null && object instanceof MessageHeaders) {
MessageHeaders other = (MessageHeaders) object;
return this.headers.equals(other.headers);
}
return false;
}
@Override
public String toString() {
return this.headers.toString();
}
/*
* Map implementation
*/
public boolean containsKey(Object key) {
return this.headers.containsKey(key);
}
public boolean containsValue(Object value) {
return this.headers.containsValue(value);
}
public Set<Map.Entry<String, Object>> entrySet() {
return Collections.unmodifiableSet(this.headers.entrySet());
}
public Object get(Object key) {
return this.headers.get(key);
}
public boolean isEmpty() {
return this.headers.isEmpty();
}
public Set<String> keySet() {
return Collections.unmodifiableSet(this.headers.keySet());
}
public int size() {
return this.headers.size();
}
public Collection<Object> values() {
return Collections.unmodifiableCollection(this.headers.values());
}
// Unsupported operations
/**
* Since MessageHeaders are immutable the call to this method will result in {@link UnsupportedOperationException}
*/
public Object put(String key, Object value) {
throw new UnsupportedOperationException("MessageHeaders is immutable.");
}
/**
* Since MessageHeaders are immutable the call to this method will result in {@link UnsupportedOperationException}
*/
public void putAll(Map<? extends String, ? extends Object> t) {
throw new UnsupportedOperationException("MessageHeaders is immutable.");
}
/**
* Since MessageHeaders are immutable the call to this method will result in {@link UnsupportedOperationException}
*/
public Object remove(Object key) {
throw new UnsupportedOperationException("MessageHeaders is immutable.");
}
/**
* Since MessageHeaders are immutable the call to this method will result in {@link UnsupportedOperationException}
*/
public void clear() {
throw new UnsupportedOperationException("MessageHeaders is immutable.");
}
// Serialization methods
private void writeObject(ObjectOutputStream out) throws IOException {
List<String> keysToRemove = new ArrayList<String>();
for (Map.Entry<String, Object> entry : this.headers.entrySet()) {
if (!(entry.getValue() instanceof Serializable)) {
keysToRemove.add(entry.getKey());
}
}
for (String key : keysToRemove) {
if (logger.isInfoEnabled()) {
logger.info("removing non-serializable header: " + key);
}
this.headers.remove(key);
}
out.defaultWriteObject();
}
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
in.defaultReadObject();
}
public static interface IdGenerator {
UUID generateId();
}
}

View File

@@ -1,71 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging;
/**
* The base exception for any failures related to messaging.
*
* @author Mark Fisher
* @author Gary Russell
* @since 4.0
*/
@SuppressWarnings("serial")
public class MessagingException extends RuntimeException {
private volatile Message<?> failedMessage;
public MessagingException(Message<?> message) {
super();
this.failedMessage = message;
}
public MessagingException(String description) {
super(description);
this.failedMessage = null;
}
public MessagingException(String description, Throwable cause) {
super(description, cause);
this.failedMessage = null;
}
public MessagingException(Message<?> message, String description) {
super(description);
this.failedMessage = message;
}
public MessagingException(Message<?> message, Throwable cause) {
super(cause);
this.failedMessage = message;
}
public MessagingException(Message<?> message, String description, Throwable cause) {
super(description, cause);
this.failedMessage = message;
}
public Message<?> getFailedMessage() {
return this.failedMessage;
}
public void setFailedMessage(Message<?> message) {
this.failedMessage = message;
}
}

View File

@@ -1,44 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging;
/**
* Interface for Message Channels from which Messages may be actively received through
* polling.
*
* @author Mark Fisher
* @since 4.0
*/
public interface PollableChannel extends MessageChannel {
/**
* Receive a message from this channel, blocking indefinitely if necessary.
* @return the next available {@link Message} or {@code null} if interrupted
*/
Message<?> receive();
/**
* Receive a message from this channel, blocking until either a message is available
* or the specified timeout period elapses.
* @param timeout the timeout in milliseconds or
* {@link MessageChannel#INDEFINITE_TIMEOUT}.
* @return the next available {@link Message} or {@code null} if the specified timeout
* period elapses or the message reception is interrupted
*/
Message<?> receive(long timeout);
}

View File

@@ -1,44 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging;
/**
* Interface for any MessageChannel implementation that accepts subscribers.
* The subscribers must implement the {@link MessageHandler} interface and
* will be invoked when a Message is available.
*
* @author Mark Fisher
* @since 4.0
*/
public interface SubscribableChannel extends MessageChannel {
/**
* Register a {@link MessageHandler} as a subscriber to this channel.
* @return {@code true} if the channel was not already subscribed to the specified
* handler
*/
boolean subscribe(MessageHandler handler);
/**
* Remove a {@link MessageHandler} from the subscribers of this channel.
* @return {@code true} if the channel was previously subscribed to the specified
* handler
*/
boolean unsubscribe(MessageHandler handler);
}

View File

@@ -1,39 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MessageMapping {
/**
* Destination values for the message.
*/
String[] value() default {};
}

View File

@@ -1,53 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.channel;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.core.DestinationResolutionException;
import org.springframework.messaging.core.DestinationResolver;
import org.springframework.util.Assert;
/**
* @author Mark Fisher
* @since 4.0
*/
public class BeanFactoryMessageChannelDestinationResolver implements DestinationResolver<MessageChannel> {
private final BeanFactory beanFactory;
public BeanFactoryMessageChannelDestinationResolver(BeanFactory beanFactory) {
Assert.notNull(beanFactory, "beanFactory must not be null");
this.beanFactory = beanFactory;
}
@Override
public MessageChannel resolveDestination(String name) {
Assert.state(this.beanFactory != null, "BeanFactory is required");
try {
return this.beanFactory.getBean(name, MessageChannel.class);
}
catch (BeansException e) {
throw new DestinationResolutionException(
"failed to look up MessageChannel bean with name '" + name + "'", e);
}
}
}

View File

@@ -1,98 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.channel;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executor;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.util.Assert;
/**
* A {@link SubscribableChannel} that sends messages to each of its subscribers.
*
* @author Phillip Webb
* @since 4.0
*/
public class PublishSubscribeChannel implements SubscribableChannel {
private Executor executor;
private Set<MessageHandler> handlers = new CopyOnWriteArraySet<MessageHandler>();
/**
* Create a new {@link PublishSubscribeChannel} instance where messages will be sent
* in the callers thread.
*/
public PublishSubscribeChannel() {
this(null);
}
/**
* Create a new {@link PublishSubscribeChannel} instance where messages will be sent
* via the specified executor.
* @param executor the executor used to send the message or {@code null} to execute in
* the callers thread.
*/
public PublishSubscribeChannel(Executor executor) {
this.executor = executor;
}
@Override
public boolean send(Message<?> message) {
return send(message, INDEFINITE_TIMEOUT);
}
@Override
public boolean send(Message<?> message, long timeout) {
Assert.notNull(message, "Message must not be null");
Assert.notNull(message.getPayload(), "Message payload must not be null");
for (final MessageHandler handler : this.handlers) {
dispatchToHandler(message, handler);
}
return true;
}
private void dispatchToHandler(final Message<?> message, final MessageHandler handler) {
if (this.executor == null) {
handler.handleMessage(message);
}
else {
this.executor.execute(new Runnable() {
@Override
public void run() {
handler.handleMessage(message);
}
});
}
}
@Override
public boolean subscribe(MessageHandler handler) {
return this.handlers.add(handler);
}
@Override
public boolean unsubscribe(MessageHandler handler) {
return this.handlers.remove(handler);
}
}

View File

@@ -1,141 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.channel;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
import reactor.core.Reactor;
import reactor.event.Event;
import reactor.event.registry.Registration;
import reactor.event.selector.ObjectSelector;
import reactor.function.Consumer;
/**
* @author Rossen Stoyanchev
* @since 4.0
*/
public class ReactorMessageChannel implements SubscribableChannel {
private static Log logger = LogFactory.getLog(ReactorMessageChannel.class);
private final Reactor reactor;
private final Object key = new Object();
private String name = toString(); // TODO
private final Map<MessageHandler, Registration<?>> registrations =
new HashMap<MessageHandler, Registration<?>>();
public ReactorMessageChannel(Reactor reactor) {
this.reactor = reactor;
}
public void setName(String name) {
this.name = name;
}
public String getName() {
return this.name;
}
@Override
public boolean send(Message<?> message) {
return send(message, -1);
}
@Override
public boolean send(Message<?> message, long timeout) {
if (logger.isTraceEnabled()) {
logger.trace("Channel " + getName() + ", sending message id=" + message.getHeaders().getId());
}
this.reactor.notify(this.key, Event.wrap(message));
return true;
}
@Override
public boolean subscribe(final MessageHandler handler) {
if (this.registrations.containsKey(handler)) {
logger.warn("Channel " + getName() + ", handler already subscribed " + handler);
return false;
}
if (logger.isTraceEnabled()) {
logger.trace("Channel " + getName() + ", subscribing handler " + handler);
}
Registration<Consumer<Event<Message<?>>>> registration = this.reactor.on(
ObjectSelector.objectSelector(key), new MessageHandlerConsumer(handler));
this.registrations.put(handler, registration);
return true;
}
@Override
public boolean unsubscribe(MessageHandler handler) {
if (logger.isTraceEnabled()) {
logger.trace("Channel " + getName() + ", removing subscription for handler " + handler);
}
Registration<?> registration = this.registrations.remove(handler);
if (registration == null) {
if (logger.isTraceEnabled()) {
logger.trace("Channel " + getName() + ", no subscription for handler " + handler);
}
return false;
}
registration.cancel();
return true;
}
private static final class MessageHandlerConsumer implements Consumer<Event<Message<?>>> {
private final MessageHandler handler;
private MessageHandlerConsumer(MessageHandler handler) {
this.handler = handler;
}
@Override
public void accept(Event<Message<?>> event) {
Message<?> message = event.getData();
try {
this.handler.handleMessage(message);
}
catch (Throwable t) {
// TODO
logger.error("Failed to process message " + message, t);
}
}
}
}

View File

@@ -1,4 +0,0 @@
/**
* Provides classes representing various channel types.
*/
package org.springframework.messaging.channel;

View File

@@ -1,116 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.converter;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;
import java.lang.reflect.Type;
import java.util.Map;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
* @author Rossen Stoyanchev
* @sicne 4.0
*/
public class MappingJackson2MessageConverter implements MessageConverter<Object> {
private ObjectMapper objectMapper = new ObjectMapper();
private Type defaultObjectType = Map.class;
private Class<?> defaultMessagePayloadClass = byte[].class;
/**
* Set the default target Object class to convert to in
* {@link #fromMessage(Message, Class)}.
*/
public void setDefaultObjectClass(Type defaultObjectType) {
Assert.notNull(defaultObjectType, "defaultObjectType is required");
this.defaultObjectType = defaultObjectType;
}
/**
* Set the type of Message payload to convert to in {@link #toMessage(Object)}.
* @param payloadClass either byte[] or String
*/
public void setDefaultTargetPayloadClass(Class<?> payloadClass) {
Assert.isTrue(byte[].class.equals(payloadClass) || String.class.equals(payloadClass),
"Payload class must be byte[] or String: " + payloadClass);
this.defaultMessagePayloadClass = payloadClass;
}
@Override
public Object fromMessage(Message<?> message, Type objectType) {
JavaType javaType = (objectType != null) ?
this.objectMapper.constructType(objectType) :
this.objectMapper.constructType(this.defaultObjectType);
Object payload = message.getPayload();
try {
if (payload instanceof byte[]) {
return this.objectMapper.readValue((byte[]) payload, javaType);
}
else if (payload instanceof String) {
return this.objectMapper.readValue((String) payload, javaType);
}
else {
throw new IllegalArgumentException("Unexpected message payload type: " + payload);
}
}
catch (IOException ex) {
throw new MessageConversionException("Could not read JSON: " + ex.getMessage(), ex);
}
}
@SuppressWarnings("unchecked")
@Override
public <P> Message<P> toMessage(Object object) {
P payload;
try {
if (byte[].class.equals(this.defaultMessagePayloadClass)) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
this.objectMapper.writeValue(out, object);
payload = (P) out.toByteArray();
}
else if (String.class.equals(this.defaultMessagePayloadClass)) {
Writer writer = new StringWriter();
this.objectMapper.writeValue(writer, object);
payload = (P) writer.toString();
}
else {
// Should never happen..
throw new IllegalStateException("Unexpected payload class: " + defaultMessagePayloadClass);
}
}
catch (IOException ex) {
throw new MessageConversionException("Could not write JSON: " + ex.getMessage(), ex);
}
return MessageBuilder.withPayload(payload).build();
}
}

View File

@@ -1,37 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.converter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
/**
* @author Mark Fisher
* @since 4.0
*/
@SuppressWarnings("serial")
public class MessageConversionException extends MessagingException {
public MessageConversionException(String description, Throwable cause) {
super(description, cause);
}
public MessageConversionException(Message<?> failedMessage, String description, Throwable cause) {
super(failedMessage, description, cause);
}
}

View File

@@ -1,34 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.converter;
import java.lang.reflect.Type;
import org.springframework.messaging.Message;
/**
* @author Mark Fisher
* @since 4.0
*/
public interface MessageConverter<T> {
<P> Message<P> toMessage(T object);
T fromMessage(Message<?> message, Type targetClass);
}

View File

@@ -1,40 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.converter;
import java.lang.reflect.Type;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
/**
* @author Mark Fisher
* @since 4.0
*/
public class SimplePayloadMessageConverter implements MessageConverter<Object> {
@Override
public Message<Object> toMessage(Object object) {
return MessageBuilder.withPayload(object).build();
}
@Override
public Object fromMessage(Message<?> message, Type targetClass) {
return message.getPayload();
}
}

View File

@@ -1,4 +0,0 @@
/**
* Provides classes supporting message conversion.
*/
package org.springframework.messaging.converter;

View File

@@ -1,91 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.core;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
/**
* @author Mark Fisher
* @since 4.0
*/
public abstract class AbstractDestinationResolvingMessagingTemplate<D> extends AbstractMessagingTemplate<D>
implements DestinationResolvingMessageSendingOperations<D>,
DestinationResolvingMessageReceivingOperations<D>,
DestinationResolvingMessageRequestReplyOperations<D> {
private volatile DestinationResolver<D> destinationResolver;
public void setDestinationResolver(DestinationResolver<D> destinationResolver) {
this.destinationResolver = destinationResolver;
}
@Override
public <P> void send(String destinationName, Message<P> message) {
D destination = resolveDestination(destinationName);
this.doSend(destination, message);
}
protected final D resolveDestination(String destinationName) {
Assert.notNull(destinationResolver, "destinationResolver is required when passing a name only");
return this.destinationResolver.resolveDestination(destinationName);
}
@Override
public <T> void convertAndSend(String destinationName, T message) {
this.convertAndSend(destinationName, message, null);
}
@Override
public <T> void convertAndSend(String destinationName, T message, MessagePostProcessor postProcessor) {
D destination = resolveDestination(destinationName);
super.convertAndSend(destination, message, postProcessor);
}
@Override
public <P> Message<P> receive(String destinationName) {
D destination = resolveDestination(destinationName);
return super.receive(destination);
}
@Override
public Object receiveAndConvert(String destinationName) {
D destination = resolveDestination(destinationName);
return super.receiveAndConvert(destination);
}
@Override
public Message<?> sendAndReceive(String destinationName, Message<?> requestMessage) {
D destination = resolveDestination(destinationName);
return super.sendAndReceive(destination, requestMessage);
}
@Override
public Object convertSendAndReceive(String destinationName, Object request) {
D destination = resolveDestination(destinationName);
return super.convertSendAndReceive(destination, request);
}
@Override
public Object convertSendAndReceive(String destinationName, Object request, MessagePostProcessor postProcessor) {
D destination = resolveDestination(destinationName);
return super.convertSendAndReceive(destination, request, postProcessor);
}
}

View File

@@ -1,101 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.core;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.converter.SimplePayloadMessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.util.Assert;
/**
* @author Mark Fisher
* @since 4.0
*/
public abstract class AbstractMessageSendingTemplate<D> implements MessageSendingOperations<D> {
protected final Log logger = LogFactory.getLog(this.getClass());
private volatile D defaultDestination;
protected volatile MessageConverter converter = new SimplePayloadMessageConverter();
public void setDefaultDestination(D defaultDestination) {
this.defaultDestination = defaultDestination;
}
/**
* Set the {@link MessageConverter} that is to be used to convert
* between Messages and objects for this template.
* <p>The default is {@link SimplePayloadMessageConverter}.
*/
public void setMessageConverter(MessageConverter messageConverter) {
Assert.notNull(messageConverter, "'messageConverter' must not be null");
this.converter = messageConverter;
}
@Override
public <P> void send(Message<P> message) {
this.send(getRequiredDefaultDestination(), message);
}
protected final D getRequiredDefaultDestination() {
Assert.state(this.defaultDestination != null,
"No 'defaultDestination' specified for MessagingTemplate. "
+ "Unable to invoke method without an explicit destination argument.");
return this.defaultDestination;
}
@Override
public <P> void send(D destination, Message<P> message) {
this.doSend(destination, message);
}
protected abstract void doSend(D destination, Message<?> message) ;
@Override
public <T> void convertAndSend(T message) {
this.convertAndSend(getRequiredDefaultDestination(), message);
}
@Override
public <T> void convertAndSend(D destination, T object) {
this.convertAndSend(destination, object, null);
}
@Override
public <T> void convertAndSend(T object, MessagePostProcessor postProcessor) {
this.convertAndSend(getRequiredDefaultDestination(), object, postProcessor);
}
@Override
public <T> void convertAndSend(D destination, T object, MessagePostProcessor postProcessor)
throws MessagingException {
Message<?> message = this.converter.toMessage(object);
if (postProcessor != null) {
message = postProcessor.postProcessMessage(message);
}
this.send(destination, message);
}
}

View File

@@ -1,92 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.core;
import org.springframework.messaging.Message;
/**
* @author Mark Fisher
* @since 4.0
*/
public abstract class AbstractMessagingTemplate<D> extends AbstractMessageSendingTemplate<D>
implements MessageRequestReplyOperations<D>, MessageReceivingOperations<D> {
@Override
public <P> Message<P> receive() {
return this.receive(getRequiredDefaultDestination());
}
@Override
public <P> Message<P> receive(D destination) {
return this.doReceive(destination);
}
protected abstract <P> Message<P> doReceive(D destination);
@Override
public Object receiveAndConvert() {
return this.receiveAndConvert(getRequiredDefaultDestination());
}
@Override
public Object receiveAndConvert(D destination) {
Message<?> message = this.doReceive(destination);
return (message != null) ? this.converter.fromMessage(message, null) : null;
}
@Override
public Message<?> sendAndReceive(Message<?> requestMessage) {
return this.sendAndReceive(getRequiredDefaultDestination(), requestMessage);
}
@Override
public Message<?> sendAndReceive(D destination, Message<?> requestMessage) {
return this.doSendAndReceive(destination, requestMessage);
}
protected abstract <S, R> Message<R> doSendAndReceive(D destination, Message<S> requestMessage);
@Override
public Object convertSendAndReceive(Object request) {
return this.convertSendAndReceive(getRequiredDefaultDestination(), request);
}
@Override
public Object convertSendAndReceive(D destination, Object request) {
return this.convertSendAndReceive(destination, request, null);
}
@Override
public Object convertSendAndReceive(Object request, MessagePostProcessor postProcessor) {
return this.convertSendAndReceive(getRequiredDefaultDestination(), request, postProcessor);
}
@Override
public Object convertSendAndReceive(D destination, Object request, MessagePostProcessor postProcessor) {
Message<?> requestMessage = this.converter.toMessage(request);
if (postProcessor != null) {
requestMessage = postProcessor.postProcessMessage(requestMessage);
}
Message<?> replyMessage = this.sendAndReceive(destination, requestMessage);
return this.converter.fromMessage(replyMessage, null);
}
}

View File

@@ -1,47 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.core;
import org.springframework.messaging.MessagingException;
/**
* Thrown by a ChannelResolver when it cannot resolve a channel name.
*
* @author Mark Fisher
* @since 4.0
*/
@SuppressWarnings("serial")
public class DestinationResolutionException extends MessagingException {
/**
* Create a new ChannelResolutionException.
* @param description the description
*/
public DestinationResolutionException(String description) {
super(description);
}
/**
* Create a new ChannelResolutionException.
* @param description the description
* @param cause the root cause (if any)
*/
public DestinationResolutionException(String description, Throwable cause) {
super(description, cause);
}
}

View File

@@ -1,33 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.core;
/**
* @author Mark Fisher
* @since 4.0
*/
public interface DestinationResolver<D> {
/**
* @param name
* @return
* @throws DestinationResolutionException
*/
D resolveDestination(String name) throws DestinationResolutionException;
}

View File

@@ -1,32 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.core;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
/**
* @author Mark Fisher
* @since 4.0
*/
public interface DestinationResolvingMessageReceivingOperations<D> extends MessageReceivingOperations<D> {
<P> Message<P> receive(String destinationName) throws MessagingException;
Object receiveAndConvert(String destinationName) throws MessagingException;
}

View File

@@ -1,33 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.core;
import org.springframework.messaging.Message;
/**
* @author Mark Fisher
* @since 4.0
*/
public interface DestinationResolvingMessageRequestReplyOperations<D> extends MessageRequestReplyOperations<D> {
Message<?> sendAndReceive(String destinationName, Message<?> requestMessage);
Object convertSendAndReceive(String destinationName, Object request);
Object convertSendAndReceive(String destinationName, Object request, MessagePostProcessor requestPostProcessor);
}

View File

@@ -1,35 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.core;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
/**
* @author Mark Fisher
* @since 4.0
*/
public interface DestinationResolvingMessageSendingOperations<D> extends MessageSendingOperations<D> {
<P> void send(String destinationName, Message<P> message) throws MessagingException;
<T> void convertAndSend(String destinationName, T message) throws MessagingException;
<T> void convertAndSend(String destinationName, T message, MessagePostProcessor postProcessor)
throws MessagingException;
}

View File

@@ -1,231 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.core;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.channel.BeanFactoryMessageChannelDestinationResolver;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
/**
* @author Mark Fisher
* @since 4.0
*/
public class GenericMessagingTemplate extends AbstractDestinationResolvingMessagingTemplate<MessageChannel>
implements BeanFactoryAware {
private volatile long sendTimeout = -1;
private volatile long receiveTimeout = -1;
private volatile boolean throwExceptionOnLateReply = false;
/**
* Specify the timeout value to use for send operations.
*
* @param sendTimeout the send timeout in milliseconds
*/
public void setSendTimeout(long sendTimeout) {
this.sendTimeout = sendTimeout;
}
/**
* Specify the timeout value to use for receive operations.
*
* @param receiveTimeout the receive timeout in milliseconds
*/
public void setReceiveTimeout(long receiveTimeout) {
this.receiveTimeout = receiveTimeout;
}
/**
* Specify whether or not an attempt to send on the reply channel throws an exception
* if no receiving thread will actually receive the reply. This can occur
* if the receiving thread has already timed out, or will never call receive()
* because it caught an exception, or has already received a reply.
* (default false - just a WARN log is emitted in these cases).
* @param throwExceptionOnLateReply TRUE or FALSE.
*/
public void setThrowExceptionOnLateReply(boolean throwExceptionOnLateReply) {
this.throwExceptionOnLateReply = throwExceptionOnLateReply;
}
@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
super.setDestinationResolver(new BeanFactoryMessageChannelDestinationResolver(beanFactory));
}
@Override
protected final void doSend(MessageChannel destination, Message<?> message) {
Assert.notNull(destination, "channel must not be null");
long timeout = this.sendTimeout;
boolean sent = (timeout >= 0)
? destination.send(message, timeout)
: destination.send(message);
if (!sent) {
throw new MessageDeliveryException(message,
"failed to send message to channel '" + destination + "' within timeout: " + timeout);
}
}
@SuppressWarnings("unchecked")
@Override
protected final <P> Message<P> doReceive(MessageChannel destination) {
Assert.state(destination instanceof PollableChannel,
"The 'destination' must be a PollableChannel for receive operations.");
Assert.notNull(destination, "channel must not be null");
long timeout = this.receiveTimeout;
Message<?> message = (timeout >= 0)
? ((PollableChannel) destination).receive(timeout)
: ((PollableChannel) destination).receive();
if (message == null && this.logger.isTraceEnabled()) {
this.logger.trace("failed to receive message from channel '" + destination + "' within timeout: " + timeout);
}
return (Message<P>) message;
}
@Override
protected final <S, R> Message<R> doSendAndReceive(MessageChannel destination, Message<S> requestMessage) {
Object originalReplyChannelHeader = requestMessage.getHeaders().getReplyChannel();
Object originalErrorChannelHeader = requestMessage.getHeaders().getErrorChannel();
TemporaryReplyChannel replyChannel = new TemporaryReplyChannel(this.receiveTimeout, this.throwExceptionOnLateReply);
requestMessage = MessageBuilder.fromMessage(requestMessage)
.setReplyChannel(replyChannel)
.setErrorChannel(replyChannel)
.build();
try {
this.doSend(destination, requestMessage);
}
catch (RuntimeException e) {
replyChannel.setClientWontReceive(true);
throw e;
}
Message<R> reply = this.doReceive(replyChannel);
if (reply != null) {
reply = MessageBuilder.fromMessage(reply)
.setHeader(MessageHeaders.REPLY_CHANNEL, originalReplyChannelHeader)
.setHeader(MessageHeaders.ERROR_CHANNEL, originalErrorChannelHeader)
.build();
}
return reply;
}
private static class TemporaryReplyChannel implements PollableChannel {
private static final Log logger = LogFactory.getLog(TemporaryReplyChannel.class);
private volatile Message<?> message;
private final long receiveTimeout;
private final CountDownLatch latch = new CountDownLatch(1);
private final boolean throwExceptionOnLateReply;
private volatile boolean clientTimedOut;
private volatile boolean clientWontReceive;
private volatile boolean clientHasReceived;
public TemporaryReplyChannel(long receiveTimeout, boolean throwExceptionOnLateReply) {
this.receiveTimeout = receiveTimeout;
this.throwExceptionOnLateReply = throwExceptionOnLateReply;
}
public void setClientWontReceive(boolean clientWontReceive) {
this.clientWontReceive = clientWontReceive;
}
@Override
public Message<?> receive() {
return this.receive(-1);
}
@Override
public Message<?> receive(long timeout) {
try {
if (this.receiveTimeout < 0) {
this.latch.await();
this.clientHasReceived = true;
}
else {
if (this.latch.await(this.receiveTimeout, TimeUnit.MILLISECONDS)) {
this.clientHasReceived = true;
}
else {
this.clientTimedOut = true;
}
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return this.message;
}
@Override
public boolean send(Message<?> message) {
return this.send(message, -1);
}
@Override
public boolean send(Message<?> message, long timeout) {
this.message = message;
this.latch.countDown();
if (this.clientTimedOut || this.clientHasReceived || this.clientWontReceive) {
String exceptionMessage = "";
if (this.clientTimedOut) {
exceptionMessage = "Reply message being sent, but the receiving thread has already timed out";
}
else if (this.clientHasReceived) {
exceptionMessage = "Reply message being sent, but the receiving thread has already received a reply";
}
else if (this.clientWontReceive) {
exceptionMessage = "Reply message being sent, but the receiving thread has already caught an exception and won't receive";
}
if (logger.isWarnEnabled()) {
logger.warn(exceptionMessage + ":" + message);
}
if (this.throwExceptionOnLateReply) {
throw new MessageDeliveryException(message, exceptionMessage);
}
}
return true;
}
}
}

View File

@@ -1,41 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.core;
import org.springframework.messaging.Message;
/**
* To be used with MessagingTemplate's send method that converts an object to a message.
* It allows for further modification of the message after it has been processed
* by the converter.
*
* <p>This is often implemented as an anonymous class within a method implementation.
*
* @author Mark Fisher
* @since 4.0
*/
public interface MessagePostProcessor {
/**
* Apply a MessagePostProcessor to the message. The returned message is
* typically a modified version of the original.
* @param message the message returned from the MessageConverter
* @return the modified version of the Message
*/
Message<?> postProcessMessage(Message<?> message);
}

View File

@@ -1,36 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.core;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
/**
* @author Mark Fisher
* @since 4.0
*/
public interface MessageReceivingOperations<D> {
<P> Message<P> receive() throws MessagingException;
<P> Message<P> receive(D destination) throws MessagingException;
Object receiveAndConvert() throws MessagingException;
Object receiveAndConvert(D destination) throws MessagingException;
}

View File

@@ -1,39 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.core;
import org.springframework.messaging.Message;
/**
* @author Mark Fisher
* @since 4.0
*/
public interface MessageRequestReplyOperations<D> {
Message<?> sendAndReceive(Message<?> requestMessage);
Message<?> sendAndReceive(D destination, Message<?> requestMessage);
Object convertSendAndReceive(Object request);
Object convertSendAndReceive(D destination, Object request);
Object convertSendAndReceive(Object request, MessagePostProcessor requestPostProcessor);
Object convertSendAndReceive(D destination, Object request, MessagePostProcessor requestPostProcessor);
}

View File

@@ -1,40 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.core;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
/**
* @author Mark Fisher
* @since 4.0
*/
public interface MessageSendingOperations<D> {
<P> void send(Message<P> message) throws MessagingException;
<P> void send(D destination, Message<P> message) throws MessagingException;
<T> void convertAndSend(T message) throws MessagingException;
<T> void convertAndSend(D destination, T message) throws MessagingException;
<T> void convertAndSend(T message, MessagePostProcessor postProcessor) throws MessagingException;
<T> void convertAndSend(D destination, T message, MessagePostProcessor postProcessor) throws MessagingException;
}

View File

@@ -1,4 +0,0 @@
/**
* Provides core messaging classes.
*/
package org.springframework.messaging.core;

View File

@@ -1,4 +0,0 @@
/**
* Generic support for working with messaging APIs and protocols.
*/
package org.springframework.messaging;

View File

@@ -1,43 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.support;
import java.util.Map;
/**
* A message implementation that accepts a {@link Throwable} payload.
* Once created this object is immutable.
*
* @author Mark Fisher
* @author Oleg Zhurakousky
* @since 4.0
* @see MessageBuilder
*/
public class ErrorMessage extends GenericMessage<Throwable> {
private static final long serialVersionUID = -5470210965279837728L;
public ErrorMessage(Throwable payload) {
super(payload);
}
public ErrorMessage(Throwable payload, Map<String, Object> headers) {
super(payload, headers);
}
}

View File

@@ -1,107 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.support;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
/**
* Base Message class defining common properties such as id, payload, and headers.
* Once created this object is immutable.
*
* @author Mark Fisher
* @since 4.0
* @see MessageBuilder
*/
public class GenericMessage<T> implements Message<T>, Serializable {
private static final long serialVersionUID = -9004496725833093406L;
private final T payload;
private final MessageHeaders headers;
/**
* Create a new message with the given payload.
*
* @param payload the message payload
*/
protected GenericMessage(T payload) {
this(payload, null);
}
/**
* Create a new message with the given payload. The provided map
* will be used to populate the message headers
*
* @param payload the message payload
* @param headers message headers
* @see MessageHeaders
*/
protected GenericMessage(T payload, Map<String, Object> headers) {
Assert.notNull(payload, "payload must not be null");
if (headers == null) {
headers = new HashMap<String, Object>();
}
else {
headers = new HashMap<String, Object>(headers);
}
this.headers = new MessageHeaders(headers);
this.payload = payload;
}
public MessageHeaders getHeaders() {
return this.headers;
}
public T getPayload() {
return this.payload;
}
public String toString() {
return "[Payload=" + this.payload + "][Headers=" + this.headers + "]";
}
public int hashCode() {
return this.headers.hashCode() * 23 + ObjectUtils.nullSafeHashCode(this.payload);
}
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj != null && obj instanceof GenericMessage<?>) {
GenericMessage<?> other = (GenericMessage<?>) obj;
if (!this.headers.getId().equals(other.headers.getId())) {
return false;
}
return this.headers.equals(other.headers)
&& this.payload.equals(other.payload);
}
return false;
}
}

View File

@@ -1,163 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.support;
import java.util.Map;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.util.Assert;
/**
* A builder for creating {@link GenericMessage} or {@link ErrorMessage} if the payload is
* {@link Throwable}.
*
* @author Arjen Poutsma
* @author Mark Fisher
* @since 4.0
* @see GenericMessage
* @see ErrorMessage
*/
public final class MessageBuilder<T> {
private final T payload;
private final MessageHeaderAccessor headerAccessor;
private final Message<T> originalMessage;
/**
* Private constructor to be invoked from the static factory methods only.
*/
private MessageBuilder(T payload, Message<T> originalMessage) {
Assert.notNull(payload, "payload must not be null");
this.payload = payload;
this.originalMessage = originalMessage;
this.headerAccessor = new MessageHeaderAccessor(originalMessage);
}
/**
* Create a builder for a new {@link Message} instance pre-populated with all of the
* headers copied from the provided message. The payload of the provided Message will
* also be used as the payload for the new message.
*
* @param message the Message from which the payload and all headers will be copied
*/
public static <T> MessageBuilder<T> fromMessage(Message<T> message) {
Assert.notNull(message, "message must not be null");
MessageBuilder<T> builder = new MessageBuilder<T>(message.getPayload(), message);
return builder;
}
/**
* Create a builder for a new {@link Message} instance with the provided payload.
*
* @param payload the payload for the new message
*/
public static <T> MessageBuilder<T> withPayload(T payload) {
MessageBuilder<T> builder = new MessageBuilder<T>(payload, null);
return builder;
}
/**
* Set the value for the given header name. If the provided value is {@code null},
* the header will be removed.
*/
public MessageBuilder<T> setHeader(String headerName, Object headerValue) {
this.headerAccessor.setHeader(headerName, headerValue);
return this;
}
/**
* Set the value for the given header name only if the header name is not already
* associated with a value.
*/
public MessageBuilder<T> setHeaderIfAbsent(String headerName, Object headerValue) {
this.headerAccessor.setHeaderIfAbsent(headerName, headerValue);
return this;
}
/**
* Removes all headers provided via array of 'headerPatterns'. As the name suggests
* the array may contain simple matching patterns for header names. Supported pattern
* styles are: "xxx*", "*xxx", "*xxx*" and "xxx*yyy".
*/
public MessageBuilder<T> removeHeaders(String... headerPatterns) {
this.headerAccessor.removeHeaders(headerPatterns);
return this;
}
/**
* Remove the value for the given header name.
*/
public MessageBuilder<T> removeHeader(String headerName) {
this.headerAccessor.removeHeader(headerName);
return this;
}
/**
* Copy the name-value pairs from the provided Map. This operation will overwrite any
* existing values. Use { {@link #copyHeadersIfAbsent(Map)} to avoid overwriting
* values. Note that the 'id' and 'timestamp' header values will never be overwritten.
*/
public MessageBuilder<T> copyHeaders(Map<String, ?> headersToCopy) {
this.headerAccessor.copyHeaders(headersToCopy);
return this;
}
/**
* Copy the name-value pairs from the provided Map. This operation will <em>not</em>
* overwrite any existing values.
*/
public MessageBuilder<T> copyHeadersIfAbsent(Map<String, ?> headersToCopy) {
this.headerAccessor.copyHeadersIfAbsent(headersToCopy);
return this;
}
public MessageBuilder<T> setReplyChannel(MessageChannel replyChannel) {
this.headerAccessor.setReplyChannel(replyChannel);
return this;
}
public MessageBuilder<T> setReplyChannelName(String replyChannelName) {
this.headerAccessor.setReplyChannelName(replyChannelName);
return this;
}
public MessageBuilder<T> setErrorChannel(MessageChannel errorChannel) {
this.headerAccessor.setErrorChannel(errorChannel);
return this;
}
public MessageBuilder<T> setErrorChannelName(String errorChannelName) {
this.headerAccessor.setErrorChannelName(errorChannelName);
return this;
}
@SuppressWarnings("unchecked")
public Message<T> build() {
if ((this.originalMessage != null) && !this.headerAccessor.isModified()) {
return this.originalMessage;
}
if (this.payload instanceof Throwable) {
return (Message<T>) new ErrorMessage((Throwable) this.payload, this.headerAccessor.toMap());
}
return new GenericMessage<T>(this.payload, this.headerAccessor.toMap());
}
}

View File

@@ -1,236 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.support;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.springframework.util.PatternMatchUtils;
import org.springframework.util.StringUtils;
/**
* A base class for read/write access to {@link MessageHeaders}. Supports creation of new
* headers or modification of existing message headers.
*
* <p>Sub-classes can provide additional typed getters and setters for convenient access
* to specific headers. Getters and setters should delegate to {@link #getHeader(String)}
* or {@link #setHeader(String, Object)} respectively. At the end {@link #toMap()} can be
* used to obtain the resulting headers.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public class MessageHeaderAccessor {
protected Log logger = LogFactory.getLog(getClass());
// wrapped read-only message headers
private final MessageHeaders originalHeaders;
// header updates
private final Map<String, Object> headers = new HashMap<String, Object>(4);
/**
* A constructor for creating new message headers.
*/
public MessageHeaderAccessor() {
this.originalHeaders = null;
}
/**
* A constructor for accessing and modifying existing message headers.
*/
public MessageHeaderAccessor(Message<?> message) {
this.originalHeaders = (message != null) ? message.getHeaders() : null;
}
/**
* Return a header map including original, wrapped headers (if any) plus additional
* header updates made through accessor methods.
*/
public Map<String, Object> toMap() {
Map<String, Object> result = new HashMap<String, Object>();
if (this.originalHeaders != null) {
result.putAll(this.originalHeaders);
}
for (String key : this.headers.keySet()) {
Object value = this.headers.get(key);
if (value == null) {
result.remove(key);
}
else {
result.put(key, value);
}
}
return result;
}
public boolean isModified() {
return (!this.headers.isEmpty());
}
public Object getHeader(String headerName) {
if (this.headers.containsKey(headerName)) {
return this.headers.get(headerName);
}
else if (this.originalHeaders != null) {
return this.originalHeaders.get(headerName);
}
return null;
}
/**
* Set the value for the given header name. If the provided value is {@code null} the
* header will be removed.
*/
public void setHeader(String name, Object value) {
Assert.isTrue(!isReadOnly(name), "The '" + name + "' header is read-only.");
verifyType(name, value);
if (!ObjectUtils.nullSafeEquals(value, getHeader(name))) {
this.headers.put(name, value);
}
}
protected boolean isReadOnly(String headerName) {
return MessageHeaders.ID.equals(headerName) || MessageHeaders.TIMESTAMP.equals(headerName);
}
/**
* Set the value for the given header name only if the header name is not already associated with a value.
*/
public void setHeaderIfAbsent(String name, Object value) {
if (getHeader(name) == null) {
setHeader(name, value);
}
}
/**
* Removes all headers provided via array of 'headerPatterns'. As the name suggests
* the array may contain simple matching patterns for header names. Supported pattern
* styles are: "xxx*", "*xxx", "*xxx*" and "xxx*yyy".
*/
public void removeHeaders(String... headerPatterns) {
List<String> headersToRemove = new ArrayList<String>();
for (String pattern : headerPatterns) {
if (StringUtils.hasLength(pattern)){
if (pattern.contains("*")){
headersToRemove.addAll(getMatchingHeaderNames(pattern, this.headers));
headersToRemove.addAll(getMatchingHeaderNames(pattern, this.originalHeaders));
}
else {
headersToRemove.add(pattern);
}
}
}
for (String headerToRemove : headersToRemove) {
removeHeader(headerToRemove);
}
}
private List<String> getMatchingHeaderNames(String pattern, Map<String, Object> headers) {
List<String> matchingHeaderNames = new ArrayList<String>();
if (headers != null) {
for (Map.Entry<String, Object> header: headers.entrySet()) {
if (PatternMatchUtils.simpleMatch(pattern, header.getKey())) {
matchingHeaderNames.add(header.getKey());
}
}
}
return matchingHeaderNames;
}
/**
* Remove the value for the given header name.
*/
public void removeHeader(String headerName) {
if (StringUtils.hasLength(headerName) && !isReadOnly(headerName)) {
setHeader(headerName, null);
}
}
/**
* Copy the name-value pairs from the provided Map. This operation will overwrite any
* existing values. Use { {@link #copyHeadersIfAbsent(Map)} to avoid overwriting
* values.
*/
public void copyHeaders(Map<String, ?> headersToCopy) {
Set<String> keys = headersToCopy.keySet();
for (String key : keys) {
if (!isReadOnly(key)) {
setHeader(key, headersToCopy.get(key));
}
}
}
/**
* Copy the name-value pairs from the provided Map. This operation will <em>not</em>
* overwrite any existing values.
*/
public void copyHeadersIfAbsent(Map<String, ?> headersToCopy) {
Set<String> keys = headersToCopy.keySet();
for (String key : keys) {
if (!this.isReadOnly(key)) {
setHeaderIfAbsent(key, headersToCopy.get(key));
}
}
}
public void setReplyChannel(MessageChannel replyChannel) {
setHeader(MessageHeaders.REPLY_CHANNEL, replyChannel);
}
public void setReplyChannelName(String replyChannelName) {
setHeader(MessageHeaders.REPLY_CHANNEL, replyChannelName);
}
public void setErrorChannel(MessageChannel errorChannel) {
setHeader(MessageHeaders.ERROR_CHANNEL, errorChannel);
}
public void setErrorChannelName(String errorChannelName) {
setHeader(MessageHeaders.ERROR_CHANNEL, errorChannelName);
}
@Override
public String toString() {
return getClass().getSimpleName() + " [originalHeaders=" + this.originalHeaders
+ ", updated headers=" + this.headers + "]";
}
protected void verifyType(String headerName, Object headerValue) {
if (headerName != null && headerValue != null) {
if (MessageHeaders.ERROR_CHANNEL.equals(headerName)
|| MessageHeaders.REPLY_CHANNEL.endsWith(headerName)) {
Assert.isTrue(headerValue instanceof MessageChannel || headerValue instanceof String, "The '"
+ headerName + "' header value must be a MessageChannel or String.");
}
}
}
}

View File

@@ -1,140 +0,0 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.support;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.springframework.messaging.Message;
import org.springframework.util.CollectionUtils;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.util.ObjectUtils;
/**
* An extension of {@link MessageHeaderAccessor} that also provides read/write access to
* message headers from an external message source. Native message headers are kept
* in a {@link MultiValueMap} under the key {@link #NATIVE_HEADERS}.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public class NativeMessageHeaderAccessor extends MessageHeaderAccessor {
public static final String NATIVE_HEADERS = "nativeHeaders";
// wrapped native headers
private final Map<String, List<String>> originalNativeHeaders;
// native header updates
private final MultiValueMap<String, String> nativeHeaders = new LinkedMultiValueMap<String, String>(4);
/**
* A constructor for creating new headers, accepting an optional native header map.
*/
public NativeMessageHeaderAccessor(Map<String, List<String>> nativeHeaders) {
super();
this.originalNativeHeaders = nativeHeaders;
}
/**
* A constructor for accessing and modifying existing message headers.
*/
public NativeMessageHeaderAccessor(Message<?> message) {
super(message);
this.originalNativeHeaders = initNativeHeaders(message);
}
private static Map<String, List<String>> initNativeHeaders(Message<?> message) {
if (message != null) {
@SuppressWarnings("unchecked")
Map<String, List<String>> headers = (Map<String, List<String>>) message.getHeaders().get(NATIVE_HEADERS);
if (headers != null) {
return headers;
}
}
return null;
}
@Override
public Map<String, Object> toMap() {
Map<String, Object> result = super.toMap();
result.put(NATIVE_HEADERS, toNativeHeaderMap());
return result;
}
@Override
public boolean isModified() {
return (super.isModified() || (!this.nativeHeaders.isEmpty()));
}
/**
* Return a map with native headers including original, wrapped headers (if any) plus
* additional header updates made through accessor methods.
*/
public Map<String, List<String>> toNativeHeaderMap() {
Map<String, List<String>> result = new HashMap<String, List<String>>();
if (this.originalNativeHeaders != null) {
result.putAll(this.originalNativeHeaders);
}
for (String key : this.nativeHeaders.keySet()) {
List<String> value = this.nativeHeaders.get(key);
if (value == null) {
result.remove(key);
}
else {
result.put(key, value);
}
}
return result;
}
protected List<String> getNativeHeader(String headerName) {
if (this.nativeHeaders.containsKey(headerName)) {
return this.nativeHeaders.get(headerName);
}
else if (this.originalNativeHeaders != null) {
return this.originalNativeHeaders.get(headerName);
}
return null;
}
protected String getFirstNativeHeader(String headerName) {
List<String> values = getNativeHeader(headerName);
return CollectionUtils.isEmpty(values) ? null : values.get(0);
}
/**
* Set the value for the given header name. If the provided value is {@code null} the
* header will be removed.
*/
protected void putNativeHeader(String name, List<String> value) {
if (!ObjectUtils.nullSafeEquals(value, getHeader(name))) {
this.nativeHeaders.put(name, value);
}
}
protected void setNativeHeader(String name, String value) {
this.nativeHeaders.set(name, value);
}
}