Fixes: https://github.com/spring-cloud/spring-cloud-stream/issues/2939
The RabbitMQ 4.0 does not deal with client side `x-*` headers.
Therefore, an `x-death.count` is not incremented anymore when
message is re-published from client back to the broker.
* Spring AMQP 3.2 has introduced an `AmqpHeaders.RETRY_COUNT` custom header.
Use `messageProperties.incrementRetryCount()` in the `RabbitMessageChannelBinder`
when we re-published message back to the broker for server-side retries
* Fix docs respectively
Resolves#3019
Resolves https://github.com/spring-cloud/spring-cloud-stream/issues/2985
This commit introduces new customization options for Kafka listener containers
in Spring Cloud Stream, along with comprehensive documentation:
- Add KafkaListenerContainerCustomizer interface for Kafka-specific customization
with access to extended consumer properties
- Extend ListenerContainerWithDlqAndRetryCustomizer to include access to
extended consumer properties
- Update KafkaMessageChannelBinder to support the new customizer interfaces
- Implement KafkaListenerContainerCustomizerTests for integration testing
- Add detailed AsciiDoc reference documentation explaining the purpose,
usage, and hierarchy of these customizer interfaces:
* ListenerContainerCustomizer (existing)
* KafkaListenerContainerCustomizer (new)
* ListenerContainerWithDlqAndRetryCustomizer (extended)
- Update navigation to include the new documentation
These changes enhance the flexibility and configurability of Kafka consumer
endpoints in Spring Cloud Stream applications, allowing users to fine-tune
their listener containers based on specific requirements and scenarios,
with improved access to Kafka-specific properties.
Fixes https://github.com/spring-cloud/spring-cloud-stream/issues/2650
* Enable native observability support for output binding in the reactive Kafka binder
* Adding test to verify this support with downstream consumers
* Adding ref docs
* Addressing PR review
Resolves https://github.com/spring-cloud/spring-cloud-stream/issues/2932
Spring Boot provides `SanitizingFunction` to allow the applicaitons to clear out
sensitive data when using certain actuator endpoints. This feature can be
extended to custom endpoints as well. Enable the bindings actuator endpoint
to sanitze sensitive data based on user-provided logic in `SantizingFuction`
beans in the application.
- Instead of using a BiFunction as a delegate, use standard Function that takes the full record
- Remove Supplier<Long> that was used to handle record time stamps since this is no longer needed
- Docs cleanup