2020-12-18 18:54:21 +01:00

## Examples of Cloud Events with Spring via AMQP and Apache Kafka
### Introduction
This example is built on the spring-cloud-function framework, which lets you focus on the functional aspects of
your requirements while the framework takes care of the non-functional ones. For more information on Spring Cloud Function, please visit
our https://spring.io/projects/spring-cloud-function[project page].
The example consists of a Spring Boot configuration class,
https://github.com/spring-cloud/spring-cloud-function/blob/master/spring-cloud-function-samples/function-sample-cloudevent-stream/src/main/java/io/spring/cloudevent/DemoApplication.java[DemoApplication],
which contains a sample function that you can interact with via AMQP and Apache Kafka.
### From RabbitMQ to Apache Kafka
Assuming you have RabbitMQ and Kafka running, start the application and send a message to RabbitMQ.
We included a https://github.com/spring-cloud/spring-cloud-function/blob/master/spring-cloud-function-samples/function-sample-cloudevent-stream/src/test/java/io/spring/cloudevent/DemoApplicationTests.java[demo test case] that effectively automates this demo by sending a Cloud Event to RabbitMQ and receiving one from Apache Kafka.
```
// Build the message, setting only the Cloud Event source explicitly
Message<byte[]> messageToAMQP = CloudEventMessageBuilder
    .withData("{\"firstName\":\"John\", \"lastName\":\"Doe\"}".getBytes())
    .setSource("https://cloudevent.demo")
    .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
    .build(CloudEventMessageUtils.AMQP_ATTR_PREFIX);
// Send it to the hire-in-0 exchange
rabbitTemplate.send("hire-in-0", "#", messageToAMQP);
// Wait up to two seconds for the resulting message
Message<String> resultFromKafka = queue.poll(2000, TimeUnit.MILLISECONDS);
System.out.println("Result Message: " + resultFromKafka);
. . .
```
Note how we use `CloudEventMessageBuilder` here to set only the source as a Cloud Event attribute, relying on default values for the rest of the
required Cloud Event attributes. We're also using `build(CloudEventMessageUtils.AMQP_ATTR_PREFIX)` to ensure that the attribute names carry the `cloudEvents:`
prefix (see Cloud Events AMQP protocol bindings).
Also note that on the receiving end the Cloud Event attributes now carry the `ce_` prefix (see Cloud Events Kafka protocol bindings),
because the framework determined that the target destination is Apache Kafka.
This last point is worth elaborating on. We already established that setting Cloud Event attributes is a non-functional aspect, and because
of that we've exposed a mechanism for you to deal with it outside of your business logic. But what about attribute prefixes? Note that we are running the
same code in different execution contexts, so the correct attribute prefixes depend on the execution context. By being aware of that
context, the framework ensures that the Cloud Event attribute prefixes are correct.
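To make the idea of context-dependent prefixes concrete, here is a minimal, self-contained sketch. It is not the framework's implementation: the class `AttributePrefixSketch`, the `Target` enum, and the `prefixFor` method are hypothetical names introduced only for illustration. The only facts taken from the text above are the two prefixes, `cloudEvents:` for AMQP and `ce_` for Apache Kafka.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only; not part of spring-cloud-function.
final class AttributePrefixSketch {

    // Stand-in for the "execution context": the protocol of the target destination.
    enum Target {
        AMQP("cloudEvents:"),
        KAFKA("ce_");

        final String prefix;

        Target(String prefix) {
            this.prefix = prefix;
        }
    }

    // Returns a copy of the canonical (unprefixed) attributes with the
    // target's prefix prepended to every attribute name.
    static Map<String, String> prefixFor(Target target, Map<String, String> canonical) {
        Map<String, String> prefixed = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : canonical.entrySet()) {
            prefixed.put(target.prefix + entry.getKey(), entry.getValue());
        }
        return prefixed;
    }

    public static void main(String[] args) {
        Map<String, String> attributes = new LinkedHashMap<>();
        attributes.put("specversion", "1.0");
        attributes.put("source", "https://cloudevent.demo");

        // Same attributes, different names depending on the destination.
        System.out.println(prefixFor(Target.AMQP, attributes));
        System.out.println(prefixFor(Target.KAFKA, attributes));
    }
}
```

The business data and the attribute values are identical in both calls; only the attribute names change with the destination, which is why this concern can be handled outside of your function.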
You can also use the http://localhost:15672/[RabbitMQ dashboard] (if you have it installed) to send a message to the `hire-in-0` exchange.
To stay compliant with the Cloud Events specification, you should provide attributes with the AMQP-appropriate prefix (i.e., `cloudEvents:`). For example:
```
cloudEvents:specversion=1.0
cloudEvents:type=hire
cloudEvents:source=spring.io/spring-event
cloudEvents:id=0001
```
And your data:
```
{"firstName":"John", "lastName":"Doe"}
```
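Before sending a hand-written message from the dashboard, it can help to check that the headers cover the four attributes the Cloud Events specification requires (`id`, `source`, `specversion`, `type`). The sketch below shows one way to do that in plain Java. The class `AmqpHeaderCheckSketch` and its methods are hypothetical helpers written for this illustration, not framework API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; not part of spring-cloud-function.
final class AmqpHeaderCheckSketch {

    static final String AMQP_PREFIX = "cloudEvents:";

    // Attributes that the Cloud Events specification marks as required.
    static final List<String> REQUIRED = List.of("id", "source", "specversion", "type");

    // Keeps only headers that start with the AMQP prefix and strips the prefix.
    static Map<String, String> stripPrefix(Map<String, String> headers) {
        Map<String, String> attributes = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : headers.entrySet()) {
            if (entry.getKey().startsWith(AMQP_PREFIX)) {
                attributes.put(entry.getKey().substring(AMQP_PREFIX.length()), entry.getValue());
            }
        }
        return attributes;
    }

    // Returns the names of required attributes that are absent from the headers.
    static List<String> missingRequired(Map<String, String> headers) {
        Map<String, String> attributes = stripPrefix(headers);
        List<String> missing = new ArrayList<>();
        for (String name : REQUIRED) {
            if (!attributes.containsKey(name)) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("cloudEvents:specversion", "1.0");
        headers.put("cloudEvents:type", "hire");
        headers.put("cloudEvents:source", "spring.io/spring-event");
        headers.put("cloudEvents:id", "0001");

        System.out.println("Missing required attributes: " + missingRequired(headers));
    }
}
```

An empty list means every required attribute is present. A header without the `cloudEvents:` prefix (for example a bare `id`) is ignored by this check and so is reported as missing.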