Migrate Gemfire module from Spring Integration core

Starting with Spring Data 2022.0.0 there is not going to be VMware Tanzu GemFire or Apache Geode out-of-the-box.
This module is present here only as a source code for legacy.
This commit is contained in:
Artem Bilan
2022-10-05 10:40:08 -04:00
parent e373f4bfc1
commit d647fb55ac
55 changed files with 4539 additions and 0 deletions

View File

@@ -0,0 +1,298 @@
[[gemfire]]
== VMware Tanzu GemFire and Apache Geode Support
IMPORTANT: Starting with Spring Data 2022.0.0 there is not going to be VMware Tanzu GemFire or Apache Geode out-of-the-box.
This module is present here only as a source code for legacy.
Spring Integration provides support for VMware Tanzu GemFire and Apache Geode.
You need to include this dependency into your project:
====
[source, xml, subs="normal", role="primary"]
.Maven
----
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-gemfire</artifactId>
<version>{project-version}</version>
</dependency>
----
[source, groovy, subs="normal", role="secondary"]
.Gradle
----
compile "org.springframework.integration:spring-integration-gemfire:{project-version}"
----
====
GemFire is a distributed data management platform that provides a key-value data grid along with advanced distributed system features, such as event processing, continuous querying, and remote function execution.
This guide assumes some familiarity with the commercial https://tanzu.vmware.com/gemfire[VMware Tanzu GemFire] or Open Source https://geode.apache.org[Apache Geode].
Spring integration provides support for GemFire by implementing inbound adapters for entry and continuous query events, an outbound adapter to write entries to the cache, and message and metadata stores and `GemfireLockRegistry` implementations.
Spring integration leverages the https://projects.spring.io/spring-data-gemfire[Spring Data for VMware Tanzu GemFire] project, providing a thin wrapper over its components.
Starting with version 5.1, the Spring Integration GemFire module uses the https://github.com/spring-projects/spring-data-geode[Spring Data for Apache Geode] transitive dependency by default.
To switch to the commercial VMware Tanzu GemFire-based Spring Data for VMware Tanzu GemFire, exclude `spring-data-geode` from dependencies and add `spring-data-gemfire`, as the following Maven snippet shows:
====
[source,xml]
----
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-gemfire</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-geode</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-gemfire</artifactId>
</dependency>
----
====
To configure the 'int-gfe' namespace, include the following elements within the headers of your XML configuration file:
====
[source,xml]
----
xmlns:int-gfe="http://www.springframework.org/schema/integration/gemfire"
xsi:schemaLocation="http://www.springframework.org/schema/integration/gemfire
https://www.springframework.org/schema/integration/gemfire/spring-integration-gemfire.xsd"
----
====
[[gemfire-inbound]]
=== Inbound Channel Adapter
The inbound channel adapter produces messages on a channel when triggered by a GemFire `EntryEvent`.
GemFire generates events whenever an entry is `CREATED`, `UPDATED`, `DESTROYED`, or `INVALIDATED` in the associated region.
The inbound channel adapter lets you filter on a subset of these events.
For example, you may want to produce messages only in response to an entry being created.
In addition, the inbound channel adapter can evaluate a SpEL expression if, for example, you want your message payload to contain an event property such as the new entry value.
The following example shows how to configure an inbound channel adapter with a SpEL language (in the `expression` attribute):
====
[source,xml]
----
<gfe:cache/>
<gfe:replicated-region id="region"/>
<int-gfe:inbound-channel-adapter id="inputChannel" region="region"
cache-events="CREATED" expression="newValue"/>
----
====
The preceding configuration creates a GemFire `Cache` and `Region` by using Spring GemFire's 'gfe' namespace.
The `inbound-channel-adapter` element requires a reference to the GemFire region on which the adapter listens for events.
Optional attributes include `cache-events`, which can contain a comma-separated list of event types for which a message is produced on the input channel.
By default, `CREATED` and `UPDATED` are enabled.
If no `channel` attribute is provided, the channel is created from the `id` attribute.
This adapter also supports an `error-channel`.
The GemFire https://geode.apache.org/releases/latest/javadoc/org/apache/geode/cache/EntryEvent.html[`EntryEvent`] is the `#root` object of the `expression` evaluation.
The following example shows an expression that replaces a value for a key:
====
[source]
----
expression="new something.MyEvent(key, oldValue, newValue)"
----
====
If the `expression` attribute is not provided, the message payload is the GemFire `EntryEvent` itself.
NOTE: This adapter conforms to Spring Integration conventions.
[[gemfire-cq]]
=== Continuous Query Inbound Channel Adapter
The continuous query inbound channel adapter produces messages on a channel when triggered by a GemFire continuous query or `CqEvent` event.
In release `1.1`, Spring Data introduced continuous query support, including `ContinuousQueryListenerContainer`, which provides a nice abstraction over the GemFire native API.
This adapter requires a reference to a `ContinuousQueryListenerContainer` instance, creates a listener for a given `query`, and executes the query.
The continuous query acts as an event source that fires whenever its result set changes state.
NOTE: GemFire queries are written in OQL and are scoped to the entire cache (not just one region).
Additionally, continuous queries require a remote (that is, running in a separate process or remote host) cache server.
See the https://gemfire82.docs.VMware Tanzu.io/docs-gemfire/gemfire_nativeclient/continuous-querying/continuous-querying.html[GemFire documentation] for more information on implementing continuous queries.
The following configuration creates a GemFire client cache (recall that a remote cache server is required for this implementation and its address is configured as a child element of the pool), a client region, and a `ContinuousQueryListenerContainer` that uses Spring Data:
====
[source,xml]
----
<gfe:client-cache id="client-cache" pool-name="client-pool"/>
<gfe:pool id="client-pool" subscription-enabled="true" >
<!--configure server or locator here required to address the cache server -->
</gfe:pool>
<gfe:client-region id="test" cache-ref="client-cache" pool-name="client-pool"/>
<gfe:cq-listener-container id="queryListenerContainer" cache="client-cache"
pool-name="client-pool"/>
<int-gfe:cq-inbound-channel-adapter id="inputChannel"
cq-listener-container="queryListenerContainer"
query="select * from /test"/>
----
====
The continuous query inbound channel adapter requires a `cq-listener-container` attribute, which must contain a reference to the `ContinuousQueryListenerContainer`.
Optionally, it accepts an `expression` attribute that uses SpEL to transform the `CqEvent` or extract an individual property as needed.
The `cq-inbound-channel-adapter` provides a `query-events` attribute that contains a comma-separated list of event types for which a message is produced on the input channel.
The available event types are `CREATED`, `UPDATED`, `DESTROYED`, `REGION_DESTROYED`, and `REGION_INVALIDATED`.
By default, `CREATED` and `UPDATED` are enabled.
Additional optional attributes include `query-name` (which provides an optional query name), `expression` (which works as described in the preceding section), and `durable` (a boolean value indicating if the query is durable -- it is false by default).
If you do not provide a `channel`, the channel is created from the `id` attribute.
This adapter also supports an `error-channel`.
NOTE: This adapter conforms to Spring Integration conventions.
[[gemfire-outbound]]
=== Outbound Channel Adapter
The outbound channel adapter writes cache entries that are mapped from the message payload.
In its simplest form, it expects a payload of type `java.util.Map` and puts the map entries into its configured region.
The following example shows how to configure an outbound channel adapter:
====
[source,xml]
----
<int-gfe:outbound-channel-adapter id="cacheChannel" region="region"/>
----
====
Given the preceding configuration, an exception is thrown if the payload is not a `Map`.
Additionally, you can configure the outbound channel adapter to create a map of cache entries by using SpEL.
The following example shows how to do so:
====
[source,xml]
----
<int-gfe:outbound-channel-adapter id="cacheChannel" region="region">
<int-gfe:cache-entries>
<entry key="payload.toUpperCase()" value="payload.toLowerCase()"/>
<entry key="'thing1'" value="'thing2'"/>
</int-gfe:cache-entries>
</int-gfe:outbound-channel-adapter>
----
====
In the preceding configuration, the inner element (`cache-entries`) is semantically equivalent to a Spring 'map' element.
The adapter interprets the `key` and `value` attributes as SpEL expressions with the message as the evaluation context.
Note that this can contain arbitrary cache entries (not only those derived from the message) and that literal values must be enclosed in single quotes.
In the preceding example, if the message sent to `cacheChannel` has a `String` payload with a value `Hello`, two entries (`[HELLO:hello, thing1:thing2]`) are written (either created or updated) in the cache region.
This adapter also supports the `order` attribute, which may be useful if it is bound to a `PublishSubscribeChannel`.
[[gemfire-message-store]]
=== Gemfire Message Store
As described in EIP, a https://www.enterpriseintegrationpatterns.com/MessageStore.html[message store] lets you persist messages.
This can be useful when dealing with components that have a capability to buffer messages (`QueueChannel`, `Aggregator`, `Resequencer`, and others) if reliability is a concern.
In Spring Integration, the `MessageStore` strategy interface also provides the foundation for the https://www.enterpriseintegrationpatterns.com/StoreInLibrary.html[claim check] pattern, which is described in EIP as well.
Spring Integration's Gemfire module provides `GemfireMessageStore`, which is an implementation of both the `MessageStore` strategy (mainly used by the `QueueChannel` and `ClaimCheck` patterns) and the `MessageGroupStore` strategy (mainly used by the `Aggregator` and `Resequencer` patterns).
The following example configures the cache and region by using the `spring-gemfire` namespace (not to be confused with the `spring-integration-gemfire` namespace):
====
[source,xml]
----
<bean id="gemfireMessageStore" class="o.s.i.gemfire.store.GemfireMessageStore">
<constructor-arg ref="myRegion"/>
</bean>
<gfe:cache/>
<gfe:replicated-region id="myRegion"/>
<int:channel id="somePersistentQueueChannel">
<int:queue message-store="gemfireMessageStore"/>
<int:channel>
<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
message-store="gemfireMessageStore"/>
----
====
Often, it is desirable for the message store to be maintained in one or more remote cache servers in a client-server configuration.
In this case, you should configure a client cache, a client region, and a client pool and inject the region into the `MessageStore`.
The following example shows how to do so:
====
[source,xml]
----
<bean id="gemfireMessageStore"
class="org.springframework.integration.gemfire.store.GemfireMessageStore">
<constructor-arg ref="myRegion"/>
</bean>
<gfe:client-cache/>
<gfe:client-region id="myRegion" shortcut="PROXY" pool-name="messageStorePool"/>
<gfe:pool id="messageStorePool">
<gfe:server host="localhost" port="40404" />
</gfe:pool>
----
====
Note that the `pool` element is configured with the address of a cache server (you can substitute a locator here).
The region is configured as a 'PROXY' so that no data is stored locally.
The region's `id` corresponds to a region with the same name in the cache server.
Starting with version 4.3.12, the `GemfireMessageStore` supports the key `prefix` option to allow distinguishing between instances of the store on the same GemFire region.
[[gemfire-lock-registry]]
=== Gemfire Lock Registry
Starting with version 4.0, the `GemfireLockRegistry` is available.
Certain components (for example, the aggregator and the resequencer) use a lock obtained from a `LockRegistry` instance to ensure that only one thread is manipulating a group at any given time.
The `DefaultLockRegistry` performs this function within a single component.
You can now configure an external lock registry on these components.
When you use a shared `MessageGroupStore` with the `GemfireLockRegistry`, it can provide this functionality across multiple application instances, such that only one instance can manipulate the group at a time.
NOTE: One of the `GemfireLockRegistry` constructors requires a `Region` as an argument.
It is used to obtain a `Lock` from the `getDistributedLock()` method.
This operation requires `GLOBAL` scope for the `Region`.
Another constructor requires a `Cache`, and the `Region` is created with `GLOBAL` scope and with the name, `LockRegistry`.
[[gemfire-metadata-store]]
=== Gemfire Metadata Store
Version 4.0 introduced a new Gemfire-based `MetadataStore` (<<./meta-data-store.adoc#metadata-store,Metadata Store>>) implementation.
You can use the `GemfireMetadataStore` to maintain metadata state across application restarts.
This new `MetadataStore` implementation can be used with adapters such as:
* <<./feed.adoc#feed-inbound-channel-adapter,Feed Inbound Channel Adapter>>
* <<./file.adoc#file-reading,Reading Files>>
* <<./ftp.adoc#ftp-inbound,FTP Inbound Channel Adapter>>
* <<./sftp.adoc#sftp-inbound,SFTP Inbound Channel Adapter>>
To get these adapters to use the new `GemfireMetadataStore`, declare a Spring bean with a bean name of `metadataStore`.
The feed inbound channel adapter automatically picks up and use the declared `GemfireMetadataStore`.
NOTE: The `GemfireMetadataStore` also implements `ConcurrentMetadataStore`, letting it be reliably shared across multiple application instances, where only one instance can store or modify a key's value.
These methods give various levels of concurrency guarantees based on the scope and data policy of the region.
They are implemented in the peer cache and client-server cache but are disallowed in peer regions that have `NORMAL` or `EMPTY` data policies.
NOTE: Since version 5.0, the `GemfireMetadataStore` also implements `ListenableMetadataStore`, which lets you listen to cache events by providing `MetadataStoreListener` instances to the store, as the following example shows:
====
[source,java]
----
GemfireMetadataStore metadataStore = new GemfireMetadataStore(cache);
metadataStore.addListener(new MetadataStoreListenerAdapter() {
@Override
public void onAdd(String key, String value) {
...
}
});
----
====

View File

@@ -0,0 +1,87 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.gemfire.config.xml;
import org.w3c.dom.Element;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.xml.ParserContext;
import org.springframework.integration.config.xml.AbstractChannelAdapterParser;
import org.springframework.integration.config.xml.IntegrationNamespaceUtils;
import org.springframework.integration.gemfire.inbound.ContinuousQueryMessageProducer;
/**
* @author David Turanski
* @author Dan Oxlade
* @author Gary Russell
* @author Artem Bilan
* @since 2.1
*
*/
public class GemfireCqInboundChannelAdapterParser extends AbstractChannelAdapterParser {
private static final String ERROR_CHANNEL_ATTRIBUTE = "error-channel";
private static final String OUTPUT_CHANNEL_PROPERTY = "outputChannel";
private static final String QUERY_LISTENER_CONTAINER_ATTRIBUTE = "cq-listener-container";
private static final String DURABLE_ATTRIBUTE = "durable";
private static final String QUERY_NAME_ATTRIBUTE = "query-name";
private static final String QUERY_ATTRIBUTE = "query";
private static final String EXPRESSION_ATTRIBUTE = "expression";
private static final String SUPPORTED_EVENT_TYPES_PROPERTY = "supportedEventTypes";
private static final String QUERY_EVENTS_ATTRIBUTE = "query-events";
@Override
protected AbstractBeanDefinition doParse(Element element, ParserContext parserContext, String channelName) {
BeanDefinitionBuilder continuousQueryMessageProducer =
BeanDefinitionBuilder.genericBeanDefinition(ContinuousQueryMessageProducer.class);
IntegrationNamespaceUtils.setValueIfAttributeDefined(continuousQueryMessageProducer, element,
EXPRESSION_ATTRIBUTE, "payloadExpressionString");
IntegrationNamespaceUtils.setValueIfAttributeDefined(continuousQueryMessageProducer, element,
QUERY_EVENTS_ATTRIBUTE, SUPPORTED_EVENT_TYPES_PROPERTY);
if (!element.hasAttribute(QUERY_LISTENER_CONTAINER_ATTRIBUTE)) {
parserContext.getReaderContext()
.error("'" + QUERY_LISTENER_CONTAINER_ATTRIBUTE + "' attribute is required.", element);
}
if (!element.hasAttribute(QUERY_ATTRIBUTE)) {
parserContext.getReaderContext().error("'" + QUERY_ATTRIBUTE + "' attribute is required.", element);
}
continuousQueryMessageProducer.addConstructorArgReference(element.getAttribute(QUERY_LISTENER_CONTAINER_ATTRIBUTE));
continuousQueryMessageProducer.addConstructorArgValue(element.getAttribute(QUERY_ATTRIBUTE));
continuousQueryMessageProducer.addPropertyReference(OUTPUT_CHANNEL_PROPERTY, channelName);
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(continuousQueryMessageProducer, element,
ERROR_CHANNEL_ATTRIBUTE);
IntegrationNamespaceUtils.setValueIfAttributeDefined(continuousQueryMessageProducer, element, QUERY_NAME_ATTRIBUTE);
IntegrationNamespaceUtils.setValueIfAttributeDefined(continuousQueryMessageProducer, element, DURABLE_ATTRIBUTE);
return continuousQueryMessageProducer.getBeanDefinition();
}
}

View File

@@ -0,0 +1,69 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.gemfire.config.xml;
import org.w3c.dom.Element;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.xml.ParserContext;
import org.springframework.integration.config.xml.AbstractChannelAdapterParser;
import org.springframework.integration.config.xml.IntegrationNamespaceUtils;
import org.springframework.integration.gemfire.inbound.CacheListeningMessageProducer;
/**
* @author David Turanski
* @author Gary Russell
* @author Artem Bilan
* @since 2.1
*/
public class GemfireInboundChannelAdapterParser extends AbstractChannelAdapterParser {
private static final String ERROR_CHANNEL_ATTRIBUTE = "error-channel";
private static final String OUTPUT_CHANNEL_PROPERTY = "outputChannel";
private static final String REGION_ATTRIBUTE = "region";
private static final String EXPRESSION_ATTRIBUTE = "expression";
private static final String SUPPORTED_EVENT_TYPES_PROPERTY = "supportedEventTypes";
private static final String CACHE_EVENTS_ATTRIBUTE = "cache-events";
@Override
protected AbstractBeanDefinition doParse(Element element, ParserContext parserContext, String channelName) {
BeanDefinitionBuilder listeningMessageProducer =
BeanDefinitionBuilder.genericBeanDefinition(CacheListeningMessageProducer.class);
IntegrationNamespaceUtils.setValueIfAttributeDefined(listeningMessageProducer, element,
EXPRESSION_ATTRIBUTE, "payloadExpressionString");
IntegrationNamespaceUtils.setValueIfAttributeDefined(listeningMessageProducer, element,
CACHE_EVENTS_ATTRIBUTE, SUPPORTED_EVENT_TYPES_PROPERTY);
if (!element.hasAttribute(REGION_ATTRIBUTE)) {
parserContext.getReaderContext().error("'region' attribute is required.", element);
}
listeningMessageProducer.addConstructorArgReference(element.getAttribute(REGION_ATTRIBUTE));
listeningMessageProducer.addPropertyReference(OUTPUT_CHANNEL_PROPERTY, channelName);
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(listeningMessageProducer, element,
ERROR_CHANNEL_ATTRIBUTE);
return listeningMessageProducer.getBeanDefinition();
}
}

View File

@@ -0,0 +1,36 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.gemfire.config.xml;
import org.springframework.integration.config.xml.AbstractIntegrationNamespaceHandler;
/**
* @author David Turanski
* @since 2.1
*/
public class GemfireIntegrationNamespaceHandler extends AbstractIntegrationNamespaceHandler {
/* (non-Javadoc)
* @see org.springframework.beans.factory.xml.NamespaceHandler#init()
*/
public void init() {
registerBeanDefinitionParser("inbound-channel-adapter", new GemfireInboundChannelAdapterParser());
registerBeanDefinitionParser("cq-inbound-channel-adapter", new GemfireCqInboundChannelAdapterParser());
registerBeanDefinitionParser("outbound-channel-adapter", new GemfireOutboundChannelAdapterParser());
}
}

View File

@@ -0,0 +1,64 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.gemfire.config.xml;
import java.util.Map;
import org.w3c.dom.Element;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.xml.ParserContext;
import org.springframework.integration.config.xml.AbstractOutboundChannelAdapterParser;
import org.springframework.integration.gemfire.outbound.CacheWritingMessageHandler;
import org.springframework.util.xml.DomUtils;
/**
* @author David Turanski
* @author Gary Russell
* @since 2.1
*/
public class GemfireOutboundChannelAdapterParser extends AbstractOutboundChannelAdapterParser {
private static final String CACHE_ENTRIES_PROPERTY = "cacheEntries";
private static final String CACHE_ENTRIES_ELEMENT = "cache-entries";
private static final String REGION_ATTRIBUTE = "region";
/* (non-Javadoc)
* @see AbstractOutboundChannelAdapterParser#parseConsumer(Element, ParserContext)
*/
@Override
protected AbstractBeanDefinition parseConsumer(Element element, ParserContext parserContext) {
BeanDefinitionBuilder cacheWritingMessageHandler = BeanDefinitionBuilder.genericBeanDefinition(
CacheWritingMessageHandler.class);
if (!element.hasAttribute(REGION_ATTRIBUTE)) {
parserContext.getReaderContext().error("'region' attribute is required.", element);
}
cacheWritingMessageHandler.addConstructorArgReference(element.getAttribute(REGION_ATTRIBUTE));
Element cacheEntries = DomUtils.getChildElementByTagName(element, CACHE_ENTRIES_ELEMENT);
if (cacheEntries != null) {
Map<?, ?> map = parserContext.getDelegate()
.parseMapElement(cacheEntries, cacheWritingMessageHandler.getBeanDefinition());
cacheWritingMessageHandler.addPropertyValue(CACHE_ENTRIES_PROPERTY, map);
}
return cacheWritingMessageHandler.getBeanDefinition();
}
}

View File

@@ -0,0 +1,4 @@
/**
* Provides classes for configuration - parsers, namespace handlers.
*/
package org.springframework.integration.gemfire.config.xml;

View File

@@ -0,0 +1,148 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.gemfire.inbound;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.CacheListener;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.util.CacheListenerAdapter;
import org.springframework.integration.endpoint.ExpressionMessageProducerSupport;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
/**
* An inbound endpoint that listens to a GemFire region for events and then publishes Messages to
* a channel. The default supported event types are CREATED and UPDATED. See the {@link EventType}
* enum for all options. A SpEL expression may be provided to generate a Message payload by
* evaluating that expression against the {@link EntryEvent} instance as the root object. If no
* payloadExpression is provided, the {@link EntryEvent} itself will be the payload.
*
* @author Mark Fisher
* @author David Turanski
* @author Artem Bilan
* @since 2.1
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
public class CacheListeningMessageProducer extends ExpressionMessageProducerSupport {
private final Log logger = LogFactory.getLog(this.getClass());
private final Region region;
private final CacheListener<?, ?> listener;
private volatile Set<EventType> supportedEventTypes =
new HashSet<EventType>(Arrays.asList(EventType.CREATED, EventType.UPDATED));
public CacheListeningMessageProducer(Region<?, ?> region) {
Assert.notNull(region, "region must not be null");
this.region = region;
this.listener = new MessageProducingCacheListener();
}
public void setSupportedEventTypes(EventType... eventTypes) {
Assert.notEmpty(eventTypes, "eventTypes must not be empty");
this.supportedEventTypes = new HashSet<EventType>(Arrays.asList(eventTypes));
}
@Override
public String getComponentType() {
return "gemfire:inbound-channel-adapter";
}
@Override
protected void doStart() {
if (this.logger.isInfoEnabled()) {
this.logger.info("adding MessageProducingCacheListener to GemFire Region '" + this.region.getName() + "'");
}
this.region.getAttributesMutator().addCacheListener(this.listener);
}
@Override
protected void doStop() {
if (this.logger.isInfoEnabled()) {
this.logger.info("removing MessageProducingCacheListener from GemFire Region '" + this.region.getName() + "'");
}
try {
this.region.getAttributesMutator().removeCacheListener(this.listener);
}
catch (CacheClosedException e) {
if (this.logger.isDebugEnabled()) {
this.logger.debug(e.getMessage(), e);
}
}
}
private class MessageProducingCacheListener extends CacheListenerAdapter {
@Override
public void afterCreate(EntryEvent event) {
if (CacheListeningMessageProducer.this.supportedEventTypes.contains(EventType.CREATED)) {
processEvent(event);
}
}
@Override
public void afterUpdate(EntryEvent event) {
if (CacheListeningMessageProducer.this.supportedEventTypes.contains(EventType.UPDATED)) {
processEvent(event);
}
}
@Override
public void afterInvalidate(EntryEvent event) {
if (CacheListeningMessageProducer.this.supportedEventTypes.contains(EventType.INVALIDATED)) {
processEvent(event);
}
}
@Override
public void afterDestroy(EntryEvent event) {
if (CacheListeningMessageProducer.this.supportedEventTypes.contains(EventType.DESTROYED)) {
processEvent(event);
}
}
private void processEvent(EntryEvent event) {
publish(evaluatePayloadExpression(event));
}
private void publish(Object object) {
Message<?> message = null;
if (object instanceof Message) {
message = (Message<?>) object;
}
else {
message = getMessageBuilderFactory().withPayload(object).build();
}
sendMessage(message);
}
}
}

View File

@@ -0,0 +1,140 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.gemfire.inbound;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.geode.cache.query.CqEvent;
import org.springframework.data.gemfire.listener.ContinuousQueryDefinition;
import org.springframework.data.gemfire.listener.ContinuousQueryListener;
import org.springframework.data.gemfire.listener.ContinuousQueryListenerContainer;
import org.springframework.integration.endpoint.ExpressionMessageProducerSupport;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
/**
* Responds to a Gemfire continuous query (set using the #query field) that is
* constantly evaluated against a cache
* {@link org.apache.geode.cache.Region}. This is much faster than
* re-querying the cache manually.
*
* @author Josh Long
* @author David Turanski
* @author Artem Bilan
* @since 2.1
*
*/
public class ContinuousQueryMessageProducer extends ExpressionMessageProducerSupport
implements ContinuousQueryListener {
private static Log logger = LogFactory.getLog(ContinuousQueryMessageProducer.class);
private final String query;
private final ContinuousQueryListenerContainer queryListenerContainer;
private volatile String queryName;
private boolean durable;
private volatile Set<CqEventType> supportedEventTypes =
new HashSet<CqEventType>(Arrays.asList(CqEventType.CREATED, CqEventType.UPDATED));
/**
* @param queryListenerContainer a {@link ContinuousQueryListenerContainer}
* @param query the query string
*/
public ContinuousQueryMessageProducer(ContinuousQueryListenerContainer queryListenerContainer, String query) {
Assert.notNull(queryListenerContainer, "'queryListenerContainer' cannot be null");
Assert.notNull(query, "'query' cannot be null");
this.queryListenerContainer = queryListenerContainer;
this.query = query;
}
/**
* @param queryName optional query name
*/
public void setQueryName(String queryName) {
this.queryName = queryName;
}
/**
* @param durable true if the query is a durable subscription
*/
public void setDurable(boolean durable) {
this.durable = durable;
}
public void setSupportedEventTypes(CqEventType... eventTypes) {
Assert.notEmpty(eventTypes, "eventTypes must not be empty");
this.supportedEventTypes = new HashSet<CqEventType>(Arrays.asList(eventTypes));
}
@Override
public String getComponentType() {
return "gemfire:cq-inbound-channel-adapter";
}
@Override
protected void onInit() {
super.onInit();
if (this.queryName == null) {
this.queryListenerContainer.addListener(new ContinuousQueryDefinition(this.query, this, this.durable));
}
else {
this.queryListenerContainer.addListener(new ContinuousQueryDefinition(this.queryName, this.query, this,
this.durable));
}
}
/*
* (non-Javadoc)
* @see org.springframework.data.gemfire.listener.QueryListener#onEvent(com.gemstone.gemfire.cache.query.CqEvent)
*/
@Override
public void onEvent(CqEvent event) {
if (isEventSupported(event)) {
if (logger.isDebugEnabled()) {
logger.debug(String.format("processing cq event key [%s] event [%s]", event.getQueryOperation()
.toString(), event.getKey()));
}
Message<?> message = null;
Object object = evaluatePayloadExpression(event);
if (object instanceof Message) {
message = (Message<?>) object;
}
else {
message = getMessageBuilderFactory().withPayload(object).build();
}
sendMessage(message);
}
}
private boolean isEventSupported(CqEvent event) {
String eventName = event.getQueryOperation().toString() +
(event.getQueryOperation().toString().endsWith("Y") ? "ED" : "D");
CqEventType eventType = CqEventType.valueOf(eventName);
return this.supportedEventTypes.contains(eventType);
}
}

View File

@@ -0,0 +1,36 @@
/*
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.gemfire.inbound;
/**
* Enumeration of GemFire Continuous Query Event Types.
*
* @author David Turanski
*
* @since 2.1
*/
public enum CqEventType {
CREATED,
UPDATED,
DESTROYED,
REGION_CLEARED,
REGION_INVALIDATED
}

View File

@@ -0,0 +1,35 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.gemfire.inbound;
/**
* Enumeration of GemFire event types.
*
* @author Mark Fisher
* @since 2.1
*/
public enum EventType {
CREATED,
UPDATED,
DESTROYED,
INVALIDATED
}

View File

@@ -0,0 +1,4 @@
/**
* Provides classes supporting inbound endpoints.
*/
package org.springframework.integration.gemfire.inbound;

View File

@@ -0,0 +1,138 @@
/*
* Copyright 2014-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.gemfire.metadata;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.util.CacheListenerAdapter;
import org.springframework.integration.metadata.ListenableMetadataStore;
import org.springframework.integration.metadata.MetadataStoreListener;
import org.springframework.util.Assert;
/**
* Gemfire implementation of {@link ListenableMetadataStore}.
* Use this {@link org.springframework.integration.metadata.MetadataStore}
* to achieve meta-data persistence shared across application instances and
* restarts.
*
* @author Artem Bilan
* @author Venil Noronha
* @author Gary Russell
*
* @since 4.0
*/
public class GemfireMetadataStore implements ListenableMetadataStore {
private static final String KEY_MUST_NOT_BE_NULL = "'key' must not be null.";
public static final String KEY = "MetaData";
private final GemfireCacheListener cacheListener = new GemfireCacheListener();
private final Region<String, String> region;
public GemfireMetadataStore(Cache cache) {
this(Objects.requireNonNull(cache, "'cache' must not be null")
.<String, String>createRegionFactory()
.setScope(Scope.LOCAL)
.create(KEY));
}
public GemfireMetadataStore(Region<String, String> region) {
Assert.notNull(region, "'region' must not be null");
this.region = region;
this.region.getAttributesMutator()
.addCacheListener(this.cacheListener);
}
@Override
public void put(String key, String value) {
Assert.notNull(key, KEY_MUST_NOT_BE_NULL);
Assert.notNull(value, "'value' must not be null.");
this.region.put(key, value);
}
@Override
public String putIfAbsent(String key, String value) {
Assert.notNull(key, KEY_MUST_NOT_BE_NULL);
Assert.notNull(value, "'value' must not be null.");
return this.region.putIfAbsent(key, value);
}
@Override
public boolean replace(String key, String oldValue, String newValue) {
Assert.notNull(key, KEY_MUST_NOT_BE_NULL);
Assert.notNull(oldValue, "'oldValue' must not be null.");
Assert.notNull(newValue, "'newValue' must not be null.");
return this.region.replace(key, oldValue, newValue);
}
@Override
public String get(String key) {
Assert.notNull(key, KEY_MUST_NOT_BE_NULL);
return this.region.get(key);
}
@Override
public String remove(String key) {
Assert.notNull(key, KEY_MUST_NOT_BE_NULL);
return this.region.remove(key);
}
@Override
public void addListener(MetadataStoreListener listener) {
Assert.notNull(listener, "'listener' must not be null");
this.cacheListener.listeners.add(listener);
}
@Override
public void removeListener(MetadataStoreListener listener) {
this.cacheListener.listeners.remove(listener);
}
private static class GemfireCacheListener extends CacheListenerAdapter<String, String> {
private final List<MetadataStoreListener> listeners = new CopyOnWriteArrayList<>();
GemfireCacheListener() {
}
@Override
public void afterCreate(EntryEvent<String, String> event) {
this.listeners.forEach(listener -> listener.onAdd(event.getKey(), event.getNewValue()));
}
@Override
public void afterUpdate(EntryEvent<String, String> event) {
this.listeners.forEach(listener -> listener.onUpdate(event.getKey(), event.getNewValue()));
}
@Override
public void afterDestroy(EntryEvent<String, String> event) {
this.listeners.forEach(listener -> listener.onRemove(event.getKey(), event.getOldValue()));
}
}
}

View File

@@ -0,0 +1,4 @@
/**
* Provides classes for the Gemfire MetadataStore.
*/
package org.springframework.integration.gemfire.metadata;

View File

@@ -0,0 +1,137 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.gemfire.outbound;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.geode.GemFireCheckedException;
import org.apache.geode.GemFireException;
import org.apache.geode.cache.Region;
import org.springframework.data.gemfire.GemfireCallback;
import org.springframework.data.gemfire.GemfireTemplate;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.expression.ExpressionUtils;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
/**
* A {@link org.springframework.messaging.MessageHandler} implementation that writes to a
* GemFire Region. The Message's payload must be an instance of {@link Map} or
* {@link #cacheEntryExpressions} must be provided.
*
* @author Mark Fisher
* @author David Turanski
* @author Artem Bilan
* @author Gary Russell
*
* @since 2.1
*/
public class CacheWritingMessageHandler extends AbstractMessageHandler {
private static final SpelExpressionParser PARSER = new SpelExpressionParser();
private final Map<Expression, Expression> cacheEntryExpressions = new LinkedHashMap<Expression, Expression>();
private final GemfireTemplate gemfireTemplate = new GemfireTemplate();
private volatile EvaluationContext evaluationContext;
@SuppressWarnings("rawtypes")
public CacheWritingMessageHandler(Region region) {
Assert.notNull(region, "region must not be null");
this.gemfireTemplate.setRegion(region);
}
@Override
public String getComponentType() {
return "gemfire:outbound-channel-adapter";
}
@Override
protected void onInit() {
super.onInit();
this.gemfireTemplate.afterPropertiesSet();
this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());
}
@Override
@SuppressWarnings({ "unchecked", "rawtypes" })
protected void handleMessageInternal(Message<?> message) {
Object payload = message.getPayload();
Map<?, ?> cacheValues = (this.cacheEntryExpressions.size() > 0) ? evaluateCacheEntries(message) : null;
if (cacheValues == null) {
Assert.state(payload instanceof Map,
"If cache entry expressions are not configured, then payload must be a Map");
cacheValues = (Map) payload;
}
final Map<?, ?> map = cacheValues;
this.gemfireTemplate.execute(new GemfireCallback<Object>() {
@Override
public Object doInGemfire(Region region) throws GemFireCheckedException, GemFireException {
region.putAll(map);
return null;
}
});
}
private Map<Object, Object> evaluateCacheEntries(Message<?> message) {
if (this.cacheEntryExpressions.size() == 0) {
return null;
}
else {
Map<Object, Object> cacheValues = new HashMap<Object, Object>();
for (Entry<Expression, Expression> expressionEntry : this.cacheEntryExpressions.entrySet()) {
cacheValues.put(expressionEntry.getKey().getValue(this.evaluationContext, message),
expressionEntry.getValue().getValue(this.evaluationContext, message));
}
return cacheValues;
}
}
public void setCacheEntries(Map<String, String> cacheEntries) {
Assert.notNull(cacheEntries, "'cacheEntries' must not be null");
if (this.cacheEntryExpressions.size() > 0) {
this.cacheEntryExpressions.clear();
}
for (Entry<String, String> cacheEntry : cacheEntries.entrySet()) {
this.cacheEntryExpressions.put(PARSER.parseExpression(cacheEntry.getKey()),
PARSER.parseExpression(cacheEntry.getValue()));
}
}
public void setCacheEntryExpressions(Map<Expression, Expression> cacheEntryExpressions) {
Assert.notNull(cacheEntryExpressions, "'cacheEntryExpressions' must not be null");
if (this.cacheEntryExpressions.size() > 0) {
this.cacheEntryExpressions.clear();
}
this.cacheEntryExpressions.putAll(cacheEntryExpressions);
}
}

