diff --git a/spring-batch-bigquery/pom.xml b/spring-batch-bigquery/pom.xml
index 01a1d32..079d5f0 100644
--- a/spring-batch-bigquery/pom.xml
+++ b/spring-batch-bigquery/pom.xml
@@ -64,11 +64,21 @@
google-cloud-bigquery
1.133.0
+
+ org.apache.avro
+ avro
+ 1.10.2
+
com.fasterxml.jackson.dataformat
jackson-dataformat-csv
2.12.3
+
+ org.apache.hadoop
+ hadoop-common
+ 3.3.0
+
org.apache.commons
diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/BigQueryItemWriter.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/BigQueryItemWriter.java
index 9041a62..246acd3 100644
--- a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/BigQueryItemWriter.java
+++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/BigQueryItemWriter.java
@@ -18,8 +18,13 @@ package org.springframework.batch.extensions.bigquery;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
+import java.nio.file.Paths;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@@ -42,15 +47,23 @@ import com.google.cloud.bigquery.TableDataWriteChannel;
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.WriteChannelConfiguration;
+import com.google.common.collect.Iterables;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.specific.SpecificRecordBase;
import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.convert.converter.Converter;
+import org.springframework.core.io.FileSystemResource;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
@@ -146,10 +159,15 @@ public class BigQueryItemWriter implements ItemWriter, InitializingBean {
/** Actual type of incoming data can be obtained only in runtime */
private synchronized void initializeProperties(List extends T> items) {
if (Objects.isNull(this.itemClass)) {
- if (isCsv() || isJson()) {
+ if (isAvro() || isCsv() || isJson()) {
T firstItem = items.stream().findFirst().orElseThrow(RuntimeException::new);
this.itemClass = firstItem.getClass();
+ if (isAvro()) {
+ boolean isAvroClass = SpecificRecordBase.class.isAssignableFrom(this.itemClass);
+ Assert.isTrue(isAvroClass, "Avro class expected");
+ }
+
if (Objects.isNull(this.rowMapper)) {
if (isCsv()) {
this.objectWriter = new CsvMapper().writerWithTypedSchemaFor(this.itemClass);
@@ -164,7 +182,9 @@ public class BigQueryItemWriter implements ItemWriter, InitializingBean {
}
}
- private ByteBuffer mapDataToBigQueryFormat(List extends T> items) throws IOException {
+ private ByteBuffer mapDataToBigQueryFormat(List extends T> items)
+ throws IOException, InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException {
+
ByteBuffer byteBuffer;
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
@@ -211,7 +231,9 @@ public class BigQueryItemWriter implements ItemWriter, InitializingBean {
}
}
- private List convertObjectsToByteArrays(List extends T> items) {
+ private List convertObjectsToByteArrays(List extends T> items)
+ throws IOException, NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException {
+
Stream byteArrayStream = Stream.empty();
if (isJson()) {
@@ -222,6 +244,8 @@ public class BigQueryItemWriter implements ItemWriter, InitializingBean {
}
else if (isParquet() || isOrc() || isAvro()) {
throw new UnsupportedOperationException("Not supported right now");
+ /*byteArrayStream = getHadoopPathByteArrayStream(items);*/
+ /*byteArrayStream = getAvroByteArrayStream(items);*/
}
return byteArrayStream.collect(Collectors.toList());
@@ -274,6 +298,44 @@ public class BigQueryItemWriter implements ItemWriter, InitializingBean {
.map(row -> row.getBytes(StandardCharsets.UTF_8));
}
+ /**
+ * Generates Avro file and writes it to {@link OutputStream}.
+ * @see Avro example
+ */
+ private Stream getAvroByteArrayStream(List extends T> items)
+ throws IOException, NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException {
+
+ SpecificRecordBase objectInstance = (SpecificRecordBase) this.itemClass.getDeclaredConstructor().newInstance();
+ SpecificDatumWriter super T> avroWriter = new SpecificDatumWriter<>(this.itemClass);
+
+ try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ DataFileWriter super T> avroFileWriter = new DataFileWriter<>(avroWriter)) {
+
+ /*
+ * Input data - 500 rows.
+ * Statistic gathered from BigQuery (com.google.cloud.bigquery.JobStatistics.LoadStatistics#getInputBytes).
+ *
+ * 41,229 input bytes Avro (no codec)
+ * 14,691 input bytes Avro (Deflate lvl 8)
+ * 41,122 input bytes CSV
+ */
+ CodecFactory compressionCodec = CodecFactory.deflateCodec(8);
+
+ /* Order of lines (code) should not be changed */
+ avroFileWriter.setCodec(compressionCodec);
+ avroFileWriter.create(objectInstance.getSchema(), outputStream);
+
+ for (T item : items) {
+ avroFileWriter.append(item);
+ }
+
+ /* At this point of time only schema present in output stream */
+ avroFileWriter.flush();
+
+ return Stream.of(outputStream.toByteArray());
+ }
+ }
+
/**
* @return {@link TableDataWriteChannel} that should be closed manually.
* @see Examples
@@ -389,4 +451,39 @@ public class BigQueryItemWriter implements ItemWriter, InitializingBean {
}
}
+ /**
+ * It is expected that for {@link BigQueryItemWriter#isParquet()} and {@link BigQueryItemWriter#isOrc()} you use
+ * {@link Path} because there is no convenient way to write records to {@link OutputStream}.
+ * Not supported right now.
+ */
+ private Stream getHadoopPathByteArrayStream(List extends T> items) throws IOException {
+ Stream result = Stream.empty();
+
+ T firstElement = Iterables.getFirst(items, null);
+ Assert.notNull(firstElement, "Collection is empty");
+ Class> classType = firstElement.getClass();
+
+ if (classType.isAssignableFrom(Path.class)) {
+ List uris = items.stream()
+ .map(Path.class::cast)
+ .map(Path::toUri)
+ .collect(Collectors.toList());
+
+ return getFileBasedByteArrayStream(uris);
+ }
+
+ return result;
+ }
+
+ private Stream getFileBasedByteArrayStream(List items) throws IOException {
+ try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
+ for (URI uri : items) {
+ try (InputStream inputStream = new FileSystemResource(Paths.get(uri)).getInputStream()) {
+ IOUtils.copy(inputStream, outputStream);
+ }
+ }
+ return Stream.of(outputStream.toByteArray());
+ }
+ }
+
}
diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/builder/BigQueryItemWriterBuilderTests.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/builder/BigQueryItemWriterBuilderTests.java
index 556bef5..ce12824 100644
--- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/builder/BigQueryItemWriterBuilderTests.java
+++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/builder/BigQueryItemWriterBuilderTests.java
@@ -30,6 +30,7 @@ import com.google.cloud.bigquery.WriteChannelConfiguration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
@@ -136,6 +137,29 @@ class BigQueryItemWriterBuilderTests {
Assertions.assertNotNull(writer);
}
+ /**
+ * Example how Apache Avro writer is expected to be built without {@link org.springframework.context.annotation.Bean} annotation.
+ */
+ @Test
+ @Disabled("Not yet implemented")
+ void testAvroWriter() {
+ BigQuery mockedBigQuery = prepareMockedBigQuery();
+
+ WriteChannelConfiguration writeConfiguration = WriteChannelConfiguration
+ .newBuilder(TableId.of(DATASET_NAME, "avro_table"))
+ .setFormatOptions(FormatOptions.avro())
+ .build();
+
+ BigQueryItemWriter writer = new BigQueryItemWriterBuilder()
+ .bigQuery(mockedBigQuery)
+ .writeChannelConfig(writeConfiguration)
+ .build();
+
+ writer.afterPropertiesSet();
+
+ Assertions.assertNotNull(writer);
+ }
+
private byte[] convertDtoToJsonByteArray(ObjectMapper objectMapper, PersonDto dto) {
try {
return objectMapper.writeValueAsBytes(dto);