Add smoke test for Spring RabbitMQ
- Change the AmqpRabbitApplication implementation to align the RabbitTemplate with SmartLifecycle. - Add reminder to update the spring-rabbit version to 3.0.8+.
This commit is contained in:
committed by
Sébastien Deleuze
parent
87be1668b1
commit
865edc0755
1
.gitignore
vendored
1
.gitignore
vendored
@@ -9,3 +9,4 @@ out/
|
||||
ci/pipeline.yml
|
||||
stats-restore
|
||||
.vscode/
|
||||
.DS_Store
|
||||
|
||||
1
integration/spring-amqp-rabbit/README.adoc
Normal file
1
integration/spring-amqp-rabbit/README.adoc
Normal file
@@ -0,0 +1 @@
|
||||
Tests if Spring AMQP with RabbitMQ is working.
|
||||
21
integration/spring-amqp-rabbit/build.gradle
Normal file
21
integration/spring-amqp-rabbit/build.gradle
Normal file
@@ -0,0 +1,21 @@
|
||||
plugins {
|
||||
id "java"
|
||||
id "org.springframework.boot"
|
||||
id "org.springframework.cr.smoke-test"
|
||||
}
|
||||
|
||||
dependencies {
|
||||
implementation(platform(org.springframework.boot.gradle.plugin.SpringBootPlugin.BOM_COORDINATES))
|
||||
implementation("org.springframework.boot:spring-boot-starter-amqp")
|
||||
|
||||
// TODO: update to non snapshot spring-rabbit 3.0.8+ version when available.
|
||||
implementation("org.springframework.amqp:spring-rabbit:3.0.8-SNAPSHOT")
|
||||
|
||||
implementation("org.crac:crac:$cracVersion")
|
||||
implementation(project(":cr-listener"))
|
||||
|
||||
testImplementation("org.springframework.boot:spring-boot-starter-test")
|
||||
|
||||
appTestImplementation(project(":cr-smoke-test-support"))
|
||||
appTestImplementation("org.awaitility:awaitility:4.2.0")
|
||||
}
|
||||
7
integration/spring-amqp-rabbit/docker-compose.yml
Normal file
7
integration/spring-amqp-rabbit/docker-compose.yml
Normal file
@@ -0,0 +1,7 @@
|
||||
version: '3'
|
||||
services:
|
||||
rabbitmq:
|
||||
image: 'rabbitmq:management'
|
||||
ports:
|
||||
- '5672'
|
||||
- '15672'
|
||||
@@ -0,0 +1,38 @@
|
||||
/*
|
||||
* Copyright 2023 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package com.example.amqp;
|
||||
|
||||
import java.time.Duration;
|
||||
|
||||
import org.awaitility.Awaitility;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import org.springframework.cr.smoketest.support.assertj.AssertableOutput;
|
||||
import org.springframework.cr.smoketest.support.junit.ApplicationTest;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@ApplicationTest
|
||||
class AmqpRabbitApplicationCrTests {
|
||||
|
||||
@Test
|
||||
void rabbitListenerMethodReceivesMessageAndSendsResponse(AssertableOutput output) {
|
||||
Awaitility.await()
|
||||
.atMost(Duration.ofSeconds(60))
|
||||
.untilAsserted(() -> assertThat(output).hasLineContaining("++++++ Received: ONEtwo"));
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,106 @@
|
||||
/*
|
||||
* Copyright 2023 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package com.example.amqp;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
|
||||
import org.springframework.amqp.core.Queue;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.amqp.rabbit.connection.CorrelationData;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.context.SmartLifecycle;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.messaging.handler.annotation.SendTo;
|
||||
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
|
||||
@SpringBootApplication
|
||||
@EnableScheduling
|
||||
public class AmqpRabbitApplication implements SmartLifecycle {
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
SpringApplication.run(AmqpRabbitApplication.class, args);
|
||||
}
|
||||
|
||||
@Autowired
|
||||
RabbitTemplate template;
|
||||
|
||||
@RabbitListener(id = "cf1", queues = "cf1")
|
||||
@SendTo
|
||||
public String upperCaseIt(String in) {
|
||||
try {
|
||||
String two = sendWithConfirms();
|
||||
return in.toUpperCase() + two;
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new AmqpRejectAndDontRequeueException("fail");
|
||||
}
|
||||
}
|
||||
|
||||
@RabbitListener(id = "cf2", queues = "cf2")
|
||||
@SendTo
|
||||
public String lowerCaseIt(String in) {
|
||||
return in.toLowerCase();
|
||||
}
|
||||
|
||||
private String sendWithConfirms() throws Exception {
|
||||
CorrelationData data = new CorrelationData();
|
||||
String result = (String) this.template.convertSendAndReceive("", "cf2", "TWO", data);
|
||||
data.getFuture().get(10, TimeUnit.SECONDS);
|
||||
return result;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Queue queue1() {
|
||||
return new Queue("cf1");
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Queue queue2() {
|
||||
return new Queue("cf2");
|
||||
}
|
||||
|
||||
@Scheduled(fixedDelay = 1, timeUnit = TimeUnit.SECONDS)
|
||||
public void doSendMessage() {
|
||||
if (this.isRunning.get()) {
|
||||
System.out.println("++++++ Received: " + template.convertSendAndReceive("", "cf1", "one"));
|
||||
}
|
||||
}
|
||||
|
||||
// Use the SmartLifecycle to synch the message sending with the checkpoint/restore.
|
||||
private AtomicBoolean isRunning = new AtomicBoolean(false);
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
this.isRunning.set(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
this.isRunning.set(false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRunning() {
|
||||
return this.isRunning.get();
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
spring.rabbitmq.host=${RABBITMQ_HOST:localhost}
|
||||
spring.rabbitmq.port=${RABBITMQ_PORT_5672:5672}
|
||||
spring.rabbitmq.publisher-confirm-type=correlated
|
||||
Reference in New Issue
Block a user