View File

@@ -0,0 +1,4 @@
/**
* Provides classes supporting outbound endpoints.
*/
package org.springframework.integration.gemfire.outbound;

View File

@@ -0,0 +1,120 @@
/*
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.gemfire.store;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.geode.cache.Region;
import org.springframework.integration.store.AbstractKeyValueMessageStore;
import org.springframework.util.Assert;
import org.springframework.util.PatternMatchUtils;
/**
* Gemfire implementation of the key/value style
* {@link org.springframework.integration.store.MessageStore} and
* {@link org.springframework.integration.store.MessageGroupStore}.
*
* @author Mark Fisher
* @author Oleg Zhurakousky
* @author David Turanski
* @author Artem Bilan
* @author Gary Russell
*
* @since 2.1
*/
public class GemfireMessageStore extends AbstractKeyValueMessageStore {
private static final String ID_MUST_NOT_BE_NULL = "'id' must not be null";
private final Region<Object, Object> messageStoreRegion;
/**
* Provides the region to be used for the message store. This is useful when
* using a configured region. This is also required if using a client region
* on a remote cache server.
* @param messageStoreRegion The region.
*/
public GemfireMessageStore(Region<Object, Object> messageStoreRegion) {
this(messageStoreRegion, "");
}
/**
* Construct a {@link GemfireMessageStore} instance based on the provided.
* @param messageStoreRegion the region to use.
* @param prefix the key prefix to use, allowing the same region to be used for
* multiple stores.
* @since 4.3.12
*/
public GemfireMessageStore(Region<Object, Object> messageStoreRegion, String prefix) {
super(prefix);
Assert.notNull(messageStoreRegion, "'messageStoreRegion' must not be null");
this.messageStoreRegion = messageStoreRegion;
}
@Override
protected Object doRetrieve(Object id) {
Assert.notNull(id, ID_MUST_NOT_BE_NULL);
return this.messageStoreRegion.get(id);
}
@Override
protected void doStore(Object id, Object objectToStore) {
Assert.notNull(id, ID_MUST_NOT_BE_NULL);
Assert.notNull(objectToStore, "'objectToStore' must not be null");
this.messageStoreRegion.put(id, objectToStore);
}
@Override
protected void doStoreIfAbsent(Object id, Object objectToStore) {
Assert.notNull(id, ID_MUST_NOT_BE_NULL);
Assert.notNull(objectToStore, "'objectToStore' must not be null");
Object present = this.messageStoreRegion.putIfAbsent(id, objectToStore);
if (present != null && logger.isDebugEnabled()) {
logger.debug("The message: [" + present + "] is already present in the store. " +
"The [" + objectToStore + "] is ignored.");
}
}
@Override
protected Object doRemove(Object id) {
Assert.notNull(id, ID_MUST_NOT_BE_NULL);
return this.messageStoreRegion.remove(id);
}
@Override
protected void doRemoveAll(Collection<Object> ids) {
this.messageStoreRegion.removeAll(ids);
}
@Override
protected Collection<?> doListKeys(String keyPattern) {
Assert.hasText(keyPattern, "'keyPattern' must not be empty");
Collection<Object> keys = this.messageStoreRegion.keySet();
List<Object> keyList = new ArrayList<Object>();
for (Object key : keys) {
String keyValue = key.toString();
if (PatternMatchUtils.simpleMatch(keyPattern, keyValue)) {
keyList.add(keyValue);
}
}
return keyList;
}
}

