DATAREDIS-659 - Polishing.
Update JavaDoc and assert nullability contract. Use slowlog-max-len in tests instead of maxclients avoiding potential ulimit errors. Original Pull Request: #253
This commit is contained in:
1
.gitignore
vendored
1
.gitignore
vendored
@@ -15,3 +15,4 @@ build
|
||||
out
|
||||
work
|
||||
*.rdb
|
||||
*.aof
|
||||
|
||||
@@ -23,102 +23,163 @@ import java.util.Properties;
|
||||
import org.springframework.data.redis.core.types.RedisClientInfo;
|
||||
|
||||
/**
|
||||
* Redis Server commands executed in cluster environment using reactive infrastructure.
|
||||
*
|
||||
* @author Mark Paluch
|
||||
* @author Christoph Strobl
|
||||
* @since 2.0
|
||||
*/
|
||||
public interface ReactiveClusterServerCommands extends ReactiveServerCommands {
|
||||
|
||||
/**
|
||||
* Start an {@literal Append Only File} rewrite process on the specific server.
|
||||
*
|
||||
* @param node must not be {@literal null}.
|
||||
* @return {@link Mono} indicating command completion.
|
||||
* @throws IllegalArgumentException when {@code node} is {@literal null}.
|
||||
* @see RedisServerCommands#bgReWriteAof()
|
||||
*/
|
||||
Mono<String> bgReWriteAof(RedisClusterNode node);
|
||||
|
||||
/**
|
||||
* Start background saving of db on server.
|
||||
*
|
||||
* @param node must not be {@literal null}.
|
||||
* @return {@link Mono} indicating command received by server. Operation success needs to be checked via
|
||||
* {@link #lastSave(RedisClusterNode)}.
|
||||
* @throws IllegalArgumentException when {@code node} is {@literal null}.
|
||||
* @see RedisServerCommands#bgSave()
|
||||
*/
|
||||
Mono<String> bgSave(RedisClusterNode node);
|
||||
|
||||
/**
|
||||
* Get time unix timestamp of last successful {@link #bgSave()} operation in seconds.
|
||||
*
|
||||
* @param node must not be {@literal null}.
|
||||
* @return
|
||||
* @return @return {@link Mono} wrapping unix timestamp.
|
||||
* @throws IllegalArgumentException when {@code node} is {@literal null}.
|
||||
* @see RedisServerCommands#lastSave()
|
||||
*/
|
||||
Mono<Long> lastSave(RedisClusterNode node);
|
||||
|
||||
/**
|
||||
* Synchronous save current db snapshot on server.
|
||||
*
|
||||
* @param node must not be {@literal null}.
|
||||
* @return {@link Mono} indicating command completion.
|
||||
* @throws IllegalArgumentException when {@code node} is {@literal null}.
|
||||
* @see RedisServerCommands#save()
|
||||
*/
|
||||
Mono<String> save(RedisClusterNode node);
|
||||
|
||||
/**
|
||||
* Get the total number of available keys in currently selected database.
|
||||
*
|
||||
* @param node must not be {@literal null}.
|
||||
* @return
|
||||
* @return {@link Mono} wrapping number of keys.
|
||||
* @throws IllegalArgumentException when {@code node} is {@literal null}.
|
||||
* @see RedisServerCommands#dbSize()
|
||||
*/
|
||||
Mono<Long> dbSize(RedisClusterNode node);
|
||||
|
||||
/**
|
||||
* @param node must not be {@literal null}.
|
||||
* Delete all keys of the currently selected database.
|
||||
*
|
||||
* @param node must not be {@literal null}. {@link Mono} indicating command completion.
|
||||
* @throws IllegalArgumentException when {@code node} is {@literal null}.
|
||||
* @see RedisServerCommands#flushDb()
|
||||
*/
|
||||
Mono<String> flushDb(RedisClusterNode node);
|
||||
|
||||
/**
|
||||
* Delete all <b>all keys</b> from <b>all databases</b>.
|
||||
*
|
||||
* @param node must not be {@literal null}.
|
||||
* @return {@link Mono} indicating command completion.
|
||||
* @throws IllegalArgumentException when {@code node} is {@literal null}.
|
||||
* @see RedisServerCommands#flushAll()
|
||||
*/
|
||||
Mono<String> flushAll(RedisClusterNode node);
|
||||
|
||||
/**
|
||||
* Load {@literal default} server information like
|
||||
* <ul>
|
||||
* <li>memory</li>
|
||||
* <li>cpu utilization</li>
|
||||
* <li>replication</li>
|
||||
* </ul>
|
||||
* <p>
|
||||
*
|
||||
* @param node must not be {@literal null}.
|
||||
* @return
|
||||
* @return {@link Mono} wrapping server information.
|
||||
* @throws IllegalArgumentException when {@code node} is {@literal null}.
|
||||
* @see RedisServerCommands#info()
|
||||
*/
|
||||
Mono<Properties> info(RedisClusterNode node);
|
||||
|
||||
/**
|
||||
* Load server information for given {@code selection}.
|
||||
*
|
||||
* @param node must not be {@literal null}.
|
||||
* @param section
|
||||
* @return
|
||||
* @param section must not be {@literal null} nor {@literal empty}.
|
||||
* @return {@link Mono} wrapping server information of given {@code section}.
|
||||
* @throws IllegalArgumentException when {@code node} is {@literal null}.
|
||||
* @throws IllegalArgumentException when section is {@literal null} or {@literal empty}.
|
||||
* @see RedisServerCommands#info(String)
|
||||
*/
|
||||
Mono<Properties> info(RedisClusterNode node, String section);
|
||||
|
||||
/**
|
||||
* Load configuration parameters for given {@code pattern} from server.
|
||||
*
|
||||
* @param node must not be {@literal null}.
|
||||
* @param pattern
|
||||
* @return
|
||||
* @param pattern must not be {@literal null}.
|
||||
* @return {@link Mono} wrapping configuration parameters matching given {@code pattern}.
|
||||
* @throws IllegalArgumentException when {@code node} is {@literal null}.
|
||||
* @throws IllegalArgumentException when {@code pattern} is {@literal null} or {@literal empty}.
|
||||
* @see RedisServerCommands#getConfig(String)
|
||||
*/
|
||||
Mono<Properties> getConfig(RedisClusterNode node, String pattern);
|
||||
|
||||
/**
|
||||
* Set server configuration for {@code param} to {@code value}.
|
||||
*
|
||||
* @param node must not be {@literal null}.
|
||||
* @param param
|
||||
* @param value
|
||||
* @param param must not be {@literal null} nor {@literal empty}.
|
||||
* @param value must not be {@literal null} nor {@literal empty}.
|
||||
* @throws IllegalArgumentException when {@code node} is {@literal null}.
|
||||
* @throws IllegalArgumentException when {@code pattern} / {@code value} is {@literal null} or {@literal empty}.
|
||||
* @see RedisServerCommands#setConfig(String, String)
|
||||
*/
|
||||
Mono<String> setConfig(RedisClusterNode node, String param, String value);
|
||||
|
||||
/**
|
||||
* Reset statistic counters on server. <br>
|
||||
* Counters can be retrieved using {@link #info()}.
|
||||
*
|
||||
* @param node must not be {@literal null}.
|
||||
* @return {@link Mono} indicating command completion.
|
||||
* @throws IllegalArgumentException when {@code node} is {@literal null}.
|
||||
* @see RedisServerCommands#resetConfigStats()
|
||||
*/
|
||||
Mono<String> resetConfigStats(RedisClusterNode node);
|
||||
|
||||
/**
|
||||
* Request server timestamp using {@code TIME} command.
|
||||
*
|
||||
* @param node must not be {@literal null}.
|
||||
* @return
|
||||
* @return {@link Mono} wrapping current server time in milliseconds.
|
||||
* @throws IllegalArgumentException when {@code node} is {@literal null}.
|
||||
* @see RedisServerCommands#time()
|
||||
*/
|
||||
Mono<Long> time(RedisClusterNode node);
|
||||
|
||||
/**
|
||||
* Request information and statistics about connected clients.
|
||||
*
|
||||
* @param node must not be {@literal null}.
|
||||
* @return
|
||||
* @return {@link Flux} emitting {@link RedisClientInfo} objects.
|
||||
* @throws IllegalArgumentException when {@code node} is {@literal null}.
|
||||
* @see RedisServerCommands#getClientList()
|
||||
*/
|
||||
Flux<RedisClientInfo> getClientList(RedisClusterNode node);
|
||||
|
||||
@@ -24,13 +24,6 @@ import reactor.core.publisher.Mono;
|
||||
*/
|
||||
public interface ReactiveRedisClusterConnection extends ReactiveRedisConnection {
|
||||
|
||||
/**
|
||||
* @param node must not be {@literal null}.
|
||||
* @return
|
||||
* @see RedisConnectionCommands#ping()
|
||||
*/
|
||||
Mono<String> ping(RedisClusterNode node);
|
||||
|
||||
@Override
|
||||
ReactiveClusterKeyCommands keyCommands();
|
||||
|
||||
@@ -60,4 +53,14 @@ public interface ReactiveRedisClusterConnection extends ReactiveRedisConnection
|
||||
|
||||
@Override
|
||||
ReactiveClusterServerCommands serverCommands();
|
||||
|
||||
/**
|
||||
* Test the connection to a specific Redis cluster node.
|
||||
*
|
||||
* @param node must not be {@literal null}.
|
||||
* @return {@link Mono} wrapping server response message - usually {@literal PONG}.
|
||||
* @throws IllegalArgumentException when {@code node} is {@literal null}.
|
||||
* @see RedisConnectionCommands#ping()
|
||||
*/
|
||||
Mono<String> ping(RedisClusterNode node);
|
||||
}
|
||||
|
||||
@@ -97,7 +97,7 @@ public interface ReactiveRedisConnection extends Closeable {
|
||||
/**
|
||||
* Get {@link ReactiveHashCommands}.
|
||||
*
|
||||
* @return
|
||||
* @return never {@literal null}.
|
||||
*/
|
||||
ReactiveHashCommands hashCommands();
|
||||
|
||||
@@ -125,7 +125,7 @@ public interface ReactiveRedisConnection extends Closeable {
|
||||
/**
|
||||
* Test connection.
|
||||
*
|
||||
* @return Server response message - usually {@literal PONG}.
|
||||
* @return {@link Mono} wrapping server response message - usually {@literal PONG}.
|
||||
* @see <a href="http://redis.io/commands/ping">Redis Documentation: PING</a>
|
||||
*/
|
||||
Mono<String> ping();
|
||||
|
||||
@@ -18,7 +18,6 @@ package org.springframework.data.redis.connection;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.springframework.data.redis.core.types.RedisClientInfo;
|
||||
@@ -27,6 +26,7 @@ import org.springframework.data.redis.core.types.RedisClientInfo;
|
||||
* Redis Server commands executed using reactive infrastructure.
|
||||
*
|
||||
* @author Mark Paluch
|
||||
* @author Christoph Strobl
|
||||
* @since 2.0
|
||||
*/
|
||||
public interface ReactiveServerCommands {
|
||||
@@ -34,6 +34,7 @@ public interface ReactiveServerCommands {
|
||||
/**
|
||||
* Start an {@literal Append Only File} rewrite process on server.
|
||||
*
|
||||
* @return {@link Mono} indicating command completion.
|
||||
* @see <a href="http://redis.io/commands/bgrewriteaof">Redis Documentation: BGREWRITEAOF</a>
|
||||
*/
|
||||
Mono<String> bgReWriteAof();
|
||||
@@ -41,14 +42,16 @@ public interface ReactiveServerCommands {
|
||||
/**
|
||||
* Start background saving of db on server.
|
||||
*
|
||||
* @return {@link Mono} indicating command received by server. Operation success needs to be checked via
|
||||
* {@link #lastSave()}.
|
||||
* @see <a href="http://redis.io/commands/bgsave">Redis Documentation: BGSAVE</a>
|
||||
*/
|
||||
Mono<String> bgSave();
|
||||
|
||||
/**
|
||||
* Get time of last {@link #bgSave()} operation in seconds.
|
||||
* Get time unix timestamp of last successful {@link #bgSave()} operation in seconds.
|
||||
*
|
||||
* @return
|
||||
* @return {@link Mono} wrapping unix timestamp.
|
||||
* @see <a href="http://redis.io/commands/lastsave">Redis Documentation: LASTSAVE</a>
|
||||
*/
|
||||
Mono<Long> lastSave();
|
||||
@@ -56,6 +59,7 @@ public interface ReactiveServerCommands {
|
||||
/**
|
||||
* Synchronous save current db snapshot on server.
|
||||
*
|
||||
* @return {@link Mono} indicating command completion.
|
||||
* @see <a href="http://redis.io/commands/save">Redis Documentation: SAVE</a>
|
||||
*/
|
||||
Mono<String> save();
|
||||
@@ -63,7 +67,7 @@ public interface ReactiveServerCommands {
|
||||
/**
|
||||
* Get the total number of available keys in currently selected database.
|
||||
*
|
||||
* @return
|
||||
* @return {@link Mono} wrapping number of keys.
|
||||
* @see <a href="http://redis.io/commands/dbsize">Redis Documentation: DBSIZE</a>
|
||||
*/
|
||||
Mono<Long> dbSize();
|
||||
@@ -71,6 +75,7 @@ public interface ReactiveServerCommands {
|
||||
/**
|
||||
* Delete all keys of the currently selected database.
|
||||
*
|
||||
* @return {@link Mono} indicating command completion.
|
||||
* @see <a href="http://redis.io/commands/flushdb">Redis Documentation: FLUSHDB</a>
|
||||
*/
|
||||
Mono<String> flushDb();
|
||||
@@ -78,6 +83,7 @@ public interface ReactiveServerCommands {
|
||||
/**
|
||||
* Delete all <b>all keys</b> from <b>all databases</b>.
|
||||
*
|
||||
* @return {@link Mono} indicating command completion.
|
||||
* @see <a href="http://redis.io/commands/flushall">Redis Documentation: FLUSHALL</a>
|
||||
*/
|
||||
Mono<String> flushAll();
|
||||
@@ -91,7 +97,7 @@ public interface ReactiveServerCommands {
|
||||
* </ul>
|
||||
* <p>
|
||||
*
|
||||
* @return
|
||||
* @return {@link Mono} wrapping server information.
|
||||
* @see <a href="http://redis.io/commands/info">Redis Documentation: INFO</a>
|
||||
*/
|
||||
Mono<Properties> info();
|
||||
@@ -99,8 +105,9 @@ public interface ReactiveServerCommands {
|
||||
/**
|
||||
* Load server information for given {@code selection}.
|
||||
*
|
||||
* @param section
|
||||
* @return
|
||||
* @param section must not be {@literal null} nor {@literal empty}.
|
||||
* @return {@link Mono} wrapping server information of given {@code section}.
|
||||
* @throws IllegalArgumentException when section is {@literal null} or {@literal empty}.
|
||||
* @see <a href="http://redis.io/commands/info">Redis Documentation: INFO</a>
|
||||
*/
|
||||
Mono<Properties> info(String section);
|
||||
@@ -108,8 +115,9 @@ public interface ReactiveServerCommands {
|
||||
/**
|
||||
* Load configuration parameters for given {@code pattern} from server.
|
||||
*
|
||||
* @param pattern
|
||||
* @return
|
||||
* @param pattern must not be {@literal null}.
|
||||
* @return {@link Mono} wrapping configuration parameters matching given {@code pattern}.
|
||||
* @throws IllegalArgumentException when {@code pattern} is {@literal null} or {@literal empty}.
|
||||
* @see <a href="http://redis.io/commands/config-get">Redis Documentation: CONFIG GET</a>
|
||||
*/
|
||||
Mono<Properties> getConfig(String pattern);
|
||||
@@ -117,8 +125,9 @@ public interface ReactiveServerCommands {
|
||||
/**
|
||||
* Set server configuration for {@code param} to {@code value}.
|
||||
*
|
||||
* @param param
|
||||
* @param value
|
||||
* @param param must not be {@literal null} nor {@literal empty}.
|
||||
* @param value must not be {@literal null} nor {@literal empty}.
|
||||
* @throws IllegalArgumentException when {@code pattern} / {@code value} is {@literal null} or {@literal empty}.
|
||||
* @see <a href="http://redis.io/commands/config-set">Redis Documentation: CONFIG SET</a>
|
||||
*/
|
||||
Mono<String> setConfig(String param, String value);
|
||||
@@ -127,6 +136,7 @@ public interface ReactiveServerCommands {
|
||||
* Reset statistic counters on server. <br>
|
||||
* Counters can be retrieved using {@link #info()}.
|
||||
*
|
||||
* @return {@link Mono} indicating command completion.
|
||||
* @see <a href="http://redis.io/commands/config-resetstat">Redis Documentation: CONFIG RESETSTAT</a>
|
||||
*/
|
||||
Mono<String> resetConfigStats();
|
||||
@@ -134,7 +144,7 @@ public interface ReactiveServerCommands {
|
||||
/**
|
||||
* Request server timestamp using {@code TIME} command.
|
||||
*
|
||||
* @return current server time in milliseconds.
|
||||
* @return {@link Mono} wrapping current server time in milliseconds.
|
||||
* @see <a href="http://redis.io/commands/time">Redis Documentation: TIME</a>
|
||||
*/
|
||||
Mono<Long> time();
|
||||
@@ -142,8 +152,10 @@ public interface ReactiveServerCommands {
|
||||
/**
|
||||
* Closes a given client connection identified by {@literal host:port}.
|
||||
*
|
||||
* @param host of connection to close.
|
||||
* @param host of connection to close. Must not be {@literal null} nor {@literal empty}.
|
||||
* @param port of connection to close
|
||||
* @return {@link Mono} wrapping {@link String} representation of the command result.
|
||||
* @throws IllegalArgumentException if {@code host} is {@literal null} or {@literal empty}.
|
||||
* @see <a href="http://redis.io/commands/client-kill">Redis Documentation: CLIENT KILL</a>
|
||||
*/
|
||||
Mono<String> killClient(String host, int port);
|
||||
@@ -151,7 +163,8 @@ public interface ReactiveServerCommands {
|
||||
/**
|
||||
* Assign given name to current connection.
|
||||
*
|
||||
* @param name
|
||||
* @param name must not be {@literal null} nor {@literal empty}.
|
||||
* @throws IllegalArgumentException when {@code name} is {@literal null} or {@literal empty}.
|
||||
* @see <a href="http://redis.io/commands/client-setname">Redis Documentation: CLIENT SETNAME</a>
|
||||
*/
|
||||
Mono<String> setClientName(String name);
|
||||
@@ -159,15 +172,15 @@ public interface ReactiveServerCommands {
|
||||
/**
|
||||
* Returns the name of the current connection.
|
||||
*
|
||||
* @return {@link Mono} wrapping the connection name.
|
||||
* @see <a href="http://redis.io/commands/client-getname">Redis Documentation: CLIENT GETNAME</a>
|
||||
* @return
|
||||
*/
|
||||
Mono<String> getClientName();
|
||||
|
||||
/**
|
||||
* Request information and statistics about connected clients.
|
||||
*
|
||||
* @return {@link List} of {@link RedisClientInfo} objects.
|
||||
* @return {@link Flux} emitting {@link RedisClientInfo} objects.
|
||||
* @see <a href="http://redis.io/commands/client-list">Redis Documentation: CLIENT LIST</a>
|
||||
*/
|
||||
Flux<RedisClientInfo> getClientList();
|
||||
|
||||
@@ -42,9 +42,13 @@ import org.springframework.data.redis.connection.ReactiveClusterServerCommands;
|
||||
import org.springframework.data.redis.connection.RedisClusterNode;
|
||||
import org.springframework.data.redis.core.types.RedisClientInfo;
|
||||
import org.springframework.data.redis.util.ByteUtils;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
/**
|
||||
* {@link ReactiveClusterServerCommands} implementation for {@literal Lettuce}.
|
||||
*
|
||||
* @author Mark Paluch
|
||||
* @author Christoph Strobl
|
||||
* @since 2.0
|
||||
*/
|
||||
class LettuceReactiveClusterServerCommands extends LettuceReactiveServerCommands
|
||||
@@ -57,6 +61,9 @@ class LettuceReactiveClusterServerCommands extends LettuceReactiveServerCommands
|
||||
* Create new {@link LettuceReactiveGeoCommands}.
|
||||
*
|
||||
* @param connection must not be {@literal null}.
|
||||
* @param topologyProvider must not be {@literal null}.
|
||||
* @throws IllegalArgumentException when {@code connection} is {@literal null}.
|
||||
* @throws IllegalArgumentException when {@code topologyProvider} is {@literal null}.
|
||||
*/
|
||||
public LettuceReactiveClusterServerCommands(LettuceReactiveRedisClusterConnection connection,
|
||||
ClusterTopologyProvider topologyProvider) {
|
||||
@@ -67,7 +74,8 @@ class LettuceReactiveClusterServerCommands extends LettuceReactiveServerCommands
|
||||
this.topologyProvider = topologyProvider;
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.ReactiveClusterServerCommands#bgReWriteAof(org.springframework.data.redis.connection.RedisClusterNode)
|
||||
*/
|
||||
@Override
|
||||
@@ -75,7 +83,8 @@ class LettuceReactiveClusterServerCommands extends LettuceReactiveServerCommands
|
||||
return connection.execute(node, RedisServerReactiveCommands::bgrewriteaof).next();
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.ReactiveClusterServerCommands#bgSave(org.springframework.data.redis.connection.RedisClusterNode)
|
||||
*/
|
||||
@Override
|
||||
@@ -83,7 +92,8 @@ class LettuceReactiveClusterServerCommands extends LettuceReactiveServerCommands
|
||||
return connection.execute(node, RedisServerReactiveCommands::bgsave).next();
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.ReactiveClusterServerCommands#lastSave(org.springframework.data.redis.connection.RedisClusterNode)
|
||||
*/
|
||||
@Override
|
||||
@@ -91,7 +101,8 @@ class LettuceReactiveClusterServerCommands extends LettuceReactiveServerCommands
|
||||
return connection.execute(node, RedisServerReactiveCommands::lastsave).map(Date::getTime).next();
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.ReactiveClusterServerCommands#save(org.springframework.data.redis.connection.RedisClusterNode)
|
||||
*/
|
||||
@Override
|
||||
@@ -99,7 +110,8 @@ class LettuceReactiveClusterServerCommands extends LettuceReactiveServerCommands
|
||||
return connection.execute(node, RedisServerReactiveCommands::save).next();
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.ReactiveClusterServerCommands#dbSize(org.springframework.data.redis.connection.RedisClusterNode)
|
||||
*/
|
||||
@Override
|
||||
@@ -107,7 +119,8 @@ class LettuceReactiveClusterServerCommands extends LettuceReactiveServerCommands
|
||||
return connection.execute(node, RedisServerReactiveCommands::dbsize).next();
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.ReactiveClusterServerCommands#flushDb(org.springframework.data.redis.connection.RedisClusterNode)
|
||||
*/
|
||||
@Override
|
||||
@@ -115,7 +128,8 @@ class LettuceReactiveClusterServerCommands extends LettuceReactiveServerCommands
|
||||
return connection.execute(node, RedisServerReactiveCommands::flushdb).next();
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.ReactiveClusterServerCommands#flushAll(org.springframework.data.redis.connection.RedisClusterNode)
|
||||
*/
|
||||
@Override
|
||||
@@ -123,7 +137,8 @@ class LettuceReactiveClusterServerCommands extends LettuceReactiveServerCommands
|
||||
return connection.execute(node, RedisServerReactiveCommands::flushall).next();
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.lettuce.LettuceReactiveServerCommands#info()
|
||||
*/
|
||||
@Override
|
||||
@@ -131,7 +146,8 @@ class LettuceReactiveClusterServerCommands extends LettuceReactiveServerCommands
|
||||
return Flux.merge(executeOnAllNodes(this::info)).collect(PropertiesCollector.INSTANCE);
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.ReactiveClusterServerCommands#info(org.springframework.data.redis.connection.RedisClusterNode)
|
||||
*/
|
||||
@Override
|
||||
@@ -148,42 +164,54 @@ class LettuceReactiveClusterServerCommands extends LettuceReactiveServerCommands
|
||||
@Override
|
||||
public Mono<Properties> info(String section) {
|
||||
|
||||
Assert.hasText(section, "Section must not be null nor empty!");
|
||||
|
||||
return Flux.merge(executeOnAllNodes(redisClusterNode -> info(redisClusterNode, section)))
|
||||
.collect(PropertiesCollector.INSTANCE);
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.ReactiveClusterServerCommands#info(org.springframework.data.redis.connection.RedisClusterNode, java.lang.String)
|
||||
*/
|
||||
@Override
|
||||
public Mono<Properties> info(RedisClusterNode node, String section) {
|
||||
|
||||
Assert.hasText(section, "Section must not be null nor empty!");
|
||||
|
||||
return connection.execute(node, c -> c.info(section)) //
|
||||
.map(LettuceConverters::toProperties).next();
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.lettuce.LettuceReactiveServerCommands#getConfig(java.lang.String)
|
||||
*/
|
||||
@Override
|
||||
public Mono<Properties> getConfig(String pattern) {
|
||||
|
||||
Assert.hasText(pattern, "Pattern must not be null nor empty!");
|
||||
|
||||
return Flux.merge(executeOnAllNodes(node -> getConfig(node, pattern))) //
|
||||
.collect(PropertiesCollector.INSTANCE);
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.ReactiveClusterServerCommands#getConfig(org.springframework.data.redis.connection.RedisClusterNode, java.lang.String)
|
||||
*/
|
||||
@Override
|
||||
public Mono<Properties> getConfig(RedisClusterNode node, String pattern) {
|
||||
|
||||
Assert.hasText(pattern, "Pattern must not be null nor empty!");
|
||||
|
||||
return connection.execute(node, c -> c.configGet(pattern).collectList()) //
|
||||
.map(LettuceConverters::toProperties) //
|
||||
.next();
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.lettuce.LettuceReactiveServerCommands#setConfig(java.lang.String, java.lang.String)
|
||||
*/
|
||||
@Override
|
||||
@@ -191,15 +219,21 @@ class LettuceReactiveClusterServerCommands extends LettuceReactiveServerCommands
|
||||
return Flux.merge(executeOnAllNodes(node -> setConfig(node, param, value))).map(Tuple2::getT2).last();
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.ReactiveClusterServerCommands#setConfig(org.springframework.data.redis.connection.RedisClusterNode, java.lang.String, java.lang.String)
|
||||
*/
|
||||
@Override
|
||||
public Mono<String> setConfig(RedisClusterNode node, String param, String value) {
|
||||
|
||||
Assert.hasText(param, "Param must not be null nor empty!");
|
||||
Assert.hasText(value, "Value must not be null nor empty!");
|
||||
|
||||
return connection.execute(node, c -> c.configSet(param, value)).next();
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.lettuce.LettuceReactiveServerCommands#resetConfigStats()
|
||||
*/
|
||||
@Override
|
||||
@@ -207,7 +241,8 @@ class LettuceReactiveClusterServerCommands extends LettuceReactiveServerCommands
|
||||
return Flux.merge(executeOnAllNodes(this::resetConfigStats)).map(Tuple2::getT2).last();
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.ReactiveClusterServerCommands#resetConfigStats(org.springframework.data.redis.connection.RedisClusterNode)
|
||||
*/
|
||||
@Override
|
||||
@@ -215,7 +250,8 @@ class LettuceReactiveClusterServerCommands extends LettuceReactiveServerCommands
|
||||
return connection.execute(node, RedisServerReactiveCommands::configResetstat).next();
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.ReactiveClusterServerCommands#time(org.springframework.data.redis.connection.RedisClusterNode)
|
||||
*/
|
||||
@Override
|
||||
@@ -227,7 +263,8 @@ class LettuceReactiveClusterServerCommands extends LettuceReactiveServerCommands
|
||||
.map(LettuceConverters.toTimeConverter()::convert);
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.lettuce.LettuceReactiveServerCommands#getClientList()
|
||||
*/
|
||||
@Override
|
||||
@@ -235,7 +272,8 @@ class LettuceReactiveClusterServerCommands extends LettuceReactiveServerCommands
|
||||
return Flux.merge(executeOnAllNodesMany(this::getClientList)).map(Tuple2::getT2);
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.ReactiveClusterServerCommands#getClientList(org.springframework.data.redis.connection.RedisClusterNode)
|
||||
*/
|
||||
@Override
|
||||
@@ -279,7 +317,8 @@ class LettuceReactiveClusterServerCommands extends LettuceReactiveServerCommands
|
||||
|
||||
INSTANCE;
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see java.util.stream.Collector#supplier()
|
||||
*/
|
||||
@Override
|
||||
@@ -301,7 +340,8 @@ class LettuceReactiveClusterServerCommands extends LettuceReactiveServerCommands
|
||||
};
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see java.util.stream.Collector#combiner()
|
||||
*/
|
||||
@Override
|
||||
@@ -318,7 +358,8 @@ class LettuceReactiveClusterServerCommands extends LettuceReactiveServerCommands
|
||||
};
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see java.util.stream.Collector#finisher()
|
||||
*/
|
||||
@Override
|
||||
@@ -326,7 +367,8 @@ class LettuceReactiveClusterServerCommands extends LettuceReactiveServerCommands
|
||||
return properties -> properties;
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see java.util.stream.Collector#characteristics()
|
||||
*/
|
||||
@Override
|
||||
|
||||
@@ -33,6 +33,8 @@ import org.springframework.util.Assert;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
/**
|
||||
* {@link ReactiveRedisClusterConnection} implementation for {@literal Lettuce}.
|
||||
*
|
||||
* @author Christoph Strobl
|
||||
* @author Mark Paluch
|
||||
* @since 2.0
|
||||
@@ -42,14 +44,23 @@ class LettuceReactiveRedisClusterConnection extends LettuceReactiveRedisConnecti
|
||||
|
||||
private final ClusterTopologyProvider topologyProvider;
|
||||
|
||||
public LettuceReactiveRedisClusterConnection(RedisClusterClient client) {
|
||||
/**
|
||||
* Creates new {@link LettuceReactiveRedisClusterConnection}.
|
||||
*
|
||||
* @param client must not be {@literal null}.
|
||||
* @throws IllegalArgumentException when {@code client} is {@literal null}.
|
||||
* @throws org.springframework.dao.InvalidDataAccessResourceUsageException when {@code client} is not suitable for
|
||||
* cluster environment.
|
||||
*/
|
||||
LettuceReactiveRedisClusterConnection(RedisClusterClient client) {
|
||||
|
||||
super(client);
|
||||
|
||||
this.topologyProvider = new LettuceClusterTopologyProvider(client);
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.lettuce.LettuceReactiveRedisConnection#keyCommands()
|
||||
*/
|
||||
@Override
|
||||
@@ -57,7 +68,8 @@ class LettuceReactiveRedisClusterConnection extends LettuceReactiveRedisConnecti
|
||||
return new LettuceReactiveClusterKeyCommands(this);
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.lettuce.LettuceReactiveRedisConnection#listCommands()
|
||||
*/
|
||||
@Override
|
||||
@@ -65,7 +77,8 @@ class LettuceReactiveRedisClusterConnection extends LettuceReactiveRedisConnecti
|
||||
return new LettuceReactiveClusterListCommands(this);
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.lettuce.LettuceReactiveRedisConnection#setCommands()
|
||||
*/
|
||||
@Override
|
||||
@@ -73,7 +86,8 @@ class LettuceReactiveRedisClusterConnection extends LettuceReactiveRedisConnecti
|
||||
return new LettuceReactiveClusterSetCommands(this);
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.lettuce.LettuceReactiveRedisConnection#zSetCommands()
|
||||
*/
|
||||
@Override
|
||||
@@ -81,7 +95,8 @@ class LettuceReactiveRedisClusterConnection extends LettuceReactiveRedisConnecti
|
||||
return new LettuceReactiveClusterZSetCommands(this);
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.lettuce.LettuceReactiveRedisConnection#hyperLogLogCommands()
|
||||
*/
|
||||
@Override
|
||||
@@ -89,7 +104,8 @@ class LettuceReactiveRedisClusterConnection extends LettuceReactiveRedisConnecti
|
||||
return new LettuceReactiveClusterHyperLogLogCommands(this);
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.lettuce.LettuceReactiveRedisConnection#stringCommands()
|
||||
*/
|
||||
@Override
|
||||
@@ -97,7 +113,8 @@ class LettuceReactiveRedisClusterConnection extends LettuceReactiveRedisConnecti
|
||||
return new LettuceReactiveClusterStringCommands(this);
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.lettuce.LettuceReactiveRedisConnection#geoCommands()
|
||||
*/
|
||||
@Override
|
||||
@@ -105,7 +122,8 @@ class LettuceReactiveRedisClusterConnection extends LettuceReactiveRedisConnecti
|
||||
return new LettuceReactiveClusterGeoCommands(this);
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.lettuce.LettuceReactiveRedisConnection#hashCommands()
|
||||
*/
|
||||
@Override
|
||||
@@ -113,7 +131,8 @@ class LettuceReactiveRedisClusterConnection extends LettuceReactiveRedisConnecti
|
||||
return new LettuceReactiveClusterHashCommands(this);
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.lettuce.LettuceReactiveRedisConnection#numberCommands()
|
||||
*/
|
||||
@Override
|
||||
@@ -121,7 +140,8 @@ class LettuceReactiveRedisClusterConnection extends LettuceReactiveRedisConnecti
|
||||
return new LettuceReactiveClusterNumberCommands(this);
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.lettuce.LettuceReactiveRedisConnection#serverCommands()
|
||||
*/
|
||||
@Override
|
||||
@@ -129,7 +149,8 @@ class LettuceReactiveRedisClusterConnection extends LettuceReactiveRedisConnecti
|
||||
return new LettuceReactiveClusterServerCommands(this, topologyProvider);
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.ReactiveRedisClusterConnection#ping(org.springframework.data.redis.connection.RedisClusterNode)
|
||||
*/
|
||||
@Override
|
||||
@@ -138,8 +159,10 @@ class LettuceReactiveRedisClusterConnection extends LettuceReactiveRedisConnecti
|
||||
}
|
||||
|
||||
/**
|
||||
* @param callback
|
||||
* @return
|
||||
* @param node must not be {@literal null}.
|
||||
* @param callback must not be {@literal null}.
|
||||
* @throws IllegalArgumentException when {@code node} or {@code callback} is {@literal null}.
|
||||
* @return {@link Flux} emitting execution results.
|
||||
*/
|
||||
public <T> Flux<T> execute(RedisNode node, LettuceReactiveCallback<T> callback) {
|
||||
|
||||
@@ -153,7 +176,8 @@ class LettuceReactiveRedisClusterConnection extends LettuceReactiveRedisConnecti
|
||||
return Flux.defer(() -> callback.doWithCommands(getCommands(node))).onErrorMap(translateException());
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.lettuce.LettuceReactiveRedisConnection#getConnection()
|
||||
*/
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
@@ -166,7 +190,8 @@ class LettuceReactiveRedisClusterConnection extends LettuceReactiveRedisConnecti
|
||||
return (StatefulRedisClusterConnection) super.getConnection();
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.lettuce.LettuceReactiveRedisConnection#getCommands()
|
||||
*/
|
||||
protected RedisClusterReactiveCommands<ByteBuffer, ByteBuffer> getCommands() {
|
||||
|
||||
@@ -33,7 +33,17 @@ import java.util.function.Function;
|
||||
import org.reactivestreams.Publisher;
|
||||
import org.springframework.dao.DataAccessException;
|
||||
import org.springframework.dao.InvalidDataAccessResourceUsageException;
|
||||
import org.springframework.data.redis.connection.*;
|
||||
import org.springframework.data.redis.connection.ReactiveGeoCommands;
|
||||
import org.springframework.data.redis.connection.ReactiveHashCommands;
|
||||
import org.springframework.data.redis.connection.ReactiveHyperLogLogCommands;
|
||||
import org.springframework.data.redis.connection.ReactiveKeyCommands;
|
||||
import org.springframework.data.redis.connection.ReactiveListCommands;
|
||||
import org.springframework.data.redis.connection.ReactiveNumberCommands;
|
||||
import org.springframework.data.redis.connection.ReactiveRedisConnection;
|
||||
import org.springframework.data.redis.connection.ReactiveServerCommands;
|
||||
import org.springframework.data.redis.connection.ReactiveSetCommands;
|
||||
import org.springframework.data.redis.connection.ReactiveStringCommands;
|
||||
import org.springframework.data.redis.connection.ReactiveZSetCommands;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
/**
|
||||
@@ -43,11 +53,18 @@ import org.springframework.util.Assert;
|
||||
*/
|
||||
class LettuceReactiveRedisConnection implements ReactiveRedisConnection {
|
||||
|
||||
private StatefulConnection<ByteBuffer, ByteBuffer> connection;
|
||||
|
||||
private static final RedisCodec<ByteBuffer, ByteBuffer> CODEC = ByteBufferCodec.INSTANCE;
|
||||
|
||||
public LettuceReactiveRedisConnection(AbstractRedisClient client) {
|
||||
private StatefulConnection<ByteBuffer, ByteBuffer> connection;
|
||||
|
||||
/**
|
||||
* Creates new {@link LettuceReactiveRedisConnection}.
|
||||
*
|
||||
* @param client must not be {@literal null}.
|
||||
* @throws IllegalArgumentException when {@code client} is {@literal null}.
|
||||
* @throws InvalidDataAccessResourceUsageException when {@code client} is not suitable for connection.
|
||||
*/
|
||||
LettuceReactiveRedisConnection(AbstractRedisClient client) {
|
||||
|
||||
Assert.notNull(client, "RedisClient must not be null!");
|
||||
|
||||
@@ -70,7 +87,8 @@ class LettuceReactiveRedisConnection implements ReactiveRedisConnection {
|
||||
return new LettuceReactiveKeyCommands(this);
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.ReactiveRedisConnection#stringCommands()
|
||||
*/
|
||||
@Override
|
||||
@@ -78,7 +96,8 @@ class LettuceReactiveRedisConnection implements ReactiveRedisConnection {
|
||||
return new LettuceReactiveStringCommands(this);
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.ReactiveRedisConnection#numberCommands()
|
||||
*/
|
||||
@Override
|
||||
@@ -86,7 +105,8 @@ class LettuceReactiveRedisConnection implements ReactiveRedisConnection {
|
||||
return new LettuceReactiveNumberCommands(this);
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.ReactiveRedisConnection#listCommands()
|
||||
*/
|
||||
@Override
|
||||
@@ -94,7 +114,8 @@ class LettuceReactiveRedisConnection implements ReactiveRedisConnection {
|
||||
return new LettuceReactiveListCommands(this);
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.ReactiveRedisConnection#setCommands()
|
||||
*/
|
||||
@Override
|
||||
@@ -102,7 +123,8 @@ class LettuceReactiveRedisConnection implements ReactiveRedisConnection {
|
||||
return new LettuceReactiveSetCommands(this);
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.ReactiveRedisConnection#zSetCommands()
|
||||
*/
|
||||
@Override
|
||||
@@ -110,7 +132,8 @@ class LettuceReactiveRedisConnection implements ReactiveRedisConnection {
|
||||
return new LettuceReactiveZSetCommands(this);
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.ReactiveRedisConnection#hashCommands()
|
||||
*/
|
||||
@Override
|
||||
@@ -118,7 +141,8 @@ class LettuceReactiveRedisConnection implements ReactiveRedisConnection {
|
||||
return new LettuceReactiveHashCommands(this);
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.ReactiveRedisConnection#geoCommands()
|
||||
*/
|
||||
@Override
|
||||
@@ -126,7 +150,8 @@ class LettuceReactiveRedisConnection implements ReactiveRedisConnection {
|
||||
return new LettuceReactiveGeoCommands(this);
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.ReactiveRedisConnection#hyperLogLogCommands()
|
||||
*/
|
||||
@Override
|
||||
@@ -134,7 +159,8 @@ class LettuceReactiveRedisConnection implements ReactiveRedisConnection {
|
||||
return new LettuceReactiveHyperLogLogCommands(this);
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.ReactiveRedisConnection#hyperLogLogCommands()
|
||||
*/
|
||||
@Override
|
||||
@@ -142,7 +168,8 @@ class LettuceReactiveRedisConnection implements ReactiveRedisConnection {
|
||||
return new LettuceReactiveServerCommands(this);
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.ReactiveRedisConnection#ping()
|
||||
*/
|
||||
@Override
|
||||
@@ -200,7 +227,7 @@ class LettuceReactiveRedisConnection implements ReactiveRedisConnection {
|
||||
Publisher<T> doWithCommands(RedisClusterReactiveCommands<ByteBuffer, ByteBuffer> cmd);
|
||||
}
|
||||
|
||||
static enum ByteBufferCodec implements RedisCodec<ByteBuffer, ByteBuffer> {
|
||||
enum ByteBufferCodec implements RedisCodec<ByteBuffer, ByteBuffer> {
|
||||
|
||||
INSTANCE;
|
||||
|
||||
|
||||
@@ -29,7 +29,10 @@ import org.springframework.data.redis.util.ByteUtils;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
/**
|
||||
* {@link ReactiveServerCommands} implementation for {@literal Lettuce}.
|
||||
*
|
||||
* @author Mark Paluch
|
||||
* @author Christoph Strobl
|
||||
*/
|
||||
class LettuceReactiveServerCommands implements ReactiveServerCommands {
|
||||
|
||||
@@ -39,14 +42,17 @@ class LettuceReactiveServerCommands implements ReactiveServerCommands {
|
||||
* Create new {@link LettuceReactiveGeoCommands}.
|
||||
*
|
||||
* @param connection must not be {@literal null}.
|
||||
* @throws IllegalArgumentException when {@code connection} is {@literal null}.
|
||||
*/
|
||||
public LettuceReactiveServerCommands(LettuceReactiveRedisConnection connection) {
|
||||
LettuceReactiveServerCommands(LettuceReactiveRedisConnection connection) {
|
||||
|
||||
Assert.notNull(connection, "Connection must not be null!");
|
||||
|
||||
this.connection = connection;
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.ReactiveServerCommands#bgReWriteAof()
|
||||
*/
|
||||
@Override
|
||||
@@ -54,7 +60,8 @@ class LettuceReactiveServerCommands implements ReactiveServerCommands {
|
||||
return connection.execute(RedisServerReactiveCommands::bgrewriteaof).next();
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.ReactiveServerCommands#bgSave()
|
||||
*/
|
||||
@Override
|
||||
@@ -62,7 +69,8 @@ class LettuceReactiveServerCommands implements ReactiveServerCommands {
|
||||
return connection.execute(RedisServerReactiveCommands::bgsave).next();
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.ReactiveServerCommands#lastSave()
|
||||
*/
|
||||
@Override
|
||||
@@ -70,7 +78,8 @@ class LettuceReactiveServerCommands implements ReactiveServerCommands {
|
||||
return connection.execute(RedisServerReactiveCommands::lastsave).next().map(Date::getTime);
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.ReactiveServerCommands#save()
|
||||
*/
|
||||
@Override
|
||||
@@ -78,7 +87,8 @@ class LettuceReactiveServerCommands implements ReactiveServerCommands {
|
||||
return connection.execute(RedisServerReactiveCommands::save).next();
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.ReactiveServerCommands#dbSize()
|
||||
*/
|
||||
@Override
|
||||
@@ -86,7 +96,8 @@ class LettuceReactiveServerCommands implements ReactiveServerCommands {
|
||||
return connection.execute(RedisServerReactiveCommands::dbsize).next();
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.ReactiveServerCommands#flushDb()
|
||||
*/
|
||||
@Override
|
||||
@@ -94,7 +105,8 @@ class LettuceReactiveServerCommands implements ReactiveServerCommands {
|
||||
return connection.execute(RedisServerReactiveCommands::flushdb).next();
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.ReactiveServerCommands#flushAll()
|
||||
*/
|
||||
@Override
|
||||
@@ -102,7 +114,8 @@ class LettuceReactiveServerCommands implements ReactiveServerCommands {
|
||||
return connection.execute(RedisServerReactiveCommands::flushall).next();
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.ReactiveServerCommands#info()
|
||||
*/
|
||||
@Override
|
||||
@@ -113,36 +126,48 @@ class LettuceReactiveServerCommands implements ReactiveServerCommands {
|
||||
.next();
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.ReactiveServerCommands#info(java.lang.String)
|
||||
*/
|
||||
@Override
|
||||
public Mono<Properties> info(String section) {
|
||||
|
||||
Assert.hasText(section, "Section must not be null nor empty!");
|
||||
|
||||
return connection.execute(c -> c.info(section)) //
|
||||
.map(LettuceConverters::toProperties) //
|
||||
.next();
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.ReactiveServerCommands#getConfig(java.lang.String)
|
||||
*/
|
||||
@Override
|
||||
public Mono<Properties> getConfig(String pattern) {
|
||||
|
||||
Assert.hasText(pattern, "Pattern must not be null nor empty!");
|
||||
|
||||
return connection.execute(c -> c.configGet(pattern).collectList()) //
|
||||
.map(LettuceConverters::toProperties).next();
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.ReactiveServerCommands#setConfig(java.lang.String, java.lang.String)
|
||||
*/
|
||||
@Override
|
||||
public Mono<String> setConfig(String param, String value) {
|
||||
|
||||
Assert.hasText(param, "Param must not be null nor empty!");
|
||||
Assert.hasText(value, "Value must not be null nor empty!");
|
||||
|
||||
return connection.execute(c -> c.configSet(param, value)).next();
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.ReactiveServerCommands#resetConfigStats()
|
||||
*/
|
||||
@Override
|
||||
@@ -150,7 +175,8 @@ class LettuceReactiveServerCommands implements ReactiveServerCommands {
|
||||
return connection.execute(RedisServerReactiveCommands::configResetstat).next();
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.ReactiveServerCommands#time()
|
||||
*/
|
||||
@Override
|
||||
@@ -162,27 +188,32 @@ class LettuceReactiveServerCommands implements ReactiveServerCommands {
|
||||
.map(LettuceConverters.toTimeConverter()::convert);
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.ReactiveServerCommands#killClient(java.lang.String, int)
|
||||
*/
|
||||
@Override
|
||||
public Mono<String> killClient(String host, int port) {
|
||||
|
||||
Assert.notNull(host, "Host must not be null");
|
||||
Assert.notNull(host, "Host must not be null nor empty!");
|
||||
|
||||
String client = String.format("%s:%s", host, port);
|
||||
return connection.execute(c -> c.clientKill(client)).next();
|
||||
return connection.execute(c -> c.clientKill(String.format("%s:%s", host, port))).next();
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.ReactiveServerCommands#setClientName(java.lang.String)
|
||||
*/
|
||||
@Override
|
||||
public Mono<String> setClientName(String name) {
|
||||
|
||||
Assert.hasText(name, "Name must not be null nor empty!");
|
||||
|
||||
return connection.execute(c -> c.clientSetname(ByteBuffer.wrap(LettuceConverters.toBytes(name)))).next();
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.ReactiveServerCommands#getClientName()
|
||||
*/
|
||||
@Override
|
||||
@@ -194,7 +225,8 @@ class LettuceReactiveServerCommands implements ReactiveServerCommands {
|
||||
.next();
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.redis.connection.ReactiveServerCommands#getClientList()
|
||||
*/
|
||||
@Override
|
||||
|
||||
@@ -26,6 +26,7 @@ import org.springframework.data.redis.connection.RedisClusterNode;
|
||||
|
||||
/**
|
||||
* @author Mark Paluch
|
||||
* @author Christoph Strobl
|
||||
*/
|
||||
public class LettuceReactiveClusterServerCommandsTests extends LettuceReactiveClusterCommandsTestsBase {
|
||||
|
||||
@@ -132,25 +133,38 @@ public class LettuceReactiveClusterServerCommandsTests extends LettuceReactiveCl
|
||||
@Test // DATAREDIS-659
|
||||
public void setConfigShouldApplyConfiguration() throws InterruptedException {
|
||||
|
||||
StepVerifier.create(connection.serverCommands().setConfig("maxclients", "10000")) //
|
||||
.expectNext("OK") //
|
||||
.verifyComplete();
|
||||
final String slowLogKey = "slowlog-max-len";
|
||||
|
||||
StepVerifier.create(connection.serverCommands().setConfig(NODE1, "maxclients", "9999")) //
|
||||
.expectNext("OK") //
|
||||
.verifyComplete();
|
||||
String resetValue = connection.serverCommands().getConfig(slowLogKey).map(it -> {
|
||||
if (it.containsKey(slowLogKey)) {
|
||||
return it.get(slowLogKey);
|
||||
}
|
||||
return it.get("127.0.0.1:7379." + slowLogKey);
|
||||
}).block().toString();
|
||||
|
||||
StepVerifier.create(connection.serverCommands().getConfig(NODE1, "maxclients")) //
|
||||
.consumeNextWith(properties -> {
|
||||
assertThat(properties).containsEntry("maxclients", "9999");
|
||||
}) //
|
||||
.verifyComplete();
|
||||
try {
|
||||
StepVerifier.create(connection.serverCommands().setConfig(slowLogKey, resetValue)) //
|
||||
.expectNext("OK") //
|
||||
.verifyComplete();
|
||||
|
||||
StepVerifier.create(connection.serverCommands().getConfig(NODE2, "maxclients")) //
|
||||
.consumeNextWith(properties -> {
|
||||
assertThat(properties).containsEntry("maxclients", "10000");
|
||||
}) //
|
||||
.verifyComplete();
|
||||
StepVerifier.create(connection.serverCommands().setConfig(NODE1, slowLogKey, "127")) //
|
||||
.expectNext("OK") //
|
||||
.verifyComplete();
|
||||
|
||||
StepVerifier.create(connection.serverCommands().getConfig(NODE1, slowLogKey)) //
|
||||
.consumeNextWith(properties -> {
|
||||
assertThat(properties).containsEntry(slowLogKey, "127");
|
||||
}) //
|
||||
.verifyComplete();
|
||||
|
||||
StepVerifier.create(connection.serverCommands().getConfig(NODE2, slowLogKey)) //
|
||||
.consumeNextWith(properties -> {
|
||||
assertThat(properties).containsEntry(slowLogKey, resetValue);
|
||||
}) //
|
||||
.verifyComplete();
|
||||
} finally {
|
||||
connection.serverCommands().setConfig(slowLogKey, resetValue).block();
|
||||
}
|
||||
}
|
||||
|
||||
@Test // DATAREDIS-659
|
||||
|
||||
@@ -24,6 +24,7 @@ import org.junit.Test;
|
||||
|
||||
/**
|
||||
* @author Mark Paluch
|
||||
* @author Christoph Strobl
|
||||
*/
|
||||
public class LettuceReactiveServerCommandsTests extends LettuceReactiveCommandsTestsBase {
|
||||
|
||||
@@ -156,22 +157,35 @@ public class LettuceReactiveServerCommandsTests extends LettuceReactiveCommandsT
|
||||
@Test // DATAREDIS-659
|
||||
public void setConfigShouldApplyConfiguration() {
|
||||
|
||||
StepVerifier.create(connection.serverCommands().setConfig("maxclients", "9999")) //
|
||||
.expectNext("OK") //
|
||||
.verifyComplete();
|
||||
final String slowLogKey = "slowlog-max-len";
|
||||
|
||||
if (connection instanceof LettuceReactiveRedisClusterConnection) {
|
||||
StepVerifier.create(connection.serverCommands().getConfig("maxclients")) //
|
||||
.consumeNextWith(properties -> {
|
||||
assertThat(properties).containsEntry("127.0.0.1:7379.maxclients", "9999");
|
||||
}) //
|
||||
.verifyComplete();
|
||||
} else {
|
||||
StepVerifier.create(connection.serverCommands().getConfig("maxclients")) //
|
||||
.consumeNextWith(properties -> {
|
||||
assertThat(properties).containsEntry("maxclients", "9999");
|
||||
}) //
|
||||
String resetValue = connection.serverCommands().getConfig(slowLogKey).map(it -> {
|
||||
if (it.containsKey(slowLogKey)) {
|
||||
return it.get(slowLogKey);
|
||||
}
|
||||
return it.get("127.0.0.1:7379." + slowLogKey);
|
||||
}).block().toString();
|
||||
|
||||
try {
|
||||
StepVerifier.create(connection.serverCommands().setConfig(slowLogKey, "127")) //
|
||||
.expectNext("OK") //
|
||||
.verifyComplete();
|
||||
|
||||
if (connection instanceof LettuceReactiveRedisClusterConnection) {
|
||||
StepVerifier.create(connection.serverCommands().getConfig(slowLogKey)) //
|
||||
.consumeNextWith(properties -> {
|
||||
assertThat(properties).containsEntry("127.0.0.1:7379." + slowLogKey, "127");
|
||||
}) //
|
||||
.verifyComplete();
|
||||
} else {
|
||||
StepVerifier.create(connection.serverCommands().getConfig(slowLogKey)) //
|
||||
.consumeNextWith(properties -> {
|
||||
assertThat(properties).containsEntry(slowLogKey, "127");
|
||||
}) //
|
||||
.verifyComplete();
|
||||
}
|
||||
} finally {
|
||||
connection.serverCommands().setConfig(slowLogKey, resetValue).block();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user