Basic Pulsar Reader API support in Spring Pulsar

Addressing PR review
This commit is contained in:
Soby Chacko
2023-01-31 19:59:57 -05:00
parent 883aeaaf3c
commit 8dd39fdc7a
3 changed files with 217 additions and 0 deletions

View File

@@ -0,0 +1,64 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.pulsar.core;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.Schema;
import org.springframework.lang.Nullable;
/**
* Default implementation of {@link PulsarReaderFactory}.
*
* @param <T> message type
* @author Soby Chacko
*/
public class DefaultPulsarReaderFactory<T> implements PulsarReaderFactory<T> {
private final PulsarClient pulsarClient;
private final Map<String, Object> readerConfig;
public DefaultPulsarReaderFactory(PulsarClient pulsarClient) {
this(pulsarClient, Collections.emptyMap());
}
public DefaultPulsarReaderFactory(PulsarClient pulsarClient, Map<String, Object> readerConfig) {
this.pulsarClient = pulsarClient;
this.readerConfig = readerConfig;
}
@Override
public Reader<T> createReader(@Nullable List<String> topics, @Nullable MessageId messageId, Schema<T> schema)
throws PulsarClientException {
ReaderBuilder<T> readerBuilder = this.pulsarClient.newReader(schema);
readerBuilder.topics(topics);
readerBuilder.startMessageId(messageId);
readerBuilder.loadConf(this.readerConfig);
return readerBuilder.create();
}
}

View File

@@ -0,0 +1,48 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.pulsar.core;
import java.util.List;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.springframework.lang.Nullable;
/**
* Pulsar {@link Reader} factory interface.
*
* @param <T> Underlying message type handled by this reader.
* @author Soby Chacko
*/
public interface PulsarReaderFactory<T> {
/**
* Creating a Pulsar {@link Reader} based on the provided topics, schema and
* properties.
* @param topics set of topics to read from
* @param messageId starting message id to read from
* @param schema schema of the message to consume
* @return Pulsar {@link Reader}
* @throws PulsarClientException if there are issues when creating the reader
*/
Reader<T> createReader(@Nullable List<String> topics, @Nullable MessageId messageId, Schema<T> schema)
throws PulsarClientException;
}

View File

@@ -0,0 +1,105 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.pulsar.core;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.pulsar.test.support.PulsarTestContainerSupport;
/**
* Testing {@link DefaultPulsarReaderFactory}.
*
* @author Soby Chacko
*/
public class DefaultPulsarReaderFactoryTests implements PulsarTestContainerSupport {
private PulsarClient pulsarClient;
@BeforeEach
void createPulsarClient() throws PulsarClientException {
this.pulsarClient = PulsarClient.builder().serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()).build();
}
@AfterEach
void closePulsarClient() throws PulsarClientException {
if (this.pulsarClient != null && !this.pulsarClient.isClosed()) {
this.pulsarClient.close();
}
}
@Test
void basicPulsarReader() throws Exception {
PulsarReaderFactory<String> pulsarReaderFactory = new DefaultPulsarReaderFactory<>(this.pulsarClient,
Collections.emptyMap());
Message<String> message;
try (Reader<String> reader = pulsarReaderFactory.createReader(List.of("basic-pulsar-reader-topic"),
MessageId.earliest, Schema.STRING)) {
Map<String, Object> prodConfig = Map.of("topicName", "basic-pulsar-reader-topic");
PulsarProducerFactory<String> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient,
prodConfig);
PulsarTemplate<String> pulsarTemplate = new PulsarTemplate<>(pulsarProducerFactory);
pulsarTemplate.send("hello john doe");
message = reader.readNext();
}
assertThat(message.getValue()).isEqualTo("hello john doe");
}
@Test
void readingFromTheMiddleOfTheTopic() throws Exception {
PulsarReaderFactory<String> pulsarReaderFactory = new DefaultPulsarReaderFactory<>(this.pulsarClient,
Collections.emptyMap());
Map<String, Object> prodConfig = Map.of("topicName", "reading-from-the-middle-of-topic");
PulsarProducerFactory<String> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient,
prodConfig);
PulsarTemplate<String> pulsarTemplate = new PulsarTemplate<>(pulsarProducerFactory);
MessageId[] messageIds = new MessageId[10];
for (int i = 0; i < 10; i++) {
messageIds[i] = pulsarTemplate.send("hello john doe-" + i);
}
Message<String> message;
try (Reader<String> reader = pulsarReaderFactory.createReader(List.of("reading-from-the-middle-of-topic"),
messageIds[4], Schema.STRING)) {
for (int i = 0; i < 5; i++) {
message = reader.readNext();
assertThat(message.getValue()).isEqualTo("hello john doe-" + (i + 5));
}
message = reader.readNext(1, TimeUnit.SECONDS);
assertThat(message).isNull();
}
}
}