diff --git a/.github/workflows/ci-pr.yml b/.github/workflows/ci-pr.yml index f529c690..f5fb08d9 100644 --- a/.github/workflows/ci-pr.yml +++ b/.github/workflows/ci-pr.yml @@ -36,7 +36,7 @@ jobs: - name: Run Gradle build run: | - ./gradlew clean build --continue --scan + ./gradlew clean build -DdownloadRabbitConnector=true --continue --scan - name: Capture Test Results if: failure() diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index cf3f4ed4..90138f61 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -41,7 +41,7 @@ jobs: GRADLE_USER_HOME: ~/.gradle - name: Run Gradle build run: | - ./gradlew clean build --continue -PartifactoryUsername="$ARTIFACTORY_USERNAME" -PartifactoryPassword="$ARTIFACTORY_PASSWORD" + ./gradlew clean build -DdownloadRabbitConnector=true --continue -PartifactoryUsername="$ARTIFACTORY_USERNAME" -PartifactoryPassword="$ARTIFACTORY_PASSWORD" - name: Capture test results if: failure() uses: actions/upload-artifact@v3 diff --git a/.gitignore b/.gitignore index ed41db46..93ca394e 100644 --- a/.gitignore +++ b/.gitignore @@ -15,4 +15,5 @@ build build.log out target +**/connectors/ .DS_Store diff --git a/README.adoc b/README.adoc index 2eec3fd9..d4f0fb12 100644 --- a/README.adoc +++ b/README.adoc @@ -43,15 +43,22 @@ You also need JDK 17. ./gradlew publishToMavenLocal ---- -This will build all of the jars and documentation and publish them to your local Maven cache. +This will build all jars and documentation and publish them to your local Maven cache. It won't run any of the tests. -If you want to build everything, use the `build` task: +If you want to build everything and run tests, use the `build` task: [source,shell] ---- -./gradlew build +./gradlew clean build ---- +This will build everything and run all tests except the `PulsarFunctionAdministrationIntegrationTests` which requires the _Pulsar RabbitMQ Connector_ to be downloaded prior to running. +To download the connector (one time only) and run the integration test, add a system property to the task as follows: + +[source,shell] +---- +./gradlew clean build -DdownloadRabbitConnector=true +---- == Modules @@ -78,6 +85,9 @@ Provides a dependency descriptor that can be included in your application to eas === spring-pulsar-reactive-spring-boot-starter Provides a dependency descriptor that can be included in your application to easily start using Spring Pulsar in Reactive and imperative fashions. +=== spring-pulsar-spring-cloud-stream-binder +Provides a Spring Cloud Stream Binder implementation for Apache Pulsar. + === spring-pulsar-sample-apps Provides sample applications to illustrate Spring Pulsar functionality as well as provide ability for quick manual verification during development. diff --git a/spring-pulsar-sample-apps/sample-pulsar-functions/.gitignore b/spring-pulsar-sample-apps/sample-pulsar-functions/.gitignore deleted file mode 100644 index 5cf335a6..00000000 --- a/spring-pulsar-sample-apps/sample-pulsar-functions/.gitignore +++ /dev/null @@ -1 +0,0 @@ -connectors/ diff --git a/spring-pulsar-spring-boot-autoconfigure/build.gradle b/spring-pulsar-spring-boot-autoconfigure/build.gradle index 207972e7..090636a6 100644 --- a/spring-pulsar-spring-boot-autoconfigure/build.gradle +++ b/spring-pulsar-spring-boot-autoconfigure/build.gradle @@ -1,6 +1,7 @@ plugins { id 'org.springframework.pulsar.spring-module' id 'org.springframework.pulsar.configuration-properties' + id "de.undercouch.download" version "5.3.0" } description = 'Spring Pulsar Spring Boot Auto-configuration' @@ -23,8 +24,31 @@ dependencies { testImplementation 'org.testcontainers:pulsar' // used by PulsarFunctionTests testImplementation 'org.testcontainers:rabbitmq' + testImplementation 'org.springframework.boot:spring-boot-starter-amqp' } test { testLogging.showStandardStreams = true } + +task downloadRabbitConnector { + onlyIf { + System.getProperty("downloadRabbitConnector") == "true" + } + doLast { + try { + download.run { + println "Downloading Rabbit connector to 'src/test/resources/connectors/' (one time only if not already downloaded)..." + src 'https://archive.apache.org/dist/pulsar/pulsar-2.10.2/connectors/pulsar-io-rabbitmq-2.10.2.nar' + dest "$buildDir/../src/test/resources/connectors/pulsar-io-rabbitmq-2.10.2.nar" + overwrite false + } + } catch (Exception ex) { + println "Failed to download rabbit connector: $ex" + } + } +} + +project.afterEvaluate { + compileTestJava.dependsOn downloadRabbitConnector +} diff --git a/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarFunctionAdministrationIntegrationTests.java b/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarFunctionAdministrationIntegrationTests.java new file mode 100644 index 00000000..3ec92b74 --- /dev/null +++ b/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarFunctionAdministrationIntegrationTests.java @@ -0,0 +1,252 @@ +/* + * Copyright 2023-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.autoconfigure; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.stream.LongStream; + +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.common.io.SourceConfig; +import org.apache.pulsar.common.policies.data.SourceStatus; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledIf; +import org.testcontainers.containers.BindMode; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.PulsarContainer; +import org.testcontainers.containers.RabbitMQContainer; +import org.testcontainers.junit.jupiter.Testcontainers; + +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.WebApplicationType; +import org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; +import org.springframework.core.io.DefaultResourceLoader; +import org.springframework.core.io.Resource; +import org.springframework.core.io.support.ResourcePatternUtils; +import org.springframework.lang.Nullable; +import org.springframework.pulsar.annotation.PulsarListener; +import org.springframework.pulsar.function.PulsarFunctionAdministration; +import org.springframework.pulsar.function.PulsarFunctionOperations.FunctionStopPolicy; +import org.springframework.pulsar.function.PulsarSource; + +/** + * Integration tests for {@link PulsarFunctionAdministration}. + *

