[bq] 0.1 BigQuery preparation

This commit is contained in:
Vova
2021-06-13 14:56:24 +03:00
committed by GitHub
parent e63fe4a249
commit e1d5fc5a4a
5 changed files with 182 additions and 162 deletions

View File

@@ -0,0 +1,38 @@
# spring-batch-bigquery
Spring Batch extension which contains an `ItemWriter` implementation for https://cloud.google.com/bigquery[BigQuery] based on https://github.com/googleapis/java-bigquery[Java BigQuery]. It supports writing https://en.wikipedia.org/wiki/Comma-separated_values[CSV], https://en.wikipedia.org/wiki/JSON[JSON] using https://cloud.google.com/bigquery/docs/batch-loading-data[load jobs].
## Configuration of `BigQueryItemWriter`
Next to the https://docs.spring.io/spring-batch/reference/html/configureJob.html[configuration of Spring Batch] one needs to configure the `BigQueryItemWriter`.
```java
@Bean
BigQueryItemWriter<MyDto> bigQueryCsvWriter() {
WriteChannelConfiguration writeConfiguration = WriteChannelConfiguration
.newBuilder(TableId.of("csv_dataset", "csv_table"))
.setAutodetect(true)
.setFormatOptions(FormatOptions.csv())
.build();
BigQueryItemWriter<MyDto> writer = new BigQueryItemWriterBuilder<MyDto>()
.bigQuery(mockedBigQuery)
.writeChannelConfig(writeConfiguration)
.build();
}
```
Additional examples could be found in https://github.com/spring-projects/spring-batch-extensions/blob/main/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/builder/BigQueryItemWriterBuilderTests.java[here].
## Configuration properties
[cols="1,1,4"]
.Properties for item writer
|===
| Property | Required | Description
| `bigQuery` | yes | BigQuery object that provided by BigQuery Java Library. Responsible for connection with BigQuery.
| `writeChannelConfig` | yes | BigQuery write channel config provided by BigQuery Java Library. Responsible for configuring data type, data channel, jobs that will be sent to BigQuery.
| `rowMapper` | no | Your own converter that specifies how to convert input CSV / JSON to byte array.
| `datasetInfo` | no | Your way to customize to how to create BigQuery dataset.
| `jobConsumer` | no | Your custom handler for BigQuery Job provided by BigQuery Java Library.
|===

View File

@@ -25,6 +25,26 @@
<artifactId>spring-batch-bigquery</artifactId>
<version>0.1.0-SNAPSHOT</version>
<developers>
<developer>
<id>Dgray16</id>
<name>Vova Perebykivskyi</name>
<email>vova235@gmail.com</email>
</developer>
</developers>
<licenses>
<license>
<name>Apache 2.0</name>
<url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
<distribution>repo</distribution>
</license>
</licenses>
<scm>
<url>https://github.com/spring-projects/spring-batch-extensions</url>
</scm>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@@ -36,23 +56,18 @@
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
<version>4.3.2</version>
<version>4.3.3</version>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigquery</artifactId>
<version>1.128.0</version>
<version>1.133.0</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.10.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.3.0</version>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-csv</artifactId>
<version>2.12.3</version>
</dependency>
<dependency>
@@ -71,19 +86,13 @@
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.7.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-csv</artifactId>
<version>2.12.3</version>
<version>5.7.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>3.9.0</version>
<version>3.11.1</version>
<scope>test</scope>
</dependency>

View File

