INTEXT-150 Extract Hazelcast Headers and Payloads

JIRA: https://jira.spring.io/browse/INTEXT-150

Refactorings are committed.

Polishing:
* Add generic type to the `AbstractHazelcastEventListener` for cleaner implementors
* Remove redundant `Assert`s
* Polishing some code style
This commit is contained in:
Eren Avsarogullari
2015-04-15 23:57:01 +03:00
committed by Artem Bilan
parent 20d0192fc0
commit 95f7b239f7
27 changed files with 618 additions and 399 deletions

View File

@@ -37,7 +37,7 @@ Basically, Hazelcast Event-Driven Inbound Channel Adapter requires following att
3. Supported cache event types for IList, ISet and IQueue : ADDED, REMOVED.
4. There is no need to cache event type definition for ITopic.
* **cache-listening-policy :** Specifies cache listening policy as SINGLE or ALL. It is optional attribute and its default value is SINGLE. Each Hazelcast inbound channel adapter which listens same cache object with same cache-events attribute, can receive a single event message or all event messages. If this is ALL, all Hazelcast inbound channel adapters which listens same cache object with same cache-events attribute, will receive same event messages. If this is SINGLE, they will receive unique event messages.
* **cache-listening-policy :** Specifies cache listening policy as SINGLE or ALL. It is optional attribute and its default value is SINGLE. Each Hazelcast CQ inbound channel adapter listening same cache object with same cache-events attribute, can receive a single event message or all event messages. If it is ALL, all Hazelcast CQ inbound channel adapters listening same cache object with same cache-events attribute, will receive same event messages. If it is SINGLE, they will receive unique event messages.
Sample namespace and schemaLocation definitions are as follows :
```
@@ -74,9 +74,7 @@ Sample definitions are as follows :
<int-hazelcast:inbound-channel-adapter channel="multiMapChannel"
cache="multiMap"
cache-events="ADDED,
REMOVED,
CLEAR_ALL" />
cache-events="ADDED, REMOVED, CLEAR_ALL" />
<bean id="multiMap" factory-bean="instance" factory-method="getMultiMap">
<constructor-arg value="multiMap"/>
@@ -139,7 +137,7 @@ Sample definitions are as follows :
<int-hazelcast:inbound-channel-adapter channel="replicatedMapChannel"
cache="replicatedMap"
cache-events="ADDED,UPDATED, REMOVED"
cache-events="ADDED, UPDATED, REMOVED"
cache-listening-policy="SINGLE" />
<bean id="replicatedMap" factory-bean="instance" factory-method="getReplicatedMap">
@@ -163,14 +161,18 @@ Hazelcast Continuous Query enables to listen to the modifications performed on s
channel="cqMapChannel"
cache="cqMap"
cache-events="UPDATED, REMOVED"
predicate="name=TestName AND surname=TestSurname" />
predicate="name=TestName AND surname=TestSurname"
include-value="true"
cache-listening-policy="SINGLE" />
```
Basically, it requires four attributes as follows :
* **channel :** Specifies channel which message is sent. It is mandatory attribute.
* **cache :** Specifies distributed Map reference which is listened. It is mandatory attribute.
* **cache-events :** Specifies cache events which are listened. It is optional attribute with ADDED default value. Supported values are ADDED, REMOVED, UPDATED, EVICTED, EVICT_ALL and CLEAR_ALL.
* **predicate :** Specifies predicate to listen to the modifications performed on specific map entries. It is mandatory attribute.
* **predicate :** Specifies predicate to listen to the modifications performed on specific map entries. It is mandatory attribute.
* **include-value :** Specifies including of value and oldValue in continuous query result. It is optional attribute with 'true' default value.
* **cache-listening-policy :** Specifies cache listening policy as SINGLE or ALL. It is optional attribute and its default value is SINGLE. Each Hazelcast CQ inbound channel adapter listening same cache object with same cache-events attribute, can receive a single event message or all event messages. If it is ALL, all Hazelcast CQ inbound channel adapters listening same cache object with same cache-events attribute, will receive same event messages. If it is SINGLE, they will receive unique event messages.
Sample definition is as follows :
```
@@ -180,7 +182,9 @@ Sample definition is as follows :
channel="cqMapChannel"
cache="cqMap"
cache-events="UPDATED, REMOVED"
predicate="name=TestName AND surname=TestSurname"/>
predicate="name=TestName AND surname=TestSurname"
include-value="true"
cache-listening-policy="SINGLE"/>
<bean id="cqMap" factory-bean="instance" factory-method="getMap">
<constructor-arg value="cqMap"/>
@@ -205,7 +209,7 @@ Hazelcast allows to run distributed queries on the distributed map. Hazelcast Di
cache="dsMap"
iteration-type="ENTRY"
distributed-sql="active=false OR age >= 25 OR name = 'TestName'">
<int:poller cron="*/10 * * * * *"/>
<int:poller fixed-delay="100"/>
</int-hazelcast:ds-inbound-channel-adapter>
```
Basically, it requires a poller and four attributes such as
@@ -224,7 +228,7 @@ Sample definition is as follows :
cache="dsMap"
iteration-type="ENTRY"
distributed-sql="active=false OR age >= 25 OR name = 'TestName'">
<int:poller cron="*/10 * * * * *"/>
<int:poller fixed-delay="100"/>
</int-hazelcast:ds-inbound-channel-adapter>
<bean id="dsMap" factory-bean="instance" factory-method="getMap">

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.integration.hazelcast.common;
package org.springframework.integration.hazelcast;
/**
* Enumeration of Cache Event Types

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.integration.hazelcast.common;
package org.springframework.integration.hazelcast;
/**
* Enumeration of Cache Listening Policy Type

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.integration.hazelcast.common;
package org.springframework.integration.hazelcast;
/**
* Enumeration of Distributed SQL Iteration Type

View File

@@ -0,0 +1,37 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.hazelcast;
/**
* Hazelcast Message Headers
*
* @author Eren Avsarogullari
* @since 1.0.0
*/
public abstract class HazelcastHeaders {
private static final String PREFIX = "hazelcast_";
public static final String EVENT_TYPE = PREFIX + "eventType";
public static final String MEMBER = PREFIX + "member";
public static final String CACHE_NAME = PREFIX + "cacheName";
public static final String PUBLISHING_TIME = PREFIX + "publishingTime";
}

View File

@@ -14,12 +14,15 @@
* limitations under the License.
*/
package org.springframework.integration.hazelcast.common;
package org.springframework.integration.hazelcast;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import reactor.util.CollectionUtils;
import reactor.util.StringUtils;
import com.hazelcast.core.DistributedObject;
import com.hazelcast.core.IList;
import com.hazelcast.core.IMap;
@@ -29,9 +32,6 @@ import com.hazelcast.core.ITopic;
import com.hazelcast.core.MultiMap;
import com.hazelcast.core.ReplicatedMap;
import reactor.util.CollectionUtils;
import reactor.util.StringUtils;
/**
* Common Validator for Hazelcast Integration. It validates cache types and events.
*

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.integration.hazelcast.common;
package org.springframework.integration.hazelcast;
import java.net.SocketAddress;
import java.util.concurrent.locks.Lock;

View File

@@ -1,4 +0,0 @@
/**
* Provides common used types and classes.
*/
package org.springframework.integration.hazelcast.common;

View File