View File

@@ -0,0 +1,4 @@
/**
* Provides classes releated to storing messages.
*/
package org.springframework.integration.gemfire.store;

View File

@@ -0,0 +1,55 @@
/*
* Copyright 2014-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.gemfire.util;
import java.util.concurrent.locks.Lock;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.Scope;
import org.springframework.integration.support.locks.LockRegistry;
import org.springframework.util.Assert;
/**
* Implementation of {@link LockRegistry} providing a distributed lock using Gemfire.
*
* @author Artem Bilan
* @since 4.0
*/
public class GemfireLockRegistry implements LockRegistry {
public static final String LOCK_REGISTRY_REGION = "LockRegistry";
private final Region<?, ?> region;
public GemfireLockRegistry(Cache cache) {
Assert.notNull(cache, "'cache' must not be null");
this.region = cache.createRegionFactory().setScope(Scope.GLOBAL).create(LOCK_REGISTRY_REGION);
}
public GemfireLockRegistry(Region<?, ?> region) {
Assert.notNull(region, "'region' must not be null");
this.region = region;
}
@Override
public Lock obtain(Object lockKey) {
return this.region.getDistributedLock(lockKey);
}
}

View File

@@ -0,0 +1,4 @@
/**
* Provides utility classes.
*/
package org.springframework.integration.gemfire.util;

View File

@@ -0,0 +1 @@
http\://www.springframework.org/schema/integration/gemfire=org.springframework.integration.gemfire.config.xml.GemfireIntegrationNamespaceHandler

View File

@@ -0,0 +1,22 @@
http\://www.springframework.org/schema/integration/gemfire/spring-integration-gemfire-2.1.xsd=org/springframework/integration/gemfire/config/xml/spring-integration-gemfire.xsd
http\://www.springframework.org/schema/integration/gemfire/spring-integration-gemfire-2.2.xsd=org/springframework/integration/gemfire/config/xml/spring-integration-gemfire.xsd
http\://www.springframework.org/schema/integration/gemfire/spring-integration-gemfire-3.0.xsd=org/springframework/integration/gemfire/config/xml/spring-integration-gemfire.xsd
http\://www.springframework.org/schema/integration/gemfire/spring-integration-gemfire-4.0.xsd=org/springframework/integration/gemfire/config/xml/spring-integration-gemfire.xsd
http\://www.springframework.org/schema/integration/gemfire/spring-integration-gemfire-4.1.xsd=org/springframework/integration/gemfire/config/xml/spring-integration-gemfire.xsd
http\://www.springframework.org/schema/integration/gemfire/spring-integration-gemfire-4.2.xsd=org/springframework/integration/gemfire/config/xml/spring-integration-gemfire.xsd
http\://www.springframework.org/schema/integration/gemfire/spring-integration-gemfire-4.3.xsd=org/springframework/integration/gemfire/config/xml/spring-integration-gemfire.xsd
http\://www.springframework.org/schema/integration/gemfire/spring-integration-gemfire-5.0.xsd=org/springframework/integration/gemfire/config/xml/spring-integration-gemfire.xsd
http\://www.springframework.org/schema/integration/gemfire/spring-integration-gemfire-5.1.xsd=org/springframework/integration/gemfire/config/xml/spring-integration-gemfire.xsd
http\://www.springframework.org/schema/integration/gemfire/spring-integration-gemfire-5.2.xsd=org/springframework/integration/gemfire/config/xml/spring-integration-gemfire.xsd
http\://www.springframework.org/schema/integration/gemfire/spring-integration-gemfire.xsd=org/springframework/integration/gemfire/config/xml/spring-integration-gemfire.xsd
https\://www.springframework.org/schema/integration/gemfire/spring-integration-gemfire-2.1.xsd=org/springframework/integration/gemfire/config/xml/spring-integration-gemfire.xsd
https\://www.springframework.org/schema/integration/gemfire/spring-integration-gemfire-2.2.xsd=org/springframework/integration/gemfire/config/xml/spring-integration-gemfire.xsd
https\://www.springframework.org/schema/integration/gemfire/spring-integration-gemfire-3.0.xsd=org/springframework/integration/gemfire/config/xml/spring-integration-gemfire.xsd
https\://www.springframework.org/schema/integration/gemfire/spring-integration-gemfire-4.0.xsd=org/springframework/integration/gemfire/config/xml/spring-integration-gemfire.xsd
https\://www.springframework.org/schema/integration/gemfire/spring-integration-gemfire-4.1.xsd=org/springframework/integration/gemfire/config/xml/spring-integration-gemfire.xsd
https\://www.springframework.org/schema/integration/gemfire/spring-integration-gemfire-4.2.xsd=org/springframework/integration/gemfire/config/xml/spring-integration-gemfire.xsd
https\://www.springframework.org/schema/integration/gemfire/spring-integration-gemfire-4.3.xsd=org/springframework/integration/gemfire/config/xml/spring-integration-gemfire.xsd
https\://www.springframework.org/schema/integration/gemfire/spring-integration-gemfire-5.0.xsd=org/springframework/integration/gemfire/config/xml/spring-integration-gemfire.xsd
https\://www.springframework.org/schema/integration/gemfire/spring-integration-gemfire-5.1.xsd=org/springframework/integration/gemfire/config/xml/spring-integration-gemfire.xsd
https\://www.springframework.org/schema/integration/gemfire/spring-integration-gemfire-5.2.xsd=org/springframework/integration/gemfire/config/xml/spring-integration-gemfire.xsd
https\://www.springframework.org/schema/integration/gemfire/spring-integration-gemfire.xsd=org/springframework/integration/gemfire/config/xml/spring-integration-gemfire.xsd

View File

@@ -0,0 +1,4 @@
# Tooling related information for the integration gemfire namespace
http\://www.springframework.org/schema/integration/gemfire@name=integration gemfire Namespace
http\://www.springframework.org/schema/integration/gemfire@prefix=int-gfe
http\://www.springframework.org/schema/integration@icon=org/springframework/integration/gemfire/config/xml/spring-integration-gemfire.gif

View File

