From 3387da107cb4aada1751d4babe84e896ad31990e Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Fri, 16 Dec 2022 17:55:49 -0500 Subject: [PATCH] Initial foundation for SCSt binder (#262) * Initial foundation for SCSt binder - Mostly layout * More binder foundations for Pulsar * Polishing * Basic Pulsar topic provisioner * Spring Cloud Stream - 4.0.0 --- settings.gradle | 1 + spring-pulsar-dependencies/build.gradle | 2 + .../build.gradle | 14 +++ .../binder/PulsarMessageChannelBinder.java | 80 +++++++++++++ .../cloud/stream/binder/package-info.java | 9 ++ .../properties/PulsarConsumerProperties.java | 26 +++++ .../properties/PulsarProducerProperties.java | 26 +++++ .../binder/properties/package-info.java | 9 ++ .../provisioning/PulsarTopicProvisioner.java | 107 ++++++++++++++++++ .../binder/provisioning/package-info.java | 9 ++ 10 files changed, 283 insertions(+) create mode 100644 spring-pulsar-spring-cloud-stream-binder/build.gradle create mode 100644 spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/PulsarMessageChannelBinder.java create mode 100644 spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/package-info.java create mode 100644 spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/properties/PulsarConsumerProperties.java create mode 100644 spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/properties/PulsarProducerProperties.java create mode 100644 spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/properties/package-info.java create mode 100644 spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/provisioning/PulsarTopicProvisioner.java create mode 100644 spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/provisioning/package-info.java diff --git a/settings.gradle b/settings.gradle index d1c1165e..e941a353 100644 --- a/settings.gradle +++ b/settings.gradle @@ -33,3 +33,4 @@ include 'spring-pulsar-sample-apps:sample-app1' include 'spring-pulsar-sample-apps:sample-app2' include 'spring-pulsar-sample-apps:sample-reactive' include 'spring-pulsar-docs' +include 'spring-pulsar-spring-cloud-stream-binder' diff --git a/spring-pulsar-dependencies/build.gradle b/spring-pulsar-dependencies/build.gradle index e600a3b3..0006d6f9 100644 --- a/spring-pulsar-dependencies/build.gradle +++ b/spring-pulsar-dependencies/build.gradle @@ -13,6 +13,7 @@ ext { pulsarVersion = '2.10.2' pulsarClientReactiveVersion = '0.1.0' springBootVersion = '3.0.0' + springCloudStreamVersion = '4.0.0' } dependencies { @@ -24,5 +25,6 @@ dependencies { api "org.apache.pulsar:pulsar-client-all:$pulsarVersion" api "org.apache.pulsar:pulsar-client-reactive-adapter:$pulsarClientReactiveVersion" api "org.apache.pulsar:pulsar-client-reactive-producer-cache-caffeine:$pulsarClientReactiveVersion" + api "org.springframework.cloud:spring-cloud-stream:$springCloudStreamVersion" } } diff --git a/spring-pulsar-spring-cloud-stream-binder/build.gradle b/spring-pulsar-spring-cloud-stream-binder/build.gradle new file mode 100644 index 00000000..6d70d386 --- /dev/null +++ b/spring-pulsar-spring-cloud-stream-binder/build.gradle @@ -0,0 +1,14 @@ +plugins { + id 'org.springframework.pulsar.spring-module' +} + +description = 'Spring Cloud Stream Binder for Apache Pulsar' + +dependencies { + api project (':spring-pulsar') + api('org.springframework.cloud:spring-cloud-stream') { + exclude group: 'javax.activation', module: 'javax.activation-api' + exclude group: 'javax.annotation', module: 'javax.annotation-api' + } + +} diff --git a/spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/PulsarMessageChannelBinder.java b/spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/PulsarMessageChannelBinder.java new file mode 100644 index 00000000..6ea04254 --- /dev/null +++ b/spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/PulsarMessageChannelBinder.java @@ -0,0 +1,80 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.spring.cloud.stream.binder; + +import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder; +import org.springframework.cloud.stream.binder.Binder; +import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider; +import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; +import org.springframework.cloud.stream.binder.ExtendedProducerProperties; +import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder; +import org.springframework.cloud.stream.provisioning.ConsumerDestination; +import org.springframework.cloud.stream.provisioning.ProducerDestination; +import org.springframework.integration.core.MessageProducer; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; +import org.springframework.pulsar.spring.cloud.stream.binder.properties.PulsarConsumerProperties; +import org.springframework.pulsar.spring.cloud.stream.binder.properties.PulsarProducerProperties; +import org.springframework.pulsar.spring.cloud.stream.binder.provisioning.PulsarTopicProvisioner; + +/** + * {@link Binder} implementation for Apache Pulsar. + * + * @author Soby Chacko + */ +public class PulsarMessageChannelBinder extends + AbstractMessageChannelBinder, ExtendedProducerProperties, PulsarTopicProvisioner> + implements ExtendedPropertiesBinder { + + public PulsarMessageChannelBinder(PulsarTopicProvisioner provisioningProvider) { + super(null, provisioningProvider); + } + + @Override + protected MessageHandler createProducerMessageHandler(ProducerDestination destination, + ExtendedProducerProperties producerProperties, MessageChannel errorChannel) + throws Exception { + return null; + } + + @Override + protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group, + ExtendedConsumerProperties properties) throws Exception { + return null; + } + + @Override + public PulsarConsumerProperties getExtendedConsumerProperties(String channelName) { + return null; + } + + @Override + public PulsarProducerProperties getExtendedProducerProperties(String channelName) { + return null; + } + + @Override + public String getDefaultsPrefix() { + return null; + } + + @Override + public Class getExtendedPropertiesEntryClass() { + return null; + } + +} diff --git a/spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/package-info.java b/spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/package-info.java new file mode 100644 index 00000000..0b0aa4e1 --- /dev/null +++ b/spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/package-info.java @@ -0,0 +1,9 @@ +/** + * Package containing Spring Cloud Stream binder classes for Apache Pulsar. + */ +@NonNullApi +@NonNullFields +package org.springframework.pulsar.spring.cloud.stream.binder; + +import org.springframework.lang.NonNullApi; +import org.springframework.lang.NonNullFields; diff --git a/spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/properties/PulsarConsumerProperties.java b/spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/properties/PulsarConsumerProperties.java new file mode 100644 index 00000000..780b6b4f --- /dev/null +++ b/spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/properties/PulsarConsumerProperties.java @@ -0,0 +1,26 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.spring.cloud.stream.binder.properties; + +/** + * Pulsar consumer properties used by the binder. + * + * @author Soby Chacko + */ +public class PulsarConsumerProperties { + +} diff --git a/spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/properties/PulsarProducerProperties.java b/spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/properties/PulsarProducerProperties.java new file mode 100644 index 00000000..46d5b013 --- /dev/null +++ b/spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/properties/PulsarProducerProperties.java @@ -0,0 +1,26 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.spring.cloud.stream.binder.properties; + +/** + * Pulsar producer properties used by the binder. + * + * @author Soby Chacko + */ +public class PulsarProducerProperties { + +} diff --git a/spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/properties/package-info.java b/spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/properties/package-info.java new file mode 100644 index 00000000..7d964d12 --- /dev/null +++ b/spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/properties/package-info.java @@ -0,0 +1,9 @@ +/** + * Package containing Spring Cloud Stream binder properties classes for Apache Pulsar. + */ +@NonNullApi +@NonNullFields +package org.springframework.pulsar.spring.cloud.stream.binder.properties; + +import org.springframework.lang.NonNullApi; +import org.springframework.lang.NonNullFields; diff --git a/spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/provisioning/PulsarTopicProvisioner.java b/spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/provisioning/PulsarTopicProvisioner.java new file mode 100644 index 00000000..576f2a52 --- /dev/null +++ b/spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/provisioning/PulsarTopicProvisioner.java @@ -0,0 +1,107 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.spring.cloud.stream.binder.provisioning; + +import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; +import org.springframework.cloud.stream.binder.ExtendedProducerProperties; +import org.springframework.cloud.stream.provisioning.ConsumerDestination; +import org.springframework.cloud.stream.provisioning.ProducerDestination; +import org.springframework.cloud.stream.provisioning.ProvisioningException; +import org.springframework.cloud.stream.provisioning.ProvisioningProvider; +import org.springframework.pulsar.core.PulsarTopic; +import org.springframework.pulsar.spring.cloud.stream.binder.properties.PulsarConsumerProperties; +import org.springframework.pulsar.spring.cloud.stream.binder.properties.PulsarProducerProperties; + +/** + * Pulsar topic provisioner. + * + * @author Soby Chacko + */ +public class PulsarTopicProvisioner implements + ProvisioningProvider, ExtendedProducerProperties> { + + // TODO: Retrieve partitions through config + @Override + public ProducerDestination provisionProducerDestination(String name, + ExtendedProducerProperties properties) throws ProvisioningException { + PulsarTopic pulsarTopic = PulsarTopic.builder(name).numberOfPartitions(1).build(); + return new PulsarProducerDestination(pulsarTopic.topicName(), pulsarTopic.numberOfPartitions()); + } + + @Override + public ConsumerDestination provisionConsumerDestination(String name, String group, + ExtendedConsumerProperties properties) throws ProvisioningException { + PulsarTopic pulsarTopic = PulsarTopic.builder(name).numberOfPartitions(1).build(); + return new PulsarConsumerDestination(pulsarTopic.topicName(), pulsarTopic.numberOfPartitions()); + } + + private static class PulsarDestination implements ProducerDestination, ConsumerDestination { + + private final String producerDestinationName; + + private final int partitions; + + PulsarDestination(String destinationName, Integer partitions) { + this.producerDestinationName = destinationName; + this.partitions = partitions; + } + + @Override + public String getName() { + return this.producerDestinationName; + } + + @Override + public String getNameForPartition(int partition) { + return this.producerDestinationName; + } + + public int getPartitions() { + return this.partitions; + } + + } + + private static class PulsarProducerDestination extends PulsarDestination { + + PulsarProducerDestination(String destinationName, Integer partitions) { + super(destinationName, partitions); + } + + @Override + public String toString() { + return "PulsarProducerDestination{" + "producerDestinationName='" + getName() + '\'' + ", partitions=" + + getPartitions() + '}'; + } + + } + + private static class PulsarConsumerDestination extends PulsarDestination { + + PulsarConsumerDestination(String destinationName, Integer partitions) { + super(destinationName, partitions); + } + + @Override + public String toString() { + return "PulsarConsumerDestination{" + "producerDestinationName='" + getName() + '\'' + ", partitions=" + + getPartitions() + '}'; + } + + } + +} diff --git a/spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/provisioning/package-info.java b/spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/provisioning/package-info.java new file mode 100644 index 00000000..1ecfd7b5 --- /dev/null +++ b/spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/provisioning/package-info.java @@ -0,0 +1,9 @@ +/** + * Package containing Spring Cloud Stream binder provisioning classes for Apache Pulsar. + */ +@NonNullApi +@NonNullFields +package org.springframework.pulsar.spring.cloud.stream.binder.provisioning; + +import org.springframework.lang.NonNullApi; +import org.springframework.lang.NonNullFields;