+ * Sets up a Rabbit container and a Rabbit source and verifies end-end functionality. + * + * @author Chris Bono + */ +@Testcontainers(disabledWithoutDocker = true) +@EnabledIf("rabbitConnectorExists") +class PulsarFunctionAdministrationIntegrationTests { + + private static final String RABBIT_QUEUE = "pft_foo_queue"; + + private static final String PULSAR_TOPIC = "pft_foo-topic"; + + private static final PulsarContainer PULSAR_CONTAINER = new PulsarContainer( + PulsarTestContainerSupport.getPulsarImage()).withFunctionsWorker() + .withClasspathResourceMapping("/connectors/", "/pulsar/connectors", BindMode.READ_ONLY); + + private static final RabbitMQContainer RABBITMQ_CONTAINER = new RabbitMQContainer("rabbitmq") + .withNetworkAliases("rabbitmq").withExposedPorts(5672, 15672).withStartupTimeout(Duration.ofMinutes(1)); + + @BeforeAll + static void startContainers() { + Network sharedNetwork = Network.newNetwork(); + PULSAR_CONTAINER.withNetwork(sharedNetwork).start(); + RABBITMQ_CONTAINER.withNetwork(sharedNetwork).start(); + } + + private static final CountDownLatch RECEIVED_MESSAGE_LATCH = new CountDownLatch(10); + + private static final List RECEIVED_MESSAGES = new ArrayList<>(); + + static void messageReceived(String message) { + RECEIVED_MESSAGE_LATCH.countDown(); + RECEIVED_MESSAGES.add(message); + } + + @Test + void verifyRabbitSourceIsCreatedAndMessagesAreSourcedIntoPulsar() throws Exception { + SpringApplication app = new SpringApplication(PulsarFunctionTestConfiguration.class); + app.setWebApplicationType(WebApplicationType.NONE); + try (ConfigurableApplicationContext context = app.run( + "--spring.pulsar.client.service-url=" + PULSAR_CONTAINER.getPulsarBrokerUrl(), + "--spring.pulsar.administration.service-url=" + PULSAR_CONTAINER.getHttpServiceUrl(), + "--spring.rabbitmq.host=" + RABBITMQ_CONTAINER.getHost(), + "--spring.rabbitmq.port=" + RABBITMQ_CONTAINER.getAmqpPort())) { + + // Give source 10s to get ready before sending messages to rabbit + Thread.sleep(10000); + + // Send messages to rabbit and wait for them to come through the rabbit source + RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class); + List messages = LongStream.range(0, RECEIVED_MESSAGE_LATCH.getCount()).mapToObj((i) -> "bar" + i) + .toList(); + messages.forEach(msg -> rabbitTemplate.convertAndSend(RABBIT_QUEUE, msg)); + assertThat(RECEIVED_MESSAGE_LATCH.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(RECEIVED_MESSAGES).containsExactlyElementsOf(messages); + } + } + + @Test + void verifyStopPolicyIsEnforcedOnShutdown() throws Exception { + SpringApplication app = new SpringApplication(PulsarFunctionStopPolicyTestConfiguration.class); + app.setWebApplicationType(WebApplicationType.NONE); + try (ConfigurableApplicationContext ignored = app.run( + "--spring.pulsar.client.service-url=" + PULSAR_CONTAINER.getPulsarBrokerUrl(), + "--spring.pulsar.administration.service-url=" + PULSAR_CONTAINER.getHttpServiceUrl(), + "--spring.rabbitmq.host=" + RABBITMQ_CONTAINER.getHost(), + "--spring.rabbitmq.port=" + RABBITMQ_CONTAINER.getAmqpPort())) { + + // Give the source a few seconds to get ready + Thread.sleep(10000); + + // Verify the sources are up and running + try (PulsarAdmin admin = getAdmin()) { + assertSourceExistsWithStatus("rabbit-test-source-none", true, admin); + assertSourceExistsWithStatus("rabbit-test-source-stop", true, admin); + assertSourceExistsWithStatus("rabbit-test-source-delete", true, admin); + } + } + + // Stop policy runs after context close - verify source are in expected state + try (PulsarAdmin admin = getAdmin()) { + assertSourceExistsWithStatus("rabbit-test-source-none", true, admin); + assertSourceExistsWithStatus("rabbit-test-source-stop", false, admin); + assertSourceDoesNotExist("rabbit-test-source-delete", admin); + } + } + + private PulsarAdmin getAdmin() throws PulsarClientException { + return PulsarAdmin.builder().serviceHttpUrl(PULSAR_CONTAINER.getHttpServiceUrl()).build(); + } + + private void assertSourceExistsWithStatus(String name, boolean isRunning, PulsarAdmin admin) + throws PulsarAdminException { + assertThat(admin.sources().getSourceStatus("public", "default", name)).isNotNull() + .extracting(SourceStatus::getNumRunning).isEqualTo(isRunning ? 1 : 0); + } + + private void assertSourceDoesNotExist(String name, PulsarAdmin admin) { + assertThatThrownBy(() -> admin.sources().getSourceStatus("public", "default", name)) + .isInstanceOf(NotFoundException.class); + } + + static boolean rabbitConnectorExists() { + try { + Resource[] connectors = ResourcePatternUtils.getResourcePatternResolver(new DefaultResourceLoader()) + .getResources("classpath:/connectors/**"); + boolean available = Arrays.stream(connectors).map(Resource::getFilename).filter(Objects::nonNull) + .anyMatch((name) -> name.contains("pulsar-io-rabbitmq")); + if (!available) { + logTestDisabledReason(); + return false; + } + return true; + } + catch (IOException e) { + logTestDisabledReason(); + return false; + } + } + + private static void logTestDisabledReason() { + System.err.printf("Skipping %s - Rabbit connector was not available in 'src/test/resources/connectors/'%n", + PulsarFunctionAdministrationIntegrationTests.class.getName()); + } + + static PulsarSource rabbitPulsarSource(@Nullable FunctionStopPolicy stopPolicy) { + // This Rabbit host/port config is what the Pulsar container uses to contact + // the Rabbit container. So that container-container is reachable we use a + // custom network and a network alias 'rabbitmq' and the exposed port '5672'. + // This differs from typical RabbitTemplate/RabbitProperties coordinates which + // require the mapped host and port (outside the container). + String suffix = stopPolicy != null ? ("-" + stopPolicy.name().toLowerCase()) : ""; + Map configs = new HashMap<>(); + configs.put("host", "rabbitmq"); + configs.put("port", 5672); + configs.put("virtualHost", "/"); + configs.put("username", "guest"); + configs.put("password", "guest"); + configs.put("queueName", RABBIT_QUEUE + suffix); + configs.put("connectionName", "pft_foo_connection" + suffix); + SourceConfig sourceConfig = SourceConfig.builder().tenant("public").namespace("default") + .name("rabbit-test-source" + suffix).archive("builtin://rabbitmq").topicName(PULSAR_TOPIC + suffix) + .configs(configs).build(); + return new PulsarSource(sourceConfig, stopPolicy != null ? stopPolicy : FunctionStopPolicy.DELETE, null); + } + + @Configuration(proxyBeanMethods = false) + @Import({ PulsarAutoConfiguration.class, RabbitAutoConfiguration.class }) + static class PulsarFunctionTestConfiguration { + + @Bean + PulsarSource rabbitSource() { + return PulsarFunctionAdministrationIntegrationTests.rabbitPulsarSource(null); + } + + @PulsarListener(topics = PULSAR_TOPIC, subscriptionName = "pft-foo-sub") + public void listen(String msg) { + PulsarFunctionAdministrationIntegrationTests.messageReceived(msg); + } + + } + + @Configuration(proxyBeanMethods = false) + @Import(PulsarAutoConfiguration.class) + static class PulsarFunctionStopPolicyTestConfiguration { + + @Bean + PulsarSource rabbitSourceWithStopPolicyNone() { + return PulsarFunctionAdministrationIntegrationTests.rabbitPulsarSource(FunctionStopPolicy.NONE); + } + + @Bean + PulsarSource rabbitSourceWithStopPolicyStop() { + return PulsarFunctionAdministrationIntegrationTests.rabbitPulsarSource(FunctionStopPolicy.STOP); + } + + @Bean + PulsarSource rabbitSourceWithStopPolicyDelete() { + return PulsarFunctionAdministrationIntegrationTests.rabbitPulsarSource(FunctionStopPolicy.DELETE); + } + + } + +} diff --git a/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarFunctionTests.java b/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarFunctionTests.java deleted file mode 100644 index b2b39d5f..00000000 --- a/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarFunctionTests.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Copyright 2023-2023 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.pulsar.autoconfigure; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.catchThrowableOfType; - -import java.time.Duration; -import java.util.HashMap; -import java.util.Map; - -import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.common.io.SourceConfig; -import org.assertj.core.api.InstanceOfAssertFactories; -import org.junit.jupiter.api.Test; -import org.testcontainers.containers.RabbitMQContainer; -import org.testcontainers.junit.jupiter.Container; - -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.SpringApplication; -import org.springframework.boot.WebApplicationType; -import org.springframework.context.ApplicationContextException; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Import; -import org.springframework.pulsar.function.PulsarFunctionAdministration; -import org.springframework.pulsar.function.PulsarFunctionAdministration.PulsarFunctionException; -import org.springframework.pulsar.function.PulsarFunctionOperations; -import org.springframework.pulsar.function.PulsarFunctionOperations.FunctionStopPolicy; -import org.springframework.pulsar.function.PulsarSource; - -/** - * Integration tests for {@link PulsarFunctionAdministration}. - * - *

- * Verifies end-to-end that a user-configured {@link PulsarSource} results in a call to - * the Pulsar broker to register the source connector. - * - * @author Chris Bono - */ -class PulsarFunctionTests implements PulsarTestContainerSupport { - - // Not currently used by anything but prepares the test for when we do verify end-end - @Container - static RabbitMQContainer rabbit = new RabbitMQContainer("rabbitmq").withExposedPorts(5672, 15672) - .withStartupTimeout(Duration.ofMinutes(1)); - - /** - * Because the docker image we use does not contain the built-in connectors we only - * verify that the configured functions are attempted to be registered w/ the broker. - * - *

- * Later we may provide a docker image that contains the built-in connectors and - * verify the complete end-end flow. - */ - @Test - void verifyPulsarSourceIsAttemptedToBeCreatedOnBroker() { - SpringApplication app = new SpringApplication(PulsarFunctionTestConfiguration.class); - app.setWebApplicationType(WebApplicationType.NONE); - - // Again, this is a temp solution to verification of this feature - ApplicationContextException thrown = catchThrowableOfType( - () -> app.run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl(), - "--spring.pulsar.administration.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(), - "--spring.rabbitmq.host=" + rabbit.getHost(), "--spring.rabbitmq.port=" + rabbit.getAmqpPort()), - ApplicationContextException.class); - assertThat(thrown).hasCauseInstanceOf(PulsarFunctionException.class); - PulsarFunctionException cause = (PulsarFunctionException) thrown.getCause(); - Map, Exception> failures = cause.getFailures(); - assertThat(failures).hasSize(1); - Map.Entry, Exception> failureEntry = failures.entrySet().iterator().next(); - assertThat(failureEntry.getKey()).isInstanceOf(PulsarSource.class) - .extracting("config", InstanceOfAssertFactories.type(SourceConfig.class)) - .extracting(SourceConfig::getName).isEqualTo("rabbit-test-source"); - assertThat(failureEntry.getValue()).isInstanceOf(PulsarAdminException.class) - .hasMessageContaining("Built-in source is not available"); - } - - @Configuration(proxyBeanMethods = false) - @Import(PulsarAutoConfiguration.class) - static class PulsarFunctionTestConfiguration { - - @Bean - PulsarSource rabbitSource(@Value("${spring.rabbitmq.host}") String rabbitHost, - @Value("${spring.rabbitmq.port}") int rabbitPort) { - Map configs = new HashMap<>(); - configs.put("host", rabbitHost); - configs.put("port", rabbitPort); - configs.put("virtualHost", "/"); - configs.put("username", "guest"); - configs.put("password", "guest"); - configs.put("queueName", "test_rabbit"); - configs.put("connectionName", "test-connection"); - SourceConfig sourceConfig = SourceConfig.builder().tenant("public").namespace("default") - .name("rabbit-test-source").archive("builtin://rabbitmq").topicName("incoming_rabbit") - .configs(configs).build(); - return new PulsarSource(sourceConfig, FunctionStopPolicy.NONE, null); - } - - } - -} diff --git a/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarTestContainerSupport.java b/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarTestContainerSupport.java index d1fd6260..5c818444 100644 --- a/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarTestContainerSupport.java +++ b/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarTestContainerSupport.java @@ -31,8 +31,7 @@ import org.testcontainers.utility.DockerImageName; @Testcontainers(disabledWithoutDocker = true) public interface PulsarTestContainerSupport { - PulsarContainer PULSAR_CONTAINER = new PulsarContainer( - isRunningOnMacM1() ? getMacM1PulsarImage() : getStandardPulsarImage()).withFunctionsWorker(); + PulsarContainer PULSAR_CONTAINER = new PulsarContainer(getPulsarImage()); @BeforeAll static void startContainer() { @@ -43,8 +42,8 @@ public interface PulsarTestContainerSupport { return PULSAR_CONTAINER.getPulsarBrokerUrl(); } - static String getHttpServiceUrl() { - return PULSAR_CONTAINER.getHttpServiceUrl(); + static DockerImageName getPulsarImage() { + return isRunningOnMacM1() ? getMacM1PulsarImage() : getStandardPulsarImage(); } private static boolean isRunningOnMacM1() { diff --git a/spring-pulsar-spring-boot-autoconfigure/src/test/resources/logback-test.xml b/spring-pulsar-spring-boot-autoconfigure/src/test/resources/logback-test.xml index 554cd1bd..3d9aedd4 100644 --- a/spring-pulsar-spring-boot-autoconfigure/src/test/resources/logback-test.xml +++ b/spring-pulsar-spring-boot-autoconfigure/src/test/resources/logback-test.xml @@ -9,4 +9,5 @@ + diff --git a/src/checkstyle/checkstyle-suppressions.xml b/src/checkstyle/checkstyle-suppressions.xml index ff8ca884..f20df39e 100644 --- a/src/checkstyle/checkstyle-suppressions.xml +++ b/src/checkstyle/checkstyle-suppressions.xml @@ -6,6 +6,7 @@ +