[bq] improve tests coverage
Signed-off-by: Volodymyr Perebykivskyi <vova235@gmail.com>
This commit is contained in:
@@ -123,8 +123,7 @@ public abstract class BigQueryBaseItemWriter<T> implements ItemWriter<T>, Initia
|
||||
this.logger.debug(String.format("Mapping %d elements", items.size()));
|
||||
}
|
||||
|
||||
final ByteBuffer byteBuffer = mapDataToBigQueryFormat(items);
|
||||
doWriteDataToBigQuery(byteBuffer);
|
||||
doWriteDataToBigQuery(mapDataToBigQueryFormat(items));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -63,15 +63,6 @@ public class BigQueryCsvItemWriter<T> extends BigQueryBaseItemWriter<T> {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Row mapper which transforms single BigQuery row into a desired type.
|
||||
*
|
||||
* @param rowMapper your row mapper
|
||||
*/
|
||||
public void setRowMapper(Converter<T, byte[]> rowMapper) {
|
||||
this.rowMapper = rowMapper;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<byte[]> convertObjectsToByteArrays(List<? extends T> items) {
|
||||
return items
|
||||
@@ -105,6 +96,15 @@ public class BigQueryCsvItemWriter<T> extends BigQueryBaseItemWriter<T> {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Row mapper which transforms single BigQuery row into a desired type.
|
||||
*
|
||||
* @param rowMapper your row mapper
|
||||
*/
|
||||
public void setRowMapper(Converter<T, byte[]> rowMapper) {
|
||||
this.rowMapper = rowMapper;
|
||||
}
|
||||
|
||||
private byte[] mapItemToCsv(T t) {
|
||||
try {
|
||||
return rowMapper == null ? objectWriter.writeValueAsBytes(t) : rowMapper.convert(t);
|
||||
|
||||
@@ -46,15 +46,6 @@ public class BigQueryJsonItemWriter<T> extends BigQueryBaseItemWriter<T> {
|
||||
// Unused
|
||||
}
|
||||
|
||||
/**
|
||||
* Converter that transforms a single row into a {@link String}.
|
||||
*
|
||||
* @param marshaller your JSON mapper
|
||||
*/
|
||||
public void setMarshaller(JsonObjectMarshaller<T> marshaller) {
|
||||
this.marshaller = marshaller;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<byte[]> convertObjectsToByteArrays(List<? extends T> items) {
|
||||
return items
|
||||
@@ -91,6 +82,15 @@ public class BigQueryJsonItemWriter<T> extends BigQueryBaseItemWriter<T> {
|
||||
Assert.isTrue(Objects.equals(format, super.writeChannelConfig.getFormat()), "Only %s format is allowed".formatted(format));
|
||||
}
|
||||
|
||||
/**
|
||||
* Converter that transforms a single row into a {@link String}.
|
||||
*
|
||||
* @param marshaller your JSON mapper
|
||||
*/
|
||||
public void setMarshaller(JsonObjectMarshaller<T> marshaller) {
|
||||
this.marshaller = marshaller;
|
||||
}
|
||||
|
||||
/**
|
||||
* BigQuery uses <a href="https://github.com/ndjson/ndjson-spec">ndjson</a>.
|
||||
* It is expected that to pass here JSON line generated by
|
||||
|
||||
@@ -24,6 +24,7 @@ import org.springframework.batch.extensions.bigquery.common.PersonDto;
|
||||
import org.springframework.batch.extensions.bigquery.common.TestConstants;
|
||||
import org.springframework.batch.extensions.bigquery.unit.base.AbstractBigQueryTest;
|
||||
import org.springframework.batch.extensions.bigquery.writer.BigQueryBaseItemWriter;
|
||||
import org.springframework.batch.extensions.bigquery.writer.BigQueryItemWriterException;
|
||||
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.nio.ByteBuffer;
|
||||
@@ -144,6 +145,46 @@ class BigQueryBaseItemWriterTest extends AbstractBigQueryTest {
|
||||
Mockito.verifyNoMoreInteractions(channel);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testWrite_Exception() throws Exception {
|
||||
MethodHandles.Lookup handle = MethodHandles.privateLookupIn(BigQueryBaseItemWriter.class, MethodHandles.lookup());
|
||||
AtomicBoolean consumerCalled = new AtomicBoolean();
|
||||
|
||||
Job job = Mockito.mock(Job.class);
|
||||
Mockito.when(job.getJobId()).thenReturn(JobId.newBuilder().build());
|
||||
|
||||
TableDataWriteChannel channel = Mockito.mock(TableDataWriteChannel.class);
|
||||
Mockito.when(channel.getJob()).thenReturn(job);
|
||||
Mockito.when(channel.write(Mockito.any(ByteBuffer.class))).thenThrow(BigQueryException.class);
|
||||
|
||||
BigQuery bigQuery = prepareMockedBigQuery();
|
||||
Mockito.when(bigQuery.writer(Mockito.any(WriteChannelConfiguration.class))).thenReturn(channel);
|
||||
|
||||
TestWriter writer = new TestWriter();
|
||||
writer.setBigQuery(bigQuery);
|
||||
writer.setJobConsumer(j -> consumerCalled.set(true));
|
||||
writer.setWriteChannelConfig(WriteChannelConfiguration.of(TABLE_ID));
|
||||
|
||||
BigQueryItemWriterException actual = Assertions.assertThrows(BigQueryItemWriterException.class, () -> writer.write(TestConstants.CHUNK));
|
||||
Assertions.assertEquals("Error on write happened", actual.getMessage());
|
||||
|
||||
AtomicLong actualCounter = (AtomicLong) handle
|
||||
.findVarHandle(BigQueryBaseItemWriter.class, "bigQueryWriteCounter", AtomicLong.class)
|
||||
.get(writer);
|
||||
|
||||
boolean writeFailed = (Boolean) handle
|
||||
.findVarHandle(BigQueryBaseItemWriter.class, "writeFailed", boolean.class)
|
||||
.get(writer);
|
||||
|
||||
Assertions.assertEquals(0L, actualCounter.get());
|
||||
Assertions.assertTrue(writeFailed);
|
||||
Assertions.assertFalse(consumerCalled.get());
|
||||
|
||||
Mockito.verify(channel).write(Mockito.any(ByteBuffer.class));
|
||||
Mockito.verify(channel).close();
|
||||
Mockito.verifyNoMoreInteractions(channel);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testBaseAfterPropertiesSet_Exception() {
|
||||
TestWriter writer = new TestWriter();
|
||||
|
||||
@@ -16,8 +16,10 @@
|
||||
|
||||
package org.springframework.batch.extensions.bigquery.unit.writer;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectWriter;
|
||||
import com.fasterxml.jackson.dataformat.csv.CsvFactory;
|
||||
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
|
||||
import com.google.cloud.bigquery.BigQuery;
|
||||
import com.google.cloud.bigquery.Field;
|
||||
import com.google.cloud.bigquery.FormatOptions;
|
||||
@@ -83,14 +85,38 @@ class BigQueryCsvItemWriterTest extends AbstractBigQueryTest {
|
||||
@Test
|
||||
void testConvertObjectsToByteArrays() {
|
||||
TestWriter writer = new TestWriter();
|
||||
List<PersonDto> items = TestConstants.CHUNK.getItems();
|
||||
|
||||
// Empty
|
||||
Assertions.assertTrue(writer.testConvert(List.of()).isEmpty());
|
||||
|
||||
// Not empty
|
||||
// Not empty (row mapper)
|
||||
writer.setRowMapper(source -> source.toString().getBytes());
|
||||
List<byte[]> actual = writer.testConvert(TestConstants.CHUNK.getItems());
|
||||
List<byte[]> expected = TestConstants.CHUNK.getItems().stream().map(PersonDto::toString).map(String::getBytes).toList();
|
||||
List<byte[]> actual = writer.testConvert(items);
|
||||
List<byte[]> expected = items.stream().map(PersonDto::toString).map(String::getBytes).toList();
|
||||
Assertions.assertEquals(expected.size(), actual.size());
|
||||
|
||||
for (int i = 0; i < actual.size(); i++) {
|
||||
Assertions.assertArrayEquals(expected.get(i), actual.get(i));
|
||||
}
|
||||
|
||||
// Not empty (object writer)
|
||||
ObjectWriter csvWriter = new CsvMapper().writerWithTypedSchemaFor(PersonDto.class);
|
||||
writer.setRowMapper(null);
|
||||
writer.testInitializeProperties(items);
|
||||
actual = writer.testConvert(items);
|
||||
|
||||
expected = items
|
||||
.stream()
|
||||
.map(pd -> {
|
||||
try {
|
||||
return csvWriter.writeValueAsBytes(pd);
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
})
|
||||
.toList();
|
||||
|
||||
Assertions.assertEquals(expected.size(), actual.size());
|
||||
|
||||
for (int i = 0; i < actual.size(); i++) {
|
||||
|
||||
Reference in New Issue
Block a user