Fixes https://github.com/spring-projects/spring-kafka/issues/439
Ignore `stop()` if container is not running, `start()` if container is running.
__cherry-pick to all branches__
* Use `doStop(Runnable)` optimization from the `stop()`
# Conflicts:
# spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java
# spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java
(cherry picked from commit 3b060d8)
# Conflicts:
# spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java
Fixes#319
Add the `Acknowledgment` to the retry context if the listener type warrants it.
Conflicts:
spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapterTests.java
src/reference/asciidoc/kafka.adoc
Resolves: https://github.com/spring-projects/spring-kafka/issues/295
When JMX is enabled and a `client.id` property provided, we get an `InstanceAlreadyExistsException`
when using the `ConcurrentMessageListenerContainer` with `concurrency > 1`.
Add a mechanism to append the client index to the client.id.
Also fix private inner class ctors -> package to avoid synthetic ctor.
Fix deprecation warning and test
Conflicts:
spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java
src/reference/asciidoc/kafka.adoc
* Fix compilation warnings on the cast to `ClientIdSuffixAware`
Fixes GH-221 (https://github.com/spring-projects/spring-kafka/issues/221)
When the value is `String[]` we iterate it for the `if (resolvedValue instanceof String[]) {` and then just go to a new `if` block instead of `else if` or `return`
* Fix `KafkaListenerBPP.resolveAsString()` as an `else if` for the second condition
* Confirm the fix with the `EnableKafkaIntegrationTests` test modification to resolve `topics` to the `String[]`
Fixes GH-213 (https://github.com/spring-projects/spring-kafka/issues/213)
After introduction copying of `ContainerProperties` instance in the `AbstractMessageListenerContainer` the sample in the Reference Manual ins't valid any more.
* Move `containerProps.setMessageListener()` before `createContainer()` where copying is happened already and we can't affect container with original `ContainerProperties` modifications afterwards.
**Cherry pick to 1.0.x**
Fixes GH-160 (https://github.com/spring-projects/spring-kafka/issues/160)
* Add `KafkaNull`
* Override `PayloadArgumentResolver#isEmptyPayload` to support new `KafkaNull`
Polishing code style.
Add commit author to the `@author` list
Fixes: GH-154 (https://github.com/spring-projects/spring-kafka/issues/154)
Previously the `AbstractMessageListenerContainer` save external `ContainerProperties` instance leaving the room for external/internal mutation,
which may lead to unexpected behavior
* Create a new instance of the `ContainerProperties` in the `AbstractMessageListenerContainer` ctor based on the provided `ContainerProperties`
and use `BeanUtils.copyProperties()` for convenience.
* Refactoring for tests to meet a new state. Some of them indeed modified `ContainerProperties` after container instantiation.
* Remove fake `"propertiesFactory"` topic from the `AbstractKafkaListenerContainerFactory.containerProperties` instance to avoid unexpected behavior.
Change that to `(Pattern) null`
**Cherry-pick to 1.0.x**
Conflicts:
spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java
Resolved.
Fixes: GH-150 (https://github.com/spring-projects/spring-kafka/issues/150)
In some corner cases the target `MessageListener` implementation may decide to invoke `AbstractAdaptableMessageListener.onMessage(message)` of its delegate.
In this case we can't receive any exceptions back into container for handling, because `handleListenerException()` just logs them.
* Deprecate `handleListenerException()` and don't wrap `onMessage(message, null)` call with `try...catch` to let exception bubble in container back,
like it is with regular `onMessage(message, null)` in the container.
(Consider to remove `handleListenerException()` altogether for current `1.1` `master`)
**Cherry-pick to 1.0.x**
De-`@Deprecate` `handleListenerException()`
A custom implementation may decide to use that method for its purpose.
Fixes#146
When using reset relative to the current end, ensure the seek value is
not less than zero.
Add `consumer.position()` to error log message to reflect reality on `seek()`failure
(cherry picked from commit 6d80e0e)
Fixes GH-140 (https://github.com/spring-projects/spring-kafka/issues/140)
Previously with the pretty big `containerProperties.getPollTimeout()` we ended up with the issue of not acked commits for `BATCH` mode.
Just because the logic relies on the `consumer.wakeup()` which causes `WakeupException` breaking the main polling loop.
* Move `processCommits()` function to the beginning of main loop, before blocking on the `this.consumer.poll()`
* Increase `EnableKafkaIntegrationTests` wait timeouts
**Cherry-pick to 1.0.x**
Add `firstBatchLatch` to test to verify that the first batch is commit within the time less than 10 sec for poll
Fixes GH-141 (https://github.com/spring-projects/spring-kafka/issues/141)
The `MANUAL` acks are intended to be acknowledged only by the end-listener initiative.
An unconditional `this.acks.add(record)` is in the `invokeListener()` function by mistake.
* Wrap `this.acks.add(record)` in the `invokeListener()` to `if (!this.isAnyManualAck)`
* Fix `NPE` in te main poll loop as `if (records != null && this.logger.isDebugEnabled())`
(cherry picked from commit 44cc15f)
Resolves#135
Properly transfer `ack`s from the listener thread to the consumer thread
for all ack modes.
Previously, record ack mode was not handled in `processCommits`.
Fix test according PR comments
(cherry picked from commit 677d135)
Fixes#118
- Remove MANUAL_IMMEDIATE_SYNC - sync/async is controlled by the `syncCommits` property
- For MANUAL, wake the consumer thread at the end of the batch
- For MANUAL_IMMEDIATE, the consumer thread is woken directly from the Acknowledgement
Polishing
* Some simple code reformatting for `KafkaMessageListenerContainer`
* Move the `CountDownLatch` logic in the `KafkaMessageListenerContainerTests#testSlowConsumerCommitsAreProcessed()`
into the mock for `Consumer.commitSync()` since the count logic in the listener immediately after `ack.acknowledge()` doesn't guaranty that
`Consumer.commitSync()`will be called. That is because listener lives in one Thread, but `processCommits()` is done from a different `ListenerConsumer` Thread
Fixes GH-124 (https://github.com/spring-projects/spring-kafka/issues/124)
That doesn't look reasonable to have void `commitIfNecessary()` if we exceed default `0` for `ackCount` or `ackTime`.
* Add requirement to have `ackCount` and `ackTime` `> 0` in case appropriate `ackMode`
* Make `ackTime` as 5 secs by default - similar to default for `auto.commit.interval.ms`
Address PR comments
* Don't require `count` in case of `BATCH` mode
* Make `ackTime` as 5 sec only when `**TIME` mode
* Validate provided `count` and `time` options in the `ContainerProperties`
* Mention in JavaDocs for `MANUAL` that it works as `MANUAL_IMMEDIATE_SYNC` when no `count` and `time`
* Restore changes for tests
* Overcome the `Assert`s with `if` when `BeanUtils.copyProperties` is used