From c19acb0b45e01da82937f43b34dbeb80a4a9cd23 Mon Sep 17 00:00:00 2001 From: Christoph Strobl Date: Wed, 13 Dec 2023 11:33:00 +0100 Subject: [PATCH] Add tests to verify symmetric usage of scan cursor id. See: #2796 --- .../connection/jedis/JedisKeyCommands.java | 4 +- .../jedis/JedisConnectionUnitTests.java | 96 +++++++++- .../lettuce/LettuceConnectionUnitTests.java | 177 +++++++++++++++++- 3 files changed, 270 insertions(+), 7 deletions(-) diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisKeyCommands.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisKeyCommands.java index 138dbf268..258cf4768 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisKeyCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisKeyCommands.java @@ -165,9 +165,9 @@ class JedisKeyCommands implements RedisKeyCommands { } if (type != null) { - result = connection.getJedis().scan(Long.toUnsignedString(cursorId).getBytes(), params, type); + result = connection.getJedis().scan(JedisConverters.toBytes(Long.toUnsignedString(cursorId)), params, type); } else { - result = connection.getJedis().scan(Long.toUnsignedString(cursorId).getBytes(), params); + result = connection.getJedis().scan(JedisConverters.toBytes(Long.toUnsignedString(cursorId)), params); } return new ScanIteration<>(Long.parseUnsignedLong(result.getCursor()), result.getResult()); diff --git a/src/test/java/org/springframework/data/redis/connection/jedis/JedisConnectionUnitTests.java b/src/test/java/org/springframework/data/redis/connection/jedis/JedisConnectionUnitTests.java index 920a6e651..7172f60e7 100644 --- a/src/test/java/org/springframework/data/redis/connection/jedis/JedisConnectionUnitTests.java +++ b/src/test/java/org/springframework/data/redis/connection/jedis/JedisConnectionUnitTests.java @@ -27,19 +27,22 @@ import redis.clients.jedis.resps.ScanResult; import java.io.IOException; import java.util.Collections; +import java.util.List; +import java.util.Map; import java.util.Map.Entry; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; - +import org.mockito.ArgumentCaptor; import org.springframework.dao.InvalidDataAccessApiUsageException; import org.springframework.dao.InvalidDataAccessResourceUsageException; import org.springframework.data.redis.connection.AbstractConnectionUnitTestBase; import org.springframework.data.redis.connection.RedisServerCommands.ShutdownOption; import org.springframework.data.redis.connection.zset.Tuple; import org.springframework.data.redis.core.Cursor; +import org.springframework.data.redis.core.KeyScanOptions; import org.springframework.data.redis.core.ScanOptions; /** @@ -179,6 +182,23 @@ class JedisConnectionUnitTests { verify(jedisSpy, times(1)).disconnect(); } + @Test // GH-2796 + void scanShouldOperateUponUnsigned64BitCursorId() { + + String cursorId = "9286422431637962824"; + ArgumentCaptor captor = ArgumentCaptor.forClass(byte[].class); + doReturn(new ScanResult<>(cursorId, List.of("spring".getBytes()))).when(jedisSpy).scan(any(byte[].class), + any(ScanParams.class)); + + Cursor cursor = connection.scan(KeyScanOptions.NONE); + cursor.next(); // initial value + assertThat(cursor.getCursorId()).isEqualTo(Long.parseUnsignedLong(cursorId)); + + cursor.next(); // fetch next + verify(jedisSpy, times(2)).scan(captor.capture(), any(ScanParams.class)); + assertThat(captor.getAllValues()).map(String::new).containsExactly("0", cursorId); + } + @Test // DATAREDIS-531 public void sScanShouldKeepTheConnectionOpen() { @@ -202,6 +222,23 @@ class JedisConnectionUnitTests { verify(jedisSpy, times(1)).disconnect(); } + @Test // GH-2796 + void sScanShouldOperateUponUnsigned64BitCursorId() { + + String cursorId = "9286422431637962824"; + ArgumentCaptor captor = ArgumentCaptor.forClass(byte[].class); + doReturn(new ScanResult<>(cursorId, List.of("spring".getBytes()))).when(jedisSpy).sscan(any(byte[].class), + any(byte[].class), any(ScanParams.class)); + + Cursor cursor = connection.setCommands().sScan("spring".getBytes(), ScanOptions.NONE); + cursor.next(); // initial value + assertThat(cursor.getCursorId()).isEqualTo(Long.parseUnsignedLong(cursorId)); + + cursor.next(); // fetch next + verify(jedisSpy, times(2)).sscan(any(byte[].class), captor.capture(), any(ScanParams.class)); + assertThat(captor.getAllValues()).map(String::new).containsExactly("0", cursorId); + } + @Test // DATAREDIS-531 public void zScanShouldKeepTheConnectionOpen() { @@ -225,6 +262,23 @@ class JedisConnectionUnitTests { verify(jedisSpy, times(1)).disconnect(); } + @Test // GH-2796 + void zScanShouldOperateUponUnsigned64BitCursorId() { + + String cursorId = "9286422431637962824"; + ArgumentCaptor captor = ArgumentCaptor.forClass(byte[].class); + doReturn(new ScanResult<>(cursorId, List.of(new redis.clients.jedis.resps.Tuple("spring", 1D)))).when(jedisSpy).zscan(any(byte[].class), + any(byte[].class), any(ScanParams.class)); + + Cursor cursor = connection.zSetCommands().zScan("spring".getBytes(), ScanOptions.NONE); + cursor.next(); // initial value + assertThat(cursor.getCursorId()).isEqualTo(Long.parseUnsignedLong(cursorId)); + + cursor.next(); // fetch next + verify(jedisSpy, times(2)).zscan(any(byte[].class), captor.capture(), any(ScanParams.class)); + assertThat(captor.getAllValues()).map(String::new).containsExactly("0", cursorId); + } + @Test // DATAREDIS-531 public void hScanShouldKeepTheConnectionOpen() { @@ -248,6 +302,23 @@ class JedisConnectionUnitTests { verify(jedisSpy, times(1)).disconnect(); } + @Test // GH-2796 + void hScanShouldOperateUponUnsigned64BitCursorId() { + + String cursorId = "9286422431637962824"; + ArgumentCaptor captor = ArgumentCaptor.forClass(byte[].class); + doReturn(new ScanResult<>(cursorId, List.of(Map.entry("spring".getBytes(), "data".getBytes())))).when(jedisSpy).hscan(any(byte[].class), + any(byte[].class), any(ScanParams.class)); + + Cursor> cursor = connection.hashCommands().hScan("spring".getBytes(), ScanOptions.NONE); + cursor.next(); // initial value + assertThat(cursor.getCursorId()).isEqualTo(Long.parseUnsignedLong(cursorId)); + + cursor.next(); // fetch next + verify(jedisSpy, times(2)).hscan(any(byte[].class), captor.capture(), any(ScanParams.class)); + assertThat(captor.getAllValues()).map(String::new).containsExactly("0", cursorId); + } + @Test // DATAREDIS-714 void doesNotSelectDbWhenCurrentDbMatchesDesiredOne() { @@ -369,6 +440,29 @@ class JedisConnectionUnitTests { .isThrownBy(() -> super.hScanShouldCloseTheConnectionWhenCursorIsClosed()); } + @Test + @Disabled("scan not supported in pipeline") + void scanShouldOperateUponUnsigned64BitCursorId() { + + } + + @Test + @Disabled("scan not supported in pipeline") + void sScanShouldOperateUponUnsigned64BitCursorId() { + + } + + @Test + @Disabled("scan not supported in pipeline") + void zScanShouldOperateUponUnsigned64BitCursorId() { + + } + + @Test + @Disabled("scan not supported in pipeline") + void hScanShouldOperateUponUnsigned64BitCursorId() { + + } } } diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionUnitTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionUnitTests.java index 1f88f8211..6aba05092 100644 --- a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionUnitTests.java +++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionUnitTests.java @@ -18,15 +18,13 @@ package org.springframework.data.redis.connection.lettuce; import static org.assertj.core.api.Assertions.*; import static org.mockito.Mockito.*; -import io.lettuce.core.RedisClient; -import io.lettuce.core.RedisFuture; -import io.lettuce.core.XAddArgs; -import io.lettuce.core.XClaimArgs; +import io.lettuce.core.*; import io.lettuce.core.api.StatefulRedisConnection; import io.lettuce.core.api.async.RedisAsyncCommands; import io.lettuce.core.codec.ByteArrayCodec; import io.lettuce.core.codec.RedisCodec; import io.lettuce.core.codec.StringCodec; +import io.lettuce.core.output.ScanOutput; import io.lettuce.core.output.StatusOutput; import io.lettuce.core.protocol.AsyncCommand; import io.lettuce.core.protocol.Command; @@ -34,8 +32,12 @@ import io.lettuce.core.protocol.CommandArgs; import io.lettuce.core.protocol.CommandType; import java.lang.reflect.InvocationTargetException; +import java.nio.ByteBuffer; import java.time.Duration; import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; @@ -48,6 +50,9 @@ import org.springframework.data.redis.connection.RedisServerCommands.ShutdownOpt import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions; import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions; import org.springframework.data.redis.connection.stream.MapRecord; +import org.springframework.data.redis.connection.zset.Tuple; +import org.springframework.data.redis.core.Cursor; +import org.springframework.data.redis.core.KeyScanOptions; import org.springframework.test.util.ReflectionTestUtils; /** @@ -247,6 +252,146 @@ public class LettuceConnectionUnitTests { assertThat(args.getValue()).extracting("nomkstream").isEqualTo(true); } + @Test // GH-2796 + void scanShouldOperateUponUnsigned64BitCursorId() { + + String cursorId = "9286422431637962824"; + KeyScanCursor sc = new KeyScanCursor<>() { + @Override + public List getKeys() { + return List.of("spring".getBytes()); + } + }; + sc.setCursor(cursorId); + sc.setFinished(false); + + Command> command = new Command<>(new LettuceConnection.CustomCommandType("SCAN"), + new ScanOutput<>(ByteArrayCodec.INSTANCE, sc) { + @Override + protected void setOutput(ByteBuffer bytes) { + + } + }); + AsyncCommand> future = new AsyncCommand<>(command); + future.complete(); + + when(asyncCommandsMock.scan(any(ScanCursor.class),any(ScanArgs.class))).thenReturn(future, future); + + Cursor cursor = connection.scan(KeyScanOptions.NONE); + cursor.next(); //initial + assertThat(cursor.getCursorId()).isEqualTo(Long.parseUnsignedLong(cursorId)); + + cursor.next(); // fetch next + ArgumentCaptor captor = ArgumentCaptor.forClass(ScanCursor.class); + verify(asyncCommandsMock, times(2)).scan(captor.capture(), any(ScanArgs.class)); + assertThat(captor.getAllValues()).map(ScanCursor::getCursor).containsExactly("0", cursorId); + } + + @Test // GH-2796 + void sScanShouldOperateUponUnsigned64BitCursorId() { + + String cursorId = "9286422431637962824"; + ValueScanCursor sc = new ValueScanCursor<>() { + @Override + public List getValues() { + return List.of("spring".getBytes()); + } + }; + sc.setCursor(cursorId); + sc.setFinished(false); + + Command> command = new Command<>(new LettuceConnection.CustomCommandType("SSCAN"), + new ScanOutput<>(ByteArrayCodec.INSTANCE, sc) { + @Override + protected void setOutput(ByteBuffer bytes) { + + } + }); + AsyncCommand> future = new AsyncCommand<>(command); + future.complete(); + + when(asyncCommandsMock.sscan(any(byte[].class), any(ScanCursor.class),any(ScanArgs.class))).thenReturn(future, future); + + Cursor cursor = connection.setCommands().sScan("key".getBytes(), KeyScanOptions.NONE); + cursor.next(); //initial + assertThat(cursor.getCursorId()).isEqualTo(Long.parseUnsignedLong(cursorId)); + + cursor.next(); // fetch next + ArgumentCaptor captor = ArgumentCaptor.forClass(ScanCursor.class); + verify(asyncCommandsMock, times(2)).sscan(any(byte[].class), captor.capture(), any(ScanArgs.class)); + assertThat(captor.getAllValues()).map(ScanCursor::getCursor).containsExactly("0", cursorId); + } + + @Test // GH-2796 + void zScanShouldOperateUponUnsigned64BitCursorId() { + + String cursorId = "9286422431637962824"; + ScoredValueScanCursor sc = new ScoredValueScanCursor<>() { + @Override + public List> getValues() { + return List.of(ScoredValue.just(10D, "spring".getBytes())); + } + }; + sc.setCursor(cursorId); + sc.setFinished(false); + + Command> command = new Command<>(new LettuceConnection.CustomCommandType("ZSCAN"), + new ScanOutput<>(ByteArrayCodec.INSTANCE, sc) { + @Override + protected void setOutput(ByteBuffer bytes) { + + } + }); + AsyncCommand> future = new AsyncCommand<>(command); + future.complete(); + + when(asyncCommandsMock.zscan(any(byte[].class), any(ScanCursor.class),any(ScanArgs.class))).thenReturn(future, future); + + Cursor cursor = connection.zSetCommands().zScan("key".getBytes(), KeyScanOptions.NONE); + cursor.next(); //initial + assertThat(cursor.getCursorId()).isEqualTo(Long.parseUnsignedLong(cursorId)); + + cursor.next(); // fetch next + ArgumentCaptor captor = ArgumentCaptor.forClass(ScanCursor.class); + verify(asyncCommandsMock, times(2)).zscan(any(byte[].class), captor.capture(), any(ScanArgs.class)); + assertThat(captor.getAllValues()).map(ScanCursor::getCursor).containsExactly("0", cursorId); + } + + @Test // GH-2796 + void hScanShouldOperateUponUnsigned64BitCursorId() { + + String cursorId = "9286422431637962824"; + MapScanCursor sc = new MapScanCursor<>() { + @Override + public Map getMap() { + return Map.of("spring".getBytes(), "data".getBytes()); + } + }; + sc.setCursor(cursorId); + sc.setFinished(false); + + Command> command = new Command<>(new LettuceConnection.CustomCommandType("HSCAN"), + new ScanOutput<>(ByteArrayCodec.INSTANCE, sc) { + @Override + protected void setOutput(ByteBuffer bytes) { + + } + }); + AsyncCommand> future = new AsyncCommand<>(command); + future.complete(); + + when(asyncCommandsMock.hscan(any(byte[].class), any(ScanCursor.class),any(ScanArgs.class))).thenReturn(future, future); + + Cursor> cursor = connection.hashCommands().hScan("key".getBytes(), KeyScanOptions.NONE); + cursor.next(); //initial + assertThat(cursor.getCursorId()).isEqualTo(Long.parseUnsignedLong(cursorId)); + + cursor.next(); // fetch next + ArgumentCaptor captor = ArgumentCaptor.forClass(ScanCursor.class); + verify(asyncCommandsMock, times(2)).hscan(any(byte[].class), captor.capture(), any(ScanArgs.class)); + assertThat(captor.getAllValues()).map(ScanCursor::getCursor).containsExactly("0", cursorId); + } + } public static class LettucePipelineConnectionUnitTests extends BasicUnitTests { @@ -304,5 +449,29 @@ public class LettuceConnectionUnitTests { connection.getClientName(); verify(asyncCommandsMock).clientGetname(); } + + @Test + @Disabled("scan not supported in pipeline") + void scanShouldOperateUponUnsigned64BitCursorId() { + + } + + @Test + @Disabled("scan not supported in pipeline") + void sScanShouldOperateUponUnsigned64BitCursorId() { + + } + + @Test + @Disabled("scan not supported in pipeline") + void zScanShouldOperateUponUnsigned64BitCursorId() { + + } + + @Test + @Disabled("scan not supported in pipeline") + void hScanShouldOperateUponUnsigned64BitCursorId() { + + } } }