Add consumer/producer config props to binder and binding level (#338)

* Move loadConf into ProducerBuilderConfigurationUtil

* Add full consumer/producer props to binder/binding props

* Re-use the consumer and producer props from PulsarProperties
* Add unit and integration test for newly added props
This commit is contained in:
Chris Bono
2023-02-13 10:06:21 -06:00
committed by GitHub
parent 48ac3836bc
commit 284b4b0c1e
20 changed files with 1633 additions and 1029 deletions

View File

@@ -0,0 +1,533 @@
/*
* Copyright 2023-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.pulsar.autoconfigure;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.regex.Pattern;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.springframework.boot.context.properties.NestedConfigurationProperty;
import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.lang.Nullable;
import org.springframework.pulsar.autoconfigure.PulsarProperties.Properties;
/**
* Configuration properties used to specify Pulsar consumers.
*
* @author Chris Bono
*/
public class ConsumerConfigProperties {
/**
* Comma-separated list of topics the consumer subscribes to.
*/
private Set<String> topics;
/**
* Pattern for topics the consumer subscribes to.
*/
private Pattern topicsPattern;
/**
* Subscription name for the consumer.
*/
private String subscriptionName;
/**
* Subscription type to be used when subscribing to a topic.
*/
private SubscriptionType subscriptionType = SubscriptionType.Exclusive;
/**
* Map of properties to add to the subscription.
*/
private Map<String, String> subscriptionProperties = new HashMap<>();
/**
* Subscription mode to be used when subscribing to the topic.
*/
private SubscriptionMode subscriptionMode = SubscriptionMode.Durable;
/**
* Number of messages that can be accumulated before the consumer calls "receive".
*/
private Integer receiverQueueSize = 1000;
/**
* Time to group acknowledgements before sending them to the broker.
*/
private Duration acknowledgementsGroupTime = Duration.ofMillis(100);
/**
* Delay before re-delivering messages that have failed to be processed.
*/
private Duration negativeAckRedeliveryDelay = Duration.ofMinutes(1);
/**
* Maximum number of messages that a consumer can be pushed at once from a broker
* across all partitions.
*/
private Integer maxTotalReceiverQueueSizeAcrossPartitions = 50000;
/**
* Consumer name to identify a particular consumer from the topic stats.
*/
private String consumerName;
/**
* Timeout for unacked messages to be redelivered.
*/
private Duration ackTimeout = Duration.ZERO;
/**
* Precision for the ack timeout messages tracker.
*/
private Duration tickDuration = Duration.ofSeconds(1);
/**
* Priority level for shared subscription consumers.
*/
private Integer priorityLevel = 0;
/**
* Action the consumer will take in case of decryption failure.
*/
private ConsumerCryptoFailureAction cryptoFailureAction = ConsumerCryptoFailureAction.FAIL;
/**
* Map of properties to add to the consumer.
*/
private SortedMap<String, String> properties = new TreeMap<>();
/**
* Whether to read messages from the compacted topic rather than the full message
* backlog.
*/
private Boolean readCompacted = false;
/**
* Position where to initialize a newly created subscription.
*/
private SubscriptionInitialPosition subscriptionInitialPosition = SubscriptionInitialPosition.Latest;
/**
* Auto-discovery period for topics when topic pattern is used in minutes.
*/
private Integer patternAutoDiscoveryPeriod = 1;
/**
* Determines which topics the consumer should be subscribed to when using pattern
* subscriptions.
*/
private RegexSubscriptionMode regexSubscriptionMode = RegexSubscriptionMode.PersistentOnly;
/**
* Dead letter policy to use.
*/
@Nullable
@NestedConfigurationProperty
private DeadLetterPolicy deadLetterPolicy;
/**
* Whether to auto retry messages.
*/
private Boolean retryEnable = false;
/**
* Whether the consumer auto-subscribes for partition increase. This is only for
* partitioned consumers.
*/
private Boolean autoUpdatePartitions = true;
/**
* Interval of partitions discovery updates.
*/
private Duration autoUpdatePartitionsInterval = Duration.ofMinutes(1);
/**
* Whether to replicate subscription state.
*/
private Boolean replicateSubscriptionState = false;
/**
* Whether to include the given position of any reset operation like
* {@link org.apache.pulsar.client.api.Consumer#seek(long) or
* {@link ConsumerConfigProperties#seek(MessageId)}}.
*/
private Boolean resetIncludeHead = false;
/**
* Whether the batch index acknowledgment is enabled.
*/
private Boolean batchIndexAckEnabled = false;
/**
* Whether an acknowledgement receipt is enabled.
*/
private Boolean ackReceiptEnabled = false;
/**
* Whether pooling of messages and the underlying data buffers is enabled.
*/
private Boolean poolMessages = false;
/**
* Whether to start the consumer in a paused state.
*/
private Boolean startPaused = false;
/**
* Whether to automatically drop outstanding un-acked messages if the queue is full.
*/
private Boolean autoAckOldestChunkedMessageOnQueueFull = true;
/**
* Maximum number of chunked messages to be kept in memory.
*/
private Integer maxPendingChunkedMessage = 10;
/**
* Time to expire incomplete chunks if the consumer won't be able to receive all
* chunks before.
*/
private Duration expireTimeOfIncompleteChunkedMessage = Duration.ofMinutes(1);
public Set<String> getTopics() {
return this.topics;
}
public void setTopics(Set<String> topics) {
this.topics = topics;
}
public Pattern getTopicsPattern() {
return this.topicsPattern;
}
public void setTopicsPattern(Pattern topicsPattern) {
this.topicsPattern = topicsPattern;
}
public String getSubscriptionName() {
return this.subscriptionName;
}
public void setSubscriptionName(String subscriptionName) {
this.subscriptionName = subscriptionName;
}
public Map<String, String> getSubscriptionProperties() {
return this.subscriptionProperties;
}
public void setSubscriptionProperties(Map<String, String> subscriptionProperties) {
this.subscriptionProperties = subscriptionProperties;
}
public SubscriptionMode getSubscriptionMode() {
return this.subscriptionMode;
}
public void setSubscriptionMode(SubscriptionMode subscriptionMode) {
this.subscriptionMode = subscriptionMode;
}
public SubscriptionType getSubscriptionType() {
return this.subscriptionType;
}
public void setSubscriptionType(SubscriptionType subscriptionType) {
this.subscriptionType = subscriptionType;
}
public Integer getReceiverQueueSize() {
return this.receiverQueueSize;
}
public void setReceiverQueueSize(Integer receiverQueueSize) {
this.receiverQueueSize = receiverQueueSize;
}
public Duration getAcknowledgementsGroupTime() {
return this.acknowledgementsGroupTime;
}
public void setAcknowledgementsGroupTime(Duration acknowledgementsGroupTime) {
this.acknowledgementsGroupTime = acknowledgementsGroupTime;
}
public Duration getNegativeAckRedeliveryDelay() {
return this.negativeAckRedeliveryDelay;
}
public void setNegativeAckRedeliveryDelay(Duration negativeAckRedeliveryDelay) {
this.negativeAckRedeliveryDelay = negativeAckRedeliveryDelay;
}
public Integer getMaxTotalReceiverQueueSizeAcrossPartitions() {
return this.maxTotalReceiverQueueSizeAcrossPartitions;
}
public void setMaxTotalReceiverQueueSizeAcrossPartitions(Integer maxTotalReceiverQueueSizeAcrossPartitions) {
this.maxTotalReceiverQueueSizeAcrossPartitions = maxTotalReceiverQueueSizeAcrossPartitions;
}
public String getConsumerName() {
return this.consumerName;
}
public void setConsumerName(String consumerName) {
this.consumerName = consumerName;
}
public Duration getAckTimeout() {
return this.ackTimeout;
}
public void setAckTimeout(Duration ackTimeout) {
this.ackTimeout = ackTimeout;
}
public Duration getTickDuration() {
return this.tickDuration;
}
public void setTickDuration(Duration tickDuration) {
this.tickDuration = tickDuration;
}
public Integer getPriorityLevel() {
return this.priorityLevel;
}
public void setPriorityLevel(Integer priorityLevel) {
this.priorityLevel = priorityLevel;
}
public ConsumerCryptoFailureAction getCryptoFailureAction() {
return this.cryptoFailureAction;
}
public void setCryptoFailureAction(ConsumerCryptoFailureAction cryptoFailureAction) {
this.cryptoFailureAction = cryptoFailureAction;
}
public SortedMap<String, String> getProperties() {
return this.properties;
}
public void setProperties(SortedMap<String, String> properties) {
this.properties = properties;
}
public Boolean getReadCompacted() {
return this.readCompacted;
}
public void setReadCompacted(Boolean readCompacted) {
this.readCompacted = readCompacted;
}
public SubscriptionInitialPosition getSubscriptionInitialPosition() {
return this.subscriptionInitialPosition;
}
public void setSubscriptionInitialPosition(SubscriptionInitialPosition subscriptionInitialPosition) {
this.subscriptionInitialPosition = subscriptionInitialPosition;
}
public Integer getPatternAutoDiscoveryPeriod() {
return this.patternAutoDiscoveryPeriod;
}
public void setPatternAutoDiscoveryPeriod(Integer patternAutoDiscoveryPeriod) {
this.patternAutoDiscoveryPeriod = patternAutoDiscoveryPeriod;
}
public RegexSubscriptionMode getRegexSubscriptionMode() {
return this.regexSubscriptionMode;
}
public void setRegexSubscriptionMode(RegexSubscriptionMode regexSubscriptionMode) {
this.regexSubscriptionMode = regexSubscriptionMode;
}
@Nullable
public DeadLetterPolicy getDeadLetterPolicy() {
return this.deadLetterPolicy;
}
public void setDeadLetterPolicy(@Nullable DeadLetterPolicy deadLetterPolicy) {
this.deadLetterPolicy = deadLetterPolicy;
}
public Boolean getRetryEnable() {
return this.retryEnable;
}
public void setRetryEnable(Boolean retryEnable) {
this.retryEnable = retryEnable;
}
public Boolean getAutoUpdatePartitions() {
return this.autoUpdatePartitions;
}
public void setAutoUpdatePartitions(Boolean autoUpdatePartitions) {
this.autoUpdatePartitions = autoUpdatePartitions;
}
public Duration getAutoUpdatePartitionsInterval() {
return this.autoUpdatePartitionsInterval;
}
public void setAutoUpdatePartitionsInterval(Duration autoUpdatePartitionsInterval) {
this.autoUpdatePartitionsInterval = autoUpdatePartitionsInterval;
}
public Boolean getReplicateSubscriptionState() {
return this.replicateSubscriptionState;
}
public void setReplicateSubscriptionState(Boolean replicateSubscriptionState) {
this.replicateSubscriptionState = replicateSubscriptionState;
}
public Boolean getResetIncludeHead() {
return this.resetIncludeHead;
}
public void setResetIncludeHead(Boolean resetIncludeHead) {
this.resetIncludeHead = resetIncludeHead;
}
public Boolean getBatchIndexAckEnabled() {
return this.batchIndexAckEnabled;
}
public void setBatchIndexAckEnabled(Boolean batchIndexAckEnabled) {
this.batchIndexAckEnabled = batchIndexAckEnabled;
}
public Boolean getAckReceiptEnabled() {
return this.ackReceiptEnabled;
}
public void setAckReceiptEnabled(Boolean ackReceiptEnabled) {
this.ackReceiptEnabled = ackReceiptEnabled;
}
public Boolean getPoolMessages() {
return this.poolMessages;
}
public void setPoolMessages(Boolean poolMessages) {
this.poolMessages = poolMessages;
}
public Boolean getStartPaused() {
return this.startPaused;
}
public void setStartPaused(Boolean startPaused) {
this.startPaused = startPaused;
}
public Boolean getAutoAckOldestChunkedMessageOnQueueFull() {
return this.autoAckOldestChunkedMessageOnQueueFull;
}
public void setAutoAckOldestChunkedMessageOnQueueFull(Boolean autoAckOldestChunkedMessageOnQueueFull) {
this.autoAckOldestChunkedMessageOnQueueFull = autoAckOldestChunkedMessageOnQueueFull;
}
public Integer getMaxPendingChunkedMessage() {
return this.maxPendingChunkedMessage;
}
public void setMaxPendingChunkedMessage(Integer maxPendingChunkedMessage) {
this.maxPendingChunkedMessage = maxPendingChunkedMessage;
}
public Duration getExpireTimeOfIncompleteChunkedMessage() {
return this.expireTimeOfIncompleteChunkedMessage;
}
public void setExpireTimeOfIncompleteChunkedMessage(Duration expireTimeOfIncompleteChunkedMessage) {
this.expireTimeOfIncompleteChunkedMessage = expireTimeOfIncompleteChunkedMessage;
}
public Map<String, Object> buildProperties() {
PulsarProperties.Properties properties = new Properties();
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(this::getTopics).to(properties.in("topicNames"));
map.from(this::getTopicsPattern).to(properties.in("topicsPattern"));
map.from(this::getSubscriptionName).to(properties.in("subscriptionName"));
map.from(this::getSubscriptionType).to(properties.in("subscriptionType"));
map.from(this::getSubscriptionProperties).to(properties.in("subscriptionProperties"));
map.from(this::getSubscriptionMode).to(properties.in("subscriptionMode"));
map.from(this::getReceiverQueueSize).to(properties.in("receiverQueueSize"));
map.from(this::getAcknowledgementsGroupTime).as(it -> it.toNanos() / 1000)
.to(properties.in("acknowledgementsGroupTimeMicros"));
map.from(this::getNegativeAckRedeliveryDelay).as(it -> it.toNanos() / 1000)
.to(properties.in("negativeAckRedeliveryDelayMicros"));
map.from(this::getMaxTotalReceiverQueueSizeAcrossPartitions)
.to(properties.in("maxTotalReceiverQueueSizeAcrossPartitions"));
map.from(this::getConsumerName).to(properties.in("consumerName"));
map.from(this::getAckTimeout).as(Duration::toMillis).to(properties.in("ackTimeoutMillis"));
map.from(this::getTickDuration).as(Duration::toMillis).to(properties.in("tickDurationMillis"));
map.from(this::getPriorityLevel).to(properties.in("priorityLevel"));
map.from(this::getCryptoFailureAction).to(properties.in("cryptoFailureAction"));
map.from(this::getProperties).to(properties.in("properties"));
map.from(this::getReadCompacted).to(properties.in("readCompacted"));
map.from(this::getSubscriptionInitialPosition).to(properties.in("subscriptionInitialPosition"));
map.from(this::getPatternAutoDiscoveryPeriod).to(properties.in("patternAutoDiscoveryPeriod"));
map.from(this::getRegexSubscriptionMode).to(properties.in("regexSubscriptionMode"));
map.from(this::getDeadLetterPolicy).to(properties.in("deadLetterPolicy"));
map.from(this::getRetryEnable).to(properties.in("retryEnable"));
map.from(this::getAutoUpdatePartitions).to(properties.in("autoUpdatePartitions"));
map.from(this::getAutoUpdatePartitionsInterval).as(Duration::toSeconds)
.to(properties.in("autoUpdatePartitionsIntervalSeconds"));
map.from(this::getReplicateSubscriptionState).to(properties.in("replicateSubscriptionState"));
map.from(this::getResetIncludeHead).to(properties.in("resetIncludeHead"));
map.from(this::getBatchIndexAckEnabled).to(properties.in("batchIndexAckEnabled"));
map.from(this::getAckReceiptEnabled).to(properties.in("ackReceiptEnabled"));
map.from(this::getPoolMessages).to(properties.in("poolMessages"));
map.from(this::getStartPaused).to(properties.in("startPaused"));
map.from(this::getAutoAckOldestChunkedMessageOnQueueFull)
.to(properties.in("autoAckOldestChunkedMessageOnQueueFull"));
map.from(this::getMaxPendingChunkedMessage).to(properties.in("maxPendingChunkedMessage"));
map.from(this::getExpireTimeOfIncompleteChunkedMessage).as(Duration::toMillis)
.to(properties.in("expireTimeOfIncompleteChunkedMessageMillis"));
return properties;
}
}

View File

@@ -0,0 +1,404 @@
/*
* Copyright 2023-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.pulsar.autoconfigure;
import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.lang.Nullable;
import org.springframework.pulsar.autoconfigure.PulsarProperties.Cache;
import org.springframework.pulsar.autoconfigure.PulsarProperties.Properties;
import org.springframework.util.unit.DataSize;
/**
* Configuration properties used to specify Pulsar producers.
*
* @author Chris Bono
*/
public class ProducerConfigProperties {
/**
* Topic the producer will publish to.
*/
private String topicName;
/**
* Name for the producer. If not assigned, a unique name is generated.
*/
private String producerName;
/**
* Time before a message has to be acknowledged by the broker.
*/
private Duration sendTimeout = Duration.ofSeconds(30);
/**
* Whether the "send" and "sendAsync" methods should block if the outgoing message
* queue is full.
*/
private Boolean blockIfQueueFull = false;
/**
* Maximum number of pending messages for the producer.
*/
private Integer maxPendingMessages = 1000;
/**
* Maximum number of pending messages across all the partitions.
*/
private Integer maxPendingMessagesAcrossPartitions = 50000;
/**
* Message routing mode for a partitioned producer.
*/
private MessageRoutingMode messageRoutingMode = MessageRoutingMode.RoundRobinPartition;
/**
* Message hashing scheme to choose the partition to which the message is published.
*/
private HashingScheme hashingScheme = HashingScheme.JavaStringHash;
/**
* Action the producer will take in case of encryption failure.
*/
private ProducerCryptoFailureAction cryptoFailureAction = ProducerCryptoFailureAction.FAIL;
/**
* Time period within which the messages sent will be batched.
*/
private Duration batchingMaxPublishDelay = Duration.ofMillis(1);
/**
* Partition switch frequency while batching of messages is enabled and using
* round-robin routing mode for non-keyed message.
*/
private Integer batchingPartitionSwitchFrequencyByPublishDelay = 10;
/**
* Maximum number of messages to be batched.
*/
private Integer batchingMaxMessages = 1000;
/**
* Maximum number of bytes permitted in a batch.
*/
private DataSize batchingMaxBytes = DataSize.ofKilobytes(128);
/**
* Whether to automatically batch messages.
*/
private Boolean batchingEnabled = true;
/**
* Whether to split large-size messages into multiple chunks.
*/
private Boolean chunkingEnabled = false;
/**
* Names of the public encryption keys to use when encrypting data.
*/
private Set<String> encryptionKeys = new HashSet<>();
/**
* Message compression type.
*/
private CompressionType compressionType;
/**
* Baseline for the sequence ids for messages published by the producer.
*/
@Nullable
private Long initialSequenceId;
/**
* Whether partitioned producer automatically discover new partitions at runtime.
*/
private Boolean autoUpdatePartitions = true;
/**
* Interval of partitions discovery updates.
*/
private Duration autoUpdatePartitionsInterval = Duration.ofMinutes(1);
/**
* Whether the multiple schema mode is enabled.
*/
private Boolean multiSchema = true;
/**
* Type of access to the topic the producer requires.
*/
private ProducerAccessMode producerAccessMode = ProducerAccessMode.Shared;
/**
* Whether producers in Shared mode register and connect immediately to the owner
* broker of each partition or start lazily on demand.
*/
private Boolean lazyStartPartitionedProducers = false;
/**
* Map of properties to add to the producer.
*/
private Map<String, String> properties = new HashMap<>();
private final Cache cache = new Cache();
public String getTopicName() {
return this.topicName;
}
public void setTopicName(String topicName) {
this.topicName = topicName;
}
public String getProducerName() {
return this.producerName;
}
public void setProducerName(String producerName) {
this.producerName = producerName;
}
public Duration getSendTimeout() {
return this.sendTimeout;
}
public void setSendTimeout(Duration sendTimeout) {
this.sendTimeout = sendTimeout;
}
public Boolean getBlockIfQueueFull() {
return this.blockIfQueueFull;
}
public void setBlockIfQueueFull(Boolean blockIfQueueFull) {
this.blockIfQueueFull = blockIfQueueFull;
}
public Integer getMaxPendingMessages() {
return this.maxPendingMessages;
}
public void setMaxPendingMessages(Integer maxPendingMessages) {
this.maxPendingMessages = maxPendingMessages;
}
public Integer getMaxPendingMessagesAcrossPartitions() {
return this.maxPendingMessagesAcrossPartitions;
}
public void setMaxPendingMessagesAcrossPartitions(Integer maxPendingMessagesAcrossPartitions) {
this.maxPendingMessagesAcrossPartitions = maxPendingMessagesAcrossPartitions;
}
public MessageRoutingMode getMessageRoutingMode() {
return this.messageRoutingMode;
}
public void setMessageRoutingMode(MessageRoutingMode messageRoutingMode) {
this.messageRoutingMode = messageRoutingMode;
}
public HashingScheme getHashingScheme() {
return this.hashingScheme;
}
public void setHashingScheme(HashingScheme hashingScheme) {
this.hashingScheme = hashingScheme;
}
public ProducerCryptoFailureAction getCryptoFailureAction() {
return this.cryptoFailureAction;
}
public void setCryptoFailureAction(ProducerCryptoFailureAction cryptoFailureAction) {
this.cryptoFailureAction = cryptoFailureAction;
}
public Duration getBatchingMaxPublishDelay() {
return this.batchingMaxPublishDelay;
}
public void setBatchingMaxPublishDelay(Duration batchingMaxPublishDelay) {
this.batchingMaxPublishDelay = batchingMaxPublishDelay;
}
public Integer getBatchingPartitionSwitchFrequencyByPublishDelay() {
return this.batchingPartitionSwitchFrequencyByPublishDelay;
}
public void setBatchingPartitionSwitchFrequencyByPublishDelay(
Integer batchingPartitionSwitchFrequencyByPublishDelay) {
this.batchingPartitionSwitchFrequencyByPublishDelay = batchingPartitionSwitchFrequencyByPublishDelay;
}
public Integer getBatchingMaxMessages() {
return this.batchingMaxMessages;
}
public void setBatchingMaxMessages(Integer batchingMaxMessages) {
this.batchingMaxMessages = batchingMaxMessages;
}
public DataSize getBatchingMaxBytes() {
return this.batchingMaxBytes;
}
public void setBatchingMaxBytes(DataSize batchingMaxBytes) {
this.batchingMaxBytes = batchingMaxBytes;
}
public Boolean getBatchingEnabled() {
return this.batchingEnabled;
}
public void setBatchingEnabled(Boolean batchingEnabled) {
this.batchingEnabled = batchingEnabled;
}
public Boolean getChunkingEnabled() {
return this.chunkingEnabled;
}
public void setChunkingEnabled(Boolean chunkingEnabled) {
this.chunkingEnabled = chunkingEnabled;
}
public Set<String> getEncryptionKeys() {
return this.encryptionKeys;
}
public void setEncryptionKeys(Set<String> encryptionKeys) {
this.encryptionKeys = encryptionKeys;
}
public CompressionType getCompressionType() {
return this.compressionType;
}
public void setCompressionType(CompressionType compressionType) {
this.compressionType = compressionType;
}
@Nullable
public Long getInitialSequenceId() {
return this.initialSequenceId;
}
public void setInitialSequenceId(@Nullable Long initialSequenceId) {
this.initialSequenceId = initialSequenceId;
}
public Boolean getAutoUpdatePartitions() {
return this.autoUpdatePartitions;
}
public void setAutoUpdatePartitions(Boolean autoUpdatePartitions) {
this.autoUpdatePartitions = autoUpdatePartitions;
}
public Duration getAutoUpdatePartitionsInterval() {
return this.autoUpdatePartitionsInterval;
}
public void setAutoUpdatePartitionsInterval(Duration autoUpdatePartitionsInterval) {
this.autoUpdatePartitionsInterval = autoUpdatePartitionsInterval;
}
public Boolean getMultiSchema() {
return this.multiSchema;
}
public void setMultiSchema(Boolean multiSchema) {
this.multiSchema = multiSchema;
}
public ProducerAccessMode getProducerAccessMode() {
return this.producerAccessMode;
}
public void setProducerAccessMode(ProducerAccessMode producerAccessMode) {
this.producerAccessMode = producerAccessMode;
}
public Boolean getLazyStartPartitionedProducers() {
return this.lazyStartPartitionedProducers;
}
public void setLazyStartPartitionedProducers(Boolean lazyStartPartitionedProducers) {
this.lazyStartPartitionedProducers = lazyStartPartitionedProducers;
}
public Map<String, String> getProperties() {
return this.properties;
}
public void setProperties(Map<String, String> properties) {
this.properties = properties;
}
public Cache getCache() {
return this.cache;
}
public Map<String, Object> buildProperties() {
PulsarProperties.Properties properties = new Properties();
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(this::getTopicName).to(properties.in("topicName"));
map.from(this::getProducerName).to(properties.in("producerName"));
map.from(this::getSendTimeout).asInt(Duration::toMillis).to(properties.in("sendTimeoutMs"));
map.from(this::getBlockIfQueueFull).to(properties.in("blockIfQueueFull"));
map.from(this::getMaxPendingMessages).to(properties.in("maxPendingMessages"));
map.from(this::getMaxPendingMessagesAcrossPartitions).to(properties.in("maxPendingMessagesAcrossPartitions"));
map.from(this::getMessageRoutingMode).to(properties.in("messageRoutingMode"));
map.from(this::getHashingScheme).to(properties.in("hashingScheme"));
map.from(this::getCryptoFailureAction).to(properties.in("cryptoFailureAction"));
map.from(this::getBatchingMaxPublishDelay).as(it -> it.toNanos() / 1000)
.to(properties.in("batchingMaxPublishDelayMicros"));
map.from(this::getBatchingPartitionSwitchFrequencyByPublishDelay)
.to(properties.in("batchingPartitionSwitchFrequencyByPublishDelay"));
map.from(this::getBatchingMaxMessages).to(properties.in("batchingMaxMessages"));
map.from(this::getBatchingMaxBytes).asInt(DataSize::toBytes).to(properties.in("batchingMaxBytes"));
map.from(this::getBatchingEnabled).to(properties.in("batchingEnabled"));
map.from(this::getChunkingEnabled).to(properties.in("chunkingEnabled"));
map.from(this::getEncryptionKeys).to(properties.in("encryptionKeys"));
map.from(this::getCompressionType).to(properties.in("compressionType"));
map.from(this::getInitialSequenceId).to(properties.in("initialSequenceId"));
map.from(this::getAutoUpdatePartitions).to(properties.in("autoUpdatePartitions"));
map.from(this::getAutoUpdatePartitionsInterval).as(Duration::toSeconds)
.to(properties.in("autoUpdatePartitionsIntervalSeconds"));
map.from(this::getMultiSchema).to(properties.in("multiSchema"));
map.from(this::getProducerAccessMode).to(properties.in("accessMode"));
map.from(this::getLazyStartPartitionedProducers).to(properties.in("lazyStartPartitionedProducers"));
map.from(this::getProperties).to(properties.in("properties"));
return properties;
}
}

View File

@@ -19,33 +19,16 @@ package org.springframework.pulsar.autoconfigure;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.regex.Pattern;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
import org.apache.pulsar.client.api.ProxyProtocol;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.schema.SchemaType;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.NestedConfigurationProperty;
import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.lang.Nullable;
import org.springframework.pulsar.listener.AckMode;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
@@ -65,7 +48,8 @@ import org.springframework.util.unit.DataSize;
@ConfigurationProperties(prefix = "spring.pulsar")
public class PulsarProperties {
private final Consumer consumer = new Consumer();
@NestedConfigurationProperty
private final ConsumerConfigProperties consumer = new ConsumerConfigProperties();
private final Client client = new Client();
@@ -73,7 +57,8 @@ public class PulsarProperties {
private final Listener listener = new Listener();
private final Producer producer = new Producer();
@NestedConfigurationProperty
private final ProducerConfigProperties producer = new ProducerConfigProperties();
private final Template template = new Template();
@@ -83,7 +68,7 @@ public class PulsarProperties {
private final Defaults defaults = new Defaults();
public Consumer getConsumer() {
public ConsumerConfigProperties getConsumer() {
return this.consumer;
}
@@ -99,7 +84,7 @@ public class PulsarProperties {
return this.function;
}
public Producer getProducer() {
public ProducerConfigProperties getProducer() {
return this.producer;
}
@@ -139,863 +124,6 @@ public class PulsarProperties {
return new HashMap<>(this.reader.buildProperties());
}
public static class Consumer {
/**
* Comma-separated list of topics the consumer subscribes to.
*/
private Set<String> topics;
/**
* Pattern for topics the consumer subscribes to.
*/
private Pattern topicsPattern;
/**
* Subscription name for the consumer.
*/
private String subscriptionName;
/**
* Subscription type to be used when subscribing to a topic.
*/
private SubscriptionType subscriptionType = SubscriptionType.Exclusive;
/**
* Map of properties to add to the subscription.
*/
private Map<String, String> subscriptionProperties = new HashMap<>();
/**
* Subscription mode to be used when subscribing to the topic.
*/
private SubscriptionMode subscriptionMode = SubscriptionMode.Durable;
/**
* Number of messages that can be accumulated before the consumer calls "receive".
*/
private Integer receiverQueueSize = 1000;
/**
* Time to group acknowledgements before sending them to the broker.
*/
private Duration acknowledgementsGroupTime = Duration.ofMillis(100);
/**
* Delay before re-delivering messages that have failed to be processed.
*/
private Duration negativeAckRedeliveryDelay = Duration.ofMinutes(1);
/**
* Maximum number of messages that a consumer can be pushed at once from a broker
* across all partitions.
*/
private Integer maxTotalReceiverQueueSizeAcrossPartitions = 50000;
/**
* Consumer name to identify a particular consumer from the topic stats.
*/
private String consumerName;
/**
* Timeout for unacked messages to be redelivered.
*/
private Duration ackTimeout = Duration.ZERO;
/**
* Precision for the ack timeout messages tracker.
*/
private Duration tickDuration = Duration.ofSeconds(1);
/**
* Priority level for shared subscription consumers.
*/
private Integer priorityLevel = 0;
/**
* Action the consumer will take in case of decryption failure.
*/
private ConsumerCryptoFailureAction cryptoFailureAction = ConsumerCryptoFailureAction.FAIL;
/**
* Map of properties to add to the consumer.
*/
private SortedMap<String, String> properties = new TreeMap<>();
/**
* Whether to read messages from the compacted topic rather than the full message
* backlog.
*/
private Boolean readCompacted = false;
/**
* Position where to initialize a newly created subscription.
*/
private SubscriptionInitialPosition subscriptionInitialPosition = SubscriptionInitialPosition.Latest;
/**
* Auto-discovery period for topics when topic pattern is used in minutes.
*/
private Integer patternAutoDiscoveryPeriod = 1;
/**
* Determines which topics the consumer should be subscribed to when using pattern
* subscriptions.
*/
private RegexSubscriptionMode regexSubscriptionMode = RegexSubscriptionMode.PersistentOnly;
/**
* Dead letter policy to use.
*/
@Nullable
@NestedConfigurationProperty
private DeadLetterPolicy deadLetterPolicy;
/**
* Whether to auto retry messages.
*/
private Boolean retryEnable = false;
/**
* Whether the consumer auto-subscribes for partition increase. This is only for
* partitioned consumers.
*/
private Boolean autoUpdatePartitions = true;
/**
* Interval of partitions discovery updates.
*/
private Duration autoUpdatePartitionsInterval = Duration.ofMinutes(1);
/**
* Whether to replicate subscription state.
*/
private Boolean replicateSubscriptionState = false;
/**
* Whether to include the given position of any reset operation like
* {@link org.apache.pulsar.client.api.Consumer#seek(long) or
* {@link Consumer#seek(MessageId)}}.
*/
private Boolean resetIncludeHead = false;
/**
* Whether the batch index acknowledgment is enabled.
*/
private Boolean batchIndexAckEnabled = false;
/**
* Whether an acknowledgement receipt is enabled.
*/
private Boolean ackReceiptEnabled = false;
/**
* Whether pooling of messages and the underlying data buffers is enabled.
*/
private Boolean poolMessages = false;
/**
* Whether to start the consumer in a paused state.
*/
private Boolean startPaused = false;
/**
* Whether to automatically drop outstanding un-acked messages if the queue is
* full.
*/
private Boolean autoAckOldestChunkedMessageOnQueueFull = true;
/**
* Maximum number of chunked messages to be kept in memory.
*/
private Integer maxPendingChunkedMessage = 10;
/**
* Time to expire incomplete chunks if the consumer won't be able to receive all
* chunks before.
*/
private Duration expireTimeOfIncompleteChunkedMessage = Duration.ofMinutes(1);
public Set<String> getTopics() {
return this.topics;
}
public void setTopics(Set<String> topics) {
this.topics = topics;
}
public Pattern getTopicsPattern() {
return this.topicsPattern;
}
public void setTopicsPattern(Pattern topicsPattern) {
this.topicsPattern = topicsPattern;
}
public String getSubscriptionName() {
return this.subscriptionName;
}
public void setSubscriptionName(String subscriptionName) {
this.subscriptionName = subscriptionName;
}
public Map<String, String> getSubscriptionProperties() {
return this.subscriptionProperties;
}
public void setSubscriptionProperties(Map<String, String> subscriptionProperties) {
this.subscriptionProperties = subscriptionProperties;
}
public SubscriptionMode getSubscriptionMode() {
return this.subscriptionMode;
}
public void setSubscriptionMode(SubscriptionMode subscriptionMode) {
this.subscriptionMode = subscriptionMode;
}
public SubscriptionType getSubscriptionType() {
return this.subscriptionType;
}
public void setSubscriptionType(SubscriptionType subscriptionType) {
this.subscriptionType = subscriptionType;
}
public Integer getReceiverQueueSize() {
return this.receiverQueueSize;
}
public void setReceiverQueueSize(Integer receiverQueueSize) {
this.receiverQueueSize = receiverQueueSize;
}
public Duration getAcknowledgementsGroupTime() {
return this.acknowledgementsGroupTime;
}
public void setAcknowledgementsGroupTime(Duration acknowledgementsGroupTime) {
this.acknowledgementsGroupTime = acknowledgementsGroupTime;
}
public Duration getNegativeAckRedeliveryDelay() {
return this.negativeAckRedeliveryDelay;
}
public void setNegativeAckRedeliveryDelay(Duration negativeAckRedeliveryDelay) {
this.negativeAckRedeliveryDelay = negativeAckRedeliveryDelay;
}
public Integer getMaxTotalReceiverQueueSizeAcrossPartitions() {
return this.maxTotalReceiverQueueSizeAcrossPartitions;
}
public void setMaxTotalReceiverQueueSizeAcrossPartitions(Integer maxTotalReceiverQueueSizeAcrossPartitions) {
this.maxTotalReceiverQueueSizeAcrossPartitions = maxTotalReceiverQueueSizeAcrossPartitions;
}
public String getConsumerName() {
return this.consumerName;
}
public void setConsumerName(String consumerName) {
this.consumerName = consumerName;
}
public Duration getAckTimeout() {
return this.ackTimeout;
}
public void setAckTimeout(Duration ackTimeout) {
this.ackTimeout = ackTimeout;
}
public Duration getTickDuration() {
return this.tickDuration;
}
public void setTickDuration(Duration tickDuration) {
this.tickDuration = tickDuration;
}
public Integer getPriorityLevel() {
return this.priorityLevel;
}
public void setPriorityLevel(Integer priorityLevel) {
this.priorityLevel = priorityLevel;
}
public ConsumerCryptoFailureAction getCryptoFailureAction() {
return this.cryptoFailureAction;
}
public void setCryptoFailureAction(ConsumerCryptoFailureAction cryptoFailureAction) {
this.cryptoFailureAction = cryptoFailureAction;
}
public SortedMap<String, String> getProperties() {
return this.properties;
}
public void setProperties(SortedMap<String, String> properties) {
this.properties = properties;
}
public Boolean getReadCompacted() {
return this.readCompacted;
}
public void setReadCompacted(Boolean readCompacted) {
this.readCompacted = readCompacted;
}
public SubscriptionInitialPosition getSubscriptionInitialPosition() {
return this.subscriptionInitialPosition;
}
public void setSubscriptionInitialPosition(SubscriptionInitialPosition subscriptionInitialPosition) {
this.subscriptionInitialPosition = subscriptionInitialPosition;
}
public Integer getPatternAutoDiscoveryPeriod() {
return this.patternAutoDiscoveryPeriod;
}
public void setPatternAutoDiscoveryPeriod(Integer patternAutoDiscoveryPeriod) {
this.patternAutoDiscoveryPeriod = patternAutoDiscoveryPeriod;
}
public RegexSubscriptionMode getRegexSubscriptionMode() {
return this.regexSubscriptionMode;
}
public void setRegexSubscriptionMode(RegexSubscriptionMode regexSubscriptionMode) {
this.regexSubscriptionMode = regexSubscriptionMode;
}
@Nullable
public DeadLetterPolicy getDeadLetterPolicy() {
return this.deadLetterPolicy;
}
public void setDeadLetterPolicy(@Nullable DeadLetterPolicy deadLetterPolicy) {
this.deadLetterPolicy = deadLetterPolicy;
}
public Boolean getRetryEnable() {
return this.retryEnable;
}
public void setRetryEnable(Boolean retryEnable) {
this.retryEnable = retryEnable;
}
public Boolean getAutoUpdatePartitions() {
return this.autoUpdatePartitions;
}
public void setAutoUpdatePartitions(Boolean autoUpdatePartitions) {
this.autoUpdatePartitions = autoUpdatePartitions;
}
public Duration getAutoUpdatePartitionsInterval() {
return this.autoUpdatePartitionsInterval;
}
public void setAutoUpdatePartitionsInterval(Duration autoUpdatePartitionsInterval) {
this.autoUpdatePartitionsInterval = autoUpdatePartitionsInterval;
}
public Boolean getReplicateSubscriptionState() {
return this.replicateSubscriptionState;
}
public void setReplicateSubscriptionState(Boolean replicateSubscriptionState) {
this.replicateSubscriptionState = replicateSubscriptionState;
}
public Boolean getResetIncludeHead() {
return this.resetIncludeHead;
}
public void setResetIncludeHead(Boolean resetIncludeHead) {
this.resetIncludeHead = resetIncludeHead;
}
public Boolean getBatchIndexAckEnabled() {
return this.batchIndexAckEnabled;
}
public void setBatchIndexAckEnabled(Boolean batchIndexAckEnabled) {
this.batchIndexAckEnabled = batchIndexAckEnabled;
}
public Boolean getAckReceiptEnabled() {
return this.ackReceiptEnabled;
}
public void setAckReceiptEnabled(Boolean ackReceiptEnabled) {
this.ackReceiptEnabled = ackReceiptEnabled;
}
public Boolean getPoolMessages() {
return this.poolMessages;
}
public void setPoolMessages(Boolean poolMessages) {
this.poolMessages = poolMessages;
}
public Boolean getStartPaused() {
return this.startPaused;
}
public void setStartPaused(Boolean startPaused) {
this.startPaused = startPaused;
}
public Boolean getAutoAckOldestChunkedMessageOnQueueFull() {
return this.autoAckOldestChunkedMessageOnQueueFull;
}
public void setAutoAckOldestChunkedMessageOnQueueFull(Boolean autoAckOldestChunkedMessageOnQueueFull) {
this.autoAckOldestChunkedMessageOnQueueFull = autoAckOldestChunkedMessageOnQueueFull;
}
public Integer getMaxPendingChunkedMessage() {
return this.maxPendingChunkedMessage;
}
public void setMaxPendingChunkedMessage(Integer maxPendingChunkedMessage) {
this.maxPendingChunkedMessage = maxPendingChunkedMessage;
}
public Duration getExpireTimeOfIncompleteChunkedMessage() {
return this.expireTimeOfIncompleteChunkedMessage;
}
public void setExpireTimeOfIncompleteChunkedMessage(Duration expireTimeOfIncompleteChunkedMessage) {
this.expireTimeOfIncompleteChunkedMessage = expireTimeOfIncompleteChunkedMessage;
}
public Map<String, Object> buildProperties() {
PulsarProperties.Properties properties = new Properties();
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(this::getTopics).to(properties.in("topicNames"));
map.from(this::getTopicsPattern).to(properties.in("topicsPattern"));
map.from(this::getSubscriptionName).to(properties.in("subscriptionName"));
map.from(this::getSubscriptionType).to(properties.in("subscriptionType"));
map.from(this::getSubscriptionProperties).to(properties.in("subscriptionProperties"));
map.from(this::getSubscriptionMode).to(properties.in("subscriptionMode"));
map.from(this::getReceiverQueueSize).to(properties.in("receiverQueueSize"));
map.from(this::getAcknowledgementsGroupTime).as(it -> it.toNanos() / 1000)
.to(properties.in("acknowledgementsGroupTimeMicros"));
map.from(this::getNegativeAckRedeliveryDelay).as(it -> it.toNanos() / 1000)
.to(properties.in("negativeAckRedeliveryDelayMicros"));
map.from(this::getMaxTotalReceiverQueueSizeAcrossPartitions)
.to(properties.in("maxTotalReceiverQueueSizeAcrossPartitions"));
map.from(this::getConsumerName).to(properties.in("consumerName"));
map.from(this::getAckTimeout).as(Duration::toMillis).to(properties.in("ackTimeoutMillis"));
map.from(this::getTickDuration).as(Duration::toMillis).to(properties.in("tickDurationMillis"));
map.from(this::getPriorityLevel).to(properties.in("priorityLevel"));
map.from(this::getCryptoFailureAction).to(properties.in("cryptoFailureAction"));
map.from(this::getProperties).to(properties.in("properties"));
map.from(this::getReadCompacted).to(properties.in("readCompacted"));
map.from(this::getSubscriptionInitialPosition).to(properties.in("subscriptionInitialPosition"));
map.from(this::getPatternAutoDiscoveryPeriod).to(properties.in("patternAutoDiscoveryPeriod"));
map.from(this::getRegexSubscriptionMode).to(properties.in("regexSubscriptionMode"));
map.from(this::getDeadLetterPolicy).to(properties.in("deadLetterPolicy"));
map.from(this::getRetryEnable).to(properties.in("retryEnable"));
map.from(this::getAutoUpdatePartitions).to(properties.in("autoUpdatePartitions"));
map.from(this::getAutoUpdatePartitionsInterval).as(Duration::toSeconds)
.to(properties.in("autoUpdatePartitionsIntervalSeconds"));
map.from(this::getReplicateSubscriptionState).to(properties.in("replicateSubscriptionState"));
map.from(this::getResetIncludeHead).to(properties.in("resetIncludeHead"));
map.from(this::getBatchIndexAckEnabled).to(properties.in("batchIndexAckEnabled"));
map.from(this::getAckReceiptEnabled).to(properties.in("ackReceiptEnabled"));
map.from(this::getPoolMessages).to(properties.in("poolMessages"));
map.from(this::getStartPaused).to(properties.in("startPaused"));
map.from(this::getAutoAckOldestChunkedMessageOnQueueFull)
.to(properties.in("autoAckOldestChunkedMessageOnQueueFull"));
map.from(this::getMaxPendingChunkedMessage).to(properties.in("maxPendingChunkedMessage"));
map.from(this::getExpireTimeOfIncompleteChunkedMessage).as(Duration::toMillis)
.to(properties.in("expireTimeOfIncompleteChunkedMessageMillis"));
return properties;
}
}
public static class Producer {
/**
* Topic the producer will publish to.
*/
private String topicName;
/**
* Name for the producer. If not assigned, a unique name is generated.
*/
private String producerName;
/**
* Time before a message has to be acknowledged by the broker.
*/
private Duration sendTimeout = Duration.ofSeconds(30);
/**
* Whether the "send" and "sendAsync" methods should block if the outgoing message
* queue is full.
*/
private Boolean blockIfQueueFull = false;
/**
* Maximum number of pending messages for the producer.
*/
private Integer maxPendingMessages = 1000;
/**
* Maximum number of pending messages across all the partitions.
*/
private Integer maxPendingMessagesAcrossPartitions = 50000;
/**
* Message routing mode for a partitioned producer.
*/
private MessageRoutingMode messageRoutingMode = MessageRoutingMode.RoundRobinPartition;
/**
* Message hashing scheme to choose the partition to which the message is
* published.
*/
private HashingScheme hashingScheme = HashingScheme.JavaStringHash;
/**
* Action the producer will take in case of encryption failure.
*/
private ProducerCryptoFailureAction cryptoFailureAction = ProducerCryptoFailureAction.FAIL;
/**
* Time period within which the messages sent will be batched.
*/
private Duration batchingMaxPublishDelay = Duration.ofMillis(1);
/**
* Partition switch frequency while batching of messages is enabled and using
* round-robin routing mode for non-keyed message.
*/
private Integer batchingPartitionSwitchFrequencyByPublishDelay = 10;
/**
* Maximum number of messages to be batched.
*/
private Integer batchingMaxMessages = 1000;
/**
* Maximum number of bytes permitted in a batch.
*/
private DataSize batchingMaxBytes = DataSize.ofKilobytes(128);
/**
* Whether to automatically batch messages.
*/
private Boolean batchingEnabled = true;
/**
* Whether to split large-size messages into multiple chunks.
*/
private Boolean chunkingEnabled = false;
/**
* Names of the public encryption keys to use when encrypting data.
*/
private Set<String> encryptionKeys = new HashSet<>();
/**
* Message compression type.
*/
private CompressionType compressionType;
/**
* Baseline for the sequence ids for messages published by the producer.
*/
@Nullable
private Long initialSequenceId;
/**
* Whether partitioned producer automatically discover new partitions at runtime.
*/
private Boolean autoUpdatePartitions = true;
/**
* Interval of partitions discovery updates.
*/
private Duration autoUpdatePartitionsInterval = Duration.ofMinutes(1);
/**
* Whether the multiple schema mode is enabled.
*/
private Boolean multiSchema = true;
/**
* Type of access to the topic the producer requires.
*/
private ProducerAccessMode producerAccessMode = ProducerAccessMode.Shared;
/**
* Whether producers in Shared mode register and connect immediately to the owner
* broker of each partition or start lazily on demand.
*/
private Boolean lazyStartPartitionedProducers = false;
/**
* Map of properties to add to the producer.
*/
private Map<String, String> properties = new HashMap<>();
private final Cache cache = new Cache();
public String getTopicName() {
return this.topicName;
}
public void setTopicName(String topicName) {
this.topicName = topicName;
}
public String getProducerName() {
return this.producerName;
}
public void setProducerName(String producerName) {
this.producerName = producerName;
}
public Duration getSendTimeout() {
return this.sendTimeout;
}
public void setSendTimeout(Duration sendTimeout) {
this.sendTimeout = sendTimeout;
}
public Boolean getBlockIfQueueFull() {
return this.blockIfQueueFull;
}
public void setBlockIfQueueFull(Boolean blockIfQueueFull) {
this.blockIfQueueFull = blockIfQueueFull;
}
public Integer getMaxPendingMessages() {
return this.maxPendingMessages;
}
public void setMaxPendingMessages(Integer maxPendingMessages) {
this.maxPendingMessages = maxPendingMessages;
}
public Integer getMaxPendingMessagesAcrossPartitions() {
return this.maxPendingMessagesAcrossPartitions;
}
public void setMaxPendingMessagesAcrossPartitions(Integer maxPendingMessagesAcrossPartitions) {
this.maxPendingMessagesAcrossPartitions = maxPendingMessagesAcrossPartitions;
}
public MessageRoutingMode getMessageRoutingMode() {
return this.messageRoutingMode;
}
public void setMessageRoutingMode(MessageRoutingMode messageRoutingMode) {
this.messageRoutingMode = messageRoutingMode;
}
public HashingScheme getHashingScheme() {
return this.hashingScheme;
}
public void setHashingScheme(HashingScheme hashingScheme) {
this.hashingScheme = hashingScheme;
}
public ProducerCryptoFailureAction getCryptoFailureAction() {
return this.cryptoFailureAction;
}
public void setCryptoFailureAction(ProducerCryptoFailureAction cryptoFailureAction) {
this.cryptoFailureAction = cryptoFailureAction;
}
public Duration getBatchingMaxPublishDelay() {
return this.batchingMaxPublishDelay;
}
public void setBatchingMaxPublishDelay(Duration batchingMaxPublishDelay) {
this.batchingMaxPublishDelay = batchingMaxPublishDelay;
}
public Integer getBatchingPartitionSwitchFrequencyByPublishDelay() {
return this.batchingPartitionSwitchFrequencyByPublishDelay;
}
public void setBatchingPartitionSwitchFrequencyByPublishDelay(
Integer batchingPartitionSwitchFrequencyByPublishDelay) {
this.batchingPartitionSwitchFrequencyByPublishDelay = batchingPartitionSwitchFrequencyByPublishDelay;
}
public Integer getBatchingMaxMessages() {
return this.batchingMaxMessages;
}
public void setBatchingMaxMessages(Integer batchingMaxMessages) {
this.batchingMaxMessages = batchingMaxMessages;
}
public DataSize getBatchingMaxBytes() {
return this.batchingMaxBytes;
}
public void setBatchingMaxBytes(DataSize batchingMaxBytes) {
this.batchingMaxBytes = batchingMaxBytes;
}
public Boolean getBatchingEnabled() {
return this.batchingEnabled;
}
public void setBatchingEnabled(Boolean batchingEnabled) {
this.batchingEnabled = batchingEnabled;
}
public Boolean getChunkingEnabled() {
return this.chunkingEnabled;
}
public void setChunkingEnabled(Boolean chunkingEnabled) {
this.chunkingEnabled = chunkingEnabled;
}
public Set<String> getEncryptionKeys() {
return this.encryptionKeys;
}
public void setEncryptionKeys(Set<String> encryptionKeys) {
this.encryptionKeys = encryptionKeys;
}
public CompressionType getCompressionType() {
return this.compressionType;
}
public void setCompressionType(CompressionType compressionType) {
this.compressionType = compressionType;
}
@Nullable
public Long getInitialSequenceId() {
return this.initialSequenceId;
}
public void setInitialSequenceId(@Nullable Long initialSequenceId) {
this.initialSequenceId = initialSequenceId;
}
public Boolean getAutoUpdatePartitions() {
return this.autoUpdatePartitions;
}
public void setAutoUpdatePartitions(Boolean autoUpdatePartitions) {
this.autoUpdatePartitions = autoUpdatePartitions;
}
public Duration getAutoUpdatePartitionsInterval() {
return this.autoUpdatePartitionsInterval;
}
public void setAutoUpdatePartitionsInterval(Duration autoUpdatePartitionsInterval) {
this.autoUpdatePartitionsInterval = autoUpdatePartitionsInterval;
}
public Boolean getMultiSchema() {
return this.multiSchema;
}
public void setMultiSchema(Boolean multiSchema) {
this.multiSchema = multiSchema;
}
public ProducerAccessMode getProducerAccessMode() {
return this.producerAccessMode;
}
public void setProducerAccessMode(ProducerAccessMode producerAccessMode) {
this.producerAccessMode = producerAccessMode;
}
public Boolean getLazyStartPartitionedProducers() {
return this.lazyStartPartitionedProducers;
}
public void setLazyStartPartitionedProducers(Boolean lazyStartPartitionedProducers) {
this.lazyStartPartitionedProducers = lazyStartPartitionedProducers;
}
public Map<String, String> getProperties() {
return this.properties;
}
public void setProperties(Map<String, String> properties) {
this.properties = properties;
}
public Cache getCache() {
return this.cache;
}
public Map<String, Object> buildProperties() {
PulsarProperties.Properties properties = new Properties();
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(this::getTopicName).to(properties.in("topicName"));
map.from(this::getProducerName).to(properties.in("producerName"));
map.from(this::getSendTimeout).asInt(Duration::toMillis).to(properties.in("sendTimeoutMs"));
map.from(this::getBlockIfQueueFull).to(properties.in("blockIfQueueFull"));
map.from(this::getMaxPendingMessages).to(properties.in("maxPendingMessages"));
map.from(this::getMaxPendingMessagesAcrossPartitions)
.to(properties.in("maxPendingMessagesAcrossPartitions"));
map.from(this::getMessageRoutingMode).to(properties.in("messageRoutingMode"));
map.from(this::getHashingScheme).to(properties.in("hashingScheme"));
map.from(this::getCryptoFailureAction).to(properties.in("cryptoFailureAction"));
map.from(this::getBatchingMaxPublishDelay).as(it -> it.toNanos() / 1000)
.to(properties.in("batchingMaxPublishDelayMicros"));
map.from(this::getBatchingPartitionSwitchFrequencyByPublishDelay)
.to(properties.in("batchingPartitionSwitchFrequencyByPublishDelay"));
map.from(this::getBatchingMaxMessages).to(properties.in("batchingMaxMessages"));
map.from(this::getBatchingMaxBytes).asInt(DataSize::toBytes).to(properties.in("batchingMaxBytes"));
map.from(this::getBatchingEnabled).to(properties.in("batchingEnabled"));
map.from(this::getChunkingEnabled).to(properties.in("chunkingEnabled"));
map.from(this::getEncryptionKeys).to(properties.in("encryptionKeys"));
map.from(this::getCompressionType).to(properties.in("compressionType"));
map.from(this::getInitialSequenceId).to(properties.in("initialSequenceId"));
map.from(this::getAutoUpdatePartitions).to(properties.in("autoUpdatePartitions"));
map.from(this::getAutoUpdatePartitionsInterval).as(Duration::toSeconds)
.to(properties.in("autoUpdatePartitionsIntervalSeconds"));
map.from(this::getMultiSchema).to(properties.in("multiSchema"));
map.from(this::getProducerAccessMode).to(properties.in("accessMode"));
map.from(this::getLazyStartPartitionedProducers).to(properties.in("lazyStartPartitionedProducers"));
map.from(this::getProperties).to(properties.in("properties"));
return properties;
}
}
public static class Template {
/**
@@ -2225,7 +1353,7 @@ public class PulsarProperties {
record TypeMapping(Class<?> messageType, String topicName) {
}
private static class Properties extends HashMap<String, Object> {
static class Properties extends HashMap<String, Object> {
<V> java.util.function.Consumer<V> in(String key) {
return (value) -> put(key, value);

View File

@@ -16,36 +16,88 @@
package org.springframework.pulsar.spring.cloud.stream.binder;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.core.log.LogAccessor;
import org.springframework.pulsar.spring.cloud.stream.binder.properties.PulsarConsumerProperties;
import org.springframework.util.StringUtils;
/**
* Utility methods for the binder.
* Binder utility methods.
*
* @author Soby Chacko
* @author Chris Bono
*/
public final class PulsarBinderUtils {
final class PulsarBinderUtils {
static final String ANON_SUBSCRIPTION = "anon-subscription";
private static final LogAccessor LOGGER = new LogAccessor(PulsarBinderUtils.class);
static final char SUBSCRIPTION_NAME_SEPARATOR = '-';
private static final String SUBSCRIPTION_NAME_FORMAT_STR = "%s-anon-subscription-%s";
private PulsarBinderUtils() {
}
static String subscriptionName(PulsarConsumerProperties pulsarConsumerProperties,
ConsumerDestination consumerDestination) {
String subscriptionName = pulsarConsumerProperties.getSubscriptionName();
if (subscriptionName == null) {
// if subscription name is not provided, each time the app starts, it will be
// an anonymous subscription
subscriptionName = consumerDestination.getName() + SUBSCRIPTION_NAME_SEPARATOR + ANON_SUBSCRIPTION
+ SUBSCRIPTION_NAME_SEPARATOR + UUID.randomUUID();
/**
* Gets the subscription name to use for the binder.
* @param consumerProps the pulsar consumer props
* @param consumerDestination the destination being subscribed to
* @return the subscription name from the consumer properties or a generated name in
* the format {@link #SUBSCRIPTION_NAME_FORMAT_STR} when the name is not set on the
* consumer properties
*/
static String subscriptionName(PulsarConsumerProperties consumerProps, ConsumerDestination consumerDestination) {
if (StringUtils.hasText(consumerProps.getSubscriptionName())) {
return consumerProps.getSubscriptionName();
}
return subscriptionName;
return SUBSCRIPTION_NAME_FORMAT_STR.formatted(consumerDestination.getName(), UUID.randomUUID());
}
/**
* Merges properties defined at the binder and binding level (binding properties
* override binder properties).
* <p>
* <b>NOTE:</b> Properties whose value is not different from the default value in the
* {@code baseProps} are not included in the merged result.
* @param baseProps the map of base level properties (eg. 'spring.pulsar.consumer.*')
* @param binderProps the map of binder level properties (eg.
* 'spring.cloud.stream.pulsar.binder.consumer.*')
* @param bindingProps the map of binding level properties (eg.
* 'spring.cloud.stream.pulsar.bindings.myBinding-in-0.consumer.*')
* @return map of merged binder and binding properties including only properties whose
* value has changed from the same property in the base properties
*/
static Map<String, Object> mergePropertiesWithPrecedence(Map<String, Object> baseProps,
Map<String, Object> binderProps, Map<String, Object> bindingProps) {
Objects.requireNonNull(baseProps, "baseProps must be specified");
Objects.requireNonNull(binderProps, "binderProps must be specified");
Objects.requireNonNull(bindingProps, "bindingProps must be specified");
Map<String, Object> newOrModifiedBinderProps = extractNewOrModifiedProperties(binderProps, baseProps);
LOGGER.trace(() -> "New or modified binder props: %s".formatted(newOrModifiedBinderProps));
Map<String, Object> newOrModifiedBindingProps = extractNewOrModifiedProperties(bindingProps, baseProps);
LOGGER.trace(() -> "New or modified binding props: %s".formatted(newOrModifiedBindingProps));
Map<String, Object> mergedProps = new HashMap<>(newOrModifiedBinderProps);
mergedProps.putAll(newOrModifiedBindingProps);
LOGGER.trace(() -> "Final merged props: %s".formatted(mergedProps));
return mergedProps;
}
private static Map<String, Object> extractNewOrModifiedProperties(Map<String, Object> candidateProps,
Map<String, Object> baseProps) {
Map<String, Object> newOrModifiedProps = new HashMap<>();
candidateProps.forEach((propName, propValue) -> {
if (!baseProps.containsKey(propName) || (!Objects.equals(propValue, baseProps.get(propName)))) {
newOrModifiedProps.put(propName, propValue);
}
});
return newOrModifiedProps;
}
}

View File

@@ -31,7 +31,6 @@ import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.core.ResolvableType;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.endpoint.MessageProducerSupport;
@@ -42,6 +41,10 @@ import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.pulsar.autoconfigure.ConsumerConfigProperties;
import org.springframework.pulsar.autoconfigure.ProducerConfigProperties;
import org.springframework.pulsar.core.ProducerBuilderConfigurationUtil;
import org.springframework.pulsar.core.ProducerBuilderCustomizer;
import org.springframework.pulsar.core.PulsarConsumerFactory;
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.pulsar.core.SchemaResolver;
@@ -49,6 +52,7 @@ import org.springframework.pulsar.listener.AbstractPulsarMessageListenerContaine
import org.springframework.pulsar.listener.DefaultPulsarMessageListenerContainer;
import org.springframework.pulsar.listener.PulsarContainerProperties;
import org.springframework.pulsar.listener.PulsarRecordMessageListener;
import org.springframework.pulsar.spring.cloud.stream.binder.properties.PulsarBinderConfigurationProperties;
import org.springframework.pulsar.spring.cloud.stream.binder.properties.PulsarConsumerProperties;
import org.springframework.pulsar.spring.cloud.stream.binder.properties.PulsarExtendedBindingProperties;
import org.springframework.pulsar.spring.cloud.stream.binder.properties.PulsarProducerProperties;
@@ -70,14 +74,17 @@ public class PulsarMessageChannelBinder extends
private final SchemaResolver schemaResolver;
private PulsarBinderConfigurationProperties binderConfigProps;
private PulsarExtendedBindingProperties extendedBindingProperties = new PulsarExtendedBindingProperties();
public PulsarMessageChannelBinder(PulsarTopicProvisioner provisioningProvider,
PulsarTemplate<Object> pulsarTemplate, PulsarConsumerFactory<?> pulsarConsumerFactory,
SchemaResolver schemaResolver) {
PulsarBinderConfigurationProperties binderConfigProps, SchemaResolver schemaResolver) {
super(null, provisioningProvider);
this.pulsarTemplate = pulsarTemplate;
this.pulsarConsumerFactory = pulsarConsumerFactory;
this.binderConfigProps = binderConfigProps;
this.schemaResolver = schemaResolver;
}
@@ -95,11 +102,16 @@ public class PulsarMessageChannelBinder extends
else {
schema = null;
}
PulsarProducerConfigurationMessageHandler handler = new PulsarProducerConfigurationMessageHandler(
this.pulsarTemplate, schema, destination.getName());
AbstractApplicationContext applicationContext = getApplicationContext();
handler.setApplicationContext(applicationContext);
var baseProducerProps = new ProducerConfigProperties().buildProperties();
var binderProducerProps = this.binderConfigProps.getProducer().buildProperties();
var bindingProducerProps = producerProperties.getExtension().buildProperties();
var mergedProducerProps = PulsarBinderUtils.mergePropertiesWithPrecedence(baseProducerProps,
binderProducerProps, bindingProducerProps);
var handler = new PulsarProducerConfigurationMessageHandler(this.pulsarTemplate, schema, destination.getName(),
(builder) -> ProducerBuilderConfigurationUtil.loadConf(builder, mergedProducerProps));
handler.setApplicationContext(getApplicationContext());
handler.setBeanFactory(getBeanFactory());
return handler;
@@ -108,29 +120,38 @@ public class PulsarMessageChannelBinder extends
@Override
protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group,
ExtendedConsumerProperties<PulsarConsumerProperties> properties) {
PulsarContainerProperties pulsarContainerProperties = new PulsarContainerProperties();
pulsarContainerProperties.setTopics(new String[] { destination.getName() });
PulsarMessageDrivenChannelAdapter pulsarMessageDrivenChannelAdapter = new PulsarMessageDrivenChannelAdapter();
pulsarContainerProperties.setMessageListener((PulsarRecordMessageListener<?>) (consumer, msg) -> {
var containerProperties = new PulsarContainerProperties();
containerProperties.setTopics(new String[] { destination.getName() });
var messageDrivenChannelAdapter = new PulsarMessageDrivenChannelAdapter();
containerProperties.setMessageListener((PulsarRecordMessageListener<?>) (consumer, msg) -> {
Message<Object> message = MessageBuilder.withPayload(msg.getValue()).build();
pulsarMessageDrivenChannelAdapter.send(message);
messageDrivenChannelAdapter.send(message);
});
if (properties.isUseNativeDecoding()) {
Schema<Object> schema = resolveSchema(properties.getExtension().getSchemaType(),
var schema = resolveSchema(properties.getExtension().getSchemaType(),
properties.getExtension().getMessageType(), properties.getExtension().getMessageKeyType(),
properties.getExtension().getMessageValueType());
pulsarContainerProperties.setSchema(
containerProperties.setSchema(
Objects.requireNonNull(schema, "Could not determine consumer schema for " + destination.getName()));
}
else {
pulsarContainerProperties.setSchema(Schema.BYTES);
containerProperties.setSchema(Schema.BYTES);
}
String subscriptionName = PulsarBinderUtils.subscriptionName(properties.getExtension(), destination);
pulsarContainerProperties.setSubscriptionName(subscriptionName);
DefaultPulsarMessageListenerContainer<?> container = new DefaultPulsarMessageListenerContainer<>(
this.pulsarConsumerFactory, pulsarContainerProperties);
pulsarMessageDrivenChannelAdapter.setMessageListenerContainer(container);
return pulsarMessageDrivenChannelAdapter;
var subscriptionName = PulsarBinderUtils.subscriptionName(properties.getExtension(), destination);
containerProperties.setSubscriptionName(subscriptionName);
var baseConsumerProps = new ConsumerConfigProperties().buildProperties();
var binderConsumerProps = this.binderConfigProps.getConsumer().buildProperties();
var bindingConsumerProps = properties.getExtension().buildProperties();
var mergedConsumerProps = PulsarBinderUtils.mergePropertiesWithPrecedence(baseConsumerProps,
binderConsumerProps, bindingConsumerProps);
containerProperties.getPulsarConsumerProperties().putAll(mergedConsumerProps);
var container = new DefaultPulsarMessageListenerContainer<>(this.pulsarConsumerFactory, containerProperties);
messageDrivenChannelAdapter.setMessageListenerContainer(container);
return messageDrivenChannelAdapter;
}
// VisibleForTesting
@@ -230,11 +251,14 @@ public class PulsarMessageChannelBinder extends
final String destination;
final ProducerBuilderCustomizer<Object> layeredProducerPropsCustomizer;
PulsarProducerConfigurationMessageHandler(PulsarTemplate<Object> pulsarTemplate, Schema<Object> schema,
String destination) {
String destination, ProducerBuilderCustomizer<Object> layeredProducerPropsCustomizer) {
this.pulsarTemplate = pulsarTemplate;
this.schema = schema;
this.destination = destination;
this.layeredProducerPropsCustomizer = layeredProducerPropsCustomizer;
}
@Override
@@ -262,7 +286,8 @@ public class PulsarMessageChannelBinder extends
@Override
protected void handleMessageInternal(Message<?> message) {
try {
this.pulsarTemplate.sendAsync(this.destination, message.getPayload(), this.schema);
this.pulsarTemplate.newMessage(message.getPayload()).withTopic(this.destination).withSchema(this.schema)
.withProducerCustomizer(this.layeredProducerPropsCustomizer).sendAsync();
}
catch (PulsarClientException ex) {
logger.trace(ex, "Failed to send message to destination: " + this.destination);

View File

@@ -51,10 +51,11 @@ public class PulsarBinderConfiguration {
@Bean
public PulsarMessageChannelBinder pulsarMessageChannelBinder(PulsarTopicProvisioner pulsarTopicProvisioner,
PulsarTemplate<Object> pulsarTemplate, PulsarConsumerFactory<byte[]> pulsarConsumerFactory,
PulsarExtendedBindingProperties pulsarExtendedBindingProperties, SchemaResolver schemaResolver) {
PulsarBinderConfigurationProperties binderConfigProps, PulsarExtendedBindingProperties bindingConfigProps,
SchemaResolver schemaResolver) {
PulsarMessageChannelBinder pulsarMessageChannelBinder = new PulsarMessageChannelBinder(pulsarTopicProvisioner,
pulsarTemplate, pulsarConsumerFactory, schemaResolver);
pulsarMessageChannelBinder.setExtendedBindingProperties(pulsarExtendedBindingProperties);
pulsarTemplate, pulsarConsumerFactory, binderConfigProps, schemaResolver);
pulsarMessageChannelBinder.setExtendedBindingProperties(bindingConfigProps);
return pulsarMessageChannelBinder;
}

View File

@@ -17,23 +17,41 @@
package org.springframework.pulsar.spring.cloud.stream.binder.properties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.NestedConfigurationProperty;
import org.springframework.pulsar.autoconfigure.ConsumerConfigProperties;
import org.springframework.pulsar.autoconfigure.ProducerConfigProperties;
/**
* {@link ConfigurationProperties} for Pulsar binder configuration.
*
* @author Soby Chacko
* @author Chris Bono
*/
@ConfigurationProperties(prefix = "spring.cloud.stream.pulsar.binder")
public class PulsarBinderConfigurationProperties {
@NestedConfigurationProperty
private final ConsumerConfigProperties consumer = new ConsumerConfigProperties();
@NestedConfigurationProperty
private final ProducerConfigProperties producer = new ProducerConfigProperties();
private int partitionCount = 1;
public ConsumerConfigProperties getConsumer() {
return this.consumer;
}
public ProducerConfigProperties getProducer() {
return this.producer;
}
public int partitionCount() {
return this.partitionCount;
}
public void setPartitionCount(int numberOfPartitions) {
this.partitionCount = numberOfPartitions;
public void setPartitionCount(int partitionCount) {
this.partitionCount = partitionCount;
}
}

View File

@@ -16,10 +16,10 @@
package org.springframework.pulsar.spring.cloud.stream.binder.properties;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.schema.SchemaType;
import org.springframework.lang.Nullable;
import org.springframework.pulsar.autoconfigure.ConsumerConfigProperties;
/**
* Pulsar consumer properties used by the binder.
@@ -27,13 +27,7 @@ import org.springframework.lang.Nullable;
* @author Soby Chacko
* @author Chris Bono
*/
public class PulsarConsumerProperties {
@Nullable
private String subscriptionName;
@Nullable
private SubscriptionType subscriptionType;
public class PulsarConsumerProperties extends ConsumerConfigProperties {
@Nullable
private SchemaType schemaType;
@@ -47,25 +41,7 @@ public class PulsarConsumerProperties {
@Nullable
private Class<?> messageValueType;
private int partitionCount = 1;
@Nullable
public String getSubscriptionName() {
return this.subscriptionName;
}
public void setSubscriptionName(String subscriptionName) {
this.subscriptionName = subscriptionName;
}
@Nullable
public SubscriptionType getSubscriptionType() {
return this.subscriptionType;
}
public void setSubscriptionType(SubscriptionType subscriptionType) {
this.subscriptionType = subscriptionType;
}
private Integer partitionCount = 1;
@Nullable
public SchemaType getSchemaType() {
@@ -103,11 +79,11 @@ public class PulsarConsumerProperties {
this.messageValueType = messageValueType;
}
public int getPartitionCount() {
public Integer getPartitionCount() {
return this.partitionCount;
}
public void setPartitionCount(int partitionCount) {
public void setPartitionCount(Integer partitionCount) {
this.partitionCount = partitionCount;
}

View File

@@ -19,6 +19,7 @@ package org.springframework.pulsar.spring.cloud.stream.binder.properties;
import org.apache.pulsar.common.schema.SchemaType;
import org.springframework.lang.Nullable;
import org.springframework.pulsar.autoconfigure.ProducerConfigProperties;
/**
* Pulsar producer properties used by the binder.
@@ -26,7 +27,7 @@ import org.springframework.lang.Nullable;
* @author Soby Chacko
* @author Chris Bono
*/
public class PulsarProducerProperties {
public class PulsarProducerProperties extends ProducerConfigProperties {
@Nullable
private SchemaType schemaType;

View File

@@ -0,0 +1,121 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.pulsar.spring.cloud.stream.binder;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatNoException;
import java.util.HashMap;
import java.util.Map;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.jupiter.api.Test;
import org.springframework.boot.context.properties.bind.Bindable;
import org.springframework.boot.context.properties.bind.Binder;
import org.springframework.boot.context.properties.source.ConfigurationPropertySource;
import org.springframework.boot.context.properties.source.MapConfigurationPropertySource;
import org.springframework.pulsar.spring.cloud.stream.binder.properties.PulsarBinderConfigurationProperties;
/**
* Tests for {@link PulsarBinderConfigurationProperties}.
*
* @author Chris Bono
*/
public class PulsarBinderConfigurationPropertiesTests {
private final PulsarBinderConfigurationProperties properties = new PulsarBinderConfigurationProperties();
private void bind(Map<String, String> map) {
ConfigurationPropertySource source = new MapConfigurationPropertySource(map);
new Binder(source).bind("spring.cloud.stream.pulsar.binder", Bindable.ofInstance(this.properties));
}
@Test
void partitionCountProperty() {
assertThat(properties.partitionCount()).isEqualTo(1);
bind(Map.of("spring.cloud.stream.pulsar.binder.partition-count", "5150"));
assertThat(properties.partitionCount()).isEqualTo(5150);
}
@Test
void producerProperties() {
// Only spot check a few values (PulsarPropertiesTests does the heavy lifting)
Map<String, String> props = new HashMap<>();
props.put("spring.cloud.stream.pulsar.binder.producer.topic-name", "my-topic");
props.put("spring.cloud.stream.pulsar.binder.producer.send-timeout", "2s");
props.put("spring.cloud.stream.pulsar.binder.producer.max-pending-messages", "3");
props.put("spring.cloud.stream.pulsar.binder.producer.producer-access-mode", "exclusive");
props.put("spring.cloud.stream.pulsar.binder.producer.properties[my-prop]", "my-prop-value");
bind(props);
Map<String, Object> producerProps = properties.getProducer().buildProperties();
// Verify that the props can be loaded in a ProducerBuilder
assertThatNoException().isThrownBy(() -> ConfigurationDataUtils.loadData(producerProps,
new ProducerConfigurationData(), ProducerConfigurationData.class));
// @formatter:off
assertThat(producerProps)
.containsEntry("topicName", "my-topic")
.containsEntry("sendTimeoutMs", 2_000)
.containsEntry("maxPendingMessages", 3)
.containsEntry("accessMode", ProducerAccessMode.Exclusive)
.hasEntrySatisfying("properties", properties ->
assertThat(properties)
.asInstanceOf(InstanceOfAssertFactories.map(String.class, String.class))
.containsEntry("my-prop", "my-prop-value"));
// @formatter:on
}
@Test
void consumerProperties() {
// Only spot check a few values (PulsarPropertiesTests does the heavy lifting)
Map<String, String> props = new HashMap<>();
props.put("spring.cloud.stream.pulsar.binder.consumer.topics[0]", "my-topic");
props.put("spring.cloud.stream.pulsar.binder.consumer.subscription-properties[my-sub-prop]",
"my-sub-prop-value");
props.put("spring.cloud.stream.pulsar.binder.consumer.subscription-mode", "nondurable");
props.put("spring.cloud.stream.pulsar.binder.consumer.receiver-queue-size", "1");
bind(props);
Map<String, Object> consumerProps = properties.getConsumer().buildProperties();
// Verify that the props can be loaded in a ConsumerBuilder
assertThatNoException().isThrownBy(() -> ConfigurationDataUtils.loadData(consumerProps,
new ConsumerConfigurationData<>(), ConsumerConfigurationData.class));
// @formatter:off
assertThat(consumerProps)
.hasEntrySatisfying("topicNames",
topics -> assertThat(topics).asInstanceOf(InstanceOfAssertFactories.collection(String.class))
.containsExactly("my-topic"))
.hasEntrySatisfying("subscriptionProperties",
properties -> assertThat(properties)
.asInstanceOf(InstanceOfAssertFactories.map(String.class, String.class))
.containsEntry("my-sub-prop", "my-sub-prop-value"))
.containsEntry("subscriptionMode", SubscriptionMode.NonDurable)
.containsEntry("receiverQueueSize", 1);
// @formatter:on
}
}

View File

@@ -16,10 +16,20 @@
package org.springframework.pulsar.spring.cloud.stream.binder;
import static org.assertj.core.api.Assertions.assertThat;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.common.schema.KeyValue;
import org.awaitility.Awaitility;
@@ -38,9 +48,17 @@ import org.springframework.boot.test.system.OutputCaptureExtension;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.core.log.LogAccessor;
import org.springframework.lang.Nullable;
import org.springframework.pulsar.autoconfigure.PulsarProperties;
import org.springframework.pulsar.core.ConsumerBuilderCustomizer;
import org.springframework.pulsar.core.DefaultPulsarConsumerFactory;
import org.springframework.pulsar.core.DefaultPulsarProducerFactory;
import org.springframework.pulsar.core.DefaultSchemaResolver;
import org.springframework.pulsar.core.ProducerBuilderCustomizer;
import org.springframework.pulsar.core.PulsarConsumerFactory;
import org.springframework.pulsar.core.PulsarProducerFactory;
import org.springframework.pulsar.core.SchemaResolver.SchemaResolverCustomizer;
import org.springframework.pulsar.core.TopicResolver;
import org.springframework.pulsar.test.support.PulsarTestContainerSupport;
/**
@@ -50,12 +68,46 @@ import org.springframework.pulsar.test.support.PulsarTestContainerSupport;
* @author Chris Bono
*/
@ExtendWith(OutputCaptureExtension.class)
@SuppressWarnings("JUnitMalformedDeclaration")
class PulsarBinderIntegrationTests implements PulsarTestContainerSupport {
private final LogAccessor logger = new LogAccessor(this.getClass());
private static final int AWAIT_DURATION = 10;
@Test
void binderAndBindingPropsAreAppliedAndRespected(CapturedOutput output) {
SpringApplication app = new SpringApplication(BinderAndBindingPropsTestConfig.class);
app.setWebApplicationType(WebApplicationType.NONE);
try (ConfigurableApplicationContext context = app.run(
"--spring.pulsar.client.service-url=" + PulsarTestContainerSupport.getPulsarBrokerUrl(),
"--spring.pulsar.administration.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(),
"--spring.cloud.function.definition=textSupplier;textLogger",
"--spring.cloud.stream.bindings.textLogger-in-0.destination=textSupplier-out-0",
"--spring.pulsar.producer.producer-name=textSupplierProducer-fromBase",
"--spring.cloud.stream.pulsar.binder.producer.producer-name=textSupplierProducer-fromBinder",
"--spring.cloud.stream.pulsar.bindings.textSupplier-out-0.producer.producer-name=textSupplierProducer-fromBinding",
"--spring.cloud.stream.pulsar.binder.producer.max-pending-messages=1100",
"--spring.pulsar.producer.block-if-queue-full=true",
"--spring.cloud.stream.pulsar.binder.consumer.subscription-name=textLoggerSub-fromBinder",
"--spring.cloud.stream.pulsar.binder.consumer.consumer-name=textLogger-fromBinder",
"--spring.cloud.stream.pulsar.bindings.textLogger-in-0.consumer.consumer-name=textLogger-fromBinding")) {
Awaitility.await().atMost(Duration.ofSeconds(AWAIT_DURATION))
.until(() -> output.toString().contains("Hello binder: test-basic-scenario"));
// now verify the properties were set onto producer and consumer as expected
TrackingProducerFactory producerFactory = context.getBean(TrackingProducerFactory.class);
assertThat(producerFactory.producersCreated).isNotEmpty().element(0)
.hasFieldOrPropertyWithValue("producerName", "textSupplierProducer-fromBinding")
.hasFieldOrPropertyWithValue("conf.maxPendingMessages", 1100)
.hasFieldOrPropertyWithValue("conf.blockIfQueueFull", true);
TrackingConsumerFactory consumerFactory = context.getBean(TrackingConsumerFactory.class);
assertThat(consumerFactory.consumersCreated).isNotEmpty().element(0)
.hasFieldOrPropertyWithValue("consumerName", "textLogger-fromBinding")
.hasFieldOrPropertyWithValue("conf.subscriptionName", "textLoggerSub-fromBinder");
}
}
@Nested
class DefaultEncoding {
@@ -69,10 +121,8 @@ class PulsarBinderIntegrationTests implements PulsarTestContainerSupport {
"--spring.cloud.function.definition=textSupplier;textLogger",
"--spring.cloud.stream.bindings.textLogger-in-0.destination=textSupplier-out-0",
"--spring.cloud.stream.pulsar.bindings.textLogger-in-0.consumer.subscription-name=pbit-text-sub1")) {
Awaitility.await().atMost(Duration.ofSeconds(AWAIT_DURATION))
.until(() -> output.toString().contains("Hello binder: test-basic-scenario"));
}
}
@@ -246,6 +296,68 @@ class PulsarBinderIntegrationTests implements PulsarTestContainerSupport {
}
@EnableAutoConfiguration
@SpringBootConfiguration
@Import(PrimitiveTextConfig.class)
static class BinderAndBindingPropsTestConfig {
@Bean
public PulsarProducerFactory<?> pulsarProducerFactory(PulsarClient pulsarClient,
PulsarProperties pulsarProperties, TopicResolver topicResolver) {
return new TrackingProducerFactory(pulsarClient, pulsarProperties.buildProducerProperties(), topicResolver);
}
@Bean
public PulsarConsumerFactory<?> pulsarConsumerFactory(PulsarClient pulsarClient,
PulsarProperties pulsarProperties) {
return new TrackingConsumerFactory(pulsarClient, pulsarProperties.buildConsumerProperties());
}
}
static class TrackingProducerFactory extends DefaultPulsarProducerFactory<String> {
List<Producer<String>> producersCreated = new ArrayList<>();
TrackingProducerFactory(PulsarClient pulsarClient, Map<String, Object> config, TopicResolver topicResolver) {
super(pulsarClient, config, topicResolver);
}
@Override
protected Producer<String> doCreateProducer(Schema<String> schema, @Nullable String topic,
@Nullable Collection<String> encryptionKeys,
@Nullable List<ProducerBuilderCustomizer<String>> producerBuilderCustomizers)
throws PulsarClientException {
Producer<String> producer = super.doCreateProducer(schema, topic, encryptionKeys,
producerBuilderCustomizers);
producersCreated.add(producer);
return producer;
}
}
static class TrackingConsumerFactory extends DefaultPulsarConsumerFactory<String> {
List<org.apache.pulsar.client.api.Consumer<String>> consumersCreated = new ArrayList<>();
TrackingConsumerFactory(PulsarClient pulsarClient, Map<String, Object> consumerConfig) {
super(pulsarClient, consumerConfig);
}
@Override
public org.apache.pulsar.client.api.Consumer<String> createConsumer(Schema<String> schema,
@Nullable Collection<String> topics, @Nullable String subscriptionName,
@Nullable Map<String, String> metadataProperties,
@Nullable List<ConsumerBuilderCustomizer<String>> consumerBuilderCustomizers)
throws PulsarClientException {
org.apache.pulsar.client.api.Consumer<String> consumer = super.createConsumer(schema, topics,
subscriptionName, metadataProperties, consumerBuilderCustomizers);
consumersCreated.add(consumer);
return consumer;
}
}
@EnableAutoConfiguration
@SpringBootConfiguration
static class PrimitiveFloatConfig {

View File

@@ -53,7 +53,6 @@ import org.springframework.pulsar.core.DefaultPulsarConsumerFactory;
import org.springframework.pulsar.core.DefaultPulsarProducerFactory;
import org.springframework.pulsar.core.DefaultSchemaResolver;
import org.springframework.pulsar.core.PulsarAdministration;
import org.springframework.pulsar.core.PulsarProducerFactory;
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.pulsar.spring.cloud.stream.binder.properties.PulsarBinderConfigurationProperties;
import org.springframework.pulsar.spring.cloud.stream.binder.properties.PulsarConsumerProperties;
@@ -89,8 +88,6 @@ public class PulsarBinderTests extends
}
}
private final String CLASS_UNDER_TEST_NAME = PulsarMessageChannelBinder.class.getSimpleName();
@Override
protected boolean usesExplicitRouting() {
return false;
@@ -98,26 +95,21 @@ public class PulsarBinderTests extends
@Override
protected String getClassUnderTestName() {
return CLASS_UNDER_TEST_NAME;
return PulsarMessageChannelBinder.class.getSimpleName();
}
@Override
protected PulsarTestBinder getBinder() {
PulsarAdministration pulsarAdministration = new PulsarAdministration(
var pulsarAdministration = new PulsarAdministration(
Map.of("serviceUrl", PulsarTestContainerSupport.getHttpServiceUrl()));
PulsarBinderConfigurationProperties configurationProperties = new PulsarBinderConfigurationProperties();
PulsarTopicProvisioner pulsarTopicProvisioner = new PulsarTopicProvisioner(pulsarAdministration,
configurationProperties);
PulsarProducerFactory<?> producerFactory = new DefaultPulsarProducerFactory<>(pulsarClient,
Collections.emptyMap());
PulsarTemplate<?> pulsarTemplate = new PulsarTemplate<>(producerFactory);
Map<String, Object> config = Map.of("subscriptionInitialPosition", SubscriptionInitialPosition.Earliest);
DefaultPulsarConsumerFactory<?> consumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient, config);
var configProps = new PulsarBinderConfigurationProperties();
var provisioner = new PulsarTopicProvisioner(pulsarAdministration, configProps);
var producerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, Collections.emptyMap());
var pulsarTemplate = new PulsarTemplate<>(producerFactory);
var config = Map.<String, Object>of("subscriptionInitialPosition", SubscriptionInitialPosition.Earliest);
var consumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient, config);
if (this.binder == null) {
this.binder = new PulsarTestBinder(pulsarTopicProvisioner, pulsarTemplate, consumerFactory,
this.binder = new PulsarTestBinder(provisioner, pulsarTemplate, consumerFactory, configProps,
new DefaultSchemaResolver());
}
return this.binder;
@@ -213,7 +205,7 @@ public class PulsarBinderTests extends
String.format("defaultGroup%s0", getDestinationNameDelimiter()), null, input2,
createConsumerProperties());
String testPayload1 = "foo-" + UUID.randomUUID().toString();
String testPayload1 = "foo-" + UUID.randomUUID();
output.send(MessageBuilder.withPayload(testPayload1)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN).build());
@@ -227,13 +219,13 @@ public class PulsarBinderTests extends
binding2.unbind();
String testPayload2 = "foo-" + UUID.randomUUID().toString();
String testPayload2 = "foo-" + UUID.randomUUID();
output.send(MessageBuilder.withPayload(testPayload2)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN).build());
binding2 = binder.bindConsumer(String.format("defaultGroup%s0", getDestinationNameDelimiter()), null, input2,
createConsumerProperties());
String testPayload3 = "foo-" + UUID.randomUUID().toString();
String testPayload3 = "foo-" + UUID.randomUUID();
output.send(MessageBuilder.withPayload(testPayload3)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN).build());
@@ -276,7 +268,7 @@ public class PulsarBinderTests extends
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN).build();
moduleOutputChannel.send(message);
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Message<byte[]>> inboundMessageRef = new AtomicReference<Message<byte[]>>();
AtomicReference<Message<byte[]>> inboundMessageRef = new AtomicReference<>();
moduleInputChannel.subscribe(message1 -> {
try {
inboundMessageRef.set((Message<byte[]>) message1);
@@ -305,7 +297,7 @@ public class PulsarBinderTests extends
@Test
@Override
@Disabled
public void testPartitionedModuleSpEL(TestInfo testInfo) throws Exception {
public void testPartitionedModuleSpEL(TestInfo testInfo) {
// This use-case needs to be further evaluated for Pulsar binder.
}

View File

@@ -17,10 +17,19 @@
package org.springframework.pulsar.spring.cloud.stream.binder;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.params.provider.Arguments.arguments;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.Collections;
import java.util.Map;
import java.util.stream.Stream;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.pulsar.spring.cloud.stream.binder.properties.PulsarConsumerProperties;
@@ -29,27 +38,96 @@ import org.springframework.pulsar.spring.cloud.stream.binder.properties.PulsarCo
* Unit tests for {@link PulsarBinderUtils}.
*
* @author Soby Chacko
* @author Chris Bono
*/
public class PulsarBinderUtilsTests {
@Test
void subscriptionNameIsNotNullWhenProvidedAsProperty() {
ConsumerDestination consumerDestination = mock(ConsumerDestination.class);
PulsarConsumerProperties pulsarConsumerProperties = mock(PulsarConsumerProperties.class);
when(pulsarConsumerProperties.getSubscriptionName()).thenReturn("my-subscription");
String subscriptionName = PulsarBinderUtils.subscriptionName(pulsarConsumerProperties, consumerDestination);
assertThat(subscriptionName).isEqualTo("my-subscription");
@Nested
class SubscriptionNameTests {
@Test
void respectsValueWhenSetAsProperty() {
var consumerDestination = mock(ConsumerDestination.class);
var pulsarConsumerProperties = mock(PulsarConsumerProperties.class);
when(pulsarConsumerProperties.getSubscriptionName()).thenReturn("my-sub");
assertThat(PulsarBinderUtils.subscriptionName(pulsarConsumerProperties, consumerDestination))
.isEqualTo("my-sub");
}
@Test
void generatesValueWhenNotSetAsProperty() {
var consumerDestination = mock(ConsumerDestination.class);
var pulsarConsumerProperties = mock(PulsarConsumerProperties.class);
when(pulsarConsumerProperties.getSubscriptionName()).thenReturn(null);
when(consumerDestination.getName()).thenReturn("my-topic");
assertThat(PulsarBinderUtils.subscriptionName(pulsarConsumerProperties, consumerDestination))
.startsWith("my-topic-anon-subscription-");
}
}
@Test
void subscriptionNameIsNotNullWhenPropertyIsMissing() {
ConsumerDestination consumerDestination = mock(ConsumerDestination.class);
PulsarConsumerProperties pulsarConsumerProperties = mock(PulsarConsumerProperties.class);
when(pulsarConsumerProperties.getSubscriptionName()).thenReturn(null);
when(consumerDestination.getName()).thenReturn("my-topic");
String subscriptionName = PulsarBinderUtils.subscriptionName(pulsarConsumerProperties, consumerDestination);
assertThat(subscriptionName).startsWith("my-topic" + PulsarBinderUtils.SUBSCRIPTION_NAME_SEPARATOR
+ PulsarBinderUtils.ANON_SUBSCRIPTION + PulsarBinderUtils.SUBSCRIPTION_NAME_SEPARATOR);
@Nested
class MergedPropertiesTests {
@ParameterizedTest(name = "{0}")
@MethodSource("mergePropertiesTestProvider")
void mergePropertiesTest(String testName, Map<String, Object> baseProps, Map<String, Object> binderProps,
Map<String, Object> bindingProps, Map<String, Object> expectedMergedProps) {
assertThat(PulsarBinderUtils.mergePropertiesWithPrecedence(baseProps, binderProps, bindingProps))
.containsExactlyInAnyOrderEntriesOf(expectedMergedProps);
}
// @formatter:off
static Stream<Arguments> mergePropertiesTestProvider() {
return Stream.of(
arguments("binderLevelContainsSamePropAsBaseWithDiffValue",
Map.of("foo", "foo-base"),
Map.of("foo", "foo-binder"),
Collections.emptyMap(),
Map.of("foo", "foo-binder")),
arguments("binderLevelContainsNewPropNotInBase",
Collections.emptyMap(),
Map.of("foo", "foo-binder"),
Collections.emptyMap(),
Map.of("foo", "foo-binder")),
arguments("binderLevelContainsSamePropAsBaseWithSameValue",
Map.of("foo", "foo-base"),
Map.of("foo", "foo-base"),
Collections.emptyMap(),
Collections.emptyMap()),
arguments("bindingLevelContainsSamePropAsBaseWithDiffValue",
Map.of("foo", "foo-base"),
Collections.emptyMap(),
Map.of("foo", "foo-binding"),
Map.of("foo", "foo-binding")),
arguments("bindingLevelContainsNewPropNotInBase",
Collections.emptyMap(),
Map.of("foo", "foo-binding"),
Collections.emptyMap(),
Map.of("foo", "foo-binding")),
arguments("bindingLevelContainsSamePropAsBaseWithSameValue",
Map.of("foo", "foo-base"),
Collections.emptyMap(),
Map.of("foo", "foo-base"),
Collections.emptyMap()),
arguments("bindingOverridesBinder",
Map.of("bar", "bar-base"),
Map.of("foo", "foo-binder"),
Map.of("foo", "foo-binding"),
Map.of("foo", "foo-binding")),
arguments("binderOverridesBaseAndBindingOverridesBinder",
Map.of("foo", "foo-base"),
Map.of("foo", "foo-binder"),
Map.of("foo", "foo-binding"),
Map.of("foo", "foo-binding")),
arguments("onlyBaseProps",
Map.of("foo", "foo-base"),
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptyMap()));
}
// @formatter:on
}
}

View File

@@ -0,0 +1,114 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.pulsar.spring.cloud.stream.binder;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatNoException;
import java.util.HashMap;
import java.util.Map;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.jupiter.api.Test;
import org.springframework.boot.context.properties.bind.Bindable;
import org.springframework.boot.context.properties.bind.Binder;
import org.springframework.boot.context.properties.source.ConfigurationPropertySource;
import org.springframework.boot.context.properties.source.MapConfigurationPropertySource;
import org.springframework.pulsar.spring.cloud.stream.binder.properties.PulsarExtendedBindingProperties;
/**
* Tests for {@link PulsarExtendedBindingProperties}.
*
* @author Chris Bono
*/
public class PulsarExtendedBindingPropertiesTests {
private final PulsarExtendedBindingProperties properties = new PulsarExtendedBindingProperties();
private void bind(Map<String, String> map) {
ConfigurationPropertySource source = new MapConfigurationPropertySource(map);
new Binder(source).bind("spring.cloud.stream.pulsar", Bindable.ofInstance(this.properties));
}
@Test
void producerProperties() {
// Only spot check a few values (PulsarPropertiesTests does the heavy lifting)
Map<String, String> props = new HashMap<>();
props.put("spring.cloud.stream.pulsar.bindings.my-foo.producer.topic-name", "my-topic");
props.put("spring.cloud.stream.pulsar.bindings.my-foo.producer.send-timeout", "2s");
props.put("spring.cloud.stream.pulsar.bindings.my-foo.producer.max-pending-messages", "3");
props.put("spring.cloud.stream.pulsar.bindings.my-foo.producer.producer-access-mode", "exclusive");
props.put("spring.cloud.stream.pulsar.bindings.my-foo.producer.properties[my-prop]", "my-prop-value");
bind(props);
assertThat(properties.getBindings()).containsOnlyKeys("my-foo");
Map<String, Object> producerProps = properties.getExtendedProducerProperties("my-foo").buildProperties();
// Verify that the props can be loaded in a ProducerBuilder
assertThatNoException().isThrownBy(() -> ConfigurationDataUtils.loadData(producerProps,
new ProducerConfigurationData(), ProducerConfigurationData.class));
// @formatter:off
assertThat(producerProps)
.containsEntry("topicName", "my-topic")
.containsEntry("sendTimeoutMs", 2_000)
.containsEntry("maxPendingMessages", 3)
.containsEntry("accessMode", ProducerAccessMode.Exclusive)
.hasEntrySatisfying("properties", properties ->
assertThat(properties)
.asInstanceOf(InstanceOfAssertFactories.map(String.class, String.class))
.containsEntry("my-prop", "my-prop-value"));
// @formatter:on
}
@Test
void consumerProperties() {
// Only spot check a few values (PulsarPropertiesTests does the heavy lifting)
Map<String, String> props = new HashMap<>();
props.put("spring.cloud.stream.pulsar.bindings.my-foo.consumer.topics[0]", "my-topic");
props.put("spring.cloud.stream.pulsar.bindings.my-foo.consumer.subscription-properties[my-sub-prop]",
"my-sub-prop-value");
props.put("spring.cloud.stream.pulsar.bindings.my-foo.consumer.subscription-mode", "nondurable");
props.put("spring.cloud.stream.pulsar.bindings.my-foo.consumer.receiver-queue-size", "1");
bind(props);
assertThat(properties.getBindings()).containsOnlyKeys("my-foo");
Map<String, Object> consumerProps = properties.getExtendedConsumerProperties("my-foo").buildProperties();
// Verify that the props can be loaded in a ConsumerBuilder
assertThatNoException().isThrownBy(() -> ConfigurationDataUtils.loadData(consumerProps,
new ConsumerConfigurationData<>(), ConsumerConfigurationData.class));
// @formatter:off
assertThat(consumerProps)
.hasEntrySatisfying("topicNames",
topics -> assertThat(topics).asInstanceOf(InstanceOfAssertFactories.collection(String.class))
.containsExactly("my-topic"))
.hasEntrySatisfying("subscriptionProperties",
properties -> assertThat(properties)
.asInstanceOf(InstanceOfAssertFactories.map(String.class, String.class))
.containsEntry("my-sub-prop", "my-sub-prop-value"))
.containsEntry("subscriptionMode", SubscriptionMode.NonDurable)
.containsEntry("receiverQueueSize", 1);
// @formatter:on
}
}

View File

@@ -37,6 +37,7 @@ import org.springframework.pulsar.core.PulsarConsumerFactory;
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.pulsar.core.Resolved;
import org.springframework.pulsar.core.SchemaResolver;
import org.springframework.pulsar.spring.cloud.stream.binder.properties.PulsarBinderConfigurationProperties;
import org.springframework.pulsar.spring.cloud.stream.binder.provisioning.PulsarTopicProvisioner;
/**
@@ -50,7 +51,8 @@ public class PulsarMessageChannelBinderResolveSchemaTests {
@SuppressWarnings("unchecked")
private PulsarMessageChannelBinder binder = new PulsarMessageChannelBinder(mock(PulsarTopicProvisioner.class),
mock(PulsarTemplate.class), mock(PulsarConsumerFactory.class), resolver);
mock(PulsarTemplate.class), mock(PulsarConsumerFactory.class),
mock(PulsarBinderConfigurationProperties.class), resolver);
@ParameterizedTest
@EnumSource(mode = Mode.MATCH_NONE, names = "^(AUTO.*|AVRO|JSON|KEY_VALUE|NONE|PROTOBUF.*)$")

View File

@@ -22,6 +22,7 @@ import org.springframework.integration.config.EnableIntegration;
import org.springframework.pulsar.core.PulsarConsumerFactory;
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.pulsar.core.SchemaResolver;
import org.springframework.pulsar.spring.cloud.stream.binder.properties.PulsarBinderConfigurationProperties;
import org.springframework.pulsar.spring.cloud.stream.binder.provisioning.PulsarTopicProvisioner;
/**
@@ -29,18 +30,19 @@ import org.springframework.pulsar.spring.cloud.stream.binder.provisioning.Pulsar
* {@link PulsarMessageChannelBinder}.
*
* @author Soby Chacko
* @author Chris Bono
*/
public class PulsarTestBinder extends AbstractPulsarTestBinder {
@SuppressWarnings({ "unchecked" })
PulsarTestBinder(PulsarTopicProvisioner pulsarTopicProvisioner, PulsarTemplate<?> pulsarTemplate,
PulsarConsumerFactory<?> pulsarConsumerFactory, SchemaResolver schemaResolver) {
PulsarConsumerFactory<?> pulsarConsumerFactory, PulsarBinderConfigurationProperties binderConfigProps,
SchemaResolver schemaResolver) {
try {
PulsarMessageChannelBinder binder = new PulsarMessageChannelBinder(pulsarTopicProvisioner,
(PulsarTemplate<Object>) pulsarTemplate, pulsarConsumerFactory, schemaResolver);
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Config.class);
var binder = new PulsarMessageChannelBinder(pulsarTopicProvisioner, (PulsarTemplate<Object>) pulsarTemplate,
pulsarConsumerFactory, binderConfigProps, schemaResolver);
var context = new AnnotationConfigApplicationContext(Config.class);
setApplicationContext(context);
binder.setApplicationContext(context);
binder.afterPropertiesSet();

View File

@@ -16,7 +16,6 @@
package org.springframework.pulsar.core;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -47,8 +46,6 @@ public class DefaultPulsarConsumerFactory<T> implements PulsarConsumerFactory<T>
private final Map<String, Object> consumerConfig;
private final List<Consumer<T>> consumers = new ArrayList<>();
private final PulsarClient pulsarClient;
/**
@@ -89,9 +86,7 @@ public class DefaultPulsarConsumerFactory<T> implements PulsarConsumerFactory<T>
if (!CollectionUtils.isEmpty(customizers)) {
customizers.forEach(customizer -> customizer.customize(consumerBuilder));
}
Consumer<T> consumer = consumerBuilder.subscribe();
this.consumers.add(consumer);
return consumer;
return consumerBuilder.subscribe();
}
public Map<String, Object> getConsumerConfig() {

View File

@@ -23,9 +23,6 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
@@ -112,7 +109,7 @@ public class DefaultPulsarProducerFactory<T> implements PulsarProducerFactory<T>
if (encryptionKeys != null) {
config.put("encryptionKeys", encryptionKeys);
}
loadConf(producerBuilder, config);
ProducerBuilderConfigurationUtil.loadConf(producerBuilder, config);
producerBuilder.topic(resolvedTopic);
if (!CollectionUtils.isEmpty(customizers)) {
@@ -134,24 +131,4 @@ public class DefaultPulsarProducerFactory<T> implements PulsarProducerFactory<T>
return this.producerConfig;
}
private static <T> void loadConf(ProducerBuilder<T> producerBuilder, Map<String, Object> config) {
producerBuilder.loadConf(config);
// Set fields that are not loaded by loadConf
if (config.containsKey("encryptionKeys")) {
@SuppressWarnings("unchecked")
Collection<String> keys = (Collection<String>) config.get("encryptionKeys");
keys.forEach(producerBuilder::addEncryptionKey);
}
if (config.containsKey("customMessageRouter")) {
producerBuilder.messageRouter((MessageRouter) config.get("customMessageRouter"));
}
if (config.containsKey("batcherBuilder")) {
producerBuilder.batcherBuilder((BatcherBuilder) config.get("batcherBuilder"));
}
if (config.containsKey("cryptoKeyReader")) {
producerBuilder.cryptoKeyReader((CryptoKeyReader) config.get("cryptoKeyReader"));
}
}
}

View File

@@ -0,0 +1,72 @@
/*
* Copyright 2022-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.pulsar.core;
import java.util.Collection;
import java.util.Map;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.ProducerBuilder;
/**
* Utility methods to help load configuration into a {@link ProducerBuilder}.
* <p>
* The main purpose is to work around the underlying
* <a href="https://github.com/apache/pulsar/pull/18344">Pulsar issue</a> where
* {@link ProducerBuilder#loadConf} sets {@code @JsonIgnore} fields to null.
* <p>
* Should be removed once the above issue is fixed.
*
* @author Chris Bono
*/
public final class ProducerBuilderConfigurationUtil {
private ProducerBuilderConfigurationUtil() {
}
/**
* Configures the specified properties onto the specified builder in a manner that
* loads non-serializable properties. See
* <a href="https://github.com/apache/pulsar/pull/18344">Pulsar PR</a>.
* @param builder the builder
* @param properties the properties to set on the builder
* @param <T> the payload type
*/
public static <T> void loadConf(ProducerBuilder<T> builder, Map<String, Object> properties) {
builder.loadConf(properties);
// Set fields that are not loaded by loadConf
if (properties.containsKey("encryptionKeys")) {
@SuppressWarnings("unchecked")
Collection<String> keys = (Collection<String>) properties.get("encryptionKeys");
keys.forEach(builder::addEncryptionKey);
}
if (properties.containsKey("customMessageRouter")) {
builder.messageRouter((MessageRouter) properties.get("customMessageRouter"));
}
if (properties.containsKey("batcherBuilder")) {
builder.batcherBuilder((BatcherBuilder) properties.get("batcherBuilder"));
}
if (properties.containsKey("cryptoKeyReader")) {
builder.cryptoKeyReader((CryptoKeyReader) properties.get("cryptoKeyReader"));
}
}
}

View File

@@ -10,6 +10,7 @@
<suppress files="PulsarMessageChannelBinderResolveSchemaTests" checks="AvoidStaticImport" />
<suppress files="DefaultSchemaResolverTests" checks="AvoidStaticImport|MethodParamPad" />
<suppress files="DefaultTopicResolverTests" checks="AvoidStaticImport" />
<suppress files="PulsarBinderUtilsTest" checks="AvoidStaticImport" />
<suppress files="Proto" checks=".*"/>
<suppress files="ReactiveSpringPulsarBootApp" checks="HideUtilityClassConstructor"/>
<suppress files="[\\/]spring-pulsar-docs[\\/]" checks="JavadocPackage|JavadocType|JavadocVariable|SpringDeprecatedCheck" />