Splunk: Servers failover

upgrade versions

Conflicts:
	spring-integration-splunk/gradle.properties

upgrade gradle version

revert spring integration upgrade version

Add a feature to be able to declare more than one splunk server.
It's a simple failover mechanism with any pooling.
Before give back the Service instance, the framework just try a getInfo call.
To preserve backward compat this check it's not activated per default but only
if the splunk server instance is marked with testOnBorrow.

Sample:
````xml
  <int-splunk:server id="splunkServer" host="${splunk.server.host}" port="${splunk.server.port}"
                     username="${splunk.server.username}" password="${splunk.server.password}"
                     owner="${splunk.server.owner}" scheme="${splunk.server.scheme}" testOnBorrow="true" />

  <int-splunk:server id="splunkServerBackup" host="localhost" port="9999"
                     username="${splunk.server.username}" password="${splunk.server.password}"
                     owner="${splunk.server.owner}" scheme="${splunk.server.scheme}" testOnBorrow="true" />

  <util:list id="splunkServersList">
    <ref bean="splunkServer" />
    <ref bean="splunkServerBackup" />
  </util:list>

  <bean id="splunkServiceFactory" class="org.springframework.integration.splunk.support.SplunkServiceFactory">
    <constructor-arg ref="splunkServersList"/>
  </bean>

  <int-splunk:inbound-channel-adapter id="splunk-notify-order-status-change-channel"
                                      auto-startup="true"
                                      search=""
                                      splunk-server-ref="splunkServer;splunkServerBackup"
                                      channel="notify-order-status-change-input"
                                      mode="BLOCKING"
                                      init-earliest-time="${splunk.order-status-change.init-earliest-time}"
                                      >
    <int:poller fixed-rate="${splunk.order-status-change.pooling.rate.time}" time-unit="SECONDS"/>
  </int-splunk:inbound-channel-adapter>
````

some changes due to pr comments

Conflicts:
	spring-integration-splunk/gradle.properties

formatting: tabs instead of spaces.....

fix documentation with failover mechanism

formatting

formatting

upgrade versions

some changes due to pr comments

fix javadoc issues

1.1 xsd file

Polishing and upgrading
This commit is contained in:
Olivier Lamy
2014-06-06 13:43:57 +03:00
committed by Artem Bilan
parent 9d591b2bcf
commit a4ca9e6671
23 changed files with 735 additions and 167 deletions

View File

