From 573e1f301f5f3ca3a6cc8a8c89977c2e594692e1 Mon Sep 17 00:00:00 2001 From: erenavsarogullari Date: Mon, 8 Jun 2015 20:37:46 +0100 Subject: [PATCH] INTEXT-167: Add Cluster Monitor Channel Adapter JIRA: https://jira.spring.io/browse/INTEXT-167 Some refactorings are done. Hazelcast Cluster Monitor Unit Test Refactorings are done. Some minor bug fixes are done. Redundant assert is removed. HazelcastInstance lifecycle check is added Migration Event UT Case is updated. Code style polishing and upgrade to the `spring.io.plugin-0.4.0` --- README.md | 4 +- spring-integration-hazelcast/README.md | 40 ++- spring-integration-hazelcast/build.gradle | 12 +- .../hazelcast/ClusterMonitorType.java | 35 +++ ...zelcastIntegrationDefinitionValidator.java | 10 +- ...terMonitorInboundChannelAdapterParser.java | 99 ++++++++ .../HazelcastIntegrationNamespaceHandler.java | 1 + .../AbstractHazelcastMessageProducer.java | 18 +- ...azelcastClusterMonitorMessageProducer.java | 210 ++++++++++++++++ .../HazelcastCacheWritingMessageHandler.java | 1 - .../xml/spring-integration-hazelcast-1.0.xsd | 43 ++++ ...itorInboundChannelAdapterTests-context.xml | 170 +++++++++++++ ...sterMonitorInboundChannelAdapterTests.java | 233 ++++++++++++++++++ 13 files changed, 854 insertions(+), 22 deletions(-) create mode 100644 spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/ClusterMonitorType.java create mode 100644 spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/config/xml/HazelcastClusterMonitorInboundChannelAdapterParser.java create mode 100644 spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/inbound/HazelcastClusterMonitorMessageProducer.java create mode 100644 spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/inbound/HazelcastClusterMonitorInboundChannelAdapterTests-context.xml create mode 100644 spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/inbound/HazelcastClusterMonitorInboundChannelAdapterTests.java diff --git a/README.md b/README.md index cc1c907..6aa1a49 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,7 @@ The Spring Integration Extensions project provides extension modules for [Spring ## Available Modules * [Amazon Web Services (AWS)][] Support +* [Hazelcast][] Support * [Kafka][] Support * [MQ Telemetry Transport (MQTT)][] Support * [Print][] Support @@ -139,6 +140,7 @@ The Spring Integration Extensions Framework is released under version 2.0 of the [MQ Telemetry Transport]: http://mqtt.org/ [Websockets]: http://www.html5rocks.com/en/tutorials/websockets/basics/ [XQuery]: http://en.wikipedia.org/wiki/XQuery -[Splunk]:http://www.splunk.com/ +[Splunk]: http://www.splunk.com/ [Amazon Web Services (AWS)]: http://aws.amazon.com/ [MQ Telemetry Transport (MQTT)]: http://mqtt.org/ +[Hazelcast]: http://hazelcast.org/ diff --git a/spring-integration-hazelcast/README.md b/spring-integration-hazelcast/README.md index 16a34f4..d746979 100644 --- a/spring-integration-hazelcast/README.md +++ b/spring-integration-hazelcast/README.md @@ -28,7 +28,7 @@ Hazelcast Event-Driven Inbound Channel Adapter listens related cache events and ``` Basically, Hazelcast Event-Driven Inbound Channel Adapter requires following attributes : -* **channel :** Specifies channel which message is sent. It is mandatory attribute. +* **channel :** Specifies channel which message is sent. * **cache :** Specifies the distributed Object reference which is listened. It is mandatory attribute. * **cache-events :** Specifies cache events which are listened. It is optional attribute and its default value is ADDED. Its supported values are as follows : @@ -155,7 +155,7 @@ Sample definitions are as follows : ## HAZELCAST CONTINUOUS QUERY INBOUND CHANNEL ADAPTER -Hazelcast Continuous Query enables to listen to the modifications performed on specific map entries. Hazelcast Continuous Query Inbound Channel Adapter is an event-driven inbound channel adapter and listens related distributed map events in the light of defined predicate. Its basic definition is as follows : +Hazelcast Continuous Query enables to listen to the modifications performed on specific map entries. Hazelcast Continuous Query Inbound Channel Adapter is an event-driven channel adapter and listens to related distributed map events in the light of defined predicate. Its basic definition is as follows : ``` +``` +Basically, it requires four attributes as follows : + +* **channel :** Specifies channel which message is sent. +* **hazelcast-instance :** Specifies Hazelcast Instance reference to listen cluster events. It is mandatory attribute. +* **monitor-types :** Specifies monitor types which are listened. It is optional attribute with MEMBERSHIP default value. Supported values are MEMBERSHIP, DISTRIBUTED_OBJECT, MIGRATION, LIFECYCLE, CLIENT. + +Sample definition is as follows : +``` + + + + + + + + + +``` +**Reference :** http://docs.hazelcast.org/docs/latest/manual/html/distributedevents.html + + ## HAZELCAST DISTRIBUTED-SQL INBOUND CHANNEL ADAPTER Hazelcast allows to run distributed queries on the distributed map. Hazelcast Distributed SQL Inbound Channel Adapter is a poller-driven inbound channel adapter. It runs defined distributed-sql and returns results in the light of iteration type. Its basic definition is as follows : diff --git a/spring-integration-hazelcast/build.gradle b/spring-integration-hazelcast/build.gradle index e05c883..e504eb2 100644 --- a/spring-integration-hazelcast/build.gradle +++ b/spring-integration-hazelcast/build.gradle @@ -12,7 +12,7 @@ buildscript { maven { url 'http://repo.spring.io/plugins-release' } } dependencies { - classpath 'org.springframework.build.gradle:spring-io-plugin:0.0.3.RELEASE' + classpath 'io.spring.gradle:spring-io-plugin:0.0.4.RELEASE' } } @@ -26,8 +26,12 @@ repositories { if (project.hasProperty('platformVersion')) { apply plugin: 'spring-io' - dependencies { - springIoVersions "io.spring.platform:platform-bom:${platformVersion}@properties" + dependencyManagement { + springIoTestRuntime { + imports { + mavenBom "io.spring.platform:platform-bom:${platformVersion}" + } + } } } @@ -67,6 +71,8 @@ dependencies { compile "org.springframework.integration:spring-integration-core:$springIntegrationVersion" compile "com.hazelcast:hazelcast:$hazelcastVersion" + testCompile "com.hazelcast:hazelcast-client:$hazelcastVersion" + testCompile "org.springframework.integration:spring-integration-test:$springIntegrationVersion" testRuntime "org.slf4j:slf4j-log4j12:$slf4jVersion" diff --git a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/ClusterMonitorType.java b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/ClusterMonitorType.java new file mode 100644 index 0000000..43090df --- /dev/null +++ b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/ClusterMonitorType.java @@ -0,0 +1,35 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.hazelcast; + +/** + * Enumeration of Hazelcast Cluster Monitor Types + * + * @author Eren Avsarogullari + * @since 1.0.0 + * @see org.springframework.integration.hazelcast.inbound.HazelcastClusterMonitorMessageProducer + * @see com.hazelcast.core.MembershipListener + * @see com.hazelcast.core.DistributedObjectListener + * @see com.hazelcast.core.MigrationListener + * @see com.hazelcast.core.LifecycleListener + * @see com.hazelcast.core.ClientListener + */ +public enum ClusterMonitorType { + + MEMBERSHIP, DISTRIBUTED_OBJECT, MIGRATION, LIFECYCLE, CLIENT; + +} diff --git a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/HazelcastIntegrationDefinitionValidator.java b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/HazelcastIntegrationDefinitionValidator.java index b797af4..b91bd0c 100644 --- a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/HazelcastIntegrationDefinitionValidator.java +++ b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/HazelcastIntegrationDefinitionValidator.java @@ -40,11 +40,13 @@ import com.hazelcast.core.ReplicatedMap; */ public class HazelcastIntegrationDefinitionValidator { - public static > void validateEnumType(final Class enumType, final String cacheEventTypes) { - Set eventTypeSet = StringUtils.commaDelimitedListToSet(cacheEventTypes); - for (String eventType : eventTypeSet) { - Enum.valueOf(enumType, eventType); + public static > Set validateEnumType(final Class enumType, final String types) { + Set typeSet = StringUtils.commaDelimitedListToSet(types); + for (String type : typeSet) { + Enum.valueOf(enumType, type); } + + return typeSet; } public static void validateCacheTypeForEventDrivenMessageProducer(final DistributedObject distributedObject) { diff --git a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/config/xml/HazelcastClusterMonitorInboundChannelAdapterParser.java b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/config/xml/HazelcastClusterMonitorInboundChannelAdapterParser.java new file mode 100644 index 0000000..1f9389e --- /dev/null +++ b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/config/xml/HazelcastClusterMonitorInboundChannelAdapterParser.java @@ -0,0 +1,99 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.hazelcast.config.xml; + +import org.w3c.dom.Element; + +import org.springframework.beans.factory.BeanDefinitionStoreException; +import org.springframework.beans.factory.support.AbstractBeanDefinition; +import org.springframework.beans.factory.support.BeanDefinitionBuilder; +import org.springframework.beans.factory.support.BeanDefinitionReaderUtils; +import org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser; +import org.springframework.beans.factory.xml.ParserContext; +import org.springframework.integration.config.xml.IntegrationNamespaceUtils; +import org.springframework.integration.hazelcast.inbound.HazelcastClusterMonitorMessageProducer; + +import org.springframework.util.StringUtils; + +/** + * Parser for the {@code } component. + * + * @author Eren Avsarogullari + * @since 1.0.0 + */ +public class HazelcastClusterMonitorInboundChannelAdapterParser extends + AbstractSingleBeanDefinitionParser { + + private static final String CHANNEL_ATTRIBUTE = "channel"; + + private static final String HAZELCAST_INSTANCE_ATTRIBUTE = "hazelcast-instance"; + + private static final String MONITOR_TYPES_ATTRIBUTE = "monitor-types"; + + private static final String OUTPUT_CHANNEL = "outputChannel"; + + private static final String MONITOR_EVENT_TYPES = "monitorEventTypes"; + + @Override + protected Class getBeanClass(Element element) { + return HazelcastClusterMonitorMessageProducer.class; + } + + @Override + protected String resolveId(Element element, AbstractBeanDefinition definition, + ParserContext parserContext) throws BeanDefinitionStoreException { + String id = super.resolveId(element, definition, parserContext); + + if (!element.hasAttribute(CHANNEL_ATTRIBUTE)) { + id = id + ".adapter"; + } + + if (!StringUtils.hasText(id)) { + id = BeanDefinitionReaderUtils.generateBeanName(definition, + parserContext.getRegistry()); + } + + return id; + } + + @Override + protected void doParse(Element element, ParserContext parserContext, + BeanDefinitionBuilder builder) { + String channelName = element.getAttribute(CHANNEL_ATTRIBUTE); + if (!StringUtils.hasText(channelName)) { + channelName = IntegrationNamespaceUtils.createDirectChannel(element, + parserContext); + } + + if (!StringUtils.hasText(element.getAttribute(HAZELCAST_INSTANCE_ATTRIBUTE))) { + parserContext.getReaderContext().error( + "'" + HAZELCAST_INSTANCE_ATTRIBUTE + "' attribute is required.", + element); + } + + builder.addPropertyReference(OUTPUT_CHANNEL, channelName); + builder.addConstructorArgReference(element + .getAttribute(HAZELCAST_INSTANCE_ATTRIBUTE)); + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, + MONITOR_TYPES_ATTRIBUTE, MONITOR_EVENT_TYPES); + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, + IntegrationNamespaceUtils.AUTO_STARTUP); + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, + IntegrationNamespaceUtils.PHASE); + } + +} diff --git a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/config/xml/HazelcastIntegrationNamespaceHandler.java b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/config/xml/HazelcastIntegrationNamespaceHandler.java index 1dfd404..eaea84b 100644 --- a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/config/xml/HazelcastIntegrationNamespaceHandler.java +++ b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/config/xml/HazelcastIntegrationNamespaceHandler.java @@ -32,6 +32,7 @@ public class HazelcastIntegrationNamespaceHandler extends AbstractIntegrationNam registerBeanDefinitionParser("outbound-channel-adapter", new HazelcastOutboundChannelAdapterParser()); registerBeanDefinitionParser("cq-inbound-channel-adapter", new HazelcastContinuousQueryInboundChannelAdapterParser()); registerBeanDefinitionParser("ds-inbound-channel-adapter", new HazelcastDistributedSQLInboundChannelAdapterParser()); + registerBeanDefinitionParser("cm-inbound-channel-adapter", new HazelcastClusterMonitorInboundChannelAdapterParser()); } } diff --git a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/inbound/AbstractHazelcastMessageProducer.java b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/inbound/AbstractHazelcastMessageProducer.java index f845989..722348e 100644 --- a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/inbound/AbstractHazelcastMessageProducer.java +++ b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/inbound/AbstractHazelcastMessageProducer.java @@ -33,7 +33,6 @@ import org.springframework.integration.hazelcast.HazelcastLocalInstanceRegistrar import org.springframework.integration.hazelcast.message.EntryEventMessagePayload; import org.springframework.messaging.Message; import org.springframework.util.Assert; -import org.springframework.util.StringUtils; import com.hazelcast.core.AbstractIMapEvent; import com.hazelcast.core.DistributedObject; @@ -71,11 +70,11 @@ public abstract class AbstractHazelcastMessageProducer extends MessageProducerSu } public void setCacheEventTypes(String cacheEventTypes) { - HazelcastIntegrationDefinitionValidator.validateEnumType(CacheEventType.class, cacheEventTypes); - Set cacheEvents = StringUtils.commaDelimitedListToSet(cacheEventTypes); + Set cacheEvents = + HazelcastIntegrationDefinitionValidator.validateEnumType(CacheEventType.class, cacheEventTypes); Assert.notEmpty(cacheEvents, "'cacheEvents' must have elements"); - HazelcastIntegrationDefinitionValidator.validateCacheEventsByDistributedObject( - this.distributedObject, cacheEvents); + HazelcastIntegrationDefinitionValidator.validateCacheEventsByDistributedObject(this.distributedObject, + cacheEvents); this.cacheEvents = cacheEvents; } @@ -103,7 +102,7 @@ public abstract class AbstractHazelcastMessageProducer extends MessageProducerSu protected abstract Message toMessage(E event); protected void sendMessage(E event, InetSocketAddress socketAddress, - CacheListeningPolicyType cacheListeningPolicyType) { + CacheListeningPolicyType cacheListeningPolicyType) { if (CacheListeningPolicyType.ALL == cacheListeningPolicyType || isEventAcceptable(socketAddress)) { AbstractHazelcastMessageProducer.this.sendMessage(toMessage(event)); } @@ -178,12 +177,11 @@ public abstract class AbstractHazelcastMessageProducer extends MessageProducerSu @Override protected void processEvent(AbstractIMapEvent event) { if (getCacheEvents().contains(event.getEventType().toString())) { + if (logger.isDebugEnabled()) { + logger.debug("Received Event : " + event); + } sendMessage(event, event.getMember().getSocketAddress(), getCacheListeningPolicy()); } - - if (logger.isDebugEnabled()) { - logger.debug("Received Event : " + event); - } } @Override diff --git a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/inbound/HazelcastClusterMonitorMessageProducer.java b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/inbound/HazelcastClusterMonitorMessageProducer.java new file mode 100644 index 0000000..d7dc9ed --- /dev/null +++ b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/inbound/HazelcastClusterMonitorMessageProducer.java @@ -0,0 +1,210 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.hazelcast.inbound; + +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.springframework.integration.endpoint.MessageProducerSupport; +import org.springframework.integration.hazelcast.ClusterMonitorType; +import org.springframework.integration.hazelcast.HazelcastIntegrationDefinitionValidator; +import org.springframework.util.Assert; + +import com.hazelcast.core.Client; +import com.hazelcast.core.ClientListener; +import com.hazelcast.core.DistributedObjectEvent; +import com.hazelcast.core.DistributedObjectListener; +import com.hazelcast.core.HazelcastInstance; +import com.hazelcast.core.LifecycleEvent; +import com.hazelcast.core.LifecycleListener; +import com.hazelcast.core.MemberAttributeEvent; +import com.hazelcast.core.MembershipEvent; +import com.hazelcast.core.MembershipListener; +import com.hazelcast.core.MigrationEvent; +import com.hazelcast.core.MigrationListener; + +/** + * Hazelcast Cluster Monitor Event Driven Message Producer is a message producer which + * enables {@link HazelcastClusterMonitorMessageProducer.HazelcastClusterMonitorListener} + * listener in order to listen cluster related events and sends events to related channel. + * + * @author Eren Avsarogullari + * @since 1.0.0 + */ +public class HazelcastClusterMonitorMessageProducer extends MessageProducerSupport { + + private final HazelcastInstance hazelcastInstance; + + private Set monitorTypes = Collections.singleton(ClusterMonitorType.MEMBERSHIP.name()); + + private final Map hazelcastRegisteredListenerIdMap = new ConcurrentHashMap<>(5); + + public HazelcastClusterMonitorMessageProducer(HazelcastInstance hazelcastInstance) { + Assert.notNull(hazelcastInstance, "'hazelcastInstance' must not be null"); + this.hazelcastInstance = hazelcastInstance; + } + + public void setMonitorEventTypes(String monitorEventTypes) { + final Set monitorTypes = + HazelcastIntegrationDefinitionValidator.validateEnumType(ClusterMonitorType.class, monitorEventTypes); + Assert.notEmpty(monitorTypes, "'monitorTypes' must have elements"); + this.monitorTypes = monitorTypes; + } + + @Override + protected void doStart() { + final HazelcastClusterMonitorListener clusterMonitorListener = new HazelcastClusterMonitorListener(); + + if (this.monitorTypes.contains(ClusterMonitorType.MEMBERSHIP.name())) { + final String registrationId = this.hazelcastInstance.getCluster() + .addMembershipListener(clusterMonitorListener); + this.hazelcastRegisteredListenerIdMap.put(ClusterMonitorType.MEMBERSHIP, registrationId); + } + + if (this.monitorTypes.contains(ClusterMonitorType.DISTRIBUTED_OBJECT.name())) { + final String registrationId = this.hazelcastInstance + .addDistributedObjectListener(clusterMonitorListener); + this.hazelcastRegisteredListenerIdMap.put(ClusterMonitorType.DISTRIBUTED_OBJECT, registrationId); + } + + if (this.monitorTypes.contains(ClusterMonitorType.MIGRATION.name())) { + final String registrationId = this.hazelcastInstance.getPartitionService() + .addMigrationListener(clusterMonitorListener); + this.hazelcastRegisteredListenerIdMap.put(ClusterMonitorType.MIGRATION, registrationId); + } + + if (this.monitorTypes.contains(ClusterMonitorType.LIFECYCLE.name())) { + final String registrationId = this.hazelcastInstance.getLifecycleService() + .addLifecycleListener(clusterMonitorListener); + this.hazelcastRegisteredListenerIdMap.put(ClusterMonitorType.LIFECYCLE, registrationId); + } + + if (this.monitorTypes.contains(ClusterMonitorType.CLIENT.name())) { + final String registrationId = this.hazelcastInstance.getClientService() + .addClientListener(clusterMonitorListener); + this.hazelcastRegisteredListenerIdMap.put(ClusterMonitorType.CLIENT, registrationId); + } + } + + @Override + protected void doStop() { + if (this.hazelcastInstance.getLifecycleService().isRunning()) { + String id = this.hazelcastRegisteredListenerIdMap.remove(ClusterMonitorType.MEMBERSHIP); + if (id != null) { + this.hazelcastInstance.getCluster().removeMembershipListener(id); + } + + id = this.hazelcastRegisteredListenerIdMap.remove(ClusterMonitorType.DISTRIBUTED_OBJECT); + if (id != null) { + this.hazelcastInstance.removeDistributedObjectListener(id); + } + + id = this.hazelcastRegisteredListenerIdMap.remove(ClusterMonitorType.MIGRATION); + if (id != null) { + this.hazelcastInstance.getPartitionService().removeMigrationListener(id); + } + + id = this.hazelcastRegisteredListenerIdMap.remove(ClusterMonitorType.LIFECYCLE); + if (id != null) { + this.hazelcastInstance.getLifecycleService().removeLifecycleListener(id); + } + + id = this.hazelcastRegisteredListenerIdMap.remove(ClusterMonitorType.CLIENT); + if (id != null) { + this.hazelcastInstance.getClientService().removeClientListener(id); + } + } + } + + @Override + public String getComponentType() { + return "hazelcast:cm-inbound-channel-adapter"; + } + + private void processEvent(Object event) { + Assert.notNull(event, "'hazelcast event' must not be null"); + if (logger.isDebugEnabled()) { + logger.debug("Received Cluster Monitor Event : " + event); + } + this.sendMessage(getMessageBuilderFactory().withPayload(event).build()); + + } + + private final class HazelcastClusterMonitorListener implements MembershipListener, + DistributedObjectListener, MigrationListener, LifecycleListener, + ClientListener { + + @Override + public void memberAdded(MembershipEvent membershipEvent) { + processEvent(membershipEvent); + } + + @Override + public void memberRemoved(MembershipEvent membershipEvent) { + processEvent(membershipEvent); + } + + @Override + public void memberAttributeChanged(MemberAttributeEvent memberAttributeEvent) { + processEvent(memberAttributeEvent); + } + + @Override + public void distributedObjectCreated(DistributedObjectEvent event) { + processEvent(event); + } + + @Override + public void distributedObjectDestroyed(DistributedObjectEvent event) { + processEvent(event); + } + + @Override + public void migrationStarted(MigrationEvent migrationEvent) { + processEvent(migrationEvent); + } + + @Override + public void migrationCompleted(MigrationEvent migrationEvent) { + processEvent(migrationEvent); + } + + @Override + public void migrationFailed(MigrationEvent migrationEvent) { + processEvent(migrationEvent); + } + + @Override + public void stateChanged(LifecycleEvent event) { + processEvent(event); + } + + @Override + public void clientConnected(Client client) { + processEvent(client); + } + + @Override + public void clientDisconnected(Client client) { + processEvent(client); + } + + } + +} diff --git a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/outbound/HazelcastCacheWritingMessageHandler.java b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/outbound/HazelcastCacheWritingMessageHandler.java index 8ee34f2..1865bef 100644 --- a/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/outbound/HazelcastCacheWritingMessageHandler.java +++ b/spring-integration-hazelcast/src/main/java/org/springframework/integration/hazelcast/outbound/HazelcastCacheWritingMessageHandler.java @@ -22,7 +22,6 @@ import java.util.Map; import org.springframework.expression.EvaluationContext; import org.springframework.expression.Expression; import org.springframework.integration.expression.ExpressionUtils; -import org.springframework.integration.expression.IntegrationEvaluationContextAware; import org.springframework.integration.handler.AbstractMessageHandler; import org.springframework.integration.hazelcast.HazelcastHeaders; import org.springframework.messaging.Message; diff --git a/spring-integration-hazelcast/src/main/resources/org/springframework/integration/hazelcast/config/xml/spring-integration-hazelcast-1.0.xsd b/spring-integration-hazelcast/src/main/resources/org/springframework/integration/hazelcast/config/xml/spring-integration-hazelcast-1.0.xsd index 95c772a..07c1bc2 100644 --- a/spring-integration-hazelcast/src/main/resources/org/springframework/integration/hazelcast/config/xml/spring-integration-hazelcast-1.0.xsd +++ b/spring-integration-hazelcast/src/main/resources/org/springframework/integration/hazelcast/config/xml/spring-integration-hazelcast-1.0.xsd @@ -257,4 +257,47 @@ + + + + + + Configures Hazelcast Cluster Monitor Inbound Channel Adapter + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/inbound/HazelcastClusterMonitorInboundChannelAdapterTests-context.xml b/spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/inbound/HazelcastClusterMonitorInboundChannelAdapterTests-context.xml new file mode 100644 index 0000000..b1a70f6 --- /dev/null +++ b/spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/inbound/HazelcastClusterMonitorInboundChannelAdapterTests-context.xml @@ -0,0 +1,170 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/inbound/HazelcastClusterMonitorInboundChannelAdapterTests.java b/spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/inbound/HazelcastClusterMonitorInboundChannelAdapterTests.java new file mode 100644 index 0000000..2a5f3a5 --- /dev/null +++ b/spring-integration-hazelcast/src/test/java/org/springframework/integration/hazelcast/inbound/HazelcastClusterMonitorInboundChannelAdapterTests.java @@ -0,0 +1,233 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.hazelcast.inbound; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.messaging.Message; +import org.springframework.messaging.PollableChannel; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +import com.hazelcast.client.HazelcastClient; +import com.hazelcast.client.config.ClientConfig; +import com.hazelcast.config.GroupConfig; +import com.hazelcast.core.Client; +import com.hazelcast.core.ClientType; +import com.hazelcast.core.DistributedObjectEvent; +import com.hazelcast.core.HazelcastInstance; +import com.hazelcast.core.IMap; +import com.hazelcast.core.LifecycleEvent; +import com.hazelcast.core.Member; +import com.hazelcast.core.LifecycleEvent.LifecycleState; +import com.hazelcast.core.MembershipEvent; +import com.hazelcast.core.MigrationEvent; + +/** + * Hazelcast Cluster Monitor Inbound Channel Adapter Unit Test Class + * + * @author Eren Avsarogullari + * @since 1.0.0 + */ +@RunWith(SpringJUnit4ClassRunner.class) +@ContextConfiguration +@DirtiesContext +public class HazelcastClusterMonitorInboundChannelAdapterTests { + + private static final String TEST_GROUP_NAME1 = "Test_Group_Name1"; + + private static final int TIMEOUT = 10_000; + + @Autowired + private PollableChannel cmChannel1; + + @Autowired + private PollableChannel cmChannel2; + + @Autowired + private PollableChannel cmChannel3; + + @Autowired + private PollableChannel cmChannel4; + + @Autowired + private PollableChannel cmChannel5; + + @Autowired + private PollableChannel cmChannel6; + + @Autowired + private HazelcastInstance hazelcastInstance; + + @Autowired + private HazelcastInstance hazelcastInstance2; + + @Autowired + private HazelcastInstance hazelcastInstance3; + + @Test + public void testMembershipEvent() { + testMembershipEvent(hazelcastInstance, cmChannel1, "testKey1", "testValue1"); + } + + @Test + public void testDistributedObjectEvent() { + testDistributedObjectEventByChannelAndHazelcastInstance(cmChannel2, + hazelcastInstance); + } + + @Test + public void testMigrationEvent() { + final IMap distributedMap = hazelcastInstance3 + .getMap("Test_Distributed_Map2"); + distributedMap.put(1, "TestValue1"); + distributedMap.put(2, "TestValue2"); + + hazelcastInstance3.getLifecycleService().terminate(); + final Message msg = cmChannel3.receive(TIMEOUT); + verifyMigrationEvent(msg); + } + + @Test + public void testLifecycleEvent() throws InterruptedException { + hazelcastInstance2.getLifecycleService().terminate(); + + Message msg = cmChannel4.receive(TIMEOUT); + verifyLifecycleEvent(msg, LifecycleState.SHUTTING_DOWN); + + msg = cmChannel4.receive(TIMEOUT); + verifyLifecycleEvent(msg, LifecycleState.SHUTDOWN); + } + + @Test + public void testClientEvent() { + testClientEventByChannelAndGroupName(cmChannel5, TEST_GROUP_NAME1); + } + + @Test + public void testMultipleMonitorTypes() { + testDistributedObjectEventByChannelAndHazelcastInstance(cmChannel6, + hazelcastInstance); + + testMembershipEvent(hazelcastInstance, cmChannel6, "testKey2", "testValue2"); + } + + private void testMembershipEvent( + final HazelcastInstance instance, final PollableChannel channel, + final String key, final String value) { + Member member = instance.getCluster().getMembers().iterator().next(); + member.setStringAttribute(key, value); + + Message msg = channel.receive(TIMEOUT); + verifyMembershipEvent(msg, MembershipEvent.MEMBER_ATTRIBUTE_CHANGED); + } + + private void testClientEventByChannelAndGroupName(final PollableChannel channel, + final String groupName) { + final HazelcastInstance client = getHazelcastClientByGroupName(groupName); + + Message msg = channel.receive(TIMEOUT); + verifyClientEvent(msg); + + client.getLifecycleService().terminate(); + + msg = channel.receive(TIMEOUT); + verifyClientEvent(msg); + } + + private void testDistributedObjectEventByChannelAndHazelcastInstance( + final PollableChannel channel, final HazelcastInstance hazelcastInstance) { + final String distributedObjectName = "Test_Distributed_Map"; + final IMap distributedMap = hazelcastInstance + .getMap(distributedObjectName); + + Message msg = channel.receive(TIMEOUT); + verifyDistributedObjectEvent(msg, DistributedObjectEvent.EventType.CREATED, + distributedObjectName); + + distributedMap.destroy(); + + msg = channel.receive(TIMEOUT); + verifyDistributedObjectEvent(msg, DistributedObjectEvent.EventType.DESTROYED, + distributedObjectName); + } + + private HazelcastInstance getHazelcastClientByGroupName(final String groupName) { + final GroupConfig groupConfig = new GroupConfig(); + groupConfig.setName(groupName); + groupConfig.setPassword("dev-pass"); + final ClientConfig cfg = new ClientConfig(); + cfg.setGroupConfig(groupConfig); + cfg.getNetworkConfig().addAddress("127.0.0.1:5701"); + + return HazelcastClient.newHazelcastClient(cfg); + } + + private void verifyMembershipEvent(final Message msg, final int membershipEvent) { + assertNotNull(msg); + assertNotNull(msg.getPayload()); + assertTrue(msg.getPayload() instanceof MembershipEvent); + assertEquals(membershipEvent, ((MembershipEvent) msg.getPayload()).getEventType()); + assertNotNull(((MembershipEvent) msg.getPayload()).getMember()); + } + + private void verifyDistributedObjectEvent(final Message msg, + final DistributedObjectEvent.EventType eventType, + final String distributedObjectName) { + assertNotNull(msg); + assertNotNull(msg.getPayload()); + assertTrue(msg.getPayload() instanceof DistributedObjectEvent); + assertEquals(eventType, ((DistributedObjectEvent) msg.getPayload()).getEventType()); + assertNotNull( + (((DistributedObjectEvent) msg.getPayload()).getDistributedObject()) + .getName(), + distributedObjectName); + } + + private void verifyMigrationEvent(final Message msg) { + assertNotNull(msg); + assertNotNull(msg.getPayload()); + assertTrue(msg.getPayload() instanceof MigrationEvent); + assertNotNull(((MigrationEvent) msg.getPayload()).getStatus()); + assertNotNull(((MigrationEvent) msg.getPayload()).getNewOwner()); + assertNotNull(((MigrationEvent) msg.getPayload()).getOldOwner()); + } + + private void verifyLifecycleEvent(final Message msg, + final LifecycleState lifecycleState) { + assertNotNull(msg); + assertNotNull(msg.getPayload()); + assertTrue(msg.getPayload() instanceof LifecycleEvent); + assertEquals(lifecycleState, ((LifecycleEvent) msg.getPayload()).getState()); + } + + private void verifyClientEvent(final Message msg) { + assertNotNull(msg); + assertNotNull(msg.getPayload()); + assertTrue(msg.getPayload() instanceof Client); + assertEquals(ClientType.JAVA, ((Client) msg.getPayload()).getClientType()); + assertNotNull(((Client) msg.getPayload()).getSocketAddress()); + } + +}