diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarReaderFactory.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarReaderFactory.java new file mode 100644 index 00000000..248150ec --- /dev/null +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarReaderFactory.java @@ -0,0 +1,64 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.core; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.api.ReaderBuilder; +import org.apache.pulsar.client.api.Schema; + +import org.springframework.lang.Nullable; + +/** + * Default implementation of {@link PulsarReaderFactory}. + * + * @param message type + * @author Soby Chacko + */ +public class DefaultPulsarReaderFactory implements PulsarReaderFactory { + + private final PulsarClient pulsarClient; + + private final Map readerConfig; + + public DefaultPulsarReaderFactory(PulsarClient pulsarClient) { + this(pulsarClient, Collections.emptyMap()); + } + + public DefaultPulsarReaderFactory(PulsarClient pulsarClient, Map readerConfig) { + this.pulsarClient = pulsarClient; + this.readerConfig = readerConfig; + } + + @Override + public Reader createReader(@Nullable List topics, @Nullable MessageId messageId, Schema schema) + throws PulsarClientException { + ReaderBuilder readerBuilder = this.pulsarClient.newReader(schema); + readerBuilder.topics(topics); + readerBuilder.startMessageId(messageId); + + readerBuilder.loadConf(this.readerConfig); + return readerBuilder.create(); + } + +} diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarReaderFactory.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarReaderFactory.java new file mode 100644 index 00000000..0df5e4a6 --- /dev/null +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/PulsarReaderFactory.java @@ -0,0 +1,48 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.core; + +import java.util.List; + +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.api.Schema; + +import org.springframework.lang.Nullable; + +/** + * Pulsar {@link Reader} factory interface. + * + * @param Underlying message type handled by this reader. + * @author Soby Chacko + */ +public interface PulsarReaderFactory { + + /** + * Creating a Pulsar {@link Reader} based on the provided topics, schema and + * properties. + * @param topics set of topics to read from + * @param messageId starting message id to read from + * @param schema schema of the message to consume + * @return Pulsar {@link Reader} + * @throws PulsarClientException if there are issues when creating the reader + */ + Reader createReader(@Nullable List topics, @Nullable MessageId messageId, Schema schema) + throws PulsarClientException; + +} diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultPulsarReaderFactoryTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultPulsarReaderFactoryTests.java new file mode 100644 index 00000000..1c50e6ba --- /dev/null +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultPulsarReaderFactoryTests.java @@ -0,0 +1,105 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.core; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.api.Schema; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import org.springframework.pulsar.test.support.PulsarTestContainerSupport; + +/** + * Testing {@link DefaultPulsarReaderFactory}. + * + * @author Soby Chacko + */ +public class DefaultPulsarReaderFactoryTests implements PulsarTestContainerSupport { + + private PulsarClient pulsarClient; + + @BeforeEach + void createPulsarClient() throws PulsarClientException { + this.pulsarClient = PulsarClient.builder().serviceUrl(PulsarTestContainerSupport.getPulsarBrokerUrl()).build(); + } + + @AfterEach + void closePulsarClient() throws PulsarClientException { + if (this.pulsarClient != null && !this.pulsarClient.isClosed()) { + this.pulsarClient.close(); + } + } + + @Test + void basicPulsarReader() throws Exception { + PulsarReaderFactory pulsarReaderFactory = new DefaultPulsarReaderFactory<>(this.pulsarClient, + Collections.emptyMap()); + Message message; + try (Reader reader = pulsarReaderFactory.createReader(List.of("basic-pulsar-reader-topic"), + MessageId.earliest, Schema.STRING)) { + + Map prodConfig = Map.of("topicName", "basic-pulsar-reader-topic"); + PulsarProducerFactory pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, + prodConfig); + PulsarTemplate pulsarTemplate = new PulsarTemplate<>(pulsarProducerFactory); + pulsarTemplate.send("hello john doe"); + + message = reader.readNext(); + } + assertThat(message.getValue()).isEqualTo("hello john doe"); + } + + @Test + void readingFromTheMiddleOfTheTopic() throws Exception { + PulsarReaderFactory pulsarReaderFactory = new DefaultPulsarReaderFactory<>(this.pulsarClient, + Collections.emptyMap()); + + Map prodConfig = Map.of("topicName", "reading-from-the-middle-of-topic"); + PulsarProducerFactory pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, + prodConfig); + PulsarTemplate pulsarTemplate = new PulsarTemplate<>(pulsarProducerFactory); + MessageId[] messageIds = new MessageId[10]; + + for (int i = 0; i < 10; i++) { + messageIds[i] = pulsarTemplate.send("hello john doe-" + i); + } + + Message message; + try (Reader reader = pulsarReaderFactory.createReader(List.of("reading-from-the-middle-of-topic"), + messageIds[4], Schema.STRING)) { + for (int i = 0; i < 5; i++) { + message = reader.readNext(); + assertThat(message.getValue()).isEqualTo("hello john doe-" + (i + 5)); + } + message = reader.readNext(1, TimeUnit.SECONDS); + assertThat(message).isNull(); + } + } + +}