support for binding descriptors per channel name

This commit is contained in:
Marius Bogoevici
2015-07-22 17:06:22 -04:00
committed by Mark Fisher
parent 70e1ff006c
commit 37204febe2
14 changed files with 107 additions and 170 deletions

View File

@@ -42,7 +42,8 @@ The `application.yml` has the mapping from channel names to external broker hand
spring:
cloud:
stream:
outputChannelName: ${spring.application.name:ticker}
bindings:
output: ${spring.application.name:ticker}
----
`@EnableModule` is parameterized by an interface (in this case `Source`) which declares input and output channels. `Source`, `Sink` and `Processor` are provided off the shelf, but you can define others. Here's the definition of `Source`
@@ -80,7 +81,7 @@ NOTE: In this case there is only one `Source` in the application context so ther
== Multiple Input or Output Channels
A module can have multiple input or output channels all defined either as `@Input` and `@Output` methods in an interface (preferrable) or as bean definitions. Instead of just one channel named "input" or "output" you can add multiple `MessageChannel` methods annotated `input.*` or `output.*` and the names are converted to external channel names on the broker. The external channel names are the `spring.cloud.stream.[input|output]ChannelName` plus the `MessageChannel` bean name, period separated. In addition, the bean name can be `input.[queue|topic|tap]:*` or `output.[queue|topic]:*` (i.e. with a channel type as a colon-separated prefix), and the semantics of the external bus channel changes accordingly (a tap is like a topic). For example, you can have two `MessageChannels` called "output" and "output.topic:foo" in a module with `outputChannelName=bar`, and the result is 2 external channels called "bar" and "topic:foo.bar".
A module can have multiple input or output channels all defined either as `@Input` and `@Output` methods in an interface (preferrable) or as bean definitions. Instead of just one channel named "input" or "output" you can add multiple `MessageChannel` methods annotated `@Input` or `@Output` and the names are converted to external channel names on the broker. The external channel names can be specified as properties that consist of the channel names prefixed with `spring.cloud.stream.bindings` (e.g. `spring.cloud.stream.bindings.input` or `spring.cloud.stream.bindings.output`). External channel names can have a channel type as a colon-separated prefix, and the semantics of the external bus channel changes accordingly (a tap is like a topic). For example, you can have two `MessageChannels` called "output" and "foo" in a module with `spring.cloud.stream.bindings.output=bar` and `spring.cloud.stream.bindings.foo=topic:foo`, and the result is 2 external channels called "bar" and "topic:foo".
== Samples

View File

