diff --git a/spring-integration-hazelcast/README.md b/spring-integration-hazelcast/README.md index 326bc97..0f443ab 100644 --- a/spring-integration-hazelcast/README.md +++ b/spring-integration-hazelcast/README.md @@ -37,7 +37,7 @@ Basically, Hazelcast Event-Driven Inbound Channel Adapter requires following att 3. Supported cache event types for IList, ISet and IQueue : ADDED, REMOVED. 4. There is no need to cache event type definition for ITopic. -* **cache-listening-policy :** Specifies cache listening policy as SINGLE or ALL. It is optional attribute and its default value is SINGLE. Each Hazelcast inbound channel adapter which listens same cache object with same cache-events attribute, can receive a single event message or all event messages. If this is ALL, all Hazelcast inbound channel adapters which listens same cache object with same cache-events attribute, will receive same event messages. If this is SINGLE, they will receive unique event messages. +* **cache-listening-policy :** Specifies cache listening policy as SINGLE or ALL. It is optional attribute and its default value is SINGLE. Each Hazelcast CQ inbound channel adapter listening same cache object with same cache-events attribute, can receive a single event message or all event messages. If it is ALL, all Hazelcast CQ inbound channel adapters listening same cache object with same cache-events attribute, will receive same event messages. If it is SINGLE, they will receive unique event messages. Sample namespace and schemaLocation definitions are as follows : ``` @@ -74,9 +74,7 @@ Sample definitions are as follows : + cache-events="ADDED, REMOVED, CLEAR_ALL" /> @@ -139,7 +137,7 @@ Sample definitions are as follows : @@ -163,14 +161,18 @@ Hazelcast Continuous Query enables to listen to the modifications performed on s channel="cqMapChannel" cache="cqMap" cache-events="UPDATED, REMOVED" - predicate="name=TestName AND surname=TestSurname" /> + predicate="name=TestName AND surname=TestSurname" + include-value="true" + cache-listening-policy="SINGLE" /> ``` Basically, it requires four attributes as follows : * **channel :** Specifies channel which message is sent. It is mandatory attribute. * **cache :** Specifies distributed Map reference which is listened. It is mandatory attribute. * **cache-events :** Specifies cache events which are listened. It is optional attribute with ADDED default value. Supported values are ADDED, REMOVED, UPDATED, EVICTED, EVICT_ALL and CLEAR_ALL. -* **predicate :** Specifies predicate to listen to the modifications performed on specific map entries. It is mandatory attribute. +* **predicate :** Specifies predicate to listen to the modifications performed on specific map entries. It is mandatory attribute. +* **include-value :** Specifies including of value and oldValue in continuous query result. It is optional attribute with 'true' default value. +* **cache-listening-policy :** Specifies cache listening policy as SINGLE or ALL. It is optional attribute and its default value is SINGLE. Each Hazelcast CQ inbound channel adapter listening same cache object with same cache-events attribute, can receive a single event message or all event messages. If it is ALL, all Hazelcast CQ inbound channel adapters listening same cache object with same cache-events attribute, will receive same event messages. If it is SINGLE, they will receive unique event messages. Sample definition is as follows : ``` @@ -180,7 +182,9 @@ Sample definition is as follows : channel="cqMapChannel" cache="cqMap" cache-events="UPDATED, REMOVED" - predicate="name=TestName AND surname=TestSurname"/> + predicate="name=TestName AND surname=TestSurname" + include-value="true" + cache-listening-policy="SINGLE"/> @@ -205,7 +209,7 @@ Hazelcast allows to run distributed queries on the distributed map. Hazelcast Di cache="dsMap" iteration-type="ENTRY" distributed-sql="active=false OR age >= 25 OR name = 'TestName'"> - + ``` Basically, it requires a poller and four attributes such as @@ -224,7 +228,7 @@ Sample definition is as follows : cache="dsMap" iteration-type="ENTRY" distributed-sql="active=false OR age >= 25 OR name = 'TestName'"> - + diff --git a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/common/CacheEventType.java b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/CacheEventType.java similarity index 93% rename from spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/common/CacheEventType.java rename to spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/CacheEventType.java index a160527..976d32e 100644 --- a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/common/CacheEventType.java +++ b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/CacheEventType.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.integration.hazelcast.common; +package org.springframework.integration.hazelcast; /** * Enumeration of Cache Event Types diff --git a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/common/CacheListeningPolicyType.java b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/CacheListeningPolicyType.java similarity index 93% rename from spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/common/CacheListeningPolicyType.java rename to spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/CacheListeningPolicyType.java index e066c13..22ac779 100644 --- a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/common/CacheListeningPolicyType.java +++ b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/CacheListeningPolicyType.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.integration.hazelcast.common; +package org.springframework.integration.hazelcast; /** * Enumeration of Cache Listening Policy Type diff --git a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/common/DistributedSQLIterationType.java b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/DistributedSQLIterationType.java similarity index 93% rename from spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/common/DistributedSQLIterationType.java rename to spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/DistributedSQLIterationType.java index 9501eb1..e2b9393 100644 --- a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/common/DistributedSQLIterationType.java +++ b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/DistributedSQLIterationType.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.integration.hazelcast.common; +package org.springframework.integration.hazelcast; /** * Enumeration of Distributed SQL Iteration Type diff --git a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/HazelcastHeaders.java b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/HazelcastHeaders.java new file mode 100644 index 0000000..255427d --- /dev/null +++ b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/HazelcastHeaders.java @@ -0,0 +1,37 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.hazelcast; + +/** + * Hazelcast Message Headers + * + * @author Eren Avsarogullari + * @since 1.0.0 + */ +public abstract class HazelcastHeaders { + + private static final String PREFIX = "hazelcast_"; + + public static final String EVENT_TYPE = PREFIX + "eventType"; + + public static final String MEMBER = PREFIX + "member"; + + public static final String CACHE_NAME = PREFIX + "cacheName"; + + public static final String PUBLISHING_TIME = PREFIX + "publishingTime"; + +} diff --git a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/common/HazelcastIntegrationDefinitionValidator.java b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/HazelcastIntegrationDefinitionValidator.java similarity index 98% rename from spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/common/HazelcastIntegrationDefinitionValidator.java rename to spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/HazelcastIntegrationDefinitionValidator.java index 00a5144..e9e68fa 100644 --- a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/common/HazelcastIntegrationDefinitionValidator.java +++ b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/HazelcastIntegrationDefinitionValidator.java @@ -14,12 +14,15 @@ * limitations under the License. */ -package org.springframework.integration.hazelcast.common; +package org.springframework.integration.hazelcast; import java.util.Arrays; import java.util.List; import java.util.Set; +import reactor.util.CollectionUtils; +import reactor.util.StringUtils; + import com.hazelcast.core.DistributedObject; import com.hazelcast.core.IList; import com.hazelcast.core.IMap; @@ -29,9 +32,6 @@ import com.hazelcast.core.ITopic; import com.hazelcast.core.MultiMap; import com.hazelcast.core.ReplicatedMap; -import reactor.util.CollectionUtils; -import reactor.util.StringUtils; - /** * Common Validator for Hazelcast Integration. It validates cache types and events. * diff --git a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/common/HazelcastLocalInstanceRegistrar.java b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/HazelcastLocalInstanceRegistrar.java similarity index 98% rename from spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/common/HazelcastLocalInstanceRegistrar.java rename to spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/HazelcastLocalInstanceRegistrar.java index 5e34272..3bab913 100644 --- a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/common/HazelcastLocalInstanceRegistrar.java +++ b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/HazelcastLocalInstanceRegistrar.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.integration.hazelcast.common; +package org.springframework.integration.hazelcast; import java.net.SocketAddress; import java.util.concurrent.locks.Lock; diff --git a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/common/package-info.java b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/common/package-info.java deleted file mode 100644 index ac4fc30..0000000 --- a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/common/package-info.java +++ /dev/null @@ -1,4 +0,0 @@ -/** - * Provides common used types and classes. - */ -package org.springframework.integration.hazelcast.common; diff --git a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/config/HazelcastIntegrationConfigurationInitializer.java b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/config/HazelcastIntegrationConfigurationInitializer.java index 6883728..d93ae9a 100644 --- a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/config/HazelcastIntegrationConfigurationInitializer.java +++ b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/config/HazelcastIntegrationConfigurationInitializer.java @@ -21,7 +21,7 @@ import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; import org.springframework.beans.factory.support.BeanDefinitionRegistry; import org.springframework.beans.factory.support.RootBeanDefinition; import org.springframework.integration.config.IntegrationConfigurationInitializer; -import org.springframework.integration.hazelcast.common.HazelcastLocalInstanceRegistrar; +import org.springframework.integration.hazelcast.HazelcastLocalInstanceRegistrar; /** * The Hazelcast Integration infrastructure {@code beanFactory} initializer. diff --git a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/inbound/AbstractHazelcastMessageProducer.java b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/inbound/AbstractHazelcastMessageProducer.java index 8a7aaf2..80b8e38 100644 --- a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/inbound/AbstractHazelcastMessageProducer.java +++ b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/inbound/AbstractHazelcastMessageProducer.java @@ -19,15 +19,19 @@ package org.springframework.integration.hazelcast.inbound; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.Collections; -import java.util.EventObject; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; import java.util.Set; import org.springframework.integration.endpoint.MessageProducerSupport; -import org.springframework.integration.hazelcast.common.CacheEventType; -import org.springframework.integration.hazelcast.common.CacheListeningPolicyType; -import org.springframework.integration.hazelcast.common.HazelcastIntegrationDefinitionValidator; -import org.springframework.integration.hazelcast.common.HazelcastLocalInstanceRegistrar; +import org.springframework.integration.hazelcast.CacheEventType; +import org.springframework.integration.hazelcast.CacheListeningPolicyType; +import org.springframework.integration.hazelcast.HazelcastHeaders; +import org.springframework.integration.hazelcast.HazelcastIntegrationDefinitionValidator; +import org.springframework.integration.hazelcast.HazelcastLocalInstanceRegistrar; +import org.springframework.integration.hazelcast.message.EntryEventMessagePayload; +import org.springframework.messaging.Message; import org.springframework.util.Assert; import com.hazelcast.core.AbstractIMapEvent; @@ -45,20 +49,21 @@ import reactor.util.StringUtils; * Hazelcast Base Event-Driven Message Producer. * * @author Eren Avsarogullari + * @author Artem Bilan * @since 1.0.0 */ public abstract class AbstractHazelcastMessageProducer extends MessageProducerSupport { protected final DistributedObject distributedObject; - private CacheListeningPolicyType cacheListeningPolicy = CacheListeningPolicyType.SINGLE; + private volatile CacheListeningPolicyType cacheListeningPolicy = CacheListeningPolicyType.SINGLE; - private String hazelcastRegisteredEventListenerId; + private volatile String hazelcastRegisteredEventListenerId; private Set cacheEvents = Collections.singleton(CacheEventType.ADDED.name()); - protected AbstractHazelcastMessageProducer(DistributedObject distributedObject) { - Assert.notNull(distributedObject, "cache must not be null"); + public AbstractHazelcastMessageProducer(DistributedObject distributedObject) { + Assert.notNull(distributedObject, "'distributedObject' must not be null"); this.distributedObject = distributedObject; } @@ -68,8 +73,8 @@ public abstract class AbstractHazelcastMessageProducer extends MessageProducerSu public void setCacheEventTypes(String cacheEventTypes) { HazelcastIntegrationDefinitionValidator.validateEnumType(CacheEventType.class, cacheEventTypes); - final Set cacheEvents = StringUtils.commaDelimitedListToSet(cacheEventTypes); - Assert.notEmpty(cacheEvents, "cacheEvents must have elements"); + Set cacheEvents = StringUtils.commaDelimitedListToSet(cacheEventTypes); + Assert.notEmpty(cacheEvents, "'cacheEvents' must have elements"); HazelcastIntegrationDefinitionValidator.validateCacheEventsByDistributedObject( this.distributedObject, cacheEvents); this.cacheEvents = cacheEvents; @@ -80,7 +85,7 @@ public abstract class AbstractHazelcastMessageProducer extends MessageProducerSu } public void setCacheListeningPolicy(CacheListeningPolicyType cacheListeningPolicy) { - Assert.notNull(cacheListeningPolicy, "cacheListeningPolicy must not be null"); + Assert.notNull(cacheListeningPolicy, "'cacheListeningPolicy' must not be null"); this.cacheListeningPolicy = cacheListeningPolicy; } @@ -92,28 +97,27 @@ public abstract class AbstractHazelcastMessageProducer extends MessageProducerSu this.hazelcastRegisteredEventListenerId = hazelcastRegisteredEventListenerId; } - protected abstract class AbstractHazelcastEventListener { + protected abstract class AbstractHazelcastEventListener { - protected abstract void processEvent(EventObject event); + protected abstract void processEvent(E event); - protected void sendMessage(final EventObject event, final InetSocketAddress socketAddress, - final CacheListeningPolicyType cacheListeningPolicyType) { + protected abstract Message toMessage(E event); + + protected void sendMessage(E event, InetSocketAddress socketAddress, + CacheListeningPolicyType cacheListeningPolicyType) { if (CacheListeningPolicyType.ALL == cacheListeningPolicyType || isEventAcceptable(socketAddress)) { - AbstractHazelcastMessageProducer.this.sendMessage(getMessageBuilderFactory().withPayload(event).build()); + AbstractHazelcastMessageProducer.this.sendMessage(toMessage(event)); } } private boolean isEventAcceptable(final InetSocketAddress socketAddress) { final Set hazelcastInstanceSet = Hazelcast.getAllHazelcastInstances(); final Set localSocketAddressesSet = getLocalSocketAddresses(hazelcastInstanceSet); - if ((!localSocketAddressesSet.isEmpty()) + return (!localSocketAddressesSet.isEmpty()) && (localSocketAddressesSet.contains(socketAddress) || isEventComingFromNonRegisteredHazelcastInstance(hazelcastInstanceSet.iterator().next(), - localSocketAddressesSet, socketAddress))) { - return true; - } + localSocketAddressesSet, socketAddress)); - return false; } private Set getLocalSocketAddresses(final Set hazelcastInstanceSet) { @@ -140,7 +144,7 @@ public abstract class AbstractHazelcastMessageProducer extends MessageProducerSu } protected final class HazelcastEntryListener extends - AbstractHazelcastEventListener implements EntryListener { + AbstractHazelcastEventListener implements EntryListener { @Override public void entryAdded(EntryEvent event) { @@ -173,12 +177,9 @@ public abstract class AbstractHazelcastMessageProducer extends MessageProducerSu } @Override - protected void processEvent(EventObject event) { - Assert.notNull(event, "event must not be null"); - - if (getCacheEvents().contains(((AbstractIMapEvent) event).getEventType().toString())) { - sendMessage(event, ((AbstractIMapEvent) event).getMember().getSocketAddress(), - getCacheListeningPolicy()); + protected void processEvent(AbstractIMapEvent event) { + if (getCacheEvents().contains(event.getEventType().toString())) { + sendMessage(event, event.getMember().getSocketAddress(), getCacheListeningPolicy()); } if (logger.isDebugEnabled()) { @@ -186,6 +187,29 @@ public abstract class AbstractHazelcastMessageProducer extends MessageProducerSu } } + @Override + @SuppressWarnings("unchecked") + protected Message toMessage(AbstractIMapEvent event) { + final Map headers = new HashMap(); + headers.put(HazelcastHeaders.EVENT_TYPE, event.getEventType().name()); + headers.put(HazelcastHeaders.MEMBER, event.getMember().getSocketAddress()); + headers.put(HazelcastHeaders.CACHE_NAME, event.getName()); + + if (event instanceof EntryEvent) { + EntryEvent entryEvent = (EntryEvent) event; + EntryEventMessagePayload messagePayload = new EntryEventMessagePayload<>(entryEvent.getKey(), + entryEvent.getValue(), entryEvent.getOldValue()); + return getMessageBuilderFactory().withPayload(messagePayload).copyHeaders(headers).build(); + } + else if (event instanceof MapEvent) { + return getMessageBuilderFactory() + .withPayload(((MapEvent) event).getNumberOfEntriesAffected()).copyHeaders(headers).build(); + } + else { + throw new IllegalStateException("Invalid event is received. Event : " + event); + } + } + } } diff --git a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/inbound/HazelcastContinuousQueryMessageProducer.java b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/inbound/HazelcastContinuousQueryMessageProducer.java index 73dba69..eb1a5f1 100644 --- a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/inbound/HazelcastContinuousQueryMessageProducer.java +++ b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/inbound/HazelcastContinuousQueryMessageProducer.java @@ -39,7 +39,7 @@ public class HazelcastContinuousQueryMessageProducer extends AbstractHazelcastMe @SuppressWarnings("rawtypes") public HazelcastContinuousQueryMessageProducer(IMap distributedMap, String predicate) { super(distributedMap); - Assert.hasText(predicate, "predicate must not be null"); + Assert.hasText(predicate, "'predicate' must not be null"); this.predicate = predicate; } diff --git a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/inbound/HazelcastDistributedSQLMessageSource.java b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/inbound/HazelcastDistributedSQLMessageSource.java index 75ec4a7..d4a28aa 100644 --- a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/inbound/HazelcastDistributedSQLMessageSource.java +++ b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/inbound/HazelcastDistributedSQLMessageSource.java @@ -20,7 +20,7 @@ import java.util.Collection; import java.util.Collections; import org.springframework.integration.endpoint.AbstractMessageSource; -import org.springframework.integration.hazelcast.common.DistributedSQLIterationType; +import org.springframework.integration.hazelcast.DistributedSQLIterationType; import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; @@ -39,19 +39,19 @@ public class HazelcastDistributedSQLMessageSource extends AbstractMessageSource private final IMap distributedMap; - private final String distributedSQL; + private final String distributedSql; private DistributedSQLIterationType iterationType = DistributedSQLIterationType.VALUE; - public HazelcastDistributedSQLMessageSource(IMap distributedMap, String distributedSQL) { - Assert.notNull(distributedMap, "cache must not be null"); - Assert.hasText(distributedSQL, "distributed-sql must not be null"); + public HazelcastDistributedSQLMessageSource(IMap distributedMap, String distributedSql) { + Assert.notNull(distributedMap, "'distributedMap' must not be null"); + Assert.hasText(distributedSql, "'distributedSql' must not be empty"); this.distributedMap = distributedMap; - this.distributedSQL = distributedSQL; + this.distributedSql = distributedSql; } public void setIterationType(DistributedSQLIterationType iterationType) { - Assert.notNull(this.iterationType, "iterationType must not be null"); + Assert.notNull(this.iterationType, "'iterationType' must not be null"); this.iterationType = iterationType; } @@ -65,18 +65,18 @@ public class HazelcastDistributedSQLMessageSource extends AbstractMessageSource switch (this.iterationType) { case ENTRY: return getDistributedSQLResultSet(Collections - .unmodifiableCollection(this.distributedMap.entrySet(new SqlPredicate(this.distributedSQL)))); + .unmodifiableCollection(this.distributedMap.entrySet(new SqlPredicate(this.distributedSql)))); case KEY: return getDistributedSQLResultSet(Collections - .unmodifiableCollection(this.distributedMap.keySet(new SqlPredicate(this.distributedSQL)))); + .unmodifiableCollection(this.distributedMap.keySet(new SqlPredicate(this.distributedSql)))); case LOCAL_KEY: return getDistributedSQLResultSet(Collections - .unmodifiableCollection(this.distributedMap.localKeySet(new SqlPredicate(this.distributedSQL)))); + .unmodifiableCollection(this.distributedMap.localKeySet(new SqlPredicate(this.distributedSql)))); default: - return getDistributedSQLResultSet(this.distributedMap.values(new SqlPredicate(this.distributedSQL))); + return getDistributedSQLResultSet(this.distributedMap.values(new SqlPredicate(this.distributedSql))); } } diff --git a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/inbound/HazelcastEventDrivenMessageProducer.java b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/inbound/HazelcastEventDrivenMessageProducer.java index c8470ea..d3f6a13 100644 --- a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/inbound/HazelcastEventDrivenMessageProducer.java +++ b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/inbound/HazelcastEventDrivenMessageProducer.java @@ -16,9 +16,11 @@ package org.springframework.integration.hazelcast.inbound; -import java.util.EventObject; +import java.util.HashMap; +import java.util.Map; -import org.springframework.integration.hazelcast.common.HazelcastIntegrationDefinitionValidator; +import org.springframework.integration.hazelcast.HazelcastHeaders; +import org.springframework.integration.hazelcast.HazelcastIntegrationDefinitionValidator; import org.springframework.util.Assert; import com.hazelcast.core.DistributedObject; @@ -42,9 +44,9 @@ import com.hazelcast.core.ReplicatedMap; * to listen related cache events and sends events to related channel. * * @author Eren Avsarogullari + * @author Artem Bilan * @since 1.0.0 */ -@SuppressWarnings({ "unchecked", "rawtypes" }) public class HazelcastEventDrivenMessageProducer extends AbstractHazelcastMessageProducer { public HazelcastEventDrivenMessageProducer(DistributedObject distributedObject) { @@ -58,32 +60,33 @@ public class HazelcastEventDrivenMessageProducer extends AbstractHazelcastMessag } @Override + @SuppressWarnings({"rawtypes", "unchecked"}) protected void doStart() { - if(this.distributedObject instanceof IMap) { + if (this.distributedObject instanceof IMap) { setHazelcastRegisteredEventListenerId(((IMap) this.distributedObject) .addEntryListener(new HazelcastEntryListener(), true)); } - else if(this.distributedObject instanceof MultiMap) { + else if (this.distributedObject instanceof MultiMap) { setHazelcastRegisteredEventListenerId(((MultiMap) this.distributedObject) .addEntryListener(new HazelcastEntryListener(), true)); } - else if(this.distributedObject instanceof ReplicatedMap) { + else if (this.distributedObject instanceof ReplicatedMap) { setHazelcastRegisteredEventListenerId(((ReplicatedMap) this.distributedObject) .addEntryListener(new HazelcastEntryListener())); } - else if(this.distributedObject instanceof IList) { + else if (this.distributedObject instanceof IList) { setHazelcastRegisteredEventListenerId(((IList) this.distributedObject) .addItemListener(new HazelcastItemListener(), true)); } - else if(this.distributedObject instanceof ISet) { + else if (this.distributedObject instanceof ISet) { setHazelcastRegisteredEventListenerId(((ISet) this.distributedObject) .addItemListener(new HazelcastItemListener(), true)); } - else if(this.distributedObject instanceof IQueue) { + else if (this.distributedObject instanceof IQueue) { setHazelcastRegisteredEventListenerId(((IQueue) this.distributedObject) .addItemListener(new HazelcastItemListener(), true)); } - else if(this.distributedObject instanceof ITopic) { + else if (this.distributedObject instanceof ITopic) { setHazelcastRegisteredEventListenerId(((ITopic) this.distributedObject) .addMessageListener(new HazelcastMessageListener())); } @@ -91,25 +94,25 @@ public class HazelcastEventDrivenMessageProducer extends AbstractHazelcastMessag @Override protected void doStop() { - if(this.distributedObject instanceof IMap) { + if (this.distributedObject instanceof IMap) { ((IMap) this.distributedObject).removeEntryListener(getHazelcastRegisteredEventListenerId()); } - else if(this.distributedObject instanceof MultiMap) { + else if (this.distributedObject instanceof MultiMap) { ((MultiMap) this.distributedObject).removeEntryListener(getHazelcastRegisteredEventListenerId()); } - else if(this.distributedObject instanceof ReplicatedMap) { + else if (this.distributedObject instanceof ReplicatedMap) { ((ReplicatedMap) this.distributedObject).removeEntryListener(getHazelcastRegisteredEventListenerId()); } - else if(this.distributedObject instanceof IList) { + else if (this.distributedObject instanceof IList) { ((IList) this.distributedObject).removeItemListener(getHazelcastRegisteredEventListenerId()); } - else if(this.distributedObject instanceof ISet) { + else if (this.distributedObject instanceof ISet) { ((ISet) this.distributedObject).removeItemListener(getHazelcastRegisteredEventListenerId()); } - else if(this.distributedObject instanceof IQueue) { + else if (this.distributedObject instanceof IQueue) { ((IQueue) this.distributedObject).removeItemListener(getHazelcastRegisteredEventListenerId()); } - else if(this.distributedObject instanceof ITopic) { + else if (this.distributedObject instanceof ITopic) { ((ITopic) this.distributedObject).removeMessageListener(getHazelcastRegisteredEventListenerId()); } } @@ -119,7 +122,8 @@ public class HazelcastEventDrivenMessageProducer extends AbstractHazelcastMessag return "hazelcast:inbound-channel-adapter"; } - private class HazelcastItemListener extends AbstractHazelcastEventListener implements ItemListener { + private class HazelcastItemListener extends AbstractHazelcastEventListener> + implements ItemListener { @Override public void itemAdded(ItemEvent item) { @@ -132,21 +136,29 @@ public class HazelcastEventDrivenMessageProducer extends AbstractHazelcastMessag } @Override - protected void processEvent(EventObject event) { - Assert.notNull(event, "event must not be null"); - - if (getCacheEvents().contains(((ItemEvent) event).getEventType().toString())) { - sendMessage(event, ((ItemEvent) event).getMember().getSocketAddress(), getCacheListeningPolicy()); + protected void processEvent(ItemEvent event) { + if (getCacheEvents().contains(event.getEventType().toString())) { + sendMessage(event, event.getMember().getSocketAddress(), getCacheListeningPolicy()); } - if (logger.isDebugEnabled()){ + if (logger.isDebugEnabled()) { logger.debug("Received ItemEvent : " + event); } } + @Override + protected org.springframework.messaging.Message toMessage(ItemEvent event) { + final Map headers = new HashMap<>(); + headers.put(HazelcastHeaders.EVENT_TYPE, event.getEventType().name()); + headers.put(HazelcastHeaders.MEMBER, event.getMember().getSocketAddress()); + + return getMessageBuilderFactory().withPayload(event.getItem()).copyHeaders(headers).build(); + } + } - private class HazelcastMessageListener extends AbstractHazelcastEventListener implements MessageListener { + private class HazelcastMessageListener extends AbstractHazelcastEventListener> + implements MessageListener { @Override public void onMessage(Message message) { @@ -154,15 +166,26 @@ public class HazelcastEventDrivenMessageProducer extends AbstractHazelcastMessag } @Override - protected void processEvent(EventObject event) { - Assert.notNull(event, "event must not be null"); - sendMessage(event, ((Message) event).getPublishingMember().getSocketAddress(), null); + protected void processEvent(Message event) { + sendMessage(event, event.getPublishingMember().getSocketAddress(), null); - if (logger.isDebugEnabled()){ + if (logger.isDebugEnabled()) { logger.debug("Received Message : " + event); } } + @Override + protected org.springframework.messaging.Message toMessage(Message event) { + Assert.notNull(event.getMessageObject(), "message must not be null"); + + final Map headers = new HashMap<>(); + headers.put(HazelcastHeaders.MEMBER, event.getPublishingMember().getSocketAddress()); + headers.put(HazelcastHeaders.CACHE_NAME, event.getSource()); + headers.put(HazelcastHeaders.PUBLISHING_TIME, event.getPublishTime()); + + return getMessageBuilderFactory().withPayload(event.getMessageObject()).copyHeaders(headers).build(); + } + } } diff --git a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/listener/HazelcastMembershipListener.java b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/listener/HazelcastMembershipListener.java index bd1abf3..34c90d6 100644 --- a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/listener/HazelcastMembershipListener.java +++ b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/listener/HazelcastMembershipListener.java @@ -20,7 +20,7 @@ import java.net.SocketAddress; import java.util.Set; import java.util.concurrent.locks.Lock; -import org.springframework.integration.hazelcast.common.HazelcastLocalInstanceRegistrar; +import org.springframework.integration.hazelcast.HazelcastLocalInstanceRegistrar; import com.hazelcast.core.Hazelcast; import com.hazelcast.core.HazelcastInstance; diff --git a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/message/EntryEventMessagePayload.java b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/message/EntryEventMessagePayload.java new file mode 100644 index 0000000..fa02d38 --- /dev/null +++ b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/message/EntryEventMessagePayload.java @@ -0,0 +1,67 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.hazelcast.message; + +import org.springframework.util.Assert; + +/** + * Hazelcast Message Payload for Entry Events + * + * @author Eren Avsarogullari + * @since 1.0.0 + */ +public class EntryEventMessagePayload { + + public final K key; + + public final V value; + + public final V oldValue; + + public EntryEventMessagePayload(final K key, final V value, final V oldValue) { + Assert.notNull(key, "'key' must not be null"); + this.key = key; + this.value = value; + this.oldValue = oldValue; + } + + @Override + public String toString() { + return "EntryEventMessagePayload [key=" + key + ", value=" + value + ", oldValue=" + oldValue + "]"; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + EntryEventMessagePayload that = (EntryEventMessagePayload) o; + + return key.equals(that.key) && !(value != null ? !value.equals(that.value) + : that.value != null) && !(oldValue != null + ? !oldValue.equals(that.oldValue) : that.oldValue != null); + + } + + @Override + public int hashCode() { + int result = key.hashCode(); + result = 31 * result + (value != null ? value.hashCode() : 0); + result = 31 * result + (oldValue != null ? oldValue.hashCode() : 0); + return result; + } + +} diff --git a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/message/package-info.java b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/message/package-info.java new file mode 100644 index 0000000..0a659bf --- /dev/null +++ b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/message/package-info.java @@ -0,0 +1,4 @@ +/** + * Provides classes supporting Hazelcast message headers and payload. + */ +package org.springframework.integration.hazelcast.message; diff --git a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/outbound/HazelcastCacheWritingMessageHandler.java b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/outbound/HazelcastCacheWritingMessageHandler.java index 9cc5e2e..f58b709 100644 --- a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/outbound/HazelcastCacheWritingMessageHandler.java +++ b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/outbound/HazelcastCacheWritingMessageHandler.java @@ -22,7 +22,7 @@ import java.util.Queue; import java.util.Set; import org.springframework.integration.handler.AbstractMessageHandler; -import org.springframework.integration.hazelcast.common.HazelcastIntegrationDefinitionValidator; +import org.springframework.integration.hazelcast.HazelcastIntegrationDefinitionValidator; import org.springframework.messaging.Message; import org.springframework.util.Assert; @@ -46,7 +46,7 @@ public class HazelcastCacheWritingMessageHandler extends AbstractMessageHandler private final DistributedObject distributedObject; public HazelcastCacheWritingMessageHandler(DistributedObject distributedObject) { - Assert.notNull(distributedObject, "cache must not be null"); + Assert.notNull(distributedObject, "'distributedObject' must not be null"); this.distributedObject = distributedObject; } diff --git a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/package-info.java b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/package-info.java new file mode 100644 index 0000000..cdf25e3 --- /dev/null +++ b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/package-info.java @@ -0,0 +1,4 @@ +/** + * Provides common used types and classes. + */ +package org.springframework.integration.hazelcast; diff --git a/spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/AbstractHazelcastTestSupport.java b/spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/AbstractHazelcastTestSupport.java new file mode 100644 index 0000000..bac6cc7 --- /dev/null +++ b/spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/AbstractHazelcastTestSupport.java @@ -0,0 +1,56 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.hazelcast; + +import org.junit.Assert; + +import org.springframework.integration.hazelcast.message.EntryEventMessagePayload; +import org.springframework.messaging.Message; + +import com.hazelcast.core.EntryEventType; + +/** + * Base Class for Hazelcast Test Support + * + * @author Eren Avsarogullari + * @since 1.0.0 + */ +public class AbstractHazelcastTestSupport { + + protected void verifyEntryEvent(Message msg, String cacheName, EntryEventType event) { + Assert.assertNotNull(msg); + Assert.assertNotNull(msg.getPayload()); + if (event == EntryEventType.CLEAR_ALL || event == EntryEventType.EVICT_ALL) { + Assert.assertTrue(msg.getPayload() instanceof Integer); + } + else { + Assert.assertTrue(msg.getPayload() instanceof EntryEventMessagePayload); + } + + Assert.assertEquals(cacheName, msg.getHeaders().get(HazelcastHeaders.CACHE_NAME)); + Assert.assertEquals(event.name(), msg.getHeaders().get(HazelcastHeaders.EVENT_TYPE)); + Assert.assertNotNull(msg.getHeaders().get(HazelcastHeaders.MEMBER)); + } + + protected void verifyItemEvent(Message msg, EntryEventType event) { + Assert.assertNotNull(msg); + Assert.assertNotNull(msg.getPayload()); + Assert.assertNotNull(msg.getHeaders().get(HazelcastHeaders.MEMBER)); + Assert.assertEquals(event.toString(), msg.getHeaders().get(HazelcastHeaders.EVENT_TYPE).toString()); + } + +} diff --git a/spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/inbound/HazelcastCQDistributedMapInboundChannelAdapterTests.java b/spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/inbound/HazelcastCQDistributedMapInboundChannelAdapterTests.java index 4127db5..cd0928b 100644 --- a/spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/inbound/HazelcastCQDistributedMapInboundChannelAdapterTests.java +++ b/spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/inbound/HazelcastCQDistributedMapInboundChannelAdapterTests.java @@ -16,22 +16,27 @@ package org.springframework.integration.hazelcast.inbound; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + import javax.annotation.Resource; -import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.integration.hazelcast.AbstractHazelcastTestSupport; +import org.springframework.integration.hazelcast.HazelcastHeaders; import org.springframework.integration.hazelcast.HazelcastIntegrationTestUser; +import org.springframework.integration.hazelcast.message.EntryEventMessagePayload; import org.springframework.messaging.Message; import org.springframework.messaging.PollableChannel; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; -import com.hazelcast.core.AbstractIMapEvent; -import com.hazelcast.core.EntryEvent; import com.hazelcast.core.EntryEventType; import com.hazelcast.core.IMap; @@ -44,7 +49,8 @@ import com.hazelcast.core.IMap; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration @DirtiesContext -public class HazelcastCQDistributedMapInboundChannelAdapterTests { +@SuppressWarnings("unchecked") +public class HazelcastCQDistributedMapInboundChannelAdapterTests extends AbstractHazelcastTestSupport { @Autowired private PollableChannel cqMapChannel1; @@ -82,20 +88,25 @@ public class HazelcastCQDistributedMapInboundChannelAdapterTests { cqDistributedMap1.remove(1); cqDistributedMap1.put(2, new HazelcastIntegrationTestUser(2, "TestName2", "TestSurname2")); Message msg = cqMapChannel1.receive(2_000); - Assert.assertNotNull(msg); - Assert.assertNotNull(msg.getPayload()); - Assert.assertTrue(msg.getPayload() instanceof EntryEvent); - Assert.assertEquals(EntryEventType.ADDED, - ((EntryEvent) msg.getPayload()).getEventType()); - Assert.assertEquals("cqDistributedMap1", - ((EntryEvent) msg.getPayload()).getName()); - Assert.assertEquals(1, ((EntryEvent) msg.getPayload()).getKey()); - Assert.assertEquals(1, - ((HazelcastIntegrationTestUser) ((EntryEvent) msg.getPayload()).getValue()).getId()); - Assert.assertEquals("TestName1", - ((HazelcastIntegrationTestUser) ((EntryEvent) msg.getPayload()).getValue()).getName()); - Assert.assertEquals("TestSurname1", - ((HazelcastIntegrationTestUser) ((EntryEvent) msg.getPayload()).getValue()).getSurname()); + assertNotNull(msg); + assertNotNull(msg.getPayload()); + assertTrue(msg.getPayload() instanceof EntryEventMessagePayload); + assertNotNull(msg.getHeaders().get(HazelcastHeaders.MEMBER)); + assertEquals(EntryEventType.ADDED.name(), msg.getHeaders().get(HazelcastHeaders.EVENT_TYPE)); + assertEquals("cqDistributedMap1", msg.getHeaders().get(HazelcastHeaders.CACHE_NAME)); + + assertEquals(Integer.valueOf(1), + ((EntryEventMessagePayload) msg + .getPayload()).key); + assertEquals(1, + (((EntryEventMessagePayload) msg + .getPayload()).value).getId()); + assertEquals("TestName1", + (((EntryEventMessagePayload) msg + .getPayload()).value).getName()); + assertEquals("TestSurname1", + (((EntryEventMessagePayload) msg + .getPayload()).value).getSurname()); } @Test @@ -104,43 +115,48 @@ public class HazelcastCQDistributedMapInboundChannelAdapterTests { cqDistributedMap2.put(2, new HazelcastIntegrationTestUser(2, "TestName2", "TestSurname2")); cqDistributedMap2.remove(2); Message msg = cqMapChannel2.receive(2_000); - Assert.assertNotNull(msg); - Assert.assertNotNull(msg.getPayload()); - Assert.assertTrue(msg.getPayload() instanceof EntryEvent); - Assert.assertEquals(EntryEventType.REMOVED, - ((EntryEvent) msg.getPayload()).getEventType()); - Assert.assertEquals("cqDistributedMap2", - ((EntryEvent) msg.getPayload()).getName()); - Assert.assertEquals(2, ((EntryEvent) msg.getPayload()).getKey()); - Assert.assertEquals(2, - ((HazelcastIntegrationTestUser) ((EntryEvent) msg.getPayload()).getOldValue()).getId()); - Assert.assertEquals("TestName2", - ((HazelcastIntegrationTestUser) ((EntryEvent) msg.getPayload()).getOldValue()).getName()); - Assert.assertEquals("TestSurname2", - ((HazelcastIntegrationTestUser) ((EntryEvent) msg.getPayload()).getOldValue()).getSurname()); + assertNotNull(msg); + assertNotNull(msg.getPayload()); + assertTrue(msg.getPayload() instanceof EntryEventMessagePayload); + assertNotNull(msg.getHeaders().get(HazelcastHeaders.MEMBER)); + assertEquals(EntryEventType.REMOVED.name(), msg.getHeaders().get(HazelcastHeaders.EVENT_TYPE)); + assertEquals("cqDistributedMap2", msg.getHeaders().get(HazelcastHeaders.CACHE_NAME)); + + assertEquals(Integer.valueOf(2), + ((EntryEventMessagePayload) msg + .getPayload()).key); + assertEquals(2, + (((EntryEventMessagePayload) msg + .getPayload()).oldValue).getId()); + assertEquals("TestName2", + (((EntryEventMessagePayload) msg + .getPayload()).oldValue).getName()); + assertEquals("TestSurname2", + (((EntryEventMessagePayload) msg + .getPayload()).oldValue).getSurname()); } @Test public void testContinuousQueryForALLEntryEvent() { cqDistributedMap3.put(1, new HazelcastIntegrationTestUser(1, "TestName1", "TestSurname1")); Message msg = cqMapChannel3.receive(2_000); - verify(msg, "cqDistributedMap3", EntryEventType.ADDED); + verifyEntryEvent(msg, "cqDistributedMap3", EntryEventType.ADDED); cqDistributedMap3.put(1, new HazelcastIntegrationTestUser(1, "TestName1", "TestSurnameUpdated")); msg = cqMapChannel3.receive(2_000); - verify(msg, "cqDistributedMap3", EntryEventType.UPDATED); + verifyEntryEvent(msg, "cqDistributedMap3", EntryEventType.UPDATED); cqDistributedMap3.remove(1); msg = cqMapChannel3.receive(2_000); - verify(msg, "cqDistributedMap3", EntryEventType.REMOVED); + verifyEntryEvent(msg, "cqDistributedMap3", EntryEventType.REMOVED); cqDistributedMap3.put(2, new HazelcastIntegrationTestUser(2, "TestName2", "TestSurname2")); msg = cqMapChannel3.receive(2_000); - verify(msg, "cqDistributedMap3", EntryEventType.ADDED); + verifyEntryEvent(msg, "cqDistributedMap3", EntryEventType.ADDED); cqDistributedMap3.clear(); msg = cqMapChannel3.receive(2_000); - verify(msg, "cqDistributedMap3", EntryEventType.CLEAR_ALL); + verifyEntryEvent(msg, "cqDistributedMap3", EntryEventType.CLEAR_ALL); } @Test @@ -148,26 +164,34 @@ public class HazelcastCQDistributedMapInboundChannelAdapterTests { cqDistributedMap4.put(1, new HazelcastIntegrationTestUser(1, "TestName1", "TestSurname1")); cqDistributedMap4.put(1, new HazelcastIntegrationTestUser(2, "TestName2", "TestSurname2")); Message msg = cqMapChannel4.receive(2_000); - Assert.assertNotNull(msg); - Assert.assertNotNull(msg.getPayload()); - Assert.assertTrue(msg.getPayload() instanceof EntryEvent); - Assert.assertEquals(EntryEventType.UPDATED, - ((EntryEvent) msg.getPayload()).getEventType()); - Assert.assertEquals("cqDistributedMap4", - ((EntryEvent) msg.getPayload()).getName()); - Assert.assertEquals(1, ((EntryEvent) msg.getPayload()).getKey()); - Assert.assertEquals(1, - ((HazelcastIntegrationTestUser) ((EntryEvent) msg.getPayload()).getOldValue()).getId()); - Assert.assertEquals("TestName1", - ((HazelcastIntegrationTestUser) ((EntryEvent) msg.getPayload()).getOldValue()).getName()); - Assert.assertEquals("TestSurname1", - ((HazelcastIntegrationTestUser) ((EntryEvent) msg.getPayload()).getOldValue()).getSurname()); - Assert.assertEquals(2, - ((HazelcastIntegrationTestUser) ((EntryEvent) msg.getPayload()).getValue()).getId()); - Assert.assertEquals("TestName2", - ((HazelcastIntegrationTestUser) ((EntryEvent) msg.getPayload()).getValue()).getName()); - Assert.assertEquals("TestSurname2", - ((HazelcastIntegrationTestUser) ((EntryEvent) msg.getPayload()).getValue()).getSurname()); + assertNotNull(msg); + assertNotNull(msg.getPayload()); + assertTrue(msg.getPayload() instanceof EntryEventMessagePayload); + assertNotNull(msg.getHeaders().get(HazelcastHeaders.MEMBER)); + assertEquals(EntryEventType.UPDATED.name(), msg.getHeaders().get(HazelcastHeaders.EVENT_TYPE)); + assertEquals("cqDistributedMap4", msg.getHeaders().get(HazelcastHeaders.CACHE_NAME)); + + assertEquals(Integer.valueOf(1), + ((EntryEventMessagePayload) msg + .getPayload()).key); + assertEquals(1, + (((EntryEventMessagePayload) msg + .getPayload()).oldValue).getId()); + assertEquals("TestName1", + (((EntryEventMessagePayload) msg + .getPayload()).oldValue).getName()); + assertEquals("TestSurname1", + (((EntryEventMessagePayload) msg + .getPayload()).oldValue).getSurname()); + assertEquals(2, + (((EntryEventMessagePayload) msg + .getPayload()).value).getId()); + assertEquals("TestName2", + (((EntryEventMessagePayload) msg + .getPayload()).value).getName()); + assertEquals("TestSurname2", + (((EntryEventMessagePayload) msg + .getPayload()).value).getSurname()); } @Test @@ -175,22 +199,17 @@ public class HazelcastCQDistributedMapInboundChannelAdapterTests { cqDistributedMap5.put(1, new HazelcastIntegrationTestUser(1, "TestName1", "TestSurname1")); cqDistributedMap5.put(1, new HazelcastIntegrationTestUser(2, "TestName2", "TestSurname2")); Message msg = cqMapChannel5.receive(2_000); - Assert.assertNotNull(msg); - Assert.assertNotNull(msg.getPayload()); - Assert.assertTrue(msg.getPayload() instanceof EntryEvent); - Assert.assertEquals(EntryEventType.UPDATED, ((EntryEvent) msg.getPayload()).getEventType()); - Assert.assertEquals("cqDistributedMap5", ((EntryEvent) msg.getPayload()).getName()); - Assert.assertEquals(1, ((EntryEvent) msg.getPayload()).getKey()); - Assert.assertNull(((EntryEvent) msg.getPayload()).getOldValue()); - Assert.assertNull(((EntryEvent) msg.getPayload()).getValue()); - } + assertNotNull(msg); + assertNotNull(msg.getPayload()); + assertTrue(msg.getPayload() instanceof EntryEventMessagePayload); + assertNotNull(msg.getHeaders().get(HazelcastHeaders.MEMBER)); + assertEquals(EntryEventType.UPDATED.name(), msg.getHeaders().get(HazelcastHeaders.EVENT_TYPE)); + assertEquals("cqDistributedMap5", msg.getHeaders().get(HazelcastHeaders.CACHE_NAME)); - private void verify(Message msg, String cacheName, EntryEventType type) { - Assert.assertNotNull(msg); - Assert.assertNotNull(msg.getPayload()); - Assert.assertTrue(msg.getPayload() instanceof AbstractIMapEvent); - Assert.assertEquals(cacheName, ((AbstractIMapEvent) msg.getPayload()).getName()); - Assert.assertEquals(type, ((AbstractIMapEvent) msg.getPayload()).getEventType()); + assertEquals(Integer.valueOf(1), + ((EntryEventMessagePayload) msg.getPayload()).key); + assertNull(((EntryEventMessagePayload) msg.getPayload()).oldValue); + assertNull(((EntryEventMessagePayload) msg.getPayload()).value); } } diff --git a/spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/inbound/HazelcastDistributedListEventDrivenInboundChannelAdapterTests.java b/spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/inbound/HazelcastDistributedListEventDrivenInboundChannelAdapterTests.java index ab98a00..bb70a48 100644 --- a/spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/inbound/HazelcastDistributedListEventDrivenInboundChannelAdapterTests.java +++ b/spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/inbound/HazelcastDistributedListEventDrivenInboundChannelAdapterTests.java @@ -16,13 +16,17 @@ package org.springframework.integration.hazelcast.inbound; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + import javax.annotation.Resource; -import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.integration.hazelcast.AbstractHazelcastTestSupport; +import org.springframework.integration.hazelcast.HazelcastHeaders; import org.springframework.integration.hazelcast.HazelcastIntegrationTestUser; import org.springframework.messaging.Message; import org.springframework.messaging.PollableChannel; @@ -32,7 +36,6 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import com.hazelcast.core.EntryEventType; import com.hazelcast.core.IList; -import com.hazelcast.core.ItemEvent; /** * Hazelcast Distributed List Event Driven Inbound Channel Adapter Test Class @@ -43,7 +46,7 @@ import com.hazelcast.core.ItemEvent; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration @DirtiesContext -public class HazelcastDistributedListEventDrivenInboundChannelAdapterTests { +public class HazelcastDistributedListEventDrivenInboundChannelAdapterTests extends AbstractHazelcastTestSupport { @Autowired private PollableChannel edListChannel1; @@ -67,17 +70,13 @@ public class HazelcastDistributedListEventDrivenInboundChannelAdapterTests { public void testEventDrivenForOnlyADDEDEntryEvent() { edDistributedList1.add(new HazelcastIntegrationTestUser(1, "TestName1", "TestSurname1")); Message msg = edListChannel1.receive(2_000); - Assert.assertNotNull(msg); - Assert.assertNotNull(msg.getPayload()); - Assert.assertTrue(msg.getPayload() instanceof ItemEvent); - Assert.assertEquals(EntryEventType.ADDED.toString(), - ((ItemEvent) msg.getPayload()).getEventType().toString()); - Assert.assertEquals(1, - ((HazelcastIntegrationTestUser) ((ItemEvent) msg.getPayload()).getItem()).getId()); - Assert.assertEquals("TestName1", - ((HazelcastIntegrationTestUser) ((ItemEvent) msg.getPayload()).getItem()).getName()); - Assert.assertEquals("TestSurname1", - ((HazelcastIntegrationTestUser) ((ItemEvent) msg.getPayload()).getItem()).getSurname()); + assertNotNull(msg); + assertNotNull(msg.getPayload()); + assertNotNull(msg.getHeaders().get(HazelcastHeaders.MEMBER)); + assertEquals(EntryEventType.ADDED.toString(), msg.getHeaders().get(HazelcastHeaders.EVENT_TYPE).toString()); + assertEquals(1, ((HazelcastIntegrationTestUser) msg.getPayload()).getId()); + assertEquals("TestName1", ((HazelcastIntegrationTestUser) msg.getPayload()).getName()); + assertEquals("TestSurname1", ((HazelcastIntegrationTestUser) msg.getPayload()).getSurname()); } @Test @@ -86,17 +85,13 @@ public class HazelcastDistributedListEventDrivenInboundChannelAdapterTests { edDistributedList2.add(user); edDistributedList2.remove(user); Message msg = edListChannel2.receive(2_000); - Assert.assertNotNull(msg); - Assert.assertNotNull(msg.getPayload()); - Assert.assertTrue(msg.getPayload() instanceof ItemEvent); - Assert.assertEquals(EntryEventType.REMOVED.toString(), - ((ItemEvent) msg.getPayload()).getEventType().toString()); - Assert.assertEquals(2, - ((HazelcastIntegrationTestUser) ((ItemEvent) msg.getPayload()).getItem()).getId()); - Assert.assertEquals("TestName2", - ((HazelcastIntegrationTestUser) ((ItemEvent) msg.getPayload()).getItem()).getName()); - Assert.assertEquals("TestSurname2", - ((HazelcastIntegrationTestUser) ((ItemEvent) msg.getPayload()).getItem()).getSurname()); + assertNotNull(msg); + assertNotNull(msg.getPayload()); + assertNotNull(msg.getHeaders().get(HazelcastHeaders.MEMBER)); + assertEquals(EntryEventType.REMOVED.toString(), msg.getHeaders().get(HazelcastHeaders.EVENT_TYPE).toString()); + assertEquals(2, ((HazelcastIntegrationTestUser) msg.getPayload()).getId()); + assertEquals("TestName2", ((HazelcastIntegrationTestUser) msg.getPayload()).getName()); + assertEquals("TestSurname2", ((HazelcastIntegrationTestUser) msg.getPayload()).getSurname()); } @Test @@ -104,24 +99,16 @@ public class HazelcastDistributedListEventDrivenInboundChannelAdapterTests { HazelcastIntegrationTestUser user = new HazelcastIntegrationTestUser(1, "TestName1", "TestSurname1"); edDistributedList3.add(user); Message msg = edListChannel3.receive(2_000); - verify(msg, EntryEventType.ADDED); + verifyItemEvent(msg, EntryEventType.ADDED); edDistributedList3.remove(user); msg = edListChannel3.receive(2_000); - verify(msg, EntryEventType.REMOVED); + verifyItemEvent(msg, EntryEventType.REMOVED); user = new HazelcastIntegrationTestUser(2, "TestName2", "TestSurname2"); edDistributedList3.add(user); msg = edListChannel3.receive(2_000); - verify(msg, EntryEventType.ADDED); - } - - private void verify(Message msg, EntryEventType type) { - Assert.assertNotNull(msg); - Assert.assertNotNull(msg.getPayload()); - Assert.assertTrue(msg.getPayload() instanceof ItemEvent); - Assert.assertEquals(type.toString(), - ((ItemEvent) msg.getPayload()).getEventType().toString()); + verifyItemEvent(msg, EntryEventType.ADDED); } } diff --git a/spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/inbound/HazelcastDistributedMapEventDrivenInboundChannelAdapterTests.java b/spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/inbound/HazelcastDistributedMapEventDrivenInboundChannelAdapterTests.java index 21dd5d5..e7077b4 100644 --- a/spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/inbound/HazelcastDistributedMapEventDrivenInboundChannelAdapterTests.java +++ b/spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/inbound/HazelcastDistributedMapEventDrivenInboundChannelAdapterTests.java @@ -23,15 +23,16 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.integration.hazelcast.AbstractHazelcastTestSupport; +import org.springframework.integration.hazelcast.HazelcastHeaders; import org.springframework.integration.hazelcast.HazelcastIntegrationTestUser; +import org.springframework.integration.hazelcast.message.EntryEventMessagePayload; import org.springframework.messaging.Message; import org.springframework.messaging.PollableChannel; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; -import com.hazelcast.core.AbstractIMapEvent; -import com.hazelcast.core.EntryEvent; import com.hazelcast.core.EntryEventType; import com.hazelcast.core.IMap; @@ -44,7 +45,8 @@ import com.hazelcast.core.IMap; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration @DirtiesContext -public class HazelcastDistributedMapEventDrivenInboundChannelAdapterTests { +@SuppressWarnings("unchecked") +public class HazelcastDistributedMapEventDrivenInboundChannelAdapterTests extends AbstractHazelcastTestSupport { @Autowired private PollableChannel edMapChannel1; @@ -76,18 +78,22 @@ public class HazelcastDistributedMapEventDrivenInboundChannelAdapterTests { Message msg = edMapChannel1.receive(2_000); Assert.assertNotNull(msg); Assert.assertNotNull(msg.getPayload()); - Assert.assertTrue(msg.getPayload() instanceof EntryEvent); - Assert.assertEquals(EntryEventType.ADDED, - ((EntryEvent) msg.getPayload()).getEventType()); - Assert.assertEquals("edDistributedMap1", - ((EntryEvent) msg.getPayload()).getName()); - Assert.assertEquals(1, ((EntryEvent) msg.getPayload()).getKey()); + Assert.assertTrue(msg.getPayload() instanceof EntryEventMessagePayload); + Assert.assertNotNull(msg.getHeaders().get(HazelcastHeaders.MEMBER)); + Assert.assertEquals(EntryEventType.ADDED.name(), msg.getHeaders().get(HazelcastHeaders.EVENT_TYPE)); + Assert.assertEquals("edDistributedMap1", msg.getHeaders().get(HazelcastHeaders.CACHE_NAME)); + + Assert.assertEquals(Integer.valueOf(1), + ((EntryEventMessagePayload) msg.getPayload()).key); Assert.assertEquals(1, - ((HazelcastIntegrationTestUser) ((EntryEvent) msg.getPayload()).getValue()).getId()); + (((EntryEventMessagePayload) msg + .getPayload()).value).getId()); Assert.assertEquals("TestName1", - ((HazelcastIntegrationTestUser) ((EntryEvent) msg.getPayload()).getValue()).getName()); + (((EntryEventMessagePayload) msg + .getPayload()).value).getName()); Assert.assertEquals("TestSurname1", - ((HazelcastIntegrationTestUser) ((EntryEvent) msg.getPayload()).getValue()).getSurname()); + (((EntryEventMessagePayload) msg + .getPayload()).value).getSurname()); } @Test @@ -97,24 +103,32 @@ public class HazelcastDistributedMapEventDrivenInboundChannelAdapterTests { Message msg = edMapChannel2.receive(2_000); Assert.assertNotNull(msg); Assert.assertNotNull(msg.getPayload()); - Assert.assertTrue(msg.getPayload() instanceof EntryEvent); - Assert.assertEquals(EntryEventType.UPDATED, - ((EntryEvent) msg.getPayload()).getEventType()); - Assert.assertEquals("edDistributedMap2", - ((EntryEvent) msg.getPayload()).getName()); - Assert.assertEquals(2, ((EntryEvent) msg.getPayload()).getKey()); + Assert.assertTrue(msg.getPayload() instanceof EntryEventMessagePayload); + Assert.assertNotNull(msg.getHeaders().get(HazelcastHeaders.MEMBER)); + Assert.assertEquals(EntryEventType.UPDATED.name(), msg.getHeaders().get(HazelcastHeaders.EVENT_TYPE)); + Assert.assertEquals("edDistributedMap2", msg.getHeaders().get(HazelcastHeaders.CACHE_NAME)); + + Assert.assertEquals(Integer.valueOf(2), + ((EntryEventMessagePayload) msg + .getPayload()).key); Assert.assertEquals(1, - ((HazelcastIntegrationTestUser) ((EntryEvent) msg.getPayload()).getOldValue()).getId()); + (((EntryEventMessagePayload) msg + .getPayload()).oldValue).getId()); Assert.assertEquals("TestName1", - ((HazelcastIntegrationTestUser) ((EntryEvent) msg.getPayload()).getOldValue()).getName()); + (((EntryEventMessagePayload) msg + .getPayload()).oldValue).getName()); Assert.assertEquals("TestSurname1", - ((HazelcastIntegrationTestUser) ((EntryEvent) msg.getPayload()).getOldValue()).getSurname()); + (((EntryEventMessagePayload) msg + .getPayload()).oldValue).getSurname()); Assert.assertEquals(2, - ((HazelcastIntegrationTestUser) ((EntryEvent) msg.getPayload()).getValue()).getId()); + (((EntryEventMessagePayload) msg + .getPayload()).value).getId()); Assert.assertEquals("TestName2", - ((HazelcastIntegrationTestUser) ((EntryEvent) msg.getPayload()).getValue()).getName()); + (((EntryEventMessagePayload) msg + .getPayload()).value).getName()); Assert.assertEquals("TestSurname2", - ((HazelcastIntegrationTestUser) ((EntryEvent) msg.getPayload()).getValue()).getSurname()); + (((EntryEventMessagePayload) msg + .getPayload()).value).getSurname()); } @Test @@ -125,49 +139,46 @@ public class HazelcastDistributedMapEventDrivenInboundChannelAdapterTests { Message msg = edMapChannel3.receive(2_000); Assert.assertNotNull(msg); Assert.assertNotNull(msg.getPayload()); - Assert.assertTrue(msg.getPayload() instanceof EntryEvent); - Assert.assertEquals(EntryEventType.REMOVED, - ((EntryEvent) msg.getPayload()).getEventType()); - Assert.assertEquals("edDistributedMap3", - ((EntryEvent) msg.getPayload()).getName()); - Assert.assertEquals(2, ((EntryEvent) msg.getPayload()).getKey()); + Assert.assertTrue(msg.getPayload() instanceof EntryEventMessagePayload); + Assert.assertNotNull(msg.getHeaders().get(HazelcastHeaders.MEMBER)); + Assert.assertEquals(EntryEventType.REMOVED.name(), msg.getHeaders().get(HazelcastHeaders.EVENT_TYPE)); + Assert.assertEquals("edDistributedMap3", msg.getHeaders().get(HazelcastHeaders.CACHE_NAME)); + + Assert.assertEquals(Integer.valueOf(2), + ((EntryEventMessagePayload) msg + .getPayload()).key); Assert.assertEquals(2, - ((HazelcastIntegrationTestUser) ((EntryEvent) msg.getPayload()).getOldValue()).getId()); + (((EntryEventMessagePayload) msg + .getPayload()).oldValue).getId()); Assert.assertEquals("TestName2", - ((HazelcastIntegrationTestUser) ((EntryEvent) msg.getPayload()).getOldValue()).getName()); + (((EntryEventMessagePayload) msg + .getPayload()).oldValue).getName()); Assert.assertEquals("TestSurname2", - ((HazelcastIntegrationTestUser) ((EntryEvent) msg.getPayload()).getOldValue()).getSurname()); + (((EntryEventMessagePayload) msg + .getPayload()).oldValue).getSurname()); } @Test public void testEventDrivenForALLEntryEvent() { edDistributedMap4.put(1, new HazelcastIntegrationTestUser(1, "TestName1", "TestSurname1")); Message msg = edMapChannel4.receive(2_000); - verify(msg, "edDistributedMap4", EntryEventType.ADDED); + verifyEntryEvent(msg, "edDistributedMap4", EntryEventType.ADDED); edDistributedMap4.put(1, new HazelcastIntegrationTestUser(1, "TestName1", "TestSurnameUpdated")); msg = edMapChannel4.receive(2_000); - verify(msg, "edDistributedMap4", EntryEventType.UPDATED); + verifyEntryEvent(msg, "edDistributedMap4", EntryEventType.UPDATED); edDistributedMap4.remove(1); msg = edMapChannel4.receive(2_000); - verify(msg, "edDistributedMap4", EntryEventType.REMOVED); + verifyEntryEvent(msg, "edDistributedMap4", EntryEventType.REMOVED); edDistributedMap4.put(2, new HazelcastIntegrationTestUser(2, "TestName2", "TestSurname2")); msg = edMapChannel4.receive(2_000); - verify(msg, "edDistributedMap4", EntryEventType.ADDED); + verifyEntryEvent(msg, "edDistributedMap4", EntryEventType.ADDED); edDistributedMap4.clear(); msg = edMapChannel4.receive(2_000); - verify(msg, "edDistributedMap4", EntryEventType.CLEAR_ALL); - } - - private void verify(Message msg, String cacheName, EntryEventType type) { - Assert.assertNotNull(msg); - Assert.assertNotNull(msg.getPayload()); - Assert.assertTrue(msg.getPayload() instanceof AbstractIMapEvent); - Assert.assertEquals(cacheName, ((AbstractIMapEvent) msg.getPayload()).getName()); - Assert.assertEquals(type, ((AbstractIMapEvent) msg.getPayload()).getEventType()); + verifyEntryEvent(msg, "edDistributedMap4", EntryEventType.CLEAR_ALL); } } diff --git a/spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/inbound/HazelcastDistributedQueueEventDrivenInboundChannelAdapterTests.java b/spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/inbound/HazelcastDistributedQueueEventDrivenInboundChannelAdapterTests.java index f883615..351f550 100644 --- a/spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/inbound/HazelcastDistributedQueueEventDrivenInboundChannelAdapterTests.java +++ b/spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/inbound/HazelcastDistributedQueueEventDrivenInboundChannelAdapterTests.java @@ -23,6 +23,8 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.integration.hazelcast.AbstractHazelcastTestSupport; +import org.springframework.integration.hazelcast.HazelcastHeaders; import org.springframework.integration.hazelcast.HazelcastIntegrationTestUser; import org.springframework.messaging.Message; import org.springframework.messaging.PollableChannel; @@ -32,7 +34,6 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import com.hazelcast.core.EntryEventType; import com.hazelcast.core.IQueue; -import com.hazelcast.core.ItemEvent; /** * Hazelcast Distributed Queue Event Driven Inbound Channel Adapter Test @@ -43,7 +44,7 @@ import com.hazelcast.core.ItemEvent; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration @DirtiesContext -public class HazelcastDistributedQueueEventDrivenInboundChannelAdapterTests { +public class HazelcastDistributedQueueEventDrivenInboundChannelAdapterTests extends AbstractHazelcastTestSupport { @Autowired private PollableChannel edQueueChannel1; @@ -69,15 +70,11 @@ public class HazelcastDistributedQueueEventDrivenInboundChannelAdapterTests { Message msg = edQueueChannel1.receive(2_000); Assert.assertNotNull(msg); Assert.assertNotNull(msg.getPayload()); - Assert.assertTrue(msg.getPayload() instanceof ItemEvent); - Assert.assertEquals(EntryEventType.ADDED.toString(), - ((ItemEvent) msg.getPayload()).getEventType().toString()); - Assert.assertEquals(1, - ((HazelcastIntegrationTestUser) ((ItemEvent) msg.getPayload()).getItem()).getId()); - Assert.assertEquals("TestName1", - ((HazelcastIntegrationTestUser) ((ItemEvent) msg.getPayload()).getItem()).getName()); - Assert.assertEquals("TestSurname1", - ((HazelcastIntegrationTestUser) ((ItemEvent) msg.getPayload()).getItem()).getSurname()); + Assert.assertNotNull(msg.getHeaders().get(HazelcastHeaders.MEMBER)); + Assert.assertEquals(EntryEventType.ADDED.toString(), msg.getHeaders().get(HazelcastHeaders.EVENT_TYPE).toString()); + Assert.assertEquals(1, (((HazelcastIntegrationTestUser) msg.getPayload()).getId())); + Assert.assertEquals("TestName1", (((HazelcastIntegrationTestUser) msg.getPayload()).getName())); + Assert.assertEquals("TestSurname1", (((HazelcastIntegrationTestUser) msg.getPayload()).getSurname())); } @Test @@ -88,15 +85,11 @@ public class HazelcastDistributedQueueEventDrivenInboundChannelAdapterTests { Message msg = edQueueChannel2.receive(2_000); Assert.assertNotNull(msg); Assert.assertNotNull(msg.getPayload()); - Assert.assertTrue(msg.getPayload() instanceof ItemEvent); - Assert.assertEquals(EntryEventType.REMOVED.toString(), - ((ItemEvent) msg.getPayload()).getEventType().toString()); - Assert.assertEquals(2, - ((HazelcastIntegrationTestUser) ((ItemEvent) msg.getPayload()).getItem()).getId()); - Assert.assertEquals("TestName2", - ((HazelcastIntegrationTestUser) ((ItemEvent) msg.getPayload()).getItem()).getName()); - Assert.assertEquals("TestSurname2", - ((HazelcastIntegrationTestUser) ((ItemEvent) msg.getPayload()).getItem()).getSurname()); + Assert.assertNotNull(msg.getHeaders().get(HazelcastHeaders.MEMBER)); + Assert.assertEquals(EntryEventType.REMOVED.toString(), msg.getHeaders().get(HazelcastHeaders.EVENT_TYPE).toString()); + Assert.assertEquals(2, (((HazelcastIntegrationTestUser) msg.getPayload()).getId())); + Assert.assertEquals("TestName2", (((HazelcastIntegrationTestUser) msg.getPayload()).getName())); + Assert.assertEquals("TestSurname2", (((HazelcastIntegrationTestUser) msg.getPayload()).getSurname())); } @Test @@ -104,24 +97,16 @@ public class HazelcastDistributedQueueEventDrivenInboundChannelAdapterTests { HazelcastIntegrationTestUser user = new HazelcastIntegrationTestUser(1, "TestName1", "TestSurname1"); edDistributedQueue3.add(user); Message msg = edQueueChannel3.receive(2_000); - verify(msg, EntryEventType.ADDED); + verifyItemEvent(msg, EntryEventType.ADDED); edDistributedQueue3.remove(user); msg = edQueueChannel3.receive(2_000); - verify(msg, EntryEventType.REMOVED); + verifyItemEvent(msg, EntryEventType.REMOVED); user = new HazelcastIntegrationTestUser(2, "TestName2", "TestSurname2"); edDistributedQueue3.add(user); msg = edQueueChannel3.receive(2_000); - verify(msg, EntryEventType.ADDED); - } - - private void verify(Message msg, EntryEventType type) { - Assert.assertNotNull(msg); - Assert.assertNotNull(msg.getPayload()); - Assert.assertTrue(msg.getPayload() instanceof ItemEvent); - Assert.assertEquals(type.toString(), ((ItemEvent) msg.getPayload()) - .getEventType().toString()); + verifyItemEvent(msg, EntryEventType.ADDED); } } diff --git a/spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/inbound/HazelcastDistributedSetEventDrivenInboundChannelAdapterTests.java b/spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/inbound/HazelcastDistributedSetEventDrivenInboundChannelAdapterTests.java index c29dec9..a0b18c2 100644 --- a/spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/inbound/HazelcastDistributedSetEventDrivenInboundChannelAdapterTests.java +++ b/spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/inbound/HazelcastDistributedSetEventDrivenInboundChannelAdapterTests.java @@ -23,6 +23,8 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.integration.hazelcast.AbstractHazelcastTestSupport; +import org.springframework.integration.hazelcast.HazelcastHeaders; import org.springframework.integration.hazelcast.HazelcastIntegrationTestUser; import org.springframework.messaging.Message; import org.springframework.messaging.PollableChannel; @@ -32,7 +34,6 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import com.hazelcast.core.EntryEventType; import com.hazelcast.core.ISet; -import com.hazelcast.core.ItemEvent; /** * Hazelcast Distributed Set Event Driven Inbound Channel Adapter Test @@ -43,7 +44,7 @@ import com.hazelcast.core.ItemEvent; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration @DirtiesContext -public class HazelcastDistributedSetEventDrivenInboundChannelAdapterTests { +public class HazelcastDistributedSetEventDrivenInboundChannelAdapterTests extends AbstractHazelcastTestSupport { @Autowired private PollableChannel edSetChannel1; @@ -69,15 +70,11 @@ public class HazelcastDistributedSetEventDrivenInboundChannelAdapterTests { Message msg = edSetChannel1.receive(2_000); Assert.assertNotNull(msg); Assert.assertNotNull(msg.getPayload()); - Assert.assertTrue(msg.getPayload() instanceof ItemEvent); - Assert.assertEquals(EntryEventType.ADDED.toString(), - ((ItemEvent) msg.getPayload()).getEventType().toString()); - Assert.assertEquals(1, - ((HazelcastIntegrationTestUser) ((ItemEvent) msg.getPayload()).getItem()).getId()); - Assert.assertEquals("TestName1", - ((HazelcastIntegrationTestUser) ((ItemEvent) msg.getPayload()).getItem()).getName()); - Assert.assertEquals("TestSurname1", - ((HazelcastIntegrationTestUser) ((ItemEvent) msg.getPayload()).getItem()).getSurname()); + Assert.assertNotNull(msg.getHeaders().get(HazelcastHeaders.MEMBER)); + Assert.assertEquals(EntryEventType.ADDED.toString(), msg.getHeaders().get(HazelcastHeaders.EVENT_TYPE).toString()); + Assert.assertEquals(1, (((HazelcastIntegrationTestUser) msg.getPayload()).getId())); + Assert.assertEquals("TestName1", (((HazelcastIntegrationTestUser) msg.getPayload()).getName())); + Assert.assertEquals("TestSurname1", (((HazelcastIntegrationTestUser) msg.getPayload()).getSurname())); } @Test @@ -88,15 +85,11 @@ public class HazelcastDistributedSetEventDrivenInboundChannelAdapterTests { Message msg = edSetChannel2.receive(2_000); Assert.assertNotNull(msg); Assert.assertNotNull(msg.getPayload()); - Assert.assertTrue(msg.getPayload() instanceof ItemEvent); - Assert.assertEquals(EntryEventType.REMOVED.toString(), - ((ItemEvent) msg.getPayload()).getEventType().toString()); - Assert.assertEquals(2, - ((HazelcastIntegrationTestUser) ((ItemEvent) msg.getPayload()).getItem()).getId()); - Assert.assertEquals("TestName2", - ((HazelcastIntegrationTestUser) ((ItemEvent) msg.getPayload()).getItem()).getName()); - Assert.assertEquals("TestSurname2", - ((HazelcastIntegrationTestUser) ((ItemEvent) msg.getPayload()).getItem()).getSurname()); + Assert.assertNotNull(msg.getHeaders().get(HazelcastHeaders.MEMBER)); + Assert.assertEquals(EntryEventType.REMOVED.toString(), msg.getHeaders().get(HazelcastHeaders.EVENT_TYPE).toString()); + Assert.assertEquals(2, (((HazelcastIntegrationTestUser) msg.getPayload()).getId())); + Assert.assertEquals("TestName2", (((HazelcastIntegrationTestUser) msg.getPayload()).getName())); + Assert.assertEquals("TestSurname2", (((HazelcastIntegrationTestUser) msg.getPayload()).getSurname())); } @Test @@ -104,24 +97,16 @@ public class HazelcastDistributedSetEventDrivenInboundChannelAdapterTests { HazelcastIntegrationTestUser user = new HazelcastIntegrationTestUser(1, "TestName1", "TestSurname1"); edDistributedSet3.add(user); Message msg = edSetChannel3.receive(2_000); - verify(msg, EntryEventType.ADDED); + verifyItemEvent(msg, EntryEventType.ADDED); edDistributedSet3.remove(user); msg = edSetChannel3.receive(2_000); - verify(msg, EntryEventType.REMOVED); + verifyItemEvent(msg, EntryEventType.REMOVED); user = new HazelcastIntegrationTestUser(2, "TestName2", "TestSurname2"); edDistributedSet3.add(user); msg = edSetChannel3.receive(2_000); - verify(msg, EntryEventType.ADDED); - } - - private void verify(Message msg, EntryEventType type) { - Assert.assertNotNull(msg); - Assert.assertNotNull(msg.getPayload()); - Assert.assertTrue(msg.getPayload() instanceof ItemEvent); - Assert.assertEquals(type.toString(), - ((ItemEvent) msg.getPayload()).getEventType().toString()); + verifyItemEvent(msg, EntryEventType.ADDED); } } diff --git a/spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/inbound/HazelcastDistributedTopicEventDrivenInboundChannelAdapterTests.java b/spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/inbound/HazelcastDistributedTopicEventDrivenInboundChannelAdapterTests.java index fa36474..3d58346 100644 --- a/spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/inbound/HazelcastDistributedTopicEventDrivenInboundChannelAdapterTests.java +++ b/spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/inbound/HazelcastDistributedTopicEventDrivenInboundChannelAdapterTests.java @@ -23,6 +23,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.integration.hazelcast.HazelcastHeaders; import org.springframework.integration.hazelcast.HazelcastIntegrationTestUser; import org.springframework.messaging.Message; import org.springframework.messaging.PollableChannel; @@ -55,13 +56,12 @@ public class HazelcastDistributedTopicEventDrivenInboundChannelAdapterTests { Message msg = edTopicChannel1.receive(2_000); Assert.assertNotNull(msg); Assert.assertNotNull(msg.getPayload()); - Assert.assertTrue(msg.getPayload() instanceof com.hazelcast.core.Message); - Assert.assertEquals(1, ((HazelcastIntegrationTestUser) ((com.hazelcast.core.Message) msg.getPayload()) - .getMessageObject()).getId()); - Assert.assertEquals("TestName1", ((HazelcastIntegrationTestUser) ((com.hazelcast.core.Message) msg - .getPayload()).getMessageObject()).getName()); - Assert.assertEquals("TestSurname1", ((HazelcastIntegrationTestUser) ((com.hazelcast.core.Message) msg - .getPayload()).getMessageObject()).getSurname()); + Assert.assertNotNull(msg.getHeaders().get(HazelcastHeaders.MEMBER)); + Assert.assertNotNull(msg.getHeaders().get(HazelcastHeaders.PUBLISHING_TIME)); + Assert.assertEquals("edDistributedTopic1", msg.getHeaders().get(HazelcastHeaders.CACHE_NAME)); + Assert.assertEquals(1, ((HazelcastIntegrationTestUser) msg.getPayload()).getId()); + Assert.assertEquals("TestName1", ((HazelcastIntegrationTestUser) msg.getPayload()).getName()); + Assert.assertEquals("TestSurname1", ((HazelcastIntegrationTestUser) msg.getPayload()).getSurname()); } } diff --git a/spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/inbound/HazelcastMultiMapEventDrivenInboundChannelAdapterTests.java b/spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/inbound/HazelcastMultiMapEventDrivenInboundChannelAdapterTests.java index 7fd9b34..cb7caca 100644 --- a/spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/inbound/HazelcastMultiMapEventDrivenInboundChannelAdapterTests.java +++ b/spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/inbound/HazelcastMultiMapEventDrivenInboundChannelAdapterTests.java @@ -23,15 +23,16 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.integration.hazelcast.AbstractHazelcastTestSupport; +import org.springframework.integration.hazelcast.HazelcastHeaders; import org.springframework.integration.hazelcast.HazelcastIntegrationTestUser; +import org.springframework.integration.hazelcast.message.EntryEventMessagePayload; import org.springframework.messaging.Message; import org.springframework.messaging.PollableChannel; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; -import com.hazelcast.core.AbstractIMapEvent; -import com.hazelcast.core.EntryEvent; import com.hazelcast.core.EntryEventType; import com.hazelcast.core.MultiMap; @@ -44,7 +45,8 @@ import com.hazelcast.core.MultiMap; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration @DirtiesContext -public class HazelcastMultiMapEventDrivenInboundChannelAdapterTests { +@SuppressWarnings("unchecked") +public class HazelcastMultiMapEventDrivenInboundChannelAdapterTests extends AbstractHazelcastTestSupport { @Autowired private PollableChannel edMultiMapChannel1; @@ -70,18 +72,23 @@ public class HazelcastMultiMapEventDrivenInboundChannelAdapterTests { Message msg = edMultiMapChannel1.receive(2_000); Assert.assertNotNull(msg); Assert.assertNotNull(msg.getPayload()); - Assert.assertTrue(msg.getPayload() instanceof EntryEvent); - Assert.assertEquals(EntryEventType.ADDED, - ((EntryEvent) msg.getPayload()).getEventType()); - Assert.assertEquals("edMultiMap1", - ((EntryEvent) msg.getPayload()).getName()); - Assert.assertEquals(1, ((EntryEvent) msg.getPayload()).getKey()); + Assert.assertTrue(msg.getPayload() instanceof EntryEventMessagePayload); + Assert.assertNotNull(msg.getHeaders().get(HazelcastHeaders.MEMBER)); + Assert.assertEquals(EntryEventType.ADDED.name(), msg.getHeaders().get(HazelcastHeaders.EVENT_TYPE)); + Assert.assertEquals("edMultiMap1", msg.getHeaders().get(HazelcastHeaders.CACHE_NAME)); + + Assert.assertEquals(Integer.valueOf(1), + ((EntryEventMessagePayload) msg + .getPayload()).key); Assert.assertEquals(1, - ((HazelcastIntegrationTestUser) ((EntryEvent) msg.getPayload()).getValue()).getId()); + (((EntryEventMessagePayload) msg + .getPayload()).value).getId()); Assert.assertEquals("TestName1", - ((HazelcastIntegrationTestUser) ((EntryEvent) msg.getPayload()).getValue()).getName()); + (((EntryEventMessagePayload) msg + .getPayload()).value).getName()); Assert.assertEquals("TestSurname1", - ((HazelcastIntegrationTestUser) ((EntryEvent) msg.getPayload()).getValue()).getSurname()); + (((EntryEventMessagePayload) msg + .getPayload()).value).getSurname()); } @Test @@ -92,51 +99,48 @@ public class HazelcastMultiMapEventDrivenInboundChannelAdapterTests { Message msg = edMultiMapChannel2.receive(2_000); Assert.assertNotNull(msg); Assert.assertNotNull(msg.getPayload()); - Assert.assertTrue(msg.getPayload() instanceof EntryEvent); - Assert.assertEquals(EntryEventType.REMOVED, - ((EntryEvent) msg.getPayload()).getEventType()); - Assert.assertEquals("edMultiMap2", - ((EntryEvent) msg.getPayload()).getName()); - Assert.assertEquals(2, ((EntryEvent) msg.getPayload()).getKey()); + Assert.assertTrue(msg.getPayload() instanceof EntryEventMessagePayload); + Assert.assertNotNull(msg.getHeaders().get(HazelcastHeaders.MEMBER)); + Assert.assertEquals(EntryEventType.REMOVED.name(), msg.getHeaders().get(HazelcastHeaders.EVENT_TYPE)); + Assert.assertEquals("edMultiMap2", msg.getHeaders().get(HazelcastHeaders.CACHE_NAME)); + + Assert.assertEquals(Integer.valueOf(2), + ((EntryEventMessagePayload) msg + .getPayload()).key); Assert.assertEquals(2, - ((HazelcastIntegrationTestUser) ((EntryEvent) msg.getPayload()).getValue()).getId()); + (((EntryEventMessagePayload) msg + .getPayload()).value).getId()); Assert.assertEquals("TestName2", - ((HazelcastIntegrationTestUser) ((EntryEvent) msg.getPayload()).getValue()).getName()); + (((EntryEventMessagePayload) msg + .getPayload()).value).getName()); Assert.assertEquals("TestSurname2", - ((HazelcastIntegrationTestUser) ((EntryEvent) msg.getPayload()).getValue()).getSurname()); + (((EntryEventMessagePayload) msg + .getPayload()).value).getSurname()); } @Test public void testEventDrivenForALLEntryEvent() { edMultiMap3.put(1, new HazelcastIntegrationTestUser(1, "TestName1", "TestSurname1")); Message msg = edMultiMapChannel3.receive(2_000); - verify(msg, "edMultiMap3", EntryEventType.ADDED); + verifyEntryEvent(msg, "edMultiMap3", EntryEventType.ADDED); edMultiMap3.put(1, new HazelcastIntegrationTestUser(1, "TestName1", "TestSurnameUpdated")); msg = edMultiMapChannel3.receive(2_000); - verify(msg, "edMultiMap3", EntryEventType.ADDED); + verifyEntryEvent(msg, "edMultiMap3", EntryEventType.ADDED); edMultiMap3.remove(1); msg = edMultiMapChannel3.receive(2_000); - verify(msg, "edMultiMap3", EntryEventType.REMOVED); + verifyEntryEvent(msg, "edMultiMap3", EntryEventType.REMOVED); msg = edMultiMapChannel3.receive(2_000); - verify(msg, "edMultiMap3", EntryEventType.REMOVED); + verifyEntryEvent(msg, "edMultiMap3", EntryEventType.REMOVED); edMultiMap3.put(2, new HazelcastIntegrationTestUser(2, "TestName2", "TestSurname2")); msg = edMultiMapChannel3.receive(2_000); - verify(msg, "edMultiMap3", EntryEventType.ADDED); + verifyEntryEvent(msg, "edMultiMap3", EntryEventType.ADDED); edMultiMap3.clear(); msg = edMultiMapChannel3.receive(2_000); - verify(msg, "edMultiMap3", EntryEventType.CLEAR_ALL); - } - - private void verify(Message msg, String cacheName, EntryEventType type) { - Assert.assertNotNull(msg); - Assert.assertNotNull(msg.getPayload()); - Assert.assertTrue(msg.getPayload() instanceof AbstractIMapEvent); - Assert.assertEquals(cacheName, ((AbstractIMapEvent) msg.getPayload()).getName()); - Assert.assertEquals(type, ((AbstractIMapEvent) msg.getPayload()).getEventType()); + verifyEntryEvent(msg, "edMultiMap3", EntryEventType.CLEAR_ALL); } } diff --git a/spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/inbound/HazelcastReplicatedMapEventDrivenInboundChannelAdapterTests.java b/spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/inbound/HazelcastReplicatedMapEventDrivenInboundChannelAdapterTests.java index ef0b0f1..52725dc 100644 --- a/spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/inbound/HazelcastReplicatedMapEventDrivenInboundChannelAdapterTests.java +++ b/spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/inbound/HazelcastReplicatedMapEventDrivenInboundChannelAdapterTests.java @@ -23,15 +23,16 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.integration.hazelcast.AbstractHazelcastTestSupport; +import org.springframework.integration.hazelcast.HazelcastHeaders; import org.springframework.integration.hazelcast.HazelcastIntegrationTestUser; +import org.springframework.integration.hazelcast.message.EntryEventMessagePayload; import org.springframework.messaging.Message; import org.springframework.messaging.PollableChannel; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; -import com.hazelcast.core.AbstractIMapEvent; -import com.hazelcast.core.EntryEvent; import com.hazelcast.core.EntryEventType; import com.hazelcast.core.ReplicatedMap; @@ -44,7 +45,8 @@ import com.hazelcast.core.ReplicatedMap; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration @DirtiesContext -public class HazelcastReplicatedMapEventDrivenInboundChannelAdapterTests { +@SuppressWarnings("unchecked") +public class HazelcastReplicatedMapEventDrivenInboundChannelAdapterTests extends AbstractHazelcastTestSupport { @Autowired private PollableChannel edReplicatedMapChannel1; @@ -76,18 +78,23 @@ public class HazelcastReplicatedMapEventDrivenInboundChannelAdapterTests { Message msg = edReplicatedMapChannel1.receive(2_000); Assert.assertNotNull(msg); Assert.assertNotNull(msg.getPayload()); - Assert.assertTrue(msg.getPayload() instanceof EntryEvent); - Assert.assertEquals(EntryEventType.ADDED, - ((EntryEvent) msg.getPayload()).getEventType()); - Assert.assertEquals("edReplicatedMap1", - ((EntryEvent) msg.getPayload()).getName()); - Assert.assertEquals(1, ((EntryEvent) msg.getPayload()).getKey()); + Assert.assertTrue(msg.getPayload() instanceof EntryEventMessagePayload); + Assert.assertNotNull(msg.getHeaders().get(HazelcastHeaders.MEMBER)); + Assert.assertEquals(EntryEventType.ADDED.name(), msg.getHeaders().get(HazelcastHeaders.EVENT_TYPE)); + Assert.assertEquals("edReplicatedMap1", msg.getHeaders().get(HazelcastHeaders.CACHE_NAME)); + + Assert.assertEquals(Integer.valueOf(1), + ((EntryEventMessagePayload) msg + .getPayload()).key); Assert.assertEquals(1, - ((HazelcastIntegrationTestUser) ((EntryEvent) msg.getPayload()).getValue()).getId()); + (((EntryEventMessagePayload) msg + .getPayload()).value).getId()); Assert.assertEquals("TestName1", - ((HazelcastIntegrationTestUser) ((EntryEvent) msg.getPayload()).getValue()).getName()); + (((EntryEventMessagePayload) msg + .getPayload()).value).getName()); Assert.assertEquals("TestSurname1", - ((HazelcastIntegrationTestUser) ((EntryEvent) msg.getPayload()).getValue()).getSurname()); + (((EntryEventMessagePayload) msg + .getPayload()).value).getSurname()); } @Test @@ -97,24 +104,33 @@ public class HazelcastReplicatedMapEventDrivenInboundChannelAdapterTests { Message msg = edReplicatedMapChannel2.receive(2_000); Assert.assertNotNull(msg); Assert.assertNotNull(msg.getPayload()); - Assert.assertTrue(msg.getPayload() instanceof EntryEvent); - Assert.assertEquals(EntryEventType.UPDATED, - ((EntryEvent) msg.getPayload()).getEventType()); - Assert.assertEquals("edReplicatedMap2", - ((EntryEvent) msg.getPayload()).getName()); - Assert.assertEquals(2, ((EntryEvent) msg.getPayload()).getKey()); + + Assert.assertTrue(msg.getPayload() instanceof EntryEventMessagePayload); + Assert.assertNotNull(msg.getHeaders().get(HazelcastHeaders.MEMBER)); + Assert.assertEquals(EntryEventType.UPDATED.name(), msg.getHeaders().get(HazelcastHeaders.EVENT_TYPE)); + Assert.assertEquals("edReplicatedMap2", msg.getHeaders().get(HazelcastHeaders.CACHE_NAME)); + + Assert.assertEquals(Integer.valueOf(2), + ((EntryEventMessagePayload) msg + .getPayload()).key); Assert.assertEquals(1, - ((HazelcastIntegrationTestUser) ((EntryEvent) msg.getPayload()).getOldValue()).getId()); + (((EntryEventMessagePayload) msg + .getPayload()).oldValue).getId()); Assert.assertEquals("TestName1", - ((HazelcastIntegrationTestUser) ((EntryEvent) msg.getPayload()).getOldValue()).getName()); + (((EntryEventMessagePayload) msg + .getPayload()).oldValue).getName()); Assert.assertEquals("TestSurname1", - ((HazelcastIntegrationTestUser) ((EntryEvent) msg.getPayload()).getOldValue()).getSurname()); + (((EntryEventMessagePayload) msg + .getPayload()).oldValue).getSurname()); Assert.assertEquals(2, - ((HazelcastIntegrationTestUser) ((EntryEvent) msg.getPayload()).getValue()).getId()); + (((EntryEventMessagePayload) msg + .getPayload()).value).getId()); Assert.assertEquals("TestName2", - ((HazelcastIntegrationTestUser) ((EntryEvent) msg.getPayload()).getValue()).getName()); + (((EntryEventMessagePayload) msg + .getPayload()).value).getName()); Assert.assertEquals("TestSurname2", - ((HazelcastIntegrationTestUser) ((EntryEvent) msg.getPayload()).getValue()).getSurname()); + (((EntryEventMessagePayload) msg + .getPayload()).value).getSurname()); } @Test @@ -125,46 +141,43 @@ public class HazelcastReplicatedMapEventDrivenInboundChannelAdapterTests { Message msg = edReplicatedMapChannel3.receive(2_000); Assert.assertNotNull(msg); Assert.assertNotNull(msg.getPayload()); - Assert.assertTrue(msg.getPayload() instanceof EntryEvent); - Assert.assertEquals(EntryEventType.REMOVED, - ((EntryEvent) msg.getPayload()).getEventType()); - Assert.assertEquals("edReplicatedMap3", - ((EntryEvent) msg.getPayload()).getName()); - Assert.assertEquals(2, ((EntryEvent) msg.getPayload()).getKey()); + Assert.assertTrue(msg.getPayload() instanceof EntryEventMessagePayload); + Assert.assertNotNull(msg.getHeaders().get(HazelcastHeaders.MEMBER)); + Assert.assertEquals(EntryEventType.REMOVED.name(), msg.getHeaders().get(HazelcastHeaders.EVENT_TYPE)); + Assert.assertEquals("edReplicatedMap3", msg.getHeaders().get(HazelcastHeaders.CACHE_NAME)); + + Assert.assertEquals(Integer.valueOf(2), + ((EntryEventMessagePayload) msg + .getPayload()).key); Assert.assertEquals(2, - ((HazelcastIntegrationTestUser) ((EntryEvent) msg.getPayload()).getOldValue()).getId()); + (((EntryEventMessagePayload) msg + .getPayload()).oldValue).getId()); Assert.assertEquals("TestName2", - ((HazelcastIntegrationTestUser) ((EntryEvent) msg.getPayload()).getOldValue()).getName()); + (((EntryEventMessagePayload) msg + .getPayload()).oldValue).getName()); Assert.assertEquals("TestSurname2", - ((HazelcastIntegrationTestUser) ((EntryEvent) msg.getPayload()).getOldValue()).getSurname()); + (((EntryEventMessagePayload) msg + .getPayload()).oldValue).getSurname()); } @Test public void testEventDrivenForALLEntryEvent() { edReplicatedMap4.put(1, new HazelcastIntegrationTestUser(1, "TestName1", "TestSurname1")); Message msg = edReplicatedMapChannel4.receive(2_000); - verify(msg, "edReplicatedMap4", EntryEventType.ADDED); + verifyEntryEvent(msg, "edReplicatedMap4", EntryEventType.ADDED); edReplicatedMap4.put(1, new HazelcastIntegrationTestUser(1, "TestName1", "TestSurnameUpdated")); msg = edReplicatedMapChannel4.receive(2_000); - verify(msg, "edReplicatedMap4", EntryEventType.UPDATED); + verifyEntryEvent(msg, "edReplicatedMap4", EntryEventType.UPDATED); edReplicatedMap4.remove(1); msg = edReplicatedMapChannel4.receive(2_000); - verify(msg, "edReplicatedMap4", EntryEventType.REMOVED); + verifyEntryEvent(msg, "edReplicatedMap4", EntryEventType.REMOVED); edReplicatedMap4.put(2, new HazelcastIntegrationTestUser(2, "TestName2", "TestSurname2")); msg = edReplicatedMapChannel4.receive(2_000); - verify(msg, "edReplicatedMap4", EntryEventType.ADDED); + verifyEntryEvent(msg, "edReplicatedMap4", EntryEventType.ADDED); } - private void verify(Message msg, String cacheName, EntryEventType type) { - Assert.assertNotNull(msg); - Assert.assertNotNull(msg.getPayload()); - Assert.assertTrue(msg.getPayload() instanceof AbstractIMapEvent); - Assert.assertEquals(cacheName, ((AbstractIMapEvent) msg.getPayload()).getName()); - Assert.assertEquals(type, ((AbstractIMapEvent) msg.getPayload()).getEventType()); - } - }