Spring Cloud Stream Pulsar Binder (#276)

* Spring Cloud Stream Pulsar Binder

Basic consumer and producer bindings.
Sample app (end to end)

* PR review
This commit is contained in:
Soby Chacko
2023-01-19 13:07:07 -05:00
committed by GitHub
parent a32050c30c
commit 1969c8a488
14 changed files with 420 additions and 9 deletions

View File

@@ -34,5 +34,6 @@ include 'spring-pulsar-sample-apps:sample-app2'
include 'spring-pulsar-sample-apps:sample-pulsar-functions:sample-signup-app'
include 'spring-pulsar-sample-apps:sample-pulsar-functions:sample-signup-function'
include 'spring-pulsar-sample-apps:sample-reactive'
include 'spring-pulsar-sample-apps:sample-pulsar-binder'
include 'spring-pulsar-docs'
include 'spring-pulsar-spring-cloud-stream-binder'

View File

@@ -0,0 +1,27 @@
plugins {
id 'org.springframework.pulsar.spring-module'
id 'org.springframework.boot' version '3.0.0'
}
description = 'Spring Cloud Stream Binder for Pulsar Sample Application'
dependencies {
implementation project(':spring-pulsar-spring-cloud-stream-binder')
implementation project(':spring-pulsar-spring-boot-starter')
}
bootRun {
jvmArgs = [
"--add-opens", "java.base/java.lang=ALL-UNNAMED",
"--add-opens", "java.base/java.util=ALL-UNNAMED",
"--add-opens", "java.base/sun.net=ALL-UNNAMED"
]
}
project.afterEvaluate {
project.tasks.publishArtifacts.enabled(false)
project.tasks.artifactoryPublish.enabled(false)
project.tasks.publishToOssrh.enabled(false)
project.tasks.publishMavenJavaPublicationToOssrhRepository.enabled(false)
project.tasks.publishAllPublicationsToOssrhRepository.enabled(false)
}

View File

@@ -0,0 +1,48 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.pulsar.sample.binder;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
public class SpringPulsarBinderSampleApp {
private final Logger logger = LoggerFactory.getLogger(SpringPulsarBinderSampleApp.class);
public static void main(String[] args) {
SpringApplication.run(SpringPulsarBinderSampleApp.class, args);
}
@Bean
public Supplier<String> timeSupplier() {
return () -> String.valueOf(System.currentTimeMillis());
}
@Bean
public Consumer<String> timeLogger() {
return s -> this.logger.info("Hello binder: " + s);
}
}

View File

@@ -0,0 +1,9 @@
/**
* Package containing sample app for the framework.
*/
@NonNullApi
@NonNullFields
package org.springframework.pulsar.sample.binder;
import org.springframework.lang.NonNullApi;
import org.springframework.lang.NonNullFields;

View File

@@ -0,0 +1,16 @@
spring:
cloud:
function:
definition: timeSupplier;timeLogger
stream:
bindings:
timeLogger-in-0:
destination: timeSupplier-out-0
consumer:
use-native-decoding: true
pulsar:
bindings:
timeLogger-in-0:
consumer:
subscription-name: my-scst-sub1
schema-type: STRING

View File

@@ -5,7 +5,7 @@ plugins {
description = 'Spring Cloud Stream Binder for Apache Pulsar'
dependencies {
api project (':spring-pulsar')
implementation project(':spring-pulsar-spring-boot-starter')
api('org.springframework.cloud:spring-cloud-stream') {
exclude group: 'javax.activation', module: 'javax.activation-api'
exclude group: 'javax.annotation', module: 'javax.annotation-api'

View File

@@ -16,6 +16,10 @@
package org.springframework.pulsar.spring.cloud.stream.binder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.SchemaType;
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
@@ -25,9 +29,19 @@ import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.pulsar.core.PulsarConsumerFactory;
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.pulsar.listener.AbstractPulsarMessageListenerContainer;
import org.springframework.pulsar.listener.DefaultPulsarMessageListenerContainer;
import org.springframework.pulsar.listener.PulsarContainerProperties;
import org.springframework.pulsar.listener.PulsarRecordMessageListener;
import org.springframework.pulsar.spring.cloud.stream.binder.properties.PulsarConsumerProperties;
import org.springframework.pulsar.spring.cloud.stream.binder.properties.PulsarExtendedBindingProperties;
import org.springframework.pulsar.spring.cloud.stream.binder.properties.PulsarProducerProperties;
import org.springframework.pulsar.spring.cloud.stream.binder.provisioning.PulsarTopicProvisioner;
@@ -40,31 +54,76 @@ public class PulsarMessageChannelBinder extends
AbstractMessageChannelBinder<ExtendedConsumerProperties<PulsarConsumerProperties>, ExtendedProducerProperties<PulsarProducerProperties>, PulsarTopicProvisioner>
implements ExtendedPropertiesBinder<MessageChannel, PulsarConsumerProperties, PulsarProducerProperties> {
public PulsarMessageChannelBinder(PulsarTopicProvisioner provisioningProvider) {
private final PulsarTemplate<Object> pulsarTemplate;
private final PulsarConsumerFactory<?> pulsarConsumerFactory;
private PulsarExtendedBindingProperties extendedBindingProperties = new PulsarExtendedBindingProperties();
public PulsarMessageChannelBinder(PulsarTopicProvisioner provisioningProvider,
PulsarTemplate<Object> pulsarTemplate, PulsarConsumerFactory<?> pulsarConsumerFactory) {
super(null, provisioningProvider);
this.pulsarTemplate = pulsarTemplate;
this.pulsarConsumerFactory = pulsarConsumerFactory;
}
@Override
protected MessageHandler createProducerMessageHandler(ProducerDestination destination,
ExtendedProducerProperties<PulsarProducerProperties> producerProperties, MessageChannel errorChannel)
throws Exception {
return null;
ExtendedProducerProperties<PulsarProducerProperties> producerProperties, MessageChannel errorChannel) {
return message -> {
try {
PulsarMessageChannelBinder.this.pulsarTemplate.sendAsync(destination.getName(), message.getPayload());
}
catch (PulsarClientException e) {
// deal later
}
};
}
@Override
protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group,
ExtendedConsumerProperties<PulsarConsumerProperties> properties) throws Exception {
return null;
ExtendedConsumerProperties<PulsarConsumerProperties> properties) {
PulsarContainerProperties pulsarContainerProperties = new PulsarContainerProperties();
pulsarContainerProperties.setTopics(new String[] { destination.getName() });
PulsarMessageDrivenChannelAdapter pulsarMessageDrivenChannelAdapter = new PulsarMessageDrivenChannelAdapter();
pulsarContainerProperties.setMessageListener((PulsarRecordMessageListener<?>) (consumer, msg) -> {
Message<Object> message = MessageBuilder.withPayload(msg.getValue()).build();
pulsarMessageDrivenChannelAdapter.send(message);
});
SchemaType schemaType = properties.getExtension().getSchemaType();
if (schemaType != null) {
pulsarContainerProperties.setSchema(toSchema(schemaType));
}
else {
pulsarContainerProperties.setSchema(Schema.BYTES);
}
pulsarContainerProperties.setSubscriptionName(properties.getExtension().getSubscriptionName());
DefaultPulsarMessageListenerContainer<?> container = new DefaultPulsarMessageListenerContainer<>(
this.pulsarConsumerFactory, pulsarContainerProperties);
pulsarMessageDrivenChannelAdapter.setMessageListenerContainer(container);
return pulsarMessageDrivenChannelAdapter;
}
// This is just a place-holder impl
public Schema<?> toSchema(SchemaType schemaType) {
return switch (schemaType) {
case STRING -> Schema.STRING;
case INT32 -> Schema.INT32;
default -> Schema.BYTES;
};
}
@Override
public PulsarConsumerProperties getExtendedConsumerProperties(String channelName) {
return null;
return this.extendedBindingProperties.getExtendedConsumerProperties(channelName);
}
@Override
public PulsarProducerProperties getExtendedProducerProperties(String channelName) {
return null;
return this.extendedBindingProperties.getExtendedProducerProperties(channelName);
}
@Override
@@ -77,4 +136,36 @@ public class PulsarMessageChannelBinder extends
return null;
}
public PulsarExtendedBindingProperties getExtendedBindingProperties() {
return this.extendedBindingProperties;
}
public void setExtendedBindingProperties(PulsarExtendedBindingProperties extendedBindingProperties) {
this.extendedBindingProperties = extendedBindingProperties;
}
static class PulsarMessageDrivenChannelAdapter extends MessageProducerSupport {
AbstractPulsarMessageListenerContainer<?> messageListenerContainer;
public void send(Message<?> message) {
sendMessage(message);
}
@Override
protected void doStart() {
this.messageListenerContainer.start();
}
@Override
protected void doStop() {
this.messageListenerContainer.stop();
}
public void setMessageListenerContainer(AbstractPulsarMessageListenerContainer<?> messageListenerContainer) {
this.messageListenerContainer = messageListenerContainer;
}
}
}

View File

@@ -0,0 +1,51 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.pulsar.spring.cloud.stream.binder.config;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.pulsar.autoconfigure.PulsarProperties;
import org.springframework.pulsar.core.PulsarConsumerFactory;
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.pulsar.spring.cloud.stream.binder.PulsarMessageChannelBinder;
import org.springframework.pulsar.spring.cloud.stream.binder.properties.PulsarExtendedBindingProperties;
import org.springframework.pulsar.spring.cloud.stream.binder.provisioning.PulsarTopicProvisioner;
@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingBean(Binder.class)
@EnableConfigurationProperties({ PulsarProperties.class, PulsarExtendedBindingProperties.class })
public class PulsarBinderConfiguration {
@Bean
public PulsarTopicProvisioner pulsarTopicProvisioner() {
return new PulsarTopicProvisioner();
}
@Bean
public PulsarMessageChannelBinder pulsarMessageChannelBinder(PulsarTopicProvisioner pulsarTopicProvisioner,
PulsarTemplate<Object> pulsarTemplate, PulsarConsumerFactory<byte[]> pulsarConsumerFactory,
PulsarExtendedBindingProperties pulsarExtendedBindingProperties) {
PulsarMessageChannelBinder pulsarMessageChannelBinder = new PulsarMessageChannelBinder(pulsarTopicProvisioner,
pulsarTemplate, pulsarConsumerFactory);
pulsarMessageChannelBinder.setExtendedBindingProperties(pulsarExtendedBindingProperties);
return pulsarMessageChannelBinder;
}
}

View File

@@ -0,0 +1,24 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.pulsar.spring.cloud.stream.binder.properties;
import org.springframework.boot.context.properties.ConfigurationProperties;
@ConfigurationProperties(prefix = "spring.cloud.stream.pulsar.binder")
public class PulsarBinderConfigurationProperties {
}

View File

@@ -0,0 +1,43 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.pulsar.spring.cloud.stream.binder.properties;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
public class PulsarBindingProperties implements BinderSpecificPropertiesProvider {
private PulsarConsumerProperties consumer = new PulsarConsumerProperties();
private PulsarProducerProperties producer = new PulsarProducerProperties();
public PulsarConsumerProperties getConsumer() {
return this.consumer;
}
public void setConsumer(PulsarConsumerProperties consumer) {
this.consumer = consumer;
}
public PulsarProducerProperties getProducer() {
return this.producer;
}
public void setProducer(PulsarProducerProperties producer) {
this.producer = producer;
}
}

View File

@@ -16,6 +16,12 @@
package org.springframework.pulsar.spring.cloud.stream.binder.properties;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.schema.SchemaType;
import org.springframework.lang.NonNull;
import org.springframework.lang.Nullable;
/**
* Pulsar consumer properties used by the binder.
*
@@ -23,4 +29,39 @@ package org.springframework.pulsar.spring.cloud.stream.binder.properties;
*/
public class PulsarConsumerProperties {
@NonNull
private String subscriptionName = "DEFAULT-SUBSCRIPTION-TODO-CHANGE-THIS";
@Nullable
private SchemaType schemaType;
@Nullable
private SubscriptionType subscriptionType;
public String getSubscriptionName() {
return this.subscriptionName;
}
public void setSubscriptionName(String subscriptionName) {
this.subscriptionName = subscriptionName;
}
@Nullable
public SchemaType getSchemaType() {
return this.schemaType;
}
public void setSchemaType(SchemaType schemaType) {
this.schemaType = schemaType;
}
@Nullable
public SubscriptionType getSubscriptionType() {
return this.subscriptionType;
}
public void setSubscriptionType(SubscriptionType subscriptionType) {
this.subscriptionType = subscriptionType;
}
}

View File

@@ -0,0 +1,46 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.pulsar.spring.cloud.stream.binder.properties;
import java.util.Map;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.stream.binder.AbstractExtendedBindingProperties;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
@ConfigurationProperties("spring.cloud.stream.pulsar")
public class PulsarExtendedBindingProperties extends
AbstractExtendedBindingProperties<PulsarConsumerProperties, PulsarProducerProperties, PulsarBindingProperties> {
private static final String DEFAULTS_PREFIX = "spring.cloud.stream.pulsar.default";
@Override
public String getDefaultsPrefix() {
return DEFAULTS_PREFIX;
}
@Override
public Map<String, PulsarBindingProperties> getBindings() {
return this.doGetBindings();
}
@Override
public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
return PulsarBindingProperties.class;
}
}

View File

@@ -16,6 +16,8 @@
package org.springframework.pulsar.spring.cloud.stream.binder.properties;
import org.apache.pulsar.common.schema.SchemaType;
/**
* Pulsar producer properties used by the binder.
*
@@ -23,4 +25,14 @@ package org.springframework.pulsar.spring.cloud.stream.binder.properties;
*/
public class PulsarProducerProperties {
private SchemaType schemaType;
public SchemaType getSchemaType() {
return this.schemaType;
}
public void setSchemaType(SchemaType schemaType) {
this.schemaType = schemaType;
}
}

View File

@@ -0,0 +1,2 @@
pulsar:\
org.springframework.pulsar.spring.cloud.stream.binder.config.PulsarBinderConfiguration