@@ -38,7 +38,8 @@ The `application.yml` has the mapping from channel names to external broker hand
spring:
cloud:
stream:
outputChannelName: ${spring.application.name:ticker}
bindings:
output: ${spring.application.name:ticker}
----
`@EnableModule` is parameterized by an interface (in this case `Source`) which declares input and output channels. `Source`, `Sink` and `Processor` are provided off the shelf, but you can define others. Here's the definition of `Source`
@@ -76,7 +77,7 @@ NOTE: In this case there is only one `Source` in the application context so ther
== Multiple Input or Output Channels
A module can have multiple input or output channels all defined either as `@Input` and `@Output` methods in an interface (preferrable) or as bean definitions. Instead of just one channel named "input" or "output" you can add multiple `MessageChannel` methods annotated `input.*` or `output.*` and the names are converted to external channel names on the broker. The external channel names are the `spring.cloud.stream.[input|output]ChannelName` plus the `MessageChannel` bean name, period separated. In addition, the bean name can be `input.[queue|topic|tap]:*` or `output.[queue|topic]:*` (i.e. with a channel type as a colon-separated prefix), and the semantics of the external bus channel changes accordingly (a tap is like a topic). For example, you can have two `MessageChannels` called "output" and "output.topic:foo" in a module with `outputChannelName=bar`, and the result is 2 external channels called "bar" and "topic:foo.bar".
A module can have multiple input or output channels all defined either as `@Input` and `@Output` methods in an interface (preferrable) or as bean definitions. Instead of just one channel named "input" or "output" you can add multiple `MessageChannel` methods annotated `@Input` or `@Output` and the names are converted to external channel names on the broker. The external channel names can be specified as properties that consist of the channel names prefixed with `spring.cloud.stream.bindings` (e.g. `spring.cloud.stream.bindings.input` or `spring.cloud.stream.bindings.output`). External channel names can have a channel type as a colon-separated prefix, and the semantics of the external bus channel changes accordingly (a tap is like a topic). For example, you can have two `MessageChannels` called "output" and "foo" in a module with `spring.cloud.stream.bindings.output=bar` and `spring.cloud.stream.bindings.foo=topic:foo`, and the result is 2 external channels called "bar" and "topic:foo".
== Samples
@@ -117,7 +118,7 @@ For an XD module the channel names are `<group>.<index>` and a source (output on
== Taps
All output channels can be also tapped so you can also attach a module to a pub-sub endpoint and listen to the tap if you know the module metadata. To tap an existing vanilla module you need to know its `outputChannelName` and the tap name is then `tap:<outputChannelName>`, so you can listen to it on an input channel named `input.topic.tap:<outputChannelName>`. The tap is only active if you explicitly ask for it: you can do that by POSTing to the HTTP endpoint `/taps/<channelName>` (where the channel name can be the internal or external name, e.g. "output" or the external name mapped to the output channel).
All output channels can be also tapped so you can also attach a module to a pub-sub endpoint and listen to the tap if you know the module metadata. To tap an existing vanilla module you need to know its `outputChannelName` and the tap name is then `tap:<outputChannelName>`, so you can listen to it on an input channel named `topic.tap:<outputChannelName>`. The tap is only active if you explicitly ask for it: you can do that by POSTing to the HTTP endpoint `/taps/<channelName>` (where the channel name can be the internal or external name, e.g. "output" or the external name mapped to the output channel).
To tap an existing output channel in an XD module you just need to know its group, name and index, e.g.

View File

@@ -49,7 +49,8 @@ The `application.yml` has the external channel names, e.g.
spring:
cloud:
stream:
outputChannelName: ${spring.application.name:ticker}
bindings:
output: ${spring.application.name:ticker}
```
## Richer Input and Output

View File

@@ -1,5 +1,6 @@
spring:
cloud:
stream:
inputChannelName: testtock
bindings:
input: testtock

View File

@@ -2,5 +2,6 @@ fixedDelay: 5000
spring:
cloud:
stream:
outputChannelName: testtock
bindings:
output: testtock

View File

@@ -3,9 +3,10 @@ server:
spring:
cloud:
stream:
inputChannelName: testtock
# uncomment below to have this module consume from a specific partition
# in the range of 0 to N-1, where N is the upstream module's partitionCount
#consumerProperties:
# partitionIndex: 1
bindings:
input: testtock
# uncomment below to have this module consume from a specific partition
# in the range of 0 to N-1, where N is the upstream module's partitionCount
#consumerProperties:
# partitionIndex: 1

View File

@@ -4,17 +4,19 @@ fixedDelay: 5000
spring:
cloud:
stream:
outputChannelName: testtock
# uncomment below to use the last digit of the seconds as a partition key
# hashcode(key) % N is then applied with N being the partitionCount value
# thus, even seconds should go to the 0 queue, odd seconds to the 1 queue
#producerProperties:
# partitionKeyExpression: payload.charAt(payload.length()-1)
# partitionCount: 2
bindings:
output: testtock
# uncomment below to use the last digit of the seconds as a partition key
# hashcode(key) % N is then applied with N being the partitionCount value
# thus, even seconds should go to the 0 queue, odd seconds to the 1 queue
#producerProperties:
# partitionKeyExpression: payload.charAt(payload.length()-1)
# partitionCount: 2
---
spring:
profiles: extended
cloud:
stream:
outputChannelName: xformed
bindings:
output: xformed

View File

@@ -3,6 +3,7 @@ server:
spring:
cloud:
stream:
outputChannelName: xformed
inputChannelName: testtock
bindings:
output: xformed
input: testtock

View File

@@ -69,6 +69,7 @@ public class ChannelBindingAdapter implements Lifecycle, ApplicationContextAware
private MessageBuilderFactory messageBuilderFactory = new DefaultMessageBuilderFactory();
private Collection<OutputChannelBinding> outputChannels = Collections.emptySet();
private Collection<InputChannelBinding> inputChannels = Collections.emptySet();
private boolean running = false;

View File

@@ -32,60 +32,16 @@ public class DefaultChannelLocator implements ChannelLocator {
@Override
public String locate(String name) {
String channelName = extractChannelName("input", name, this.module.getInputChannelName());
if (channelName!=null) {
return channelName;
}
channelName = extractChannelName("output", name, this.module.getOutputChannelName());
if (channelName!=null) {
return channelName;
}
return null;
return module.getBindingPath(name);
}
@Override
public String tap(String name) {
return !isDefaultOuputChannel(name) ? this.module.getTapChannelName(getPlainChannelName(name))
: this.module.getTapChannelName();
return this.module.getTapChannelName(getPlainBinding(name));
}
private boolean isDefaultOuputChannel(String channelName) {
if (channelName.contains(":")) {
String[] tokens = channelName.split(":", 2);
channelName = tokens[1];
}
return channelName.equals(this.module.getOutputChannelName());
}
private String extractChannelName(String start, String name, String externalChannelName) {
if (name.equals(start)) {
return externalChannelName;
}
else if (name.startsWith(start + ".") || name.startsWith(start + "_")) {
String prefix = "";
String channelName = name.substring(start.length() + 1);
if (channelName.contains(":")) {
String[] tokens = channelName.split(":", 2);
String type = tokens[0];
if ("queue".equals(type)) {
// omit the type for a queue
if (StringUtils.hasText(tokens[1])) {
prefix = tokens[1] + ".";
}
}
else {
prefix = channelName + (channelName.endsWith(":") ? "" : ".");
}
}
else {
prefix = channelName + ".";
}
return prefix + getPlainChannelName(externalChannelName);
}
return null;
}
private String getPlainChannelName(String name) {
private String getPlainBinding(String name) {
// remove prefixes such as 'tap:','topic:','queue:'
if (name.contains(":")) {
name = name.substring(name.indexOf(":") + 1);
}

View File

@@ -230,10 +230,10 @@ public class AggregateBuilder implements ApplicationContextAware {
args.add("--spring.config.name=" + this.configName);
}
if (this.input != null) {
args.add("--spring.cloud.stream.inputChannelName=" + this.input);
args.add("--spring.cloud.stream.bindings.input=" + this.input);
}
if (this.output != null) {
args.add("--spring.cloud.stream.outputChannelName=" + this.output);
args.add("--spring.cloud.stream.bindings.output=" + this.output);
}
this.builder.run(args.toArray(new String[0]));
}

View File

@@ -16,6 +16,8 @@
package org.springframework.cloud.stream.config;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.springframework.boot.context.properties.ConfigurationProperties;
@@ -31,11 +33,7 @@ import com.fasterxml.jackson.annotation.JsonInclude.Include;
@JsonInclude(Include.NON_DEFAULT)
public class ChannelBindingProperties {
public static final String DEFAULT_CHANNEL_NAME = "group.0";
private String outputChannelName = DEFAULT_CHANNEL_NAME;
private String inputChannelName = DEFAULT_CHANNEL_NAME;
public static final String PATH = "path";
private Properties consumerProperties = new Properties();
@@ -43,21 +41,7 @@ public class ChannelBindingProperties {
private boolean autoStartup = true;
public String getOutputChannelName() {
return this.outputChannelName;
}
public String getInputChannelName() {
return this.inputChannelName;
}
public void setOutputChannelName(String outputChannelName) {
this.outputChannelName = outputChannelName;
}
public void setInputChannelName(String inputChannelName) {
this.inputChannelName = inputChannelName;
}
private Map<String,Object> bindings = new HashMap<>();
public Properties getConsumerProperties() {
return this.consumerProperties;
@@ -83,12 +67,35 @@ public class ChannelBindingProperties {
this.autoStartup = autoStartup;
}
public String getTapChannelName() {
return getTapChannelName(getOutputChannelName());
public Map<String, Object> getBindings() {
return bindings;
}
public String getTapChannelName(String prefix) {
return "tap:" + prefix;
public void setBindings(Map<String, Object> bindings) {
this.bindings = bindings;
}
public String getBindingPath(String channelName) {
Object binding = bindings.get(channelName);
// we may shortcut directly to the path
if (binding != null) {
if (binding instanceof String) {
return (String) binding;
}
else if (binding instanceof Map) {
Map<?, ?> bindingProperties = (Map<?, ?>) binding;
Object bindingPath = bindingProperties.get(PATH);
if (bindingPath != null) {
return bindingPath.toString();
}
}
}
// the default path of the binding is the channel name itself
return channelName;
}
public String getTapChannelName(String channelName) {
return "tap:" + getBindingPath(channelName);
}
}

View File

@@ -17,6 +17,8 @@ package org.springframework.cloud.stream.adapter;
import static org.junit.Assert.assertEquals;
import java.util.Collections;
import org.junit.Test;
import org.springframework.cloud.stream.adapter.DefaultChannelLocator;
import org.springframework.cloud.stream.config.ChannelBindingProperties;
@@ -33,56 +35,26 @@ public class DefaultChannelLocatorTests {
@Test
public void oneOutput() throws Exception {
assertEquals("group.0", this.locator.locate("output"));
assertEquals("output", this.locator.locate("output"));
}
@Test
public void oneOutputWithShortcutPath() throws Exception {
module.getBindings().put("outputWithShortcutPath","group.0.shortcut");
assertEquals("group.0.shortcut", this.locator.locate("outputWithShortcutPath"));
}
@Test
public void oneOutputWithFullPath() throws Exception {
module.getBindings().put("outputWithFullPath", Collections.singletonMap(ChannelBindingProperties.PATH,"group.0.full"));
assertEquals("group.0.full", this.locator.locate("outputWithFullPath"));
}
@Test
public void oneOutputTopic() throws Exception {
assertEquals("topic:group.0", this.locator.locate("output.topic:"));
}
@Test
public void outputWithNamedTopic() throws Exception {
assertEquals("topic:foo.group.0", this.locator.locate("output.topic:foo"));
}
@Test
public void outputWithNamedQueue() throws Exception {
assertEquals("foo.group.0", this.locator.locate("output.queue:foo"));
}
@Test
public void overrideNaturalOutputChannelName() throws Exception {
this.module.setOutputChannelName("bar");
assertEquals("foo.bar", this.locator.locate("output.queue:foo"));
}
@Test
public void noQueueQualifier() throws Exception {
assertEquals("foo.group.0", this.locator.locate("output.foo"));
}
@Test
public void underscoreSeparatorForChannelName() throws Exception {
assertEquals("foo.group.0", this.locator.locate("output_foo"));
}
@Test
public void overrideNaturalOutputChannelNamedQueue() throws Exception {
this.module.setOutputChannelName("queue:bar");
assertEquals("foo.bar", this.locator.locate("output.foo"));
}
@Test
public void overrideNaturalOutputChannelNamedQueueWithTopic() throws Exception {
this.module.setOutputChannelName("queue:bar");
assertEquals("topic:foo.bar", this.locator.locate("output.topic:foo"));
}
@Test
public void overrideNaturalOutputChannelNamedTopic() throws Exception {
this.module.setOutputChannelName("topic:bar");
assertEquals("foo.bar", this.locator.locate("output.queue:foo"));
module.getBindings().put("outputWithTopic", Collections.singletonMap(ChannelBindingProperties.PATH,"topic:group.0"));
assertEquals("topic:group.0", this.locator.locate("outputWithTopic"));
}
}

View File

@@ -72,8 +72,8 @@ public class ChannelBindingAdapterConfigurationTests {
refresh();
Collection<OutputChannelBinding> channels = this.adapter.getChannelsMetadata().getOutputChannels();
assertEquals(1, channels.size());
assertEquals("group.0", channels.iterator().next().getRemoteName());
assertEquals("tap:group.0", channels.iterator().next().getTapChannelName());
assertEquals("output", channels.iterator().next().getRemoteName());
assertEquals("tap:output", channels.iterator().next().getTapChannelName());
}
private void refresh() {
@@ -86,28 +86,20 @@ public class ChannelBindingAdapterConfigurationTests {
}
@Test
public void oneOutputTopic() throws Exception {
MessageChannelBeanDefinitionRegistryUtils.registerOutputChannelBeanDefinition("output.topic:", context);
refresh();
Collection<OutputChannelBinding> channels = this.adapter.getChannelsMetadata().getOutputChannels();
assertEquals(1, channels.size());
assertEquals("topic:group.0", channels.iterator().next().getRemoteName());
assertEquals("tap:group.0", channels.iterator().next().getTapChannelName());
}
@Test
public void twoOutputsWithQueue() throws Exception {
public void twoOutputs() throws Exception {
MessageChannelBeanDefinitionRegistryUtils.registerOutputChannelBeanDefinition("output", context);
MessageChannelBeanDefinitionRegistryUtils.registerOutputChannelBeanDefinition("output.queue:foo", context);
MessageChannelBeanDefinitionRegistryUtils.registerOutputChannelBeanDefinition("foo", context);
module.getBindings().put("output", "group.0");
module.getBindings().put("foo", "topic:group.0");
refresh();
Collection<OutputChannelBinding> channels = this.adapter.getChannelsMetadata().getOutputChannels();
List<String> names = getChannelNames(channels);
List<String> remoteBindingNames = getRemoteBindingNames(channels);
assertEquals(2, channels.size());
assertTrue(names.contains("group.0"));
assertTrue(names.contains("foo.group.0"));
assertTrue(remoteBindingNames.contains("group.0"));
assertTrue(remoteBindingNames.contains("topic:group.0"));
}
private List<String> getChannelNames(Collection<? extends ChannelBinding> channels) {
private List<String> getRemoteBindingNames(Collection<? extends ChannelBinding> channels) {
List<String> list = new ArrayList<String>();
for (ChannelBinding binding : channels) {
list.add(binding.getRemoteName());
@@ -117,25 +109,25 @@ public class ChannelBindingAdapterConfigurationTests {
@Test
public void overrideNaturalOutputChannelName() throws Exception {
this.module.setOutputChannelName("bar");
MessageChannelBeanDefinitionRegistryUtils.registerOutputChannelBeanDefinition("output.queue:foo", context);
MessageChannelBeanDefinitionRegistryUtils.registerOutputChannelBeanDefinition("output", context);
module.getBindings().put("output", "group.0");
refresh();
Collection<OutputChannelBinding> channels = this.adapter.getChannelsMetadata().getOutputChannels();
assertEquals(1, channels.size());
assertEquals("foo.bar", channels.iterator().next().getRemoteName());
// TODO: fix this. What should it be?
assertEquals("tap:foo.bar", channels.iterator().next().getTapChannelName());
assertEquals("group.0", channels.iterator().next().getRemoteName());
assertEquals("tap:group.0", channels.iterator().next().getTapChannelName());
}
@Test
public void overrideNaturalOutputChannelNamedQueueWithTopic() throws Exception {
this.module.setOutputChannelName("queue:bar");
MessageChannelBeanDefinitionRegistryUtils.registerOutputChannelBeanDefinition("output.topic:foo", context);
MessageChannelBeanDefinitionRegistryUtils.registerOutputChannelBeanDefinition("output", context);
module.getBindings().put("output", "topic:group.0");
refresh();
Collection<OutputChannelBinding> channels = this.adapter.getChannelsMetadata().getOutputChannels();
assertEquals(1, channels.size());
assertEquals("topic:foo.bar", channels.iterator().next().getRemoteName());
assertEquals("tap:foo.bar", channels.iterator().next().getTapChannelName());
assertEquals("topic:group.0", channels.iterator().next().getRemoteName());
// is this correct?
assertEquals("tap:group.0", channels.iterator().next().getTapChannelName());
}
@Configuration