Fixes https://github.com/spring-projects/spring-integration-aws/issues/97
This fixes an issue where `additionalTimeToWait` has a negative value
that is subtracted later by the Amazon DynamoDB client to
`leaseDuration`, making it impossible to acquire the lock if expired.
Also allow to use the existing `refreshPeriod` properties in this case.
* Use default refresh period in tryLock()
The 'refreshPeriod' value might not satisfy both tryLock() and lock()
requirements and should probably be used exclusively in the second case.
We can make the value used for tryLock() configurable as well in another
pull request.
* Rollback previous commit and add author name
There is a race condition when real consumers are started later in
different thread.
* Add `eventually()` matcher to check the `Map<KinesisShardOffset, ?> shardConsumers`
for particular keys to appear eventually
Since we don't have choice and wait at minimum `leaseDuration` to be
able to iterate in the Amazon LockClient at least two times and that is
going to be just a contract of this client, so we need to add up a
`timeout` for the `tryLock` to that `leaseDuration`
https://stackoverflow.com/questions/51428196/spring-aws-kinesis-binder-acquiring-and-releasing-lock-issues-in-dynamo-db-while
According the logic in the `AmazonDynamoDBLockClient.acquireLock()` we
need to let the loop to iterate at least twice to really get access to
the existing lock and check its expiration status
* Fix the `DynamoDbLockRegistry.tryLock()` to set a proper
`.withRefreshPeriod(0L)` and `.withRefreshPeriod(0L)` to let it pull
the DB until we reach a `leaseDuration` limit on the lock or its
expiration status
Fixes https://github.com/spring-projects/spring-integration-aws/issues/90
* Add an internal `ShardConsumerManager` which is responsible to initiate
a locking for the shard key in the provided consumer group and populating
a `ShardConsumer` if `tryLock()` is successful or no `LockRegistry` at all
* Additional logic is added to always iterate over candidate shards if
`tryLock()` on the matter is not successful.
This way the current `KinesisMessageDrivenChannelAdapter` picks up
those shards which have been locked by the consumer which has just
left a cluster and unlocked distributed locks
* Now all the shards are considered as candidates independently of the
`streams` or `shardOffsets` configuration
* Improve Kinesis tests performance
To be able to run Kinesalite and Dynalite npm services locally in
parallel, we need to provide unique ports for them.
Therefore a default `4567` is left for the Kinesalite, meanwhile the
`DynamoDbLocalRunning` is expecting a 4568 now
Since the contract of the `tryLock()` to wait as close to the provided
timeout as possible, then we should not wait for the `leasePeriod` as
minimum.
Also we should sleep in between attempts not more then provided timeout
If `DynamoDbLockRegistry` and `DynamoDbMetaDataStore` are not declared
as beans (or their `afterPropertiesSet()` is not called), they hanging on
the `awaitForActive()`
When we try to lock the DynamoDB item we need to iterate at least
the lease time, but we still don't need to sleep too much - maximum
the time requested for `tryLock()`
Fixes https://github.com/spring-projects/spring-integration-aws/issues/90
* The `KinesisMessageDrivenChannelAdapter` can now be supplied with the
`LockRegistry` (e.g. `DynamoDbLockRegistry`) and when stream-based
configuration is used, the channel adapter performs `tryLock()` for the
shard in the channel adapter consumer group.
Therefor only one listener in the group is able to consume from the shard
Note: there is no yet full support for rebalance functionality.
And such a feature can be implemented using Spring Cloud Bus with the
command to stop and start channel adapters when a new
`KinesisMessageDrivenChannelAdapter` arrives to the cluster
* GH-66: Add DynamoDbLockRegistry implementation
Fixes https://github.com/spring-projects/spring-integration-aws/issues/66
* * Remove `Lifecycle` from `DynamoDbLockRegistry` in favor of a thread
execution in the `afterPropertiesSet()`
* Fix `lock()` interruptibility logic
* * Remove `mavenLocal()` since the upstream PR is merged
* decrease an amount of expectations in the Kinesis test
* * Upgrade to SC-AWS-2.0.0.RC2
* * Add Docs for the `DynamoDbLockRegistry`
Fixesspring-projects/spring-integration-aws#85
* For better component management add
`SqsMessageDrivenChannelAdapter.getQueue()` to return what queues this
channel adapter is subscribed
* Upgrade to Gradle 4.6 and Checkstyle 8.8
Fixesspring-projects/spring-integration-aws#84Fixesspring-cloud/spring-cloud-stream-binder-aws-kinesis#38Fixesspring-cloud/spring-cloud-stream-binder-aws-kinesis#36
Even if AWS Client has some reconnect and retry mechanism, it can be
exhausted and no connection error is rethrown to the `KinesisMessageDrivenChannelAdapter`
anyway.
On the other hand the error can be thrown from the record processor -
the flow on the `outputChannel`.
* log the exception around AWS Client calls and let background process
to restore/retry
* log the exception around message to send to let the processor to move
to the next record or perform the next task
* null the current `task` in the `ShardConsumer` in the `finally` block
to avoid hanging the thread without ability to moving to some other state
without end-user interaction
* When perform the `batch` checkpoint, check the result and if it is
negative, consider such a situation as processed and skip records from
sending downstream
* Upgrade dependencies
* Use Log4J2 for tests logging
Relates spring-cloud-stream-binder-aws-kinesis/#34
* Introduce `KinesisMessageHeaderErrorMessageStrategy` to populate
`AwsHeaders.RAW_RECORD` to the `ErrorMessage` headers
* Improve `KinesisMessageDrivenChannelAdapter` for the `AttributeAccessor`.
Also `try...finally` the `processRecords()` to perform important tasks
independently of the `processRecords()` result
Fixes: spring-projects/spring-integration-aws#62
* Make `SnsMessageHandler extends AbstractAwsMessageHandler`
* Remove SNS Outbound Gateway variant since `SnsMessageHandler` covers
that part via `successChannel` and `failureChannel`
* Fix XSD for SQS and SNS
* Fix SQS and SNS tests according their logic changes
* Fix README for new changes
Fixes: spring-cloud/spring-cloud-stream-binder-aws-kinesis#29
Previously the same header name has been used for sending and receiving
operations (e.g. Kinesis `stream`).
This causes collisions in streaming processes when we receive message
from the AWS and send it downstream to AWS.
The header presence has a precedence over configured property/expression.
Therefore we send message to the AWS (e.g. Kinesis) using wrong
destination or other correlation properties
* Add `AwsHeaders.RECEIVED_*` headers to avoid collisions
* Remove Jackson dependency since it is managed now properly by SC-AWS
* Upgrade to SI-5.0, SC-AWS-2.0, Gradle-4.4.1 and some Gradle plugins
* Add `Jackson` dependency for compatibility with SF
* Implement new API of super classes
* Fix deprecations
* Fix tests for new state of classes under test
* Rename XSD to version 2.0
* Always call provided `AsyncHandler` from the internal instance
in the `KinesisMessageHandler`
* Make the `KinesisMessageHandler.obtainAsyncHandler()` as generic method
* Rename `sendFailureChannel` property just to the `failureChannel`
since the real operation is `put` not send
* Add `AwsHeaders.SERVICE_RESULT` to represent the service result, e.g.
in case of `PutRecordsRequest` in the `KinesisMessageHandler` to
send on success the whole `PutRecordsResult`
* Fix README to reflect the current reality of the code
* Fix `KinesisMessageHandlerTests` for provided `AsyncHandler` verification
This is groundwork to allow usage of a failure channel within the Kinesis binder per.
This implementation is intended to be backward-compatible with respect to the current handling
of `AsyncHandler`. Client code can still provide an `AsyncHandler`,
but doing so precludes the usage of channels for successful or unsuccessful sends.
Renaming to AwsRequestFailureException
generic getasynchandler method
always delegate or build handler
Added readme docs and using channel for tests
Since the listener may take some time for records processing,
there is a possibility that checkpoint will be stored too late
after the process and thus we are able to get the same records
in different channel adapter for the same shard, even if they are
in the same consumer group and use shared `MetadataStore`
This solution is some compromise for the current state of things and
has to be reconsidered in the future in favor of proper rebalance and
shard leader election solution
As a workaround for the duplicate records an additional
`@IdempotentReceiver` approach can be used
* Upgrade to Gradle 4.2.1, Checkstyle 8.3, AssertJ 3.8.0
* Fix race condition in the `KinesisMessageDrivenChannelAdapterTests`
Fixesspring-projects/spring-integration-aws#72
* Document Kinesis Channel Adapters
* Fix some inconsistency in the `KinesisMessageHandler`
* Add integration test against `KinesisLocalRunning` `@Rule`
* Document testing against Kinesalite
Fixesspring-projects/spring-integration-aws#64
* Add `DynamoDbRunning` for testing against locally ran DynamoDB
* Upgrade to Gradle 4.0.1, SI-4.3.11
* Switch on some Checkstyle rules for tests
Resolvesspring-projects/spring-integration-aws#75
Since we can build S3 entity key any deep path,
e.g. `my_bucket/foo/bar/baz/file.name` and S3 Object `key` representation
is exactly the whole path without bucket name, we should treat only bucket
as a remote dir; the key should be as a file name
* Change `S3InboundFileSynchronizer` logic to extract bucket name before
performing `copyFileToLocalDirectory()`
* Change `S3StreamingMessageSource` to override the `S3FileInfo.remoteDirectory`
to the only bucket name after `poll()`.
* Use for both cases a new `S3Session.normalizeBucketName()` method
* Upgrade to Gradle 4.0 and some other `build.gradle` polishing