From 37204febe200f3bbaf649440a954e28ca4e3e9ca Mon Sep 17 00:00:00 2001 From: Marius Bogoevici Date: Wed, 22 Jul 2015 17:06:22 -0400 Subject: [PATCH] support for binding descriptors per channel name --- README.adoc | 5 +- docs/src/main/asciidoc/intro.adoc | 7 +- roadmap.md | 3 +- .../double/src/main/resources/sink.yml | 3 +- .../double/src/main/resources/source.yml | 3 +- .../sink/src/main/resources/application.yml | 11 ++-- .../source/src/main/resources/application.yml | 18 +++--- .../src/main/resources/application.yml | 5 +- .../stream/adapter/ChannelBindingAdapter.java | 1 + .../stream/adapter/DefaultChannelLocator.java | 52 ++------------- .../stream/aggregate/AggregateBuilder.java | 4 +- .../config/ChannelBindingProperties.java | 55 +++++++++------- .../adapter/DefaultChannelLocatorTests.java | 64 ++++++------------- ...annelBindingAdapterConfigurationTests.java | 46 ++++++------- 14 files changed, 107 insertions(+), 170 deletions(-) diff --git a/README.adoc b/README.adoc index 148895b04..ffc002d5e 100644 --- a/README.adoc +++ b/README.adoc @@ -42,7 +42,8 @@ The `application.yml` has the mapping from channel names to external broker hand spring: cloud: stream: - outputChannelName: ${spring.application.name:ticker} + bindings: + output: ${spring.application.name:ticker} ---- `@EnableModule` is parameterized by an interface (in this case `Source`) which declares input and output channels. `Source`, `Sink` and `Processor` are provided off the shelf, but you can define others. Here's the definition of `Source` @@ -80,7 +81,7 @@ NOTE: In this case there is only one `Source` in the application context so ther == Multiple Input or Output Channels -A module can have multiple input or output channels all defined either as `@Input` and `@Output` methods in an interface (preferrable) or as bean definitions. Instead of just one channel named "input" or "output" you can add multiple `MessageChannel` methods annotated `input.*` or `output.*` and the names are converted to external channel names on the broker. The external channel names are the `spring.cloud.stream.[input|output]ChannelName` plus the `MessageChannel` bean name, period separated. In addition, the bean name can be `input.[queue|topic|tap]:*` or `output.[queue|topic]:*` (i.e. with a channel type as a colon-separated prefix), and the semantics of the external bus channel changes accordingly (a tap is like a topic). For example, you can have two `MessageChannels` called "output" and "output.topic:foo" in a module with `outputChannelName=bar`, and the result is 2 external channels called "bar" and "topic:foo.bar". +A module can have multiple input or output channels all defined either as `@Input` and `@Output` methods in an interface (preferrable) or as bean definitions. Instead of just one channel named "input" or "output" you can add multiple `MessageChannel` methods annotated `@Input` or `@Output` and the names are converted to external channel names on the broker. The external channel names can be specified as properties that consist of the channel names prefixed with `spring.cloud.stream.bindings` (e.g. `spring.cloud.stream.bindings.input` or `spring.cloud.stream.bindings.output`). External channel names can have a channel type as a colon-separated prefix, and the semantics of the external bus channel changes accordingly (a tap is like a topic). For example, you can have two `MessageChannels` called "output" and "foo" in a module with `spring.cloud.stream.bindings.output=bar` and `spring.cloud.stream.bindings.foo=topic:foo`, and the result is 2 external channels called "bar" and "topic:foo". == Samples diff --git a/docs/src/main/asciidoc/intro.adoc b/docs/src/main/asciidoc/intro.adoc index 09c48e01f..cd1f07221 100644 --- a/docs/src/main/asciidoc/intro.adoc +++ b/docs/src/main/asciidoc/intro.adoc @@ -38,7 +38,8 @@ The `application.yml` has the mapping from channel names to external broker hand spring: cloud: stream: - outputChannelName: ${spring.application.name:ticker} + bindings: + output: ${spring.application.name:ticker} ---- `@EnableModule` is parameterized by an interface (in this case `Source`) which declares input and output channels. `Source`, `Sink` and `Processor` are provided off the shelf, but you can define others. Here's the definition of `Source` @@ -76,7 +77,7 @@ NOTE: In this case there is only one `Source` in the application context so ther == Multiple Input or Output Channels -A module can have multiple input or output channels all defined either as `@Input` and `@Output` methods in an interface (preferrable) or as bean definitions. Instead of just one channel named "input" or "output" you can add multiple `MessageChannel` methods annotated `input.*` or `output.*` and the names are converted to external channel names on the broker. The external channel names are the `spring.cloud.stream.[input|output]ChannelName` plus the `MessageChannel` bean name, period separated. In addition, the bean name can be `input.[queue|topic|tap]:*` or `output.[queue|topic]:*` (i.e. with a channel type as a colon-separated prefix), and the semantics of the external bus channel changes accordingly (a tap is like a topic). For example, you can have two `MessageChannels` called "output" and "output.topic:foo" in a module with `outputChannelName=bar`, and the result is 2 external channels called "bar" and "topic:foo.bar". +A module can have multiple input or output channels all defined either as `@Input` and `@Output` methods in an interface (preferrable) or as bean definitions. Instead of just one channel named "input" or "output" you can add multiple `MessageChannel` methods annotated `@Input` or `@Output` and the names are converted to external channel names on the broker. The external channel names can be specified as properties that consist of the channel names prefixed with `spring.cloud.stream.bindings` (e.g. `spring.cloud.stream.bindings.input` or `spring.cloud.stream.bindings.output`). External channel names can have a channel type as a colon-separated prefix, and the semantics of the external bus channel changes accordingly (a tap is like a topic). For example, you can have two `MessageChannels` called "output" and "foo" in a module with `spring.cloud.stream.bindings.output=bar` and `spring.cloud.stream.bindings.foo=topic:foo`, and the result is 2 external channels called "bar" and "topic:foo". == Samples @@ -117,7 +118,7 @@ For an XD module the channel names are `.` and a source (output on == Taps -All output channels can be also tapped so you can also attach a module to a pub-sub endpoint and listen to the tap if you know the module metadata. To tap an existing vanilla module you need to know its `outputChannelName` and the tap name is then `tap:`, so you can listen to it on an input channel named `input.topic.tap:`. The tap is only active if you explicitly ask for it: you can do that by POSTing to the HTTP endpoint `/taps/` (where the channel name can be the internal or external name, e.g. "output" or the external name mapped to the output channel). +All output channels can be also tapped so you can also attach a module to a pub-sub endpoint and listen to the tap if you know the module metadata. To tap an existing vanilla module you need to know its `outputChannelName` and the tap name is then `tap:`, so you can listen to it on an input channel named `topic.tap:`. The tap is only active if you explicitly ask for it: you can do that by POSTing to the HTTP endpoint `/taps/` (where the channel name can be the internal or external name, e.g. "output" or the external name mapped to the output channel). To tap an existing output channel in an XD module you just need to know its group, name and index, e.g. diff --git a/roadmap.md b/roadmap.md index c00e290af..0f250494f 100644 --- a/roadmap.md +++ b/roadmap.md @@ -49,7 +49,8 @@ The `application.yml` has the external channel names, e.g. spring: cloud: stream: - outputChannelName: ${spring.application.name:ticker} + bindings: + output: ${spring.application.name:ticker} ``` ## Richer Input and Output diff --git a/spring-cloud-stream-samples/double/src/main/resources/sink.yml b/spring-cloud-stream-samples/double/src/main/resources/sink.yml index 4bd18d4c0..a0f178ec6 100644 --- a/spring-cloud-stream-samples/double/src/main/resources/sink.yml +++ b/spring-cloud-stream-samples/double/src/main/resources/sink.yml @@ -1,5 +1,6 @@ spring: cloud: stream: - inputChannelName: testtock + bindings: + input: testtock \ No newline at end of file diff --git a/spring-cloud-stream-samples/double/src/main/resources/source.yml b/spring-cloud-stream-samples/double/src/main/resources/source.yml index fc77f957d..ac31a8981 100644 --- a/spring-cloud-stream-samples/double/src/main/resources/source.yml +++ b/spring-cloud-stream-samples/double/src/main/resources/source.yml @@ -2,5 +2,6 @@ fixedDelay: 5000 spring: cloud: stream: - outputChannelName: testtock + bindings: + output: testtock \ No newline at end of file diff --git a/spring-cloud-stream-samples/sink/src/main/resources/application.yml b/spring-cloud-stream-samples/sink/src/main/resources/application.yml index 8ef7e4dc3..d469a0f5b 100644 --- a/spring-cloud-stream-samples/sink/src/main/resources/application.yml +++ b/spring-cloud-stream-samples/sink/src/main/resources/application.yml @@ -3,9 +3,10 @@ server: spring: cloud: stream: - inputChannelName: testtock - # uncomment below to have this module consume from a specific partition - # in the range of 0 to N-1, where N is the upstream module's partitionCount - #consumerProperties: - # partitionIndex: 1 + bindings: + input: testtock + # uncomment below to have this module consume from a specific partition + # in the range of 0 to N-1, where N is the upstream module's partitionCount + #consumerProperties: + # partitionIndex: 1 \ No newline at end of file diff --git a/spring-cloud-stream-samples/source/src/main/resources/application.yml b/spring-cloud-stream-samples/source/src/main/resources/application.yml index 3f05363f3..04d5e2be3 100644 --- a/spring-cloud-stream-samples/source/src/main/resources/application.yml +++ b/spring-cloud-stream-samples/source/src/main/resources/application.yml @@ -4,17 +4,19 @@ fixedDelay: 5000 spring: cloud: stream: - outputChannelName: testtock - # uncomment below to use the last digit of the seconds as a partition key - # hashcode(key) % N is then applied with N being the partitionCount value - # thus, even seconds should go to the 0 queue, odd seconds to the 1 queue - #producerProperties: - # partitionKeyExpression: payload.charAt(payload.length()-1) - # partitionCount: 2 + bindings: + output: testtock + # uncomment below to use the last digit of the seconds as a partition key + # hashcode(key) % N is then applied with N being the partitionCount value + # thus, even seconds should go to the 0 queue, odd seconds to the 1 queue + #producerProperties: + # partitionKeyExpression: payload.charAt(payload.length()-1) + # partitionCount: 2 --- spring: profiles: extended cloud: stream: - outputChannelName: xformed + bindings: + output: xformed diff --git a/spring-cloud-stream-samples/transform/src/main/resources/application.yml b/spring-cloud-stream-samples/transform/src/main/resources/application.yml index bc0a8fabc..f62709eea 100644 --- a/spring-cloud-stream-samples/transform/src/main/resources/application.yml +++ b/spring-cloud-stream-samples/transform/src/main/resources/application.yml @@ -3,6 +3,7 @@ server: spring: cloud: stream: - outputChannelName: xformed - inputChannelName: testtock + bindings: + output: xformed + input: testtock \ No newline at end of file diff --git a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/adapter/ChannelBindingAdapter.java b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/adapter/ChannelBindingAdapter.java index e2eef8a07..72dc8abd5 100644 --- a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/adapter/ChannelBindingAdapter.java +++ b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/adapter/ChannelBindingAdapter.java @@ -69,6 +69,7 @@ public class ChannelBindingAdapter implements Lifecycle, ApplicationContextAware private MessageBuilderFactory messageBuilderFactory = new DefaultMessageBuilderFactory(); private Collection outputChannels = Collections.emptySet(); + private Collection inputChannels = Collections.emptySet(); private boolean running = false; diff --git a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/adapter/DefaultChannelLocator.java b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/adapter/DefaultChannelLocator.java index 41f72e00b..6df6f2a29 100644 --- a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/adapter/DefaultChannelLocator.java +++ b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/adapter/DefaultChannelLocator.java @@ -32,60 +32,16 @@ public class DefaultChannelLocator implements ChannelLocator { @Override public String locate(String name) { - String channelName = extractChannelName("input", name, this.module.getInputChannelName()); - if (channelName!=null) { - return channelName; - } - channelName = extractChannelName("output", name, this.module.getOutputChannelName()); - if (channelName!=null) { - return channelName; - } - return null; + return module.getBindingPath(name); } @Override public String tap(String name) { - return !isDefaultOuputChannel(name) ? this.module.getTapChannelName(getPlainChannelName(name)) - : this.module.getTapChannelName(); + return this.module.getTapChannelName(getPlainBinding(name)); } - private boolean isDefaultOuputChannel(String channelName) { - if (channelName.contains(":")) { - String[] tokens = channelName.split(":", 2); - channelName = tokens[1]; - } - return channelName.equals(this.module.getOutputChannelName()); - } - - private String extractChannelName(String start, String name, String externalChannelName) { - if (name.equals(start)) { - return externalChannelName; - } - else if (name.startsWith(start + ".") || name.startsWith(start + "_")) { - String prefix = ""; - String channelName = name.substring(start.length() + 1); - if (channelName.contains(":")) { - String[] tokens = channelName.split(":", 2); - String type = tokens[0]; - if ("queue".equals(type)) { - // omit the type for a queue - if (StringUtils.hasText(tokens[1])) { - prefix = tokens[1] + "."; - } - } - else { - prefix = channelName + (channelName.endsWith(":") ? "" : "."); - } - } - else { - prefix = channelName + "."; - } - return prefix + getPlainChannelName(externalChannelName); - } - return null; - } - - private String getPlainChannelName(String name) { + private String getPlainBinding(String name) { + // remove prefixes such as 'tap:','topic:','queue:' if (name.contains(":")) { name = name.substring(name.indexOf(":") + 1); } diff --git a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/aggregate/AggregateBuilder.java b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/aggregate/AggregateBuilder.java index 3ff2ee675..663374714 100644 --- a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/aggregate/AggregateBuilder.java +++ b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/aggregate/AggregateBuilder.java @@ -230,10 +230,10 @@ public class AggregateBuilder implements ApplicationContextAware { args.add("--spring.config.name=" + this.configName); } if (this.input != null) { - args.add("--spring.cloud.stream.inputChannelName=" + this.input); + args.add("--spring.cloud.stream.bindings.input=" + this.input); } if (this.output != null) { - args.add("--spring.cloud.stream.outputChannelName=" + this.output); + args.add("--spring.cloud.stream.bindings.output=" + this.output); } this.builder.run(args.toArray(new String[0])); } diff --git a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/config/ChannelBindingProperties.java b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/config/ChannelBindingProperties.java index e3e54aea4..cee840b8c 100644 --- a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/config/ChannelBindingProperties.java +++ b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/config/ChannelBindingProperties.java @@ -16,6 +16,8 @@ package org.springframework.cloud.stream.config; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; import org.springframework.boot.context.properties.ConfigurationProperties; @@ -31,11 +33,7 @@ import com.fasterxml.jackson.annotation.JsonInclude.Include; @JsonInclude(Include.NON_DEFAULT) public class ChannelBindingProperties { - public static final String DEFAULT_CHANNEL_NAME = "group.0"; - - private String outputChannelName = DEFAULT_CHANNEL_NAME; - - private String inputChannelName = DEFAULT_CHANNEL_NAME; + public static final String PATH = "path"; private Properties consumerProperties = new Properties(); @@ -43,21 +41,7 @@ public class ChannelBindingProperties { private boolean autoStartup = true; - public String getOutputChannelName() { - return this.outputChannelName; - } - - public String getInputChannelName() { - return this.inputChannelName; - } - - public void setOutputChannelName(String outputChannelName) { - this.outputChannelName = outputChannelName; - } - - public void setInputChannelName(String inputChannelName) { - this.inputChannelName = inputChannelName; - } + private Map bindings = new HashMap<>(); public Properties getConsumerProperties() { return this.consumerProperties; @@ -83,12 +67,35 @@ public class ChannelBindingProperties { this.autoStartup = autoStartup; } - public String getTapChannelName() { - return getTapChannelName(getOutputChannelName()); + public Map getBindings() { + return bindings; } - public String getTapChannelName(String prefix) { - return "tap:" + prefix; + public void setBindings(Map bindings) { + this.bindings = bindings; + } + + public String getBindingPath(String channelName) { + Object binding = bindings.get(channelName); + // we may shortcut directly to the path + if (binding != null) { + if (binding instanceof String) { + return (String) binding; + } + else if (binding instanceof Map) { + Map bindingProperties = (Map) binding; + Object bindingPath = bindingProperties.get(PATH); + if (bindingPath != null) { + return bindingPath.toString(); + } + } + } + // the default path of the binding is the channel name itself + return channelName; + } + + public String getTapChannelName(String channelName) { + return "tap:" + getBindingPath(channelName); } } diff --git a/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/adapter/DefaultChannelLocatorTests.java b/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/adapter/DefaultChannelLocatorTests.java index c64e9992c..212b0a95b 100644 --- a/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/adapter/DefaultChannelLocatorTests.java +++ b/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/adapter/DefaultChannelLocatorTests.java @@ -17,6 +17,8 @@ package org.springframework.cloud.stream.adapter; import static org.junit.Assert.assertEquals; +import java.util.Collections; + import org.junit.Test; import org.springframework.cloud.stream.adapter.DefaultChannelLocator; import org.springframework.cloud.stream.config.ChannelBindingProperties; @@ -33,56 +35,26 @@ public class DefaultChannelLocatorTests { @Test public void oneOutput() throws Exception { - assertEquals("group.0", this.locator.locate("output")); + assertEquals("output", this.locator.locate("output")); } + @Test + public void oneOutputWithShortcutPath() throws Exception { + module.getBindings().put("outputWithShortcutPath","group.0.shortcut"); + assertEquals("group.0.shortcut", this.locator.locate("outputWithShortcutPath")); + } + + @Test + public void oneOutputWithFullPath() throws Exception { + module.getBindings().put("outputWithFullPath", Collections.singletonMap(ChannelBindingProperties.PATH,"group.0.full")); + assertEquals("group.0.full", this.locator.locate("outputWithFullPath")); + } + + @Test public void oneOutputTopic() throws Exception { - assertEquals("topic:group.0", this.locator.locate("output.topic:")); - } - - @Test - public void outputWithNamedTopic() throws Exception { - assertEquals("topic:foo.group.0", this.locator.locate("output.topic:foo")); - } - - @Test - public void outputWithNamedQueue() throws Exception { - assertEquals("foo.group.0", this.locator.locate("output.queue:foo")); - } - - @Test - public void overrideNaturalOutputChannelName() throws Exception { - this.module.setOutputChannelName("bar"); - assertEquals("foo.bar", this.locator.locate("output.queue:foo")); - } - - @Test - public void noQueueQualifier() throws Exception { - assertEquals("foo.group.0", this.locator.locate("output.foo")); - } - - @Test - public void underscoreSeparatorForChannelName() throws Exception { - assertEquals("foo.group.0", this.locator.locate("output_foo")); - } - - @Test - public void overrideNaturalOutputChannelNamedQueue() throws Exception { - this.module.setOutputChannelName("queue:bar"); - assertEquals("foo.bar", this.locator.locate("output.foo")); - } - - @Test - public void overrideNaturalOutputChannelNamedQueueWithTopic() throws Exception { - this.module.setOutputChannelName("queue:bar"); - assertEquals("topic:foo.bar", this.locator.locate("output.topic:foo")); - } - - @Test - public void overrideNaturalOutputChannelNamedTopic() throws Exception { - this.module.setOutputChannelName("topic:bar"); - assertEquals("foo.bar", this.locator.locate("output.queue:foo")); + module.getBindings().put("outputWithTopic", Collections.singletonMap(ChannelBindingProperties.PATH,"topic:group.0")); + assertEquals("topic:group.0", this.locator.locate("outputWithTopic")); } } diff --git a/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/config/ChannelBindingAdapterConfigurationTests.java b/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/config/ChannelBindingAdapterConfigurationTests.java index c9b468032..76a085f35 100644 --- a/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/config/ChannelBindingAdapterConfigurationTests.java +++ b/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/config/ChannelBindingAdapterConfigurationTests.java @@ -72,8 +72,8 @@ public class ChannelBindingAdapterConfigurationTests { refresh(); Collection channels = this.adapter.getChannelsMetadata().getOutputChannels(); assertEquals(1, channels.size()); - assertEquals("group.0", channels.iterator().next().getRemoteName()); - assertEquals("tap:group.0", channels.iterator().next().getTapChannelName()); + assertEquals("output", channels.iterator().next().getRemoteName()); + assertEquals("tap:output", channels.iterator().next().getTapChannelName()); } private void refresh() { @@ -86,28 +86,20 @@ public class ChannelBindingAdapterConfigurationTests { } @Test - public void oneOutputTopic() throws Exception { - MessageChannelBeanDefinitionRegistryUtils.registerOutputChannelBeanDefinition("output.topic:", context); - refresh(); - Collection channels = this.adapter.getChannelsMetadata().getOutputChannels(); - assertEquals(1, channels.size()); - assertEquals("topic:group.0", channels.iterator().next().getRemoteName()); - assertEquals("tap:group.0", channels.iterator().next().getTapChannelName()); - } - - @Test - public void twoOutputsWithQueue() throws Exception { + public void twoOutputs() throws Exception { MessageChannelBeanDefinitionRegistryUtils.registerOutputChannelBeanDefinition("output", context); - MessageChannelBeanDefinitionRegistryUtils.registerOutputChannelBeanDefinition("output.queue:foo", context); + MessageChannelBeanDefinitionRegistryUtils.registerOutputChannelBeanDefinition("foo", context); + module.getBindings().put("output", "group.0"); + module.getBindings().put("foo", "topic:group.0"); refresh(); Collection channels = this.adapter.getChannelsMetadata().getOutputChannels(); - List names = getChannelNames(channels); + List remoteBindingNames = getRemoteBindingNames(channels); assertEquals(2, channels.size()); - assertTrue(names.contains("group.0")); - assertTrue(names.contains("foo.group.0")); + assertTrue(remoteBindingNames.contains("group.0")); + assertTrue(remoteBindingNames.contains("topic:group.0")); } - private List getChannelNames(Collection channels) { + private List getRemoteBindingNames(Collection channels) { List list = new ArrayList(); for (ChannelBinding binding : channels) { list.add(binding.getRemoteName()); @@ -117,25 +109,25 @@ public class ChannelBindingAdapterConfigurationTests { @Test public void overrideNaturalOutputChannelName() throws Exception { - this.module.setOutputChannelName("bar"); - MessageChannelBeanDefinitionRegistryUtils.registerOutputChannelBeanDefinition("output.queue:foo", context); + MessageChannelBeanDefinitionRegistryUtils.registerOutputChannelBeanDefinition("output", context); + module.getBindings().put("output", "group.0"); refresh(); Collection channels = this.adapter.getChannelsMetadata().getOutputChannels(); assertEquals(1, channels.size()); - assertEquals("foo.bar", channels.iterator().next().getRemoteName()); - // TODO: fix this. What should it be? - assertEquals("tap:foo.bar", channels.iterator().next().getTapChannelName()); + assertEquals("group.0", channels.iterator().next().getRemoteName()); + assertEquals("tap:group.0", channels.iterator().next().getTapChannelName()); } @Test public void overrideNaturalOutputChannelNamedQueueWithTopic() throws Exception { - this.module.setOutputChannelName("queue:bar"); - MessageChannelBeanDefinitionRegistryUtils.registerOutputChannelBeanDefinition("output.topic:foo", context); + MessageChannelBeanDefinitionRegistryUtils.registerOutputChannelBeanDefinition("output", context); + module.getBindings().put("output", "topic:group.0"); refresh(); Collection channels = this.adapter.getChannelsMetadata().getOutputChannels(); assertEquals(1, channels.size()); - assertEquals("topic:foo.bar", channels.iterator().next().getRemoteName()); - assertEquals("tap:foo.bar", channels.iterator().next().getTapChannelName()); + assertEquals("topic:group.0", channels.iterator().next().getRemoteName()); + // is this correct? + assertEquals("tap:group.0", channels.iterator().next().getTapChannelName()); } @Configuration