@@ -21,7 +21,7 @@ import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.integration.config.IntegrationConfigurationInitializer;
import org.springframework.integration.hazelcast.common.HazelcastLocalInstanceRegistrar;
import org.springframework.integration.hazelcast.HazelcastLocalInstanceRegistrar;
/**
* The Hazelcast Integration infrastructure {@code beanFactory} initializer.

View File

@@ -19,15 +19,19 @@ package org.springframework.integration.hazelcast.inbound;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.EventObject;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.hazelcast.common.CacheEventType;
import org.springframework.integration.hazelcast.common.CacheListeningPolicyType;
import org.springframework.integration.hazelcast.common.HazelcastIntegrationDefinitionValidator;
import org.springframework.integration.hazelcast.common.HazelcastLocalInstanceRegistrar;
import org.springframework.integration.hazelcast.CacheEventType;
import org.springframework.integration.hazelcast.CacheListeningPolicyType;
import org.springframework.integration.hazelcast.HazelcastHeaders;
import org.springframework.integration.hazelcast.HazelcastIntegrationDefinitionValidator;
import org.springframework.integration.hazelcast.HazelcastLocalInstanceRegistrar;
import org.springframework.integration.hazelcast.message.EntryEventMessagePayload;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import com.hazelcast.core.AbstractIMapEvent;
@@ -45,20 +49,21 @@ import reactor.util.StringUtils;
* Hazelcast Base Event-Driven Message Producer.
*
* @author Eren Avsarogullari
* @author Artem Bilan
* @since 1.0.0
*/
public abstract class AbstractHazelcastMessageProducer extends MessageProducerSupport {
protected final DistributedObject distributedObject;
private CacheListeningPolicyType cacheListeningPolicy = CacheListeningPolicyType.SINGLE;
private volatile CacheListeningPolicyType cacheListeningPolicy = CacheListeningPolicyType.SINGLE;
private String hazelcastRegisteredEventListenerId;
private volatile String hazelcastRegisteredEventListenerId;
private Set<String> cacheEvents = Collections.singleton(CacheEventType.ADDED.name());
protected AbstractHazelcastMessageProducer(DistributedObject distributedObject) {
Assert.notNull(distributedObject, "cache must not be null");
public AbstractHazelcastMessageProducer(DistributedObject distributedObject) {
Assert.notNull(distributedObject, "'distributedObject' must not be null");
this.distributedObject = distributedObject;
}
@@ -68,8 +73,8 @@ public abstract class AbstractHazelcastMessageProducer extends MessageProducerSu
public void setCacheEventTypes(String cacheEventTypes) {
HazelcastIntegrationDefinitionValidator.validateEnumType(CacheEventType.class, cacheEventTypes);
final Set<String> cacheEvents = StringUtils.commaDelimitedListToSet(cacheEventTypes);
Assert.notEmpty(cacheEvents, "cacheEvents must have elements");
Set<String> cacheEvents = StringUtils.commaDelimitedListToSet(cacheEventTypes);
Assert.notEmpty(cacheEvents, "'cacheEvents' must have elements");
HazelcastIntegrationDefinitionValidator.validateCacheEventsByDistributedObject(
this.distributedObject, cacheEvents);
this.cacheEvents = cacheEvents;
@@ -80,7 +85,7 @@ public abstract class AbstractHazelcastMessageProducer extends MessageProducerSu
}
public void setCacheListeningPolicy(CacheListeningPolicyType cacheListeningPolicy) {
Assert.notNull(cacheListeningPolicy, "cacheListeningPolicy must not be null");
Assert.notNull(cacheListeningPolicy, "'cacheListeningPolicy' must not be null");
this.cacheListeningPolicy = cacheListeningPolicy;
}
@@ -92,28 +97,27 @@ public abstract class AbstractHazelcastMessageProducer extends MessageProducerSu
this.hazelcastRegisteredEventListenerId = hazelcastRegisteredEventListenerId;
}
protected abstract class AbstractHazelcastEventListener {
protected abstract class AbstractHazelcastEventListener<E> {
protected abstract void processEvent(EventObject event);
protected abstract void processEvent(E event);
protected void sendMessage(final EventObject event, final InetSocketAddress socketAddress,
final CacheListeningPolicyType cacheListeningPolicyType) {
protected abstract Message<?> toMessage(E event);
protected void sendMessage(E event, InetSocketAddress socketAddress,
CacheListeningPolicyType cacheListeningPolicyType) {
if (CacheListeningPolicyType.ALL == cacheListeningPolicyType || isEventAcceptable(socketAddress)) {
AbstractHazelcastMessageProducer.this.sendMessage(getMessageBuilderFactory().withPayload(event).build());
AbstractHazelcastMessageProducer.this.sendMessage(toMessage(event));
}
}
private boolean isEventAcceptable(final InetSocketAddress socketAddress) {
final Set<HazelcastInstance> hazelcastInstanceSet = Hazelcast.getAllHazelcastInstances();
final Set<SocketAddress> localSocketAddressesSet = getLocalSocketAddresses(hazelcastInstanceSet);
if ((!localSocketAddressesSet.isEmpty())
return (!localSocketAddressesSet.isEmpty())
&& (localSocketAddressesSet.contains(socketAddress) ||
isEventComingFromNonRegisteredHazelcastInstance(hazelcastInstanceSet.iterator().next(),
localSocketAddressesSet, socketAddress))) {
return true;
}
localSocketAddressesSet, socketAddress));
return false;
}
private Set<SocketAddress> getLocalSocketAddresses(final Set<HazelcastInstance> hazelcastInstanceSet) {
@@ -140,7 +144,7 @@ public abstract class AbstractHazelcastMessageProducer extends MessageProducerSu
}
protected final class HazelcastEntryListener<K, V> extends
AbstractHazelcastEventListener implements EntryListener<K, V> {
AbstractHazelcastEventListener<AbstractIMapEvent> implements EntryListener<K, V> {
@Override
public void entryAdded(EntryEvent<K, V> event) {
@@ -173,12 +177,9 @@ public abstract class AbstractHazelcastMessageProducer extends MessageProducerSu
}
@Override
protected void processEvent(EventObject event) {
Assert.notNull(event, "event must not be null");
if (getCacheEvents().contains(((AbstractIMapEvent) event).getEventType().toString())) {
sendMessage(event, ((AbstractIMapEvent) event).getMember().getSocketAddress(),
getCacheListeningPolicy());
protected void processEvent(AbstractIMapEvent event) {
if (getCacheEvents().contains(event.getEventType().toString())) {
sendMessage(event, event.getMember().getSocketAddress(), getCacheListeningPolicy());
}
if (logger.isDebugEnabled()) {
@@ -186,6 +187,29 @@ public abstract class AbstractHazelcastMessageProducer extends MessageProducerSu
}
}
@Override
@SuppressWarnings("unchecked")
protected Message<?> toMessage(AbstractIMapEvent event) {
final Map<String, Object> headers = new HashMap<String, Object>();
headers.put(HazelcastHeaders.EVENT_TYPE, event.getEventType().name());
headers.put(HazelcastHeaders.MEMBER, event.getMember().getSocketAddress());
headers.put(HazelcastHeaders.CACHE_NAME, event.getName());
if (event instanceof EntryEvent) {
EntryEvent<K, V> entryEvent = (EntryEvent<K, V>) event;
EntryEventMessagePayload<K, V> messagePayload = new EntryEventMessagePayload<>(entryEvent.getKey(),
entryEvent.getValue(), entryEvent.getOldValue());
return getMessageBuilderFactory().withPayload(messagePayload).copyHeaders(headers).build();
}
else if (event instanceof MapEvent) {
return getMessageBuilderFactory()
.withPayload(((MapEvent) event).getNumberOfEntriesAffected()).copyHeaders(headers).build();
}
else {
throw new IllegalStateException("Invalid event is received. Event : " + event);
}
}
}
}

View File

@@ -39,7 +39,7 @@ public class HazelcastContinuousQueryMessageProducer extends AbstractHazelcastMe
@SuppressWarnings("rawtypes")
public HazelcastContinuousQueryMessageProducer(IMap distributedMap, String predicate) {
super(distributedMap);
Assert.hasText(predicate, "predicate must not be null");
Assert.hasText(predicate, "'predicate' must not be null");
this.predicate = predicate;
}

View File

@@ -20,7 +20,7 @@ import java.util.Collection;
import java.util.Collections;
import org.springframework.integration.endpoint.AbstractMessageSource;
import org.springframework.integration.hazelcast.common.DistributedSQLIterationType;
import org.springframework.integration.hazelcast.DistributedSQLIterationType;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
@@ -39,19 +39,19 @@ public class HazelcastDistributedSQLMessageSource extends AbstractMessageSource
private final IMap<?, ?> distributedMap;
private final String distributedSQL;
private final String distributedSql;
private DistributedSQLIterationType iterationType = DistributedSQLIterationType.VALUE;
public HazelcastDistributedSQLMessageSource(IMap distributedMap, String distributedSQL) {
Assert.notNull(distributedMap, "cache must not be null");
Assert.hasText(distributedSQL, "distributed-sql must not be null");
public HazelcastDistributedSQLMessageSource(IMap distributedMap, String distributedSql) {
Assert.notNull(distributedMap, "'distributedMap' must not be null");
Assert.hasText(distributedSql, "'distributedSql' must not be empty");
this.distributedMap = distributedMap;
this.distributedSQL = distributedSQL;
this.distributedSql = distributedSql;
}
public void setIterationType(DistributedSQLIterationType iterationType) {
Assert.notNull(this.iterationType, "iterationType must not be null");
Assert.notNull(this.iterationType, "'iterationType' must not be null");
this.iterationType = iterationType;
}
@@ -65,18 +65,18 @@ public class HazelcastDistributedSQLMessageSource extends AbstractMessageSource
switch (this.iterationType) {
case ENTRY:
return getDistributedSQLResultSet(Collections
.unmodifiableCollection(this.distributedMap.entrySet(new SqlPredicate(this.distributedSQL))));
.unmodifiableCollection(this.distributedMap.entrySet(new SqlPredicate(this.distributedSql))));
case KEY:
return getDistributedSQLResultSet(Collections
.unmodifiableCollection(this.distributedMap.keySet(new SqlPredicate(this.distributedSQL))));
.unmodifiableCollection(this.distributedMap.keySet(new SqlPredicate(this.distributedSql))));
case LOCAL_KEY:
return getDistributedSQLResultSet(Collections
.unmodifiableCollection(this.distributedMap.localKeySet(new SqlPredicate(this.distributedSQL))));
.unmodifiableCollection(this.distributedMap.localKeySet(new SqlPredicate(this.distributedSql))));
default:
return getDistributedSQLResultSet(this.distributedMap.values(new SqlPredicate(this.distributedSQL)));
return getDistributedSQLResultSet(this.distributedMap.values(new SqlPredicate(this.distributedSql)));
}
}

View File

@@ -16,9 +16,11 @@
package org.springframework.integration.hazelcast.inbound;
import java.util.EventObject;
import java.util.HashMap;
import java.util.Map;
import org.springframework.integration.hazelcast.common.HazelcastIntegrationDefinitionValidator;
import org.springframework.integration.hazelcast.HazelcastHeaders;
import org.springframework.integration.hazelcast.HazelcastIntegrationDefinitionValidator;
import org.springframework.util.Assert;
import com.hazelcast.core.DistributedObject;
@@ -42,9 +44,9 @@ import com.hazelcast.core.ReplicatedMap;
* to listen related cache events and sends events to related channel.
*
* @author Eren Avsarogullari
* @author Artem Bilan
* @since 1.0.0
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
public class HazelcastEventDrivenMessageProducer extends AbstractHazelcastMessageProducer {
public HazelcastEventDrivenMessageProducer(DistributedObject distributedObject) {
@@ -58,32 +60,33 @@ public class HazelcastEventDrivenMessageProducer extends AbstractHazelcastMessag
}
@Override
@SuppressWarnings({"rawtypes", "unchecked"})
protected void doStart() {
if(this.distributedObject instanceof IMap) {
if (this.distributedObject instanceof IMap) {
setHazelcastRegisteredEventListenerId(((IMap<?, ?>) this.distributedObject)
.addEntryListener(new HazelcastEntryListener(), true));
}
else if(this.distributedObject instanceof MultiMap) {
else if (this.distributedObject instanceof MultiMap) {
setHazelcastRegisteredEventListenerId(((MultiMap<?, ?>) this.distributedObject)
.addEntryListener(new HazelcastEntryListener(), true));
}
else if(this.distributedObject instanceof ReplicatedMap) {
else if (this.distributedObject instanceof ReplicatedMap) {
setHazelcastRegisteredEventListenerId(((ReplicatedMap<?, ?>) this.distributedObject)
.addEntryListener(new HazelcastEntryListener()));
}
else if(this.distributedObject instanceof IList) {
else if (this.distributedObject instanceof IList) {
setHazelcastRegisteredEventListenerId(((IList<?>) this.distributedObject)
.addItemListener(new HazelcastItemListener(), true));
}
else if(this.distributedObject instanceof ISet) {
else if (this.distributedObject instanceof ISet) {
setHazelcastRegisteredEventListenerId(((ISet<?>) this.distributedObject)
.addItemListener(new HazelcastItemListener(), true));
}
else if(this.distributedObject instanceof IQueue) {
else if (this.distributedObject instanceof IQueue) {
setHazelcastRegisteredEventListenerId(((IQueue<?>) this.distributedObject)
.addItemListener(new HazelcastItemListener(), true));
}
else if(this.distributedObject instanceof ITopic) {
else if (this.distributedObject instanceof ITopic) {
setHazelcastRegisteredEventListenerId(((ITopic<?>) this.distributedObject)
.addMessageListener(new HazelcastMessageListener()));
}
@@ -91,25 +94,25 @@ public class HazelcastEventDrivenMessageProducer extends AbstractHazelcastMessag
@Override
protected void doStop() {
if(this.distributedObject instanceof IMap) {
if (this.distributedObject instanceof IMap) {
((IMap<?, ?>) this.distributedObject).removeEntryListener(getHazelcastRegisteredEventListenerId());
}
else if(this.distributedObject instanceof MultiMap) {
else if (this.distributedObject instanceof MultiMap) {
((MultiMap<?, ?>) this.distributedObject).removeEntryListener(getHazelcastRegisteredEventListenerId());
}
else if(this.distributedObject instanceof ReplicatedMap) {
else if (this.distributedObject instanceof ReplicatedMap) {
((ReplicatedMap<?, ?>) this.distributedObject).removeEntryListener(getHazelcastRegisteredEventListenerId());
}
else if(this.distributedObject instanceof IList) {
else if (this.distributedObject instanceof IList) {
((IList<?>) this.distributedObject).removeItemListener(getHazelcastRegisteredEventListenerId());
}
else if(this.distributedObject instanceof ISet) {
else if (this.distributedObject instanceof ISet) {
((ISet<?>) this.distributedObject).removeItemListener(getHazelcastRegisteredEventListenerId());
}
else if(this.distributedObject instanceof IQueue) {
else if (this.distributedObject instanceof IQueue) {
((IQueue<?>) this.distributedObject).removeItemListener(getHazelcastRegisteredEventListenerId());
}
else if(this.distributedObject instanceof ITopic) {
else if (this.distributedObject instanceof ITopic) {
((ITopic<?>) this.distributedObject).removeMessageListener(getHazelcastRegisteredEventListenerId());
}
}
@@ -119,7 +122,8 @@ public class HazelcastEventDrivenMessageProducer extends AbstractHazelcastMessag
return "hazelcast:inbound-channel-adapter";
}
private class HazelcastItemListener<E> extends AbstractHazelcastEventListener implements ItemListener<E> {
private class HazelcastItemListener<E> extends AbstractHazelcastEventListener<ItemEvent<E>>
implements ItemListener<E> {
@Override
public void itemAdded(ItemEvent<E> item) {
@@ -132,21 +136,29 @@ public class HazelcastEventDrivenMessageProducer extends AbstractHazelcastMessag
}
@Override
protected void processEvent(EventObject event) {
Assert.notNull(event, "event must not be null");
if (getCacheEvents().contains(((ItemEvent<E>) event).getEventType().toString())) {
sendMessage(event, ((ItemEvent<E>) event).getMember().getSocketAddress(), getCacheListeningPolicy());
protected void processEvent(ItemEvent<E> event) {
if (getCacheEvents().contains(event.getEventType().toString())) {
sendMessage(event, event.getMember().getSocketAddress(), getCacheListeningPolicy());
}
if (logger.isDebugEnabled()){
if (logger.isDebugEnabled()) {
logger.debug("Received ItemEvent : " + event);
}
}
@Override
protected org.springframework.messaging.Message<?> toMessage(ItemEvent<E> event) {
final Map<String, Object> headers = new HashMap<>();
headers.put(HazelcastHeaders.EVENT_TYPE, event.getEventType().name());
headers.put(HazelcastHeaders.MEMBER, event.getMember().getSocketAddress());
return getMessageBuilderFactory().withPayload(event.getItem()).copyHeaders(headers).build();
}
}
private class HazelcastMessageListener<E> extends AbstractHazelcastEventListener implements MessageListener<E> {
private class HazelcastMessageListener<E> extends AbstractHazelcastEventListener<Message<E>>
implements MessageListener<E> {
@Override
public void onMessage(Message<E> message) {
@@ -154,15 +166,26 @@ public class HazelcastEventDrivenMessageProducer extends AbstractHazelcastMessag
}
@Override
protected void processEvent(EventObject event) {
Assert.notNull(event, "event must not be null");
sendMessage(event, ((Message<E>) event).getPublishingMember().getSocketAddress(), null);
protected void processEvent(Message<E> event) {
sendMessage(event, event.getPublishingMember().getSocketAddress(), null);
if (logger.isDebugEnabled()){
if (logger.isDebugEnabled()) {
logger.debug("Received Message : " + event);
}
}
@Override
protected org.springframework.messaging.Message<?> toMessage(Message<E> event) {
Assert.notNull(event.getMessageObject(), "message must not be null");
final Map<String, Object> headers = new HashMap<>();
headers.put(HazelcastHeaders.MEMBER, event.getPublishingMember().getSocketAddress());
headers.put(HazelcastHeaders.CACHE_NAME, event.getSource());
headers.put(HazelcastHeaders.PUBLISHING_TIME, event.getPublishTime());
return getMessageBuilderFactory().withPayload(event.getMessageObject()).copyHeaders(headers).build();
}
}
}

View File

@@ -20,7 +20,7 @@ import java.net.SocketAddress;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import org.springframework.integration.hazelcast.common.HazelcastLocalInstanceRegistrar;
import org.springframework.integration.hazelcast.HazelcastLocalInstanceRegistrar;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;

View File

@@ -0,0 +1,67 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.hazelcast.message;
import org.springframework.util.Assert;
/**
* Hazelcast Message Payload for Entry Events
*
* @author Eren Avsarogullari
* @since 1.0.0
*/
public class EntryEventMessagePayload<K, V> {
public final K key;
public final V value;
public final V oldValue;
public EntryEventMessagePayload(final K key, final V value, final V oldValue) {
Assert.notNull(key, "'key' must not be null");
this.key = key;
this.value = value;
this.oldValue = oldValue;
}
@Override
public String toString() {
return "EntryEventMessagePayload [key=" + key + ", value=" + value + ", oldValue=" + oldValue + "]";
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
EntryEventMessagePayload<?, ?> that = (EntryEventMessagePayload<?, ?>) o;
return key.equals(that.key) && !(value != null ? !value.equals(that.value)
: that.value != null) && !(oldValue != null
? !oldValue.equals(that.oldValue) : that.oldValue != null);
}
@Override
public int hashCode() {
int result = key.hashCode();
result = 31 * result + (value != null ? value.hashCode() : 0);
result = 31 * result + (oldValue != null ? oldValue.hashCode() : 0);
return result;
}
}

View File

@@ -0,0 +1,4 @@
/**
* Provides classes supporting Hazelcast message headers and payload.
*/
package org.springframework.integration.hazelcast.message;

View File

@@ -22,7 +22,7 @@ import java.util.Queue;
import java.util.Set;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.hazelcast.common.HazelcastIntegrationDefinitionValidator;
import org.springframework.integration.hazelcast.HazelcastIntegrationDefinitionValidator;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
@@ -46,7 +46,7 @@ public class HazelcastCacheWritingMessageHandler extends AbstractMessageHandler
private final DistributedObject distributedObject;
public HazelcastCacheWritingMessageHandler(DistributedObject distributedObject) {
Assert.notNull(distributedObject, "cache must not be null");
Assert.notNull(distributedObject, "'distributedObject' must not be null");
this.distributedObject = distributedObject;
}

View File

@@ -0,0 +1,4 @@
/**
* Provides common used types and classes.
*/
package org.springframework.integration.hazelcast;

View File

@@ -0,0 +1,56 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.hazelcast;
import org.junit.Assert;
import org.springframework.integration.hazelcast.message.EntryEventMessagePayload;
import org.springframework.messaging.Message;
import com.hazelcast.core.EntryEventType;
/**
* Base Class for Hazelcast Test Support
*
* @author Eren Avsarogullari
* @since 1.0.0
*/
public class AbstractHazelcastTestSupport {
protected void verifyEntryEvent(Message<?> msg, String cacheName, EntryEventType event) {
Assert.assertNotNull(msg);
Assert.assertNotNull(msg.getPayload());
if (event == EntryEventType.CLEAR_ALL || event == EntryEventType.EVICT_ALL) {
Assert.assertTrue(msg.getPayload() instanceof Integer);
}
else {
Assert.assertTrue(msg.getPayload() instanceof EntryEventMessagePayload);
}
Assert.assertEquals(cacheName, msg.getHeaders().get(HazelcastHeaders.CACHE_NAME));
Assert.assertEquals(event.name(), msg.getHeaders().get(HazelcastHeaders.EVENT_TYPE));
Assert.assertNotNull(msg.getHeaders().get(HazelcastHeaders.MEMBER));
}
protected void verifyItemEvent(Message<?> msg, EntryEventType event) {
Assert.assertNotNull(msg);
Assert.assertNotNull(msg.getPayload());
Assert.assertNotNull(msg.getHeaders().get(HazelcastHeaders.MEMBER));
Assert.assertEquals(event.toString(), msg.getHeaders().get(HazelcastHeaders.EVENT_TYPE).toString());
}
}

View File

@@ -16,22 +16,27 @@
package org.springframework.integration.hazelcast.inbound;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import javax.annotation.Resource;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.hazelcast.AbstractHazelcastTestSupport;
import org.springframework.integration.hazelcast.HazelcastHeaders;
import org.springframework.integration.hazelcast.HazelcastIntegrationTestUser;
import org.springframework.integration.hazelcast.message.EntryEventMessagePayload;
import org.springframework.messaging.Message;
import org.springframework.messaging.PollableChannel;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.hazelcast.core.AbstractIMapEvent;
import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.EntryEventType;
import com.hazelcast.core.IMap;
@@ -44,7 +49,8 @@ import com.hazelcast.core.IMap;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration
@DirtiesContext
public class HazelcastCQDistributedMapInboundChannelAdapterTests {
@SuppressWarnings("unchecked")
public class HazelcastCQDistributedMapInboundChannelAdapterTests extends AbstractHazelcastTestSupport {
@Autowired
private PollableChannel cqMapChannel1;
@@ -82,20 +88,25 @@ public class HazelcastCQDistributedMapInboundChannelAdapterTests {
cqDistributedMap1.remove(1);
cqDistributedMap1.put(2, new HazelcastIntegrationTestUser(2, "TestName2", "TestSurname2"));
Message<?> msg = cqMapChannel1.receive(2_000);
Assert.assertNotNull(msg);
Assert.assertNotNull(msg.getPayload());
Assert.assertTrue(msg.getPayload() instanceof EntryEvent);
Assert.assertEquals(EntryEventType.ADDED,
((EntryEvent<?, ?>) msg.getPayload()).getEventType());
Assert.assertEquals("cqDistributedMap1",
((EntryEvent<?, ?>) msg.getPayload()).getName());
Assert.assertEquals(1, ((EntryEvent<?, ?>) msg.getPayload()).getKey());
Assert.assertEquals(1,
((HazelcastIntegrationTestUser) ((EntryEvent<?, ?>) msg.getPayload()).getValue()).getId());
Assert.assertEquals("TestName1",
((HazelcastIntegrationTestUser) ((EntryEvent<?, ?>) msg.getPayload()).getValue()).getName());
Assert.assertEquals("TestSurname1",
((HazelcastIntegrationTestUser) ((EntryEvent<?, ?>) msg.getPayload()).getValue()).getSurname());
assertNotNull(msg);
assertNotNull(msg.getPayload());
assertTrue(msg.getPayload() instanceof EntryEventMessagePayload);
assertNotNull(msg.getHeaders().get(HazelcastHeaders.MEMBER));
assertEquals(EntryEventType.ADDED.name(), msg.getHeaders().get(HazelcastHeaders.EVENT_TYPE));
assertEquals("cqDistributedMap1", msg.getHeaders().get(HazelcastHeaders.CACHE_NAME));
assertEquals(Integer.valueOf(1),
((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg
.getPayload()).key);
assertEquals(1,
(((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg
.getPayload()).value).getId());
assertEquals("TestName1",
(((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg
.getPayload()).value).getName());
assertEquals("TestSurname1",
(((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg
.getPayload()).value).getSurname());
}
@Test
@@ -104,43 +115,48 @@ public class HazelcastCQDistributedMapInboundChannelAdapterTests {
cqDistributedMap2.put(2, new HazelcastIntegrationTestUser(2, "TestName2", "TestSurname2"));
cqDistributedMap2.remove(2);
Message<?> msg = cqMapChannel2.receive(2_000);
Assert.assertNotNull(msg);
Assert.assertNotNull(msg.getPayload());
Assert.assertTrue(msg.getPayload() instanceof EntryEvent);
Assert.assertEquals(EntryEventType.REMOVED,
((EntryEvent<?, ?>) msg.getPayload()).getEventType());
Assert.assertEquals("cqDistributedMap2",
((EntryEvent<?, ?>) msg.getPayload()).getName());
Assert.assertEquals(2, ((EntryEvent<?, ?>) msg.getPayload()).getKey());
Assert.assertEquals(2,
((HazelcastIntegrationTestUser) ((EntryEvent<?, ?>) msg.getPayload()).getOldValue()).getId());
Assert.assertEquals("TestName2",
((HazelcastIntegrationTestUser) ((EntryEvent<?, ?>) msg.getPayload()).getOldValue()).getName());
Assert.assertEquals("TestSurname2",
((HazelcastIntegrationTestUser) ((EntryEvent<?, ?>) msg.getPayload()).getOldValue()).getSurname());
assertNotNull(msg);
assertNotNull(msg.getPayload());
assertTrue(msg.getPayload() instanceof EntryEventMessagePayload);
assertNotNull(msg.getHeaders().get(HazelcastHeaders.MEMBER));
assertEquals(EntryEventType.REMOVED.name(), msg.getHeaders().get(HazelcastHeaders.EVENT_TYPE));
assertEquals("cqDistributedMap2", msg.getHeaders().get(HazelcastHeaders.CACHE_NAME));
assertEquals(Integer.valueOf(2),
((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg
.getPayload()).key);
assertEquals(2,
(((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg
.getPayload()).oldValue).getId());
assertEquals("TestName2",
(((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg
.getPayload()).oldValue).getName());
assertEquals("TestSurname2",
(((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg
.getPayload()).oldValue).getSurname());
}
@Test
public void testContinuousQueryForALLEntryEvent() {
cqDistributedMap3.put(1, new HazelcastIntegrationTestUser(1, "TestName1", "TestSurname1"));
Message<?> msg = cqMapChannel3.receive(2_000);
verify(msg, "cqDistributedMap3", EntryEventType.ADDED);
verifyEntryEvent(msg, "cqDistributedMap3", EntryEventType.ADDED);
cqDistributedMap3.put(1, new HazelcastIntegrationTestUser(1, "TestName1", "TestSurnameUpdated"));
msg = cqMapChannel3.receive(2_000);
verify(msg, "cqDistributedMap3", EntryEventType.UPDATED);
verifyEntryEvent(msg, "cqDistributedMap3", EntryEventType.UPDATED);
cqDistributedMap3.remove(1);
msg = cqMapChannel3.receive(2_000);
verify(msg, "cqDistributedMap3", EntryEventType.REMOVED);
verifyEntryEvent(msg, "cqDistributedMap3", EntryEventType.REMOVED);
cqDistributedMap3.put(2, new HazelcastIntegrationTestUser(2, "TestName2", "TestSurname2"));
msg = cqMapChannel3.receive(2_000);
verify(msg, "cqDistributedMap3", EntryEventType.ADDED);
verifyEntryEvent(msg, "cqDistributedMap3", EntryEventType.ADDED);
cqDistributedMap3.clear();
msg = cqMapChannel3.receive(2_000);
verify(msg, "cqDistributedMap3", EntryEventType.CLEAR_ALL);
verifyEntryEvent(msg, "cqDistributedMap3", EntryEventType.CLEAR_ALL);
}
@Test
@@ -148,26 +164,34 @@ public class HazelcastCQDistributedMapInboundChannelAdapterTests {
cqDistributedMap4.put(1, new HazelcastIntegrationTestUser(1, "TestName1", "TestSurname1"));
cqDistributedMap4.put(1, new HazelcastIntegrationTestUser(2, "TestName2", "TestSurname2"));
Message<?> msg = cqMapChannel4.receive(2_000);
Assert.assertNotNull(msg);
Assert.assertNotNull(msg.getPayload());
Assert.assertTrue(msg.getPayload() instanceof EntryEvent);
Assert.assertEquals(EntryEventType.UPDATED,
((EntryEvent<?, ?>) msg.getPayload()).getEventType());
Assert.assertEquals("cqDistributedMap4",
((EntryEvent<?, ?>) msg.getPayload()).getName());
Assert.assertEquals(1, ((EntryEvent<?, ?>) msg.getPayload()).getKey());
Assert.assertEquals(1,
((HazelcastIntegrationTestUser) ((EntryEvent<?, ?>) msg.getPayload()).getOldValue()).getId());
Assert.assertEquals("TestName1",
((HazelcastIntegrationTestUser) ((EntryEvent<?, ?>) msg.getPayload()).getOldValue()).getName());
Assert.assertEquals("TestSurname1",
((HazelcastIntegrationTestUser) ((EntryEvent<?, ?>) msg.getPayload()).getOldValue()).getSurname());
Assert.assertEquals(2,
((HazelcastIntegrationTestUser) ((EntryEvent<?, ?>) msg.getPayload()).getValue()).getId());
Assert.assertEquals("TestName2",
((HazelcastIntegrationTestUser) ((EntryEvent<?, ?>) msg.getPayload()).getValue()).getName());
Assert.assertEquals("TestSurname2",
((HazelcastIntegrationTestUser) ((EntryEvent<?, ?>) msg.getPayload()).getValue()).getSurname());
assertNotNull(msg);
assertNotNull(msg.getPayload());
assertTrue(msg.getPayload() instanceof EntryEventMessagePayload);
assertNotNull(msg.getHeaders().get(HazelcastHeaders.MEMBER));
assertEquals(EntryEventType.UPDATED.name(), msg.getHeaders().get(HazelcastHeaders.EVENT_TYPE));
assertEquals("cqDistributedMap4", msg.getHeaders().get(HazelcastHeaders.CACHE_NAME));
assertEquals(Integer.valueOf(1),
((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg
.getPayload()).key);
assertEquals(1,
(((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg
.getPayload()).oldValue).getId());
assertEquals("TestName1",
(((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg
.getPayload()).oldValue).getName());
assertEquals("TestSurname1",
(((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg
.getPayload()).oldValue).getSurname());
assertEquals(2,
(((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg
.getPayload()).value).getId());
assertEquals("TestName2",
(((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg
.getPayload()).value).getName());
assertEquals("TestSurname2",
(((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg
.getPayload()).value).getSurname());
}
@Test
@@ -175,22 +199,17 @@ public class HazelcastCQDistributedMapInboundChannelAdapterTests {
cqDistributedMap5.put(1, new HazelcastIntegrationTestUser(1, "TestName1", "TestSurname1"));
cqDistributedMap5.put(1, new HazelcastIntegrationTestUser(2, "TestName2", "TestSurname2"));
Message<?> msg = cqMapChannel5.receive(2_000);
Assert.assertNotNull(msg);
Assert.assertNotNull(msg.getPayload());
Assert.assertTrue(msg.getPayload() instanceof EntryEvent);
Assert.assertEquals(EntryEventType.UPDATED, ((EntryEvent<?, ?>) msg.getPayload()).getEventType());
Assert.assertEquals("cqDistributedMap5", ((EntryEvent<?, ?>) msg.getPayload()).getName());
Assert.assertEquals(1, ((EntryEvent<?, ?>) msg.getPayload()).getKey());
Assert.assertNull(((EntryEvent<?, ?>) msg.getPayload()).getOldValue());
Assert.assertNull(((EntryEvent<?, ?>) msg.getPayload()).getValue());
}
assertNotNull(msg);
assertNotNull(msg.getPayload());
assertTrue(msg.getPayload() instanceof EntryEventMessagePayload);
assertNotNull(msg.getHeaders().get(HazelcastHeaders.MEMBER));
assertEquals(EntryEventType.UPDATED.name(), msg.getHeaders().get(HazelcastHeaders.EVENT_TYPE));
assertEquals("cqDistributedMap5", msg.getHeaders().get(HazelcastHeaders.CACHE_NAME));
private void verify(Message<?> msg, String cacheName, EntryEventType type) {
Assert.assertNotNull(msg);
Assert.assertNotNull(msg.getPayload());
Assert.assertTrue(msg.getPayload() instanceof AbstractIMapEvent);
Assert.assertEquals(cacheName, ((AbstractIMapEvent) msg.getPayload()).getName());
Assert.assertEquals(type, ((AbstractIMapEvent) msg.getPayload()).getEventType());
assertEquals(Integer.valueOf(1),
((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg.getPayload()).key);
assertNull(((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg.getPayload()).oldValue);
assertNull(((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg.getPayload()).value);
}
}

View File

@@ -16,13 +16,17 @@
package org.springframework.integration.hazelcast.inbound;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import javax.annotation.Resource;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.hazelcast.AbstractHazelcastTestSupport;
import org.springframework.integration.hazelcast.HazelcastHeaders;
import org.springframework.integration.hazelcast.HazelcastIntegrationTestUser;
import org.springframework.messaging.Message;
import org.springframework.messaging.PollableChannel;
@@ -32,7 +36,6 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.hazelcast.core.EntryEventType;
import com.hazelcast.core.IList;
import com.hazelcast.core.ItemEvent;
/**
* Hazelcast Distributed List Event Driven Inbound Channel Adapter Test Class
@@ -43,7 +46,7 @@ import com.hazelcast.core.ItemEvent;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration
@DirtiesContext
public class HazelcastDistributedListEventDrivenInboundChannelAdapterTests {
public class HazelcastDistributedListEventDrivenInboundChannelAdapterTests extends AbstractHazelcastTestSupport {
@Autowired
private PollableChannel edListChannel1;
@@ -67,17 +70,13 @@ public class HazelcastDistributedListEventDrivenInboundChannelAdapterTests {
public void testEventDrivenForOnlyADDEDEntryEvent() {
edDistributedList1.add(new HazelcastIntegrationTestUser(1, "TestName1", "TestSurname1"));
Message<?> msg = edListChannel1.receive(2_000);
Assert.assertNotNull(msg);
Assert.assertNotNull(msg.getPayload());
Assert.assertTrue(msg.getPayload() instanceof ItemEvent);
Assert.assertEquals(EntryEventType.ADDED.toString(),
((ItemEvent<?>) msg.getPayload()).getEventType().toString());
Assert.assertEquals(1,
((HazelcastIntegrationTestUser) ((ItemEvent<?>) msg.getPayload()).getItem()).getId());
Assert.assertEquals("TestName1",
((HazelcastIntegrationTestUser) ((ItemEvent<?>) msg.getPayload()).getItem()).getName());
Assert.assertEquals("TestSurname1",
((HazelcastIntegrationTestUser) ((ItemEvent<?>) msg.getPayload()).getItem()).getSurname());
assertNotNull(msg);
assertNotNull(msg.getPayload());
assertNotNull(msg.getHeaders().get(HazelcastHeaders.MEMBER));
assertEquals(EntryEventType.ADDED.toString(), msg.getHeaders().get(HazelcastHeaders.EVENT_TYPE).toString());
assertEquals(1, ((HazelcastIntegrationTestUser) msg.getPayload()).getId());
assertEquals("TestName1", ((HazelcastIntegrationTestUser) msg.getPayload()).getName());
assertEquals("TestSurname1", ((HazelcastIntegrationTestUser) msg.getPayload()).getSurname());
}
@Test
@@ -86,17 +85,13 @@ public class HazelcastDistributedListEventDrivenInboundChannelAdapterTests {
edDistributedList2.add(user);
edDistributedList2.remove(user);
Message<?> msg = edListChannel2.receive(2_000);
Assert.assertNotNull(msg);
Assert.assertNotNull(msg.getPayload());
Assert.assertTrue(msg.getPayload() instanceof ItemEvent);
Assert.assertEquals(EntryEventType.REMOVED.toString(),
((ItemEvent<?>) msg.getPayload()).getEventType().toString());
Assert.assertEquals(2,
((HazelcastIntegrationTestUser) ((ItemEvent<?>) msg.getPayload()).getItem()).getId());
Assert.assertEquals("TestName2",
((HazelcastIntegrationTestUser) ((ItemEvent<?>) msg.getPayload()).getItem()).getName());
Assert.assertEquals("TestSurname2",
((HazelcastIntegrationTestUser) ((ItemEvent<?>) msg.getPayload()).getItem()).getSurname());
assertNotNull(msg);
assertNotNull(msg.getPayload());
assertNotNull(msg.getHeaders().get(HazelcastHeaders.MEMBER));
assertEquals(EntryEventType.REMOVED.toString(), msg.getHeaders().get(HazelcastHeaders.EVENT_TYPE).toString());
assertEquals(2, ((HazelcastIntegrationTestUser) msg.getPayload()).getId());
assertEquals("TestName2", ((HazelcastIntegrationTestUser) msg.getPayload()).getName());
assertEquals("TestSurname2", ((HazelcastIntegrationTestUser) msg.getPayload()).getSurname());
}
@Test
@@ -104,24 +99,16 @@ public class HazelcastDistributedListEventDrivenInboundChannelAdapterTests {
HazelcastIntegrationTestUser user = new HazelcastIntegrationTestUser(1, "TestName1", "TestSurname1");
edDistributedList3.add(user);
Message<?> msg = edListChannel3.receive(2_000);
verify(msg, EntryEventType.ADDED);
verifyItemEvent(msg, EntryEventType.ADDED);
edDistributedList3.remove(user);
msg = edListChannel3.receive(2_000);
verify(msg, EntryEventType.REMOVED);
verifyItemEvent(msg, EntryEventType.REMOVED);
user = new HazelcastIntegrationTestUser(2, "TestName2", "TestSurname2");
edDistributedList3.add(user);
msg = edListChannel3.receive(2_000);
verify(msg, EntryEventType.ADDED);
}
private void verify(Message<?> msg, EntryEventType type) {
Assert.assertNotNull(msg);
Assert.assertNotNull(msg.getPayload());
Assert.assertTrue(msg.getPayload() instanceof ItemEvent);
Assert.assertEquals(type.toString(),
((ItemEvent<?>) msg.getPayload()).getEventType().toString());
verifyItemEvent(msg, EntryEventType.ADDED);
}
}

View File

@@ -23,15 +23,16 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.hazelcast.AbstractHazelcastTestSupport;
import org.springframework.integration.hazelcast.HazelcastHeaders;
import org.springframework.integration.hazelcast.HazelcastIntegrationTestUser;
import org.springframework.integration.hazelcast.message.EntryEventMessagePayload;
import org.springframework.messaging.Message;
import org.springframework.messaging.PollableChannel;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.hazelcast.core.AbstractIMapEvent;
import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.EntryEventType;
import com.hazelcast.core.IMap;
@@ -44,7 +45,8 @@ import com.hazelcast.core.IMap;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration
@DirtiesContext
public class HazelcastDistributedMapEventDrivenInboundChannelAdapterTests {
@SuppressWarnings("unchecked")
public class HazelcastDistributedMapEventDrivenInboundChannelAdapterTests extends AbstractHazelcastTestSupport {
@Autowired
private PollableChannel edMapChannel1;
@@ -76,18 +78,22 @@ public class HazelcastDistributedMapEventDrivenInboundChannelAdapterTests {
Message<?> msg = edMapChannel1.receive(2_000);
Assert.assertNotNull(msg);
Assert.assertNotNull(msg.getPayload());
Assert.assertTrue(msg.getPayload() instanceof EntryEvent);
Assert.assertEquals(EntryEventType.ADDED,
((EntryEvent<?, ?>) msg.getPayload()).getEventType());
Assert.assertEquals("edDistributedMap1",
((EntryEvent<?, ?>) msg.getPayload()).getName());
Assert.assertEquals(1, ((EntryEvent<?, ?>) msg.getPayload()).getKey());
Assert.assertTrue(msg.getPayload() instanceof EntryEventMessagePayload);
Assert.assertNotNull(msg.getHeaders().get(HazelcastHeaders.MEMBER));
Assert.assertEquals(EntryEventType.ADDED.name(), msg.getHeaders().get(HazelcastHeaders.EVENT_TYPE));
Assert.assertEquals("edDistributedMap1", msg.getHeaders().get(HazelcastHeaders.CACHE_NAME));
Assert.assertEquals(Integer.valueOf(1),
((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg.getPayload()).key);
Assert.assertEquals(1,
((HazelcastIntegrationTestUser) ((EntryEvent<?, ?>) msg.getPayload()).getValue()).getId());
(((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg
.getPayload()).value).getId());
Assert.assertEquals("TestName1",
((HazelcastIntegrationTestUser) ((EntryEvent<?, ?>) msg.getPayload()).getValue()).getName());
(((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg
.getPayload()).value).getName());
Assert.assertEquals("TestSurname1",
((HazelcastIntegrationTestUser) ((EntryEvent<?, ?>) msg.getPayload()).getValue()).getSurname());
(((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg
.getPayload()).value).getSurname());
}
@Test
@@ -97,24 +103,32 @@ public class HazelcastDistributedMapEventDrivenInboundChannelAdapterTests {
Message<?> msg = edMapChannel2.receive(2_000);
Assert.assertNotNull(msg);
Assert.assertNotNull(msg.getPayload());
Assert.assertTrue(msg.getPayload() instanceof EntryEvent);
Assert.assertEquals(EntryEventType.UPDATED,
((EntryEvent<?, ?>) msg.getPayload()).getEventType());
Assert.assertEquals("edDistributedMap2",
((EntryEvent<?, ?>) msg.getPayload()).getName());
Assert.assertEquals(2, ((EntryEvent<?, ?>) msg.getPayload()).getKey());
Assert.assertTrue(msg.getPayload() instanceof EntryEventMessagePayload);
Assert.assertNotNull(msg.getHeaders().get(HazelcastHeaders.MEMBER));
Assert.assertEquals(EntryEventType.UPDATED.name(), msg.getHeaders().get(HazelcastHeaders.EVENT_TYPE));
Assert.assertEquals("edDistributedMap2", msg.getHeaders().get(HazelcastHeaders.CACHE_NAME));
Assert.assertEquals(Integer.valueOf(2),
((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg
.getPayload()).key);
Assert.assertEquals(1,
((HazelcastIntegrationTestUser) ((EntryEvent<?, ?>) msg.getPayload()).getOldValue()).getId());
(((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg
.getPayload()).oldValue).getId());
Assert.assertEquals("TestName1",
((HazelcastIntegrationTestUser) ((EntryEvent<?, ?>) msg.getPayload()).getOldValue()).getName());
(((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg
.getPayload()).oldValue).getName());
Assert.assertEquals("TestSurname1",
((HazelcastIntegrationTestUser) ((EntryEvent<?, ?>) msg.getPayload()).getOldValue()).getSurname());
(((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg
.getPayload()).oldValue).getSurname());
Assert.assertEquals(2,
((HazelcastIntegrationTestUser) ((EntryEvent<?, ?>) msg.getPayload()).getValue()).getId());
(((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg
.getPayload()).value).getId());
Assert.assertEquals("TestName2",
((HazelcastIntegrationTestUser) ((EntryEvent<?, ?>) msg.getPayload()).getValue()).getName());
(((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg
.getPayload()).value).getName());
Assert.assertEquals("TestSurname2",
((HazelcastIntegrationTestUser) ((EntryEvent<?, ?>) msg.getPayload()).getValue()).getSurname());
(((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg
.getPayload()).value).getSurname());
}
@Test
@@ -125,49 +139,46 @@ public class HazelcastDistributedMapEventDrivenInboundChannelAdapterTests {
Message<?> msg = edMapChannel3.receive(2_000);
Assert.assertNotNull(msg);
Assert.assertNotNull(msg.getPayload());
Assert.assertTrue(msg.getPayload() instanceof EntryEvent);
Assert.assertEquals(EntryEventType.REMOVED,
((EntryEvent<?, ?>) msg.getPayload()).getEventType());
Assert.assertEquals("edDistributedMap3",
((EntryEvent<?, ?>) msg.getPayload()).getName());
Assert.assertEquals(2, ((EntryEvent<?, ?>) msg.getPayload()).getKey());
Assert.assertTrue(msg.getPayload() instanceof EntryEventMessagePayload);
Assert.assertNotNull(msg.getHeaders().get(HazelcastHeaders.MEMBER));
Assert.assertEquals(EntryEventType.REMOVED.name(), msg.getHeaders().get(HazelcastHeaders.EVENT_TYPE));
Assert.assertEquals("edDistributedMap3", msg.getHeaders().get(HazelcastHeaders.CACHE_NAME));
Assert.assertEquals(Integer.valueOf(2),
((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg
.getPayload()).key);
Assert.assertEquals(2,
((HazelcastIntegrationTestUser) ((EntryEvent<?, ?>) msg.getPayload()).getOldValue()).getId());
(((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg
.getPayload()).oldValue).getId());
Assert.assertEquals("TestName2",
((HazelcastIntegrationTestUser) ((EntryEvent<?, ?>) msg.getPayload()).getOldValue()).getName());
(((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg
.getPayload()).oldValue).getName());
Assert.assertEquals("TestSurname2",
((HazelcastIntegrationTestUser) ((EntryEvent<?, ?>) msg.getPayload()).getOldValue()).getSurname());
(((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg
.getPayload()).oldValue).getSurname());
}
@Test
public void testEventDrivenForALLEntryEvent() {
edDistributedMap4.put(1, new HazelcastIntegrationTestUser(1, "TestName1", "TestSurname1"));
Message<?> msg = edMapChannel4.receive(2_000);
verify(msg, "edDistributedMap4", EntryEventType.ADDED);
verifyEntryEvent(msg, "edDistributedMap4", EntryEventType.ADDED);
edDistributedMap4.put(1, new HazelcastIntegrationTestUser(1, "TestName1", "TestSurnameUpdated"));
msg = edMapChannel4.receive(2_000);
verify(msg, "edDistributedMap4", EntryEventType.UPDATED);
verifyEntryEvent(msg, "edDistributedMap4", EntryEventType.UPDATED);
edDistributedMap4.remove(1);
msg = edMapChannel4.receive(2_000);
verify(msg, "edDistributedMap4", EntryEventType.REMOVED);
verifyEntryEvent(msg, "edDistributedMap4", EntryEventType.REMOVED);
edDistributedMap4.put(2, new HazelcastIntegrationTestUser(2, "TestName2", "TestSurname2"));
msg = edMapChannel4.receive(2_000);
verify(msg, "edDistributedMap4", EntryEventType.ADDED);
verifyEntryEvent(msg, "edDistributedMap4", EntryEventType.ADDED);
edDistributedMap4.clear();
msg = edMapChannel4.receive(2_000);
verify(msg, "edDistributedMap4", EntryEventType.CLEAR_ALL);
}
private void verify(Message<?> msg, String cacheName, EntryEventType type) {
Assert.assertNotNull(msg);
Assert.assertNotNull(msg.getPayload());
Assert.assertTrue(msg.getPayload() instanceof AbstractIMapEvent);
Assert.assertEquals(cacheName, ((AbstractIMapEvent) msg.getPayload()).getName());
Assert.assertEquals(type, ((AbstractIMapEvent) msg.getPayload()).getEventType());
verifyEntryEvent(msg, "edDistributedMap4", EntryEventType.CLEAR_ALL);
}
}

View File

@@ -23,6 +23,8 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.hazelcast.AbstractHazelcastTestSupport;
import org.springframework.integration.hazelcast.HazelcastHeaders;
import org.springframework.integration.hazelcast.HazelcastIntegrationTestUser;
import org.springframework.messaging.Message;
import org.springframework.messaging.PollableChannel;
@@ -32,7 +34,6 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.hazelcast.core.EntryEventType;
import com.hazelcast.core.IQueue;
import com.hazelcast.core.ItemEvent;
/**
* Hazelcast Distributed Queue Event Driven Inbound Channel Adapter Test
@@ -43,7 +44,7 @@ import com.hazelcast.core.ItemEvent;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration
@DirtiesContext
public class HazelcastDistributedQueueEventDrivenInboundChannelAdapterTests {
public class HazelcastDistributedQueueEventDrivenInboundChannelAdapterTests extends AbstractHazelcastTestSupport {
@Autowired
private PollableChannel edQueueChannel1;
@@ -69,15 +70,11 @@ public class HazelcastDistributedQueueEventDrivenInboundChannelAdapterTests {
Message<?> msg = edQueueChannel1.receive(2_000);
Assert.assertNotNull(msg);
Assert.assertNotNull(msg.getPayload());
Assert.assertTrue(msg.getPayload() instanceof ItemEvent);
Assert.assertEquals(EntryEventType.ADDED.toString(),
((ItemEvent<?>) msg.getPayload()).getEventType().toString());
Assert.assertEquals(1,
((HazelcastIntegrationTestUser) ((ItemEvent<?>) msg.getPayload()).getItem()).getId());
Assert.assertEquals("TestName1",
((HazelcastIntegrationTestUser) ((ItemEvent<?>) msg.getPayload()).getItem()).getName());
Assert.assertEquals("TestSurname1",
((HazelcastIntegrationTestUser) ((ItemEvent<?>) msg.getPayload()).getItem()).getSurname());
Assert.assertNotNull(msg.getHeaders().get(HazelcastHeaders.MEMBER));
Assert.assertEquals(EntryEventType.ADDED.toString(), msg.getHeaders().get(HazelcastHeaders.EVENT_TYPE).toString());
Assert.assertEquals(1, (((HazelcastIntegrationTestUser) msg.getPayload()).getId()));
Assert.assertEquals("TestName1", (((HazelcastIntegrationTestUser) msg.getPayload()).getName()));
Assert.assertEquals("TestSurname1", (((HazelcastIntegrationTestUser) msg.getPayload()).getSurname()));
}
@Test
@@ -88,15 +85,11 @@ public class HazelcastDistributedQueueEventDrivenInboundChannelAdapterTests {
Message<?> msg = edQueueChannel2.receive(2_000);
Assert.assertNotNull(msg);
Assert.assertNotNull(msg.getPayload());
Assert.assertTrue(msg.getPayload() instanceof ItemEvent);
Assert.assertEquals(EntryEventType.REMOVED.toString(),
((ItemEvent<?>) msg.getPayload()).getEventType().toString());
Assert.assertEquals(2,
((HazelcastIntegrationTestUser) ((ItemEvent<?>) msg.getPayload()).getItem()).getId());
Assert.assertEquals("TestName2",
((HazelcastIntegrationTestUser) ((ItemEvent<?>) msg.getPayload()).getItem()).getName());
Assert.assertEquals("TestSurname2",
((HazelcastIntegrationTestUser) ((ItemEvent<?>) msg.getPayload()).getItem()).getSurname());
Assert.assertNotNull(msg.getHeaders().get(HazelcastHeaders.MEMBER));
Assert.assertEquals(EntryEventType.REMOVED.toString(), msg.getHeaders().get(HazelcastHeaders.EVENT_TYPE).toString());
Assert.assertEquals(2, (((HazelcastIntegrationTestUser) msg.getPayload()).getId()));
Assert.assertEquals("TestName2", (((HazelcastIntegrationTestUser) msg.getPayload()).getName()));
Assert.assertEquals("TestSurname2", (((HazelcastIntegrationTestUser) msg.getPayload()).getSurname()));
}
@Test
@@ -104,24 +97,16 @@ public class HazelcastDistributedQueueEventDrivenInboundChannelAdapterTests {
HazelcastIntegrationTestUser user = new HazelcastIntegrationTestUser(1, "TestName1", "TestSurname1");
edDistributedQueue3.add(user);
Message<?> msg = edQueueChannel3.receive(2_000);
verify(msg, EntryEventType.ADDED);
verifyItemEvent(msg, EntryEventType.ADDED);
edDistributedQueue3.remove(user);
msg = edQueueChannel3.receive(2_000);
verify(msg, EntryEventType.REMOVED);
verifyItemEvent(msg, EntryEventType.REMOVED);
user = new HazelcastIntegrationTestUser(2, "TestName2", "TestSurname2");
edDistributedQueue3.add(user);
msg = edQueueChannel3.receive(2_000);
verify(msg, EntryEventType.ADDED);
}
private void verify(Message<?> msg, EntryEventType type) {
Assert.assertNotNull(msg);
Assert.assertNotNull(msg.getPayload());
Assert.assertTrue(msg.getPayload() instanceof ItemEvent);
Assert.assertEquals(type.toString(), ((ItemEvent<?>) msg.getPayload())
.getEventType().toString());
verifyItemEvent(msg, EntryEventType.ADDED);
}
}

View File

@@ -23,6 +23,8 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.hazelcast.AbstractHazelcastTestSupport;
import org.springframework.integration.hazelcast.HazelcastHeaders;
import org.springframework.integration.hazelcast.HazelcastIntegrationTestUser;
import org.springframework.messaging.Message;
import org.springframework.messaging.PollableChannel;
@@ -32,7 +34,6 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.hazelcast.core.EntryEventType;
import com.hazelcast.core.ISet;
import com.hazelcast.core.ItemEvent;
/**
* Hazelcast Distributed Set Event Driven Inbound Channel Adapter Test
@@ -43,7 +44,7 @@ import com.hazelcast.core.ItemEvent;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration
@DirtiesContext
public class HazelcastDistributedSetEventDrivenInboundChannelAdapterTests {
public class HazelcastDistributedSetEventDrivenInboundChannelAdapterTests extends AbstractHazelcastTestSupport {
@Autowired
private PollableChannel edSetChannel1;
@@ -69,15 +70,11 @@ public class HazelcastDistributedSetEventDrivenInboundChannelAdapterTests {
Message<?> msg = edSetChannel1.receive(2_000);
Assert.assertNotNull(msg);
Assert.assertNotNull(msg.getPayload());
Assert.assertTrue(msg.getPayload() instanceof ItemEvent);
Assert.assertEquals(EntryEventType.ADDED.toString(),
((ItemEvent<?>) msg.getPayload()).getEventType().toString());
Assert.assertEquals(1,
((HazelcastIntegrationTestUser) ((ItemEvent<?>) msg.getPayload()).getItem()).getId());
Assert.assertEquals("TestName1",
((HazelcastIntegrationTestUser) ((ItemEvent<?>) msg.getPayload()).getItem()).getName());
Assert.assertEquals("TestSurname1",
((HazelcastIntegrationTestUser) ((ItemEvent<?>) msg.getPayload()).getItem()).getSurname());
Assert.assertNotNull(msg.getHeaders().get(HazelcastHeaders.MEMBER));
Assert.assertEquals(EntryEventType.ADDED.toString(), msg.getHeaders().get(HazelcastHeaders.EVENT_TYPE).toString());
Assert.assertEquals(1, (((HazelcastIntegrationTestUser) msg.getPayload()).getId()));
Assert.assertEquals("TestName1", (((HazelcastIntegrationTestUser) msg.getPayload()).getName()));
Assert.assertEquals("TestSurname1", (((HazelcastIntegrationTestUser) msg.getPayload()).getSurname()));
}
@Test
@@ -88,15 +85,11 @@ public class HazelcastDistributedSetEventDrivenInboundChannelAdapterTests {
Message<?> msg = edSetChannel2.receive(2_000);
Assert.assertNotNull(msg);
Assert.assertNotNull(msg.getPayload());
Assert.assertTrue(msg.getPayload() instanceof ItemEvent);
Assert.assertEquals(EntryEventType.REMOVED.toString(),
((ItemEvent<?>) msg.getPayload()).getEventType().toString());
Assert.assertEquals(2,
((HazelcastIntegrationTestUser) ((ItemEvent<?>) msg.getPayload()).getItem()).getId());
Assert.assertEquals("TestName2",
((HazelcastIntegrationTestUser) ((ItemEvent<?>) msg.getPayload()).getItem()).getName());
Assert.assertEquals("TestSurname2",
((HazelcastIntegrationTestUser) ((ItemEvent<?>) msg.getPayload()).getItem()).getSurname());
Assert.assertNotNull(msg.getHeaders().get(HazelcastHeaders.MEMBER));
Assert.assertEquals(EntryEventType.REMOVED.toString(), msg.getHeaders().get(HazelcastHeaders.EVENT_TYPE).toString());
Assert.assertEquals(2, (((HazelcastIntegrationTestUser) msg.getPayload()).getId()));
Assert.assertEquals("TestName2", (((HazelcastIntegrationTestUser) msg.getPayload()).getName()));
Assert.assertEquals("TestSurname2", (((HazelcastIntegrationTestUser) msg.getPayload()).getSurname()));
}
@Test
@@ -104,24 +97,16 @@ public class HazelcastDistributedSetEventDrivenInboundChannelAdapterTests {
HazelcastIntegrationTestUser user = new HazelcastIntegrationTestUser(1, "TestName1", "TestSurname1");
edDistributedSet3.add(user);
Message<?> msg = edSetChannel3.receive(2_000);
verify(msg, EntryEventType.ADDED);
verifyItemEvent(msg, EntryEventType.ADDED);
edDistributedSet3.remove(user);
msg = edSetChannel3.receive(2_000);
verify(msg, EntryEventType.REMOVED);
verifyItemEvent(msg, EntryEventType.REMOVED);
user = new HazelcastIntegrationTestUser(2, "TestName2", "TestSurname2");
edDistributedSet3.add(user);
msg = edSetChannel3.receive(2_000);
verify(msg, EntryEventType.ADDED);
}
private void verify(Message<?> msg, EntryEventType type) {
Assert.assertNotNull(msg);
Assert.assertNotNull(msg.getPayload());
Assert.assertTrue(msg.getPayload() instanceof ItemEvent);
Assert.assertEquals(type.toString(),
((ItemEvent<?>) msg.getPayload()).getEventType().toString());
verifyItemEvent(msg, EntryEventType.ADDED);
}
}

View File

@@ -23,6 +23,7 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.hazelcast.HazelcastHeaders;
import org.springframework.integration.hazelcast.HazelcastIntegrationTestUser;
import org.springframework.messaging.Message;
import org.springframework.messaging.PollableChannel;
@@ -55,13 +56,12 @@ public class HazelcastDistributedTopicEventDrivenInboundChannelAdapterTests {
Message<?> msg = edTopicChannel1.receive(2_000);
Assert.assertNotNull(msg);
Assert.assertNotNull(msg.getPayload());
Assert.assertTrue(msg.getPayload() instanceof com.hazelcast.core.Message);
Assert.assertEquals(1, ((HazelcastIntegrationTestUser) ((com.hazelcast.core.Message<?>) msg.getPayload())
.getMessageObject()).getId());
Assert.assertEquals("TestName1", ((HazelcastIntegrationTestUser) ((com.hazelcast.core.Message<?>) msg
.getPayload()).getMessageObject()).getName());
Assert.assertEquals("TestSurname1", ((HazelcastIntegrationTestUser) ((com.hazelcast.core.Message<?>) msg
.getPayload()).getMessageObject()).getSurname());
Assert.assertNotNull(msg.getHeaders().get(HazelcastHeaders.MEMBER));
Assert.assertNotNull(msg.getHeaders().get(HazelcastHeaders.PUBLISHING_TIME));
Assert.assertEquals("edDistributedTopic1", msg.getHeaders().get(HazelcastHeaders.CACHE_NAME));
Assert.assertEquals(1, ((HazelcastIntegrationTestUser) msg.getPayload()).getId());
Assert.assertEquals("TestName1", ((HazelcastIntegrationTestUser) msg.getPayload()).getName());
Assert.assertEquals("TestSurname1", ((HazelcastIntegrationTestUser) msg.getPayload()).getSurname());
}
}

View File

@@ -23,15 +23,16 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.hazelcast.AbstractHazelcastTestSupport;
import org.springframework.integration.hazelcast.HazelcastHeaders;
import org.springframework.integration.hazelcast.HazelcastIntegrationTestUser;
import org.springframework.integration.hazelcast.message.EntryEventMessagePayload;
import org.springframework.messaging.Message;
import org.springframework.messaging.PollableChannel;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.hazelcast.core.AbstractIMapEvent;
import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.EntryEventType;
import com.hazelcast.core.MultiMap;
@@ -44,7 +45,8 @@ import com.hazelcast.core.MultiMap;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration
@DirtiesContext
public class HazelcastMultiMapEventDrivenInboundChannelAdapterTests {
@SuppressWarnings("unchecked")
public class HazelcastMultiMapEventDrivenInboundChannelAdapterTests extends AbstractHazelcastTestSupport {
@Autowired
private PollableChannel edMultiMapChannel1;
@@ -70,18 +72,23 @@ public class HazelcastMultiMapEventDrivenInboundChannelAdapterTests {
Message<?> msg = edMultiMapChannel1.receive(2_000);
Assert.assertNotNull(msg);
Assert.assertNotNull(msg.getPayload());
Assert.assertTrue(msg.getPayload() instanceof EntryEvent);
Assert.assertEquals(EntryEventType.ADDED,
((EntryEvent<?, ?>) msg.getPayload()).getEventType());
Assert.assertEquals("edMultiMap1",
((EntryEvent<?, ?>) msg.getPayload()).getName());
Assert.assertEquals(1, ((EntryEvent<?, ?>) msg.getPayload()).getKey());
Assert.assertTrue(msg.getPayload() instanceof EntryEventMessagePayload);
Assert.assertNotNull(msg.getHeaders().get(HazelcastHeaders.MEMBER));
Assert.assertEquals(EntryEventType.ADDED.name(), msg.getHeaders().get(HazelcastHeaders.EVENT_TYPE));
Assert.assertEquals("edMultiMap1", msg.getHeaders().get(HazelcastHeaders.CACHE_NAME));
Assert.assertEquals(Integer.valueOf(1),
((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg
.getPayload()).key);
Assert.assertEquals(1,
((HazelcastIntegrationTestUser) ((EntryEvent<?, ?>) msg.getPayload()).getValue()).getId());
(((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg
.getPayload()).value).getId());
Assert.assertEquals("TestName1",
((HazelcastIntegrationTestUser) ((EntryEvent<?, ?>) msg.getPayload()).getValue()).getName());
(((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg
.getPayload()).value).getName());
Assert.assertEquals("TestSurname1",
((HazelcastIntegrationTestUser) ((EntryEvent<?, ?>) msg.getPayload()).getValue()).getSurname());
(((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg
.getPayload()).value).getSurname());
}
@Test
@@ -92,51 +99,48 @@ public class HazelcastMultiMapEventDrivenInboundChannelAdapterTests {
Message<?> msg = edMultiMapChannel2.receive(2_000);
Assert.assertNotNull(msg);
Assert.assertNotNull(msg.getPayload());
Assert.assertTrue(msg.getPayload() instanceof EntryEvent);
Assert.assertEquals(EntryEventType.REMOVED,
((EntryEvent<?, ?>) msg.getPayload()).getEventType());
Assert.assertEquals("edMultiMap2",
((EntryEvent<?, ?>) msg.getPayload()).getName());
Assert.assertEquals(2, ((EntryEvent<?, ?>) msg.getPayload()).getKey());
Assert.assertTrue(msg.getPayload() instanceof EntryEventMessagePayload);
Assert.assertNotNull(msg.getHeaders().get(HazelcastHeaders.MEMBER));
Assert.assertEquals(EntryEventType.REMOVED.name(), msg.getHeaders().get(HazelcastHeaders.EVENT_TYPE));
Assert.assertEquals("edMultiMap2", msg.getHeaders().get(HazelcastHeaders.CACHE_NAME));
Assert.assertEquals(Integer.valueOf(2),
((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg
.getPayload()).key);
Assert.assertEquals(2,
((HazelcastIntegrationTestUser) ((EntryEvent<?, ?>) msg.getPayload()).getValue()).getId());
(((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg
.getPayload()).value).getId());
Assert.assertEquals("TestName2",
((HazelcastIntegrationTestUser) ((EntryEvent<?, ?>) msg.getPayload()).getValue()).getName());
(((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg
.getPayload()).value).getName());
Assert.assertEquals("TestSurname2",
((HazelcastIntegrationTestUser) ((EntryEvent<?, ?>) msg.getPayload()).getValue()).getSurname());
(((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg
.getPayload()).value).getSurname());
}
@Test
public void testEventDrivenForALLEntryEvent() {
edMultiMap3.put(1, new HazelcastIntegrationTestUser(1, "TestName1", "TestSurname1"));
Message<?> msg = edMultiMapChannel3.receive(2_000);
verify(msg, "edMultiMap3", EntryEventType.ADDED);
verifyEntryEvent(msg, "edMultiMap3", EntryEventType.ADDED);
edMultiMap3.put(1, new HazelcastIntegrationTestUser(1, "TestName1", "TestSurnameUpdated"));
msg = edMultiMapChannel3.receive(2_000);
verify(msg, "edMultiMap3", EntryEventType.ADDED);
verifyEntryEvent(msg, "edMultiMap3", EntryEventType.ADDED);
edMultiMap3.remove(1);
msg = edMultiMapChannel3.receive(2_000);
verify(msg, "edMultiMap3", EntryEventType.REMOVED);
verifyEntryEvent(msg, "edMultiMap3", EntryEventType.REMOVED);
msg = edMultiMapChannel3.receive(2_000);
verify(msg, "edMultiMap3", EntryEventType.REMOVED);
verifyEntryEvent(msg, "edMultiMap3", EntryEventType.REMOVED);
edMultiMap3.put(2, new HazelcastIntegrationTestUser(2, "TestName2", "TestSurname2"));
msg = edMultiMapChannel3.receive(2_000);
verify(msg, "edMultiMap3", EntryEventType.ADDED);
verifyEntryEvent(msg, "edMultiMap3", EntryEventType.ADDED);
edMultiMap3.clear();
msg = edMultiMapChannel3.receive(2_000);
verify(msg, "edMultiMap3", EntryEventType.CLEAR_ALL);
}
private void verify(Message<?> msg, String cacheName, EntryEventType type) {
Assert.assertNotNull(msg);
Assert.assertNotNull(msg.getPayload());
Assert.assertTrue(msg.getPayload() instanceof AbstractIMapEvent);
Assert.assertEquals(cacheName, ((AbstractIMapEvent) msg.getPayload()).getName());
Assert.assertEquals(type, ((AbstractIMapEvent) msg.getPayload()).getEventType());
verifyEntryEvent(msg, "edMultiMap3", EntryEventType.CLEAR_ALL);
}
}

View File

@@ -23,15 +23,16 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.hazelcast.AbstractHazelcastTestSupport;
import org.springframework.integration.hazelcast.HazelcastHeaders;
import org.springframework.integration.hazelcast.HazelcastIntegrationTestUser;
import org.springframework.integration.hazelcast.message.EntryEventMessagePayload;
import org.springframework.messaging.Message;
import org.springframework.messaging.PollableChannel;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.hazelcast.core.AbstractIMapEvent;
import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.EntryEventType;
import com.hazelcast.core.ReplicatedMap;
@@ -44,7 +45,8 @@ import com.hazelcast.core.ReplicatedMap;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration
@DirtiesContext
public class HazelcastReplicatedMapEventDrivenInboundChannelAdapterTests {
@SuppressWarnings("unchecked")
public class HazelcastReplicatedMapEventDrivenInboundChannelAdapterTests extends AbstractHazelcastTestSupport {
@Autowired
private PollableChannel edReplicatedMapChannel1;
@@ -76,18 +78,23 @@ public class HazelcastReplicatedMapEventDrivenInboundChannelAdapterTests {
Message<?> msg = edReplicatedMapChannel1.receive(2_000);
Assert.assertNotNull(msg);
Assert.assertNotNull(msg.getPayload());
Assert.assertTrue(msg.getPayload() instanceof EntryEvent);
Assert.assertEquals(EntryEventType.ADDED,
((EntryEvent<?, ?>) msg.getPayload()).getEventType());
Assert.assertEquals("edReplicatedMap1",
((EntryEvent<?, ?>) msg.getPayload()).getName());
Assert.assertEquals(1, ((EntryEvent<?, ?>) msg.getPayload()).getKey());
Assert.assertTrue(msg.getPayload() instanceof EntryEventMessagePayload);
Assert.assertNotNull(msg.getHeaders().get(HazelcastHeaders.MEMBER));
Assert.assertEquals(EntryEventType.ADDED.name(), msg.getHeaders().get(HazelcastHeaders.EVENT_TYPE));
Assert.assertEquals("edReplicatedMap1", msg.getHeaders().get(HazelcastHeaders.CACHE_NAME));
Assert.assertEquals(Integer.valueOf(1),
((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg
.getPayload()).key);
Assert.assertEquals(1,
((HazelcastIntegrationTestUser) ((EntryEvent<?, ?>) msg.getPayload()).getValue()).getId());
(((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg
.getPayload()).value).getId());
Assert.assertEquals("TestName1",
((HazelcastIntegrationTestUser) ((EntryEvent<?, ?>) msg.getPayload()).getValue()).getName());
(((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg
.getPayload()).value).getName());
Assert.assertEquals("TestSurname1",
((HazelcastIntegrationTestUser) ((EntryEvent<?, ?>) msg.getPayload()).getValue()).getSurname());
(((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg
.getPayload()).value).getSurname());
}
@Test
@@ -97,24 +104,33 @@ public class HazelcastReplicatedMapEventDrivenInboundChannelAdapterTests {
Message<?> msg = edReplicatedMapChannel2.receive(2_000);
Assert.assertNotNull(msg);
Assert.assertNotNull(msg.getPayload());
Assert.assertTrue(msg.getPayload() instanceof EntryEvent);
Assert.assertEquals(EntryEventType.UPDATED,
((EntryEvent<?, ?>) msg.getPayload()).getEventType());
Assert.assertEquals("edReplicatedMap2",
((EntryEvent<?, ?>) msg.getPayload()).getName());
Assert.assertEquals(2, ((EntryEvent<?, ?>) msg.getPayload()).getKey());
Assert.assertTrue(msg.getPayload() instanceof EntryEventMessagePayload);
Assert.assertNotNull(msg.getHeaders().get(HazelcastHeaders.MEMBER));
Assert.assertEquals(EntryEventType.UPDATED.name(), msg.getHeaders().get(HazelcastHeaders.EVENT_TYPE));
Assert.assertEquals("edReplicatedMap2", msg.getHeaders().get(HazelcastHeaders.CACHE_NAME));
Assert.assertEquals(Integer.valueOf(2),
((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg
.getPayload()).key);
Assert.assertEquals(1,
((HazelcastIntegrationTestUser) ((EntryEvent<?, ?>) msg.getPayload()).getOldValue()).getId());
(((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg
.getPayload()).oldValue).getId());
Assert.assertEquals("TestName1",
((HazelcastIntegrationTestUser) ((EntryEvent<?, ?>) msg.getPayload()).getOldValue()).getName());
(((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg
.getPayload()).oldValue).getName());
Assert.assertEquals("TestSurname1",
((HazelcastIntegrationTestUser) ((EntryEvent<?, ?>) msg.getPayload()).getOldValue()).getSurname());
(((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg
.getPayload()).oldValue).getSurname());
Assert.assertEquals(2,
((HazelcastIntegrationTestUser) ((EntryEvent<?, ?>) msg.getPayload()).getValue()).getId());
(((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg
.getPayload()).value).getId());
Assert.assertEquals("TestName2",
((HazelcastIntegrationTestUser) ((EntryEvent<?, ?>) msg.getPayload()).getValue()).getName());
(((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg
.getPayload()).value).getName());
Assert.assertEquals("TestSurname2",
((HazelcastIntegrationTestUser) ((EntryEvent<?, ?>) msg.getPayload()).getValue()).getSurname());
(((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg
.getPayload()).value).getSurname());
}
@Test
@@ -125,46 +141,43 @@ public class HazelcastReplicatedMapEventDrivenInboundChannelAdapterTests {
Message<?> msg = edReplicatedMapChannel3.receive(2_000);
Assert.assertNotNull(msg);
Assert.assertNotNull(msg.getPayload());
Assert.assertTrue(msg.getPayload() instanceof EntryEvent);
Assert.assertEquals(EntryEventType.REMOVED,
((EntryEvent<?, ?>) msg.getPayload()).getEventType());
Assert.assertEquals("edReplicatedMap3",
((EntryEvent<?, ?>) msg.getPayload()).getName());
Assert.assertEquals(2, ((EntryEvent<?, ?>) msg.getPayload()).getKey());
Assert.assertTrue(msg.getPayload() instanceof EntryEventMessagePayload);
Assert.assertNotNull(msg.getHeaders().get(HazelcastHeaders.MEMBER));
Assert.assertEquals(EntryEventType.REMOVED.name(), msg.getHeaders().get(HazelcastHeaders.EVENT_TYPE));
Assert.assertEquals("edReplicatedMap3", msg.getHeaders().get(HazelcastHeaders.CACHE_NAME));
Assert.assertEquals(Integer.valueOf(2),
((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg
.getPayload()).key);
Assert.assertEquals(2,
((HazelcastIntegrationTestUser) ((EntryEvent<?, ?>) msg.getPayload()).getOldValue()).getId());
(((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg
.getPayload()).oldValue).getId());
Assert.assertEquals("TestName2",
((HazelcastIntegrationTestUser) ((EntryEvent<?, ?>) msg.getPayload()).getOldValue()).getName());
(((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg
.getPayload()).oldValue).getName());
Assert.assertEquals("TestSurname2",
((HazelcastIntegrationTestUser) ((EntryEvent<?, ?>) msg.getPayload()).getOldValue()).getSurname());
(((EntryEventMessagePayload<Integer, HazelcastIntegrationTestUser>) msg
.getPayload()).oldValue).getSurname());
}
@Test
public void testEventDrivenForALLEntryEvent() {
edReplicatedMap4.put(1, new HazelcastIntegrationTestUser(1, "TestName1", "TestSurname1"));
Message<?> msg = edReplicatedMapChannel4.receive(2_000);
verify(msg, "edReplicatedMap4", EntryEventType.ADDED);
verifyEntryEvent(msg, "edReplicatedMap4", EntryEventType.ADDED);
edReplicatedMap4.put(1, new HazelcastIntegrationTestUser(1, "TestName1", "TestSurnameUpdated"));
msg = edReplicatedMapChannel4.receive(2_000);
verify(msg, "edReplicatedMap4", EntryEventType.UPDATED);
verifyEntryEvent(msg, "edReplicatedMap4", EntryEventType.UPDATED);
edReplicatedMap4.remove(1);
msg = edReplicatedMapChannel4.receive(2_000);
verify(msg, "edReplicatedMap4", EntryEventType.REMOVED);
verifyEntryEvent(msg, "edReplicatedMap4", EntryEventType.REMOVED);
edReplicatedMap4.put(2, new HazelcastIntegrationTestUser(2, "TestName2", "TestSurname2"));
msg = edReplicatedMapChannel4.receive(2_000);
verify(msg, "edReplicatedMap4", EntryEventType.ADDED);
verifyEntryEvent(msg, "edReplicatedMap4", EntryEventType.ADDED);
}
private void verify(Message<?> msg, String cacheName, EntryEventType type) {
Assert.assertNotNull(msg);
Assert.assertNotNull(msg.getPayload());
Assert.assertTrue(msg.getPayload() instanceof AbstractIMapEvent);
Assert.assertEquals(cacheName, ((AbstractIMapEvent) msg.getPayload()).getName());
Assert.assertEquals(type, ((AbstractIMapEvent) msg.getPayload()).getEventType());
}
}