Incorporate review feedback.

Reduce object instantiations.

Original Pull Request: #2467
This commit is contained in:
Mark Paluch
2023-01-12 15:15:50 +01:00
committed by Christoph Strobl
parent 230c764c69
commit e9e2c53d78

View File

@@ -28,11 +28,10 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.InvalidDataAccessResourceUsageException;
import org.springframework.data.redis.connection.*;
@@ -48,10 +47,12 @@ class LettuceReactiveRedisConnection implements ReactiveRedisConnection {
static final RedisCodec<ByteBuffer, ByteBuffer> CODEC = ByteBufferCodec.INSTANCE;
private final Object mutex = new Object();
private final AsyncConnect<StatefulConnection<ByteBuffer, ByteBuffer>> dedicatedConnection;
private final AsyncConnect<StatefulRedisPubSubConnection<ByteBuffer, ByteBuffer>> pubSubConnection;
private final LettuceReactivePubSubCommands pubSub = new LettuceReactivePubSubCommands(this);
private volatile LettuceReactivePubSubCommands pubSub;
private @Nullable Mono<StatefulConnection<ByteBuffer, ByteBuffer>> sharedConnection;
@@ -139,7 +140,13 @@ class LettuceReactiveRedisConnection implements ReactiveRedisConnection {
@Override
public ReactivePubSubCommands pubSubCommands() {
return pubSub;
synchronized (mutex) {
if (this.pubSub == null) {
this.pubSub = new LettuceReactivePubSubCommands(this);
}
return pubSub;
}
}
@Override
@@ -291,10 +298,13 @@ class LettuceReactiveRedisConnection implements ReactiveRedisConnection {
*/
static class AsyncConnect<T extends io.lettuce.core.api.StatefulConnection<?, ?>> {
static AtomicReferenceFieldUpdater<AsyncConnect, State> STATE = AtomicReferenceFieldUpdater
.newUpdater(AsyncConnect.class, State.class, "state");
private final Mono<T> connectionPublisher;
private final LettuceConnectionProvider connectionProvider;
private AtomicReference<State> state = new AtomicReference<>(State.INITIAL);
private volatile State state = State.INITIAL;
private volatile @Nullable StatefulConnection<ByteBuffer, ByteBuffer> connection;
@SuppressWarnings("unchecked")
@@ -310,7 +320,7 @@ class LettuceReactiveRedisConnection implements ReactiveRedisConnection {
this.connectionPublisher = defer.doOnNext(it -> {
if (isClosing(this.state.get())) {
if (isClosing(STATE.get(this))) {
it.closeAsync();
} else {
connection = it;
@@ -319,7 +329,7 @@ class LettuceReactiveRedisConnection implements ReactiveRedisConnection {
.cache() //
.handle((connection, sink) -> {
if (isClosing(this.state.get())) {
if (isClosing(STATE.get(this))) {
sink.error(new IllegalStateException("Unable to connect; Connection is closed"));
} else {
sink.next((T) connection);
@@ -335,12 +345,12 @@ class LettuceReactiveRedisConnection implements ReactiveRedisConnection {
*/
Mono<T> getConnection() {
State state = this.state.get();
State state = STATE.get(this);
if (isClosing(state)) {
return Mono.error(new IllegalStateException("Unable to connect; Connection is closed"));
}
this.state.compareAndSet(State.INITIAL, State.CONNECTION_REQUESTED);
STATE.compareAndSet(this, State.INITIAL, State.CONNECTION_REQUESTED);
return connectionPublisher;
}
@@ -352,12 +362,13 @@ class LettuceReactiveRedisConnection implements ReactiveRedisConnection {
return Mono.defer(() -> {
if (state.compareAndSet(State.INITIAL, CLOSING) || state.compareAndSet(State.CONNECTION_REQUESTED, CLOSING)) {
if (STATE.compareAndSet(this, State.INITIAL, CLOSING)
|| STATE.compareAndSet(this, State.CONNECTION_REQUESTED, CLOSING)) {
StatefulConnection<ByteBuffer, ByteBuffer> connection = this.connection;
this.connection = null;
state.set(State.CLOSED);
STATE.set(this, State.CLOSED);
if (connection != null) {
return Mono.fromCompletionStage(connectionProvider.releaseAsync(connection));
}