Revert "GH-1267 Remove RSocket sample"

This reverts commit 110f7619fd.
This commit is contained in:
Oleg Zhurakousky
2025-04-30 19:12:51 +02:00
parent dc535f75ee
commit 441960095d
14 changed files with 1049 additions and 0 deletions

View File

@@ -0,0 +1,28 @@
package io.spring.cloudevent;
import java.util.function.Consumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.function.cloudevent.CloudEventMessageBuilder;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) throws Exception {
SpringApplication.run(DemoApplication.class, args);
}
@Bean
public Consumer<Person> hire(StreamBridge streamBridge) {
return person -> {
Employee employee = new Employee(person);
streamBridge.send("hire-out-0", CloudEventMessageBuilder.withData(employee)
.setSource("http://spring.io/rsocket")
.setId("1234567890")
.build());
};
}
}

View File

@@ -0,0 +1,41 @@
package io.spring.cloudevent;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
public class Employee {
private Person person;
private int id;
public Employee() {
}
public Employee(Person person) {
this.person = person;
this.id = new Random().nextInt(1000);
}
public Person getPerson() {
return person;
}
public void setPerson(Person person) {
this.person = person;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getMessage() {
return "Employee " + id + " was hired on " + new SimpleDateFormat("dd-MM-yyyy").format(new Date());
}
}

View File

@@ -0,0 +1,24 @@
package io.spring.cloudevent;
public class Person {
private String firstName;
private String lastName;
public String getLastName() {
return lastName;
}
public void setLastName(String lastName) {
this.lastName = lastName;
}
public String getFirstName() {
return firstName;
}
public void setFirstName(String firstName) {
this.firstName = firstName;
}
}

View File

@@ -0,0 +1,83 @@
package io.spring.cloudevent;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Collections;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ConditionEvaluationResult;
import org.junit.jupiter.api.extension.ExecutionCondition;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.util.MimeTypeUtils;
@SpringBootTest(properties = {"spring.rsocket.server.port=55551"})
@ExtendWith(DemoApplicationTests.TestRule.class)
public class DemoApplicationTests {
ArrayBlockingQueue<Message<String>> queue = new ArrayBlockingQueue<>(1000);
@Autowired
private RSocketRequester.Builder rsocketRequesterBuilder;
@Test
public void test() throws Exception {
String payload = "{\n" +
" \"specversion\" : \"1.0\",\n" +
" \"type\" : \"org.springframework\",\n" +
" \"source\" : \"https://spring.io/\",\n" +
" \"id\" : \"A234-1234-1234\",\n" +
" \"datacontenttype\" : \"application/json\",\n" +
" \"data\" : {\n" +
" \"firstName\" : \"John\",\n" +
" \"lastName\" : \"Doe\"\n" +
" }\n" +
"}";
this.rsocketRequesterBuilder.tcp("localhost", 55551)
.route("hire")
.metadata("{\"content-type\":\"application/cloudevents+json\"}", MimeTypeUtils.APPLICATION_JSON)
.data(payload)
.send()
.subscribe();
Message<String> resultFromKafka = queue.poll(2000, TimeUnit.MILLISECONDS);
System.out.println("Result Message: " + resultFromKafka);
System.out.println("Cloud Event 'specversion': " + CloudEventMessageUtils.getSpecVersion(resultFromKafka));
System.out.println("Cloud Event 'source': " + CloudEventMessageUtils.getSource(resultFromKafka));
System.out.println("Cloud Event 'id': " + CloudEventMessageUtils.getId(resultFromKafka));
System.out.println("Cloud Event 'type': " + CloudEventMessageUtils.getType(resultFromKafka));
}
@KafkaListener(id = "test", topics = "hire-out-0", clientIdPrefix = "cloudEvents")
public void listen(Message<String> message) {
queue.add(message);
}
public static class TestRule implements ExecutionCondition {
@Override
public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext context) {
try {
Socket socket = new Socket();
socket.connect(new InetSocketAddress("localhost", 9092));
socket.close();
}
catch (Exception e) {
System.out.println("Kafka is not available on localhost:9092");
return ConditionEvaluationResult.disabled("Kafka is not available on localhost, default port");
}
return ConditionEvaluationResult.enabled("All is good");
}
}
}