Preparation for Avro support

This commit is contained in:
Vova Perebykivskyi
2021-06-13 15:00:21 +03:00
committed by Vova Perebykivskyi
parent e1d5fc5a4a
commit c53c0f9694
3 changed files with 134 additions and 3 deletions

View File

@@ -64,11 +64,21 @@
<artifactId>google-cloud-bigquery</artifactId>
<version>1.133.0</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.10.2</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-csv</artifactId>
<version>2.12.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>

View File

@@ -18,8 +18,13 @@ package org.springframework.batch.extensions.bigquery;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@@ -42,15 +47,23 @@ import com.google.cloud.bigquery.TableDataWriteChannel;
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.WriteChannelConfiguration;
import com.google.common.collect.Iterables;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.convert.converter.Converter;
import org.springframework.core.io.FileSystemResource;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
@@ -146,10 +159,15 @@ public class BigQueryItemWriter<T> implements ItemWriter<T>, InitializingBean {
/** Actual type of incoming data can be obtained only in runtime */
private synchronized void initializeProperties(List<? extends T> items) {
if (Objects.isNull(this.itemClass)) {
if (isCsv() || isJson()) {
if (isAvro() || isCsv() || isJson()) {
T firstItem = items.stream().findFirst().orElseThrow(RuntimeException::new);
this.itemClass = firstItem.getClass();
if (isAvro()) {
boolean isAvroClass = SpecificRecordBase.class.isAssignableFrom(this.itemClass);
Assert.isTrue(isAvroClass, "Avro class expected");
}
if (Objects.isNull(this.rowMapper)) {
if (isCsv()) {
this.objectWriter = new CsvMapper().writerWithTypedSchemaFor(this.itemClass);
@@ -164,7 +182,9 @@ public class BigQueryItemWriter<T> implements ItemWriter<T>, InitializingBean {
}
}
private ByteBuffer mapDataToBigQueryFormat(List<? extends T> items) throws IOException {
private ByteBuffer mapDataToBigQueryFormat(List<? extends T> items)
throws IOException, InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException {
ByteBuffer byteBuffer;
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
@@ -211,7 +231,9 @@ public class BigQueryItemWriter<T> implements ItemWriter<T>, InitializingBean {
}
}
private List<byte[]> convertObjectsToByteArrays(List<? extends T> items) {
private List<byte[]> convertObjectsToByteArrays(List<? extends T> items)
throws IOException, NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException {
Stream<byte[]> byteArrayStream = Stream.empty();
if (isJson()) {
@@ -222,6 +244,8 @@ public class BigQueryItemWriter<T> implements ItemWriter<T>, InitializingBean {
}
else if (isParquet() || isOrc() || isAvro()) {
throw new UnsupportedOperationException("Not supported right now");
/*byteArrayStream = getHadoopPathByteArrayStream(items);*/
/*byteArrayStream = getAvroByteArrayStream(items);*/
}
return byteArrayStream.collect(Collectors.toList());
@@ -274,6 +298,44 @@ public class BigQueryItemWriter<T> implements ItemWriter<T>, InitializingBean {
.map(row -> row.getBytes(StandardCharsets.UTF_8));
}
/**
* Generates Avro file and writes it to {@link OutputStream}.
* @see <a href="https://github.com/GoogleCloudPlatform/bigquery-ingest-avro-dataflow-sample">Avro example</a>
*/
private Stream<byte[]> getAvroByteArrayStream(List<? extends T> items)
throws IOException, NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException {
SpecificRecordBase objectInstance = (SpecificRecordBase) this.itemClass.getDeclaredConstructor().newInstance();
SpecificDatumWriter<? super T> avroWriter = new SpecificDatumWriter<>(this.itemClass);
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
DataFileWriter<? super T> avroFileWriter = new DataFileWriter<>(avroWriter)) {
/*
* Input data - 500 rows.
* Statistic gathered from BigQuery (com.google.cloud.bigquery.JobStatistics.LoadStatistics#getInputBytes).
*
* 41,229 input bytes Avro (no codec)
* 14,691 input bytes Avro (Deflate lvl 8)
* 41,122 input bytes CSV
*/
CodecFactory compressionCodec = CodecFactory.deflateCodec(8);
/* Order of lines (code) should not be changed */
avroFileWriter.setCodec(compressionCodec);
avroFileWriter.create(objectInstance.getSchema(), outputStream);
for (T item : items) {
avroFileWriter.append(item);
}
/* At this point of time only schema present in output stream */
avroFileWriter.flush();
return Stream.of(outputStream.toByteArray());
}
}
/**
* @return {@link TableDataWriteChannel} that should be closed manually.
* @see <a href="https://github.com/googleapis/google-cloud-java/blob/969bbeef18f004fd51fd46c5def1ae5c644cae3c/google-cloud-examples/src/main/java/com/google/cloud/examples/bigquery/snippets/BigQuerySnippets.java">Examples</a>
@@ -389,4 +451,39 @@ public class BigQueryItemWriter<T> implements ItemWriter<T>, InitializingBean {
}
}
/**
* It is expected that for {@link BigQueryItemWriter#isParquet()} and {@link BigQueryItemWriter#isOrc()} you use
* {@link Path} because there is no convenient way to write records to {@link OutputStream}.
* Not supported right now.
*/
private Stream<byte[]> getHadoopPathByteArrayStream(List<? extends T> items) throws IOException {
Stream<byte[]> result = Stream.empty();
T firstElement = Iterables.getFirst(items, null);
Assert.notNull(firstElement, "Collection is empty");
Class<?> classType = firstElement.getClass();
if (classType.isAssignableFrom(Path.class)) {
List<URI> uris = items.stream()
.map(Path.class::cast)
.map(Path::toUri)
.collect(Collectors.toList());
return getFileBasedByteArrayStream(uris);
}
return result;
}
private Stream<byte[]> getFileBasedByteArrayStream(List<URI> items) throws IOException {
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
for (URI uri : items) {
try (InputStream inputStream = new FileSystemResource(Paths.get(uri)).getInputStream()) {
IOUtils.copy(inputStream, outputStream);
}
}
return Stream.of(outputStream.toByteArray());
}
}
}

View File

@@ -30,6 +30,7 @@ import com.google.cloud.bigquery.WriteChannelConfiguration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
@@ -136,6 +137,29 @@ class BigQueryItemWriterBuilderTests {
Assertions.assertNotNull(writer);
}
/**
* Example how Apache Avro writer is expected to be built without {@link org.springframework.context.annotation.Bean} annotation.
*/
@Test
@Disabled("Not yet implemented")
void testAvroWriter() {
BigQuery mockedBigQuery = prepareMockedBigQuery();
WriteChannelConfiguration writeConfiguration = WriteChannelConfiguration
.newBuilder(TableId.of(DATASET_NAME, "avro_table"))
.setFormatOptions(FormatOptions.avro())
.build();
BigQueryItemWriter<PersonDto> writer = new BigQueryItemWriterBuilder<PersonDto>()
.bigQuery(mockedBigQuery)
.writeChannelConfig(writeConfiguration)
.build();
writer.afterPropertiesSet();
Assertions.assertNotNull(writer);
}
private byte[] convertDtoToJsonByteArray(ObjectMapper objectMapper, PersonDto dto) {
try {
return objectMapper.writeValueAsBytes(dto);