Add robust IT for PulsarFunctionAdministration (#272)

This commit is contained in:
Chris Bono
2023-01-15 23:33:18 -06:00
committed by GitHub
parent 66beedef86
commit 7789f89ab0
11 changed files with 297 additions and 126 deletions

View File

@@ -36,7 +36,7 @@ jobs:
- name: Run Gradle build
run: |
./gradlew clean build --continue --scan
./gradlew clean build -DdownloadRabbitConnector=true --continue --scan
- name: Capture Test Results
if: failure()

View File

@@ -41,7 +41,7 @@ jobs:
GRADLE_USER_HOME: ~/.gradle
- name: Run Gradle build
run: |
./gradlew clean build --continue -PartifactoryUsername="$ARTIFACTORY_USERNAME" -PartifactoryPassword="$ARTIFACTORY_PASSWORD"
./gradlew clean build -DdownloadRabbitConnector=true --continue -PartifactoryUsername="$ARTIFACTORY_USERNAME" -PartifactoryPassword="$ARTIFACTORY_PASSWORD"
- name: Capture test results
if: failure()
uses: actions/upload-artifact@v3

1
.gitignore vendored
View File

@@ -15,4 +15,5 @@ build
build.log
out
target
**/connectors/
.DS_Store

View File

@@ -43,15 +43,22 @@ You also need JDK 17.
./gradlew publishToMavenLocal
----
This will build all of the jars and documentation and publish them to your local Maven cache.
This will build all jars and documentation and publish them to your local Maven cache.
It won't run any of the tests.
If you want to build everything, use the `build` task:
If you want to build everything and run tests, use the `build` task:
[source,shell]
----
./gradlew build
./gradlew clean build
----
This will build everything and run all tests except the `PulsarFunctionAdministrationIntegrationTests` which requires the _Pulsar RabbitMQ Connector_ to be downloaded prior to running.
To download the connector (one time only) and run the integration test, add a system property to the task as follows:
[source,shell]
----
./gradlew clean build -DdownloadRabbitConnector=true
----
== Modules
@@ -78,6 +85,9 @@ Provides a dependency descriptor that can be included in your application to eas
=== spring-pulsar-reactive-spring-boot-starter
Provides a dependency descriptor that can be included in your application to easily start using Spring Pulsar in Reactive and imperative fashions.
=== spring-pulsar-spring-cloud-stream-binder
Provides a Spring Cloud Stream Binder implementation for Apache Pulsar.
=== spring-pulsar-sample-apps
Provides sample applications to illustrate Spring Pulsar functionality as well as provide ability for quick manual verification during development.

View File

@@ -1 +0,0 @@
connectors/

View File

@@ -1,6 +1,7 @@
plugins {
id 'org.springframework.pulsar.spring-module'
id 'org.springframework.pulsar.configuration-properties'
id "de.undercouch.download" version "5.3.0"
}
description = 'Spring Pulsar Spring Boot Auto-configuration'
@@ -23,8 +24,31 @@ dependencies {
testImplementation 'org.testcontainers:pulsar'
// used by PulsarFunctionTests
testImplementation 'org.testcontainers:rabbitmq'
testImplementation 'org.springframework.boot:spring-boot-starter-amqp'
}
test {
testLogging.showStandardStreams = true
}
task downloadRabbitConnector {
onlyIf {
System.getProperty("downloadRabbitConnector") == "true"
}
doLast {
try {
download.run {
println "Downloading Rabbit connector to 'src/test/resources/connectors/' (one time only if not already downloaded)..."
src 'https://archive.apache.org/dist/pulsar/pulsar-2.10.2/connectors/pulsar-io-rabbitmq-2.10.2.nar'
dest "$buildDir/../src/test/resources/connectors/pulsar-io-rabbitmq-2.10.2.nar"
overwrite false
}
} catch (Exception ex) {
println "Failed to download rabbit connector: $ex"
}
}
}
project.afterEvaluate {
compileTestJava.dependsOn downloadRabbitConnector
}

View File

@@ -0,0 +1,252 @@
/*
* Copyright 2023-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.pulsar.autoconfigure;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.LongStream;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.policies.data.SourceStatus;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIf;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.PulsarContainer;
import org.testcontainers.containers.RabbitMQContainer;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.core.io.DefaultResourceLoader;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.ResourcePatternUtils;
import org.springframework.lang.Nullable;
import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.pulsar.function.PulsarFunctionAdministration;
import org.springframework.pulsar.function.PulsarFunctionOperations.FunctionStopPolicy;
import org.springframework.pulsar.function.PulsarSource;
/**
* Integration tests for {@link PulsarFunctionAdministration}.
* <p>
* Sets up a Rabbit container and a Rabbit source and verifies end-end functionality.
*
* @author Chris Bono
*/
@Testcontainers(disabledWithoutDocker = true)
@EnabledIf("rabbitConnectorExists")
class PulsarFunctionAdministrationIntegrationTests {
private static final String RABBIT_QUEUE = "pft_foo_queue";
private static final String PULSAR_TOPIC = "pft_foo-topic";
private static final PulsarContainer PULSAR_CONTAINER = new PulsarContainer(
PulsarTestContainerSupport.getPulsarImage()).withFunctionsWorker()
.withClasspathResourceMapping("/connectors/", "/pulsar/connectors", BindMode.READ_ONLY);
private static final RabbitMQContainer RABBITMQ_CONTAINER = new RabbitMQContainer("rabbitmq")
.withNetworkAliases("rabbitmq").withExposedPorts(5672, 15672).withStartupTimeout(Duration.ofMinutes(1));
@BeforeAll
static void startContainers() {
Network sharedNetwork = Network.newNetwork();
PULSAR_CONTAINER.withNetwork(sharedNetwork).start();
RABBITMQ_CONTAINER.withNetwork(sharedNetwork).start();
}
private static final CountDownLatch RECEIVED_MESSAGE_LATCH = new CountDownLatch(10);
private static final List<String> RECEIVED_MESSAGES = new ArrayList<>();
static void messageReceived(String message) {
RECEIVED_MESSAGE_LATCH.countDown();
RECEIVED_MESSAGES.add(message);
}
@Test
void verifyRabbitSourceIsCreatedAndMessagesAreSourcedIntoPulsar() throws Exception {
SpringApplication app = new SpringApplication(PulsarFunctionTestConfiguration.class);
app.setWebApplicationType(WebApplicationType.NONE);
try (ConfigurableApplicationContext context = app.run(
"--spring.pulsar.client.service-url=" + PULSAR_CONTAINER.getPulsarBrokerUrl(),
"--spring.pulsar.administration.service-url=" + PULSAR_CONTAINER.getHttpServiceUrl(),
"--spring.rabbitmq.host=" + RABBITMQ_CONTAINER.getHost(),
"--spring.rabbitmq.port=" + RABBITMQ_CONTAINER.getAmqpPort())) {
// Give source 10s to get ready before sending messages to rabbit
Thread.sleep(10000);
// Send messages to rabbit and wait for them to come through the rabbit source
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
List<String> messages = LongStream.range(0, RECEIVED_MESSAGE_LATCH.getCount()).mapToObj((i) -> "bar" + i)
.toList();
messages.forEach(msg -> rabbitTemplate.convertAndSend(RABBIT_QUEUE, msg));
assertThat(RECEIVED_MESSAGE_LATCH.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(RECEIVED_MESSAGES).containsExactlyElementsOf(messages);
}
}
@Test
void verifyStopPolicyIsEnforcedOnShutdown() throws Exception {
SpringApplication app = new SpringApplication(PulsarFunctionStopPolicyTestConfiguration.class);
app.setWebApplicationType(WebApplicationType.NONE);
try (ConfigurableApplicationContext ignored = app.run(
"--spring.pulsar.client.service-url=" + PULSAR_CONTAINER.getPulsarBrokerUrl(),
"--spring.pulsar.administration.service-url=" + PULSAR_CONTAINER.getHttpServiceUrl(),
"--spring.rabbitmq.host=" + RABBITMQ_CONTAINER.getHost(),
"--spring.rabbitmq.port=" + RABBITMQ_CONTAINER.getAmqpPort())) {
// Give the source a few seconds to get ready
Thread.sleep(10000);
// Verify the sources are up and running
try (PulsarAdmin admin = getAdmin()) {
assertSourceExistsWithStatus("rabbit-test-source-none", true, admin);
assertSourceExistsWithStatus("rabbit-test-source-stop", true, admin);
assertSourceExistsWithStatus("rabbit-test-source-delete", true, admin);
}
}
// Stop policy runs after context close - verify source are in expected state
try (PulsarAdmin admin = getAdmin()) {
assertSourceExistsWithStatus("rabbit-test-source-none", true, admin);
assertSourceExistsWithStatus("rabbit-test-source-stop", false, admin);
assertSourceDoesNotExist("rabbit-test-source-delete", admin);
}
}
private PulsarAdmin getAdmin() throws PulsarClientException {
return PulsarAdmin.builder().serviceHttpUrl(PULSAR_CONTAINER.getHttpServiceUrl()).build();
}
private void assertSourceExistsWithStatus(String name, boolean isRunning, PulsarAdmin admin)
throws PulsarAdminException {
assertThat(admin.sources().getSourceStatus("public", "default", name)).isNotNull()
.extracting(SourceStatus::getNumRunning).isEqualTo(isRunning ? 1 : 0);
}
private void assertSourceDoesNotExist(String name, PulsarAdmin admin) {
assertThatThrownBy(() -> admin.sources().getSourceStatus("public", "default", name))
.isInstanceOf(NotFoundException.class);
}
static boolean rabbitConnectorExists() {
try {
Resource[] connectors = ResourcePatternUtils.getResourcePatternResolver(new DefaultResourceLoader())
.getResources("classpath:/connectors/**");
boolean available = Arrays.stream(connectors).map(Resource::getFilename).filter(Objects::nonNull)
.anyMatch((name) -> name.contains("pulsar-io-rabbitmq"));
if (!available) {
logTestDisabledReason();
return false;
}
return true;
}
catch (IOException e) {
logTestDisabledReason();
return false;
}
}
private static void logTestDisabledReason() {
System.err.printf("Skipping %s - Rabbit connector was not available in 'src/test/resources/connectors/'%n",
PulsarFunctionAdministrationIntegrationTests.class.getName());
}
static PulsarSource rabbitPulsarSource(@Nullable FunctionStopPolicy stopPolicy) {
// This Rabbit host/port config is what the Pulsar container uses to contact
// the Rabbit container. So that container-container is reachable we use a
// custom network and a network alias 'rabbitmq' and the exposed port '5672'.
// This differs from typical RabbitTemplate/RabbitProperties coordinates which
// require the mapped host and port (outside the container).
String suffix = stopPolicy != null ? ("-" + stopPolicy.name().toLowerCase()) : "";
Map<String, Object> configs = new HashMap<>();
configs.put("host", "rabbitmq");
configs.put("port", 5672);
configs.put("virtualHost", "/");
configs.put("username", "guest");
configs.put("password", "guest");
configs.put("queueName", RABBIT_QUEUE + suffix);
configs.put("connectionName", "pft_foo_connection" + suffix);
SourceConfig sourceConfig = SourceConfig.builder().tenant("public").namespace("default")
.name("rabbit-test-source" + suffix).archive("builtin://rabbitmq").topicName(PULSAR_TOPIC + suffix)
.configs(configs).build();
return new PulsarSource(sourceConfig, stopPolicy != null ? stopPolicy : FunctionStopPolicy.DELETE, null);
}
@Configuration(proxyBeanMethods = false)
@Import({ PulsarAutoConfiguration.class, RabbitAutoConfiguration.class })
static class PulsarFunctionTestConfiguration {
@Bean
PulsarSource rabbitSource() {
return PulsarFunctionAdministrationIntegrationTests.rabbitPulsarSource(null);
}
@PulsarListener(topics = PULSAR_TOPIC, subscriptionName = "pft-foo-sub")
public void listen(String msg) {
PulsarFunctionAdministrationIntegrationTests.messageReceived(msg);
}
}
@Configuration(proxyBeanMethods = false)
@Import(PulsarAutoConfiguration.class)
static class PulsarFunctionStopPolicyTestConfiguration {
@Bean
PulsarSource rabbitSourceWithStopPolicyNone() {
return PulsarFunctionAdministrationIntegrationTests.rabbitPulsarSource(FunctionStopPolicy.NONE);
}
@Bean
PulsarSource rabbitSourceWithStopPolicyStop() {
return PulsarFunctionAdministrationIntegrationTests.rabbitPulsarSource(FunctionStopPolicy.STOP);
}
@Bean
PulsarSource rabbitSourceWithStopPolicyDelete() {
return PulsarFunctionAdministrationIntegrationTests.rabbitPulsarSource(FunctionStopPolicy.DELETE);
}
}
}

View File

@@ -1,116 +0,0 @@
/*
* Copyright 2023-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.pulsar.autoconfigure;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.catchThrowableOfType;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.io.SourceConfig;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.RabbitMQContainer;
import org.testcontainers.junit.jupiter.Container;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.context.ApplicationContextException;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.pulsar.function.PulsarFunctionAdministration;
import org.springframework.pulsar.function.PulsarFunctionAdministration.PulsarFunctionException;
import org.springframework.pulsar.function.PulsarFunctionOperations;
import org.springframework.pulsar.function.PulsarFunctionOperations.FunctionStopPolicy;
import org.springframework.pulsar.function.PulsarSource;
/**
* Integration tests for {@link PulsarFunctionAdministration}.
*
* <p>
* Verifies end-to-end that a user-configured {@link PulsarSource} results in a call to
* the Pulsar broker to register the source connector.
*
* @author Chris Bono
*/
class PulsarFunctionTests implements PulsarTestContainerSupport {
// Not currently used by anything but prepares the test for when we do verify end-end
@Container
static RabbitMQContainer rabbit = new RabbitMQContainer("rabbitmq").withExposedPorts(5672, 15672)
.withStartupTimeout(Duration.ofMinutes(1));
/**
* Because the docker image we use does not contain the built-in connectors we only
* verify that the configured functions are attempted to be registered w/ the broker.
*
* <p>
* Later we may provide a docker image that contains the built-in connectors and
* verify the complete end-end flow.
*/
@Test
void verifyPulsarSourceIsAttemptedToBeCreatedOnBroker() {
SpringApplication app = new SpringApplication(PulsarFunctionTestConfiguration.class);
app.setWebApplicationType(WebApplicationType.NONE);
// Again, this is a temp solution to verification of this feature
ApplicationContextException thrown = catchThrowableOfType(
() -> app.run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl(),
"--spring.pulsar.administration.service-url=" + PulsarTestContainerSupport.getHttpServiceUrl(),
"--spring.rabbitmq.host=" + rabbit.getHost(), "--spring.rabbitmq.port=" + rabbit.getAmqpPort()),
ApplicationContextException.class);
assertThat(thrown).hasCauseInstanceOf(PulsarFunctionException.class);
PulsarFunctionException cause = (PulsarFunctionException) thrown.getCause();
Map<PulsarFunctionOperations<?>, Exception> failures = cause.getFailures();
assertThat(failures).hasSize(1);
Map.Entry<PulsarFunctionOperations<?>, Exception> failureEntry = failures.entrySet().iterator().next();
assertThat(failureEntry.getKey()).isInstanceOf(PulsarSource.class)
.extracting("config", InstanceOfAssertFactories.type(SourceConfig.class))
.extracting(SourceConfig::getName).isEqualTo("rabbit-test-source");
assertThat(failureEntry.getValue()).isInstanceOf(PulsarAdminException.class)
.hasMessageContaining("Built-in source is not available");
}
@Configuration(proxyBeanMethods = false)
@Import(PulsarAutoConfiguration.class)
static class PulsarFunctionTestConfiguration {
@Bean
PulsarSource rabbitSource(@Value("${spring.rabbitmq.host}") String rabbitHost,
@Value("${spring.rabbitmq.port}") int rabbitPort) {
Map<String, Object> configs = new HashMap<>();
configs.put("host", rabbitHost);
configs.put("port", rabbitPort);
configs.put("virtualHost", "/");
configs.put("username", "guest");
configs.put("password", "guest");
configs.put("queueName", "test_rabbit");
configs.put("connectionName", "test-connection");
SourceConfig sourceConfig = SourceConfig.builder().tenant("public").namespace("default")
.name("rabbit-test-source").archive("builtin://rabbitmq").topicName("incoming_rabbit")
.configs(configs).build();
return new PulsarSource(sourceConfig, FunctionStopPolicy.NONE, null);
}
}
}

View File

@@ -31,8 +31,7 @@ import org.testcontainers.utility.DockerImageName;
@Testcontainers(disabledWithoutDocker = true)
public interface PulsarTestContainerSupport {
PulsarContainer PULSAR_CONTAINER = new PulsarContainer(
isRunningOnMacM1() ? getMacM1PulsarImage() : getStandardPulsarImage()).withFunctionsWorker();
PulsarContainer PULSAR_CONTAINER = new PulsarContainer(getPulsarImage());
@BeforeAll
static void startContainer() {
@@ -43,8 +42,8 @@ public interface PulsarTestContainerSupport {
return PULSAR_CONTAINER.getPulsarBrokerUrl();
}
static String getHttpServiceUrl() {
return PULSAR_CONTAINER.getHttpServiceUrl();
static DockerImageName getPulsarImage() {
return isRunningOnMacM1() ? getMacM1PulsarImage() : getStandardPulsarImage();
}
private static boolean isRunningOnMacM1() {

View File

@@ -9,4 +9,5 @@
</root>
<logger name="org.testcontainers" level="ERROR"/>
<logger name="com.github.dockerjava" level="ERROR"/>
<logger name="org.springframework.pulsar.function" level="DEBUG"/>
</configuration>

View File

@@ -6,6 +6,7 @@
<suppress files="package-info\.java" checks=".*"/>
<suppress files="[\\/]test[\\/]" checks="RequireThis"/>
<suppress files="[\\/]test[\\/]" checks="Javadoc*"/>
<suppress files="PulsarFunctionAdministrationIntegrationTests" checks="Regexp"/>
<suppress files="Proto" checks=".*"/>
<suppress files="ReactiveSpringPulsarBootApp" checks="HideUtilityClassConstructor"/>
<suppress files="[\\/]spring-pulsar-docs[\\/]" checks="JavadocPackage|JavadocType|JavadocVariable|SpringDeprecatedCheck" />