Initial foundation for SCSt binder (#262)

* Initial foundation for SCSt binder

- Mostly layout

* More binder foundations for Pulsar

* Polishing

* Basic Pulsar topic provisioner

* Spring Cloud Stream - 4.0.0
This commit is contained in:
Soby Chacko
2022-12-16 17:55:49 -05:00
committed by GitHub
parent aa52a60258
commit 3387da107c
10 changed files with 283 additions and 0 deletions

View File

@@ -33,3 +33,4 @@ include 'spring-pulsar-sample-apps:sample-app1'
include 'spring-pulsar-sample-apps:sample-app2'
include 'spring-pulsar-sample-apps:sample-reactive'
include 'spring-pulsar-docs'
include 'spring-pulsar-spring-cloud-stream-binder'

View File

@@ -13,6 +13,7 @@ ext {
pulsarVersion = '2.10.2'
pulsarClientReactiveVersion = '0.1.0'
springBootVersion = '3.0.0'
springCloudStreamVersion = '4.0.0'
}
dependencies {
@@ -24,5 +25,6 @@ dependencies {
api "org.apache.pulsar:pulsar-client-all:$pulsarVersion"
api "org.apache.pulsar:pulsar-client-reactive-adapter:$pulsarClientReactiveVersion"
api "org.apache.pulsar:pulsar-client-reactive-producer-cache-caffeine:$pulsarClientReactiveVersion"
api "org.springframework.cloud:spring-cloud-stream:$springCloudStreamVersion"
}
}

View File

@@ -0,0 +1,14 @@
plugins {
id 'org.springframework.pulsar.spring-module'
}
description = 'Spring Cloud Stream Binder for Apache Pulsar'
dependencies {
api project (':spring-pulsar')
api('org.springframework.cloud:spring-cloud-stream') {
exclude group: 'javax.activation', module: 'javax.activation-api'
exclude group: 'javax.annotation', module: 'javax.annotation-api'
}
}

View File

@@ -0,0 +1,80 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.pulsar.spring.cloud.stream.binder;
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.integration.core.MessageProducer;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.pulsar.spring.cloud.stream.binder.properties.PulsarConsumerProperties;
import org.springframework.pulsar.spring.cloud.stream.binder.properties.PulsarProducerProperties;
import org.springframework.pulsar.spring.cloud.stream.binder.provisioning.PulsarTopicProvisioner;
/**
* {@link Binder} implementation for Apache Pulsar.
*
* @author Soby Chacko
*/
public class PulsarMessageChannelBinder extends
AbstractMessageChannelBinder<ExtendedConsumerProperties<PulsarConsumerProperties>, ExtendedProducerProperties<PulsarProducerProperties>, PulsarTopicProvisioner>
implements ExtendedPropertiesBinder<MessageChannel, PulsarConsumerProperties, PulsarProducerProperties> {
public PulsarMessageChannelBinder(PulsarTopicProvisioner provisioningProvider) {
super(null, provisioningProvider);
}
@Override
protected MessageHandler createProducerMessageHandler(ProducerDestination destination,
ExtendedProducerProperties<PulsarProducerProperties> producerProperties, MessageChannel errorChannel)
throws Exception {
return null;
}
@Override
protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group,
ExtendedConsumerProperties<PulsarConsumerProperties> properties) throws Exception {
return null;
}
@Override
public PulsarConsumerProperties getExtendedConsumerProperties(String channelName) {
return null;
}
@Override
public PulsarProducerProperties getExtendedProducerProperties(String channelName) {
return null;
}
@Override
public String getDefaultsPrefix() {
return null;
}
@Override
public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
return null;
}
}

View File

@@ -0,0 +1,9 @@
/**
* Package containing Spring Cloud Stream binder classes for Apache Pulsar.
*/
@NonNullApi
@NonNullFields
package org.springframework.pulsar.spring.cloud.stream.binder;
import org.springframework.lang.NonNullApi;
import org.springframework.lang.NonNullFields;

