DATAGEODE-151 - BatchingResultSender does not handle empty collections.
The current o.s.d.gemfire.function.BatchingResultSendera does not handle size/length 0 arrays or collections when batch size is larger than 0. Also fixed issue where lastResult was not sent with arrays, do to logic issue, when the lastResult is to be sent.
This commit is contained in:
@@ -12,159 +12,164 @@
|
||||
*/
|
||||
package org.springframework.data.gemfire.function;
|
||||
|
||||
import org.apache.geode.cache.execute.ResultSender;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
|
||||
import java.lang.reflect.Array;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.geode.cache.execute.ResultSender;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
|
||||
/**
|
||||
* Sends collection results using a {@link ResultSender} in chunks determined by batchSize
|
||||
*
|
||||
* @author David Turanski
|
||||
* @author Udo Kohlmeyer
|
||||
* @since 1.3.0
|
||||
*/
|
||||
class BatchingResultSender {
|
||||
private final int batchSize;
|
||||
private ResultSender<Object> resultSender;
|
||||
class BatchingResultSender {
|
||||
private final int batchSize;
|
||||
private ResultSender<Object> resultSender;
|
||||
|
||||
public BatchingResultSender(int batchSize, ResultSender<Object> resultSender) {
|
||||
Assert.notNull(resultSender, "resultSender cannot be null");
|
||||
Assert.isTrue(batchSize >= 0, "batchSize must be >= 0");
|
||||
this.batchSize = batchSize;
|
||||
this.resultSender = resultSender;
|
||||
}
|
||||
public BatchingResultSender(int batchSize, ResultSender<Object> resultSender) {
|
||||
Assert.notNull(resultSender, "resultSender cannot be null");
|
||||
Assert.isTrue(batchSize >= 0, "batchSize must be >= 0");
|
||||
this.batchSize = batchSize;
|
||||
this.resultSender = resultSender;
|
||||
}
|
||||
|
||||
|
||||
public void sendResults(Iterable<?> result) {
|
||||
if (batchSize == 0) {
|
||||
resultSender.lastResult(result);
|
||||
return;
|
||||
}
|
||||
public void sendResults(Iterable<?> result) {
|
||||
//Handle both batchSize == 0 or empty Iterable
|
||||
if (batchSize == 0 || !result.iterator().hasNext()) {
|
||||
resultSender.lastResult(result);
|
||||
return;
|
||||
}
|
||||
|
||||
List<Object> chunk = new ArrayList<Object>(batchSize);
|
||||
List<Object> chunk = new ArrayList<Object>(batchSize);
|
||||
|
||||
for (Iterator<?> it = result.iterator(); it.hasNext();) {
|
||||
if (chunk.size() < batchSize) {
|
||||
chunk.add(it.next());
|
||||
}
|
||||
for (Iterator<?> it = result.iterator(); it.hasNext(); ) {
|
||||
if (chunk.size() < batchSize) {
|
||||
chunk.add(it.next());
|
||||
}
|
||||
|
||||
if (chunk.size() == batchSize || !it.hasNext()) {
|
||||
if (chunk.size() == batchSize || !it.hasNext()) {
|
||||
if (it.hasNext()) {
|
||||
resultSender.sendResult(chunk);
|
||||
resultSender.sendResult(chunk);
|
||||
} else {
|
||||
resultSender.lastResult(chunk);
|
||||
resultSender.lastResult(chunk);
|
||||
}
|
||||
chunk.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
chunk.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public void sendArrayResults(Object result) {
|
||||
public void sendArrayResults(Object result) {
|
||||
Assert.isTrue(ObjectUtils.isArray(result));
|
||||
|
||||
if (batchSize == 0) {
|
||||
resultSender.lastResult(result);
|
||||
return;
|
||||
}
|
||||
if (batchSize == 0) {
|
||||
resultSender.lastResult(result);
|
||||
return;
|
||||
}
|
||||
|
||||
Assert.isTrue(ObjectUtils.isArray(result));
|
||||
int length = Array.getLength(result);
|
||||
|
||||
int length = Array.getLength(result);
|
||||
if (length == 0) {
|
||||
resultSender.lastResult(result);
|
||||
return;
|
||||
}
|
||||
|
||||
for (int from =0; from < length; from += batchSize) {
|
||||
int to = Math.min(length,from + batchSize);
|
||||
Object chunk = copyOfRange(result,from, to);
|
||||
for (int from = 0; from < length; from += batchSize) {
|
||||
int to = Math.min(length, from + batchSize);
|
||||
Object chunk = copyOfRange(result, from, to);
|
||||
|
||||
if (to == length -1) {
|
||||
resultSender.lastResult(chunk);
|
||||
} else {
|
||||
resultSender.sendResult(chunk);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (to == length) {
|
||||
resultSender.lastResult(chunk);
|
||||
} else {
|
||||
resultSender.sendResult(chunk);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param result
|
||||
* @param from
|
||||
* @param to
|
||||
* @return
|
||||
*/
|
||||
private Object copyOfRange(Object result, int from, int to) {
|
||||
/**
|
||||
* @param result
|
||||
* @param from
|
||||
* @param to
|
||||
* @return
|
||||
*/
|
||||
private Object copyOfRange(Object result, int from, int to) {
|
||||
Class<?> arrayClass = result.getClass();
|
||||
int size = to - from;
|
||||
|
||||
Class<?> arrayClass = result.getClass();
|
||||
int size = to - from;
|
||||
|
||||
if (int[].class.isAssignableFrom(arrayClass)) {
|
||||
int[] array = new int[size];
|
||||
for(int i = 0; i < size ; ++i){
|
||||
if (int[].class.isAssignableFrom(arrayClass)) {
|
||||
int[] array = new int[size];
|
||||
for (int i = 0; i < size; ++i) {
|
||||
array[i] = Array.getInt(result, from + i);
|
||||
}
|
||||
return array;
|
||||
}
|
||||
return array;
|
||||
}
|
||||
|
||||
if (float[].class.isAssignableFrom(arrayClass)) {
|
||||
float[] array = new float[size];
|
||||
for(int i = 0; i < size ; ++i){
|
||||
if (float[].class.isAssignableFrom(arrayClass)) {
|
||||
float[] array = new float[size];
|
||||
for (int i = 0; i < size; ++i) {
|
||||
array[i] = Array.getFloat(result, from + i);
|
||||
}
|
||||
return array;
|
||||
}
|
||||
return array;
|
||||
}
|
||||
|
||||
if (double[].class.isAssignableFrom(arrayClass)) {
|
||||
double[] array = new double[size];
|
||||
for(int i = 0; i < size ; ++i){
|
||||
if (double[].class.isAssignableFrom(arrayClass)) {
|
||||
double[] array = new double[size];
|
||||
for (int i = 0; i < size; ++i) {
|
||||
array[i] = Array.getDouble(result, from + i);
|
||||
}
|
||||
return array;
|
||||
}
|
||||
return array;
|
||||
}
|
||||
|
||||
if (boolean[].class.isAssignableFrom(arrayClass)) {
|
||||
boolean[] array = new boolean[size];
|
||||
for(int i = 0; i < size ; ++i){
|
||||
if (boolean[].class.isAssignableFrom(arrayClass)) {
|
||||
boolean[] array = new boolean[size];
|
||||
for (int i = 0; i < size; ++i) {
|
||||
array[i] = Array.getBoolean(result, from + i);
|
||||
}
|
||||
return array;
|
||||
}
|
||||
return array;
|
||||
}
|
||||
|
||||
if (byte[].class.isAssignableFrom(arrayClass)) {
|
||||
byte[] array = new byte[size];
|
||||
for(int i = 0; i < size ; ++i){
|
||||
if (byte[].class.isAssignableFrom(arrayClass)) {
|
||||
byte[] array = new byte[size];
|
||||
for (int i = 0; i < size; ++i) {
|
||||
array[i] = Array.getByte(result, from + i);
|
||||
}
|
||||
return array;
|
||||
}
|
||||
return array;
|
||||
}
|
||||
|
||||
if (short[].class.isAssignableFrom(arrayClass)) {
|
||||
short[] array = new short[size];
|
||||
for(int i = 0; i < size ; ++i){
|
||||
if (short[].class.isAssignableFrom(arrayClass)) {
|
||||
short[] array = new short[size];
|
||||
for (int i = 0; i < size; ++i) {
|
||||
array[i] = Array.getShort(result, from + i);
|
||||
}
|
||||
return array;
|
||||
}
|
||||
return array;
|
||||
}
|
||||
|
||||
if (long[].class.isAssignableFrom(arrayClass)) {
|
||||
long[] array = new long[size];
|
||||
for(int i = 0; i < size ; ++i){
|
||||
if (long[].class.isAssignableFrom(arrayClass)) {
|
||||
long[] array = new long[size];
|
||||
for (int i = 0; i < size; ++i) {
|
||||
array[i] = Array.getLong(result, from + i);
|
||||
}
|
||||
return array;
|
||||
}
|
||||
return array;
|
||||
}
|
||||
|
||||
if (char[].class.isAssignableFrom(arrayClass)) {
|
||||
char[] array = new char[size];
|
||||
for(int i = 0; i < size ; ++i){
|
||||
if (char[].class.isAssignableFrom(arrayClass)) {
|
||||
char[] array = new char[size];
|
||||
for (int i = 0; i < size; ++i) {
|
||||
array[i] = Array.getChar(result, from + i);
|
||||
}
|
||||
return array;
|
||||
}
|
||||
return array;
|
||||
}
|
||||
|
||||
return Arrays.copyOfRange((Object[])result, from, to);
|
||||
return Arrays.copyOfRange((Object[]) result, from, to);
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -25,6 +25,7 @@ import org.junit.Test;
|
||||
|
||||
/**
|
||||
* @author David Turanski
|
||||
* @author Udo Kohlmeyer
|
||||
*
|
||||
*/
|
||||
public class BatchingResultSenderTest {
|
||||
@@ -35,8 +36,13 @@ public class BatchingResultSenderTest {
|
||||
testBatchingResultSender(new TestArrayResultSender(),1);
|
||||
testBatchingResultSender(new TestArrayResultSender(),0);
|
||||
testBatchingResultSender(new TestArrayResultSender(),9);
|
||||
testBatchingResultSender(new TestArrayResultSender(),10);
|
||||
testBatchingResultSender(new TestArrayResultSender(),10,99);
|
||||
testBatchingResultSender(new TestArrayResultSender(),10,100);
|
||||
testBatchingResultSender(new TestArrayResultSender(),10,101);
|
||||
testBatchingResultSender(new TestArrayResultSender(),1000);
|
||||
|
||||
testBatchingResultSender(new TestArrayResultSender(),2,0);
|
||||
testBatchingResultSender(new TestArrayResultSender(),0,0);
|
||||
}
|
||||
|
||||
|
||||
@@ -45,40 +51,57 @@ public class BatchingResultSenderTest {
|
||||
testBatchingResultSender(new TestListResultSender(),1);
|
||||
testBatchingResultSender(new TestListResultSender(),0);
|
||||
testBatchingResultSender(new TestListResultSender(),9);
|
||||
testBatchingResultSender(new TestListResultSender(),10);
|
||||
testBatchingResultSender(new TestListResultSender(),10,99);
|
||||
testBatchingResultSender(new TestListResultSender(),10,100);
|
||||
testBatchingResultSender(new TestListResultSender(),10,101);
|
||||
testBatchingResultSender(new TestListResultSender(),1000);
|
||||
|
||||
testBatchingResultSender(new TestListResultSender(),3,0);
|
||||
testBatchingResultSender(new TestListResultSender(),0,0);
|
||||
}
|
||||
|
||||
private void testBatchingResultSender(AbstractTestResultSender resultSender, int batchSize,int resultSetSize){
|
||||
BatchingResultSender brs = new BatchingResultSender(batchSize, resultSender);
|
||||
|
||||
List<Integer> result = new ArrayList<>();
|
||||
for (int i = 0; i< resultSetSize; i++) {
|
||||
result.add(i);
|
||||
}
|
||||
//TODO: Clean this up. Ok for test code
|
||||
if (resultSender instanceof TestArrayResultSender) {
|
||||
brs.sendArrayResults(result.toArray(new Integer[resultSetSize]));
|
||||
} else {
|
||||
brs.sendResults(result);
|
||||
}
|
||||
|
||||
assertEquals(resultSetSize,resultSender.getResults().size());
|
||||
|
||||
assertTrue(resultSender.isLastResultSent());
|
||||
|
||||
for(int i=0; i< resultSetSize; i++) {
|
||||
assertEquals(i,resultSender.getResults().get(i));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private void testBatchingResultSender(AbstractTestResultSender resultSender, int batchSize){
|
||||
BatchingResultSender brs = new BatchingResultSender(batchSize, resultSender);
|
||||
|
||||
List<Integer> result = new ArrayList<Integer>();
|
||||
for (int i = 0; i< 100; i++) {
|
||||
result.add(i);
|
||||
}
|
||||
//TODO: Clean this up. Ok for test code
|
||||
if (resultSender instanceof TestArrayResultSender) {
|
||||
brs.sendArrayResults(result.toArray(new Integer[100]));
|
||||
} else {
|
||||
brs.sendResults(result);
|
||||
}
|
||||
|
||||
assertEquals(100,resultSender.getResults().size());
|
||||
|
||||
for(int i=0; i< 100; i++) {
|
||||
assertEquals(i,resultSender.getResults().get(i));
|
||||
}
|
||||
|
||||
testBatchingResultSender(resultSender,batchSize,100);
|
||||
}
|
||||
|
||||
public static abstract class AbstractTestResultSender implements ResultSender<Object> {
|
||||
private List<Object> results = new ArrayList<Object>();
|
||||
private boolean lastResultSent = false;
|
||||
|
||||
/* (non-Javadoc)
|
||||
public boolean isLastResultSent() {
|
||||
return lastResultSent;
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
* @see org.apache.geode.cache.execute.ResultSender#lastResult(java.lang.Object)
|
||||
*/
|
||||
@Override
|
||||
public void lastResult(Object arg0) {
|
||||
lastResultSent = true;
|
||||
if (arg0 == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user