Fixes#118
- Remove MANUAL_IMMEDIATE_SYNC - sync/async is controlled by the `syncCommits` property
- For MANUAL, wake the consumer thread at the end of the batch
- For MANUAL_IMMEDIATE, the consumer thread is woken directly from the Acknowledgement
Polishing
* Some simple code reformatting for `KafkaMessageListenerContainer`
* Move the `CountDownLatch` logic in the `KafkaMessageListenerContainerTests#testSlowConsumerCommitsAreProcessed()`
into the mock for `Consumer.commitSync()` since the count logic in the listener immediately after `ack.acknowledge()` doesn't guaranty that
`Consumer.commitSync()`will be called. That is because listener lives in one Thread, but `processCommits()` is done from a different `ListenerConsumer` Thread
Fixes GH-124 (https://github.com/spring-projects/spring-kafka/issues/124)
That doesn't look reasonable to have void `commitIfNecessary()` if we exceed default `0` for `ackCount` or `ackTime`.
* Add requirement to have `ackCount` and `ackTime` `> 0` in case appropriate `ackMode`
* Make `ackTime` as 5 secs by default - similar to default for `auto.commit.interval.ms`
Address PR comments
* Don't require `count` in case of `BATCH` mode
* Make `ackTime` as 5 sec only when `**TIME` mode
* Validate provided `count` and `time` options in the `ContainerProperties`
* Mention in JavaDocs for `MANUAL` that it works as `MANUAL_IMMEDIATE_SYNC` when no `count` and `time`
* Restore changes for tests
* Overcome the `Assert`s with `if` when `BeanUtils.copyProperties` is used
Fixes GH-117 (https://github.com/spring-projects/spring-kafka/issues/117)
- merge `handleManualAcks`
- simplify offset handling by removing the `manualOffsets` map
- ensure that all acks are flushed on `stop()`
- Upgrade to Gradle 2.14
Having the async consumption nature we have to `spy` the `Consumer` before the real consumption.
Otherwise we can end up with the fact that we pass all desired phases in one thread, but another hasn't been spied yet to be verified in the end properly.
https://build.spring.io/browse/SK-SON-97/
Fixes GH-89 (https://github.com/spring-projects/spring-kafka/issues/89)
* Introduce `TopicPartitionInitialOffset`, where it utilizes `TopicPartition` and `Long initialOffset`
* The `initialOffset` can be:
- `null` - do nothing;
- positive (including `0`) - absolute offset
- negative - the offset relative to the current last offset of the partition: `consumer.seekToEnd() + initialOffset`
* Rework everything around to rely on a new `TopicPartitionInitialOffset` abstraction
* The logic in the `KafkaMessageListenerContainer.ListenerConsumer.initPartitionsIfNeeded()` reworked to in favor of a new abstraction
* remove redundant `recentOffset`
* Reflect new `TopicPartitionInitialOffset` in the docs
Add `@PartitionOffset` support for the `@TopicPartition`
Polishing
- Fixes#110, committing the initial state of a consumer
Polishing according PR comments
Don't Interrupt invoker unless he doesn't stop
Logging and Clear Unsent
Resolves#105
Extract the retry template into adapters.
Polishing
Redo `ConcurrentMessageListenerContainerTests` to Java 8 style.
Change `ArrayList` for thread names to the `ConcurrentSkipListSet` which is synchronized
Fixes#87 (https://github.com/spring-projects/spring-kafka/issues/87)
- allow for acking on both success and error (make the latter conditional upon `ackOnError`)
- fixes an issue where `processCommits()` wasn't handled on invoker stop, allowing for missed commits
Resolves#99
Also change `ContainerProperties` to use accessors
Also fix `stop(Runnable callback` logic for the container registry and
concurrent container.
* Polishing according PR comments and some typos fixes
Resolves#84
Slow consumers can cause a rebalance when not required.
Solution: 2 threads per consumer
- beanName-kafka-consumer-n
- beanName-kafka-listener-n
The first constantly polls Kafka; for each retrieved message then hands off
the message to the second thread (via a `ListenableFuture`).
If the future completes ok (`get()`, all is well and we move on to the next
message, if not, the consumer is paused and we continue to poll (heartbeat).
When the callback completes successfully, the consumer is awoken from the poll
and the next previously retrieved message is processed. When all such messages
are processed, the consumer is resumed.
If manual acks are in play, the ack is passed from the listener thread back to
the consumer thread via a `BlockingQueue` so the actual commit is done on the
right thread.
If the delayed execution fails, normally, the error is reported to the error
handler, the message is ack'd and we move on to the next as with a successful
completion.
However, to satisfy the use case reported by issue 84, if the exception is
assignable to the configured `pausedException` class, the consumer is
paused as with a timeout. In this case, after the next poll, the same
record is attempted to be delivered. Deliveries continue until successful
or some other exception is thrown.
Properties:
- `pauseWhenSlow` - enable pause/resume (default `true`)
- `pauseAfter` - how long to wait for the listener thread to return
- `pauseException` - an exception that will cause a pause
Polishing
Polishing - Use a BlockingQueue
In order to maintain a pipeline of work for the listener invoker,
hand ConsumerRecords from each poll to the listener via a blocking
queue.
Now, pause the consumer only if the blocking queue becomes full.
The queue size is configurable (default 1) so we pause when 2
ConsumerRecords have been retrieved.
Remove the `pauseForException` code - instead use a `RetryTemplate` in
the listener invoker configured to retry for an exception of choice.
Since retry generally uses a backoff - this will put backpressure on the
queue and the consumer will be paused.
There's a slight semantic change to `AckMode.BATCH` in that it now means
acks for currently processed records will be committed after each poll;
it may not be the complete batch.
Suspend if not pauseEnabled
Enhacements to GH-84
- guarantee that the container is effectively stopped when stop() returns;
- On topic and topic pattern subscriptions, clear pending messages and
flush acknowledgments;
- On topic and topic pattern subscriptions, use `partitionsAssigned` as trigger
for starting the invoker
Additional enhancements to GH-84
- Run autocommit containers on a single thread
Additional enhancements to GH-84
- only start the invoker if the container is running and we have partitions to listen
Additional changes to GH-84
Use `cancel` to stop the listener and interrupt the poll
Allow for asynchronous stop in `stop(Runnable)`
Simplify classes used by the tests
Reinstate isPauseEnabled Use
Interrupt thread explicitly and wait for a clean stop on the invoker
Create ContainerProperties
Move ContainerProperties to Ctor Argument
Add topic config to the properties.
Since the annotated listener is created by a factory, we have
to "clone" the properties.
Doc Polishing for ContainerProperties
Move ContainerProperties to a Top Level Class
Simple polishing
When a KafkaMessageListenerContainer is created, the error handler method is not called. There is no way to set the error handler after it is created
Fix codestyle
Update the testListenerException test case to test the errorHandler method
Add my name as requested.
Remove accent on my name
* Remove `Lombok` dependency
* Allow to write tests with Java 8
* Remove redundant `JsonDatabindFactory` in favor of `ObjectMapper` injection into the `JsonSerializer`/`JsonDeserializer`
* Remove `JsonWrapperException` in favor of existing `SerializationException` in Apache Kafka
* Fix `StringJsonMessageConverter` to let inject `ObjectMapper`
* Fix inline `ObjectMapper` for the
```
this.objectMapper.configure(MapperFeature.DEFAULT_VIEW_INCLUSION, false);
this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
```
to align with all other Spring projects
* Fix `JsonSerializationTests` for the Spring Code style
* Document Serialization and Conversion components
- implementation is based on "Serializer" and "Deserializer" concepts from 'kafka-clients' module
- updated Jackson dependency to 2.6.5 (same is used in Spring Boot v.1.3.3.RELEASE)
- migrated the related unit tests
GH-79: Use the already implemented Spring utility class instead of native java approach
GH-79: Code formatting according to Spring Conventions, checkstyle fixes, code polishing
Fixes#62Resolves#72
See the discussion on GH-62 `commitAsync()` is not currenly reliable.
Use `commitSync()` by default; add `syncCommits` property to the containers
(default true).
Also allow a user-injected commit callback (GH-72)
Fixes GH-66 (https://github.com/spring-projects/spring-kafka/issues/66)
Added fixes for code review comments by artembilan
Added minor documentation change.
Added injection of RebalanceListener to SimpleKafkaListenerContainerFactory
Polishing
The `ProducerListenerInvokingCallback` logic has been absorbed by the `KafkaTemplate`
inner `Callback` implementation to meet the `ListenableFuture` requirements
Resolves#42
Convert to MethodIntrospector
Polishing
* Use more simple `selectMethods` for `boolean` case
* Add `spring-tx` dependency
* add `@Transactional` test-case to be sure that our job around proxies is fine
Resolves#45
Avoid propagation of received headers when sending `o.s.messaging.Message`s.
Also add `Message<?>` methods to the template.
GH-45: Polishing
Remove Extra Spaces
Resolves#39Resolves#44
Remove `convert` from method names.
Add missing methods - no key.
Also remove log4j.properties from test jar.
Polishing according PR comments