View File

@@ -0,0 +1,26 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.pulsar.spring.cloud.stream.binder.properties;
/**
* Pulsar consumer properties used by the binder.
*
* @author Soby Chacko
*/
public class PulsarConsumerProperties {
}

View File

@@ -0,0 +1,26 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.pulsar.spring.cloud.stream.binder.properties;
/**
* Pulsar producer properties used by the binder.
*
* @author Soby Chacko
*/
public class PulsarProducerProperties {
}

View File

@@ -0,0 +1,9 @@
/**
* Package containing Spring Cloud Stream binder properties classes for Apache Pulsar.
*/
@NonNullApi
@NonNullFields
package org.springframework.pulsar.spring.cloud.stream.binder.properties;
import org.springframework.lang.NonNullApi;
import org.springframework.lang.NonNullFields;

View File

@@ -0,0 +1,107 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.pulsar.spring.cloud.stream.binder.provisioning;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningException;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
import org.springframework.pulsar.core.PulsarTopic;
import org.springframework.pulsar.spring.cloud.stream.binder.properties.PulsarConsumerProperties;
import org.springframework.pulsar.spring.cloud.stream.binder.properties.PulsarProducerProperties;
/**
* Pulsar topic provisioner.
*
* @author Soby Chacko
*/
public class PulsarTopicProvisioner implements
ProvisioningProvider<ExtendedConsumerProperties<PulsarConsumerProperties>, ExtendedProducerProperties<PulsarProducerProperties>> {
// TODO: Retrieve partitions through config
@Override
public ProducerDestination provisionProducerDestination(String name,
ExtendedProducerProperties<PulsarProducerProperties> properties) throws ProvisioningException {
PulsarTopic pulsarTopic = PulsarTopic.builder(name).numberOfPartitions(1).build();
return new PulsarProducerDestination(pulsarTopic.topicName(), pulsarTopic.numberOfPartitions());
}
@Override
public ConsumerDestination provisionConsumerDestination(String name, String group,
ExtendedConsumerProperties<PulsarConsumerProperties> properties) throws ProvisioningException {
PulsarTopic pulsarTopic = PulsarTopic.builder(name).numberOfPartitions(1).build();
return new PulsarConsumerDestination(pulsarTopic.topicName(), pulsarTopic.numberOfPartitions());
}
private static class PulsarDestination implements ProducerDestination, ConsumerDestination {
private final String producerDestinationName;
private final int partitions;
PulsarDestination(String destinationName, Integer partitions) {
this.producerDestinationName = destinationName;
this.partitions = partitions;
}
@Override
public String getName() {
return this.producerDestinationName;
}
@Override
public String getNameForPartition(int partition) {
return this.producerDestinationName;
}
public int getPartitions() {
return this.partitions;
}
}
private static class PulsarProducerDestination extends PulsarDestination {
PulsarProducerDestination(String destinationName, Integer partitions) {
super(destinationName, partitions);
}
@Override
public String toString() {
return "PulsarProducerDestination{" + "producerDestinationName='" + getName() + '\'' + ", partitions="
+ getPartitions() + '}';
}
}
private static class PulsarConsumerDestination extends PulsarDestination {
PulsarConsumerDestination(String destinationName, Integer partitions) {
super(destinationName, partitions);
}
@Override
public String toString() {
return "PulsarConsumerDestination{" + "producerDestinationName='" + getName() + '\'' + ", partitions="
+ getPartitions() + '}';
}
}
}

View File

@@ -0,0 +1,9 @@
/**
* Package containing Spring Cloud Stream binder provisioning classes for Apache Pulsar.
*/
@NonNullApi
@NonNullFields
package org.springframework.pulsar.spring.cloud.stream.binder.provisioning;
import org.springframework.lang.NonNullApi;
import org.springframework.lang.NonNullFields;