Fixed the issue with round robin of stream / integration flows; fixes gh-787

This commit is contained in:
Marcin Grzejszczak
2018-11-08 15:53:19 +01:00
parent 8b869e0c7a
commit e01af34151
12 changed files with 493 additions and 177 deletions

View File

@@ -17,6 +17,7 @@
package org.springframework.cloud.contract.stubrunner.messaging.integration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -37,6 +38,9 @@ import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.support.Consumer;
import org.springframework.integration.transformer.MessageTransformingHandler;
import org.springframework.messaging.Message;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.util.StringUtils;
/**
* Spring Integration configuration that iterates over the downloaded Groovy DSLs and
@@ -46,62 +50,76 @@ import org.springframework.messaging.Message;
*/
@Configuration
@ConditionalOnClass(IntegrationFlowBuilder.class)
@ConditionalOnProperty(name="stubrunner.integration.enabled", havingValue="true", matchIfMissing=true)
@ConditionalOnProperty(name = "stubrunner.integration.enabled", havingValue = "true", matchIfMissing = true)
public class StubRunnerIntegrationConfiguration {
@Bean
@ConditionalOnMissingBean(name="stubFlowRegistrar")
@ConditionalOnMissingBean(name = "stubFlowRegistrar")
public FlowRegistrar stubFlowRegistrar(AutowireCapableBeanFactory beanFactory,
BatchStubRunner batchStubRunner) {
Map<StubConfiguration, Collection<Contract>> contracts = batchStubRunner
.getContracts();
for (Entry<StubConfiguration, Collection<Contract>> entry : contracts.entrySet()) {
String name = entry.getKey().getGroupId() + "_"
+ entry.getKey().getArtifactId();
for (Contract dsl : entry.getValue()) {
IntegrationFlowBuilder dummyBuilder = IntegrationFlows.from(DummyMessageHandler.CHANNEL_NAME)
.handle(new DummyMessageHandler(), "handle");
beanFactory.initializeBean(dummyBuilder.get(), DummyMessageHandler.CHANNEL_NAME + ".flow");
for (Entry<StubConfiguration, Collection<Contract>> entry : contracts
.entrySet()) {
StubConfiguration key = entry.getKey();
Collection<Contract> value = entry.getValue();
String name = key.getGroupId() + "_" + key.getArtifactId();
MultiValueMap<String, Contract> map = new LinkedMultiValueMap<>();
for (Contract dsl : value) {
if (dsl == null) {
continue;
}
if (dsl.getInput() != null && dsl.getInput().getMessageFrom() != null
&& dsl.getInput().getMessageFrom().getClientValue() != null) {
final String flowName = name + "_" + dsl.getLabel() + "_" + dsl.hashCode();
IntegrationFlowBuilder builder = IntegrationFlows
.from(dsl.getInput().getMessageFrom().getClientValue())
.filter(new StubRunnerIntegrationMessageSelector(dsl),
new Consumer<FilterEndpointSpec>() {
@Override
public void accept(FilterEndpointSpec e) {
e.id(flowName + ".filter");
}
})
.transform(new StubRunnerIntegrationTransformer(dsl),
new Consumer<GenericEndpointSpec<MessageTransformingHandler>>() {
@Override
public void accept(
GenericEndpointSpec<MessageTransformingHandler> e) {
e.id(flowName + ".transformer");
}
});
if (dsl.getOutputMessage() != null) {
builder = builder.channel(
dsl.getOutputMessage().getSentTo().getClientValue());
}
else {
builder = builder.handle(new DummyMessageHandler(), "handle");
}
beanFactory.initializeBean(builder.get(), flowName);
beanFactory.getBean(flowName + ".filter", Lifecycle.class).start();
beanFactory.getBean(flowName + ".transformer", Lifecycle.class)
.start();
}
&& StringUtils.hasText(
dsl.getInput().getMessageFrom().getClientValue())) {
String from = dsl.getInput().getMessageFrom().getClientValue();
map.add(from, dsl);
}
}
for (Entry<String, List<Contract>> entries : map.entrySet()) {
final String flowName = name + "_" + entries.getKey() + "_"
+ entries.getValue().hashCode();
IntegrationFlowBuilder builder = IntegrationFlows.from(entries.getKey())
.filter(new StubRunnerIntegrationMessageSelector(entries.getValue()),
new Consumer<FilterEndpointSpec>() {
@Override
public void accept(FilterEndpointSpec e) {
e.id(flowName + ".filter");
}
})
.transform(new StubRunnerIntegrationTransformer(entries.getValue()),
new Consumer<GenericEndpointSpec<MessageTransformingHandler>>() {
@Override
public void accept(
GenericEndpointSpec<MessageTransformingHandler> e) {
e.id(flowName + ".transformer");
}
})
.route(new StubRunnerIntegrationRouter(entries.getValue(), beanFactory));
beanFactory.initializeBean(builder.get(), flowName);
beanFactory.getBean(flowName + ".filter", Lifecycle.class).start();
beanFactory.getBean(flowName + ".transformer", Lifecycle.class)
.start();
}
}
return new FlowRegistrar();
}
private static class DummyMessageHandler {
@SuppressWarnings("unused")
static class DummyMessageHandler {
static String CHANNEL_NAME = "stub_runner_dummy_channel";
public void handle(Message<?> message) {
}
}
static class FlowRegistrar {
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2013-2017 the original author or authors.
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,48 +16,100 @@
package org.springframework.cloud.contract.stubrunner.messaging.integration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.regex.Pattern;
import org.springframework.cloud.contract.spec.Contract;
import org.springframework.cloud.contract.spec.internal.BodyMatcher;
import org.springframework.cloud.contract.spec.internal.BodyMatchers;
import org.springframework.cloud.contract.spec.internal.Header;
import org.springframework.cloud.contract.verifier.util.MapConverter;
import org.springframework.cloud.contract.verifier.messaging.internal.ContractVerifierObjectMapper;
import org.springframework.cloud.contract.verifier.util.JsonPaths;
import org.springframework.cloud.contract.verifier.util.JsonToJsonPathsConverter;
import org.springframework.cloud.contract.verifier.util.MethodBufferingJsonVerifiable;
import org.springframework.integration.core.MessageSelector;
import org.springframework.messaging.Message;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import com.toomuchcoding.jsonassert.JsonAssertion;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.cloud.contract.spec.Contract;
import org.springframework.cloud.contract.spec.internal.BodyMatcher;
import org.springframework.cloud.contract.spec.internal.BodyMatchers;
import org.springframework.cloud.contract.spec.internal.Header;
import org.springframework.cloud.contract.verifier.messaging.internal.ContractVerifierObjectMapper;
import org.springframework.cloud.contract.verifier.util.JsonPaths;
import org.springframework.cloud.contract.verifier.util.JsonToJsonPathsConverter;
import org.springframework.cloud.contract.verifier.util.MapConverter;
import org.springframework.cloud.contract.verifier.util.MethodBufferingJsonVerifiable;
import org.springframework.integration.core.MessageSelector;
import org.springframework.messaging.Message;
/**
* Passes through a message that matches the one defined in the DSL
*
* @author Marcin Grzejszczak
* @author Tim Ysewyn
*/
class StubRunnerIntegrationMessageSelector implements MessageSelector {
private final Contract groovyDsl;
private static final Map<Message, Contract> CACHE =
Collections.synchronizedMap(new WeakHashMap<Message, Contract>());
private static final Log log = LogFactory.getLog(StubRunnerIntegrationMessageSelector.class);
private final List<Contract> groovyDsls;
private final ContractVerifierObjectMapper objectMapper = new ContractVerifierObjectMapper();
StubRunnerIntegrationMessageSelector(Contract groovyDsl) {
this.groovyDsl = groovyDsl;
this(Collections.singletonList(groovyDsl));
}
StubRunnerIntegrationMessageSelector(List<Contract> groovyDsls) {
this.groovyDsls = groovyDsls;
}
@Override
public boolean accept(Message<?> message) {
if (!headersMatch(message)) {
return false;
return matchingContract(message) != null;
}
Contract matchingContract(Message<?> message) {
if (CACHE.containsKey(message)) {
return CACHE.get(message);
}
Contract contract = getContract(message);
if (contract != null) {
CACHE.put(message, contract);
}
return contract;
}
void updateCache(Message<?> message, Contract contract) {
CACHE.put(message, contract);
}
private Contract getContract(Message<?> message) {
for (Contract groovyDsl : this.groovyDsls) {
Contract contract = matchContract(message, groovyDsl);
if (contract != null) {
return contract;
}
}
return null;
}
private Contract matchContract(Message<?> message, Contract groovyDsl) {
List<String> unmatchedHeaders = headersMatch(message, groovyDsl);
if (!unmatchedHeaders.isEmpty()) {
if (log.isDebugEnabled()) {
log.debug("Contract [" + groovyDsl
+ "] hasn't matched the following headers " + unmatchedHeaders);
}
return null;
}
Object inputMessage = message.getPayload();
BodyMatchers matchers = this.groovyDsl.getInput().getMatchers();
Object dslBody = MapConverter.getStubSideValues(this.groovyDsl.getInput().getMessageBody());
BodyMatchers matchers = groovyDsl.getInput().getMatchers();
Object dslBody = MapConverter
.getStubSideValues(groovyDsl.getInput().getMessageBody());
Object matchingInputMessage = JsonToJsonPathsConverter
.removeMatchingJsonPaths(dslBody, matchers);
JsonPaths jsonPaths = JsonToJsonPathsConverter
@@ -65,46 +117,78 @@ class StubRunnerIntegrationMessageSelector implements MessageSelector {
matchingInputMessage);
DocumentContext parsedJson;
try {
parsedJson = JsonPath.parse(this.objectMapper.writeValueAsString(inputMessage));
parsedJson = JsonPath
.parse(this.objectMapper.writeValueAsString(inputMessage));
}
catch (JsonProcessingException e) {
throw new IllegalStateException("Cannot serialize to JSON", e);
}
List<String> unmatchedJsonPath = new ArrayList<>();
boolean matches = true;
for (MethodBufferingJsonVerifiable path : jsonPaths) {
matches &= matchesJsonPath(parsedJson, path.jsonPath());
matches &= matchesJsonPath(unmatchedJsonPath, parsedJson, path.jsonPath());
}
if (matchers != null && matchers.hasMatchers()) {
for (BodyMatcher matcher : matchers.jsonPathMatchers()) {
String jsonPath = JsonToJsonPathsConverter.convertJsonPathAndRegexToAJsonPath(matcher, dslBody);
matches &= matchesJsonPath(parsedJson, jsonPath);
String jsonPath = JsonToJsonPathsConverter
.convertJsonPathAndRegexToAJsonPath(matcher, dslBody);
matches &= matchesJsonPath(unmatchedJsonPath, parsedJson, jsonPath);
}
}
return matches;
if (!unmatchedJsonPath.isEmpty()) {
if (log.isDebugEnabled()) {
log.debug("Contract [" + groovyDsl + "] didn't much the body due to "
+ unmatchedJsonPath);
}
}
if (matches) {
return groovyDsl;
}
return null;
}
private boolean matchesJsonPath(DocumentContext parsedJson, String jsonPath) {
private boolean matchesJsonPath(List<String> unmatchedJsonPath,
DocumentContext parsedJson, String jsonPath) {
try {
JsonAssertion.assertThat(parsedJson)
.matchesJsonPath(jsonPath);
JsonAssertion.assertThat(parsedJson).matchesJsonPath(jsonPath);
return true;
}
catch (Exception e) {
unmatchedJsonPath.add(e.getLocalizedMessage());
return false;
}
}
private boolean headersMatch(Message<?> message) {
private List<String> headersMatch(Message<?> message, Contract groovyDsl) {
List<String> unmatchedHeaders = new ArrayList<>();
Map<String, Object> headers = message.getHeaders();
boolean matches = true;
for (Header it : this.groovyDsl.getInput().getMessageHeaders().getEntries()) {
for (Header it : groovyDsl.getInput().getMessageHeaders().getEntries()) {
String name = it.getName();
Object value = it.getClientValue();
Object valueInHeader = headers.get(name);
matches &= value instanceof Pattern ?
((Pattern) value).matcher(valueInHeader.toString()).matches() :
valueInHeader!=null && valueInHeader.equals(value);
boolean matches;
if (value instanceof Pattern) {
Pattern pattern = (Pattern) value;
matches = pattern.matcher(valueInHeader.toString()).matches();
}
else {
matches = valueInHeader != null
&& valueInHeader.toString().equals(value.toString());
}
if (!matches) {
unmatchedHeaders.add("Header with name [" + name + "] was supposed to "
+ unmatchedText(value) + " but the value is ["
+ (valueInHeader != null ? valueInHeader.toString() : "null")
+ "]");
}
}
return matches;
return unmatchedHeaders;
}
private String unmatchedText(Object expectedValue) {
return expectedValue instanceof Pattern
? "match pattern [" + ((Pattern) expectedValue).pattern() + "]"
: "be equal to [" + expectedValue + "]";
}
}

View File

@@ -0,0 +1,39 @@
package org.springframework.cloud.contract.stubrunner.messaging.integration;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.cloud.contract.spec.Contract;
import org.springframework.integration.router.AbstractMessageRouter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
/**
* @author Marcin Grzejszczak
*/
class StubRunnerIntegrationRouter extends AbstractMessageRouter {
private final StubRunnerIntegrationMessageSelector selector;
private final BeanFactory beanFactory;
StubRunnerIntegrationRouter(List<Contract> groovyDsls, BeanFactory beanFactory) {
this.selector = new StubRunnerIntegrationMessageSelector(groovyDsls);
this.beanFactory = beanFactory;
}
@Override
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
Contract dsl = this.selector.matchingContract(message);
if (dsl != null && dsl.getOutputMessage() != null
&& dsl.getOutputMessage().getSentTo() != null) {
String channelName = dsl.getOutputMessage().getSentTo().getClientValue();
return Collections
.singleton((MessageChannel) this.beanFactory.getBean(channelName));
}
return Collections
.singleton((MessageChannel)
this.beanFactory.getBean(StubRunnerIntegrationConfiguration.DummyMessageHandler.CHANNEL_NAME));
}
}

View File

@@ -16,6 +16,8 @@
package org.springframework.cloud.contract.stubrunner.messaging.integration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.springframework.cloud.contract.spec.Contract;
@@ -30,21 +32,38 @@ import org.springframework.messaging.support.MessageBuilder;
*
* @author Marcin Grzejszczak
*/
class StubRunnerIntegrationTransformer implements GenericTransformer<Message<?>, Message<?>> {
class StubRunnerIntegrationTransformer
implements GenericTransformer<Message<?>, Message<?>> {
private final Contract groovyDsl;
private final StubRunnerIntegrationMessageSelector selector;
StubRunnerIntegrationTransformer(Contract groovyDsl) {
this.groovyDsl = groovyDsl;
this(Collections.singletonList(groovyDsl));
}
StubRunnerIntegrationTransformer(List<Contract> groovyDsls) {
this.selector = new StubRunnerIntegrationMessageSelector(groovyDsls);
}
@Override
public Message<?> transform(Message<?> source) {
if (this.groovyDsl.getOutputMessage()==null) {
Contract groovyDsl = matchingContract(source);
if (groovyDsl == null || groovyDsl.getOutputMessage() == null) {
return source;
}
String payload = BodyExtractor.extractStubValueFrom(this.groovyDsl.getOutputMessage().getBody());
Map<String, Object> headers = this.groovyDsl.getOutputMessage().getHeaders().asStubSideMap();
return MessageBuilder.createMessage(payload, new MessageHeaders(headers));
String payload = BodyExtractor
.extractStubValueFrom(groovyDsl.getOutputMessage().getBody());
Map<String, Object> headers = groovyDsl.getOutputMessage().getHeaders()
.asStubSideMap();
MessageHeaders messageHeaders = new MessageHeaders(headers);
Message<String> message = MessageBuilder.createMessage(payload,
messageHeaders);
this.selector.updateCache(message, groovyDsl);
return message;
}
Contract matchingContract(Message<?> source) {
return this.selector.matchingContract(source);
}
}

View File

@@ -0,0 +1,40 @@
package org.springframework.cloud.contract.stubrunner.messaging.stream;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.cloud.contract.spec.Contract;
import org.springframework.integration.router.AbstractMessageRouter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
/**
* @author Marcin Grzejszczak
*/
class StubRunnerMessageRouter extends AbstractMessageRouter {
private final StubRunnerStreamMessageSelector selector;
private final BeanFactory beanFactory;
StubRunnerMessageRouter(List<Contract> groovyDsls, BeanFactory beanFactory) {
this.selector = new StubRunnerStreamMessageSelector(groovyDsls);
this.beanFactory = beanFactory;
}
@Override
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
Contract dsl = this.selector.matchingContract(message);
if (dsl != null && dsl.getOutputMessage() != null
&& dsl.getOutputMessage().getSentTo() != null) {
String channelName = StubRunnerStreamConfiguration.resolvedDestination(this.beanFactory,
dsl.getOutputMessage().getSentTo().getClientValue());
return Collections
.singleton((MessageChannel) this.beanFactory.getBean(channelName));
}
return Collections
.singleton((MessageChannel)
this.beanFactory.getBean(StubRunnerStreamConfiguration.DummyMessageHandler.CHANNEL_NAME));
}
}

View File

@@ -17,11 +17,14 @@
package org.springframework.cloud.contract.stubrunner.messaging.stream;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
@@ -46,6 +49,8 @@ import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.support.Consumer;
import org.springframework.integration.transformer.MessageTransformingHandler;
import org.springframework.messaging.Message;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.util.StringUtils;
/**
@@ -55,107 +60,114 @@ import org.springframework.util.StringUtils;
* @author Marcin Grzejszczak
*/
@Configuration
@ConditionalOnClass({IntegrationFlows.class, EnableBinding.class})
@ConditionalOnProperty(name="stubrunner.stream.enabled", havingValue="true", matchIfMissing=true)
@ConditionalOnClass({ IntegrationFlows.class, EnableBinding.class })
@ConditionalOnProperty(name = "stubrunner.stream.enabled", havingValue = "true", matchIfMissing = true)
@AutoConfigureBefore(StubRunnerIntegrationConfiguration.class)
public class StubRunnerStreamConfiguration {
private static final Logger log = LoggerFactory
.getLogger(StubRunnerStreamConfiguration.class);
private static final Log log = LogFactory.getLog(StubRunnerStreamConfiguration.class);
@Bean
@ConditionalOnMissingBean(name="stubFlowRegistrar")
@ConditionalOnMissingBean(name = "stubFlowRegistrar")
@ConditionalOnBean(BindingServiceProperties.class)
public FlowRegistrar stubFlowRegistrar(AutowireCapableBeanFactory beanFactory,
BatchStubRunner batchStubRunner) {
Map<StubConfiguration, Collection<Contract>> contracts = batchStubRunner
.getContracts();
IntegrationFlowBuilder dummyBuilder = IntegrationFlows.from(DummyMessageHandler.CHANNEL_NAME)
.handle(new DummyMessageHandler(), "handle");
beanFactory.initializeBean(dummyBuilder.get(), DummyMessageHandler.CHANNEL_NAME + ".flow");
for (Entry<StubConfiguration, Collection<Contract>> entry : contracts
.entrySet()) {
StubConfiguration key = entry.getKey();
Collection<Contract> value = entry.getValue();
String name = key.getGroupId() + "_" + key.getArtifactId();
MultiValueMap<String, Contract> map = new LinkedMultiValueMap<>();
for (Contract dsl : value) {
if (dsl == null) {
continue;
}
if (dsl.getInput() != null
&& dsl.getInput().getMessageFrom() != null
&& StringUtils.hasText(
dsl.getInput().getMessageFrom().getClientValue())) {
final String flowName = name + "_" + dsl.getLabel() + "_"
+ dsl.hashCode();
if (dsl.getInput() != null && dsl.getInput().getMessageFrom() != null
&& StringUtils.hasText(
dsl.getInput().getMessageFrom().getClientValue())) {
String from = resolvedDestination(beanFactory,
dsl.getInput().getMessageFrom().getClientValue());
IntegrationFlowBuilder builder = IntegrationFlows.from(from)
.filter(new StubRunnerStreamMessageSelector(dsl),
new Consumer<FilterEndpointSpec>() {
@Override
public void accept(FilterEndpointSpec e) {
e.id(flowName + ".filter");
}
})
.transform(new StubRunnerStreamTransformer(dsl),
new Consumer<GenericEndpointSpec<MessageTransformingHandler>>() {
@Override
public void accept(
GenericEndpointSpec<MessageTransformingHandler> e) {
e.id(flowName + ".transformer");
}
});
if (dsl.getOutputMessage() != null
&& dsl.getOutputMessage().getSentTo() != null) {
builder = builder.channel(resolvedDestination(beanFactory,
dsl.getOutputMessage().getSentTo().getClientValue()));
}
else {
builder = builder.handle(new DummyMessageHandler(), "handle");
}
beanFactory.initializeBean(builder.get(), flowName);
beanFactory.getBean(flowName + ".filter", Lifecycle.class)
.start();
beanFactory.getBean(flowName + ".transformer", Lifecycle.class)
.start();
} else if (dsl.getOutputMessage() != null
map.add(from, dsl);
}
else if (dsl.getOutputMessage() != null
&& dsl.getOutputMessage().getSentTo() != null
&& StringUtils.hasText(
dsl.getOutputMessage().getSentTo().getClientValue())) {
BinderAwareChannelResolver resolver = beanFactory.getBean(BinderAwareChannelResolver.class);
resolver.resolveDestination(dsl.getOutputMessage().getSentTo().getClientValue());
dsl.getOutputMessage().getSentTo().getClientValue())) {
BinderAwareChannelResolver resolver = beanFactory
.getBean(BinderAwareChannelResolver.class);
resolver.resolveDestination(
dsl.getOutputMessage().getSentTo().getClientValue());
}
}
for (Entry<String, List<Contract>> entries : map.entrySet()) {
final String flowName = name + "_" + entries.getKey() + "_"
+ entries.getValue().hashCode();
IntegrationFlowBuilder builder = IntegrationFlows.from(entries.getKey())
.filter(new StubRunnerStreamMessageSelector(entries.getValue()),
new Consumer<FilterEndpointSpec>() {
@Override
public void accept(FilterEndpointSpec e) {
e.id(flowName + ".filter");
}
})
.transform(new StubRunnerStreamTransformer(entries.getValue()),
new Consumer<GenericEndpointSpec<MessageTransformingHandler>>() {
@Override
public void accept(
GenericEndpointSpec<MessageTransformingHandler> e) {
e.id(flowName + ".transformer");
}
})
.route(new StubRunnerMessageRouter(entries.getValue(), beanFactory));
beanFactory.initializeBean(builder.get(), flowName);
beanFactory.getBean(flowName + ".filter", Lifecycle.class).start();
beanFactory.getBean(flowName + ".transformer", Lifecycle.class)
.start();
}
}
return new FlowRegistrar();
}
private String resolvedDestination(AutowireCapableBeanFactory context,
static String resolvedDestination(BeanFactory context,
String destination) {
Map<String, BindingProperties> bindings = bindingProperties(context);
for (Map.Entry<String, BindingProperties> entry : bindings.entrySet()) {
if (destination.equals(entry.getValue().getDestination())) {
if (log.isDebugEnabled()) {
log.debug("Found a channel named [{}] with destination [{}]",
entry.getKey(), destination);
log.debug("Found a channel named [" + entry.getKey()
+ "] with destination [" + destination + "]");
}
return entry.getKey();
}
}
if (log.isDebugEnabled()) {
log.debug(
"No destination named [{}] was found. Assuming that the destination equals the channel name",
destination);
log.debug("No destination named [" + destination
+ "] was found. Assuming that the destination equals the channel name");
}
return destination;
}
private Map<String, BindingProperties> bindingProperties(AutowireCapableBeanFactory context) {
private static Map<String, BindingProperties> bindingProperties(BeanFactory context) {
return context.getBean(BindingServiceProperties.class).getBindings();
}
private static class DummyMessageHandler {
public void handle(Message<?> message) {}
static class DummyMessageHandler {
static String CHANNEL_NAME = "stub_runner_dummy_channel";
public void handle(Message<?> message) {
}
}
static class FlowRegistrar {
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2013-2017 the original author or authors.
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,12 +17,19 @@
package org.springframework.cloud.contract.stubrunner.messaging.stream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.regex.Pattern;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import com.toomuchcoding.jsonassert.JsonAssertion;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.cloud.contract.spec.Contract;
import org.springframework.cloud.contract.spec.internal.BodyMatcher;
import org.springframework.cloud.contract.spec.internal.BodyMatchers;
@@ -35,39 +42,75 @@ import org.springframework.cloud.contract.verifier.util.MethodBufferingJsonVerif
import org.springframework.integration.core.MessageSelector;
import org.springframework.messaging.Message;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import com.toomuchcoding.jsonassert.JsonAssertion;
/**
* Passes through a message that matches the one defined in the DSL
*
* @author Marcin Grzejszczak
* @author Tim Ysewyn
*/
class StubRunnerStreamMessageSelector implements MessageSelector {
private static final Log log = LogFactory.getLog(StubRunnerStreamMessageSelector.class);
private static final Map<Message, Contract> CACHE =
Collections.synchronizedMap(new WeakHashMap<Message, Contract>());
private static final Log log = LogFactory
.getLog(StubRunnerStreamMessageSelector.class);
private final List<Contract> groovyDsls;
private final Contract groovyDsl;
private final ContractVerifierObjectMapper objectMapper = new ContractVerifierObjectMapper();
StubRunnerStreamMessageSelector(Contract groovyDsl) {
this.groovyDsl = groovyDsl;
this(Collections.singletonList(groovyDsl));
}
StubRunnerStreamMessageSelector(List<Contract> groovyDsls) {
this.groovyDsls = groovyDsls;
}
@Override
public boolean accept(Message<?> message) {
List<String> unmatchedHeaders = headersMatch(message);
return matchingContract(message) != null;
}
Contract matchingContract(Message<?> message) {
if (CACHE.containsKey(message)) {
return CACHE.get(message);
}
Contract contract = getContract(message);
if (contract != null) {
CACHE.put(message, contract);
}
return contract;
}
void updateCache(Message<?> message, Contract contract) {
CACHE.put(message, contract);
}
private Contract getContract(Message<?> message) {
for (Contract groovyDsl : this.groovyDsls) {
Contract contract = matchContract(message, groovyDsl);
if (contract != null) {
return contract;
}
}
return null;
}
private Contract matchContract(Message<?> message, Contract groovyDsl) {
List<String> unmatchedHeaders = headersMatch(message, groovyDsl);
if (!unmatchedHeaders.isEmpty()) {
if (log.isDebugEnabled()) {
log.debug("Contract [" + this.groovyDsl + "] hasn't matched the following headers " + unmatchedHeaders);
log.debug("Contract [" + groovyDsl
+ "] hasn't matched the following headers " + unmatchedHeaders);
}
return false;
return null;
}
Object inputMessage = message.getPayload();
BodyMatchers matchers = this.groovyDsl.getInput().getMatchers();
Object dslBody = MapConverter.getStubSideValues(this.groovyDsl.getInput().getMessageBody());
BodyMatchers matchers = groovyDsl.getInput().getMatchers();
Object dslBody = MapConverter
.getStubSideValues(groovyDsl.getInput().getMessageBody());
Object matchingInputMessage = JsonToJsonPathsConverter
.removeMatchingJsonPaths(dslBody, matchers);
JsonPaths jsonPaths = JsonToJsonPathsConverter
@@ -75,7 +118,8 @@ class StubRunnerStreamMessageSelector implements MessageSelector {
matchingInputMessage);
DocumentContext parsedJson;
try {
parsedJson = JsonPath.parse(this.objectMapper.writeValueAsString(inputMessage));
parsedJson = JsonPath
.parse(this.objectMapper.writeValueAsString(inputMessage));
}
catch (JsonProcessingException e) {
throw new IllegalStateException("Cannot serialize to JSON", e);
@@ -87,32 +131,39 @@ class StubRunnerStreamMessageSelector implements MessageSelector {
}
if (matchers != null && matchers.hasMatchers()) {
for (BodyMatcher matcher : matchers.jsonPathMatchers()) {
String jsonPath = JsonToJsonPathsConverter.convertJsonPathAndRegexToAJsonPath(matcher, dslBody);
String jsonPath = JsonToJsonPathsConverter
.convertJsonPathAndRegexToAJsonPath(matcher, dslBody);
matches &= matchesJsonPath(unmatchedJsonPath, parsedJson, jsonPath);
}
}
if (!unmatchedJsonPath.isEmpty()) {
if (log.isDebugEnabled()) {
log.debug("Contract [" + this.groovyDsl + "] didn't much the body due to " + unmatchedJsonPath);
log.debug("Contract [" + groovyDsl + "] didn't much the body due to "
+ unmatchedJsonPath);
}
}
return matches;
if (matches) {
return groovyDsl;
}
return null;
}
private boolean matchesJsonPath(List<String> unmatchedJsonPath, DocumentContext parsedJson, String jsonPath) {
private boolean matchesJsonPath(List<String> unmatchedJsonPath,
DocumentContext parsedJson, String jsonPath) {
try {
JsonAssertion.assertThat(parsedJson).matchesJsonPath(jsonPath);
return true;
} catch (Exception e) {
}
catch (Exception e) {
unmatchedJsonPath.add(e.getLocalizedMessage());
return false;
}
}
private List<String> headersMatch(Message<?> message) {
private List<String> headersMatch(Message<?> message, Contract groovyDsl) {
List<String> unmatchedHeaders = new ArrayList<>();
Map<String, Object> headers = message.getHeaders();
for (Header it : this.groovyDsl.getInput().getMessageHeaders().getEntries()) {
for (Header it : groovyDsl.getInput().getMessageHeaders().getEntries()) {
String name = it.getName();
Object value = it.getClientValue();
Object valueInHeader = headers.get(name);
@@ -120,20 +171,24 @@ class StubRunnerStreamMessageSelector implements MessageSelector {
if (value instanceof Pattern) {
Pattern pattern = (Pattern) value;
matches = pattern.matcher(valueInHeader.toString()).matches();
} else {
matches = valueInHeader != null && valueInHeader.toString().equals(value.toString());
}
else {
matches = valueInHeader != null
&& valueInHeader.toString().equals(value.toString());
}
if (!matches) {
unmatchedHeaders.add("Header with name [" + name + "] was supposed to " +
unmatchedText(value) + " but the value is [" + (valueInHeader != null ?
valueInHeader.toString() : "null") + "]");
unmatchedHeaders.add("Header with name [" + name + "] was supposed to "
+ unmatchedText(value) + " but the value is ["
+ (valueInHeader != null ? valueInHeader.toString() : "null")
+ "]");
}
}
return unmatchedHeaders;
}
private String unmatchedText(Object expectedValue) {
return expectedValue instanceof Pattern ? "match pattern [" + ((Pattern) expectedValue).pattern() + "]" :
"be equal to [" + expectedValue + "]";
return expectedValue instanceof Pattern
? "match pattern [" + ((Pattern) expectedValue).pattern() + "]"
: "be equal to [" + expectedValue + "]";
}
}

View File

@@ -16,6 +16,8 @@
package org.springframework.cloud.contract.stubrunner.messaging.stream;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.springframework.cloud.contract.spec.Contract;
@@ -32,19 +34,35 @@ import org.springframework.messaging.support.MessageBuilder;
*/
class StubRunnerStreamTransformer implements GenericTransformer<Message<?>, Message<?>> {
private final Contract groovyDsl;
private final StubRunnerStreamMessageSelector selector;
StubRunnerStreamTransformer(Contract groovyDsl) {
this.groovyDsl = groovyDsl;
this(Collections.singletonList(groovyDsl));
}
StubRunnerStreamTransformer(List<Contract> groovyDsls) {
this.selector = new StubRunnerStreamMessageSelector(groovyDsls);
}
@Override
public Message<?> transform(Message<?> source) {
if (this.groovyDsl.getOutputMessage()==null) {
Contract groovyDsl = matchingContract(source);
if (groovyDsl == null || groovyDsl.getOutputMessage() == null) {
return source;
}
String payload = BodyExtractor.extractStubValueFrom(this.groovyDsl.getOutputMessage().getBody());
Map<String, Object> headers = this.groovyDsl.getOutputMessage().getHeaders().asStubSideMap();
return MessageBuilder.createMessage(payload, new MessageHeaders(headers));
String payload = BodyExtractor
.extractStubValueFrom(groovyDsl.getOutputMessage().getBody());
Map<String, Object> headers = groovyDsl.getOutputMessage().getHeaders()
.asStubSideMap();
MessageHeaders messageHeaders = new MessageHeaders(headers);
Message<String> message = MessageBuilder.createMessage(payload,
messageHeaders);
this.selector.updateCache(message, groovyDsl);
return message;
}
Contract matchingContract(Message<?> source) {
return this.selector.matchingContract(source);
}
}

View File

@@ -50,3 +50,4 @@ public interface MessageVerifier<M> {
*/
M receive(String destination);
}

View File

@@ -41,7 +41,7 @@ import spock.util.concurrent.PollingConditions
/**
* @author Marcin Grzejszczak
*/
//TODO: Speed up this test somehow
//TODO: Speed up this test somehow (move it out of Spring Cloud Contract core to samples)
@ContextConfiguration(classes = Config, loader = SpringBootContextLoader)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
properties = ["stubrunner.cloud.eureka.enabled=true",

View File

@@ -24,7 +24,12 @@ class StubRunnerIntegrationTransformerSpec extends Specification {
def 'should not transform the message if there is no output message'() {
given:
StubRunnerIntegrationTransformer transformer = new StubRunnerIntegrationTransformer(noOutputMessageContract)
StubRunnerIntegrationTransformer transformer = new StubRunnerIntegrationTransformer(noOutputMessageContract) {
@Override
Contract matchingContract(Message<?> source) {
return noOutputMessageContract
}
}
when:
def result = transformer.transform(message)
then:
@@ -55,7 +60,12 @@ class StubRunnerIntegrationTransformerSpec extends Specification {
def 'should convert dsl into message'() {
given:
StubRunnerIntegrationTransformer transformer = new StubRunnerIntegrationTransformer(dsl)
StubRunnerIntegrationTransformer transformer = new StubRunnerIntegrationTransformer(dsl) {
@Override
Contract matchingContract(Message<?> source) {
return dsl
}
}
when:
def result = transformer.transform(message)
then:
@@ -86,7 +96,12 @@ class StubRunnerIntegrationTransformerSpec extends Specification {
def 'should convert dsl into message with regex in GString'() {
given:
StubRunnerIntegrationTransformer transformer = new StubRunnerIntegrationTransformer(dslWithRegexInGString)
StubRunnerIntegrationTransformer transformer = new StubRunnerIntegrationTransformer(dslWithRegexInGString) {
@Override
Contract matchingContract(Message<?> source) {
return dslWithRegexInGString
}
}
when:
def result = transformer.transform(message)
then:

View File

@@ -55,7 +55,12 @@ class StubRunnerStreamTransformerSpec extends Specification {
def 'should convert dsl into message'() {
given:
StubRunnerStreamTransformer streamTransformer = new StubRunnerStreamTransformer(dsl)
StubRunnerStreamTransformer streamTransformer = new StubRunnerStreamTransformer(dsl) {
@Override
Contract matchingContract(Message<?> source) {
return dsl
}
}
when:
def result = streamTransformer.transform(message)
then:
@@ -86,7 +91,12 @@ class StubRunnerStreamTransformerSpec extends Specification {
def 'should convert dsl into message with regex in GString'() {
given:
StubRunnerStreamTransformer streamTransformer = new StubRunnerStreamTransformer(dslWithRegexInGString)
StubRunnerStreamTransformer streamTransformer = new StubRunnerStreamTransformer(dslWithRegexInGString) {
@Override
Contract matchingContract(Message<?> source) {
return dslWithRegexInGString
}
}
when:
def result = streamTransformer.transform(message)
then:
@@ -122,7 +132,12 @@ class StubRunnerStreamTransformerSpec extends Specification {
)
}
}
StubRunnerStreamTransformer streamTransformer = new StubRunnerStreamTransformer(contract)
StubRunnerStreamTransformer streamTransformer = new StubRunnerStreamTransformer(contract) {
@Override
Contract matchingContract(Message<?> source) {
return contract
}
}
when:
def result = streamTransformer.transform(message)
then: