diff --git a/src/main/java/org/springframework/data/redis/connection/ClusterCommandExecutor.java b/src/main/java/org/springframework/data/redis/connection/ClusterCommandExecutor.java index bfcf3a785..849f9e14b 100644 --- a/src/main/java/org/springframework/data/redis/connection/ClusterCommandExecutor.java +++ b/src/main/java/org/springframework/data/redis/connection/ClusterCommandExecutor.java @@ -16,7 +16,6 @@ package org.springframework.data.redis.connection; import java.util.*; -import java.util.Map.Entry; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -227,48 +226,39 @@ public class ClusterCommandExecutor implements DisposableBean { MultiNodeResult collectResults(Map>> futures) { - NodeExceptionCollector exceptionCollector = new NodeExceptionCollector(); MultiNodeResult result = new MultiNodeResult<>(); - Object placeholder = new Object(); - Map>, Object> safeguard = new IdentityHashMap<>(); + NodeExceptionCollector exceptionCollector = new NodeExceptionCollector(); - for (;;) { + OUT: while (!futures.isEmpty()) { - boolean timeout = false; - for (Map.Entry>> entry : futures.entrySet()) { + Iterator>>> entryIterator = futures.entrySet().iterator(); + while (entryIterator.hasNext()) { + + Map.Entry>> entry = entryIterator.next(); NodeExecution nodeExecution = entry.getKey(); Future> futureNodeResult = entry.getValue(); try { + NodeResult nodeResult = futureNodeResult.get(10L, TimeUnit.MICROSECONDS); - if (!safeguard.containsKey(futureNodeResult)) { - - NodeResult nodeResult = futureNodeResult.get(10L, TimeUnit.MICROSECONDS); - - if (nodeExecution.isPositional()) { - result.add(nodeExecution.getPositionalKey(), nodeResult); - } else { - result.add(nodeResult); - } - - safeguard.put(futureNodeResult, placeholder); + if (nodeExecution.isPositional()) { + result.add(nodeExecution.getPositionalKey(), nodeResult); + } else { + result.add(nodeResult); } + + entryIterator.remove(); } catch (ExecutionException exception) { - safeguard.put(futureNodeResult, placeholder); + entryIterator.remove(); exceptionCollector.addException(nodeExecution, exception.getCause()); } catch (TimeoutException ignore) { - timeout = true; } catch (InterruptedException exception) { Thread.currentThread().interrupt(); exceptionCollector.addException(nodeExecution, exception); - break; + break OUT; } } - - if (!timeout) { - break; - } } if (exceptionCollector.hasExceptions()) { @@ -300,7 +290,7 @@ public class ClusterCommandExecutor implements DisposableBean { Map>> futures = new LinkedHashMap<>(); - for (Entry entry : nodeKeyMap.entrySet()) { + for (Map.Entry entry : nodeKeyMap.entrySet()) { if (entry.getKey().isMaster()) { for (PositionalKey key : entry.getValue()) {