@@ -18,13 +18,8 @@ package org.springframework.batch.extensions.bigquery;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@@ -33,6 +28,10 @@ import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.DatasetInfo;
@@ -41,24 +40,17 @@ import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableDataWriteChannel;
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.WriteChannelConfiguration;
import com.google.common.collect.Iterables;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.convert.converter.Converter;
import org.springframework.core.io.FileSystemResource;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
@@ -71,13 +63,14 @@ import org.springframework.util.ObjectUtils;
* <ul>
* <li>JSON</li>
* <li>CSV</li>
* <li>Avro</li>
* </ul>
*
* <p>For example if you generate {@link TableDataWriteChannel} and you {@link TableDataWriteChannel#close()} it,
* there is no guarantee that single {@link com.google.cloud.bigquery.Job} will be created.
*
* <p>It does not support save state feature. It is thread-safe.
* Take into account that BigQuery has rate limits and it is very easy to exceed those in concurrent environment.
* @see <a href="https://cloud.google.com/bigquery/quotas">BigQuery Quotas & Limits</a>
*
* @author Vova Perebykivskyi
* @since 0.1.0
@@ -93,7 +86,8 @@ public class BigQueryItemWriter<T> implements ItemWriter<T>, InitializingBean {
* Used for simple conversion.
*/
private Converter<T, byte[]> rowMapper;
private ObjectWriter objectWriter;
private Class itemClass;
private BigQuery bigQuery;
@@ -138,8 +132,10 @@ public class BigQueryItemWriter<T> implements ItemWriter<T>, InitializingBean {
@Override
public void write(List<? extends T> items) throws Exception {
if (CollectionUtils.isNotEmpty(items)) {
initializeProperties(items);
if (this.logger.isDebugEnabled()) {
this.logger.debug("Mapping data");
this.logger.debug(String.format("Mapping %d elements", items.size()));
}
ByteBuffer byteBuffer = mapDataToBigQueryFormat(items);
@@ -147,9 +143,28 @@ public class BigQueryItemWriter<T> implements ItemWriter<T>, InitializingBean {
}
}
private ByteBuffer mapDataToBigQueryFormat(List<? extends T> items)
throws IOException, InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException {
/** Actual type of incoming data can be obtained only in runtime */
private synchronized void initializeProperties(List<? extends T> items) {
if (Objects.isNull(this.itemClass)) {
if (isCsv() || isJson()) {
T firstItem = items.stream().findFirst().orElseThrow(RuntimeException::new);
this.itemClass = firstItem.getClass();
if (Objects.isNull(this.rowMapper)) {
if (isCsv()) {
this.objectWriter = new CsvMapper().writerWithTypedSchemaFor(this.itemClass);
}
else if (isJson()) {
this.objectWriter = new ObjectMapper().writerFor(this.itemClass);
}
}
logger.debug("Writer setup is completed");
}
}
}
private ByteBuffer mapDataToBigQueryFormat(List<? extends T> items) throws IOException {
ByteBuffer byteBuffer;
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
@@ -181,19 +196,22 @@ public class BigQueryItemWriter<T> implements ItemWriter<T>, InitializingBean {
writeChannel = writer;
}
finally {
if (this.logger.isDebugEnabled()) {
this.logger.debug("Write operation submitted: " + bigQueryWriteCounter.incrementAndGet());
String logMessage = "Write operation submitted: " + bigQueryWriteCounter.incrementAndGet();
if (Objects.nonNull(writeChannel)) {
logMessage += " -- Job ID: " + writeChannel.getJob().getJobId().getJob();
if (Objects.nonNull(this.jobConsumer)) {
this.jobConsumer.accept(writeChannel.getJob());
}
}
if (Objects.nonNull(writeChannel) && Objects.nonNull(this.jobConsumer)) {
this.jobConsumer.accept(writeChannel.getJob());
if (this.logger.isDebugEnabled()) {
this.logger.debug(logMessage);
}
}
}
private List<byte[]> convertObjectsToByteArrays(List<? extends T> items)
throws IOException, NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException {
private List<byte[]> convertObjectsToByteArrays(List<? extends T> items) {
Stream<byte[]> byteArrayStream = Stream.empty();
if (isJson()) {
@@ -202,12 +220,8 @@ public class BigQueryItemWriter<T> implements ItemWriter<T>, InitializingBean {
else if (isCsv()) {
byteArrayStream = getCsvByteArrayStream(items);
}
else if (isAvro()) {
byteArrayStream = getAvroByteArrayStream(items);
}
else if (isParquet() || isOrc()) {
else if (isParquet() || isOrc() || isAvro()) {
throw new UnsupportedOperationException("Not supported right now");
/*byteArrayStream = getHadoopPathByteArrayStream(items);*/
}
return byteArrayStream.collect(Collectors.toList());
@@ -219,7 +233,7 @@ public class BigQueryItemWriter<T> implements ItemWriter<T>, InitializingBean {
private Stream<byte[]> getJsonByteArrayStream(List<? extends T> items) {
return items
.stream()
.map(this.rowMapper::convert)
.map(this::mapItemToCsvOrJson)
.filter(ArrayUtils::isNotEmpty)
.map(String::new)
.map(this::convertToNdJson)
@@ -227,6 +241,17 @@ public class BigQueryItemWriter<T> implements ItemWriter<T>, InitializingBean {
.map(row -> row.getBytes(StandardCharsets.UTF_8));
}
private byte[] mapItemToCsvOrJson(T t) {
byte[] result = null;
try {
result = Objects.isNull(rowMapper) ? objectWriter.writeValueAsBytes(t) : rowMapper.convert(t);
}
catch (JsonProcessingException e) {
logger.error("Error during processing of the line: ", e);
}
return result;
}
/**
* BigQuery uses ndjson https://github.com/ndjson/ndjson-spec.
* It is expected that to pass here JSON line generated by
@@ -242,56 +267,13 @@ public class BigQueryItemWriter<T> implements ItemWriter<T>, InitializingBean {
private Stream<byte[]> getCsvByteArrayStream(List<? extends T> items) {
return items
.stream()
.map(this.rowMapper::convert)
.map(this::mapItemToCsvOrJson)
.filter(ArrayUtils::isNotEmpty)
.map(String::new)
.filter(value -> !ObjectUtils.isEmpty(value))
.map(row -> row.getBytes(StandardCharsets.UTF_8));
}
/**
* Generates Avro file and writes it to {@link OutputStream}.
* @see <a href="https://github.com/GoogleCloudPlatform/bigquery-ingest-avro-dataflow-sample">Avro example</a>
*/
private Stream<byte[]> getAvroByteArrayStream(List<? extends T> items)
throws IOException, NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException {
T firstElement = Iterables.getFirst(items, null);
Assert.notNull(firstElement, "Collection is empty");
Class classType = firstElement.getClass();
SpecificRecordBase objectInstance = (SpecificRecordBase) classType.getDeclaredConstructor().newInstance();
SpecificDatumWriter<? super T> avroWriter = new SpecificDatumWriter<>(classType);
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
DataFileWriter<? super T> avroFileWriter = new DataFileWriter<>(avroWriter)) {
/*
* Input data - 500 rows.
* Statistic gathered from BigQuery (com.google.cloud.bigquery.JobStatistics.LoadStatistics#getInputBytes).
*
* 41,229 input bytes Avro (no codec)
* 14,691 input bytes Avro (Deflate lvl 8)
* 41,122 input bytes CSV
*/
CodecFactory compressionCodec = CodecFactory.deflateCodec(8);
/* Order of lines (code) should not be changed */
avroFileWriter.setCodec(compressionCodec);
avroFileWriter.create(objectInstance.getSchema(), outputStream);
for (T item : items) {
avroFileWriter.append(item);
}
/* At this point of time only schema present in output stream */
avroFileWriter.flush();
return Stream.of(outputStream.toByteArray());
}
}
/**
* @return {@link TableDataWriteChannel} that should be closed manually.
* @see <a href="https://github.com/googleapis/google-cloud-java/blob/969bbeef18f004fd51fd46c5def1ae5c644cae3c/google-cloud-examples/src/main/java/com/google/cloud/examples/bigquery/snippets/BigQuerySnippets.java">Examples</a>
@@ -302,18 +284,25 @@ public class BigQueryItemWriter<T> implements ItemWriter<T>, InitializingBean {
@Override
public void afterPropertiesSet() {
Assert.notNull(this.bigQuery, "BigQuery service must be provided");
Assert.notNull(this.writeChannelConfig, "Write channel configuration must be provided");
Assert.isTrue(BooleanUtils.isFalse(isBigtable()), "Google BigTable is not supported");
Assert.isTrue(BooleanUtils.isFalse(isGoogleSheets()), "Google Sheets is not supported");
Assert.isTrue(BooleanUtils.isFalse(isDatastore()), "Google Datastore is not supported");
Assert.isTrue(BooleanUtils.isFalse(isParquet()), "Parquet is not supported");
Assert.isTrue(BooleanUtils.isFalse(isOrc()), "Orc is not supported");
Assert.notNull(this.bigQuery, "BigQuery service must be provided");
Assert.notNull(this.writeChannelConfig, "Write channel configuration must be provided");
Assert.isTrue(BooleanUtils.isFalse(isAvro()), "Avro is not supported");
if (BooleanUtils.isFalse(isAvro())) {
Table table = getTable();
if (BooleanUtils.isFalse(BooleanUtils.toBoolean(this.writeChannelConfig.getAutodetect()))) {
if (BooleanUtils.toBoolean(this.writeChannelConfig.getAutodetect())) {
if ((isCsv() || isJson()) && tableHasDefinedSchema(table) && this.logger.isWarnEnabled()) {
this.logger.warn("Mixing autodetect mode with already defined schema may lead to errors on BigQuery side");
}
}
else {
Assert.notNull(this.writeChannelConfig.getSchema(), "Schema must be provided");
if (tableHasDefinedSchema(table)) {
Assert.isTrue(
@@ -322,11 +311,6 @@ public class BigQueryItemWriter<T> implements ItemWriter<T>, InitializingBean {
);
}
}
else {
if ((isCsv() || isJson()) && this.logger.isWarnEnabled() && tableHasDefinedSchema(table)) {
this.logger.warn("Mixing autodetect mode with already defined schema may lead to errors on BigQuery side");
}
}
}
else {
Assert.isNull(this.writeChannelConfig.getSchema(), "Avro does not require schema");
@@ -335,16 +319,16 @@ public class BigQueryItemWriter<T> implements ItemWriter<T>, InitializingBean {
Assert.notNull(this.writeChannelConfig.getFormat(), "Data format must be provided");
if (isCsv() || isJson()) {
Assert.notNull(this.rowMapper, "Row mapper must be provided");
String dataset = this.writeChannelConfig.getDestinationTable().getDataset();
if (Objects.isNull(this.datasetInfo)) {
this.datasetInfo = DatasetInfo.newBuilder(dataset).build();
}
if (Objects.nonNull(this.datasetInfo)) {
Assert.isTrue(
Objects.equals(this.datasetInfo.getDatasetId().getDataset(), this.writeChannelConfig.getDestinationTable().getDataset()),
"Dataset should be configured properly"
);
}
Assert.isTrue(
Objects.equals(this.datasetInfo.getDatasetId().getDataset(), dataset),
"Dataset should be configured properly"
);
createDataset();
}
@@ -391,47 +375,17 @@ public class BigQueryItemWriter<T> implements ItemWriter<T>, InitializingBean {
}
private void createDataset() {
if (Objects.nonNull(this.datasetInfo)) {
Dataset foundDataset = this.bigQuery.getDataset(this.writeChannelConfig.getDestinationTable().getDataset());
TableId tableId = this.writeChannelConfig.getDestinationTable();
String datasetToCheck = tableId.getDataset();
if (Objects.nonNull(datasetToCheck)) {
Dataset foundDataset = this.bigQuery.getDataset(datasetToCheck);
if (Objects.isNull(foundDataset)) {
this.bigQuery.create(this.datasetInfo);
}
}
}
/**
* It is expected that for {@link BigQueryItemWriter#isParquet()} and {@link BigQueryItemWriter#isOrc()} you use
* {@link Path} because there is no convenient way to write records to {@link OutputStream}.
* Not supported right now.
*/
private Stream<byte[]> getHadoopPathByteArrayStream(List<? extends T> items) throws IOException {
Stream<byte[]> result = Stream.empty();
T firstElement = Iterables.getFirst(items, null);
Assert.notNull(firstElement, "Collection is empty");
Class<?> classType = firstElement.getClass();
if (classType.isAssignableFrom(Path.class)) {
List<URI> uris = items.stream()
.map(Path.class::cast)
.map(Path::toUri)
.collect(Collectors.toList());
return getFileBasedByteArrayStream(uris);
}
return result;
}
private Stream<byte[]> getFileBasedByteArrayStream(List<URI> items) throws IOException {
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
for (URI uri : items) {
try (InputStream inputStream = new FileSystemResource(Paths.get(uri)).getInputStream()) {
IOUtils.copy(inputStream, outputStream);
if (Objects.nonNull(this.datasetInfo)) {
this.bigQuery.create(this.datasetInfo);
}
}
return Stream.of(outputStream.toByteArray());
}
}

View File

@@ -72,10 +72,10 @@ public class BigQueryItemWriterBuilder<T> {
BigQueryItemWriter<T> writer = new BigQueryItemWriter<>();
writer.setRowMapper(this.rowMapper);
writer.setDatasetInfo(this.datasetInfo);
writer.setJobConsumer(this.jobConsumer);
writer.setWriteChannelConfig(this.writeChannelConfig);
writer.setJobConsumer(this.jobConsumer);
writer.setBigQuery(this.bigQuery);
writer.setDatasetInfo(this.datasetInfo);
return writer;
}

View File

@@ -45,9 +45,32 @@ class BigQueryItemWriterBuilderTests {
* Example how CSV writer is expected to be built without {@link org.springframework.context.annotation.Bean} annotation.
*/
@Test
void testCsvWriter() {
void testCsvWriterWithRowMapper() {
BigQuery mockedBigQuery = prepareMockedBigQuery();
CsvMapper csvMapper = new CsvMapper();
DatasetInfo datasetInfo = DatasetInfo.newBuilder(DATASET_NAME).setLocation("europe-west-2").build();
WriteChannelConfiguration writeConfiguration = WriteChannelConfiguration
.newBuilder(TableId.of(datasetInfo.getDatasetId().getDataset(), "csv_table"))
.setAutodetect(true)
.setFormatOptions(FormatOptions.csv())
.build();
BigQueryItemWriter<PersonDto> writer = new BigQueryItemWriterBuilder<PersonDto>()
.bigQuery(mockedBigQuery)
.rowMapper(dto -> convertDtoToCsvByteArray(csvMapper, dto))
.writeChannelConfig(writeConfiguration)
.datasetInfo(datasetInfo)
.build();
writer.afterPropertiesSet();
Assertions.assertNotNull(writer);
}
@Test
void testCsvWriterWithCsvMapper() {
BigQuery mockedBigQuery = prepareMockedBigQuery();
WriteChannelConfiguration writeConfiguration = WriteChannelConfiguration
.newBuilder(TableId.of(DATASET_NAME, "csv_table"))
@@ -57,9 +80,7 @@ class BigQueryItemWriterBuilderTests {
BigQueryItemWriter<PersonDto> writer = new BigQueryItemWriterBuilder<PersonDto>()
.bigQuery(mockedBigQuery)
.rowMapper(dto -> convertDtoToCsvByteArray(csvMapper, dto))
.writeChannelConfig(writeConfiguration)
.datasetInfo(DatasetInfo.newBuilder(DATASET_NAME).setLocation("europe-west-2").build())
.build();
writer.afterPropertiesSet();
@@ -95,16 +116,14 @@ class BigQueryItemWriterBuilderTests {
Assertions.assertNotNull(writer);
}
/**
* Example how Apache Avro writer is expected to be built without {@link org.springframework.context.annotation.Bean} annotation.
*/
@Test
void testAvroWriter() {
void testCsvWriterWithJsonMapper() {
BigQuery mockedBigQuery = prepareMockedBigQuery();
WriteChannelConfiguration writeConfiguration = WriteChannelConfiguration
.newBuilder(TableId.of(DATASET_NAME, "avro_table"))
.setFormatOptions(FormatOptions.avro())
.newBuilder(TableId.of(DATASET_NAME, "json_table"))
.setAutodetect(true)
.setFormatOptions(FormatOptions.json())
.build();
BigQueryItemWriter<PersonDto> writer = new BigQueryItemWriterBuilder<PersonDto>()
@@ -149,12 +168,12 @@ class BigQueryItemWriterBuilderTests {
}
static class PersonDto {
class PersonDto {
private String name;
private final String name;
public String getName() {
return name;
public PersonDto(String name) {
this.name = name;
}
}