@@ -125,6 +125,25 @@ The outbound channel adapter requires a child *-writer element which defines rel
<int-splunk:server id="splunkServer" username="admin" password="password" timeout="5000" host="somehost.someplace.com" port="9000" />
```
Alternatively, you can configure a Splunk Server failover mechanism
```xml
<int-splunk:server id="splunkServer" username="admin" password="password" timeout="5000"
host="somehost.someplace.com" port="9000" />
<int-splunk:server id="splunkServerBackup" username="admin" password="password" timeout="5000"
host="somehost.someotherplace.com" port="9000" />
<util:list id="splunkServersList">
<ref bean="splunkServer" />
<ref bean="splunkServerBackup" />
</util:list>
<bean id="splunkServiceFactory" class="org.springframework.integration.splunk.support.SplunkServiceFactory">
<constructor-arg ref="splunkServersList"/>
</bean>
```
Additional server properties include (see [splunk](http://docs.splunk.com/Documentation/Splunk/latest) documentation for details):
* app

View File

@@ -2,10 +2,10 @@ description = 'Spring Integration Splunk Adapter'
buildscript {
repositories {
maven { url 'http://repo.springsource.org/plugins-snapshot' }
maven { url 'http://repo.spring.io/plugins-release' }
}
dependencies {
classpath 'org.springframework.build.gradle:docbook-reference-plugin:0.1.5'
classpath 'org.springframework.build.gradle:docbook-reference-plugin:0.2.8'
}
}
@@ -17,21 +17,20 @@ apply plugin: 'idea'
group = 'org.springframework.integration'
repositories {
maven { url 'http://repo.springsource.org/libs-milestone' }
maven { url 'http://repo.springsource.org/plugins-release' } // for bundlor
maven { url 'http://repo.spring.io/libs-milestone' }
}
sourceCompatibility=1.6
targetCompatibility=1.6
ext {
linkHomepage = 'https://github.com/SpringSource/spring-integration-extensions'
linkCi = 'https://build.springsource.org/browse/INTEXT'
linkIssue = 'https://jira.springsource.org/browse/INTEXT'
linkScmUrl = 'https://github.com/SpringSource/spring-integration-extensions'
linkScmConnection = 'https://github.com/SpringSource/spring-integration-extensions.git'
linkScmDevConnection = 'git@github.com:SpringSource/spring-integration-extensions.git'
linkHomepage = 'https://github.com/spring-projects/spring-integration-extensions'
linkCi = 'https://build.spring.io/browse/INTEXT'
linkIssue = 'https://jira.spring.io/browse/INTEXT'
linkScmUrl = 'https://github.com/spring-projects/spring-integration-extensions'
linkScmConnection = 'https://github.com/spring-projects/spring-integration-extensions.git'
linkScmDevConnection = 'git@github.com:spring-projects/spring-integration-extensions.git'
shortName = 'splunk'
}
@@ -42,17 +41,13 @@ configurations {
}
dependencies {
compile("com.splunk:splunk:$splunkVersion")
compile "org.springframework:spring-beans:$springVersion"
compile "org.springframework:spring-context:$springVersion"
compile "org.springframework:spring-expression:$springVersion"
compile "com.splunk:splunk:$splunkVersion"
compile "org.springframework.integration:spring-integration-core:$springIntegrationVersion"
compile "joda-time:joda-time:$jodaTimeVersion"
compile "commons-pool:commons-pool:$commonsPoolVersion"
testCompile "org.mockito:mockito-all:$mockitoVersion"
testCompile "org.springframework:spring-test:$springVersion"
testCompile "cglib:cglib-nodep:$cglibVersion"
testCompile "org.springframework.integration:spring-integration-test:$springIntegrationVersion"
testCompile "junit:junit-dep:$junitVersion"
testCompile "log4j:log4j:$log4jVersion"
testCompile "org.springframework.integration:spring-integration-stream:$springIntegrationVersion"
@@ -77,7 +72,7 @@ sourceSets {
// enable all compiler warnings; individual projects may customize further
ext.xLintArg = '-Xlint:all'
ext.xLintArg = '-Xlint:all,-options'
[compileJava, compileTestJava]*.options*.compilerArgs = [xLintArg]
test {
@@ -255,7 +250,8 @@ task dist(dependsOn: assemble) {
task wrapper(type: Wrapper) {
description = 'Generates gradlew[.bat] scripts'
gradleVersion = '1.6'
gradleVersion = '1.12'
distributionUrl = "http://services.gradle.org/distributions/gradle-${gradleVersion}-all.zip"
}
defaultTasks 'build'

View File

@@ -1,10 +1,8 @@
junitVersion=4.8.2
jodaTimeVersion=2.1
springVersion=4.0.0.RELEASE
splunkVersion=1.0.0
log4jVersion=1.2.12
version=1.1.0.BUILD-SNAPSHOT
springIntegrationVersion=4.0.0.M2
cglibVersion=2.2
junitVersion=4.11
jodaTimeVersion=2.3
splunkVersion=1.0.0
log4jVersion=1.2.17
springIntegrationVersion=4.0.2.RELEASE
commonsPoolVersion=1.6
mockitoVersion=1.9.0
mockitoVersion=1.9.5

Binary file not shown.

View File

@@ -1,6 +1,6 @@
#Wed May 15 23:11:13 EDT 2013
#Fri Jun 06 13:11:01 EEST 2014
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=http\://services.gradle.org/distributions/gradle-1.6-bin.zip
distributionUrl=http\://services.gradle.org/distributions/gradle-1.12-all.zip

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2012 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -13,12 +13,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.splunk.config.xml;
import org.w3c.dom.Element;
import org.springframework.beans.BeanMetadataElement;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.RuntimeBeanReference;
import org.springframework.beans.factory.parsing.BeanComponentDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.ManagedList;
import org.springframework.beans.factory.xml.ParserContext;
import org.springframework.integration.config.xml.AbstractPollingInboundChannelAdapterParser;
import org.springframework.integration.config.xml.IntegrationNamespaceUtils;
@@ -26,12 +31,12 @@ import org.springframework.integration.splunk.inbound.SplunkPollingChannelAdapte
import org.springframework.integration.splunk.support.SplunkDataReader;
import org.springframework.integration.splunk.support.SplunkServiceFactory;
import org.springframework.util.StringUtils;
import org.w3c.dom.Element;
/**
* The Splunk Inbound Channel adapter parser
*
* @author Jarred Li
* @author Olivier Lamy
* @since 1.0
*
*/
@@ -40,17 +45,14 @@ public class SplunkInboundChannelAdapterParser extends AbstractPollingInboundCha
protected BeanMetadataElement parseSource(Element element, ParserContext parserContext) {
BeanDefinitionBuilder splunkPollingChannelAdapterBuilder = BeanDefinitionBuilder.genericBeanDefinition(SplunkPollingChannelAdapter.class);
BeanDefinitionBuilder splunkPollingChannelAdapterBuilder =
BeanDefinitionBuilder.genericBeanDefinition(SplunkPollingChannelAdapter.class);
BeanDefinitionBuilder splunkExecutorBuilder = SplunkParserUtils.getSplunkExecutorBuilder(element, parserContext);
BeanDefinitionBuilder splunkDataReaderBuilder = BeanDefinitionBuilder.genericBeanDefinition(SplunkDataReader.class);
IntegrationNamespaceUtils.setValueIfAttributeDefined(splunkDataReaderBuilder, element, "mode");
String count = element.getAttribute("count");
if (StringUtils.hasText(count)) {
splunkDataReaderBuilder.addPropertyValue("count", count);
}
IntegrationNamespaceUtils.setValueIfAttributeDefined(splunkDataReaderBuilder, element, "count");
IntegrationNamespaceUtils.setValueIfAttributeDefined(splunkDataReaderBuilder, element, "field-list");
IntegrationNamespaceUtils.setValueIfAttributeDefined(splunkDataReaderBuilder, element, "search");
IntegrationNamespaceUtils.setValueIfAttributeDefined(splunkDataReaderBuilder, element, "saved-search");
@@ -60,14 +62,20 @@ public class SplunkInboundChannelAdapterParser extends AbstractPollingInboundCha
IntegrationNamespaceUtils.setValueIfAttributeDefined(splunkDataReaderBuilder, element, "earliest-time");
IntegrationNamespaceUtils.setValueIfAttributeDefined(splunkDataReaderBuilder, element, "latest-time");
// initialize splunk servers references
BeanDefinitionBuilder serviceFactoryBuilder = BeanDefinitionBuilder.genericBeanDefinition(SplunkServiceFactory.class);
String splunkServerBeanName = element.getAttribute("splunk-server-ref");
if (StringUtils.hasText(splunkServerBeanName)) {
serviceFactoryBuilder.addConstructorArgReference(splunkServerBeanName);
}
String splunkServerBeanNames = element.getAttribute("splunk-server-ref");
if (StringUtils.hasText(splunkServerBeanNames)) {
splunkDataReaderBuilder.addConstructorArgValue(serviceFactoryBuilder.getBeanDefinition());
ManagedList<RuntimeBeanReference> splunkServersList = new ManagedList<RuntimeBeanReference>();
for (String splunkServerBeanName : StringUtils.commaDelimitedListToStringArray(splunkServerBeanNames)) {
splunkServersList.add(new RuntimeBeanReference(splunkServerBeanName));
}
serviceFactoryBuilder.addConstructorArgValue(splunkServersList);
splunkDataReaderBuilder.addConstructorArgValue(serviceFactoryBuilder.getBeanDefinition());
}
String channelAdapterId = this.resolveId(element, splunkPollingChannelAdapterBuilder.getRawBeanDefinition(),
parserContext);

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -13,12 +13,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.splunk.config.xml;
import org.w3c.dom.Element;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.RuntimeBeanReference;
import org.springframework.beans.factory.parsing.BeanComponentDefinition;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.ManagedList;
import org.springframework.beans.factory.xml.ParserContext;
import org.springframework.integration.config.xml.AbstractOutboundChannelAdapterParser;
import org.springframework.integration.config.xml.IntegrationNamespaceUtils;
@@ -30,13 +35,13 @@ import org.springframework.integration.splunk.support.SplunkSubmitWriter;
import org.springframework.integration.splunk.support.SplunkTcpWriter;
import org.springframework.util.StringUtils;
import org.springframework.util.xml.DomUtils;
import org.w3c.dom.Element;
/**
* The parser for the Splunk Outbound Channel Adapter.
*
* @author Jarred Li
* @author David Turanski
* @author Olivier Lamy
* @since 1.0
*
*/
@@ -55,7 +60,8 @@ public class SplunkOutboundChannelAdapterParser extends AbstractOutboundChannelA
@Override
protected AbstractBeanDefinition parseConsumer(Element element, ParserContext parserContext) {
BeanDefinitionBuilder splunkOutboundChannelAdapterBuilder = BeanDefinitionBuilder.genericBeanDefinition(SplunkOutboundChannelAdapter.class);
BeanDefinitionBuilder splunkOutboundChannelAdapterBuilder =
BeanDefinitionBuilder.genericBeanDefinition(SplunkOutboundChannelAdapter.class);
BeanDefinitionBuilder splunkExecutorBuilder = SplunkParserUtils.getSplunkExecutorBuilder(element, parserContext);
BeanDefinitionBuilder argsBuilder = BeanDefinitionBuilder.genericBeanDefinition(SplunkArgsFactoryBean.class);
@@ -64,16 +70,26 @@ public class SplunkOutboundChannelAdapterParser extends AbstractOutboundChannelA
IntegrationNamespaceUtils.setValueIfAttributeDefined(argsBuilder, element, "host");
IntegrationNamespaceUtils.setValueIfAttributeDefined(argsBuilder, element, "host-regex");
BeanDefinitionBuilder dataWriterBuilder = parseDataWriter(element);
// initialize splunk servers references
BeanDefinitionBuilder serviceFactoryBuilder = BeanDefinitionBuilder.genericBeanDefinition(SplunkServiceFactory.class);
String splunkServerBeanName = element.getAttribute("splunk-server-ref");
if (StringUtils.hasText(splunkServerBeanName)) {
serviceFactoryBuilder.addConstructorArgReference(splunkServerBeanName);
String splunkServerBeanNames = element.getAttribute("splunk-server-ref");
if (StringUtils.hasText(splunkServerBeanNames)) {
ManagedList<RuntimeBeanReference> splunkServersList = new ManagedList<RuntimeBeanReference>();
for (String splunkServerBeanName : StringUtils.delimitedListToStringArray(splunkServerBeanNames, ";")) {
splunkServersList.add(new RuntimeBeanReference(splunkServerBeanName));
}
serviceFactoryBuilder.addConstructorArgValue(splunkServersList);
}
BeanDefinitionBuilder dataWriterBuilder = parseDataWriter(element, parserContext);
dataWriterBuilder.addConstructorArgValue(serviceFactoryBuilder.getBeanDefinition());
dataWriterBuilder.addConstructorArgValue(argsBuilder.getBeanDefinition());
String channelAdapterId = this.resolveId(element, splunkOutboundChannelAdapterBuilder.getRawBeanDefinition(),
@@ -96,30 +112,30 @@ public class SplunkOutboundChannelAdapterParser extends AbstractOutboundChannelA
}
private BeanDefinitionBuilder parseDataWriter(Element element, ParserContext parserContext) {
private BeanDefinitionBuilder parseDataWriter(Element element) {
BeanDefinitionBuilder dataWriterBuilder = null;
Element dataWriter = null;
if (DomUtils.getChildElementByTagName(element, "index-writer") != null) {
dataWriter = DomUtils.getChildElementByTagName(element, "index-writer");
dataWriterBuilder = BeanDefinitionBuilder.genericBeanDefinition(SplunkIndexWriter.class);
IntegrationNamespaceUtils.setValueIfAttributeDefined(dataWriterBuilder, dataWriter, "index");
if (DomUtils.getChildElementByTagName(element, "index-writer") != null) {
Element dataWriter = DomUtils.getChildElementByTagName(element, "index-writer");
dataWriterBuilder = BeanDefinitionBuilder.genericBeanDefinition(SplunkIndexWriter.class);
IntegrationNamespaceUtils.setValueIfAttributeDefined(dataWriterBuilder, dataWriter, "index");
}
if (DomUtils.getChildElementByTagName(element, "submit-writer") != null) {
dataWriter = DomUtils.getChildElementByTagName(element, "submit-writer");
dataWriterBuilder = BeanDefinitionBuilder.genericBeanDefinition(SplunkSubmitWriter.class);
IntegrationNamespaceUtils.setValueIfAttributeDefined(dataWriterBuilder, dataWriter, "index");
}
if (DomUtils.getChildElementByTagName(element, "tcp-writer") != null) {
dataWriter = DomUtils.getChildElementByTagName(element, "tcp-writer");
dataWriterBuilder = BeanDefinitionBuilder.genericBeanDefinition(SplunkTcpWriter.class);
IntegrationNamespaceUtils.setValueIfAttributeDefined(dataWriterBuilder, dataWriter, "port");
}
if (DomUtils.getChildElementByTagName(element, "submit-writer") != null) {
Element dataWriter = DomUtils.getChildElementByTagName(element, "submit-writer");
dataWriterBuilder = BeanDefinitionBuilder.genericBeanDefinition(SplunkSubmitWriter.class);
IntegrationNamespaceUtils.setValueIfAttributeDefined(dataWriterBuilder, dataWriter, "index");
}
if (DomUtils.getChildElementByTagName(element, "tcp-writer") != null) {
Element dataWriter = DomUtils.getChildElementByTagName(element, "tcp-writer");
dataWriterBuilder = BeanDefinitionBuilder.genericBeanDefinition(SplunkTcpWriter.class);
IntegrationNamespaceUtils.setValueIfAttributeDefined(dataWriterBuilder, dataWriter, "port");
}
}
IntegrationNamespaceUtils.setValueIfAttributeDefined(dataWriterBuilder, element, "auto-startup");
IntegrationNamespaceUtils.setValueIfAttributeDefined(dataWriterBuilder, element,
IntegrationNamespaceUtils.AUTO_STARTUP);
return dataWriterBuilder;
return dataWriterBuilder;
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2011-2012 the original author or authors.
* Copyright 2011-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -13,27 +13,31 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.splunk.config.xml;
import org.w3c.dom.Element;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.xml.AbstractSimpleBeanDefinitionParser;
import org.springframework.beans.factory.xml.BeanDefinitionParserDelegate;
import org.springframework.beans.factory.xml.ParserContext;
import org.springframework.integration.config.xml.IntegrationNamespaceUtils;
import org.springframework.integration.splunk.support.SplunkServer;
import org.w3c.dom.Element;
/**
* Splunk server element parser.
*
* The XML element is like this:
* <pre>
* <pre class="code">
* {@code
* <splunk:server id="splunkServer" host="host" port="8089" username="admin" password="password"
* scheme="https" owner="admin" app="search"/>
* }
* </pre>
*
* @author Jarred Li
* @author Olivier Lamy
* @since 1.0
*
*/
@@ -57,8 +61,8 @@ public class SplunkServerParser extends AbstractSimpleBeanDefinitionParser {
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "username");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "password");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "timeout");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "checkServiceOnBorrow");
}
}

View File

@@ -38,7 +38,7 @@ public class SplunkIndexWriter extends AbstractSplunkDataWriter {
private String indexName;
/**
*
* @param connectionFactory
* @param serviceFactory
* @param args
*/
public SplunkIndexWriter(ServiceFactory serviceFactory, Args args) {

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2011-2012 the original author or authors.
* Copyright 2011-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.splunk.support;
import com.splunk.Service;
@@ -21,20 +22,33 @@ import com.splunk.Service;
* Splunk server entity
*
* @author Jarred Li
* @author Olivier Lamy
* @since 1.0
*
*/
public class SplunkServer {
private String host = Service.DEFAULT_HOST;
private int port = Service.DEFAULT_PORT;
private String scheme = Service.DEFAULT_SCHEME;
private String app;
private String owner;
private String username;
private String password;
private int timeout;
/**
* if <code>true</code> the framework will test the connectivity before give back the connection.
*/
private boolean checkServiceOnBorrow = false;
/**
* @return the host
*/
@@ -64,15 +78,15 @@ public class SplunkServer {
}
/**
*
* @return
*
* @return the used scheme
*/
public String getScheme() {
return scheme;
}
/**
*
*
* @param scheme
*/
public void setScheme(String scheme) {
@@ -80,15 +94,15 @@ public class SplunkServer {
}
/**
*
* @return
*
* @return the application
*/
public String getApp() {
return app;
}
/**
*
*
* @param app
*/
public void setApp(String app) {
@@ -96,15 +110,15 @@ public class SplunkServer {
}
/**
*
* @return
*
* @return the owner
*/
public String getOwner() {
return owner;
}
/**
*
*
* @param owner
*/
public void setOwner(String owner) {
@@ -145,7 +159,7 @@ public class SplunkServer {
public int getTimeout() {
return timeout;
}
/**
* set the timeout in ms.
* @param timeout
@@ -153,4 +167,52 @@ public class SplunkServer {
public void setTimeout(int timeout) {
this.timeout = timeout;
}
/**
* @return {@code true/false} if there is need to check the underlying Splunk Service
* @since 1.1
*/
public boolean isCheckServiceOnBorrow() {
return checkServiceOnBorrow;
}
/**
* @param checkServiceOnBorrow the {@code checkServiceOnBorrow} flag
* @since 1.1
*/
public void setCheckServiceOnBorrow(boolean checkServiceOnBorrow) {
this.checkServiceOnBorrow = checkServiceOnBorrow;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SplunkServer that = SplunkServer.class.cast(o);
return port == that.port && host.equals(that.host);
}
@Override
public int hashCode() {
int result = host.hashCode();
result = 31 * result + port;
return result;
}
@Override
public String toString() {
return "SplunkServer{" +
"host='" + host + '\'' +
", port=" + port +
", scheme='" + scheme + '\'' +
", app='" + app + '\'' +
'}';
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
@@ -10,39 +10,109 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.springframework.integration.splunk.support;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import com.splunk.Service;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.integration.splunk.core.ServiceFactory;
import com.splunk.Service;
import org.springframework.util.Assert;
/**
* A {@link FactoryBean} for creating a {@link Service}
*
* @author David Turanski
* @author Olivier Lamy
*/
public class SplunkServiceFactory implements ServiceFactory {
private final SplunkServer splunkServer;
private Service service;
private static final Log LOGGER = LogFactory.getLog(SplunkServiceFactory.class);
private final List<SplunkServer> splunkServers;
private final Map<SplunkServer, Service> servicePerServer = new ConcurrentHashMap<SplunkServer, Service>();
public SplunkServiceFactory(SplunkServer splunkServer) {
this.splunkServer = splunkServer;
Assert.notNull(splunkServer);
this.splunkServers = Arrays.asList(splunkServer);
}
/**
* @param splunkServers the {@code List<SplunkServer>} to build this {@code SplunkServiceFactory}
* @since 1.1
*/
public SplunkServiceFactory(List<SplunkServer> splunkServers) {
Assert.notEmpty(splunkServers);
this.splunkServers = new ArrayList<SplunkServer>(splunkServers);
}
@Override
public synchronized Service getService() {
if (service != null) {
return service;
return getServiceInternal();
}
private Service getServiceInternal() {
for (SplunkServer splunkServer : splunkServers) {
Service service = servicePerServer.get(splunkServer);
// service already exist and no test on borrow it so simply use it
if (service != null) {
if (!splunkServer.isCheckServiceOnBorrow() || pingService(service)) {
return service;
}
else {
// fail so try next server
continue;
}
}
ExecutorService executor = Executors.newSingleThreadExecutor();
Callable<Service> callable = buildServiceCallable(splunkServer);
Future<Service> future = executor.submit(callable);
try {
if (splunkServer.getTimeout() > 0) {
service = future.get(splunkServer.getTimeout(), TimeUnit.MILLISECONDS);
}
else {
service = future.get();
}
servicePerServer.put(splunkServer, service);
return service;
}
catch (Exception e) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info(String.format("could not connect to Splunk Server @ %s:%d - %s, try next one",
splunkServer.getHost(), splunkServer.getPort(), e.getMessage()));
}
}
}
String message = String.format("could not connect to any of Splunk Servers %s", this.splunkServers);
LOGGER.error(message);
throw new RuntimeException(message);
}
private Callable<Service> buildServiceCallable(SplunkServer splunkServer) {
final Map<String, Object> args = new HashMap<String, Object>();
if (splunkServer.getHost() != null) {
args.put("host", splunkServer.getHost());
@@ -63,24 +133,22 @@ public class SplunkServiceFactory implements ServiceFactory {
args.put("username", splunkServer.getUsername());
args.put("password", splunkServer.getPassword());
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Service> future = executor.submit(new Callable<Service>(){
public Service call() throws Exception {
return new Callable<Service>() {
public Service call()
throws Exception {
return Service.connect(args);
}
});
try {
if (splunkServer.getTimeout() > 0) {
service = future.get(splunkServer.getTimeout(),TimeUnit.MILLISECONDS);
} else {
service = future.get();
}
} catch (Exception e) {
throw new RuntimeException(String.format("could not connect to Splunk Server @ %s:%d - %s",
splunkServer.getHost(),splunkServer.getPort(),e.getMessage()));
}
return service;
};
}
private boolean pingService(Service service) {
try {
service.getInfo();
return true;
}
catch (Exception e) {
return false;
}
}
}

View File

@@ -33,7 +33,8 @@ public class SplunkSubmitWriter extends AbstractSplunkDataWriter {
private String index;
/**
* @param connectionFactory
* @param serviceFactory
* @param args
*/
public SplunkSubmitWriter(ServiceFactory serviceFactory, Args args) {
super(serviceFactory, args);

View File

@@ -1,11 +1,11 @@
/*
* Copyright 2002-2013 the original author or authors.
*
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
@@ -15,43 +15,37 @@ package org.springframework.integration.splunk.support;
import java.io.IOException;
import java.net.Socket;
import org.springframework.integration.splunk.core.ServiceFactory;
import org.springframework.util.Assert;
import com.splunk.Args;
import com.splunk.Input;
import com.splunk.Service;
import org.springframework.integration.splunk.core.ServiceFactory;
import org.springframework.util.Assert;
/**
*
* A {@link SplunkStreamWriter} that creates a socket on a given port
*
* A {@code org.springframework.integration.splunk.core.DataWriter}
* that creates a socket on a given port
*
* @author David Turanski
*
*/
public class SplunkTcpWriter extends AbstractSplunkDataWriter {
private int port;
/**
* @param connectionFactory
* @param args
*/
public SplunkTcpWriter(ServiceFactory serviceFactory, Args args) {
super(serviceFactory, args);
}
/* (non-Javadoc)
* @see org.springframework.integration.splunk.support.SplunkDataWriter#createSocket(com.splunk.Service)
*/
@Override
protected Socket createSocket(Service service) throws IOException {
Input input = service.getInputs().get(String.valueOf(port));
Assert.notNull(input, "no input defined for port " + port);
Assert.isTrue(!input.isDisabled(),String.format("input on port %d is disabled",port));
Socket socket = service.open(port);
return socket;
return service.open(port);
}
/**
* @param port the port to set
*/
@@ -59,5 +53,5 @@ public class SplunkTcpWriter extends AbstractSplunkDataWriter {
this.port = port;
}
}

View File

@@ -0,0 +1,404 @@
<?xml version="1.0" encoding="UTF-8"?>
<xsd:schema xmlns="http://www.springframework.org/schema/integration/splunk"
xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:tool="http://www.springframework.org/schema/tool"
xmlns:integration="http://www.springframework.org/schema/integration"
targetNamespace="http://www.springframework.org/schema/integration/splunk"
elementFormDefault="qualified" attributeFormDefault="unqualified">
<xsd:import namespace="http://www.springframework.org/schema/beans" />
<xsd:import namespace="http://www.springframework.org/schema/tool" />
<xsd:import namespace="http://www.springframework.org/schema/integration"
schemaLocation="http://www.springframework.org/schema/integration/spring-integration.xsd" />
<xsd:annotation>
<xsd:documentation><![CDATA[
Defines the configuration elements for the Spring Integration
Splunk Adapter.
]]></xsd:documentation>
</xsd:annotation>
<xsd:element name="server">
<xsd:annotation>
<xsd:documentation><![CDATA[
Defines a Splunk server information.
]]></xsd:documentation>
</xsd:annotation>
<xsd:complexType>
<xsd:attribute name="host" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[
Indicates the Splunk server name or IP address
]]></xsd:documentation>
<xsd:appinfo>
<tool:annotation kind="direct">
<tool:expected-type type="java.lang.String" />
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="port" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[
Indicates the Splunk server port
]]></xsd:documentation>
<xsd:appinfo>
<tool:annotation kind="direct">
<tool:expected-type type="java.lang.Integer" />
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="scheme" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[
Indicates the Splunk server scheme
]]></xsd:documentation>
<xsd:appinfo>
<tool:annotation kind="direct">
<tool:expected-type type="java.lang.String" />
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="app" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[
Indicates the Splunk server application name
]]></xsd:documentation>
<xsd:appinfo>
<tool:annotation kind="direct">
<tool:expected-type type="java.lang.String" />
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="owner" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[
Indicates the Splunk server owner name
]]></xsd:documentation>
<xsd:appinfo>
<tool:annotation kind="direct">
<tool:expected-type type="java.lang.String" />
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="username" use="required">
<xsd:annotation>
<xsd:documentation><![CDATA[
Indicates the userName to login Splunk server
]]></xsd:documentation>
<xsd:appinfo>
<tool:annotation kind="direct">
<tool:expected-type type="java.lang.String" />
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="password" use="required">
<xsd:annotation>
<xsd:documentation><![CDATA[
Indicates the password to login Splunk server
]]></xsd:documentation>
<xsd:appinfo>
<tool:annotation kind="direct">
<tool:expected-type type="java.lang.String" />
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="scope" type="xsd:string" use="optional" />
<xsd:attribute name="id" type="xsd:string" use="required" />
<xsd:attribute name="timeout" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[
Indicates the connection timeout in ms.
]]></xsd:documentation>
<xsd:appinfo>
<tool:annotation kind="direct">
<tool:expected-type type="java.lang.Integer" />
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="checkServiceOnBorrow" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[
To test or not the connection when reusing it.
]]></xsd:documentation>
<xsd:appinfo>
<tool:annotation kind="direct">
<tool:expected-type type="java.lang.Boolean" />
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
</xsd:element>
<xsd:element name="inbound-channel-adapter">
<xsd:annotation>
<xsd:documentation>
The definition for the Spring Integration Splunk
Inbound Channel Adapter.
</xsd:documentation>
</xsd:annotation>
<xsd:complexType>
<xsd:sequence>
<xsd:element ref="integration:poller" minOccurs="0"
maxOccurs="1" />
</xsd:sequence>
<xsd:attributeGroup ref="coreSplunkComponentAttributes" />
<xsd:attribute name="channel" type="xsd:string">
<xsd:annotation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type
type="org.springframework.integration.core.MessageChannel" />
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="send-timeout" type="xsd:string">
<xsd:annotation>
<xsd:documentation><![CDATA[
Allows you to specify how long this inbound-channel-adapter
will wait for the message (containing the retrieved entities)
to be sent successfully to the message channel, before throwing
an exception.
Keep in mind that when sending to a DirectChannel, the
invocation will occur in the sender's thread so the failing
of the send operation may be caused by other components
further downstream. By default the Inbound Channel Adapter
will wait indefinitely. The value is specified in milliseconds.
]]>
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="mode" type="xsd:string">
<xsd:annotation>
<xsd:documentation>
Search mode: normal, blocking, realtime, export, saved
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="count" type="xsd:string">
<xsd:annotation>
<xsd:documentation>
The maximum number of event record to be return
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="field-list" type="xsd:string">
<xsd:annotation>
<xsd:documentation>
A comma-separated list of the fields to return
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="search" type="xsd:string">
<xsd:annotation>
<xsd:documentation>
Search String following Splunk syntax.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="earliest-time" type="xsd:string">
<xsd:annotation>
<xsd:documentation>
Time modifier for the start of the time window.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="latest-time" type="xsd:string">
<xsd:annotation>
<xsd:documentation>
Time modifier for the end of the time window.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="init-earliest-time" type="xsd:string" use="required">
<xsd:annotation>
<xsd:documentation>
Time modifier for the start of the time window for the first search.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="saved-search" type="xsd:string">
<xsd:annotation>
<xsd:documentation>
Saved search.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="owner" type="xsd:string">
<xsd:annotation>
<xsd:documentation>
Owner of the saved search.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="app" type="xsd:string">
<xsd:annotation>
<xsd:documentation>
App of the saved search.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
</xsd:element>
<xsd:complexType name="indexWriterType">
<xsd:attribute name="index" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation>
Index to write to.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
<xsd:complexType name="tcpWriterType">
<xsd:attribute name="port" type="xsd:string" use="required">
<xsd:annotation>
<xsd:documentation>
The port corresponding to a tcp Input
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
<xsd:element name="outbound-channel-adapter">
<xsd:annotation>
<xsd:documentation>
Defines an outbound Channel Adapter.
</xsd:documentation>
</xsd:annotation>
<xsd:complexType>
<xsd:sequence>
<xsd:element ref="integration:poller" minOccurs="0"
maxOccurs="1" />
<xsd:choice>
<xsd:element name="index-writer" type="indexWriterType">
<xsd:annotation>
<xsd:documentation>
Defines a Data Writer for streaming data to an index, or the default index if not specified.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="tcp-writer" type="tcpWriterType">
<xsd:annotation>
<xsd:documentation>
Defines a Data Writer for streaming data to a tcp input port.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="submit-writer" type="indexWriterType">
<xsd:annotation>
<xsd:documentation>
Defines a Data Writer to submit data, using the REST interface, to an index, or the default index if not specified.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:choice>
</xsd:sequence>
<xsd:attributeGroup ref="coreSplunkComponentAttributes" />
<xsd:attribute name="channel" type="xsd:string">
<xsd:annotation>
<xsd:appinfo>
<xsd:documentation>
Channel from which messages will be output.
When a message is sent to this channel it will
cause the query
to
be executed.
</xsd:documentation>
<tool:annotation kind="ref">
<tool:expected-type
type="org.springframework.integration.MessageChannel" />
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="order">
<xsd:annotation>
<xsd:documentation>
Specifies the order for invocation when this
endpoint is connected as a
subscriber to a SubscribableChannel.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="source" type="xsd:string">
<xsd:annotation>
<xsd:documentation>
Splunk event source
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="source-type" type="xsd:string">
<xsd:annotation>
<xsd:documentation>
Splunk event source type
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="host" type="xsd:string">
<xsd:annotation>
<xsd:documentation>
Host where the event occurred
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="host-regex" type="xsd:string">
<xsd:annotation>
<xsd:documentation>
Host regex can be provided so Splunk can dynamically extract the host value from the log event
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
</xsd:element>
<xsd:attributeGroup name="coreSplunkComponentAttributes">
<xsd:attribute name="id" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation>
Identifies the underlying Spring bean definition,
which is an
instance of either 'EventDrivenConsumer' or
'PollingConsumer',
depending on whether the component's input
channel is a
'SubscribableChannel' or 'PollableChannel'.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="auto-startup" default="true" use="optional">
<xsd:annotation>
<xsd:documentation>
Flag to indicate that the component should start
automatically
on startup (default true).
</xsd:documentation>
</xsd:annotation>
<xsd:simpleType>
<xsd:union memberTypes="xsd:boolean xsd:string" />
</xsd:simpleType>
</xsd:attribute>
<xsd:attribute name="splunk-server-ref" use="required"
type="xsd:string">
<xsd:annotation>
<xsd:documentation>
Splunk Server Bean Name
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:attributeGroup>
</xsd:schema>

View File

@@ -15,10 +15,10 @@
*/
package org.springframework.integration.splunk.config.xml;
import junit.framework.Assert;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.integration.endpoint.SourcePollingChannelAdapter;

View File

@@ -15,11 +15,12 @@
*/
package org.springframework.integration.splunk.config.xml;
import static org.junit.Assert.assertTrue;
import junit.framework.Assert;
import static org.junit.Assert.*;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.integration.splunk.support.AbstractSplunkDataWriter;

View File

@@ -15,13 +15,12 @@
*/
package org.springframework.integration.splunk.config.xml;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import junit.framework.Assert;
import static org.junit.Assert.*;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.integration.splunk.support.AbstractSplunkDataWriter;
@@ -52,11 +51,11 @@ public class SplunkOutboundChannelAdapterParserTests {
AbstractSplunkDataWriter writer = appContext.getBean("splunkOutboundChannelAdapter.splunkExecutor.writer",
AbstractSplunkDataWriter.class);
assertNotNull(writer);
assertTrue(writer instanceof SplunkSubmitWriter);
assertEquals(false,writer.isAutoStartup());
assertEquals(false,writer.isRunning());
String sourceType = "spring-integration";
assertEquals(sourceType, writer.getArgs().get("sourcetype"));

View File

@@ -15,10 +15,10 @@
*/
package org.springframework.integration.splunk.config.xml;
import junit.framework.Assert;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.integration.splunk.support.SplunkServer;

View File

@@ -11,6 +11,7 @@
* specific language governing permissions and limitations under the License.
*/
package org.springframework.integration.splunk.event;
import static org.junit.Assert.assertEquals;
import java.util.HashMap;
@@ -64,4 +65,4 @@ public class SplunkEventTests {
assertEquals(eventData.get(key),event2Data.get(key));
}
}
}
}

View File

@@ -15,16 +15,15 @@
*/
package org.springframework.integration.splunk.inbound;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.*;
import java.util.ArrayList;
import java.util.List;
import junit.framework.Assert;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.springframework.integration.splunk.event.SplunkEvent;
import org.springframework.integration.splunk.support.SplunkExecutor;

View File

@@ -15,12 +15,12 @@
*/
package org.springframework.integration.splunk.outbound;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import junit.framework.Assert;
import static org.mockito.Mockito.*;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.springframework.integration.splunk.support.SplunkExecutor;
import org.springframework.messaging.Message;

View File

@@ -15,24 +15,24 @@
*/
package org.springframework.integration.splunk.support;
import static org.mockito.Mockito.*;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Map;
import junit.framework.Assert;
import org.junit.Before;
import org.junit.Test;
import static org.mockito.Mockito.*;
import org.springframework.core.io.ClassPathResource;
import org.springframework.integration.splunk.core.ServiceFactory;
import org.springframework.integration.splunk.event.SplunkEvent;
import com.splunk.Job;
import com.splunk.JobCollection;
import com.splunk.Service;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.springframework.core.io.ClassPathResource;
import org.springframework.integration.splunk.core.ServiceFactory;
import org.springframework.integration.splunk.event.SplunkEvent;
/**
* @author Jarred Li

View File

@@ -15,17 +15,15 @@
*/
package org.springframework.integration.splunk.support;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.*;
import java.util.ArrayList;
import java.util.List;
import junit.framework.Assert;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.springframework.integration.splunk.core.DataReader;
import org.springframework.integration.splunk.core.DataWriter;
import org.springframework.integration.splunk.event.SplunkEvent;