Polishing.

StreamInfo subtypes now implement Streamable. Switch from stream mapping to for/each style to avoid Java Stream creation.

Introduce getRequired(…) methods to improve nullability experience.

Original Pull Request: #2276
This commit is contained in:
Mark Paluch
2022-02-23 14:22:49 +01:00
committed by Christoph Strobl
parent 5c4c9e029f
commit f6eaa40019
12 changed files with 121 additions and 68 deletions

View File

@@ -26,7 +26,6 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import org.springframework.core.convert.converter.Converter;
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
@@ -38,6 +37,7 @@ import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamReadOptions;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.util.ByteUtils;
import org.springframework.lang.Nullable;
import org.springframework.util.NumberUtils;
/**
@@ -69,7 +69,7 @@ class StreamConverters {
return new org.springframework.data.redis.connection.stream.PendingMessage(id, consumer,
Duration.ofMillis(it.getMsSinceLastDelivery()), it.getRedeliveryCount());
}).collect(Collectors.toList());
}).toList();
return new org.springframework.data.redis.connection.stream.PendingMessages(groupName, messages);
@@ -111,18 +111,10 @@ class StreamConverters {
return (it) -> StreamRecords.newRecord().in(it.getStream()).withId(it.getId()).ofBytes(it.getBody());
}
static Converter<List<StreamMessage<byte[], byte[]>>, List<ByteRecord>> byteRecordListConverter() {
return new ListConverter<>(byteRecordConverter());
}
static Converter<StreamMessage<byte[], byte[]>, RecordId> messageToIdConverter() {
return (it) -> RecordId.of(it.getId());
}
static Converter<List<StreamMessage<byte[], byte[]>>, List<RecordId>> messagesToIds() {
return MESSAGEs_TO_IDs;
}
/**
* Convert the raw Lettuce xpending result to {@link PendingMessages}.
*
@@ -157,7 +149,7 @@ class StreamConverters {
* @param value dont't get me started om this.
* @return preconverted values that Lettuce parsers are able to understand \ö/.
*/
private static Object preConvertNativeValues(Object value) {
private static Object preConvertNativeValues(@Nullable Object value) {
if (value instanceof ByteBuffer || value instanceof byte[]) {

View File

@@ -17,8 +17,8 @@ package org.springframework.data.redis.connection.stream;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;
import org.springframework.data.redis.hash.HashMapper;
import org.springframework.data.redis.serializer.RedisSerializer;
@@ -65,16 +65,17 @@ public interface ByteBufferRecord extends MapRecord<ByteBuffer, ByteBuffer, Byte
* @param valueSerializer can be {@literal null} if the values suite already the target format.
* @return new {@link MapRecord} holding the deserialized values.
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
default <K, HK, HV> MapRecord<K, HK, HV> deserialize(@Nullable RedisSerializer<? extends K> streamSerializer,
@Nullable RedisSerializer<? extends HK> fieldSerializer,
@Nullable RedisSerializer<? extends HV> valueSerializer) {
return mapEntries(it -> Collections.<HK, HV> singletonMap(
fieldSerializer != null ? fieldSerializer.deserialize(ByteUtils.getBytes(it.getKey())) : (HK) it.getKey(),
valueSerializer != null ? valueSerializer.deserialize(ByteUtils.getBytes(it.getValue())) : (HV) it.getValue())
.entrySet().iterator().next()).withStreamKey(
streamSerializer != null ? streamSerializer.deserialize(ByteUtils.getBytes(getStream())) : (K) getStream());
return mapEntries(it -> {
Map<HK, HV> map = Collections.singletonMap(StreamSerialization.deserialize(fieldSerializer, it.getKey()),
StreamSerialization.deserialize(valueSerializer, it.getValue()));
return map.entrySet().iterator().next();
}).withStreamKey(StreamSerialization.deserialize(streamSerializer, getRequiredStream()));
}
/**
@@ -84,7 +85,7 @@ public interface ByteBufferRecord extends MapRecord<ByteBuffer, ByteBuffer, Byte
* @return new instance of {@link ByteRecord}.
*/
static ByteBufferRecord of(MapRecord<ByteBuffer, ByteBuffer, ByteBuffer> source) {
return StreamRecords.newRecord().in(source.getStream()).withId(source.getId()).ofBuffer(source.getValue());
return StreamRecords.newRecord().in(source.getRequiredStream()).withId(source.getId()).ofBuffer(source.getValue());
}
/**
@@ -97,10 +98,13 @@ public interface ByteBufferRecord extends MapRecord<ByteBuffer, ByteBuffer, Byte
default <OV> ObjectRecord<ByteBuffer, OV> toObjectRecord(
HashMapper<? super OV, ? super ByteBuffer, ? super ByteBuffer> mapper) {
Map<byte[], byte[]> targetMap = getValue().entrySet().stream().collect(
Collectors.toMap(entry -> ByteUtils.getBytes(entry.getKey()), entry -> ByteUtils.getBytes(entry.getValue())));
Map<ByteBuffer, ByteBuffer> value = getValue();
Map<byte[], byte[]> targetMap = new LinkedHashMap<>(value.size());
value.forEach((k, v) -> targetMap.put(ByteUtils.getBytes(k), ByteUtils.getBytes(v)));
return Record.<ByteBuffer, OV> of((OV) (mapper).fromHash((Map) targetMap)).withId(getId())
.withStreamKey(getStream());
.withStreamKey(getRequiredStream());
}
}

View File

@@ -16,8 +16,8 @@
package org.springframework.data.redis.connection.stream;
import java.util.Collections;
import java.util.Map;
import org.springframework.data.redis.connection.RedisStreamCommands;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.lang.Nullable;
@@ -65,11 +65,13 @@ public interface ByteRecord extends MapRecord<byte[], byte[], byte[]> {
@Nullable RedisSerializer<? extends HK> fieldSerializer,
@Nullable RedisSerializer<? extends HV> valueSerializer) {
return mapEntries(it -> Collections
.<HK, HV> singletonMap(fieldSerializer != null ? fieldSerializer.deserialize(it.getKey()) : (HK) it.getKey(),
valueSerializer != null ? valueSerializer.deserialize(it.getValue()) : (HV) it.getValue())
.entrySet().iterator().next())
.withStreamKey(streamSerializer != null ? streamSerializer.deserialize(getStream()) : (K) getStream());
return mapEntries(it -> {
Map<HK, HV> map = Collections.singletonMap(StreamSerialization.deserialize(fieldSerializer, it.getKey()),
StreamSerialization.deserialize(valueSerializer, it.getValue()));
return map.entrySet().iterator().next();
}).withStreamKey(StreamSerialization.deserialize(streamSerializer, getRequiredStream()));
}
/**
@@ -79,6 +81,6 @@ public interface ByteRecord extends MapRecord<byte[], byte[], byte[]> {
* @return new instance of {@link ByteRecord}.
*/
static ByteRecord of(MapRecord<byte[], byte[], byte[]> source) {
return StreamRecords.newRecord().in(source.getStream()).withId(source.getId()).ofBytes(source.getValue());
return StreamRecords.newRecord().in(source.getRequiredStream()).withId(source.getId()).ofBytes(source.getValue());
}
}

View File

@@ -78,7 +78,7 @@ public interface MapRecord<S, K, V> extends Record<S, Map<K, V>>, Iterable<Map.E
mapped.put(mappedPair.getKey(), mappedPair.getValue());
});
return StreamRecords.newRecord().in(getStream()).withId(getId()).ofMap(mapped);
return StreamRecords.newRecord().in(getRequiredStream()).withId(getId()).ofMap(mapped);
}
/**
@@ -121,7 +121,7 @@ public interface MapRecord<S, K, V> extends Record<S, Map<K, V>>, Iterable<Map.E
StreamSerialization.serialize(valueSerializer, it.getValue())).entrySet().iterator().next());
return StreamRecords.newRecord() //
.in(streamSerializer != null ? streamSerializer.serialize(getStream()) : (byte[]) getStream()) //
.in(StreamSerialization.serialize(streamSerializer, getRequiredStream())) //
.withId(getId()) //
.ofBytes(binaryMap.getValue());
}
@@ -136,6 +136,6 @@ public interface MapRecord<S, K, V> extends Record<S, Map<K, V>>, Iterable<Map.E
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
default <OV> ObjectRecord<S, OV> toObjectRecord(HashMapper<? super OV, ? super K, ? super V> mapper) {
return Record.<S, OV> of((OV) mapper.fromHash((Map) getValue())).withId(getId()).withStreamKey(getStream());
return Record.<S, OV> of((OV) mapper.fromHash((Map) getValue())).withId(getId()).withStreamKey(getRequiredStream());
}
}

View File

@@ -39,6 +39,24 @@ public interface Record<S, V> {
@Nullable
S getStream();
/**
* The id of the stream (aka the {@literal key} in Redis).
*
* @return can be {@literal null}.
* @throws IllegalStateException if the stream is {@literal null}.
* @since 3.0
*/
default S getRequiredStream() {
S stream = getStream();
if (stream == null) {
throw new IllegalStateException("Stream is not available");
}
return stream;
}
/**
* The id of the entry inside the stream.
*

View File

@@ -27,6 +27,7 @@ import java.util.function.Function;
import java.util.stream.Stream;
import org.springframework.data.redis.connection.convert.Converters;
import org.springframework.data.util.Streamable;
import org.springframework.lang.Nullable;
/**
@@ -67,6 +68,16 @@ public class StreamInfo {
return value == null ? null : type.cast(value);
}
<T> T getRequired(String entry, Class<T> type) {
T value = get(entry, type);
if (value == null) {
throw new IllegalStateException("Value for entry '%s' is null.".formatted(entry));
}
return value;
}
@Nullable
<I, T> T getAndMap(String entry, Class<I> type, Function<I, T> f) {
@@ -124,8 +135,8 @@ public class StreamInfo {
*
* @return
*/
public Long streamLength() {
return get("length", Long.class);
public long streamLength() {
return getRequired("length", Long.class);
}
/**
@@ -133,8 +144,8 @@ public class StreamInfo {
*
* @return
*/
public Long radixTreeKeySize() {
return get("radix-tree-keys", Long.class);
public long radixTreeKeySize() {
return getRequired("radix-tree-keys", Long.class);
}
/**
@@ -142,8 +153,8 @@ public class StreamInfo {
*
* @return
*/
public Long radixTreeNodesSize() {
return get("radix-tree-nodes", Long.class);
public long radixTreeNodesSize() {
return getRequired("radix-tree-nodes", Long.class);
}
/**
@@ -151,8 +162,8 @@ public class StreamInfo {
*
* @return
*/
public Long groupCount() {
return get("groups", Long.class);
public long groupCount() {
return getRequired("groups", Long.class);
}
/**
@@ -162,7 +173,7 @@ public class StreamInfo {
* @return
*/
public String lastGeneratedId() {
return get("last-generated-id", String.class);
return getRequired("last-generated-id", String.class);
}
/**
@@ -209,7 +220,7 @@ public class StreamInfo {
*
* @author Christoph Strobl
*/
public static class XInfoGroups {
public static class XInfoGroups implements Streamable<XInfoGroup> {
private final List<XInfoGroup> groupInfoList;
@@ -253,6 +264,7 @@ public class StreamInfo {
/**
* @return {@literal true} if no groups associated.
*/
@Override
public boolean isEmpty() {
return groupInfoList.isEmpty();
}
@@ -262,6 +274,7 @@ public class StreamInfo {
*
* @return
*/
@Override
public Iterator<XInfoGroup> iterator() {
return groupInfoList.iterator();
}
@@ -281,6 +294,7 @@ public class StreamInfo {
*
* @return
*/
@Override
public Stream<XInfoGroup> stream() {
return groupInfoList.stream();
}
@@ -290,6 +304,7 @@ public class StreamInfo {
*
* @param action
*/
@Override
public void forEach(Consumer<? super XInfoGroup> action) {
groupInfoList.forEach(action);
}
@@ -317,7 +332,7 @@ public class StreamInfo {
* @return
*/
public String groupName() {
return get("name", String.class);
return getRequired("name", String.class);
}
/**
@@ -326,7 +341,7 @@ public class StreamInfo {
* @return
*/
public Long consumerCount() {
return get("consumers", Long.class);
return getRequired("consumers", Long.class);
}
/**
@@ -335,7 +350,7 @@ public class StreamInfo {
* @return
*/
public Long pendingCount() {
return get("pending", Long.class);
return getRequired("pending", Long.class);
}
/**
@@ -344,11 +359,11 @@ public class StreamInfo {
* @return
*/
public String lastDeliveredId() {
return get("last-delivered-id", String.class);
return getRequired("last-delivered-id", String.class);
}
}
public static class XInfoConsumers {
public static class XInfoConsumers implements Streamable<XInfoConsumer> {
private final List<XInfoConsumer> consumerInfoList;
@@ -386,6 +401,7 @@ public class StreamInfo {
/**
* @return {@literal true} if no groups associated.
*/
@Override
public boolean isEmpty() {
return consumerInfoList.isEmpty();
}
@@ -395,6 +411,7 @@ public class StreamInfo {
*
* @return
*/
@Override
public Iterator<XInfoConsumer> iterator() {
return consumerInfoList.iterator();
}
@@ -414,6 +431,7 @@ public class StreamInfo {
*
* @return
*/
@Override
public Stream<XInfoConsumer> stream() {
return consumerInfoList.stream();
}
@@ -423,6 +441,7 @@ public class StreamInfo {
*
* @param action
*/
@Override
public void forEach(Consumer<? super XInfoConsumer> action) {
consumerInfoList.forEach(action);
}
@@ -458,7 +477,7 @@ public class StreamInfo {
* @return
*/
public String consumerName() {
return get("name", String.class);
return getRequired("name", String.class);
}
/**
@@ -466,8 +485,8 @@ public class StreamInfo {
*
* @return
*/
public Long idleTimeMs() {
return get("idle", Long.class);
public long idleTimeMs() {
return getRequired("idle", Long.class);
}
/**
@@ -484,8 +503,8 @@ public class StreamInfo {
*
* @return
*/
public Long pendingCount() {
return get("pending", Long.class);
public long pendingCount() {
return getRequired("pending", Long.class);
}
}
}

View File

@@ -232,8 +232,8 @@ public class StreamRecords {
*/
static class MapBackedRecord<S, K, V> implements MapRecord<S, K, V> {
private @Nullable S stream;
private RecordId recordId;
private final @Nullable S stream;
private final RecordId recordId;
private final Map<K, V> kvMap;
MapBackedRecord(@Nullable S stream, RecordId recordId, Map<K, V> kvMap) {
@@ -249,7 +249,6 @@ public class StreamRecords {
return stream;
}
@Nullable
@Override
public RecordId getId() {
return recordId;
@@ -322,7 +321,7 @@ public class StreamRecords {
*/
static class ByteMapBackedRecord extends MapBackedRecord<byte[], byte[], byte[]> implements ByteRecord {
ByteMapBackedRecord(byte[] stream, RecordId recordId, Map<byte[], byte[]> map) {
ByteMapBackedRecord(@Nullable byte[] stream, RecordId recordId, Map<byte[], byte[]> map) {
super(stream, recordId, map);
}
@@ -343,7 +342,8 @@ public class StreamRecords {
static class ByteBufferMapBackedRecord extends MapBackedRecord<ByteBuffer, ByteBuffer, ByteBuffer>
implements ByteBufferRecord {
ByteBufferMapBackedRecord(ByteBuffer stream, RecordId recordId, Map<ByteBuffer, ByteBuffer> map) {
ByteBufferMapBackedRecord(@Nullable ByteBuffer stream, @Nullable RecordId recordId,
Map<ByteBuffer, ByteBuffer> map) {
super(stream, recordId, map);
}
@@ -363,7 +363,7 @@ public class StreamRecords {
*/
static class StringMapBackedRecord extends MapBackedRecord<String, String, String> implements StringRecord {
StringMapBackedRecord(String stream, RecordId recordId, Map<String, String> stringStringMap) {
StringMapBackedRecord(@Nullable String stream, @Nullable RecordId recordId, Map<String, String> stringStringMap) {
super(stream, recordId, stringStringMap);
}

View File

@@ -15,7 +15,10 @@
*/
package org.springframework.data.redis.connection.stream;
import java.nio.ByteBuffer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.util.ByteUtils;
import org.springframework.lang.Nullable;
/**
@@ -34,11 +37,27 @@ class StreamSerialization {
* @param value the value to serialize.
* @return the serialized (binary) representation of {@code value}.
*/
@SuppressWarnings("unchecked")
@SuppressWarnings({ "unchecked", "rawtypes" })
static byte[] serialize(@Nullable RedisSerializer<?> serializer, Object value) {
return canSerialize(serializer, value) ? ((RedisSerializer) serializer).serialize(value) : (byte[]) value;
}
/**
* Deserialize the {@code value using the optional {@link RedisSerializer}. If no conversion is possible, return
* {@code value}. @param serializer @param value @param <T> @return
*/
static <T> T deserialize(@Nullable RedisSerializer<? extends T> serializer, ByteBuffer value) {
return deserialize(serializer, ByteUtils.getBytes(value));
}
/**
* Deserialize the {@code value using the optional {@link RedisSerializer}. If no conversion is possible, return
* {@code value}. @param serializer @param value @param <T> @return
*/
static <T> T deserialize(@Nullable RedisSerializer<? extends T> serializer, byte[] value) {
return serializer != null ? serializer.deserialize(value) : (T) value;
}
/**
* Returns whether the given {@link RedisSerializer} is capable of serializing the {@code value} to {@literal byte[]}.
*
@@ -47,7 +66,7 @@ class StreamSerialization {
* @return {@literal true} if the given {@link RedisSerializer} is capable of serializing the {@code value} to
* {@literal byte[]}.
*/
private static boolean canSerialize(@Nullable RedisSerializer<?> serializer, Object value) {
private static boolean canSerialize(@Nullable RedisSerializer<?> serializer, @Nullable Object value) {
return serializer != null && (value == null || serializer.canSerialize(value.getClass()));
}
}

View File

@@ -54,6 +54,6 @@ public interface StringRecord extends MapRecord<String, String, String> {
* @return new instance of {@link StringRecord}.
*/
static StringRecord of(MapRecord<String, String, String> source) {
return StreamRecords.newRecord().in(source.getStream()).withId(source.getId()).ofStrings(source.getValue());
return StreamRecords.newRecord().in(source.getRequiredStream()).withId(source.getId()).ofStrings(source.getValue());
}
}

View File

@@ -77,7 +77,7 @@ public interface ReactiveStreamOperations<K, HK, HV> extends HashMapperProvider<
* @see <a href="https://redis.io/commands/xack">Redis Documentation: XACK</a>
*/
default Mono<Long> acknowledge(String group, Record<K, ?> record) {
return acknowledge(record.getStream(), group, record.getId());
return acknowledge(record.getRequiredStream(), group, record.getId());
}
/**

View File

@@ -106,12 +106,11 @@ class StreamObjectMapper {
@SuppressWarnings({ "unchecked", "rawtypes" })
static <K, V, HK, HV> MapRecord<K, HK, HV> toMapRecord(HashMapperProvider<HK, HV> provider, Record<K, V> source) {
if (source instanceof ObjectRecord) {
ObjectRecord entry = ((ObjectRecord) source);
if (source instanceof ObjectRecord entry) {
if (entry.getValue() instanceof Map) {
return StreamRecords.newRecord().in(source.getStream()).withId(source.getId()).ofMap((Map) entry.getValue());
return StreamRecords.newRecord().in(source.getRequiredStream()).withId(source.getId())
.ofMap((Map) entry.getValue());
}
return entry.toMapRecord(provider.getHashMapper(entry.getValue().getClass()));
@@ -122,7 +121,7 @@ class StreamObjectMapper {
}
return Record.of(((HashMapper) provider.getHashMapper(source.getClass())).toHash(source))
.withStreamKey(source.getStream());
.withStreamKey(source.getRequiredStream());
}
/**

View File

@@ -77,7 +77,7 @@ public interface StreamOperations<K, HK, HV> extends HashMapperProvider<HK, HV>
* @see <a href="https://redis.io/commands/xack">Redis Documentation: XACK</a>
*/
default Long acknowledge(String group, Record<K, ?> record) {
return acknowledge(record.getStream(), group, record.getId());
return acknowledge(record.getRequiredStream(), group, record.getId());
}
/**