diff --git a/spring-integration-gemfire/README.adoc b/spring-integration-gemfire/README.adoc
new file mode 100644
index 0000000..30a5abc
--- /dev/null
+++ b/spring-integration-gemfire/README.adoc
@@ -0,0 +1,298 @@
+[[gemfire]]
+== VMware Tanzu GemFire and Apache Geode Support
+
+IMPORTANT: Starting with Spring Data 2022.0.0 there is not going to be VMware Tanzu GemFire or Apache Geode out-of-the-box.
+This module is present here only as a source code for legacy.
+
+Spring Integration provides support for VMware Tanzu GemFire and Apache Geode.
+
+You need to include this dependency into your project:
+
+====
+[source, xml, subs="normal", role="primary"]
+.Maven
+----
+
+ org.springframework.integration
+ spring-integration-gemfire
+ {project-version}
+
+----
+[source, groovy, subs="normal", role="secondary"]
+.Gradle
+----
+compile "org.springframework.integration:spring-integration-gemfire:{project-version}"
+----
+====
+
+GemFire is a distributed data management platform that provides a key-value data grid along with advanced distributed system features, such as event processing, continuous querying, and remote function execution.
+This guide assumes some familiarity with the commercial https://tanzu.vmware.com/gemfire[VMware Tanzu GemFire] or Open Source https://geode.apache.org[Apache Geode].
+
+Spring integration provides support for GemFire by implementing inbound adapters for entry and continuous query events, an outbound adapter to write entries to the cache, and message and metadata stores and `GemfireLockRegistry` implementations.
+Spring integration leverages the https://projects.spring.io/spring-data-gemfire[Spring Data for VMware Tanzu GemFire] project, providing a thin wrapper over its components.
+
+Starting with version 5.1, the Spring Integration GemFire module uses the https://github.com/spring-projects/spring-data-geode[Spring Data for Apache Geode] transitive dependency by default.
+To switch to the commercial VMware Tanzu GemFire-based Spring Data for VMware Tanzu GemFire, exclude `spring-data-geode` from dependencies and add `spring-data-gemfire`, as the following Maven snippet shows:
+
+====
+[source,xml]
+----
+
+ org.springframework.integration
+ spring-integration-gemfire
+
+
+ org.springframework.data
+ spring-data-geode
+
+
+
+
+
+ org.springframework.data
+ spring-data-gemfire
+
+----
+====
+
+To configure the 'int-gfe' namespace, include the following elements within the headers of your XML configuration file:
+
+====
+[source,xml]
+----
+xmlns:int-gfe="http://www.springframework.org/schema/integration/gemfire"
+xsi:schemaLocation="http://www.springframework.org/schema/integration/gemfire
+ https://www.springframework.org/schema/integration/gemfire/spring-integration-gemfire.xsd"
+----
+====
+
+[[gemfire-inbound]]
+=== Inbound Channel Adapter
+
+The inbound channel adapter produces messages on a channel when triggered by a GemFire `EntryEvent`.
+GemFire generates events whenever an entry is `CREATED`, `UPDATED`, `DESTROYED`, or `INVALIDATED` in the associated region.
+The inbound channel adapter lets you filter on a subset of these events.
+For example, you may want to produce messages only in response to an entry being created.
+In addition, the inbound channel adapter can evaluate a SpEL expression if, for example, you want your message payload to contain an event property such as the new entry value.
+The following example shows how to configure an inbound channel adapter with a SpEL language (in the `expression` attribute):
+
+====
+[source,xml]
+----
+
+
+
+----
+====
+
+The preceding configuration creates a GemFire `Cache` and `Region` by using Spring GemFire's 'gfe' namespace.
+The `inbound-channel-adapter` element requires a reference to the GemFire region on which the adapter listens for events.
+Optional attributes include `cache-events`, which can contain a comma-separated list of event types for which a message is produced on the input channel.
+By default, `CREATED` and `UPDATED` are enabled.
+If no `channel` attribute is provided, the channel is created from the `id` attribute.
+This adapter also supports an `error-channel`.
+The GemFire https://geode.apache.org/releases/latest/javadoc/org/apache/geode/cache/EntryEvent.html[`EntryEvent`] is the `#root` object of the `expression` evaluation.
+The following example shows an expression that replaces a value for a key:
+
+====
+[source]
+----
+expression="new something.MyEvent(key, oldValue, newValue)"
+----
+====
+
+If the `expression` attribute is not provided, the message payload is the GemFire `EntryEvent` itself.
+
+NOTE: This adapter conforms to Spring Integration conventions.
+
+[[gemfire-cq]]
+=== Continuous Query Inbound Channel Adapter
+
+The continuous query inbound channel adapter produces messages on a channel when triggered by a GemFire continuous query or `CqEvent` event.
+In release `1.1`, Spring Data introduced continuous query support, including `ContinuousQueryListenerContainer`, which provides a nice abstraction over the GemFire native API.
+This adapter requires a reference to a `ContinuousQueryListenerContainer` instance, creates a listener for a given `query`, and executes the query.
+The continuous query acts as an event source that fires whenever its result set changes state.
+
+NOTE: GemFire queries are written in OQL and are scoped to the entire cache (not just one region).
+Additionally, continuous queries require a remote (that is, running in a separate process or remote host) cache server.
+See the https://gemfire82.docs.VMware Tanzu.io/docs-gemfire/gemfire_nativeclient/continuous-querying/continuous-querying.html[GemFire documentation] for more information on implementing continuous queries.
+
+The following configuration creates a GemFire client cache (recall that a remote cache server is required for this implementation and its address is configured as a child element of the pool), a client region, and a `ContinuousQueryListenerContainer` that uses Spring Data:
+
+====
+[source,xml]
+----
+
+
+
+
+
+
+
+
+
+
+
+----
+====
+
+The continuous query inbound channel adapter requires a `cq-listener-container` attribute, which must contain a reference to the `ContinuousQueryListenerContainer`.
+Optionally, it accepts an `expression` attribute that uses SpEL to transform the `CqEvent` or extract an individual property as needed.
+The `cq-inbound-channel-adapter` provides a `query-events` attribute that contains a comma-separated list of event types for which a message is produced on the input channel.
+The available event types are `CREATED`, `UPDATED`, `DESTROYED`, `REGION_DESTROYED`, and `REGION_INVALIDATED`.
+By default, `CREATED` and `UPDATED` are enabled.
+Additional optional attributes include `query-name` (which provides an optional query name), `expression` (which works as described in the preceding section), and `durable` (a boolean value indicating if the query is durable -- it is false by default).
+If you do not provide a `channel`, the channel is created from the `id` attribute.
+This adapter also supports an `error-channel`.
+
+NOTE: This adapter conforms to Spring Integration conventions.
+
+[[gemfire-outbound]]
+=== Outbound Channel Adapter
+
+The outbound channel adapter writes cache entries that are mapped from the message payload.
+In its simplest form, it expects a payload of type `java.util.Map` and puts the map entries into its configured region.
+The following example shows how to configure an outbound channel adapter:
+
+====
+[source,xml]
+----
+
+----
+====
+
+Given the preceding configuration, an exception is thrown if the payload is not a `Map`.
+Additionally, you can configure the outbound channel adapter to create a map of cache entries by using SpEL.
+The following example shows how to do so:
+
+====
+[source,xml]
+----
+
+
+
+
+
+
+----
+====
+
+In the preceding configuration, the inner element (`cache-entries`) is semantically equivalent to a Spring 'map' element.
+The adapter interprets the `key` and `value` attributes as SpEL expressions with the message as the evaluation context.
+Note that this can contain arbitrary cache entries (not only those derived from the message) and that literal values must be enclosed in single quotes.
+In the preceding example, if the message sent to `cacheChannel` has a `String` payload with a value `Hello`, two entries (`[HELLO:hello, thing1:thing2]`) are written (either created or updated) in the cache region.
+This adapter also supports the `order` attribute, which may be useful if it is bound to a `PublishSubscribeChannel`.
+
+[[gemfire-message-store]]
+=== Gemfire Message Store
+
+As described in EIP, a https://www.enterpriseintegrationpatterns.com/MessageStore.html[message store] lets you persist messages.
+This can be useful when dealing with components that have a capability to buffer messages (`QueueChannel`, `Aggregator`, `Resequencer`, and others) if reliability is a concern.
+In Spring Integration, the `MessageStore` strategy interface also provides the foundation for the https://www.enterpriseintegrationpatterns.com/StoreInLibrary.html[claim check] pattern, which is described in EIP as well.
+
+Spring Integration's Gemfire module provides `GemfireMessageStore`, which is an implementation of both the `MessageStore` strategy (mainly used by the `QueueChannel` and `ClaimCheck` patterns) and the `MessageGroupStore` strategy (mainly used by the `Aggregator` and `Resequencer` patterns).
+
+The following example configures the cache and region by using the `spring-gemfire` namespace (not to be confused with the `spring-integration-gemfire` namespace):
+
+====
+[source,xml]
+----
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+----
+====
+
+Often, it is desirable for the message store to be maintained in one or more remote cache servers in a client-server configuration.
+In this case, you should configure a client cache, a client region, and a client pool and inject the region into the `MessageStore`.
+The following example shows how to do so:
+
+====
+[source,xml]
+----
+
+
+
+
+
+
+
+
+
+
+
+----
+====
+
+Note that the `pool` element is configured with the address of a cache server (you can substitute a locator here).
+The region is configured as a 'PROXY' so that no data is stored locally.
+The region's `id` corresponds to a region with the same name in the cache server.
+
+Starting with version 4.3.12, the `GemfireMessageStore` supports the key `prefix` option to allow distinguishing between instances of the store on the same GemFire region.
+
+[[gemfire-lock-registry]]
+=== Gemfire Lock Registry
+
+Starting with version 4.0, the `GemfireLockRegistry` is available.
+Certain components (for example, the aggregator and the resequencer) use a lock obtained from a `LockRegistry` instance to ensure that only one thread is manipulating a group at any given time.
+The `DefaultLockRegistry` performs this function within a single component.
+You can now configure an external lock registry on these components.
+When you use a shared `MessageGroupStore` with the `GemfireLockRegistry`, it can provide this functionality across multiple application instances, such that only one instance can manipulate the group at a time.
+
+NOTE: One of the `GemfireLockRegistry` constructors requires a `Region` as an argument.
+It is used to obtain a `Lock` from the `getDistributedLock()` method.
+This operation requires `GLOBAL` scope for the `Region`.
+Another constructor requires a `Cache`, and the `Region` is created with `GLOBAL` scope and with the name, `LockRegistry`.
+
+[[gemfire-metadata-store]]
+=== Gemfire Metadata Store
+
+Version 4.0 introduced a new Gemfire-based `MetadataStore` (<<./meta-data-store.adoc#metadata-store,Metadata Store>>) implementation.
+You can use the `GemfireMetadataStore` to maintain metadata state across application restarts.
+This new `MetadataStore` implementation can be used with adapters such as:
+
+* <<./feed.adoc#feed-inbound-channel-adapter,Feed Inbound Channel Adapter>>
+* <<./file.adoc#file-reading,Reading Files>>
+* <<./ftp.adoc#ftp-inbound,FTP Inbound Channel Adapter>>
+* <<./sftp.adoc#sftp-inbound,SFTP Inbound Channel Adapter>>
+
+To get these adapters to use the new `GemfireMetadataStore`, declare a Spring bean with a bean name of `metadataStore`.
+The feed inbound channel adapter automatically picks up and use the declared `GemfireMetadataStore`.
+
+NOTE: The `GemfireMetadataStore` also implements `ConcurrentMetadataStore`, letting it be reliably shared across multiple application instances, where only one instance can store or modify a key's value.
+These methods give various levels of concurrency guarantees based on the scope and data policy of the region.
+They are implemented in the peer cache and client-server cache but are disallowed in peer regions that have `NORMAL` or `EMPTY` data policies.
+
+NOTE: Since version 5.0, the `GemfireMetadataStore` also implements `ListenableMetadataStore`, which lets you listen to cache events by providing `MetadataStoreListener` instances to the store, as the following example shows:
+
+====
+[source,java]
+----
+GemfireMetadataStore metadataStore = new GemfireMetadataStore(cache);
+metadataStore.addListener(new MetadataStoreListenerAdapter() {
+
+ @Override
+ public void onAdd(String key, String value) {
+ ...
+ }
+
+});
+----
+====
diff --git a/spring-integration-gemfire/src/main/java/org/springframework/integration/gemfire/config/xml/GemfireCqInboundChannelAdapterParser.java b/spring-integration-gemfire/src/main/java/org/springframework/integration/gemfire/config/xml/GemfireCqInboundChannelAdapterParser.java
new file mode 100644
index 0000000..93f3638
--- /dev/null
+++ b/spring-integration-gemfire/src/main/java/org/springframework/integration/gemfire/config/xml/GemfireCqInboundChannelAdapterParser.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2002-2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.integration.gemfire.config.xml;
+
+import org.w3c.dom.Element;
+
+import org.springframework.beans.factory.support.AbstractBeanDefinition;
+import org.springframework.beans.factory.support.BeanDefinitionBuilder;
+import org.springframework.beans.factory.xml.ParserContext;
+import org.springframework.integration.config.xml.AbstractChannelAdapterParser;
+import org.springframework.integration.config.xml.IntegrationNamespaceUtils;
+import org.springframework.integration.gemfire.inbound.ContinuousQueryMessageProducer;
+
+/**
+ * @author David Turanski
+ * @author Dan Oxlade
+ * @author Gary Russell
+ * @author Artem Bilan
+ * @since 2.1
+ *
+ */
+public class GemfireCqInboundChannelAdapterParser extends AbstractChannelAdapterParser {
+
+
+ private static final String ERROR_CHANNEL_ATTRIBUTE = "error-channel";
+
+ private static final String OUTPUT_CHANNEL_PROPERTY = "outputChannel";
+
+ private static final String QUERY_LISTENER_CONTAINER_ATTRIBUTE = "cq-listener-container";
+
+ private static final String DURABLE_ATTRIBUTE = "durable";
+
+ private static final String QUERY_NAME_ATTRIBUTE = "query-name";
+
+ private static final String QUERY_ATTRIBUTE = "query";
+
+ private static final String EXPRESSION_ATTRIBUTE = "expression";
+
+ private static final String SUPPORTED_EVENT_TYPES_PROPERTY = "supportedEventTypes";
+
+ private static final String QUERY_EVENTS_ATTRIBUTE = "query-events";
+
+ @Override
+ protected AbstractBeanDefinition doParse(Element element, ParserContext parserContext, String channelName) {
+ BeanDefinitionBuilder continuousQueryMessageProducer =
+ BeanDefinitionBuilder.genericBeanDefinition(ContinuousQueryMessageProducer.class);
+ IntegrationNamespaceUtils.setValueIfAttributeDefined(continuousQueryMessageProducer, element,
+ EXPRESSION_ATTRIBUTE, "payloadExpressionString");
+ IntegrationNamespaceUtils.setValueIfAttributeDefined(continuousQueryMessageProducer, element,
+ QUERY_EVENTS_ATTRIBUTE, SUPPORTED_EVENT_TYPES_PROPERTY);
+
+ if (!element.hasAttribute(QUERY_LISTENER_CONTAINER_ATTRIBUTE)) {
+ parserContext.getReaderContext()
+ .error("'" + QUERY_LISTENER_CONTAINER_ATTRIBUTE + "' attribute is required.", element);
+ }
+
+ if (!element.hasAttribute(QUERY_ATTRIBUTE)) {
+ parserContext.getReaderContext().error("'" + QUERY_ATTRIBUTE + "' attribute is required.", element);
+ }
+
+ continuousQueryMessageProducer.addConstructorArgReference(element.getAttribute(QUERY_LISTENER_CONTAINER_ATTRIBUTE));
+ continuousQueryMessageProducer.addConstructorArgValue(element.getAttribute(QUERY_ATTRIBUTE));
+
+ continuousQueryMessageProducer.addPropertyReference(OUTPUT_CHANNEL_PROPERTY, channelName);
+ IntegrationNamespaceUtils.setReferenceIfAttributeDefined(continuousQueryMessageProducer, element,
+ ERROR_CHANNEL_ATTRIBUTE);
+
+ IntegrationNamespaceUtils.setValueIfAttributeDefined(continuousQueryMessageProducer, element, QUERY_NAME_ATTRIBUTE);
+ IntegrationNamespaceUtils.setValueIfAttributeDefined(continuousQueryMessageProducer, element, DURABLE_ATTRIBUTE);
+ return continuousQueryMessageProducer.getBeanDefinition();
+ }
+
+}
diff --git a/spring-integration-gemfire/src/main/java/org/springframework/integration/gemfire/config/xml/GemfireInboundChannelAdapterParser.java b/spring-integration-gemfire/src/main/java/org/springframework/integration/gemfire/config/xml/GemfireInboundChannelAdapterParser.java
new file mode 100644
index 0000000..67928a8
--- /dev/null
+++ b/spring-integration-gemfire/src/main/java/org/springframework/integration/gemfire/config/xml/GemfireInboundChannelAdapterParser.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2002-2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.integration.gemfire.config.xml;
+
+import org.w3c.dom.Element;
+
+import org.springframework.beans.factory.support.AbstractBeanDefinition;
+import org.springframework.beans.factory.support.BeanDefinitionBuilder;
+import org.springframework.beans.factory.xml.ParserContext;
+import org.springframework.integration.config.xml.AbstractChannelAdapterParser;
+import org.springframework.integration.config.xml.IntegrationNamespaceUtils;
+import org.springframework.integration.gemfire.inbound.CacheListeningMessageProducer;
+
+/**
+ * @author David Turanski
+ * @author Gary Russell
+ * @author Artem Bilan
+ * @since 2.1
+ */
+public class GemfireInboundChannelAdapterParser extends AbstractChannelAdapterParser {
+
+ private static final String ERROR_CHANNEL_ATTRIBUTE = "error-channel";
+
+ private static final String OUTPUT_CHANNEL_PROPERTY = "outputChannel";
+
+ private static final String REGION_ATTRIBUTE = "region";
+
+ private static final String EXPRESSION_ATTRIBUTE = "expression";
+
+ private static final String SUPPORTED_EVENT_TYPES_PROPERTY = "supportedEventTypes";
+
+ private static final String CACHE_EVENTS_ATTRIBUTE = "cache-events";
+
+ @Override
+ protected AbstractBeanDefinition doParse(Element element, ParserContext parserContext, String channelName) {
+ BeanDefinitionBuilder listeningMessageProducer =
+ BeanDefinitionBuilder.genericBeanDefinition(CacheListeningMessageProducer.class);
+ IntegrationNamespaceUtils.setValueIfAttributeDefined(listeningMessageProducer, element,
+ EXPRESSION_ATTRIBUTE, "payloadExpressionString");
+ IntegrationNamespaceUtils.setValueIfAttributeDefined(listeningMessageProducer, element,
+ CACHE_EVENTS_ATTRIBUTE, SUPPORTED_EVENT_TYPES_PROPERTY);
+
+ if (!element.hasAttribute(REGION_ATTRIBUTE)) {
+ parserContext.getReaderContext().error("'region' attribute is required.", element);
+ }
+
+ listeningMessageProducer.addConstructorArgReference(element.getAttribute(REGION_ATTRIBUTE));
+
+ listeningMessageProducer.addPropertyReference(OUTPUT_CHANNEL_PROPERTY, channelName);
+ IntegrationNamespaceUtils.setReferenceIfAttributeDefined(listeningMessageProducer, element,
+ ERROR_CHANNEL_ATTRIBUTE);
+ return listeningMessageProducer.getBeanDefinition();
+ }
+
+}
diff --git a/spring-integration-gemfire/src/main/java/org/springframework/integration/gemfire/config/xml/GemfireIntegrationNamespaceHandler.java b/spring-integration-gemfire/src/main/java/org/springframework/integration/gemfire/config/xml/GemfireIntegrationNamespaceHandler.java
new file mode 100644
index 0000000..3492154
--- /dev/null
+++ b/spring-integration-gemfire/src/main/java/org/springframework/integration/gemfire/config/xml/GemfireIntegrationNamespaceHandler.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2002-2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.integration.gemfire.config.xml;
+
+import org.springframework.integration.config.xml.AbstractIntegrationNamespaceHandler;
+
+/**
+ * @author David Turanski
+ * @since 2.1
+ */
+public class GemfireIntegrationNamespaceHandler extends AbstractIntegrationNamespaceHandler {
+
+ /* (non-Javadoc)
+ * @see org.springframework.beans.factory.xml.NamespaceHandler#init()
+ */
+ public void init() {
+ registerBeanDefinitionParser("inbound-channel-adapter", new GemfireInboundChannelAdapterParser());
+ registerBeanDefinitionParser("cq-inbound-channel-adapter", new GemfireCqInboundChannelAdapterParser());
+ registerBeanDefinitionParser("outbound-channel-adapter", new GemfireOutboundChannelAdapterParser());
+ }
+
+}
diff --git a/spring-integration-gemfire/src/main/java/org/springframework/integration/gemfire/config/xml/GemfireOutboundChannelAdapterParser.java b/spring-integration-gemfire/src/main/java/org/springframework/integration/gemfire/config/xml/GemfireOutboundChannelAdapterParser.java
new file mode 100644
index 0000000..ec617c8
--- /dev/null
+++ b/spring-integration-gemfire/src/main/java/org/springframework/integration/gemfire/config/xml/GemfireOutboundChannelAdapterParser.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2002-2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.integration.gemfire.config.xml;
+
+import java.util.Map;
+
+import org.w3c.dom.Element;
+
+import org.springframework.beans.factory.support.AbstractBeanDefinition;
+import org.springframework.beans.factory.support.BeanDefinitionBuilder;
+import org.springframework.beans.factory.xml.ParserContext;
+import org.springframework.integration.config.xml.AbstractOutboundChannelAdapterParser;
+import org.springframework.integration.gemfire.outbound.CacheWritingMessageHandler;
+import org.springframework.util.xml.DomUtils;
+
+/**
+ * @author David Turanski
+ * @author Gary Russell
+ * @since 2.1
+ */
+public class GemfireOutboundChannelAdapterParser extends AbstractOutboundChannelAdapterParser {
+
+ private static final String CACHE_ENTRIES_PROPERTY = "cacheEntries";
+
+ private static final String CACHE_ENTRIES_ELEMENT = "cache-entries";
+
+ private static final String REGION_ATTRIBUTE = "region";
+
+ /* (non-Javadoc)
+ * @see AbstractOutboundChannelAdapterParser#parseConsumer(Element, ParserContext)
+ */
+ @Override
+ protected AbstractBeanDefinition parseConsumer(Element element, ParserContext parserContext) {
+ BeanDefinitionBuilder cacheWritingMessageHandler = BeanDefinitionBuilder.genericBeanDefinition(
+ CacheWritingMessageHandler.class);
+ if (!element.hasAttribute(REGION_ATTRIBUTE)) {
+ parserContext.getReaderContext().error("'region' attribute is required.", element);
+ }
+
+ cacheWritingMessageHandler.addConstructorArgReference(element.getAttribute(REGION_ATTRIBUTE));
+
+ Element cacheEntries = DomUtils.getChildElementByTagName(element, CACHE_ENTRIES_ELEMENT);
+ if (cacheEntries != null) {
+ Map, ?> map = parserContext.getDelegate()
+ .parseMapElement(cacheEntries, cacheWritingMessageHandler.getBeanDefinition());
+ cacheWritingMessageHandler.addPropertyValue(CACHE_ENTRIES_PROPERTY, map);
+ }
+ return cacheWritingMessageHandler.getBeanDefinition();
+ }
+}
diff --git a/spring-integration-gemfire/src/main/java/org/springframework/integration/gemfire/config/xml/package-info.java b/spring-integration-gemfire/src/main/java/org/springframework/integration/gemfire/config/xml/package-info.java
new file mode 100644
index 0000000..83e13b5
--- /dev/null
+++ b/spring-integration-gemfire/src/main/java/org/springframework/integration/gemfire/config/xml/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * Provides classes for configuration - parsers, namespace handlers.
+ */
+package org.springframework.integration.gemfire.config.xml;
diff --git a/spring-integration-gemfire/src/main/java/org/springframework/integration/gemfire/inbound/CacheListeningMessageProducer.java b/spring-integration-gemfire/src/main/java/org/springframework/integration/gemfire/inbound/CacheListeningMessageProducer.java
new file mode 100644
index 0000000..61c609c
--- /dev/null
+++ b/spring-integration-gemfire/src/main/java/org/springframework/integration/gemfire/inbound/CacheListeningMessageProducer.java
@@ -0,0 +1,148 @@
+/*
+ * Copyright 2002-2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.integration.gemfire.inbound;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.geode.cache.CacheClosedException;
+import org.apache.geode.cache.CacheListener;
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.util.CacheListenerAdapter;
+
+import org.springframework.integration.endpoint.ExpressionMessageProducerSupport;
+import org.springframework.messaging.Message;
+import org.springframework.util.Assert;
+
+/**
+ * An inbound endpoint that listens to a GemFire region for events and then publishes Messages to
+ * a channel. The default supported event types are CREATED and UPDATED. See the {@link EventType}
+ * enum for all options. A SpEL expression may be provided to generate a Message payload by
+ * evaluating that expression against the {@link EntryEvent} instance as the root object. If no
+ * payloadExpression is provided, the {@link EntryEvent} itself will be the payload.
+ *
+ * @author Mark Fisher
+ * @author David Turanski
+ * @author Artem Bilan
+ * @since 2.1
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class CacheListeningMessageProducer extends ExpressionMessageProducerSupport {
+
+ private final Log logger = LogFactory.getLog(this.getClass());
+
+ private final Region region;
+
+ private final CacheListener, ?> listener;
+
+ private volatile Set supportedEventTypes =
+ new HashSet(Arrays.asList(EventType.CREATED, EventType.UPDATED));
+
+
+ public CacheListeningMessageProducer(Region, ?> region) {
+ Assert.notNull(region, "region must not be null");
+ this.region = region;
+ this.listener = new MessageProducingCacheListener();
+ }
+
+
+ public void setSupportedEventTypes(EventType... eventTypes) {
+ Assert.notEmpty(eventTypes, "eventTypes must not be empty");
+ this.supportedEventTypes = new HashSet(Arrays.asList(eventTypes));
+ }
+
+ @Override
+ public String getComponentType() {
+ return "gemfire:inbound-channel-adapter";
+ }
+
+ @Override
+ protected void doStart() {
+ if (this.logger.isInfoEnabled()) {
+ this.logger.info("adding MessageProducingCacheListener to GemFire Region '" + this.region.getName() + "'");
+ }
+ this.region.getAttributesMutator().addCacheListener(this.listener);
+ }
+
+ @Override
+ protected void doStop() {
+ if (this.logger.isInfoEnabled()) {
+ this.logger.info("removing MessageProducingCacheListener from GemFire Region '" + this.region.getName() + "'");
+ }
+ try {
+ this.region.getAttributesMutator().removeCacheListener(this.listener);
+ }
+ catch (CacheClosedException e) {
+ if (this.logger.isDebugEnabled()) {
+ this.logger.debug(e.getMessage(), e);
+ }
+ }
+
+ }
+
+ private class MessageProducingCacheListener extends CacheListenerAdapter {
+
+ @Override
+ public void afterCreate(EntryEvent event) {
+ if (CacheListeningMessageProducer.this.supportedEventTypes.contains(EventType.CREATED)) {
+ processEvent(event);
+ }
+ }
+
+ @Override
+ public void afterUpdate(EntryEvent event) {
+ if (CacheListeningMessageProducer.this.supportedEventTypes.contains(EventType.UPDATED)) {
+ processEvent(event);
+ }
+ }
+
+ @Override
+ public void afterInvalidate(EntryEvent event) {
+ if (CacheListeningMessageProducer.this.supportedEventTypes.contains(EventType.INVALIDATED)) {
+ processEvent(event);
+ }
+ }
+
+ @Override
+ public void afterDestroy(EntryEvent event) {
+ if (CacheListeningMessageProducer.this.supportedEventTypes.contains(EventType.DESTROYED)) {
+ processEvent(event);
+ }
+ }
+
+ private void processEvent(EntryEvent event) {
+ publish(evaluatePayloadExpression(event));
+
+ }
+
+ private void publish(Object object) {
+ Message> message = null;
+ if (object instanceof Message) {
+ message = (Message>) object;
+ }
+ else {
+ message = getMessageBuilderFactory().withPayload(object).build();
+ }
+ sendMessage(message);
+ }
+ }
+
+}
diff --git a/spring-integration-gemfire/src/main/java/org/springframework/integration/gemfire/inbound/ContinuousQueryMessageProducer.java b/spring-integration-gemfire/src/main/java/org/springframework/integration/gemfire/inbound/ContinuousQueryMessageProducer.java
new file mode 100644
index 0000000..665e3ab
--- /dev/null
+++ b/spring-integration-gemfire/src/main/java/org/springframework/integration/gemfire/inbound/ContinuousQueryMessageProducer.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2002-2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.integration.gemfire.inbound;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.geode.cache.query.CqEvent;
+
+import org.springframework.data.gemfire.listener.ContinuousQueryDefinition;
+import org.springframework.data.gemfire.listener.ContinuousQueryListener;
+import org.springframework.data.gemfire.listener.ContinuousQueryListenerContainer;
+import org.springframework.integration.endpoint.ExpressionMessageProducerSupport;
+import org.springframework.messaging.Message;
+import org.springframework.util.Assert;
+
+/**
+ * Responds to a Gemfire continuous query (set using the #query field) that is
+ * constantly evaluated against a cache
+ * {@link org.apache.geode.cache.Region}. This is much faster than
+ * re-querying the cache manually.
+ *
+ * @author Josh Long
+ * @author David Turanski
+ * @author Artem Bilan
+ * @since 2.1
+ *
+ */
+public class ContinuousQueryMessageProducer extends ExpressionMessageProducerSupport
+ implements ContinuousQueryListener {
+
+ private static Log logger = LogFactory.getLog(ContinuousQueryMessageProducer.class);
+
+ private final String query;
+
+ private final ContinuousQueryListenerContainer queryListenerContainer;
+
+ private volatile String queryName;
+
+ private boolean durable;
+
+ private volatile Set supportedEventTypes =
+ new HashSet(Arrays.asList(CqEventType.CREATED, CqEventType.UPDATED));
+
+ /**
+ * @param queryListenerContainer a {@link ContinuousQueryListenerContainer}
+ * @param query the query string
+ */
+ public ContinuousQueryMessageProducer(ContinuousQueryListenerContainer queryListenerContainer, String query) {
+ Assert.notNull(queryListenerContainer, "'queryListenerContainer' cannot be null");
+ Assert.notNull(query, "'query' cannot be null");
+ this.queryListenerContainer = queryListenerContainer;
+ this.query = query;
+ }
+
+ /**
+ * @param queryName optional query name
+ */
+ public void setQueryName(String queryName) {
+ this.queryName = queryName;
+ }
+
+ /**
+ * @param durable true if the query is a durable subscription
+ */
+ public void setDurable(boolean durable) {
+ this.durable = durable;
+ }
+
+ public void setSupportedEventTypes(CqEventType... eventTypes) {
+ Assert.notEmpty(eventTypes, "eventTypes must not be empty");
+ this.supportedEventTypes = new HashSet(Arrays.asList(eventTypes));
+ }
+
+ @Override
+ public String getComponentType() {
+ return "gemfire:cq-inbound-channel-adapter";
+ }
+
+ @Override
+ protected void onInit() {
+ super.onInit();
+ if (this.queryName == null) {
+ this.queryListenerContainer.addListener(new ContinuousQueryDefinition(this.query, this, this.durable));
+ }
+ else {
+ this.queryListenerContainer.addListener(new ContinuousQueryDefinition(this.queryName, this.query, this,
+ this.durable));
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.data.gemfire.listener.QueryListener#onEvent(com.gemstone.gemfire.cache.query.CqEvent)
+ */
+ @Override
+ public void onEvent(CqEvent event) {
+ if (isEventSupported(event)) {
+ if (logger.isDebugEnabled()) {
+ logger.debug(String.format("processing cq event key [%s] event [%s]", event.getQueryOperation()
+ .toString(), event.getKey()));
+ }
+ Message> message = null;
+ Object object = evaluatePayloadExpression(event);
+ if (object instanceof Message) {
+ message = (Message>) object;
+ }
+ else {
+ message = getMessageBuilderFactory().withPayload(object).build();
+ }
+ sendMessage(message);
+ }
+ }
+
+ private boolean isEventSupported(CqEvent event) {
+
+ String eventName = event.getQueryOperation().toString() +
+ (event.getQueryOperation().toString().endsWith("Y") ? "ED" : "D");
+ CqEventType eventType = CqEventType.valueOf(eventName);
+ return this.supportedEventTypes.contains(eventType);
+ }
+
+}
diff --git a/spring-integration-gemfire/src/main/java/org/springframework/integration/gemfire/inbound/CqEventType.java b/spring-integration-gemfire/src/main/java/org/springframework/integration/gemfire/inbound/CqEventType.java
new file mode 100644
index 0000000..7964a53
--- /dev/null
+++ b/spring-integration-gemfire/src/main/java/org/springframework/integration/gemfire/inbound/CqEventType.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2002-2021 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.integration.gemfire.inbound;
+
+/**
+ * Enumeration of GemFire Continuous Query Event Types.
+ *
+ * @author David Turanski
+ *
+ * @since 2.1
+ */
+public enum CqEventType {
+ CREATED,
+
+ UPDATED,
+
+ DESTROYED,
+
+ REGION_CLEARED,
+
+ REGION_INVALIDATED
+}
diff --git a/spring-integration-gemfire/src/main/java/org/springframework/integration/gemfire/inbound/EventType.java b/spring-integration-gemfire/src/main/java/org/springframework/integration/gemfire/inbound/EventType.java
new file mode 100644
index 0000000..60be238
--- /dev/null
+++ b/spring-integration-gemfire/src/main/java/org/springframework/integration/gemfire/inbound/EventType.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2002-2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.integration.gemfire.inbound;
+
+/**
+ * Enumeration of GemFire event types.
+ *
+ * @author Mark Fisher
+ * @since 2.1
+ */
+public enum EventType {
+
+ CREATED,
+
+ UPDATED,
+
+ DESTROYED,
+
+ INVALIDATED
+
+}
diff --git a/spring-integration-gemfire/src/main/java/org/springframework/integration/gemfire/inbound/package-info.java b/spring-integration-gemfire/src/main/java/org/springframework/integration/gemfire/inbound/package-info.java
new file mode 100644
index 0000000..29f4b7f
--- /dev/null
+++ b/spring-integration-gemfire/src/main/java/org/springframework/integration/gemfire/inbound/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * Provides classes supporting inbound endpoints.
+ */
+package org.springframework.integration.gemfire.inbound;
diff --git a/spring-integration-gemfire/src/main/java/org/springframework/integration/gemfire/metadata/GemfireMetadataStore.java b/spring-integration-gemfire/src/main/java/org/springframework/integration/gemfire/metadata/GemfireMetadataStore.java
new file mode 100644
index 0000000..6630883
--- /dev/null
+++ b/spring-integration-gemfire/src/main/java/org/springframework/integration/gemfire/metadata/GemfireMetadataStore.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2014-2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.integration.gemfire.metadata;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.cache.util.CacheListenerAdapter;
+
+import org.springframework.integration.metadata.ListenableMetadataStore;
+import org.springframework.integration.metadata.MetadataStoreListener;
+import org.springframework.util.Assert;
+
+/**
+ * Gemfire implementation of {@link ListenableMetadataStore}.
+ * Use this {@link org.springframework.integration.metadata.MetadataStore}
+ * to achieve meta-data persistence shared across application instances and
+ * restarts.
+ *
+ * @author Artem Bilan
+ * @author Venil Noronha
+ * @author Gary Russell
+ *
+ * @since 4.0
+ */
+public class GemfireMetadataStore implements ListenableMetadataStore {
+
+ private static final String KEY_MUST_NOT_BE_NULL = "'key' must not be null.";
+
+ public static final String KEY = "MetaData";
+
+ private final GemfireCacheListener cacheListener = new GemfireCacheListener();
+
+ private final Region region;
+
+ public GemfireMetadataStore(Cache cache) {
+ this(Objects.requireNonNull(cache, "'cache' must not be null")
+ .createRegionFactory()
+ .setScope(Scope.LOCAL)
+ .create(KEY));
+ }
+
+ public GemfireMetadataStore(Region region) {
+ Assert.notNull(region, "'region' must not be null");
+ this.region = region;
+ this.region.getAttributesMutator()
+ .addCacheListener(this.cacheListener);
+ }
+
+ @Override
+ public void put(String key, String value) {
+ Assert.notNull(key, KEY_MUST_NOT_BE_NULL);
+ Assert.notNull(value, "'value' must not be null.");
+ this.region.put(key, value);
+ }
+
+ @Override
+ public String putIfAbsent(String key, String value) {
+ Assert.notNull(key, KEY_MUST_NOT_BE_NULL);
+ Assert.notNull(value, "'value' must not be null.");
+ return this.region.putIfAbsent(key, value);
+ }
+
+ @Override
+ public boolean replace(String key, String oldValue, String newValue) {
+ Assert.notNull(key, KEY_MUST_NOT_BE_NULL);
+ Assert.notNull(oldValue, "'oldValue' must not be null.");
+ Assert.notNull(newValue, "'newValue' must not be null.");
+ return this.region.replace(key, oldValue, newValue);
+ }
+
+ @Override
+ public String get(String key) {
+ Assert.notNull(key, KEY_MUST_NOT_BE_NULL);
+ return this.region.get(key);
+ }
+
+ @Override
+ public String remove(String key) {
+ Assert.notNull(key, KEY_MUST_NOT_BE_NULL);
+ return this.region.remove(key);
+ }
+
+ @Override
+ public void addListener(MetadataStoreListener listener) {
+ Assert.notNull(listener, "'listener' must not be null");
+ this.cacheListener.listeners.add(listener);
+ }
+
+ @Override
+ public void removeListener(MetadataStoreListener listener) {
+ this.cacheListener.listeners.remove(listener);
+ }
+
+ private static class GemfireCacheListener extends CacheListenerAdapter {
+
+ private final List listeners = new CopyOnWriteArrayList<>();
+
+ GemfireCacheListener() {
+ }
+
+ @Override
+ public void afterCreate(EntryEvent event) {
+ this.listeners.forEach(listener -> listener.onAdd(event.getKey(), event.getNewValue()));
+ }
+
+ @Override
+ public void afterUpdate(EntryEvent event) {
+ this.listeners.forEach(listener -> listener.onUpdate(event.getKey(), event.getNewValue()));
+ }
+
+ @Override
+ public void afterDestroy(EntryEvent event) {
+ this.listeners.forEach(listener -> listener.onRemove(event.getKey(), event.getOldValue()));
+ }
+
+ }
+
+}
diff --git a/spring-integration-gemfire/src/main/java/org/springframework/integration/gemfire/metadata/package-info.java b/spring-integration-gemfire/src/main/java/org/springframework/integration/gemfire/metadata/package-info.java
new file mode 100644
index 0000000..5454827
--- /dev/null
+++ b/spring-integration-gemfire/src/main/java/org/springframework/integration/gemfire/metadata/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * Provides classes for the Gemfire MetadataStore.
+ */
+package org.springframework.integration.gemfire.metadata;
diff --git a/spring-integration-gemfire/src/main/java/org/springframework/integration/gemfire/outbound/CacheWritingMessageHandler.java b/spring-integration-gemfire/src/main/java/org/springframework/integration/gemfire/outbound/CacheWritingMessageHandler.java
new file mode 100644
index 0000000..018e6d3
--- /dev/null
+++ b/spring-integration-gemfire/src/main/java/org/springframework/integration/gemfire/outbound/CacheWritingMessageHandler.java
@@ -0,0 +1,137 @@
+/*
+ * Copyright 2002-2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.integration.gemfire.outbound;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.geode.GemFireCheckedException;
+import org.apache.geode.GemFireException;
+import org.apache.geode.cache.Region;
+
+import org.springframework.data.gemfire.GemfireCallback;
+import org.springframework.data.gemfire.GemfireTemplate;
+import org.springframework.expression.EvaluationContext;
+import org.springframework.expression.Expression;
+import org.springframework.expression.spel.standard.SpelExpressionParser;
+import org.springframework.integration.expression.ExpressionUtils;
+import org.springframework.integration.handler.AbstractMessageHandler;
+import org.springframework.messaging.Message;
+import org.springframework.util.Assert;
+
+/**
+ * A {@link org.springframework.messaging.MessageHandler} implementation that writes to a
+ * GemFire Region. The Message's payload must be an instance of {@link Map} or
+ * {@link #cacheEntryExpressions} must be provided.
+ *
+ * @author Mark Fisher
+ * @author David Turanski
+ * @author Artem Bilan
+ * @author Gary Russell
+ *
+ * @since 2.1
+ */
+public class CacheWritingMessageHandler extends AbstractMessageHandler {
+
+ private static final SpelExpressionParser PARSER = new SpelExpressionParser();
+
+ private final Map cacheEntryExpressions = new LinkedHashMap();
+
+ private final GemfireTemplate gemfireTemplate = new GemfireTemplate();
+
+ private volatile EvaluationContext evaluationContext;
+
+ @SuppressWarnings("rawtypes")
+ public CacheWritingMessageHandler(Region region) {
+ Assert.notNull(region, "region must not be null");
+ this.gemfireTemplate.setRegion(region);
+ }
+
+ @Override
+ public String getComponentType() {
+ return "gemfire:outbound-channel-adapter";
+ }
+
+ @Override
+ protected void onInit() {
+ super.onInit();
+ this.gemfireTemplate.afterPropertiesSet();
+ this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());
+ }
+
+ @Override
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ protected void handleMessageInternal(Message> message) {
+ Object payload = message.getPayload();
+ Map, ?> cacheValues = (this.cacheEntryExpressions.size() > 0) ? evaluateCacheEntries(message) : null;
+
+ if (cacheValues == null) {
+ Assert.state(payload instanceof Map,
+ "If cache entry expressions are not configured, then payload must be a Map");
+ cacheValues = (Map) payload;
+ }
+
+ final Map, ?> map = cacheValues;
+
+ this.gemfireTemplate.execute(new GemfireCallback