Remove rawRecords artificial mode

Utilize an existing `consumer.useNativeDecoding` together with the
`listenerMode = batch` instead of artificial `rawRecords` mode.
Currently it requires `useNativeDecoding = true` anyway to get a
proper `List<Record>` payload downstream
This commit is contained in:
Artem Bilan
2018-08-03 17:14:08 -04:00
parent 534bf4f286
commit 4272945e2e
5 changed files with 22 additions and 55 deletions

View File

@@ -165,21 +165,23 @@ listenerMode::
The mode in which records are processed.
If `record`, each `Message` will contain `byte[]` from a single `Record.data`.
If `batch`, each `Message` will contain a `List<byte[]>` extracted from the consumed records.
If `rawRecords`, each `Message` will non-converted `List<Record>`.
When `useNativeDecoding = true` is used on the consumer together with the `listenerMode = batch`, there is no any out-of-the-box conversion happened and a result message contains a payload like `List<com.amazonaws.services.kinesis.model.Record>`.
It's up to target application to convert those records manually.
+
Default: `record`
checkpointMode::
The mode in which checkpoints are updated. If `record`, checkpoints occur after each record is processed (but this option
is only effective if `listenerMode` is set to `record`). If `batch`, checkpoints occur after each batch of records is
processed. If `manual`, checkpoints occur on demand via the `Checkpointer` callback.
The mode in which checkpoints are updated.
If `record`, checkpoints occur after each record is processed (but this option is only effective if `listenerMode` is set to `record`). If `batch`, checkpoints occur after each batch of records is processed.
If `manual`, checkpoints occur on demand via the `Checkpointer` callback.
+
Default: `batch`
recordsLimit::
The maximum number of records to poll per `GetRecords` request. Must not be greater than `10000`.
The maximum number of records to poll per `GetRecords` request.
Must not be greater than `10000`.
+
Default: `10000`
idleBetweenPolls::
The sleep interval used in the main loop between shards polling cycles, in milliseconds. Must not be less than `250`
The sleep interval used in the main loop between shards polling cycles, in milliseconds. Must not be less than `250`.
+
Default: `1000`
consumerBackoff::

View File

@@ -215,24 +215,14 @@ public class KinesisMessageChannelBinder extends
? kinesisShardOffset
: KinesisShardOffset.trimHorizon());
// Defer byte[] conversion to the InboundContentTypeConvertingInterceptor
adapter.setConverter(bytes -> bytes);
adapter.setListenerMode(kinesisConsumerProperties.getListenerMode());
switch (kinesisConsumerProperties.getListenerMode()) {
case record:
adapter.setListenerMode(ListenerMode.record);
break;
case batch:
adapter.setListenerMode(ListenerMode.batch);
break;
case rawRecords:
adapter.setListenerMode(ListenerMode.batch);
if (properties.isUseNativeDecoding()) {
adapter.setConverter(null);
break;
}
else {
// Defer byte[] conversion to the InboundContentTypeConvertingInterceptor
adapter.setConverter(bytes -> bytes);
}
adapter.setCheckpointMode(kinesisConsumerProperties.getCheckpointMode());

View File

@@ -17,6 +17,7 @@
package org.springframework.cloud.stream.binder.kinesis.properties;
import org.springframework.integration.aws.inbound.kinesis.CheckpointMode;
import org.springframework.integration.aws.inbound.kinesis.ListenerMode;
/**
*
@@ -29,7 +30,7 @@ public class KinesisConsumerProperties {
private int startTimeout = 60000;
private KinesisListenerMode listenerMode = KinesisListenerMode.record;
private ListenerMode listenerMode = ListenerMode.record;
private CheckpointMode checkpointMode = CheckpointMode.batch;
@@ -49,11 +50,11 @@ public class KinesisConsumerProperties {
this.startTimeout = startTimeout;
}
public KinesisListenerMode getListenerMode() {
public ListenerMode getListenerMode() {
return this.listenerMode;
}
public void setListenerMode(KinesisListenerMode listenerMode) {
public void setListenerMode(ListenerMode listenerMode) {
this.listenerMode = listenerMode;
}
@@ -97,27 +98,4 @@ public class KinesisConsumerProperties {
this.shardIteratorType = shardIteratorType;
}
/**
* @see org.springframework.integration.aws.inbound.kinesis.ListenerMode
*/
public enum KinesisListenerMode {
/**
* Each {@code Message} will be converted from a single {@code Record}.
*/
record,
/**
* Each {@code Message} will contain {@code List<byte[]>} from {@code Record} list if not
* empty.
*/
batch,
/**
* Each {@code Message} will contain {@code List<Record>} if not empty.
*/
rawRecords
}
}

View File

@@ -71,7 +71,7 @@ import static org.assertj.core.api.Assertions.assertThat;
"spring.cloud.stream.kinesis.bindings.input.consumer.idleBetweenPolls = 1",
"spring.cloud.stream.kinesis.binder.headers = foo",
"spring.cloud.stream.kinesis.binder.checkpoint.table = checkpointTable",
"spring.cloud.stream.kinesis.binder.locks.table = fakeTable" })
"spring.cloud.stream.kinesis.binder.locks.table = lockTable" })
@DirtiesContext
public class KinesisBinderProcessorTests {
@@ -145,11 +145,6 @@ public class KinesisBinderProcessorTests {
return localKinesisResource.getResource();
}
@Bean
public LockRegistry lockRegistry() {
return new DefaultLockRegistry();
}
@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public String transform(Message<?> message) {
String payload = new String((byte[]) message.getPayload());

View File

@@ -61,6 +61,7 @@ import org.springframework.context.ApplicationContext;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.aws.inbound.kinesis.KinesisShardOffset;
import org.springframework.integration.aws.inbound.kinesis.ListenerMode;
import org.springframework.integration.aws.support.AwsRequestFailureException;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.NullChannel;
@@ -443,7 +444,8 @@ public class KinesisBinderTests
}
ExtendedConsumerProperties<KinesisConsumerProperties> consumerProperties = createConsumerProperties();
consumerProperties.getExtension().setListenerMode(KinesisConsumerProperties.KinesisListenerMode.rawRecords);
consumerProperties.getExtension().setListenerMode(ListenerMode.batch);
consumerProperties.setUseNativeDecoding(true);
QueueChannel input = new QueueChannel();
Binding<MessageChannel> inputBinding = binder.bindConsumer("testBatchListener", null, input,