@@ -0,0 +1,208 @@
<?xml version="1.0" encoding="UTF-8"?>
<xsd:schema xmlns="http://www.springframework.org/schema/integration/gemfire"
xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:tool="http://www.springframework.org/schema/tool"
xmlns:integration="http://www.springframework.org/schema/integration"
targetNamespace="http://www.springframework.org/schema/integration/gemfire"
elementFormDefault="qualified">
<xsd:import namespace="http://www.springframework.org/schema/beans"
schemaLocation="https://www.springframework.org/schema/beans/spring-beans.xsd"/>
<xsd:import namespace="http://www.springframework.org/schema/tool"/>
<xsd:import namespace="http://www.springframework.org/schema/integration"
schemaLocation="https://www.springframework.org/schema/integration/spring-integration.xsd"/>
<xsd:annotation>
<xsd:documentation><![CDATA[
Defines the core configuration elements for Spring Integration GemFire Support.
]]></xsd:documentation>
</xsd:annotation>
<xsd:element name="inbound-channel-adapter">
<xsd:annotation>
<xsd:documentation>
Configures a Message Producing Endpoint for the
'org.springframework.integration.gemfire.inbound.CacheListeningMessageProducer' that backed by a
GemFire CacheListener
</xsd:documentation>
</xsd:annotation>
<xsd:complexType>
<xsd:complexContent>
<xsd:extension base="InboundChannelAdapterType">
<xsd:attribute name="region" type="xsd:string" use="required">
<xsd:annotation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type type="com.gemstone.gemfire.cache.Region"/>
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="cache-events" type="xsd:string"
use="optional" default="CREATED,UPDATED">
<xsd:annotation>
<xsd:documentation><![CDATA[
Enabled cache entry event types
]]></xsd:documentation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type
type="org.springframework.integration.gemfire.inbound.EventType"/>
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
</xsd:extension>
</xsd:complexContent>
</xsd:complexType>
</xsd:element>
<xsd:element name="cq-inbound-channel-adapter">
<xsd:annotation>
<xsd:documentation>
Configures a 'SourcePollingChannelAdapter' Endpoint for the
'org.springframework.integration.gemfire.inbound.ContinuousQueryMessageProducer'
that backed by a
Spring Gemfire QueryListener
</xsd:documentation>
</xsd:annotation>
<xsd:complexType>
<xsd:complexContent>
<xsd:extension base="InboundChannelAdapterType">
<xsd:attribute name="cq-listener-container" use="required">
<xsd:annotation>
<xsd:documentation><![CDATA[
Reference to a ContinuousQueryListenerContainer
]]></xsd:documentation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type
type="org.springframework.data.gemfire.listener.ContinuousQueryListenerContainer"/>
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="query-events" type="xsd:string"
use="optional" default="CREATED,UPDATED">
<xsd:annotation>
<xsd:documentation><![CDATA[
Enabled continuous query event types
]]></xsd:documentation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type
type="org.springframework.integration.gemfire.inbound.CqEventType"/>
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="query" use="required" type="xsd:string">
<xsd:annotation>
<xsd:documentation><![CDATA[
The query string
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="query-name" use="optional" type="xsd:string">
<xsd:annotation>
<xsd:documentation><![CDATA[
The query name
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="durable" use="optional" default="false">
<xsd:annotation>
<xsd:documentation><![CDATA[
Indicates if the query is a durable subscription
]]></xsd:documentation>
</xsd:annotation>
<xsd:simpleType>
<xsd:union memberTypes="xsd:string xsd:boolean"/>
</xsd:simpleType>
</xsd:attribute>
</xsd:extension>
</xsd:complexContent>
</xsd:complexType>
</xsd:element>
<xsd:element name="outbound-channel-adapter">
<xsd:complexType>
<xsd:annotation>
<xsd:documentation>
Configures a Consumer Endpoint for the
'org.springframework.integration.gemfire.outbound.CacheWritingMessageHandler' that
writes Message to a Gemfire cache
</xsd:documentation>
</xsd:annotation>
<xsd:choice minOccurs="0" maxOccurs="3">
<xsd:element name="cache-entries" type="beans:mapType"
minOccurs="0" maxOccurs="1">
<xsd:annotation>
<xsd:documentation>
A map of SpEL expressions used to create cache entries. If not
provided, payload must be a Map
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element ref="integration:poller" minOccurs="0" maxOccurs="1"/>
<xsd:element name="request-handler-advice-chain" type="integration:handlerAdviceChainType" minOccurs="0"
maxOccurs="1"/>
</xsd:choice>
<xsd:attributeGroup ref="integration:channelAdapterAttributes"/>
<xsd:attribute name="region" type="xsd:string" use="required">
<xsd:annotation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type type="com.gemstone.gemfire.cache.Region"/>
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="order" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[
Specifies the order for invocation when this endpoint is connected as a
subscriber to a SubscribableChannel.
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
</xsd:element>
<xsd:complexType name="InboundChannelAdapterType">
<xsd:attributeGroup ref="integration:channelAdapterAttributes"/>
<xsd:attribute name="error-channel" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:appinfo>
<xsd:documentation>
Identifies the channel to which error messages will be sent if a failure occurs in this
component's invocation. If no "error-channel" reference is provided, this component will
propagate Exceptions to the caller. To completely suppress Exceptions, provide a
reference to the "nullChannel" here.
</xsd:documentation>
<tool:annotation kind="ref">
<tool:expected-type type="org.springframework.messaging.MessageChannel"/>
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="expression" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[
Expression to be evaluated to produce a value for the payload. The root object of the
expression evaluation is the GemFire 'EntryEvent'.
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
</xsd:schema>

View File

@@ -0,0 +1,57 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.gemfire;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.util.CacheListenerAdapter;
/**
* (this is the CacheLogger class that ships in the Spring-Gemfire samples)
*
* @author Costin Leau
*/
public class TestCacheListenerLogger extends CacheListenerAdapter<Object, Object> {
private static final Log log = LogFactory.getLog(TestCacheListenerLogger.class);
@Override
public void afterCreate(EntryEvent<Object, Object> event) {
log.info("Added " + messageLog(event) + " to the cache");
}
@Override
public void afterDestroy(EntryEvent<Object, Object> event) {
log.info("Removed " + messageLog(event) + " from the cache");
}
@Override
public void afterUpdate(EntryEvent<Object, Object> event) {
log.info("Updated " + messageLog(event) + " in the cache");
}
private String messageLog(EntryEvent<Object, Object> event) {
Object key = event.getKey();
Object value = event.getNewValue();
if (event.getOperation().isUpdate()) {
return "[" + key + "] from [" + event.getOldValue() + "] to [" + event.getNewValue() + "]";
}
return "[" + key + "=" + value + "]";
}
}

View File

@@ -0,0 +1,38 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:util="http://www.springframework.org/schema/util"
xmlns:int-gfe="http://www.springframework.org/schema/integration/gemfire"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/integration/gemfire
https://www.springframework.org/schema/integration/gemfire/spring-integration-gemfire.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/util
https://www.springframework.org/schema/util/spring-util.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd">
<context:property-placeholder properties-ref="props" />
<util:properties id="props">
<prop key="durable">true</prop>
</util:properties>
<bean id="queryListenerContainer" class="org.mockito.Mockito" factory-method="mock">
<constructor-arg value="org.springframework.data.gemfire.listener.ContinuousQueryListenerContainer"/>
</bean>
<int-gfe:cq-inbound-channel-adapter id="withDurable"
cq-listener-container="queryListenerContainer" query="select * from /test"
channel="outputChannel1" durable="${durable}" auto-startup="false" phase="2"/>
<int:channel id="outputChannel1">
<int:queue/>
</int:channel>
</beans>

View File

@@ -0,0 +1,73 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.gemfire.config.xml;
import static org.assertj.core.api.Assertions.assertThat;
import static org.springframework.integration.gemfire.config.xml.ParserTestUtil.createFakeParserContext;
import static org.springframework.integration.gemfire.config.xml.ParserTestUtil.loadXMLFrom;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.w3c.dom.Element;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.parsing.BeanDefinitionParsingException;
import org.springframework.integration.gemfire.inbound.ContinuousQueryMessageProducer;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
* @author Dan Oxlade
* @author Liujiong
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration
@DirtiesContext
public class GemfireCqInboundChannelAdapterParserTests {
private GemfireCqInboundChannelAdapterParser underTest = new GemfireCqInboundChannelAdapterParser();
@Autowired
@Qualifier("withDurable")
ContinuousQueryMessageProducer adapter;
@Test(expected = BeanDefinitionParsingException.class)
public void cqListenerContainerIsARequiredAttribute() throws Exception {
String xml = "<cq-inbound-channel-adapter query=\"some-query\"/>";
Element element = loadXMLFrom(xml).getDocumentElement();
underTest.doParse(element, createFakeParserContext(), null);
}
@Test(expected = BeanDefinitionParsingException.class)
public void queryIsARequiredAttribute() throws Exception {
String xml = "<cq-inbound-channel-adapter cq-listener-container=\"some-reference\" />";
Element element = loadXMLFrom(xml).getDocumentElement();
underTest.doParse(element, createFakeParserContext(), null);
}
@Test
public void testPhase() {
assertThat(adapter.getPhase()).isEqualTo(2);
}
@Test
public void testAutoStartup() {
assertThat(adapter.isAutoStartup()).isEqualTo(false);
}
}

View File

@@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int-gfe="http://www.springframework.org/schema/integration/gemfire"
xsi:schemaLocation="http://www.springframework.org/schema/integration/gemfire
https://www.springframework.org/schema/integration/gemfire/spring-integration-gemfire.xsd
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="region" class="org.mockito.Mockito" factory-method="mock">
<constructor-arg value="org.apache.geode.cache.Region"/>
</bean>
<int-gfe:inbound-channel-adapter id="channel1"
region="region"
cache-events="CREATED"
expression="newValue"
auto-startup="false"
phase="2"/>
</beans>

View File

@@ -0,0 +1,67 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.gemfire.config.xml;
import static org.assertj.core.api.Assertions.assertThat;
import static org.springframework.integration.gemfire.config.xml.ParserTestUtil.createFakeParserContext;
import static org.springframework.integration.gemfire.config.xml.ParserTestUtil.loadXMLFrom;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.w3c.dom.Element;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.parsing.BeanDefinitionParsingException;
import org.springframework.integration.gemfire.inbound.CacheListeningMessageProducer;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
* @author Dan Oxlade
* @author Liujiong
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration
@DirtiesContext
public class GemfireInboundChannelAdapterParserTests {
private GemfireInboundChannelAdapterParser underTest = new GemfireInboundChannelAdapterParser();
@Autowired
@Qualifier("channel1.adapter")
CacheListeningMessageProducer adapter1;
@Test(expected = BeanDefinitionParsingException.class)
public void regionIsARequiredAttribute() throws Exception {
String xml = "<inbound-channel-adapter />";
Element element = loadXMLFrom(xml).getDocumentElement();
underTest.doParse(element, createFakeParserContext(), null);
}
@Test
public void testPhase() {
assertThat(adapter1.getPhase()).isEqualTo(2);
}
@Test
public void testAutoStart() {
assertThat(adapter1.isAutoStartup()).isEqualTo(false);
}
}

View File

@@ -0,0 +1,28 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int-gfe="http://www.springframework.org/schema/integration/gemfire"
xmlns:int="http://www.springframework.org/schema/integration"
xsi:schemaLocation="http://www.springframework.org/schema/integration/gemfire
https://www.springframework.org/schema/integration/gemfire/spring-integration-gemfire.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd">
<int:channel id="input">
<int:queue/>
</int:channel>
<int-gfe:outbound-channel-adapter id="adapter" region="region" channel="input" auto-startup="false" phase="2">
<int:poller fixed-delay="100"/>
<int-gfe:request-handler-advice-chain>
<bean class="org.springframework.integration.gemfire.config.xml.GemfireOutboundChannelAdapterParserTests$FooAdvice"/>
</int-gfe:request-handler-advice-chain>
</int-gfe:outbound-channel-adapter>
<bean id="region" class="org.mockito.Mockito" factory-method="mock">
<constructor-arg value="org.apache.geode.cache.Region"/>
</bean>
</beans>

View File

@@ -0,0 +1,98 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.gemfire.config.xml;
import static org.assertj.core.api.Assertions.assertThat;
import static org.springframework.integration.gemfire.config.xml.ParserTestUtil.createFakeParserContext;
import static org.springframework.integration.gemfire.config.xml.ParserTestUtil.loadXMLFrom;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.w3c.dom.Element;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.parsing.BeanDefinitionParsingException;
import org.springframework.context.ApplicationContext;
import org.springframework.integration.config.ConsumerEndpointFactoryBean;
import org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
* @author Dan Oxlade
* @author Liujiong
* @author Artem Bilan
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration
@DirtiesContext
public class GemfireOutboundChannelAdapterParserTests {
private final GemfireOutboundChannelAdapterParser underTest = new GemfireOutboundChannelAdapterParser();
private static final CountDownLatch adviceCalled = new CountDownLatch(1);
@Autowired
@Qualifier("adapter")
ConsumerEndpointFactoryBean adapter1;
@Autowired
ApplicationContext ctx;
@Test(expected = BeanDefinitionParsingException.class)
public void regionIsARequiredAttribute() throws Exception {
String xml = "<outbound-channel-adapter />";
Element element = loadXMLFrom(xml).getDocumentElement();
underTest.parseConsumer(element, createFakeParserContext());
}
@Test
public void withAdvice() throws InterruptedException {
adapter1.start();
MessageChannel channel = ctx.getBean("input", MessageChannel.class);
channel.send(new GenericMessage<String>("foo"));
assertThat(adviceCalled.await(10, TimeUnit.SECONDS)).isTrue();
}
@Test
public void testPhase() {
assertThat(adapter1.getPhase()).isEqualTo(2);
}
@Test
public void testAutoStart() {
assertThat(adapter1.isAutoStartup()).isEqualTo(false);
}
public static class FooAdvice extends AbstractRequestHandlerAdvice {
@Override
protected Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) {
adviceCalled.countDown();
return null;
}
}
}

View File

@@ -0,0 +1,64 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.gemfire.config.xml;
import org.springframework.beans.factory.parsing.FailFastProblemReporter;
import org.springframework.beans.factory.xml.ParserContext;
import org.springframework.beans.factory.xml.XmlReaderContext;
import org.springframework.core.io.InputStreamResource;
/**
* @author Dan Oxlade
* @author Gary Russell
*/
class ParserTestUtil {
private ParserTestUtil() {
super();
}
static ParserContext createFakeParserContext() {
return new ParserContext(
new XmlReaderContext(thisClassAsResource(), new FailFastProblemReporter(), null, null, null, null),
null);
}
static InputStreamResource thisClassAsResource() {
return new InputStreamResource(
ParserTestUtil.class.getResourceAsStream(ParserTestUtil.class.getSimpleName() + ".class"));
}
static org.w3c.dom.Document loadXMLFrom(String xml) throws org.xml.sax.SAXException, java.io.IOException {
return loadXMLFrom(new java.io.ByteArrayInputStream(xml.getBytes()));
}
static org.w3c.dom.Document loadXMLFrom(java.io.InputStream is)
throws org.xml.sax.SAXException, java.io.IOException {
javax.xml.parsers.DocumentBuilderFactory factory = javax.xml.parsers.DocumentBuilderFactory.newInstance();
factory.setNamespaceAware(true);
javax.xml.parsers.DocumentBuilder builder = null;
try {
builder = factory.newDocumentBuilder();
}
catch (javax.xml.parsers.ParserConfigurationException ex) {
}
org.w3c.dom.Document doc = builder.parse(is);
is.close();
return doc;
}
}

View File

@@ -0,0 +1,80 @@
/*
* Copyright 2013-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.gemfire.fork;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Date;
import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.server.CacheServer;
/**
* @author Costin Leau
* @author David Turanski
* @author Gunnar Hillert
* @author Soby Chacko
* @author Gary Russell
*
* Runs as a standalone Java app.
* Modified from SGF implementation for testing client/server CQ features
*/
public class CacheServerProcess {
private static final Log logger = LogFactory.getLog(CacheServerProcess.class);
private CacheServerProcess() {
super();
}
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.setProperty("name", "CacheServer at " + new Date());
props.setProperty("log-level", "info");
logger.info("Connecting to the distributed system and creating the cache.");
Cache cache = new CacheFactory(props).create();
// Create region.
Region<?, ?> region = cache.createRegionFactory(RegionShortcut.REPLICATE)
.setScope(Scope.DISTRIBUTED_ACK)
.create("test");
logger.info("Test region, " + region.getFullPath() + ", created in cache.");
// Start Cache Server.
CacheServer server = cache.addCacheServer();
server.setPort(40404);
logger.info("Starting server");
server.start();
ForkUtil.createControlFile(CacheServerProcess.class.getName());
logger.info("Waiting for shutdown");
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
bufferedReader.readLine();
}
}

View File

@@ -0,0 +1,171 @@
/*
* Copyright 2011-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.gemfire.fork;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintStream;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Utility for forking Java processes. Modified from the SGF version for SI
*
* @author Costin Leau
* @author David Turanski
* @author Gary Russell
*
*
*/
public class ForkUtil {
private static final Log logger = LogFactory.getLog(ForkUtil.class);
private static String TEMP_DIR = System.getProperty("java.io.tmpdir");
private ForkUtil() {
super();
}
public static OutputStream cloneJVM(String argument) {
String cp = System.getProperty("java.class.path");
String home = System.getProperty("java.home");
Process proc = null;
String java = home + "/bin/java".replace("\\", "/");
String argClass = argument;
String[] cmdArray = {java, "-cp", cp, argClass};
try {
// ProcessBuilder builder = new ProcessBuilder(cmd, argCp,
// argClass);
// builder.redirectErrorStream(true);
proc = Runtime.getRuntime().exec(cmdArray);
}
catch (IOException ioe) {
throw new IllegalStateException("Cannot start command " + cmdArray, ioe);
}
logger.info("Started fork");
final Process p = proc;
final BufferedReader br = new BufferedReader(new InputStreamReader(p.getInputStream()));
final BufferedReader ebr = new BufferedReader(new InputStreamReader(p.getErrorStream()));
final AtomicBoolean run = new AtomicBoolean(true);
Thread reader = copyStdXxx(br, run, System.out);
Thread errReader = copyStdXxx(ebr, run, System.err);
reader.start();
errReader.start();
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
logger.info("Stopping fork...");
run.set(false);
if (p != null) {
p.destroy();
}
try {
p.waitFor();
}
catch (InterruptedException e) {
// ignore
}
logger.info("Fork stopped");
}
});
return proc.getOutputStream();
}
private static Thread copyStdXxx(final BufferedReader br,
final AtomicBoolean run, final PrintStream out) {
Thread reader = new Thread(() -> {
try {
String line = null;
do {
while ((line = br.readLine()) != null) {
out.println("[FORK] " + line);
}
} while (run.get());
}
catch (Exception ex) {
// ignore and exit
}
});
return reader;
}
public static OutputStream cacheServer() {
return startCacheServer("org.springframework.integration.gemfire.fork.CacheServerProcess");
}
public static OutputStream cacheServer(String className) {
return startCacheServer(className);
}
private static OutputStream startCacheServer(String className) {
if (controlFileExists(className)) {
deleteControlFile(className);
}
OutputStream os = cloneJVM(className);
int maxTime = 60000;
int time = 0;
while (!controlFileExists(className) && time < maxTime) {
try {
Thread.sleep(500);
time += 500;
}
catch (InterruptedException ex) {
// ignore and move on
}
}
if (controlFileExists(className)) {
logger.info("Started cache server");
}
else {
throw new RuntimeException("could not fork cache server");
}
return os;
}
public static boolean deleteControlFile(String name) {
String path = TEMP_DIR + File.separator + name;
return new File(path).delete();
}
public static boolean createControlFile(String name) throws IOException {
String path = TEMP_DIR + File.separator + name;
return new File(path).createNewFile();
}
public static boolean controlFileExists(String name) {
String path = TEMP_DIR + File.separator + name;
return new File(path).exists();
}
}

View File

@@ -0,0 +1,165 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.gemfire.inbound;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import org.apache.geode.cache.Region;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.data.gemfire.CacheFactoryBean;
import org.springframework.data.gemfire.GenericRegionFactoryBean;
import org.springframework.data.gemfire.RegionAttributesFactoryBean;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.messaging.Message;
/**
* @author Mark Fisher
* @author Gary Russell
* @author Artem Bilan
*
* @since 2.1
*/
public class CacheListeningMessageProducerTests {
private static final SpelExpressionParser PARSER = new SpelExpressionParser();
private static CacheFactoryBean cacheFactoryBean;
private static GenericRegionFactoryBean<String, String> regionFactoryBean;
private static Region<String, String> region;
@BeforeClass
public static void setup() throws Exception {
cacheFactoryBean = new CacheFactoryBean();
regionFactoryBean = new GenericRegionFactoryBean<>();
regionFactoryBean.setName("test.receiveNewValuePayloadForCreateEvent");
regionFactoryBean.setCache(cacheFactoryBean.getObject());
setRegionAttributes(regionFactoryBean);
regionFactoryBean.afterPropertiesSet();
region = regionFactoryBean.getObject();
}
@AfterClass
public static void teardown() throws Exception {
regionFactoryBean.destroy();
cacheFactoryBean.destroy();
}
@Test
public void receiveNewValuePayloadForCreateEvent() {
QueueChannel channel = new QueueChannel();
CacheListeningMessageProducer producer = new CacheListeningMessageProducer(region);
producer.setPayloadExpression(PARSER.parseExpression("key + '=' + newValue"));
producer.setOutputChannel(channel);
producer.setBeanFactory(mock(BeanFactory.class));
producer.afterPropertiesSet();
producer.start();
assertThat(channel.receive(0)).isNull();
region.put("x", "abc");
Message<?> message = channel.receive(0);
assertThat(message).isNotNull();
assertThat(message.getPayload()).isEqualTo("x=abc");
producer.stop();
}
@Test
public void receiveNewValuePayloadForUpdateEvent() {
QueueChannel channel = new QueueChannel();
CacheListeningMessageProducer producer = new CacheListeningMessageProducer(region);
producer.setPayloadExpression(PARSER.parseExpression("newValue"));
producer.setOutputChannel(channel);
producer.setBeanFactory(mock(BeanFactory.class));
producer.afterPropertiesSet();
producer.start();
assertThat(channel.receive(0)).isNull();
region.put("x", "abc");
Message<?> message1 = channel.receive(0);
assertThat(message1).isNotNull();
assertThat(message1.getPayload()).isEqualTo("abc");
region.put("x", "xyz");
Message<?> message2 = channel.receive(0);
assertThat(message2).isNotNull();
assertThat(message2.getPayload()).isEqualTo("xyz");
producer.stop();
}
@Test
public void receiveOldValuePayloadForDestroyEvent() {
QueueChannel channel = new QueueChannel();
CacheListeningMessageProducer producer = new CacheListeningMessageProducer(region);
producer.setSupportedEventTypes(EventType.DESTROYED);
producer.setPayloadExpression(PARSER.parseExpression("oldValue"));
producer.setOutputChannel(channel);
producer.setBeanFactory(mock(BeanFactory.class));
producer.afterPropertiesSet();
producer.start();
assertThat(channel.receive(0)).isNull();
region.put("foo", "abc");
assertThat(channel.receive(0)).isNull();
region.destroy("foo");
Message<?> message2 = channel.receive(0);
assertThat(message2).isNotNull();
assertThat(message2.getPayload()).isEqualTo("abc");
producer.stop();
}
@Test
public void receiveOldValuePayloadForInvalidateEvent() {
QueueChannel channel = new QueueChannel();
CacheListeningMessageProducer producer = new CacheListeningMessageProducer(region);
producer.setSupportedEventTypes(EventType.INVALIDATED);
producer.setPayloadExpression(PARSER.parseExpression("key + ' was ' + oldValue"));
producer.setOutputChannel(channel);
producer.setBeanFactory(mock(BeanFactory.class));
producer.afterPropertiesSet();
producer.start();
assertThat(channel.receive(0)).isNull();
region.put("foo", "abc");
assertThat(channel.receive(0)).isNull();
region.invalidate("foo");
Message<?> message2 = channel.receive(0);
assertThat(message2).isNotNull();
assertThat(message2.getPayload()).isEqualTo("foo was abc");
producer.stop();
}
private static void setRegionAttributes(GenericRegionFactoryBean<String, String> regionFactoryBean)
throws Exception {
RegionAttributesFactoryBean<String, String> attributesFactoryBean = new RegionAttributesFactoryBean<>();
attributesFactoryBean.afterPropertiesSet();
regionFactoryBean.setAttributes(attributesFactoryBean.getObject());
}
}

View File

@@ -0,0 +1,151 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.gemfire.inbound;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.query.CqEvent;
import org.apache.geode.cache.query.CqQuery;
import org.apache.geode.cache.query.cq.internal.ServerCQImpl;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.data.gemfire.listener.ContinuousQueryListenerContainer;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
/**
* @author David Turanski
* @author Artem Bilan
*
* @since 2.1
*/
public class ContinuousQueryMessageProducerTests {
private static final SpelExpressionParser PARSER = new SpelExpressionParser();
private ContinuousQueryMessageProducer cqMessageProducer;
private CqMessageHandler handler;
@BeforeEach
void setUp() {
ContinuousQueryListenerContainer queryListenerContainer = mock(ContinuousQueryListenerContainer.class);
this.cqMessageProducer = new ContinuousQueryMessageProducer(queryListenerContainer, "foo");
DirectChannel outputChannel = new DirectChannel();
this.cqMessageProducer.setOutputChannel(outputChannel);
this.cqMessageProducer.setBeanFactory(mock(BeanFactory.class));
this.handler = new CqMessageHandler();
outputChannel.subscribe(this.handler);
}
@Test
void testMessageProduced() {
CqEvent cqEvent = event(Operation.CREATE, "hello");
this.cqMessageProducer.onEvent(cqEvent);
assertThat(this.handler.count).isEqualTo(1);
assertThat(this.handler.payload).isEqualTo(cqEvent);
}
@Test
void testMessageNotProducedForUnsupportedEventType() {
CqEvent cqEvent = event(Operation.DESTROY, "hello");
this.cqMessageProducer.onEvent(cqEvent);
assertThat(this.handler.count).isEqualTo(0);
}
@Test
void testMessageProducedForAddedEventType() {
CqEvent cqEvent = event(Operation.DESTROY, null);
this.cqMessageProducer.setSupportedEventTypes(CqEventType.DESTROYED);
this.cqMessageProducer.onEvent(cqEvent);
assertThat(this.handler.count).isEqualTo(1);
assertThat(this.handler.payload).isEqualTo(cqEvent);
}
@Test
void testPayloadExpression() {
CqEvent cqEvent = event(Operation.CREATE, "hello");
this.cqMessageProducer.setPayloadExpression(PARSER.parseExpression("newValue.toUpperCase() + ', WORLD'"));
this.cqMessageProducer.afterPropertiesSet();
this.cqMessageProducer.onEvent(cqEvent);
assertThat(this.handler.count).isEqualTo(1);
assertThat(this.handler.payload).isEqualTo("HELLO, WORLD");
}
CqEvent event(final Operation operation, final Object value) {
return new CqEvent() {
final CqQuery cq = new ServerCQImpl();
final byte[] ba = new byte[0];
final Object key = new Object();
final Exception ex = new Exception();
public Operation getBaseOperation() {
return operation;
}
public CqQuery getCq() {
return this.cq;
}
public byte[] getDeltaValue() {
return this.ba;
}
public Object getKey() {
return this.key;
}
public Object getNewValue() {
return value;
}
public Operation getQueryOperation() {
return operation;
}
public Throwable getThrowable() {
return this.ex;
}
};
}
private static class CqMessageHandler implements MessageHandler {
int count;
Object payload;
public void handleMessage(Message<?> message) throws MessagingException {
this.count++;
this.payload = message.getPayload();
}
}
}

View File

@@ -0,0 +1,51 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:gfe="http://www.springframework.org/schema/geode"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:util="http://www.springframework.org/schema/util"
xmlns:int-gfe="http://www.springframework.org/schema/integration/gemfire"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/integration/gemfire https://www.springframework.org/schema/integration/gemfire/spring-integration-gemfire.xsd
http://www.springframework.org/schema/geode https://www.springframework.org/schema/geode/spring-geode.xsd
http://www.springframework.org/schema/integration https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/util https://www.springframework.org/schema/util/spring-util.xsd
http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd">
<context:property-placeholder properties-ref="props" />
<util:properties id="props">
<prop key="durable">true</prop>
</util:properties>
<gfe:client-cache use-bean-factory-locator="false"
id="client-cache" pool-name="client-pool" />
<gfe:pool id="client-pool" subscription-enabled="true">
<gfe:server host="localhost" port="40404" />
</gfe:pool>
<gfe:client-region id="test" cache-ref="client-cache"
pool-name="client-pool" data-policy="EMPTY" />
<gfe:cq-listener-container id="queryListenerContainer"
cache="client-cache" pool-name="client-pool"/>
<int-gfe:cq-inbound-channel-adapter id="withDurable"
cq-listener-container="queryListenerContainer" query="select * from /test"
channel="outputChannel1" durable="${durable}" />
<int:channel id="outputChannel1">
<int:queue />
</int:channel>
<int-gfe:cq-inbound-channel-adapter
cq-listener-container="queryListenerContainer" query="select * from /test"
channel="outputChannel2" expression="newValue" query-events="CREATED" />
<int:channel id="outputChannel2">
<int:queue />
</int:channel>
</beans>

View File

@@ -0,0 +1,105 @@
/*
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.gemfire.inbound;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
import java.io.OutputStream;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.query.CqEvent;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.integration.gemfire.fork.ForkUtil;
import org.springframework.integration.test.util.TestUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.PollableChannel;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
/**
* @author David Turanski
* @author Gary Russell
* @author Artem Bilan
*
*/
@SpringJUnitConfig
@DirtiesContext
public class CqInboundChannelAdapterTests {
@Autowired
@Qualifier("test")
Region<String, Integer> region;
@Autowired
ConfigurableApplicationContext applicationContext;
@Autowired
PollableChannel outputChannel1;
@Autowired
PollableChannel outputChannel2;
@Autowired
ContinuousQueryMessageProducer withDurable;
static OutputStream os;
@BeforeAll
public static void startUp() {
os = ForkUtil.cacheServer();
}
@Test
public void testCqEvent() {
assertThat(TestUtils.getPropertyValue(withDurable, "durable", Boolean.class)).isTrue();
region.put("one", 1);
Message<?> msg = outputChannel1.receive(10000);
assertThat(msg).isNotNull();
assertThat(msg.getPayload() instanceof CqEvent).isTrue();
}
@Test
public void testPayloadExpression() {
region.put("one", 1);
Message<?> msg = outputChannel2.receive(10000);
assertThat(msg).isNotNull();
assertThat(msg.getPayload()).isEqualTo(1);
}
@AfterAll
public static void cleanUp() {
sendSignal();
}
public static void sendSignal() {
try {
os.write("\n".getBytes());
os.flush();
}
catch (IOException ex) {
throw new IllegalStateException("Cannot communicate with forked VM", ex);
}
}
}

View File

@@ -0,0 +1,27 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:gfe="http://www.springframework.org/schema/geode"
xmlns:int-gfe="http://www.springframework.org/schema/integration/gemfire"
xsi:schemaLocation="http://www.springframework.org/schema/integration/gemfire
https://www.springframework.org/schema/integration/gemfire/spring-integration-gemfire.xsd
http://www.springframework.org/schema/geode
https://www.springframework.org/schema/geode/spring-geode.xsd
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd">
<gfe:cache id="gemfire-cache-2"/>
<gfe:local-region id="region1" cache-ref="gemfire-cache-2"/>
<gfe:local-region id="region2" cache-ref="gemfire-cache-2"/>
<gfe:local-region id="region3" cache-ref="gemfire-cache-2"/>
<int-gfe:inbound-channel-adapter id="channel1" region="region1" cache-events="CREATED" expression="newValue"/>
<int-gfe:inbound-channel-adapter id="channel2" region="region2"/>
<int-gfe:inbound-channel-adapter id="channel3" region="region3" error-channel="errorChannel"/>
</beans>

View File

@@ -0,0 +1,128 @@
/*
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.gemfire.inbound;
import static org.assertj.core.api.Assertions.assertThat;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.Region;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
/**
* @author David Turanski
* @author Artem Bilan
*
* @since 2.1
*/
@SpringJUnitConfig
@DirtiesContext
public class GemfireInboundChannelAdapterTests {
@Autowired
SubscribableChannel channel1;
@Autowired
SubscribableChannel channel2;
@Autowired
SubscribableChannel channel3;
@Autowired
SubscribableChannel errorChannel;
@Autowired
@Qualifier("region1")
Region<String, String> region1;
@Autowired
@Qualifier("region2")
Region<String, String> region2;
@Autowired
@Qualifier("region3")
Region<String, String> region3;
@Test
public void testGemfireInboundChannelAdapterWithExpression() {
EventHandler eventHandler1 = new EventHandler();
channel1.subscribe(eventHandler1);
region1.put("payload", "payload");
assertThat(eventHandler1.event).isEqualTo("payload");
}
@Test
public void testGemfireInboundChannelAdapterDefault() {
EventHandler eventHandler2 = new EventHandler();
channel2.subscribe(eventHandler2);
region2.put("payload", "payload");
assertThat(eventHandler2.event instanceof EntryEvent).isTrue();
EntryEvent<?, ?> event = (EntryEvent<?, ?>) eventHandler2.event;
assertThat(event.getNewValue()).isEqualTo("payload");
}
@Test
public void testErrorChannel() {
channel3.subscribe(message -> {
throw new MessagingException("got an error");
});
ErrorHandler errorHandler = new ErrorHandler();
errorChannel.subscribe(errorHandler);
region3.put("payload", "payload");
assertThat(errorHandler.count).isEqualTo(1);
}
static class ErrorHandler implements MessageHandler {
public int count = 0;
@Override
public void handleMessage(Message<?> message) throws MessagingException {
assertThat(message).isInstanceOf(ErrorMessage.class);
count++;
}
}
static class EventHandler implements MessageHandler {
public Object event = null;
@Override
public void handleMessage(Message<?> message) throws MessagingException {
event = message.getPayload();
}
}
}

View File

@@ -0,0 +1,158 @@
/*
* Copyright 2017-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.gemfire.metadata;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.Region;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.springframework.integration.metadata.MetadataStoreListenerAdapter;
import org.springframework.util.Assert;
/**
* @author Venil Noronha
*
* @since 5.0
*
*/
public class GemfireMetadataStoreCacheListenerTests {
private static Cache cache;
private static GemfireMetadataStore metadataStore;
private static Region<Object, Object> region;
@BeforeClass
public static void startUp() throws Exception {
cache = new CacheFactory().create();
metadataStore = new GemfireMetadataStore(cache);
region = cache.getRegion(GemfireMetadataStore.KEY);
}
@AfterClass
public static void cleanUp() {
if (region != null) {
region.close();
}
if (cache != null) {
cache.close();
Assert.isTrue(cache.isClosed(), "Cache did not close after close() call");
}
}
@Before
@After
public void setup() {
if (region != null) {
region.clear();
}
}
@Test
public void testAdd() throws InterruptedException {
String testKey = "key";
String testValue = "value";
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<String> actualKey = new AtomicReference<>();
AtomicReference<String> actualValue = new AtomicReference<>();
metadataStore.addListener(new MetadataStoreListenerAdapter() {
@Override
public void onAdd(String key, String value) {
actualKey.set(key);
actualValue.set(value);
latch.countDown();
}
});
metadataStore.put(testKey, testValue);
latch.await(10, TimeUnit.SECONDS);
assertThat(actualKey.get()).isEqualTo(testKey);
assertThat(actualValue.get()).isEqualTo(testValue);
}
@Test
public void testRemove() throws InterruptedException {
String testKey = "key";
String testValue = "value";
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<String> actualKey = new AtomicReference<>();
AtomicReference<String> actualValue = new AtomicReference<>();
metadataStore.addListener(new MetadataStoreListenerAdapter() {
@Override
public void onRemove(String key, String oldValue) {
actualKey.set(key);
actualValue.set(oldValue);
latch.countDown();
}
});
metadataStore.put(testKey, testValue);
metadataStore.remove(testKey);
latch.await(10, TimeUnit.SECONDS);
assertThat(actualKey.get()).isEqualTo(testKey);
assertThat(actualValue.get()).isEqualTo(testValue);
}
@Test
public void testUpdate() throws InterruptedException {
String testKey = "key";
String testValue = "value";
String testNewValue = "new-value";
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<String> actualKey = new AtomicReference<>();
AtomicReference<String> actualValue = new AtomicReference<>();
metadataStore.addListener(new MetadataStoreListenerAdapter() {
@Override
public void onUpdate(String key, String newValue) {
actualKey.set(key);
actualValue.set(newValue);
latch.countDown();
}
});
metadataStore.put(testKey, testValue);
metadataStore.put(testKey, testNewValue);
latch.await(10, TimeUnit.SECONDS);
assertThat(actualKey.get()).isEqualTo(testKey);
assertThat(actualValue.get()).isEqualTo(testNewValue);
}
}

View File

@@ -0,0 +1,170 @@
/*
* Copyright 2014-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.gemfire.metadata;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.Region;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.springframework.data.gemfire.GemfireTemplate;
import org.springframework.integration.metadata.ConcurrentMetadataStore;
/**
* @author Artem Bilan
* @since 4.0
*
*/
public class GemfireMetadataStoreTests {
private static Cache cache;
private static ConcurrentMetadataStore metadataStore;
private static Region<Object, Object> region;
@BeforeClass
public static void startUp() throws Exception {
cache = new CacheFactory().create();
metadataStore = new GemfireMetadataStore(cache);
region = cache.getRegion(GemfireMetadataStore.KEY);
}
@AfterClass
public static void cleanUp() {
if (region != null) {
region.close();
}
if (cache != null) {
cache.close();
assertThat(cache.isClosed()).as("Cache did not close after close() call").isTrue();
}
}
@Before
@After
public void setup() {
if (region != null) {
region.clear();
}
}
@Test
public void testGetNonExistingKeyValue() {
String retrievedValue = metadataStore.get("does-not-exist");
assertThat(retrievedValue).isNull();
}
@Test
public void testPersistKeyValue() {
metadataStore.put("GemfireMetadataStoreTests-Spring", "Integration");
GemfireTemplate gemfireTemplate = new GemfireTemplate(region);
Object v = gemfireTemplate.get("GemfireMetadataStoreTests-Spring");
assertThat(v).isEqualTo("Integration");
}
@Test
public void testGetValueFromMetadataStore() {
metadataStore.put("GemfireMetadataStoreTests-GetValue", "Hello Gemfire");
String retrievedValue = metadataStore.get("GemfireMetadataStoreTests-GetValue");
assertThat(retrievedValue).isEqualTo("Hello Gemfire");
}
@Test
public void testPersistEmptyStringToMetadataStore() {
metadataStore.put("GemfireMetadataStoreTests-PersistEmpty", "");
String retrievedValue = metadataStore.get("GemfireMetadataStoreTests-PersistEmpty");
assertThat(retrievedValue).isEqualTo("");
}
@Test
public void testPersistNullStringToMetadataStore() {
try {
metadataStore.put("GemfireMetadataStoreTests-PersistEmpty", null);
fail("Expected an IllegalArgumentException to be thrown.");
}
catch (IllegalArgumentException e) {
assertThat(e.getMessage()).isEqualTo("'value' must not be null.");
}
}
@Test
public void testPersistWithEmptyKeyToMetadataStore() {
metadataStore.put("", "PersistWithEmptyKey");
String retrievedValue = metadataStore.get("");
assertThat(retrievedValue).isEqualTo("PersistWithEmptyKey");
}
@Test
public void testPersistWithNullKeyToMetadataStore() {
try {
metadataStore.put(null, "something");
fail("Expected an IllegalArgumentException to be thrown.");
}
catch (IllegalArgumentException e) {
assertThat(e.getMessage()).isEqualTo("'key' must not be null.");
}
}
@Test
public void testGetValueWithNullKeyFromMetadataStore() {
try {
metadataStore.get(null);
}
catch (IllegalArgumentException e) {
assertThat(e.getMessage()).isEqualTo("'key' must not be null.");
return;
}
fail("Expected an IllegalArgumentException to be thrown.");
}
@Test
public void testRemoveFromMetadataStore() {
String testKey = "GemfireMetadataStoreTests-Remove";
String testValue = "Integration";
metadataStore.put(testKey, testValue);
assertThat(metadataStore.remove(testKey)).isEqualTo(testValue);
assertThat(metadataStore.remove(testKey)).isNull();
}
@Test
public void testPersistKeyValueIfAbsent() {
metadataStore.putIfAbsent("GemfireMetadataStoreTests-Spring", "Integration");
GemfireTemplate gemfireTemplate = new GemfireTemplate(region);
Object v = gemfireTemplate.get("GemfireMetadataStoreTests-Spring");
assertThat(v).isEqualTo("Integration");
}
}

View File

@@ -0,0 +1,128 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.gemfire.outbound;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.Scope;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.data.gemfire.CacheFactoryBean;
import org.springframework.expression.Expression;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.integration.expression.ValueExpression;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
/**
* @author Mark Fisher
* @author David Turanski
* @author Gunnar Hillert
* @author Gary Russell
* @author Artem Bilan
* @since 2.1
*/
public class CacheWritingMessageHandlerTests {
private static CacheFactoryBean cacheFactoryBean;
private static Region<Object, Object> region;
@BeforeClass
public static void startUp() throws Exception {
cacheFactoryBean = new CacheFactoryBean();
cacheFactoryBean.afterPropertiesSet();
Cache cache = (Cache) cacheFactoryBean.getObject();
region = cache.createRegionFactory().setScope(Scope.LOCAL).create("sig-tests");
}
@AfterClass
public static void cleanUp() throws Exception {
if (region != null) {
region.close();
}
if (cacheFactoryBean != null) {
cacheFactoryBean.destroy();
}
}
@Before
public void prepare() {
if (region != null) {
region.clear();
}
}
@Test
public void mapPayloadWritesToCache() throws Exception {
assertThat(region.size()).isEqualTo(0);
CacheWritingMessageHandler handler = new CacheWritingMessageHandler(region);
handler.setBeanFactory(mock(BeanFactory.class));
handler.afterPropertiesSet();
Map<String, String> map = new HashMap<String, String>();
map.put("foo", "bar");
Message<?> message = MessageBuilder.withPayload(map).build();
handler.handleMessage(message);
assertThat(region.size()).isEqualTo(1);
assertThat(region.get("foo")).isEqualTo("bar");
}
@Test
public void ExpressionsWriteToCache() throws Exception {
assertThat(region.size()).isEqualTo(0);
CacheWritingMessageHandler handler = new CacheWritingMessageHandler(region);
Map<String, String> expressions = new HashMap<String, String>();
expressions.put("'foo'", "'bar'");
expressions.put("payload.toUpperCase()", "headers['bar'].toUpperCase()");
handler.setCacheEntries(expressions);
handler.setBeanFactory(mock(BeanFactory.class));
handler.afterPropertiesSet();
Message<?> message = MessageBuilder.withPayload("foo")
.copyHeaders(Collections.singletonMap("bar", "bar"))
.build();
handler.handleMessage(message);
assertThat(region.size()).isEqualTo(2);
assertThat(region.get("FOO")).isEqualTo("BAR");
assertThat(region.get("foo")).isEqualTo("bar");
handler.setCacheEntryExpressions(Collections.<Expression, Expression>singletonMap(new LiteralExpression("baz"),
new ValueExpression<Long>(10L)));
handler.handleMessage(new GenericMessage<String>("test"));
assertThat(region.size()).isEqualTo(3);
assertThat(region.get("baz")).isEqualTo(10L);
}
}

View File

@@ -0,0 +1,35 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int-gfe="http://www.springframework.org/schema/integration/gemfire"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:gfe="http://www.springframework.org/schema/geode"
xsi:schemaLocation="http://www.springframework.org/schema/integration/gemfire https://www.springframework.org/schema/integration/gemfire/spring-integration-gemfire.xsd
http://www.springframework.org/schema/geode https://www.springframework.org/schema/geode/spring-geode.xsd
http://www.springframework.org/schema/integration https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd">
<gfe:cache/>
<gfe:replicated-region id="region1"/>
<gfe:replicated-region id="region2"/>
<int-gfe:outbound-channel-adapter id="cacheChannel1" region="region1"/>
<bean id="bar" class="java.lang.String">
<constructor-arg value="bar"/>
</bean>
<int-gfe:outbound-channel-adapter id="cacheChannel2" region="region2" order="19">
<int-gfe:cache-entries>
<entry key="payload.toUpperCase()" value="payload.toLowerCase()"/>
<entry key="'foo'" value="@bar"/>
</int-gfe:cache-entries>
</int-gfe:outbound-channel-adapter>
<int:chain input-channel="cacheChainChannel">
<int-gfe:outbound-channel-adapter region="region1"/>
</int:chain>
</beans>

View File

@@ -0,0 +1,102 @@
/*
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.gemfire.outbound;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.HashMap;
import java.util.Map;
import org.apache.geode.cache.Region;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
/**
* @author David Turanski
* @author Artem Bilan
*
* @since 2.1
*/
@SpringJUnitConfig
@DirtiesContext
public class GemfireOutboundChannelAdapterTests {
@Autowired
MessageChannel cacheChannel1;
@Autowired
@Qualifier("region1")
Region<String, String> region1;
@Autowired
MessageChannel cacheChannel2;
@Autowired
@Qualifier("region2")
Region<String, String> region2;
@Autowired
MessageChannel cacheChainChannel;
@BeforeEach
public void setUp() {
region1.clear();
region2.clear();
}
@Test
public void testWriteMapPayload() {
Map<String, String> map = new HashMap<>();
map.put("foo", "bar");
Message<?> message = MessageBuilder.withPayload(map).build();
cacheChannel1.send(message);
assertThat(region1.size()).isEqualTo(1);
assertThat(region1.get("foo")).isEqualTo("bar");
}
@Test
public void testWriteExpressions() {
Message<?> message = MessageBuilder.withPayload("Hello").build();
cacheChannel2.send(message);
assertThat(region2.size()).isEqualTo(2);
assertThat(region2.get("HELLO")).isEqualTo("hello");
assertThat(region2.get("foo")).isEqualTo("bar");
}
@Test
public void testWriteWithinChain() {
Map<String, String> map = new HashMap<>();
map.put("foo", "bar");
Message<?> message = MessageBuilder.withPayload(map).build();
cacheChainChannel.send(message);
assertThat(region1.size()).isEqualTo(1);
assertThat(region1.get("foo")).isEqualTo("bar");
}
}

View File

@@ -0,0 +1,23 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://www.springframework.org/schema/integration"
xsi:schemaLocation="http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration https://www.springframework.org/schema/integration/spring-integration.xsd">
<beans:bean id="messageStore" class="org.springframework.integration.gemfire.store.GemfireMessageStore">
<beans:constructor-arg
value="#{T (org.springframework.integration.gemfire.store.DelayerHandlerRescheduleIntegrationTests).region}"/>
</beans:bean>
<channel id="output">
<queue/>
</channel>
<delayer id="#{T (org.springframework.integration.gemfire.store.DelayerHandlerRescheduleIntegrationTests).DELAYER_ID}"
input-channel="input"
output-channel="output"
default-delay="10000"
message-store="messageStore"/>
</beans:beans>

View File

@@ -0,0 +1,149 @@
/*
* Copyright 2013-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.gemfire.store;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import java.util.concurrent.TimeUnit;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.Scope;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.data.gemfire.CacheFactoryBean;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.handler.DelayHandler;
import org.springframework.integration.store.MessageGroup;
import org.springframework.integration.store.MessageGroupStore;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.test.support.LongRunningIntegrationTest;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.PollableChannel;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
/**
* @author Artem Bilan
* @author Gary Russell
*
* @since 3.0
*/
public class DelayerHandlerRescheduleIntegrationTests {
public static final String DELAYER_ID = "delayerWithGemfireMS";
public static Region<Object, Object> region;
private static CacheFactoryBean cacheFactoryBean;
@ClassRule
public static LongRunningIntegrationTest longTests = new LongRunningIntegrationTest();
@BeforeClass
public static void startUp() throws Exception {
cacheFactoryBean = new CacheFactoryBean();
cacheFactoryBean.afterPropertiesSet();
Cache cache = (Cache) cacheFactoryBean.getObject();
region = cache.createRegionFactory().setScope(Scope.LOCAL).create("sig-tests");
}
@AfterClass
public static void cleanUp() throws Exception {
if (region != null) {
region.close();
}
if (cacheFactoryBean != null) {
cacheFactoryBean.destroy();
}
}
@Test
public void testDelayerHandlerRescheduleWithGemfireMessageStore() throws Exception {
AbstractApplicationContext context = new ClassPathXmlApplicationContext(
"DelayerHandlerRescheduleIntegrationTests-context.xml", this.getClass());
MessageChannel input = context.getBean("input", MessageChannel.class);
MessageGroupStore messageStore = context.getBean("messageStore", MessageGroupStore.class);
String delayerMessageGroupId = DELAYER_ID + ".messageGroupId";
Message<String> message1 = MessageBuilder.withPayload("test1").build();
input.send(message1);
input.send(MessageBuilder.withPayload("test2").build());
// Emulate restart and check Cache state before next start
// Interrupt taskScheduler as quickly as possible
ThreadPoolTaskScheduler taskScheduler =
(ThreadPoolTaskScheduler) IntegrationContextUtils.getTaskScheduler(context);
taskScheduler.shutdown();
taskScheduler.getScheduledExecutor().awaitTermination(10, TimeUnit.SECONDS);
context.close();
try {
context.getBean("input", MessageChannel.class);
fail("IllegalStateException expected");
}
catch (Exception e) {
assertThat(e instanceof IllegalStateException).isTrue();
assertThat(e.getMessage().contains("BeanFactory not initialized or already closed - call 'refresh'"))
.isTrue();
}
assertThat(messageStore.getMessageGroupCount()).isEqualTo(1);
assertThat(messageStore.iterator().next().getGroupId()).isEqualTo(delayerMessageGroupId);
assertThat(messageStore.messageGroupSize(delayerMessageGroupId)).isEqualTo(2);
assertThat(messageStore.getMessageCountForAllMessageGroups()).isEqualTo(2);
MessageGroup messageGroup = messageStore.getMessageGroup(delayerMessageGroupId);
Message<?> messageInStore = messageGroup.getMessages().iterator().next();
Object payload = messageInStore.getPayload();
// INT-3049
assertThat(payload instanceof DelayHandler.DelayedMessageWrapper).isTrue();
assertThat(((DelayHandler.DelayedMessageWrapper) payload).getOriginal()).isEqualTo(message1);
context.refresh();
PollableChannel output = context.getBean("output", PollableChannel.class);
Message<?> message = output.receive(20000);
assertThat(message).isNotNull();
Object payload1 = message.getPayload();
message = output.receive(20000);
assertThat(message).isNotNull();
Object payload2 = message.getPayload();
assertThat(payload2).isNotSameAs(payload1);
assertThat(messageStore.getMessageGroupCount()).isEqualTo(1);
int n = 0;
while (n++ < 200 && messageStore.messageGroupSize(delayerMessageGroupId) > 0) {
Thread.sleep(100);
}
assertThat(messageStore.messageGroupSize(delayerMessageGroupId)).isEqualTo(0);
context.close();
}
}

View File

@@ -0,0 +1,368 @@
/*
* Copyright 2007-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.gemfire.store;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.Scope;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.data.gemfire.CacheFactoryBean;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.history.MessageHistory;
import org.springframework.integration.store.MessageGroup;
import org.springframework.integration.store.SimpleMessageGroup;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;
import junit.framework.AssertionFailedError;
/**
* @author Oleg Zhurakousky
* @author David Turanski
* @author Gary Russell
* @author Artem Bilan
*
*/
public class GemfireGroupStoreTests {
private static CacheFactoryBean cacheFactoryBean;
public static Region<Object, Object> region;
@Test
public void testNonExistingEmptyMessageGroup() throws Exception {
GemfireMessageStore store = new GemfireMessageStore(region);
MessageGroup messageGroup = store.getMessageGroup(1);
assertThat(messageGroup).isNotNull();
assertThat(messageGroup instanceof SimpleMessageGroup).isTrue();
assertThat(messageGroup.size()).isEqualTo(0);
}
@Test
public void testMessageGroupWithAddedMessage() throws Exception {
GemfireMessageStore store = new GemfireMessageStore(region);
MessageGroup messageGroup = store.getMessageGroup(1);
Message<?> message = new GenericMessage<String>("Hello");
messageGroup = store.addMessageToGroup(1, message);
assertThat(messageGroup.size()).isEqualTo(1);
// make sure the store is properly rebuild from Gemfire
store = new GemfireMessageStore(region);
messageGroup = store.getMessageGroup(1);
assertThat(messageGroup.size()).isEqualTo(1);
}
@Test
public void testRemoveMessageFromTheGroup() throws Exception {
GemfireMessageStore store = new GemfireMessageStore(region);
MessageGroup messageGroup = store.getMessageGroup(1);
Message<?> message = new GenericMessage<String>("2");
messageGroup = store.addMessageToGroup(messageGroup.getGroupId(), new GenericMessage<String>("1"));
messageGroup = store.getMessageGroup(1);
assertThat(messageGroup.size()).isEqualTo(1);
Thread.sleep(1); //since it adds to a local region some times CREATED_DATE ends up to be the same
// Unrealistic in a real scenario
messageGroup = store.addMessageToGroup(messageGroup.getGroupId(), message);
messageGroup = store.getMessageGroup(1);
assertThat(messageGroup.size()).isEqualTo(2);
Thread.sleep(1);
messageGroup = store.addMessageToGroup(messageGroup.getGroupId(), new GenericMessage<String>("3"));
messageGroup = store.getMessageGroup(1);
assertThat(messageGroup.size()).isEqualTo(3);
store.removeMessagesFromGroup(messageGroup.getGroupId(), message);
messageGroup = store.getMessageGroup(1);
assertThat(messageGroup.size()).isEqualTo(2);
// make sure the store is properly rebuild from Gemfire
store = new GemfireMessageStore(region);
messageGroup = store.getMessageGroup(1);
assertThat(messageGroup.size()).isEqualTo(2);
}
@Test
public void testRemoveMessageGroup() throws Exception {
GemfireMessageStore store = new GemfireMessageStore(region);
MessageGroup messageGroup = store.getMessageGroup(1);
Message<?> message = new GenericMessage<String>("Hello");
messageGroup = store.addMessageToGroup(messageGroup.getGroupId(), message);
assertThat(messageGroup.size()).isEqualTo(1);
store.removeMessageGroup(1);
MessageGroup messageGroupA = store.getMessageGroup(1);
assertThat(messageGroupA).isNotSameAs(messageGroup);
assertThat(messageGroupA.getMessages().size()).isEqualTo(0);
assertThat(messageGroupA.size()).isEqualTo(0);
// make sure the store is properly rebuild from Gemfire
store = new GemfireMessageStore(region);
messageGroup = store.getMessageGroup(1);
assertThat(messageGroup.getMessages().size()).isEqualTo(0);
assertThat(messageGroup.size()).isEqualTo(0);
}
@Test
public void testRemoveNonExistingMessageFromTheGroup() throws Exception {
GemfireMessageStore store = new GemfireMessageStore(region);
MessageGroup messageGroup = store.getMessageGroup(1);
store.addMessagesToGroup(messageGroup.getGroupId(), new GenericMessage<String>("1"));
store.removeMessagesFromGroup(1, new GenericMessage<String>("2"));
}
@Test
public void testRemoveNonExistingMessageFromNonExistingTheGroup() throws Exception {
GemfireMessageStore store = new GemfireMessageStore(region);
store.removeMessagesFromGroup(1, new GenericMessage<String>("2"));
}
@Test
public void testCompleteMessageGroup() throws Exception {
GemfireMessageStore store = new GemfireMessageStore(region);
MessageGroup messageGroup = store.getMessageGroup(1);
Message<?> messageToMark = new GenericMessage<String>("1");
store.addMessagesToGroup(messageGroup.getGroupId(), messageToMark);
store.completeGroup(messageGroup.getGroupId());
messageGroup = store.getMessageGroup(1);
assertThat(messageGroup.isComplete()).isTrue();
}
@Test
public void testLastReleasedSequenceNumber() throws Exception {
GemfireMessageStore store = new GemfireMessageStore(region);
MessageGroup messageGroup = store.getMessageGroup(1);
Message<?> messageToMark = new GenericMessage<String>("1");
store.addMessagesToGroup(messageGroup.getGroupId(), messageToMark);
store.setLastReleasedSequenceNumberForGroup(messageGroup.getGroupId(), 5);
messageGroup = store.getMessageGroup(1);
assertThat(messageGroup.getLastReleasedMessageSequenceNumber()).isEqualTo(5);
}
@Test
public void testMultipleInstancesOfGroupStore() throws Exception {
GemfireMessageStore store1 = new GemfireMessageStore(region);
GemfireMessageStore store2 = new GemfireMessageStore(region);
Message<?> message = new GenericMessage<String>("1");
store1.addMessagesToGroup(1, message);
MessageGroup messageGroup = store2.addMessageToGroup(1, new GenericMessage<String>("2"));
assertThat(messageGroup.getMessages().size()).isEqualTo(2);
GemfireMessageStore store3 = new GemfireMessageStore(region);
store3.removeMessagesFromGroup(1, message);
messageGroup = store3.getMessageGroup(1);
assertThat(messageGroup.getMessages().size()).isEqualTo(1);
}
@Test
public void testWithMessageHistory() throws Exception {
GemfireMessageStore store = new GemfireMessageStore(region);
store.getMessageGroup(1);
Message<?> message = new GenericMessage<String>("Hello");
DirectChannel fooChannel = new DirectChannel();
fooChannel.setBeanName("fooChannel");
DirectChannel barChannel = new DirectChannel();
barChannel.setBeanName("barChannel");
message = MessageHistory.write(message, fooChannel);
message = MessageHistory.write(message, barChannel);
store.addMessagesToGroup(1, message);
message = store.getMessageGroup(1).getMessages().iterator().next();
MessageHistory messageHistory = MessageHistory.read(message);
assertThat(messageHistory).isNotNull();
assertThat(messageHistory.size()).isEqualTo(2);
Properties fooChannelHistory = messageHistory.get(0);
assertThat(fooChannelHistory.get("name")).isEqualTo("fooChannel");
assertThat(fooChannelHistory.get("type")).isEqualTo("channel");
}
@Test
public void testIteratorOfMessageGroups() throws Exception {
GemfireMessageStore store1 = new GemfireMessageStore(region);
GemfireMessageStore store2 = new GemfireMessageStore(region);
store1.addMessagesToGroup(1, new GenericMessage<String>("1"));
store2.addMessagesToGroup(2, new GenericMessage<String>("2"));
store1.addMessagesToGroup(3, new GenericMessage<String>("3"), new GenericMessage<String>("3A"));
Iterator<MessageGroup> messageGroups = store1.iterator();
int counter = 0;
while (messageGroups.hasNext()) {
messageGroups.next();
counter++;
}
assertThat(counter).isEqualTo(3);
store2.removeMessageGroup(3);
messageGroups = store1.iterator();
counter = 0;
while (messageGroups.hasNext()) {
messageGroups.next();
counter++;
}
assertThat(counter).isEqualTo(2);
}
@Test
@Ignore
public void testConcurrentModifications() throws Exception {
final GemfireMessageStore store1 = new GemfireMessageStore(region);
final GemfireMessageStore store2 = new GemfireMessageStore(region);
final Message<?> message = new GenericMessage<String>("1");
ExecutorService executor = null;
final List<Object> failures = new ArrayList<Object>();
for (int i = 0; i < 100; i++) {
executor = Executors.newCachedThreadPool();
executor.execute(() -> {
MessageGroup group = store1.addMessageToGroup(1, message);
if (group.getMessages().size() != 1) {
failures.add("ADD");
throw new AssertionFailedError("Failed on ADD");
}
});
executor.execute(() -> {
store2.removeMessagesFromGroup(1, message);
MessageGroup group = store2.getMessageGroup(1);
if (group.getMessages().size() != 0) {
failures.add("REMOVE");
throw new AssertionFailedError("Failed on Remove");
}
});
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
store2.removeMessagesFromGroup(1, message); // ensures that if ADD thread executed after REMOVE, the store is empty for the next cycle
}
assertThat(failures.size() == 0).isTrue();
}
@Test
public void testWithAggregatorWithShutdown() {
ClassPathXmlApplicationContext context1 = new ClassPathXmlApplicationContext("gemfire-aggregator-config.xml",
this.getClass());
MessageChannel input = context1.getBean("inputChannel", MessageChannel.class);
QueueChannel output = context1.getBean("outputChannel", QueueChannel.class);
Message<?> m1 = MessageBuilder.withPayload("1").setSequenceNumber(1).setSequenceSize(3).setCorrelationId(1)
.build();
Message<?> m2 = MessageBuilder.withPayload("2").setSequenceNumber(2).setSequenceSize(3).setCorrelationId(1)
.build();
input.send(m1);
assertThat(output.receive(1000)).isNull();
input.send(m2);
assertThat(output.receive(1000)).isNull();
ClassPathXmlApplicationContext context2 = new ClassPathXmlApplicationContext("gemfire-aggregator-config-a.xml",
this.getClass());
MessageChannel inputA = context2.getBean("inputChannel", MessageChannel.class);
QueueChannel outputA = context2.getBean("outputChannel", QueueChannel.class);
Message<?> m3 = MessageBuilder.withPayload("3").setSequenceNumber(3).setSequenceSize(3).setCorrelationId(1)
.build();
inputA.send(m3);
assertThat(outputA.receive(1000)).isNotNull();
context1.close();
context2.close();
}
@Test
public void testQueue() throws Exception {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("gemfire-queue-config.xml",
this.getClass());
QueueChannel gemfireQueue = context.getBean("gemfireQueue", QueueChannel.class);
QueueChannel outputQueue = context.getBean("outputQueue", QueueChannel.class);
for (int i = 0; i < 20; i++) {
gemfireQueue.send(new GenericMessage<String>("Hello"));
Thread.sleep(1);
}
for (int i = 0; i < 20; i++) {
assertThat(outputQueue.receive(5000)).isNotNull();
}
assertThat(outputQueue.receive(1)).isNull();
context.close();
}
@Before
public void prepare() {
if (region != null) {
region.clear();
}
}
@BeforeClass
public static void init() throws Exception {
cacheFactoryBean = new CacheFactoryBean();
cacheFactoryBean.afterPropertiesSet();
Cache cache = (Cache) cacheFactoryBean.getObject();
region = cache.createRegionFactory().setScope(Scope.LOCAL).create("sig-tests");
}
@AfterClass
public static void cleanup() throws Exception {
if (region != null) {
region.close();
}
if (cacheFactoryBean != null) {
cacheFactoryBean.destroy();
}
}
}

View File

@@ -0,0 +1,167 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.gemfire.store;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.Scope;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.springframework.data.gemfire.CacheFactoryBean;
import org.springframework.data.gemfire.GenericRegionFactoryBean;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.history.MessageHistory;
import org.springframework.integration.store.MessageGroup;
import org.springframework.integration.store.MessageGroupMetadata;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.test.util.TestUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
/**
* @author Mark Fisher
* @author David Turanski
* @author Gary Russell
* @author Artem Bilan
*
* @since 2.1
*/
public class GemfireMessageStoreTests {
private static CacheFactoryBean cacheFactoryBean;
private static Region<Object, Object> region;
@Test
public void addAndGetMessage() {
GemfireMessageStore store = new GemfireMessageStore(region);
Message<?> message = MessageBuilder.withPayload("test").build();
store.addMessage(message);
Message<?> retrieved = store.getMessage(message.getHeaders().getId());
assertThat(retrieved).isEqualTo(message);
}
@Test
public void testRegionConstructor() throws Exception {
GenericRegionFactoryBean<Object, Object> region = new GenericRegionFactoryBean<>();
region.setName("someRegion");
region.setCache(cacheFactoryBean.getObject());
region.afterPropertiesSet();
GemfireMessageStore store = new GemfireMessageStore(region.getObject());
assertThat(TestUtils.getPropertyValue(store, "messageStoreRegion")).isSameAs(region.getObject());
region.destroy();
}
@Test
public void testWithMessageHistory() {
GemfireMessageStore store = new GemfireMessageStore(region);
Message<?> message = new GenericMessage<>("Hello");
DirectChannel fooChannel = new DirectChannel();
fooChannel.setBeanName("fooChannel");
DirectChannel barChannel = new DirectChannel();
barChannel.setBeanName("barChannel");
message = MessageHistory.write(message, fooChannel);
message = MessageHistory.write(message, barChannel);
store.addMessage(message);
message = store.getMessage(message.getHeaders().getId());
MessageHistory messageHistory = MessageHistory.read(message);
assertThat(messageHistory).isNotNull();
assertThat(messageHistory.size()).isEqualTo(2);
Properties fooChannelHistory = messageHistory.get(0);
assertThat(fooChannelHistory.get("name")).isEqualTo("fooChannel");
assertThat(fooChannelHistory.get("type")).isEqualTo("channel");
}
@Test
public void testAddAndRemoveMessagesFromMessageGroup() {
GemfireMessageStore messageStore = new GemfireMessageStore(region);
String groupId = "X";
List<Message<?>> messages = new ArrayList<>();
for (int i = 0; i < 25; i++) {
Message<String> message = MessageBuilder.withPayload("foo").setCorrelationId(groupId).build();
messageStore.addMessagesToGroup(groupId, message);
messages.add(message);
}
MessageGroup group = messageStore.getMessageGroup(groupId);
assertThat(group.size()).isEqualTo(25);
messageStore.removeMessagesFromGroup(groupId, messages);
group = messageStore.getMessageGroup(groupId);
assertThat(group.size()).isEqualTo(0);
}
@Test
public void testAddAndRemoveMessagesFromMessageGroupWithPrefix() {
GemfireMessageStore messageStore = new GemfireMessageStore(region, "foo_");
String groupId = "X";
List<Message<?>> messages = new ArrayList<>();
for (int i = 0; i < 25; i++) {
Message<String> message = MessageBuilder.withPayload("foo").setCorrelationId(groupId).build();
messageStore.addMessagesToGroup(groupId, message);
messages.add(message);
}
MessageGroupMetadata messageGroupMetadata =
(MessageGroupMetadata) region.get("foo_" + "MESSAGE_GROUP_" + groupId);
assertThat(messageGroupMetadata).isNotNull();
assertThat(messageGroupMetadata.size()).isEqualTo(25);
messageStore.removeMessagesFromGroup(groupId, messages);
MessageGroup group = messageStore.getMessageGroup(groupId);
assertThat(group.size()).isEqualTo(0);
}
@Before
public void prepare() {
if (region != null) {
region.clear();
}
}
@BeforeClass
public static void init() throws Exception {
cacheFactoryBean = new CacheFactoryBean();
Cache cache = (Cache) cacheFactoryBean.getObject();
region = cache.createRegionFactory().setScope(Scope.LOCAL).create("sig-tests");
}
@AfterClass
public static void cleanup() throws Exception {
if (region != null) {
region.close();
}
if (cacheFactoryBean != null) {
cacheFactoryBean.destroy();
}
}
}

View File

@@ -0,0 +1,19 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xsi:schemaLocation="http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration https://www.springframework.org/schema/integration/spring-integration.xsd">
<int:aggregator input-channel="inputChannel" output-channel="outputChannel" message-store="gemfireStore"/>
<int:channel id="outputChannel">
<int:queue/>
</int:channel>
<bean id="gemfireStore" class="org.springframework.integration.gemfire.store.GemfireMessageStore">
<constructor-arg
value="#{T (org.springframework.integration.gemfire.store.GemfireGroupStoreTests).region}"/>
</bean>
</beans>

View File

@@ -0,0 +1,19 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xsi:schemaLocation="http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration https://www.springframework.org/schema/integration/spring-integration.xsd">
<int:aggregator input-channel="inputChannel" output-channel="outputChannel" message-store="gemfireStore"/>
<int:channel id="outputChannel">
<int:queue/>
</int:channel>
<bean id="gemfireStore" class="org.springframework.integration.gemfire.store.GemfireMessageStore">
<constructor-arg
value="#{T (org.springframework.integration.gemfire.store.GemfireGroupStoreTests).region}"/>
</bean>
</beans>

View File

@@ -0,0 +1,26 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xsi:schemaLocation="http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration https://www.springframework.org/schema/integration/spring-integration.xsd">
<int:channel id="gemfireQueue">
<int:queue message-store="gemfireStore"/>
</int:channel>
<int:bridge input-channel="gemfireQueue" output-channel="outputQueue">
<int:poller fixed-rate="1000" max-messages-per-poll="200"/>
</int:bridge>
<int:channel id="outputQueue">
<int:queue/>
</int:channel>
<bean id="gemfireStore" class="org.springframework.integration.gemfire.store.GemfireMessageStore">
<constructor-arg
value="#{T (org.springframework.integration.gemfire.store.GemfireGroupStoreTests).region}"/>
</bean>
</beans>

View File

@@ -0,0 +1,37 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:gfe="http://www.springframework.org/schema/geode"
xsi:schemaLocation="http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/geode https://www.springframework.org/schema/geode/spring-geode.xsd
http://www.springframework.org/schema/integration https://www.springframework.org/schema/integration/spring-integration.xsd">
<gfe:cache/>
<bean id="lockRegistry" class="org.springframework.integration.gemfire.util.GemfireLockRegistry">
<constructor-arg ref="gemfireCache"/>
</bean>
<bean id="lockRegistry2" class="org.springframework.integration.gemfire.util.GemfireLockRegistry">
<constructor-arg value="#{gemfireCache.getRegion(T(org.springframework.integration.gemfire.util.GemfireLockRegistry).LOCK_REGISTRY_REGION)}"/>
</bean>
<bean id="latching"
class="org.springframework.integration.gemfire.util.AggregatorWithGemfireLocksTests$LatchingReleaseStrategy"/>
<bean id="sms" class="org.springframework.integration.store.SimpleMessageStore"/>
<int:aggregator input-channel="in" release-strategy="latching" output-channel="out"
message-store="sms"
expire-groups-upon-completion="true" lock-registry="lockRegistry"/>
<int:aggregator input-channel="in2" release-strategy="latching" output-channel="out"
message-store="sms"
expire-groups-upon-completion="true" lock-registry="lockRegistry2"/>
<int:channel id="out">
<int:queue/>
</int:channel>
</beans>

View File

@@ -0,0 +1,176 @@
/*
* Copyright 2014-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.gemfire.util;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.aggregator.ReleaseStrategy;
import org.springframework.integration.store.MessageGroup;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
* @author Gary Russell
* @author Artem Bilan
* @since 4.0
*
*/
@ContextConfiguration
@RunWith(SpringJUnit4ClassRunner.class)
@DirtiesContext
public class AggregatorWithGemfireLocksTests {
@Autowired
private LatchingReleaseStrategy releaseStrategy;
@Autowired
private MessageChannel in;
@Autowired
private MessageChannel in2;
@Autowired
private PollableChannel out;
private volatile Exception exception;
@Test
public void testLockSingleGroup() throws Exception {
this.releaseStrategy.reset(1);
Executors.newSingleThreadExecutor().execute(asyncSend("foo", 1, 1));
Executors.newSingleThreadExecutor().execute(asyncSend("bar", 2, 1));
assertThat(this.releaseStrategy.latch2.await(10, TimeUnit.SECONDS)).isTrue();
this.releaseStrategy.latch1.countDown();
assertThat(this.out.receive(10000)).isNotNull();
assertThat(this.releaseStrategy.maxCallers.get()).isEqualTo(1);
assertThat(this.exception)
.as("Unexpected exception:" + (this.exception != null ? this.exception.toString() : "")).isNull();
}
@Test
public void testLockThreeGroups() throws Exception {
this.releaseStrategy.reset(3);
Executors.newSingleThreadExecutor().execute(asyncSend("foo", 1, 1));
Executors.newSingleThreadExecutor().execute(asyncSend("bar", 2, 1));
Executors.newSingleThreadExecutor().execute(asyncSend("foo", 1, 2));
Executors.newSingleThreadExecutor().execute(asyncSend("bar", 2, 2));
Executors.newSingleThreadExecutor().execute(asyncSend("foo", 1, 3));
Executors.newSingleThreadExecutor().execute(asyncSend("bar", 2, 3));
assertThat(this.releaseStrategy.latch2.await(10, TimeUnit.SECONDS)).isTrue();
this.releaseStrategy.latch1.countDown();
this.releaseStrategy.latch1.countDown();
this.releaseStrategy.latch1.countDown();
assertThat(this.out.receive(10000)).isNotNull();
assertThat(this.out.receive(10000)).isNotNull();
assertThat(this.out.receive(10000)).isNotNull();
assertThat(this.releaseStrategy.maxCallers.get()).isEqualTo(3);
assertThat(this.exception)
.as("Unexpected exception:" + (this.exception != null ? this.exception.toString() : "")).isNull();
}
@Test
public void testDistributedAggregator() throws Exception {
this.releaseStrategy.reset(1);
Executors.newSingleThreadExecutor().execute(asyncSend("foo", 1, 1));
Executors.newSingleThreadExecutor().execute(() -> {
try {
in2.send(new GenericMessage<String>("bar", stubHeaders(2, 2, 1)));
}
catch (Exception e) {
exception = e;
}
});
assertThat(this.releaseStrategy.latch2.await(10, TimeUnit.SECONDS)).isTrue();
this.releaseStrategy.latch1.countDown();
assertThat(this.out.receive(10000)).isNotNull();
assertThat(this.releaseStrategy.maxCallers.get()).isEqualTo(1);
assertThat(this.exception)
.as("Unexpected exception:" + (this.exception != null ? this.exception.toString() : "")).isNull();
}
private Runnable asyncSend(final String payload, final int sequence, final int correlation) {
return () -> {
try {
in.send(new GenericMessage<String>(payload, stubHeaders(sequence, 2, correlation)));
}
catch (Exception e) {
exception = e;
}
};
}
private Map<String, Object> stubHeaders(int sequenceNumber, int sequenceSize, int correlationId) {
Map<String, Object> headers = new HashMap<String, Object>();
headers.put(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, sequenceNumber);
headers.put(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, sequenceSize);
headers.put(IntegrationMessageHeaderAccessor.CORRELATION_ID, correlationId);
return headers;
}
public static class LatchingReleaseStrategy implements ReleaseStrategy {
private volatile CountDownLatch latch1;
private volatile CountDownLatch latch2;
private volatile AtomicInteger callers;
private volatile AtomicInteger maxCallers;
@Override
public boolean canRelease(MessageGroup group) {
synchronized (this) {
this.callers.incrementAndGet();
this.maxCallers.set(Math.max(this.maxCallers.get(), this.callers.get()));
}
this.latch2.countDown();
try {
this.latch1.await(10, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
this.callers.decrementAndGet();
return group.size() > 1;
}
public void reset(int expectedConcurrency) {
this.latch1 = new CountDownLatch(expectedConcurrency);
this.latch2 = new CountDownLatch(expectedConcurrency);
this.callers = new AtomicInteger();
this.maxCallers = new AtomicInteger();
}
}
}

View File

@@ -0,0 +1,16 @@
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
<Appenders>
<Console name="STDOUT" target="SYSTEM_OUT">
<PatternLayout pattern="%d %p [%t] [%c] - %m%n" />
</Console>
</Appenders>
<Loggers>
<Logger name="org.springframework" level="warn"/>
<Logger name="org.springframework.integration" level="warn"/>
<Logger name="org.springframework.integration.gemfire" level="warn"/>
<Root level="warn">
<AppenderRef ref="STDOUT" />
</Root>
</Loggers>
</Configuration>