diff --git a/src/main/java/org/springframework/data/gemfire/function/BatchingResultSender.java b/src/main/java/org/springframework/data/gemfire/function/BatchingResultSender.java index f8fba83c..3108793a 100644 --- a/src/main/java/org/springframework/data/gemfire/function/BatchingResultSender.java +++ b/src/main/java/org/springframework/data/gemfire/function/BatchingResultSender.java @@ -12,164 +12,180 @@ */ package org.springframework.data.gemfire.function; +import java.lang.reflect.Array; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; + +import org.apache.geode.cache.execute.Function; import org.apache.geode.cache.execute.ResultSender; import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; -import java.lang.reflect.Array; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; - /** - * Sends collection results using a {@link ResultSender} in chunks determined by batchSize + * Sends {@link Collection} {@link Function} results using a {@link ResultSender} in chunks + * determined by {@code batchSize}. * * @author David Turanski * @author Udo Kohlmeyer + * @author John Blum * @since 1.3.0 */ class BatchingResultSender { - private final int batchSize; - private ResultSender resultSender; - public BatchingResultSender(int batchSize, ResultSender resultSender) { - Assert.notNull(resultSender, "resultSender cannot be null"); - Assert.isTrue(batchSize >= 0, "batchSize must be >= 0"); - this.batchSize = batchSize; - this.resultSender = resultSender; - } + private final int batchSize; + private ResultSender resultSender; - public void sendResults(Iterable result) { - //Handle both batchSize == 0 or empty Iterable - if (batchSize == 0 || !result.iterator().hasNext()) { - resultSender.lastResult(result); - return; - } + /** + * Constructs a new instance of {@link BatchingResultSender} initialized with the given {@link Integer batch size} + * and {@link ResultSender} object used to delegate all send operations. + * + * @param batchSize {@link Integer} specifying the configured batch size. + * @param resultSender {@link ResultSender} used to delegate all send operations. + * @throws IllegalArgumentException if {@link ResultSender} is {@literal null} + * or {@code batchSize} is less than {@literal 0}. + * @see org.apache.geode.cache.execute.ResultSender + */ + public BatchingResultSender(int batchSize, ResultSender resultSender) { - List chunk = new ArrayList(batchSize); + Assert.notNull(resultSender, "ResultSender must not be null"); + Assert.isTrue(batchSize >= 0, "batchSize must be greater than equal to 0"); - for (Iterator it = result.iterator(); it.hasNext(); ) { - if (chunk.size() < batchSize) { - chunk.add(it.next()); - } + this.batchSize = batchSize; + this.resultSender = resultSender; + } - if (chunk.size() == batchSize || !it.hasNext()) { - if (it.hasNext()) { - resultSender.sendResult(chunk); - } else { - resultSender.lastResult(chunk); - } - chunk.clear(); - } - } - } + /** + * Returns the configured {@link Integer batchSize} of this batching {@link ResultSender}. + * + * @return an {@link Integer} value specifying the configured {@link Integer batchSize} + * of this batching {@link ResultSender}. + */ + public int getBatchSize() { + return this.batchSize; + } + /** + * Returns a reference to the configured {@link ResultSender} used to send {@link Function} results. + * + * @return a reference to the configured {@link ResultSender} used to send {@link Function} results. + * @see org.apache.geode.cache.execute.ResultSender + */ + public ResultSender getResultSender() { + return this.resultSender; + } - public void sendArrayResults(Object result) { - Assert.isTrue(ObjectUtils.isArray(result)); + protected boolean isBatchingDisabled() { + return !isBatchingEnabled(); + } - if (batchSize == 0) { - resultSender.lastResult(result); - return; - } + protected boolean isBatchingEnabled() { + return getBatchSize() > 0; + } - int length = Array.getLength(result); + protected boolean doNotSendChunks(boolean resultSetIsEmpty) { + return resultSetIsEmpty || isBatchingDisabled(); + } - if (length == 0) { - resultSender.lastResult(result); - return; - } + public void sendResults(Iterable result) { - for (int from = 0; from < length; from += batchSize) { - int to = Math.min(length, from + batchSize); - Object chunk = copyOfRange(result, from, to); + ResultSender resultSender = getResultSender(); - if (to == length) { - resultSender.lastResult(chunk); - } else { - resultSender.sendResult(chunk); - } - } - } + if (doNotSendChunks(!result.iterator().hasNext())) { + resultSender.lastResult(result); + } + else { + int batchSize = getBatchSize(); - /** - * @param result - * @param from - * @param to - * @return - */ - private Object copyOfRange(Object result, int from, int to) { - Class arrayClass = result.getClass(); - int size = to - from; + List chunk = new ArrayList<>(batchSize); - if (int[].class.isAssignableFrom(arrayClass)) { - int[] array = new int[size]; - for (int i = 0; i < size; ++i) { - array[i] = Array.getInt(result, from + i); - } - return array; - } + for (Iterator it = result.iterator(); it.hasNext(); ) { - if (float[].class.isAssignableFrom(arrayClass)) { - float[] array = new float[size]; - for (int i = 0; i < size; ++i) { - array[i] = Array.getFloat(result, from + i); - } - return array; - } + if (chunk.size() < batchSize) { + chunk.add(it.next()); + } - if (double[].class.isAssignableFrom(arrayClass)) { - double[] array = new double[size]; - for (int i = 0; i < size; ++i) { - array[i] = Array.getDouble(result, from + i); - } - return array; - } + if (chunk.size() == batchSize || !it.hasNext()) { - if (boolean[].class.isAssignableFrom(arrayClass)) { - boolean[] array = new boolean[size]; - for (int i = 0; i < size; ++i) { - array[i] = Array.getBoolean(result, from + i); - } - return array; - } + if (it.hasNext()) { + resultSender.sendResult(chunk); + } + else { + resultSender.lastResult(chunk); + } - if (byte[].class.isAssignableFrom(arrayClass)) { - byte[] array = new byte[size]; - for (int i = 0; i < size; ++i) { - array[i] = Array.getByte(result, from + i); - } - return array; - } + chunk.clear(); + } + } + } + } - if (short[].class.isAssignableFrom(arrayClass)) { - short[] array = new short[size]; - for (int i = 0; i < size; ++i) { - array[i] = Array.getShort(result, from + i); - } - return array; - } + public void sendArrayResults(Object result) { - if (long[].class.isAssignableFrom(arrayClass)) { - long[] array = new long[size]; - for (int i = 0; i < size; ++i) { - array[i] = Array.getLong(result, from + i); - } - return array; - } + Assert.isTrue(ObjectUtils.isArray(result), + () -> String.format("Object must be an array; was [%s]", ObjectUtils.nullSafeClassName(result))); - if (char[].class.isAssignableFrom(arrayClass)) { - char[] array = new char[size]; - for (int i = 0; i < size; ++i) { - array[i] = Array.getChar(result, from + i); - } - return array; - } + int arrayLength = Array.getLength(result); - return Arrays.copyOfRange((Object[]) result, from, to); + ResultSender resultSender = getResultSender(); - } + if (doNotSendChunks(arrayLength == 0)) { + resultSender.lastResult(result); + } + else { + + int batchSize = getBatchSize(); + + for (int from = 0; from < arrayLength; from += batchSize) { + + int to = Math.min(arrayLength, from + batchSize); + + Object chunk = copyOfRange(result, from, to); + + if (to == arrayLength) { + resultSender.lastResult(chunk); + } + else { + resultSender.sendResult(chunk); + } + } + } + } + + private Object copyOfRange(Object result, int from, int to) { + + Class resultType = result.getClass(); + + if (boolean[].class.isAssignableFrom(resultType)) { + return Arrays.copyOfRange((boolean[]) result, from, to); + } + else if (byte[].class.isAssignableFrom(resultType)) { + return Arrays.copyOfRange((byte[]) result, from, to); + } + else if (short[].class.isAssignableFrom(resultType)) { + return Arrays.copyOfRange((short[]) result, from, to); + } + else if (int[].class.isAssignableFrom(resultType)) { + return Arrays.copyOfRange((int[]) result, from, to); + } + else if (long[].class.isAssignableFrom(resultType)) { + return Arrays.copyOfRange((long[]) result, from, to); + } + else if (float[].class.isAssignableFrom(resultType)) { + return Arrays.copyOfRange((float[]) result, from, to); + } + else if (double[].class.isAssignableFrom(resultType)) { + return Arrays.copyOfRange((double[]) result, from, to); + } + else if (char[].class.isAssignableFrom(resultType)) { + return Arrays.copyOfRange((char[]) result, from, to); + } + else { + return Arrays.copyOfRange((Object[]) result, from, to); + } + } } diff --git a/src/test/java/org/springframework/data/gemfire/function/BatchingResultSenderTest.java b/src/test/java/org/springframework/data/gemfire/function/BatchingResultSenderTest.java index d0861749..8cca08b1 100644 --- a/src/test/java/org/springframework/data/gemfire/function/BatchingResultSenderTest.java +++ b/src/test/java/org/springframework/data/gemfire/function/BatchingResultSenderTest.java @@ -12,76 +12,179 @@ */ package org.springframework.data.gemfire.function; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; +import java.util.Optional; +import java.util.stream.IntStream; import org.apache.geode.cache.execute.ResultSender; +import org.assertj.core.api.Assertions; import org.junit.Test; /** + * Unit tests for {@link BatchingResultSender}. + * * @author David Turanski * @author Udo Kohlmeyer - * + * @author John Blum + * @see org.junit.Test + * @see org.apache.geode.cache.execute.ResultSender + * @see org.springframework.data.gemfire.function.BatchingResultSender + * @since 1.3.0 */ public class BatchingResultSenderTest { @Test - public void testArrayChunking() { + @SuppressWarnings("unchecked") + public void constructBatchingResultSender() { + + ResultSender mockResultSender = mock(ResultSender.class); + + BatchingResultSender batchResultSender = new BatchingResultSender(20, mockResultSender); + + assertThat(batchResultSender).isNotNull(); + assertThat(batchResultSender.getBatchSize()).isEqualTo(20); + assertThat(batchResultSender.getResultSender()).isEqualTo(mockResultSender); + } + + @SuppressWarnings("unchecked") + @Test(expected = IllegalArgumentException.class) + public void constructBatchingResultSenderWithBatchSizeOfMinusOne() { + + try { + new BatchingResultSender(-1, mock(ResultSender.class)); + } + catch (IllegalArgumentException expected) { + + assertThat(expected).hasMessage("batchSize must be greater than equal to 0"); + assertThat(expected).hasNoCause(); + + throw expected; + } + } + + @Test(expected = IllegalArgumentException.class) + public void constructBatchingResultSenderWithNullResultSender() { + + try { + new BatchingResultSender(0, null); + } + catch (IllegalArgumentException expected) { + + assertThat(expected).hasMessage("ResultSender must not be null"); + assertThat(expected).hasNoCause(); + + throw expected; + } + } + + @SuppressWarnings("unchecked") + @Test(expected = IllegalArgumentException.class) + public void sendArrayResultsWithNonArrayThrowIllegalArgumentException() { + + try { + new BatchingResultSender(20, mock(ResultSender.class)).sendArrayResults(new Object()); + } + catch (IllegalArgumentException expected) { + + assertThat(expected).hasMessage("Object must be an array; was [java.lang.Object]"); + assertThat(expected).hasNoCause(); + + throw expected; + } + } + + @Test + public void arrayChunkingIsCorrectWhenBatchSizeIsZero() { + + testBatchingResultSender(new TestArrayResultSender(),0); // Default result set size is 100 + testBatchingResultSender(new TestArrayResultSender(),0,1); + testBatchingResultSender(new TestArrayResultSender(),0,2); + } + + @Test + public void arrayChunkingIsCorrectWhenBothBatchSizeAndResultSetSizeAreZero() { + testBatchingResultSender(new TestArrayResultSender(),0,0); + } + + @Test + public void arrayCheckingIsCorrectWhenResultSetSizeIsZero() { + + testBatchingResultSender(new TestArrayResultSender(),1,0); + testBatchingResultSender(new TestArrayResultSender(),2,0); + testBatchingResultSender(new TestArrayResultSender(),3,0); + testBatchingResultSender(new TestArrayResultSender(),50,0); + } + + @Test + public void arrayChunkingIsCorrect() { testBatchingResultSender(new TestArrayResultSender(),1); - testBatchingResultSender(new TestArrayResultSender(),0); testBatchingResultSender(new TestArrayResultSender(),9); testBatchingResultSender(new TestArrayResultSender(),10,99); testBatchingResultSender(new TestArrayResultSender(),10,100); testBatchingResultSender(new TestArrayResultSender(),10,101); testBatchingResultSender(new TestArrayResultSender(),1000); - - testBatchingResultSender(new TestArrayResultSender(),2,0); - testBatchingResultSender(new TestArrayResultSender(),0,0); } + @Test + public void listChunkingIsCorrectWhenBatchSizeIsZero() { + + testBatchingResultSender(new TestListResultSender(),0); // Default result set size is 100 + testBatchingResultSender(new TestListResultSender(),0, 1); + testBatchingResultSender(new TestListResultSender(),0, 2); + } @Test - public void testListChunking() { + public void listChunkingIsCorrectWhenBothBatchSizeAndResultSetSizeAreZero() { + + testBatchingResultSender(new TestListResultSender(),0,0); + } + + @Test + public void listChunkingIsCorrectWhenResultSetSizeIsZero() { + + testBatchingResultSender(new TestListResultSender(),1,0); + testBatchingResultSender(new TestListResultSender(),2,0); + testBatchingResultSender(new TestListResultSender(),3,0); + testBatchingResultSender(new TestListResultSender(),50,0); + } + + @Test + public void listChunkingIsCorrect() { + testBatchingResultSender(new TestListResultSender(),1); - testBatchingResultSender(new TestListResultSender(),0); testBatchingResultSender(new TestListResultSender(),9); testBatchingResultSender(new TestListResultSender(),10,99); testBatchingResultSender(new TestListResultSender(),10,100); testBatchingResultSender(new TestListResultSender(),10,101); testBatchingResultSender(new TestListResultSender(),1000); - - testBatchingResultSender(new TestListResultSender(),3,0); - testBatchingResultSender(new TestListResultSender(),0,0); } - private void testBatchingResultSender(AbstractTestResultSender resultSender, int batchSize,int resultSetSize){ - BatchingResultSender brs = new BatchingResultSender(batchSize, resultSender); + private void testBatchingResultSender(AbstractTestResultSender resultSender, int batchSize, int resultSetSize){ + + BatchingResultSender batchResultSender = new BatchingResultSender(batchSize, resultSender); List result = new ArrayList<>(); - for (int i = 0; i< resultSetSize; i++) { - result.add(i); - } - //TODO: Clean this up. Ok for test code + + IntStream.range(0, resultSetSize).forEach(result::add); + if (resultSender instanceof TestArrayResultSender) { - brs.sendArrayResults(result.toArray(new Integer[resultSetSize])); + batchResultSender.sendArrayResults(result.toArray(new Integer[resultSetSize])); } else { - brs.sendResults(result); + batchResultSender.sendResults(result); } - assertEquals(resultSetSize,resultSender.getResults().size()); - - assertTrue(resultSender.isLastResultSent()); - - for(int i=0; i< resultSetSize; i++) { - assertEquals(i,resultSender.getResults().get(i)); - } + assertThat(resultSender.isLastResultSent()).isTrue(); + assertThat(resultSender.getResults()).hasSize(resultSetSize); + IntStream.range(0, resultSetSize).forEach(index -> + assertThat(resultSender.getResults().get(index)).isEqualTo(index)); } private void testBatchingResultSender(AbstractTestResultSender resultSender, int batchSize){ @@ -89,43 +192,34 @@ public class BatchingResultSenderTest { } public static abstract class AbstractTestResultSender implements ResultSender { - private List results = new ArrayList(); + private boolean lastResultSent = false; + private List results = new ArrayList<>(); + public boolean isLastResultSent() { - return lastResultSent; + return this.lastResultSent; } - /* (non-Javadoc) - * @see org.apache.geode.cache.execute.ResultSender#lastResult(java.lang.Object) - */ @Override - public void lastResult(Object arg0) { - lastResultSent = true; - if (arg0 == null) { - return; - } - addResults(arg0, results); + public void lastResult(Object result) { + + this.lastResultSent = true; + + Optional.ofNullable(result) + .ifPresent(it -> addResults(it, this.results)); } - /* (non-Javadoc) - * @see org.apache.geode.cache.execute.ResultSender#sendException(java.lang.Throwable) - */ @Override - public void sendException(Throwable arg0) { - fail(); - + public void sendException(Throwable cause) { + Assertions.fail("Function send result operation failed", cause); } - /* (non-Javadoc) - * @see org.apache.geode.cache.execute.ResultSender#sendResult(java.lang.Object) - */ @Override - public void sendResult(Object arg0) { - if (arg0 == null) { - return; - } - addResults(arg0, results); + public void sendResult(Object result) { + + Optional.ofNullable(result) + .ifPresent(it -> addResults(it, this.results)); } protected abstract void addResults(Object item, List results); @@ -133,28 +227,28 @@ public class BatchingResultSenderTest { public List getResults() { return this.results; } - - } public static class TestArrayResultSender extends AbstractTestResultSender { - protected void addResults(Object arg0, List results) { - assertTrue(arg0.getClass().isArray()); - Object[] array = (Object[]) arg0; - for (Object obj: array) { - results.add(obj); - } + protected void addResults(Object result, List results) { + + assertThat(result.getClass().isArray()).isTrue(); + + Object[] array = (Object[]) result; + + Collections.addAll(results, array); } } public static class TestListResultSender extends AbstractTestResultSender { - protected void addResults(Object arg0, List results) { - if (arg0 == null) { - return; - } - assertTrue(arg0 instanceof Collection); - Collection list = (Collection) arg0; + + protected void addResults(Object result, List results) { + + assertThat(result).isInstanceOf(Collection.class); + + Collection list = (Collection) result; + results.addAll(list); } }