diff --git a/settings.gradle b/settings.gradle index 81acd562..891b638d 100644 --- a/settings.gradle +++ b/settings.gradle @@ -34,5 +34,6 @@ include 'spring-pulsar-sample-apps:sample-app2' include 'spring-pulsar-sample-apps:sample-pulsar-functions:sample-signup-app' include 'spring-pulsar-sample-apps:sample-pulsar-functions:sample-signup-function' include 'spring-pulsar-sample-apps:sample-reactive' +include 'spring-pulsar-sample-apps:sample-pulsar-binder' include 'spring-pulsar-docs' include 'spring-pulsar-spring-cloud-stream-binder' diff --git a/spring-pulsar-sample-apps/sample-pulsar-binder/build.gradle b/spring-pulsar-sample-apps/sample-pulsar-binder/build.gradle new file mode 100644 index 00000000..e720067f --- /dev/null +++ b/spring-pulsar-sample-apps/sample-pulsar-binder/build.gradle @@ -0,0 +1,27 @@ +plugins { + id 'org.springframework.pulsar.spring-module' + id 'org.springframework.boot' version '3.0.0' +} + +description = 'Spring Cloud Stream Binder for Pulsar Sample Application' + +dependencies { + implementation project(':spring-pulsar-spring-cloud-stream-binder') + implementation project(':spring-pulsar-spring-boot-starter') +} + +bootRun { + jvmArgs = [ + "--add-opens", "java.base/java.lang=ALL-UNNAMED", + "--add-opens", "java.base/java.util=ALL-UNNAMED", + "--add-opens", "java.base/sun.net=ALL-UNNAMED" + ] +} + +project.afterEvaluate { + project.tasks.publishArtifacts.enabled(false) + project.tasks.artifactoryPublish.enabled(false) + project.tasks.publishToOssrh.enabled(false) + project.tasks.publishMavenJavaPublicationToOssrhRepository.enabled(false) + project.tasks.publishAllPublicationsToOssrhRepository.enabled(false) +} diff --git a/spring-pulsar-sample-apps/sample-pulsar-binder/src/main/java/org/springframework/pulsar/sample/binder/SpringPulsarBinderSampleApp.java b/spring-pulsar-sample-apps/sample-pulsar-binder/src/main/java/org/springframework/pulsar/sample/binder/SpringPulsarBinderSampleApp.java new file mode 100644 index 00000000..06e78195 --- /dev/null +++ b/spring-pulsar-sample-apps/sample-pulsar-binder/src/main/java/org/springframework/pulsar/sample/binder/SpringPulsarBinderSampleApp.java @@ -0,0 +1,48 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.sample.binder; + +import java.util.function.Consumer; +import java.util.function.Supplier; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; + +@SpringBootApplication +public class SpringPulsarBinderSampleApp { + + private final Logger logger = LoggerFactory.getLogger(SpringPulsarBinderSampleApp.class); + + public static void main(String[] args) { + SpringApplication.run(SpringPulsarBinderSampleApp.class, args); + } + + @Bean + public Supplier timeSupplier() { + return () -> String.valueOf(System.currentTimeMillis()); + } + + @Bean + public Consumer timeLogger() { + return s -> this.logger.info("Hello binder: " + s); + } + +} diff --git a/spring-pulsar-sample-apps/sample-pulsar-binder/src/main/java/org/springframework/pulsar/sample/binder/package-info.java b/spring-pulsar-sample-apps/sample-pulsar-binder/src/main/java/org/springframework/pulsar/sample/binder/package-info.java new file mode 100644 index 00000000..e2d5e948 --- /dev/null +++ b/spring-pulsar-sample-apps/sample-pulsar-binder/src/main/java/org/springframework/pulsar/sample/binder/package-info.java @@ -0,0 +1,9 @@ +/** + * Package containing sample app for the framework. + */ +@NonNullApi +@NonNullFields +package org.springframework.pulsar.sample.binder; + +import org.springframework.lang.NonNullApi; +import org.springframework.lang.NonNullFields; diff --git a/spring-pulsar-sample-apps/sample-pulsar-binder/src/main/resources/application.yml b/spring-pulsar-sample-apps/sample-pulsar-binder/src/main/resources/application.yml new file mode 100644 index 00000000..65b1e4ba --- /dev/null +++ b/spring-pulsar-sample-apps/sample-pulsar-binder/src/main/resources/application.yml @@ -0,0 +1,16 @@ +spring: + cloud: + function: + definition: timeSupplier;timeLogger + stream: + bindings: + timeLogger-in-0: + destination: timeSupplier-out-0 + consumer: + use-native-decoding: true + pulsar: + bindings: + timeLogger-in-0: + consumer: + subscription-name: my-scst-sub1 + schema-type: STRING diff --git a/spring-pulsar-spring-cloud-stream-binder/build.gradle b/spring-pulsar-spring-cloud-stream-binder/build.gradle index 6d70d386..c93e5706 100644 --- a/spring-pulsar-spring-cloud-stream-binder/build.gradle +++ b/spring-pulsar-spring-cloud-stream-binder/build.gradle @@ -5,7 +5,7 @@ plugins { description = 'Spring Cloud Stream Binder for Apache Pulsar' dependencies { - api project (':spring-pulsar') + implementation project(':spring-pulsar-spring-boot-starter') api('org.springframework.cloud:spring-cloud-stream') { exclude group: 'javax.activation', module: 'javax.activation-api' exclude group: 'javax.annotation', module: 'javax.annotation-api' diff --git a/spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/PulsarMessageChannelBinder.java b/spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/PulsarMessageChannelBinder.java index 6ea04254..e88ed2f3 100644 --- a/spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/PulsarMessageChannelBinder.java +++ b/spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/PulsarMessageChannelBinder.java @@ -16,6 +16,10 @@ package org.springframework.pulsar.spring.cloud.stream.binder; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.schema.SchemaType; + import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder; import org.springframework.cloud.stream.binder.Binder; import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider; @@ -25,9 +29,19 @@ import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder; import org.springframework.cloud.stream.provisioning.ConsumerDestination; import org.springframework.cloud.stream.provisioning.ProducerDestination; import org.springframework.integration.core.MessageProducer; +import org.springframework.integration.endpoint.MessageProducerSupport; +import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.pulsar.core.PulsarConsumerFactory; +import org.springframework.pulsar.core.PulsarTemplate; +import org.springframework.pulsar.listener.AbstractPulsarMessageListenerContainer; +import org.springframework.pulsar.listener.DefaultPulsarMessageListenerContainer; +import org.springframework.pulsar.listener.PulsarContainerProperties; +import org.springframework.pulsar.listener.PulsarRecordMessageListener; import org.springframework.pulsar.spring.cloud.stream.binder.properties.PulsarConsumerProperties; +import org.springframework.pulsar.spring.cloud.stream.binder.properties.PulsarExtendedBindingProperties; import org.springframework.pulsar.spring.cloud.stream.binder.properties.PulsarProducerProperties; import org.springframework.pulsar.spring.cloud.stream.binder.provisioning.PulsarTopicProvisioner; @@ -40,31 +54,76 @@ public class PulsarMessageChannelBinder extends AbstractMessageChannelBinder, ExtendedProducerProperties, PulsarTopicProvisioner> implements ExtendedPropertiesBinder { - public PulsarMessageChannelBinder(PulsarTopicProvisioner provisioningProvider) { + private final PulsarTemplate pulsarTemplate; + + private final PulsarConsumerFactory pulsarConsumerFactory; + + private PulsarExtendedBindingProperties extendedBindingProperties = new PulsarExtendedBindingProperties(); + + public PulsarMessageChannelBinder(PulsarTopicProvisioner provisioningProvider, + PulsarTemplate pulsarTemplate, PulsarConsumerFactory pulsarConsumerFactory) { super(null, provisioningProvider); + this.pulsarTemplate = pulsarTemplate; + this.pulsarConsumerFactory = pulsarConsumerFactory; } @Override protected MessageHandler createProducerMessageHandler(ProducerDestination destination, - ExtendedProducerProperties producerProperties, MessageChannel errorChannel) - throws Exception { - return null; + ExtendedProducerProperties producerProperties, MessageChannel errorChannel) { + + return message -> { + try { + PulsarMessageChannelBinder.this.pulsarTemplate.sendAsync(destination.getName(), message.getPayload()); + } + catch (PulsarClientException e) { + // deal later + } + }; + } @Override protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group, - ExtendedConsumerProperties properties) throws Exception { - return null; + ExtendedConsumerProperties properties) { + + PulsarContainerProperties pulsarContainerProperties = new PulsarContainerProperties(); + pulsarContainerProperties.setTopics(new String[] { destination.getName() }); + PulsarMessageDrivenChannelAdapter pulsarMessageDrivenChannelAdapter = new PulsarMessageDrivenChannelAdapter(); + pulsarContainerProperties.setMessageListener((PulsarRecordMessageListener) (consumer, msg) -> { + Message message = MessageBuilder.withPayload(msg.getValue()).build(); + pulsarMessageDrivenChannelAdapter.send(message); + }); + SchemaType schemaType = properties.getExtension().getSchemaType(); + if (schemaType != null) { + pulsarContainerProperties.setSchema(toSchema(schemaType)); + } + else { + pulsarContainerProperties.setSchema(Schema.BYTES); + } + pulsarContainerProperties.setSubscriptionName(properties.getExtension().getSubscriptionName()); + DefaultPulsarMessageListenerContainer container = new DefaultPulsarMessageListenerContainer<>( + this.pulsarConsumerFactory, pulsarContainerProperties); + pulsarMessageDrivenChannelAdapter.setMessageListenerContainer(container); + return pulsarMessageDrivenChannelAdapter; + } + + // This is just a place-holder impl + public Schema toSchema(SchemaType schemaType) { + return switch (schemaType) { + case STRING -> Schema.STRING; + case INT32 -> Schema.INT32; + default -> Schema.BYTES; + }; } @Override public PulsarConsumerProperties getExtendedConsumerProperties(String channelName) { - return null; + return this.extendedBindingProperties.getExtendedConsumerProperties(channelName); } @Override public PulsarProducerProperties getExtendedProducerProperties(String channelName) { - return null; + return this.extendedBindingProperties.getExtendedProducerProperties(channelName); } @Override @@ -77,4 +136,36 @@ public class PulsarMessageChannelBinder extends return null; } + public PulsarExtendedBindingProperties getExtendedBindingProperties() { + return this.extendedBindingProperties; + } + + public void setExtendedBindingProperties(PulsarExtendedBindingProperties extendedBindingProperties) { + this.extendedBindingProperties = extendedBindingProperties; + } + + static class PulsarMessageDrivenChannelAdapter extends MessageProducerSupport { + + AbstractPulsarMessageListenerContainer messageListenerContainer; + + public void send(Message message) { + sendMessage(message); + } + + @Override + protected void doStart() { + this.messageListenerContainer.start(); + } + + @Override + protected void doStop() { + this.messageListenerContainer.stop(); + } + + public void setMessageListenerContainer(AbstractPulsarMessageListenerContainer messageListenerContainer) { + this.messageListenerContainer = messageListenerContainer; + } + + } + } diff --git a/spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/config/PulsarBinderConfiguration.java b/spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/config/PulsarBinderConfiguration.java new file mode 100644 index 00000000..ce8c6770 --- /dev/null +++ b/spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/config/PulsarBinderConfiguration.java @@ -0,0 +1,51 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.spring.cloud.stream.binder.config; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.cloud.stream.binder.Binder; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.pulsar.autoconfigure.PulsarProperties; +import org.springframework.pulsar.core.PulsarConsumerFactory; +import org.springframework.pulsar.core.PulsarTemplate; +import org.springframework.pulsar.spring.cloud.stream.binder.PulsarMessageChannelBinder; +import org.springframework.pulsar.spring.cloud.stream.binder.properties.PulsarExtendedBindingProperties; +import org.springframework.pulsar.spring.cloud.stream.binder.provisioning.PulsarTopicProvisioner; + +@Configuration(proxyBeanMethods = false) +@ConditionalOnMissingBean(Binder.class) +@EnableConfigurationProperties({ PulsarProperties.class, PulsarExtendedBindingProperties.class }) +public class PulsarBinderConfiguration { + + @Bean + public PulsarTopicProvisioner pulsarTopicProvisioner() { + return new PulsarTopicProvisioner(); + } + + @Bean + public PulsarMessageChannelBinder pulsarMessageChannelBinder(PulsarTopicProvisioner pulsarTopicProvisioner, + PulsarTemplate pulsarTemplate, PulsarConsumerFactory pulsarConsumerFactory, + PulsarExtendedBindingProperties pulsarExtendedBindingProperties) { + PulsarMessageChannelBinder pulsarMessageChannelBinder = new PulsarMessageChannelBinder(pulsarTopicProvisioner, + pulsarTemplate, pulsarConsumerFactory); + pulsarMessageChannelBinder.setExtendedBindingProperties(pulsarExtendedBindingProperties); + return pulsarMessageChannelBinder; + } + +} diff --git a/spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/properties/PulsarBinderConfigurationProperties.java b/spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/properties/PulsarBinderConfigurationProperties.java new file mode 100644 index 00000000..2edec9c7 --- /dev/null +++ b/spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/properties/PulsarBinderConfigurationProperties.java @@ -0,0 +1,24 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.spring.cloud.stream.binder.properties; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +@ConfigurationProperties(prefix = "spring.cloud.stream.pulsar.binder") +public class PulsarBinderConfigurationProperties { + +} diff --git a/spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/properties/PulsarBindingProperties.java b/spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/properties/PulsarBindingProperties.java new file mode 100644 index 00000000..363ee6de --- /dev/null +++ b/spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/properties/PulsarBindingProperties.java @@ -0,0 +1,43 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.spring.cloud.stream.binder.properties; + +import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider; + +public class PulsarBindingProperties implements BinderSpecificPropertiesProvider { + + private PulsarConsumerProperties consumer = new PulsarConsumerProperties(); + + private PulsarProducerProperties producer = new PulsarProducerProperties(); + + public PulsarConsumerProperties getConsumer() { + return this.consumer; + } + + public void setConsumer(PulsarConsumerProperties consumer) { + this.consumer = consumer; + } + + public PulsarProducerProperties getProducer() { + return this.producer; + } + + public void setProducer(PulsarProducerProperties producer) { + this.producer = producer; + } + +} diff --git a/spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/properties/PulsarConsumerProperties.java b/spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/properties/PulsarConsumerProperties.java index 780b6b4f..429121ce 100644 --- a/spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/properties/PulsarConsumerProperties.java +++ b/spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/properties/PulsarConsumerProperties.java @@ -16,6 +16,12 @@ package org.springframework.pulsar.spring.cloud.stream.binder.properties; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.schema.SchemaType; + +import org.springframework.lang.NonNull; +import org.springframework.lang.Nullable; + /** * Pulsar consumer properties used by the binder. * @@ -23,4 +29,39 @@ package org.springframework.pulsar.spring.cloud.stream.binder.properties; */ public class PulsarConsumerProperties { + @NonNull + private String subscriptionName = "DEFAULT-SUBSCRIPTION-TODO-CHANGE-THIS"; + + @Nullable + private SchemaType schemaType; + + @Nullable + private SubscriptionType subscriptionType; + + public String getSubscriptionName() { + return this.subscriptionName; + } + + public void setSubscriptionName(String subscriptionName) { + this.subscriptionName = subscriptionName; + } + + @Nullable + public SchemaType getSchemaType() { + return this.schemaType; + } + + public void setSchemaType(SchemaType schemaType) { + this.schemaType = schemaType; + } + + @Nullable + public SubscriptionType getSubscriptionType() { + return this.subscriptionType; + } + + public void setSubscriptionType(SubscriptionType subscriptionType) { + this.subscriptionType = subscriptionType; + } + } diff --git a/spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/properties/PulsarExtendedBindingProperties.java b/spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/properties/PulsarExtendedBindingProperties.java new file mode 100644 index 00000000..1762bb27 --- /dev/null +++ b/spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/properties/PulsarExtendedBindingProperties.java @@ -0,0 +1,46 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.spring.cloud.stream.binder.properties; + +import java.util.Map; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.cloud.stream.binder.AbstractExtendedBindingProperties; +import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider; + +@ConfigurationProperties("spring.cloud.stream.pulsar") +public class PulsarExtendedBindingProperties extends + AbstractExtendedBindingProperties { + + private static final String DEFAULTS_PREFIX = "spring.cloud.stream.pulsar.default"; + + @Override + public String getDefaultsPrefix() { + return DEFAULTS_PREFIX; + } + + @Override + public Map getBindings() { + return this.doGetBindings(); + } + + @Override + public Class getExtendedPropertiesEntryClass() { + return PulsarBindingProperties.class; + } + +} diff --git a/spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/properties/PulsarProducerProperties.java b/spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/properties/PulsarProducerProperties.java index 46d5b013..2edfb041 100644 --- a/spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/properties/PulsarProducerProperties.java +++ b/spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/properties/PulsarProducerProperties.java @@ -16,6 +16,8 @@ package org.springframework.pulsar.spring.cloud.stream.binder.properties; +import org.apache.pulsar.common.schema.SchemaType; + /** * Pulsar producer properties used by the binder. * @@ -23,4 +25,14 @@ package org.springframework.pulsar.spring.cloud.stream.binder.properties; */ public class PulsarProducerProperties { + private SchemaType schemaType; + + public SchemaType getSchemaType() { + return this.schemaType; + } + + public void setSchemaType(SchemaType schemaType) { + this.schemaType = schemaType; + } + } diff --git a/spring-pulsar-spring-cloud-stream-binder/src/main/resources/META-INF/spring.binders b/spring-pulsar-spring-cloud-stream-binder/src/main/resources/META-INF/spring.binders new file mode 100644 index 00000000..27b3b29b --- /dev/null +++ b/spring-pulsar-spring-cloud-stream-binder/src/main/resources/META-INF/spring.binders @@ -0,0 +1,2 @@ +pulsar:\ +org.springframework.pulsar.spring.cloud.stream.binder.config.PulsarBinderConfiguration