diff --git a/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java
index 460a8cc3f..8ba39b08e 100644
--- a/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java
+++ b/src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java
@@ -32,6 +32,7 @@ import org.springframework.data.redis.connection.ReactiveRedisConnection.Command
import org.springframework.data.redis.connection.ReactiveRedisConnection.KeyCommand;
import org.springframework.data.redis.connection.ReactiveRedisConnection.NumericResponse;
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
+import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
import org.springframework.data.redis.connection.RedisStreamCommands.XPendingOptions;
import org.springframework.data.redis.connection.stream.ByteBufferRecord;
import org.springframework.data.redis.connection.stream.Consumer;
@@ -58,6 +59,7 @@ import org.springframework.util.StringUtils;
* @author Tugdual Grall
* @author Dengliming
* @author Mark John Moreno
+ * @author jinkshower
* @since 2.2
*/
public interface ReactiveStreamCommands {
@@ -394,11 +396,40 @@ public interface ReactiveStreamCommands {
return xAdd(Mono.just(AddStreamRecord.of(record))).next().map(CommandResponse::getOutput);
}
+ /**
+ * Add stream record with the specified options.
+ *
+ * @param record must not be {@literal null}.
+ * @param xAddOptions parameters for the {@literal XADD} call. Must not be {@literal null}.
+ * @return {@link Mono} the {@link RecordId id}.
+ * @see Redis Documentation: XADD
+ * @since 3.3
+ */
+ default Mono xAdd(ByteBufferRecord record, XAddOptions xAddOptions) {
+
+ Assert.notNull(record, "Record must not be null");
+ Assert.notNull(xAddOptions, "XAddOptions must not be null");
+
+ AddStreamRecord addStreamRecord = AddStreamRecord.of(record)
+ .approximateTrimming(xAddOptions.isApproximateTrimming())
+ .makeNoStream(xAddOptions.isNoMkStream());
+
+ if (xAddOptions.hasMaxlen()) {
+ addStreamRecord = addStreamRecord.maxlen(xAddOptions.getMaxlen());
+ }
+
+ if (xAddOptions.hasMinId()) {
+ addStreamRecord = addStreamRecord.minId(xAddOptions.getMinId());
+ }
+
+ return xAdd(Mono.just(addStreamRecord)).next().map(CommandResponse::getOutput);
+ }
+
/**
* Add stream record with given {@literal body} to {@literal key}.
*
* @param commands must not be {@literal null}.
- * @return {@link Flux} emitting the {@link RecordId} on by for for the given {@link AddStreamRecord} commands.
+ * @return {@link Flux} emitting the {@link RecordId} on by for the given {@link AddStreamRecord} commands.
* @see Redis Documentation: XADD
*/
Flux> xAdd(Publisher commands);
diff --git a/src/main/java/org/springframework/data/redis/core/DefaultReactiveStreamOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultReactiveStreamOperations.java
index 432cf7728..a3d7b9366 100644
--- a/src/main/java/org/springframework/data/redis/core/DefaultReactiveStreamOperations.java
+++ b/src/main/java/org/springframework/data/redis/core/DefaultReactiveStreamOperations.java
@@ -33,6 +33,7 @@ import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.Limit;
import org.springframework.data.redis.connection.ReactiveStreamCommands;
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
+import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
import org.springframework.data.redis.connection.convert.Converters;
import org.springframework.data.redis.connection.stream.ByteBufferRecord;
import org.springframework.data.redis.connection.stream.Consumer;
@@ -60,6 +61,7 @@ import org.springframework.util.ClassUtils;
* @author Christoph Strobl
* @author Marcin Zielinski
* @author John Blum
+ * @author jinkshower
* @since 2.2
*/
class DefaultReactiveStreamOperations implements ReactiveStreamOperations {
@@ -146,6 +148,18 @@ class DefaultReactiveStreamOperations implements ReactiveStreamOperat
return createMono(streamCommands -> streamCommands.xAdd(serializeRecord(input)));
}
+ @Override
+ public Mono add(Record record, XAddOptions xAddOptions) {
+
+ Assert.notNull(record.getStream(), "Key must not be null");
+ Assert.notNull(record.getValue(), "Body must not be null");
+ Assert.notNull(xAddOptions, "XAddOptions must not be null");
+
+ MapRecord input = StreamObjectMapper.toMapRecord(this, record);
+
+ return createMono(streamCommands -> streamCommands.xAdd(serializeRecord(input), xAddOptions));
+ }
+
@Override
public Flux> claim(K key, String consumerGroup, String newOwner, XClaimOptions xClaimOptions) {
diff --git a/src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java b/src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java
index 52eaa318a..a53b18d03 100644
--- a/src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java
+++ b/src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java
@@ -26,6 +26,7 @@ import org.springframework.core.convert.ConversionService;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.Limit;
import org.springframework.data.redis.connection.RedisConnection;
+import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
import org.springframework.data.redis.connection.stream.ByteRecord;
import org.springframework.data.redis.connection.stream.Consumer;
@@ -54,6 +55,7 @@ import org.springframework.util.ClassUtils;
* @author Christoph Strobl
* @author Marcin Zielinski
* @author John Blum
+ * @author jinkshower
* @since 2.2
*/
class DefaultStreamOperations extends AbstractOperations implements StreamOperations {
@@ -136,6 +138,21 @@ class DefaultStreamOperations extends AbstractOperations i
return execute(connection -> connection.xAdd(binaryRecord));
}
+ @Nullable
+ @Override
+ @SuppressWarnings("unchecked")
+ public RecordId add(Record record, XAddOptions options) {
+
+ Assert.notNull(record, "Record must not be null");
+ Assert.notNull(options, "XAddOptions must not be null");
+
+ MapRecord input = StreamObjectMapper.toMapRecord(this, record);
+
+ ByteRecord binaryRecord = input.serialize(keySerializer(), hashKeySerializer(), hashValueSerializer());
+
+ return execute(connection -> connection.streamCommands().xAdd(binaryRecord, options));
+ }
+
@Override
public List> claim(K key, String consumerGroup, String newOwner, XClaimOptions xClaimOptions) {
diff --git a/src/main/java/org/springframework/data/redis/core/ReactiveStreamOperations.java b/src/main/java/org/springframework/data/redis/core/ReactiveStreamOperations.java
index fc3ca2965..e4b905030 100644
--- a/src/main/java/org/springframework/data/redis/core/ReactiveStreamOperations.java
+++ b/src/main/java/org/springframework/data/redis/core/ReactiveStreamOperations.java
@@ -26,6 +26,7 @@ import org.reactivestreams.Publisher;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.Limit;
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
+import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
import org.springframework.data.redis.connection.stream.ByteBufferRecord;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
@@ -54,6 +55,7 @@ import org.springframework.util.Assert;
* @author Dengliming
* @author Marcin Zielinski
* @author John Blum
+ * @author jinkshower
* @since 2.2
*/
public interface ReactiveStreamOperations extends HashMapperProvider {
@@ -94,6 +96,63 @@ public interface ReactiveStreamOperations extends HashMapperProvider<
return acknowledge(record.getRequiredStream(), group, record.getId());
}
+ /**
+ * Append one or more records to the stream {@code key} with the specified options.
+ *
+ * @param key the stream key.
+ * @param bodyPublisher record body {@link Publisher}.
+ * @param xAddOptions parameters for the {@literal XADD} call.
+ * @return the record Ids.
+ * @see Redis Documentation: XADD
+ * @since 3.3
+ */
+ default Flux add (K key, Publisher extends Map extends HK, ? extends HV>> bodyPublisher,
+ XAddOptions xAddOptions) {
+ return Flux.from(bodyPublisher).flatMap(it -> add(key, it, xAddOptions));
+ }
+
+ /**
+ * Append a record to the stream {@code key} with the specified options.
+ *
+ * @param key the stream key.
+ * @param content record content as Map.
+ * @param xAddOptions parameters for the {@literal XADD} call.
+ * @return the {@link Mono} emitting the {@link RecordId}.
+ * @see Redis Documentation: XADD
+ * @since 3.3
+ */
+ default Mono add(K key, Map extends HK, ? extends HV> content, XAddOptions xAddOptions) {
+ return add(StreamRecords.newRecord().in(key).ofMap(content), xAddOptions);
+ }
+
+ /**
+ * Append a record, backed by a {@link Map} holding the field/value pairs, to the stream with the specified options.
+ *
+ * @param record the record to append.
+ * @param xAddOptions parameters for the {@literal XADD} call.
+ * @return the {@link Mono} emitting the {@link RecordId}.
+ * @see Redis Documentation: XADD
+ * @since 3.3
+ */
+ @SuppressWarnings("unchecked")
+ default Mono add(MapRecord record, XAddOptions xAddOptions) {
+ return add((Record) record, xAddOptions);
+ }
+
+ /**
+ * Append the record, backed by the given value, to the stream with the specified options.
+ * The value will be hashed and serialized.
+ *
+ * @param record must not be {@literal null}.
+ * @param xAddOptions parameters for the {@literal XADD} call. Must not be {@literal null}.
+ * @return the {@link Mono} emitting the {@link RecordId}.
+ * @see MapRecord
+ * @see ObjectRecord
+ * @see Redis Documentation: XADD
+ * @since 3.3
+ */
+ Mono add(Record record, XAddOptions xAddOptions);
+
/**
* Append one or more records to the stream {@code key}.
*
diff --git a/src/main/java/org/springframework/data/redis/core/StreamOperations.java b/src/main/java/org/springframework/data/redis/core/StreamOperations.java
index 4636c5bcf..c38a524ff 100644
--- a/src/main/java/org/springframework/data/redis/core/StreamOperations.java
+++ b/src/main/java/org/springframework/data/redis/core/StreamOperations.java
@@ -25,6 +25,7 @@ import java.util.Map;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.Limit;
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
+import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
import org.springframework.data.redis.connection.stream.ByteRecord;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
@@ -53,6 +54,7 @@ import org.springframework.util.Assert;
* @author Dengliming
* @author Marcin Zielinski
* @author John Blum
+ * @author jinkshower
* @since 2.2
*/
public interface StreamOperations extends HashMapperProvider {
@@ -95,6 +97,53 @@ public interface StreamOperations extends HashMapperProvider
return acknowledge(record.getRequiredStream(), group, record.getId());
}
+ /**
+ * Append a record to the stream {@code key} with the specified options.
+ *
+ * @param key the stream key.
+ * @param content record content as Map.
+ * @param xAddOptions additional parameters for the {@literal XADD} call.
+ * @return the record Id. {@literal null} when used in pipeline / transaction.
+ * @see Redis Documentation: XADD
+ * @since 3.3
+ */
+ @SuppressWarnings("unchecked")
+ @Nullable
+ default RecordId add(K key, Map extends HK, ? extends HV> content, XAddOptions xAddOptions) {
+ return add(StreamRecords.newRecord().in(key).ofMap(content), xAddOptions);
+ }
+
+ /**
+ * Append a record, backed by a {@link Map} holding the field/value pairs, to the stream with the specified options.
+ *
+ * @param record the record to append.
+ * @param xAddOptions additional parameters for the {@literal XADD} call.
+ * @return the record Id. {@literal null} when used in pipeline / transaction.
+ * @see Redis Documentation: XADD
+ * @since 3.3
+ */
+ @SuppressWarnings("unchecked")
+ @Nullable
+ default RecordId add(MapRecord record, XAddOptions xAddOptions) {
+ return add((Record) record, xAddOptions);
+ }
+
+ /**
+ * Append the record, backed by the given value, to the stream with the specified options.
+ * The value will be hashed and serialized.
+ *
+ * @param record must not be {@literal null}.
+ * @param xAddOptions parameters for the {@literal XADD} call. Must not be {@literal null}.
+ * @return the record Id. {@literal null} when used in pipeline / transaction.
+ * @see MapRecord
+ * @see ObjectRecord
+ * @see Redis Documentation: XADD
+ * @since 3.3
+ */
+ @SuppressWarnings("unchecked")
+ @Nullable
+ RecordId add(Record record, XAddOptions xAddOptions);
+
/**
* Append a record to the stream {@code key}.
*
diff --git a/src/test/java/org/springframework/data/redis/core/DefaultReactiveStreamOperationsIntegrationTests.java b/src/test/java/org/springframework/data/redis/core/DefaultReactiveStreamOperationsIntegrationTests.java
index c6040d795..27cf52f0f 100644
--- a/src/test/java/org/springframework/data/redis/core/DefaultReactiveStreamOperationsIntegrationTests.java
+++ b/src/test/java/org/springframework/data/redis/core/DefaultReactiveStreamOperationsIntegrationTests.java
@@ -35,6 +35,7 @@ import org.springframework.data.redis.PersonObjectFactory;
import org.springframework.data.redis.connection.Limit;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
+import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
@@ -61,6 +62,7 @@ import org.springframework.data.redis.test.extension.parametrized.ParameterizedR
* @author Mark Paluch
* @author Christoph Strobl
* @author Marcin Zielinski
+ * @author jinkshower
*/
@MethodSource("testParams")
@SuppressWarnings("unchecked")
@@ -192,6 +194,170 @@ public class DefaultReactiveStreamOperationsIntegrationTests {
.verifyComplete();
}
+ @ParameterizedRedisTest // GH-2915
+ void addMaxLenShouldLimitMessagesSize() {
+
+ K key = keyFactory.instance();
+ HK hashKey = hashKeyFactory.instance();
+ HV value = valueFactory.instance();
+
+ streamOperations.add(key, Collections.singletonMap(hashKey, value)).block();
+
+ HV newValue = valueFactory.instance();
+ XAddOptions options = XAddOptions.maxlen(1).approximateTrimming(false);
+
+ RecordId messageId = streamOperations.add(key, Collections.singletonMap(hashKey, newValue), options).block();
+
+ streamOperations.range(key, Range.unbounded()).as(StepVerifier::create)
+ .consumeNextWith(actual -> {
+
+ assertThat(actual.getId()).isEqualTo(messageId);
+ assertThat(actual.getStream()).isEqualTo(key);
+ assertThat(actual).hasSize(1);
+
+ if (!(key instanceof byte[] || value instanceof byte[])) {
+ assertThat(actual.getValue()).containsEntry(hashKey, newValue);
+ }
+
+ })
+ .verifyComplete();
+ }
+
+ @ParameterizedRedisTest // GH-2915
+ void addMaxLenShouldLimitSimpleMessagesSize() {
+
+ assumeTrue(!(serializer instanceof Jackson2JsonRedisSerializer)
+ && !(serializer instanceof GenericJackson2JsonRedisSerializer)
+ && !(serializer instanceof JdkSerializationRedisSerializer) && !(serializer instanceof OxmSerializer));
+
+ K key = keyFactory.instance();
+ HV value = valueFactory.instance();
+
+ streamOperations.add(StreamRecords.objectBacked(value).withStreamKey(key)).block();
+
+ HV newValue = valueFactory.instance();
+ XAddOptions options = XAddOptions.maxlen(1).approximateTrimming(false);
+
+ RecordId messageId = streamOperations.add(StreamRecords.objectBacked(newValue).withStreamKey(key), options).block();
+
+ streamOperations.range((Class) value.getClass(), key, Range.unbounded()).as(StepVerifier::create)
+ .consumeNextWith(actual -> {
+
+ assertThat(actual.getId()).isEqualTo(messageId);
+ assertThat(actual.getStream()).isEqualTo(key);
+ assertThat(actual.getValue()).isEqualTo(newValue);
+
+ })
+ .expectNextCount(0)
+ .verifyComplete();
+ }
+
+ @ParameterizedRedisTest // GH-2915
+ void addMaxLenShouldLimitSimpleMessageWithRawSerializerSize() {
+
+ assumeTrue(!(serializer instanceof Jackson2JsonRedisSerializer)
+ && !(serializer instanceof GenericJackson2JsonRedisSerializer));
+
+ SerializationPair keySerializer = redisTemplate.getSerializationContext().getKeySerializationPair();
+
+ RedisSerializationContext serializationContext = RedisSerializationContext
+ . newSerializationContext(StringRedisSerializer.UTF_8).key(keySerializer)
+ .hashValue(SerializationPair.raw()).hashKey(SerializationPair.raw()).build();
+
+ ReactiveRedisTemplate raw = new ReactiveRedisTemplate<>(redisTemplate.getConnectionFactory(),
+ serializationContext);
+
+ K key = keyFactory.instance();
+ Person value = new PersonObjectFactory().instance();
+
+ raw.opsForStream().add(StreamRecords.objectBacked(value).withStreamKey(key)).block();
+
+ Person newValue = new PersonObjectFactory().instance();
+ XAddOptions options = XAddOptions.maxlen(1).approximateTrimming(false);
+
+ RecordId messageId = raw.opsForStream().add(StreamRecords.objectBacked(newValue).withStreamKey(key), options).block();
+
+ raw.opsForStream().range((Class) value.getClass(), key, Range.unbounded()).as(StepVerifier::create)
+ .consumeNextWith(it -> {
+
+ assertThat(it.getId()).isEqualTo(messageId);
+ assertThat(it.getStream()).isEqualTo(key);
+ assertThat(it.getValue()).isEqualTo(newValue);
+
+ })
+ .expectNextCount(0)
+ .verifyComplete();
+ }
+
+ @ParameterizedRedisTest // GH-2915
+ void addMinIdShouldEvictLowerIdMessages() {
+
+ K key = keyFactory.instance();
+ HK hashKey = hashKeyFactory.instance();
+ HV value = valueFactory.instance();
+
+ streamOperations.add(key, Collections.singletonMap(hashKey, value)).block();
+ RecordId messageId1 = streamOperations.add(key, Collections.singletonMap(hashKey, value)).block();
+
+ XAddOptions options = XAddOptions.none().minId(messageId1);
+
+ RecordId messageId2 = streamOperations.add(key, Collections.singletonMap(hashKey, value), options).block();
+
+ streamOperations.range(key, Range.unbounded()).as(StepVerifier::create)
+ .consumeNextWith(actual -> {
+ assertThat(actual.getId()).isEqualTo(messageId1);
+ assertThat(actual.getStream()).isEqualTo(key);
+ })
+ .consumeNextWith(actual -> {
+ assertThat(actual.getId()).isEqualTo(messageId2);
+ assertThat(actual.getStream()).isEqualTo(key);
+ })
+ .expectNextCount(0)
+ .verifyComplete();
+ }
+
+ @ParameterizedRedisTest // GH-2915
+ void addMakeNoStreamShouldNotCreateStreamWhenNoStreamExists() {
+
+ K key = keyFactory.instance();
+ HK hashKey = hashKeyFactory.instance();
+ HV value = valueFactory.instance();
+
+ XAddOptions options = XAddOptions.makeNoStream();
+
+ streamOperations.add(key, Collections.singletonMap(hashKey, value), options).block();
+
+ streamOperations.size(key).as(StepVerifier::create)
+ .expectNext(0L)
+ .verifyComplete();
+
+ streamOperations.range(key, Range.unbounded()).as(StepVerifier::create)
+ .expectNextCount(0L)
+ .verifyComplete();
+ }
+
+ @ParameterizedRedisTest // GH-2915
+ void addMakeNoStreamShouldCreateStreamWhenStreamExists() {
+
+ K key = keyFactory.instance();
+ HK hashKey = hashKeyFactory.instance();
+ HV value = valueFactory.instance();
+
+ streamOperations.add(key, Collections.singletonMap(hashKey, value)).block();
+
+ XAddOptions options = XAddOptions.makeNoStream();
+
+ streamOperations.add(key, Collections.singletonMap(hashKey, value), options).block();
+
+ streamOperations.size(key).as(StepVerifier::create)
+ .expectNext(2L)
+ .verifyComplete();
+
+ streamOperations.range(key, Range.unbounded()).as(StepVerifier::create)
+ .expectNextCount(2L)
+ .verifyComplete();
+ }
+
@ParameterizedRedisTest // DATAREDIS-864
void rangeShouldReportMessages() {
diff --git a/src/test/java/org/springframework/data/redis/core/DefaultStreamOperationsIntegrationTests.java b/src/test/java/org/springframework/data/redis/core/DefaultStreamOperationsIntegrationTests.java
index eb2cb3307..06ab8c598 100644
--- a/src/test/java/org/springframework/data/redis/core/DefaultStreamOperationsIntegrationTests.java
+++ b/src/test/java/org/springframework/data/redis/core/DefaultStreamOperationsIntegrationTests.java
@@ -32,6 +32,7 @@ import org.springframework.data.redis.ObjectFactory;
import org.springframework.data.redis.Person;
import org.springframework.data.redis.connection.Limit;
import org.springframework.data.redis.connection.RedisConnectionFactory;
+import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
import org.springframework.data.redis.connection.jedis.extension.JedisConnectionFactoryExtension;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.lettuce.extension.LettuceConnectionFactoryExtension;
@@ -51,6 +52,7 @@ import org.springframework.data.redis.test.extension.parametrized.ParameterizedR
* @author Mark Paluch
* @author Christoph Strobl
* @author Marcin Zielinski
+ * @author jinkshower
*/
@MethodSource("testParams")
@EnabledOnCommand("XADD")
@@ -149,6 +151,155 @@ public class DefaultStreamOperationsIntegrationTests {
assertThat(message.getValue()).isEqualTo(value);
}
+ @ParameterizedRedisTest // GH-2915
+ void addMaxLenShouldLimitMessagesSize() {
+
+ K key = keyFactory.instance();
+ HK hashKey = hashKeyFactory.instance();
+ HV value = hashValueFactory.instance();
+
+ streamOps.add(key, Collections.singletonMap(hashKey, value));
+
+ HV newValue = hashValueFactory.instance();
+
+ XAddOptions options = XAddOptions.maxlen(1).approximateTrimming(false);
+
+ RecordId messageId = streamOps.add(key, Collections.singletonMap(hashKey, newValue), options);
+
+ List> messages = streamOps.range(key, Range.unbounded());
+
+ assertThat(messages).hasSize(1);
+
+ MapRecord message = messages.get(0);
+
+ assertThat(message.getId()).isEqualTo(messageId);
+ assertThat(message.getStream()).isEqualTo(key);
+
+ if (!(key instanceof byte[] || value instanceof byte[])) {
+ assertThat(message.getValue()).containsEntry(hashKey, newValue);
+ }
+ }
+
+ @ParameterizedRedisTest // GH-2915
+ void addMaxLenShouldLimitSimpleMessagesSize() {
+
+ K key = keyFactory.instance();
+ HV value = hashValueFactory.instance();
+
+ streamOps.add(StreamRecords.objectBacked(value).withStreamKey(key));
+
+ HV newValue = hashValueFactory.instance();
+
+ XAddOptions options = XAddOptions.maxlen(1).approximateTrimming(false);
+
+ RecordId messageId = streamOps.add(StreamRecords.objectBacked(newValue).withStreamKey(key), options);
+
+ List> messages = streamOps.range((Class) value.getClass(), key, Range.unbounded());
+
+ assertThat(messages).hasSize(1);
+
+ ObjectRecord message = messages.get(0);
+
+ assertThat(message.getId()).isEqualTo(messageId);
+ assertThat(message.getStream()).isEqualTo(key);
+
+ assertThat(message.getValue()).isEqualTo(newValue);
+ }
+
+ @ParameterizedRedisTest // GH-2915
+ void addMinIdShouldEvictLowerIdMessages() {
+
+ K key = keyFactory.instance();
+ HK hashKey = hashKeyFactory.instance();
+ HV value = hashValueFactory.instance();
+
+ streamOps.add(key, Collections.singletonMap(hashKey, value));
+ RecordId messageId1 = streamOps.add(key, Collections.singletonMap(hashKey, value));
+
+ XAddOptions options = XAddOptions.none().minId(messageId1);
+
+ RecordId messageId2 = streamOps.add(key, Collections.singletonMap(hashKey, value), options);
+
+ List> messages = streamOps.range(key, Range.unbounded());
+
+ assertThat(messages).hasSize(2);
+
+ MapRecord message1 = messages.get(0);
+
+ assertThat(message1.getId()).isEqualTo(messageId1);
+ assertThat(message1.getStream()).isEqualTo(key);
+
+ MapRecord message2 = messages.get(1);
+
+ assertThat(message2.getId()).isEqualTo(messageId2);
+ assertThat(message2.getStream()).isEqualTo(key);
+
+ if (!(key instanceof byte[] || value instanceof byte[])) {
+ assertThat(message1.getValue()).containsEntry(hashKey, value);
+ assertThat(message2.getValue()).containsEntry(hashKey, value);
+ }
+ }
+
+ @ParameterizedRedisTest // GH-2915
+ void addMinIdShouldEvictLowerIdSimpleMessages() {
+
+ K key = keyFactory.instance();
+ HV value = hashValueFactory.instance();
+
+ streamOps.add(StreamRecords.objectBacked(value).withStreamKey(key));
+ RecordId messageId1 = streamOps.add(StreamRecords.objectBacked(value).withStreamKey(key));
+
+ XAddOptions options = XAddOptions.none().minId(messageId1);
+
+ RecordId messageId2 = streamOps.add(StreamRecords.objectBacked(value).withStreamKey(key), options);
+
+ List> messages = streamOps.range((Class) value.getClass(), key, Range.unbounded());
+
+ assertThat(messages).hasSize(2);
+
+ ObjectRecord message1 = messages.get(0);
+
+ assertThat(message1.getId()).isEqualTo(messageId1);
+ assertThat(message1.getStream()).isEqualTo(key);
+ assertThat(message1.getValue()).isEqualTo(value);
+
+ ObjectRecord message2 = messages.get(1);
+
+ assertThat(message2.getId()).isEqualTo(messageId2);
+ assertThat(message2.getStream()).isEqualTo(key);
+ assertThat(message2.getValue()).isEqualTo(value);
+ }
+
+ @ParameterizedRedisTest // GH-2915
+ void addMakeNoStreamShouldNotCreateStreamWhenNoStreamExists() {
+
+ K key = keyFactory.instance();
+ HV value = hashValueFactory.instance();
+
+ XAddOptions options = XAddOptions.makeNoStream();
+
+ streamOps.add(StreamRecords.objectBacked(value).withStreamKey(key), options);
+
+ assertThat(streamOps.size(key)).isZero();
+ assertThat(streamOps.range(key, Range.unbounded())).isEmpty();
+ }
+
+ @ParameterizedRedisTest // GH-2915
+ void addMakeNoStreamShouldCreateStreamWhenStreamExists() {
+
+ K key = keyFactory.instance();
+ HV value = hashValueFactory.instance();
+
+ streamOps.add(StreamRecords.objectBacked(value).withStreamKey(key));
+
+ XAddOptions options = XAddOptions.makeNoStream();
+
+ streamOps.add(StreamRecords.objectBacked(value).withStreamKey(key), options);
+
+ assertThat(streamOps.size(key)).isEqualTo(2);
+ assertThat(streamOps.range(key, Range.unbounded())).hasSize(2);
+ }
+
@ParameterizedRedisTest // DATAREDIS-864
void simpleMessageReadWriteSymmetry() {