- When Kafka Streams branching function is provided as a Component bean,
there is an issue where the raw class check on the return outbound
resolvable type cuases an NPE. Fixing this issue by adding a null check
on the return type's raw class.
Resolves https://github.com/spring-cloud/spring-cloud-stream/issues/2821
- When event-type routing is enabled in Kafka Streams binder
and conurrency > 1 is used, messages are occasionally getting
dispatched to the wrong consumer causing CCE. This is due
to a race condition caused by a shared resource across threads.
Fixing the issue by introducing a ThreadLocal variable.
Resolves https://github.com/spring-cloud/spring-cloud-stream/issues/2823
- Instead of using a BiFunction as a delegate, use standard Function that takes the full record
- Remove Supplier<Long> that was used to handle record time stamps since this is no longer needed
- Docs cleanup
- Ensure that the Pulsar binder default properties can be properly
expressed via spring.cloud.stream.pulsar.default property prefix.
- Add the binder child context bean with the name binderName_binderProducingContext
into the parent application context so that individual beans from the binder context
can be easily queried.
Resolves https://github.com/spring-cloud/spring-cloud-stream/issues/2806
The PulsarBinder relies on the Spring Pulsar Spring Boot starter.
The starter moved out of the spring-pulsar core repo and into
Spring Boot proper. This commit updates the Pulsar binder to
use the new coordinates for the Spring Boot based starter.
* Additionally, the PulsarProperties were greatly reduced in the
move to Spring Boot. As such, the binder exposes an extended
set of config properties for producer/consumer (the initial
set supported before the property reduction).
If a target middleware (Kafka for ex) topic has more partitions
than what is set on the partition-count producer property, the binder
never updates the original producer partition-count property to the
partition count from the middleware if it is higher than what was given
through the property. Because of this, te PartitionAwareFunctionWrapper
which evaluates and assigns the partition header in Spring Cloud Stream
does not compute the correct partition since the hash operation still
using the original partition-count from the producer binding property.
This commit is addressing this issue.
Resolves https://github.com/spring-cloud/spring-cloud-stream/issues/2796
If kafka truststore and keystore locations are not local files, then they
are converted to org.springframework.core.io.Resource resources, then copied
to local file system. This means that, paths can be defined as HTTP resources too.
Currently, the Kafka binder only supports CLASSPATH based resources. It would be
useful if we can support non-CLASSPATH like resources such as HTTP, so that if an
application uses config server for example to store certificates, then those will
be copied from it's HTTP endpoint to the local filesystem as Resources.
Checkstyle, documentation fixes.
Resolves https://github.com/spring-cloud/spring-cloud-stream/issues/2761
There was a regression introduced in Spring Cloud Function where consumers
of type Consumer<Message<?>> receive null values when tombstone records
are given as KafkaNull.
See this issue for more details: https://github.com/spring-cloud/spring-cloud-function/issues/1060
Regression is addressed in Spring Cloud Function and
making the corresponding test changes in Spring Cloud Stream Kafka binder.
- Insted of unconditionally initializing KafkaJaasLoginModuleInitializer,
only create it, if jaas properties are set properly. By creating it always,
we unncessarily expose Java security related class requirements to the binder
apps. For e.g. if the underlying JVM security classes change and require
reflection, that could break AOT apps that don't use a secure Kafka cluster.
Fixing these type of issues by creating KafkaJaasLoginModuleInitializer only if required.
- Before running a test, invalidate any existing JVM-wide static
security configuration so that tests are forced to create/use
fresh security configuration. Without this, test suites with
multiple security tests might fail as they might overlap with
security confugiruation from other tests.
* Reactor Kafka Binder Health Indicator
- Provide a new abstraction for general Kafka binder related HealthIndicators.
- Refactor Kafka binder to use the new abstraction
- Add HealthIndicator implementation for the ReactorKafkaBinder
Resolves https://github.com/spring-cloud/spring-cloud-stream/issues/2752
* Addressing PR review
* Addressing PR review
* Addressing PR review
- When using reactive functions, partition selector strategy does not
use the configured partition count for multiple outbounds. This is
because we take the first configured output binding and apply it's
partition counts on all the outbound reactive streams (Tuples).
Addressing this issue by properly applying the correct partition handling
per output binding.
Resolves https://github.com/spring-cloud/spring-cloud-stream/issues/2750
* KafkaStreams Functions Detection Logic
Kafka Streams functions declared in super classes are no longer
detected by the binder discovery algorithms. Fixing this issue
by properly scanning the methods from the super classes.
Resolves https://github.com/spring-cloud/spring-cloud-stream/issues/2737
* Addressing PR review
* GH-2729: Reactor Kafka Binder SenderResult Support
Resolves https://github.com/spring-cloud/spring-cloud-stream/issues/2729
Allow configuration of a `FluxMessageChannel` to receive `SenderResult`s.
Add `SenderResultMessageHandler` to consume from that channel.
Remove undocumented `sendResult` header, which has no value without the
sender result correlation metadata.
* Add integration test and polish property docs.
- result channel must be FMC for reactive binder.
* Add documentation.
* Remove SenderResultMessageHandler.
Resolves https://github.com/spring-cloud/spring-cloud-stream/issues/2706
- Update partition count changes in Kafka dynamically during runtime
- Checkstyle fixes
- PartitionHandler changes
- Improved handling of expression with 'payload' in the logic
- Removed getter/setter for PartitionHandler and use ReflectionUtils within Test
- Renamed property to 'dynamicPartitionUpdatesEnabled' and improved documentation (also one line per sentence)
- Improved/renamed test to use embeddedkafka and really test update behaviour
- Javadoc
Latest version of Mockito (5.3.0) used in Spring Boot 3.1.0,
apparently needs the specific Collection type when capturing
values via the ArgumentCaptor. Earlier versisons didn't mandate this.