From e1d5fc5a4a9ba53b9b00b50d25054a286d0edc24 Mon Sep 17 00:00:00 2001 From: Vova Date: Sun, 13 Jun 2021 14:56:24 +0300 Subject: [PATCH] [bq] 0.1 BigQuery preparation --- spring-batch-bigquery/README.adoc | 38 ++++ spring-batch-bigquery/pom.xml | 45 ++-- .../bigquery/BigQueryItemWriter.java | 212 +++++++----------- .../builder/BigQueryItemWriterBuilder.java | 4 +- .../BigQueryItemWriterBuilderTests.java | 45 ++-- 5 files changed, 182 insertions(+), 162 deletions(-) create mode 100644 spring-batch-bigquery/README.adoc diff --git a/spring-batch-bigquery/README.adoc b/spring-batch-bigquery/README.adoc new file mode 100644 index 0000000..4169af0 --- /dev/null +++ b/spring-batch-bigquery/README.adoc @@ -0,0 +1,38 @@ +# spring-batch-bigquery + +Spring Batch extension which contains an `ItemWriter` implementation for https://cloud.google.com/bigquery[BigQuery] based on https://github.com/googleapis/java-bigquery[Java BigQuery]. It supports writing https://en.wikipedia.org/wiki/Comma-separated_values[CSV], https://en.wikipedia.org/wiki/JSON[JSON] using https://cloud.google.com/bigquery/docs/batch-loading-data[load jobs]. + +## Configuration of `BigQueryItemWriter` + +Next to the https://docs.spring.io/spring-batch/reference/html/configureJob.html[configuration of Spring Batch] one needs to configure the `BigQueryItemWriter`. + +```java +@Bean +BigQueryItemWriter bigQueryCsvWriter() { + WriteChannelConfiguration writeConfiguration = WriteChannelConfiguration + .newBuilder(TableId.of("csv_dataset", "csv_table")) + .setAutodetect(true) + .setFormatOptions(FormatOptions.csv()) + .build(); + + BigQueryItemWriter writer = new BigQueryItemWriterBuilder() + .bigQuery(mockedBigQuery) + .writeChannelConfig(writeConfiguration) + .build(); +} +``` + +Additional examples could be found in https://github.com/spring-projects/spring-batch-extensions/blob/main/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/builder/BigQueryItemWriterBuilderTests.java[here]. + +## Configuration properties +[cols="1,1,4"] +.Properties for item writer +|=== +| Property | Required | Description + +| `bigQuery` | yes | BigQuery object that provided by BigQuery Java Library. Responsible for connection with BigQuery. +| `writeChannelConfig` | yes | BigQuery write channel config provided by BigQuery Java Library. Responsible for configuring data type, data channel, jobs that will be sent to BigQuery. +| `rowMapper` | no | Your own converter that specifies how to convert input CSV / JSON to byte array. +| `datasetInfo` | no | Your way to customize to how to create BigQuery dataset. +| `jobConsumer` | no | Your custom handler for BigQuery Job provided by BigQuery Java Library. +|=== diff --git a/spring-batch-bigquery/pom.xml b/spring-batch-bigquery/pom.xml index f90b2df..01a1d32 100644 --- a/spring-batch-bigquery/pom.xml +++ b/spring-batch-bigquery/pom.xml @@ -25,6 +25,26 @@ spring-batch-bigquery 0.1.0-SNAPSHOT + + + Dgray16 + Vova Perebykivskyi + vova235@gmail.com + + + + + + Apache 2.0 + https://www.apache.org/licenses/LICENSE-2.0.txt + repo + + + + + https://github.com/spring-projects/spring-batch-extensions + + UTF-8 @@ -36,23 +56,18 @@ org.springframework.batch spring-batch-core - 4.3.2 + 4.3.3 com.google.cloud google-cloud-bigquery - 1.128.0 + 1.133.0 - org.apache.avro - avro - 1.10.2 - - - org.apache.hadoop - hadoop-common - 3.3.0 + com.fasterxml.jackson.dataformat + jackson-dataformat-csv + 2.12.3 @@ -71,19 +86,13 @@ org.junit.jupiter junit-jupiter-api - 5.7.1 - test - - - com.fasterxml.jackson.dataformat - jackson-dataformat-csv - 2.12.3 + 5.7.2 test org.mockito mockito-core - 3.9.0 + 3.11.1 test diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/BigQueryItemWriter.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/BigQueryItemWriter.java index 73820b6..9041a62 100644 --- a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/BigQueryItemWriter.java +++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/BigQueryItemWriter.java @@ -18,13 +18,8 @@ package org.springframework.batch.extensions.bigquery; import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.lang.reflect.InvocationTargetException; -import java.net.URI; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; -import java.nio.file.Paths; import java.util.List; import java.util.Objects; import java.util.Optional; @@ -33,6 +28,10 @@ import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.Stream; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectWriter; +import com.fasterxml.jackson.dataformat.csv.CsvMapper; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.Dataset; import com.google.cloud.bigquery.DatasetInfo; @@ -41,24 +40,17 @@ import com.google.cloud.bigquery.Job; import com.google.cloud.bigquery.Table; import com.google.cloud.bigquery.TableDataWriteChannel; import com.google.cloud.bigquery.TableDefinition; +import com.google.cloud.bigquery.TableId; import com.google.cloud.bigquery.WriteChannelConfiguration; -import com.google.common.collect.Iterables; -import org.apache.avro.file.CodecFactory; -import org.apache.avro.file.DataFileWriter; -import org.apache.avro.specific.SpecificDatumWriter; -import org.apache.avro.specific.SpecificRecordBase; import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.BooleanUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.Path; import org.springframework.batch.item.ItemWriter; import org.springframework.beans.factory.InitializingBean; import org.springframework.core.convert.converter.Converter; -import org.springframework.core.io.FileSystemResource; import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; @@ -71,13 +63,14 @@ import org.springframework.util.ObjectUtils; *
    *
  • JSON
  • *
  • CSV
  • - *
  • Avro
  • *
* *

For example if you generate {@link TableDataWriteChannel} and you {@link TableDataWriteChannel#close()} it, * there is no guarantee that single {@link com.google.cloud.bigquery.Job} will be created. * *

It does not support save state feature. It is thread-safe. + * Take into account that BigQuery has rate limits and it is very easy to exceed those in concurrent environment. + * @see BigQuery Quotas & Limits * * @author Vova Perebykivskyi * @since 0.1.0 @@ -93,7 +86,8 @@ public class BigQueryItemWriter implements ItemWriter, InitializingBean { * Used for simple conversion. */ private Converter rowMapper; - + private ObjectWriter objectWriter; + private Class itemClass; private BigQuery bigQuery; @@ -138,8 +132,10 @@ public class BigQueryItemWriter implements ItemWriter, InitializingBean { @Override public void write(List items) throws Exception { if (CollectionUtils.isNotEmpty(items)) { + initializeProperties(items); + if (this.logger.isDebugEnabled()) { - this.logger.debug("Mapping data"); + this.logger.debug(String.format("Mapping %d elements", items.size())); } ByteBuffer byteBuffer = mapDataToBigQueryFormat(items); @@ -147,9 +143,28 @@ public class BigQueryItemWriter implements ItemWriter, InitializingBean { } } - private ByteBuffer mapDataToBigQueryFormat(List items) - throws IOException, InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException { + /** Actual type of incoming data can be obtained only in runtime */ + private synchronized void initializeProperties(List items) { + if (Objects.isNull(this.itemClass)) { + if (isCsv() || isJson()) { + T firstItem = items.stream().findFirst().orElseThrow(RuntimeException::new); + this.itemClass = firstItem.getClass(); + if (Objects.isNull(this.rowMapper)) { + if (isCsv()) { + this.objectWriter = new CsvMapper().writerWithTypedSchemaFor(this.itemClass); + } + else if (isJson()) { + this.objectWriter = new ObjectMapper().writerFor(this.itemClass); + } + } + + logger.debug("Writer setup is completed"); + } + } + } + + private ByteBuffer mapDataToBigQueryFormat(List items) throws IOException { ByteBuffer byteBuffer; try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) { @@ -181,19 +196,22 @@ public class BigQueryItemWriter implements ItemWriter, InitializingBean { writeChannel = writer; } finally { - if (this.logger.isDebugEnabled()) { - this.logger.debug("Write operation submitted: " + bigQueryWriteCounter.incrementAndGet()); + String logMessage = "Write operation submitted: " + bigQueryWriteCounter.incrementAndGet(); + + if (Objects.nonNull(writeChannel)) { + logMessage += " -- Job ID: " + writeChannel.getJob().getJobId().getJob(); + if (Objects.nonNull(this.jobConsumer)) { + this.jobConsumer.accept(writeChannel.getJob()); + } } - if (Objects.nonNull(writeChannel) && Objects.nonNull(this.jobConsumer)) { - this.jobConsumer.accept(writeChannel.getJob()); + if (this.logger.isDebugEnabled()) { + this.logger.debug(logMessage); } } } - private List convertObjectsToByteArrays(List items) - throws IOException, NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException { - + private List convertObjectsToByteArrays(List items) { Stream byteArrayStream = Stream.empty(); if (isJson()) { @@ -202,12 +220,8 @@ public class BigQueryItemWriter implements ItemWriter, InitializingBean { else if (isCsv()) { byteArrayStream = getCsvByteArrayStream(items); } - else if (isAvro()) { - byteArrayStream = getAvroByteArrayStream(items); - } - else if (isParquet() || isOrc()) { + else if (isParquet() || isOrc() || isAvro()) { throw new UnsupportedOperationException("Not supported right now"); - /*byteArrayStream = getHadoopPathByteArrayStream(items);*/ } return byteArrayStream.collect(Collectors.toList()); @@ -219,7 +233,7 @@ public class BigQueryItemWriter implements ItemWriter, InitializingBean { private Stream getJsonByteArrayStream(List items) { return items .stream() - .map(this.rowMapper::convert) + .map(this::mapItemToCsvOrJson) .filter(ArrayUtils::isNotEmpty) .map(String::new) .map(this::convertToNdJson) @@ -227,6 +241,17 @@ public class BigQueryItemWriter implements ItemWriter, InitializingBean { .map(row -> row.getBytes(StandardCharsets.UTF_8)); } + private byte[] mapItemToCsvOrJson(T t) { + byte[] result = null; + try { + result = Objects.isNull(rowMapper) ? objectWriter.writeValueAsBytes(t) : rowMapper.convert(t); + } + catch (JsonProcessingException e) { + logger.error("Error during processing of the line: ", e); + } + return result; + } + /** * BigQuery uses ndjson https://github.com/ndjson/ndjson-spec. * It is expected that to pass here JSON line generated by @@ -242,56 +267,13 @@ public class BigQueryItemWriter implements ItemWriter, InitializingBean { private Stream getCsvByteArrayStream(List items) { return items .stream() - .map(this.rowMapper::convert) + .map(this::mapItemToCsvOrJson) .filter(ArrayUtils::isNotEmpty) .map(String::new) .filter(value -> !ObjectUtils.isEmpty(value)) .map(row -> row.getBytes(StandardCharsets.UTF_8)); } - /** - * Generates Avro file and writes it to {@link OutputStream}. - * @see Avro example - */ - private Stream getAvroByteArrayStream(List items) - throws IOException, NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException { - - T firstElement = Iterables.getFirst(items, null); - Assert.notNull(firstElement, "Collection is empty"); - - Class classType = firstElement.getClass(); - - SpecificRecordBase objectInstance = (SpecificRecordBase) classType.getDeclaredConstructor().newInstance(); - SpecificDatumWriter avroWriter = new SpecificDatumWriter<>(classType); - - try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - DataFileWriter avroFileWriter = new DataFileWriter<>(avroWriter)) { - - /* - * Input data - 500 rows. - * Statistic gathered from BigQuery (com.google.cloud.bigquery.JobStatistics.LoadStatistics#getInputBytes). - * - * 41,229 input bytes Avro (no codec) - * 14,691 input bytes Avro (Deflate lvl 8) - * 41,122 input bytes CSV - */ - CodecFactory compressionCodec = CodecFactory.deflateCodec(8); - - /* Order of lines (code) should not be changed */ - avroFileWriter.setCodec(compressionCodec); - avroFileWriter.create(objectInstance.getSchema(), outputStream); - - for (T item : items) { - avroFileWriter.append(item); - } - - /* At this point of time only schema present in output stream */ - avroFileWriter.flush(); - - return Stream.of(outputStream.toByteArray()); - } - } - /** * @return {@link TableDataWriteChannel} that should be closed manually. * @see Examples @@ -302,18 +284,25 @@ public class BigQueryItemWriter implements ItemWriter, InitializingBean { @Override public void afterPropertiesSet() { + Assert.notNull(this.bigQuery, "BigQuery service must be provided"); + Assert.notNull(this.writeChannelConfig, "Write channel configuration must be provided"); + Assert.isTrue(BooleanUtils.isFalse(isBigtable()), "Google BigTable is not supported"); Assert.isTrue(BooleanUtils.isFalse(isGoogleSheets()), "Google Sheets is not supported"); Assert.isTrue(BooleanUtils.isFalse(isDatastore()), "Google Datastore is not supported"); Assert.isTrue(BooleanUtils.isFalse(isParquet()), "Parquet is not supported"); Assert.isTrue(BooleanUtils.isFalse(isOrc()), "Orc is not supported"); - - Assert.notNull(this.bigQuery, "BigQuery service must be provided"); - Assert.notNull(this.writeChannelConfig, "Write channel configuration must be provided"); + Assert.isTrue(BooleanUtils.isFalse(isAvro()), "Avro is not supported"); if (BooleanUtils.isFalse(isAvro())) { Table table = getTable(); - if (BooleanUtils.isFalse(BooleanUtils.toBoolean(this.writeChannelConfig.getAutodetect()))) { + + if (BooleanUtils.toBoolean(this.writeChannelConfig.getAutodetect())) { + if ((isCsv() || isJson()) && tableHasDefinedSchema(table) && this.logger.isWarnEnabled()) { + this.logger.warn("Mixing autodetect mode with already defined schema may lead to errors on BigQuery side"); + } + } + else { Assert.notNull(this.writeChannelConfig.getSchema(), "Schema must be provided"); if (tableHasDefinedSchema(table)) { Assert.isTrue( @@ -322,11 +311,6 @@ public class BigQueryItemWriter implements ItemWriter, InitializingBean { ); } } - else { - if ((isCsv() || isJson()) && this.logger.isWarnEnabled() && tableHasDefinedSchema(table)) { - this.logger.warn("Mixing autodetect mode with already defined schema may lead to errors on BigQuery side"); - } - } } else { Assert.isNull(this.writeChannelConfig.getSchema(), "Avro does not require schema"); @@ -335,16 +319,16 @@ public class BigQueryItemWriter implements ItemWriter, InitializingBean { Assert.notNull(this.writeChannelConfig.getFormat(), "Data format must be provided"); - if (isCsv() || isJson()) { - Assert.notNull(this.rowMapper, "Row mapper must be provided"); + String dataset = this.writeChannelConfig.getDestinationTable().getDataset(); + + if (Objects.isNull(this.datasetInfo)) { + this.datasetInfo = DatasetInfo.newBuilder(dataset).build(); } - if (Objects.nonNull(this.datasetInfo)) { - Assert.isTrue( - Objects.equals(this.datasetInfo.getDatasetId().getDataset(), this.writeChannelConfig.getDestinationTable().getDataset()), - "Dataset should be configured properly" - ); - } + Assert.isTrue( + Objects.equals(this.datasetInfo.getDatasetId().getDataset(), dataset), + "Dataset should be configured properly" + ); createDataset(); } @@ -391,47 +375,17 @@ public class BigQueryItemWriter implements ItemWriter, InitializingBean { } private void createDataset() { - if (Objects.nonNull(this.datasetInfo)) { - Dataset foundDataset = this.bigQuery.getDataset(this.writeChannelConfig.getDestinationTable().getDataset()); + TableId tableId = this.writeChannelConfig.getDestinationTable(); + String datasetToCheck = tableId.getDataset(); + + if (Objects.nonNull(datasetToCheck)) { + Dataset foundDataset = this.bigQuery.getDataset(datasetToCheck); if (Objects.isNull(foundDataset)) { - this.bigQuery.create(this.datasetInfo); - } - } - } - - /** - * It is expected that for {@link BigQueryItemWriter#isParquet()} and {@link BigQueryItemWriter#isOrc()} you use - * {@link Path} because there is no convenient way to write records to {@link OutputStream}. - * Not supported right now. - */ - private Stream getHadoopPathByteArrayStream(List items) throws IOException { - Stream result = Stream.empty(); - - T firstElement = Iterables.getFirst(items, null); - Assert.notNull(firstElement, "Collection is empty"); - Class classType = firstElement.getClass(); - - if (classType.isAssignableFrom(Path.class)) { - List uris = items.stream() - .map(Path.class::cast) - .map(Path::toUri) - .collect(Collectors.toList()); - - return getFileBasedByteArrayStream(uris); - } - - return result; - } - - private Stream getFileBasedByteArrayStream(List items) throws IOException { - try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) { - for (URI uri : items) { - try (InputStream inputStream = new FileSystemResource(Paths.get(uri)).getInputStream()) { - IOUtils.copy(inputStream, outputStream); + if (Objects.nonNull(this.datasetInfo)) { + this.bigQuery.create(this.datasetInfo); } } - return Stream.of(outputStream.toByteArray()); } } diff --git a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/builder/BigQueryItemWriterBuilder.java b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/builder/BigQueryItemWriterBuilder.java index f51066d..f71d110 100644 --- a/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/builder/BigQueryItemWriterBuilder.java +++ b/spring-batch-bigquery/src/main/java/org/springframework/batch/extensions/bigquery/builder/BigQueryItemWriterBuilder.java @@ -72,10 +72,10 @@ public class BigQueryItemWriterBuilder { BigQueryItemWriter writer = new BigQueryItemWriter<>(); writer.setRowMapper(this.rowMapper); - writer.setDatasetInfo(this.datasetInfo); - writer.setJobConsumer(this.jobConsumer); writer.setWriteChannelConfig(this.writeChannelConfig); + writer.setJobConsumer(this.jobConsumer); writer.setBigQuery(this.bigQuery); + writer.setDatasetInfo(this.datasetInfo); return writer; } diff --git a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/builder/BigQueryItemWriterBuilderTests.java b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/builder/BigQueryItemWriterBuilderTests.java index aaf4941..556bef5 100644 --- a/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/builder/BigQueryItemWriterBuilderTests.java +++ b/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/builder/BigQueryItemWriterBuilderTests.java @@ -45,9 +45,32 @@ class BigQueryItemWriterBuilderTests { * Example how CSV writer is expected to be built without {@link org.springframework.context.annotation.Bean} annotation. */ @Test - void testCsvWriter() { + void testCsvWriterWithRowMapper() { BigQuery mockedBigQuery = prepareMockedBigQuery(); CsvMapper csvMapper = new CsvMapper(); + DatasetInfo datasetInfo = DatasetInfo.newBuilder(DATASET_NAME).setLocation("europe-west-2").build(); + + WriteChannelConfiguration writeConfiguration = WriteChannelConfiguration + .newBuilder(TableId.of(datasetInfo.getDatasetId().getDataset(), "csv_table")) + .setAutodetect(true) + .setFormatOptions(FormatOptions.csv()) + .build(); + + BigQueryItemWriter writer = new BigQueryItemWriterBuilder() + .bigQuery(mockedBigQuery) + .rowMapper(dto -> convertDtoToCsvByteArray(csvMapper, dto)) + .writeChannelConfig(writeConfiguration) + .datasetInfo(datasetInfo) + .build(); + + writer.afterPropertiesSet(); + + Assertions.assertNotNull(writer); + } + + @Test + void testCsvWriterWithCsvMapper() { + BigQuery mockedBigQuery = prepareMockedBigQuery(); WriteChannelConfiguration writeConfiguration = WriteChannelConfiguration .newBuilder(TableId.of(DATASET_NAME, "csv_table")) @@ -57,9 +80,7 @@ class BigQueryItemWriterBuilderTests { BigQueryItemWriter writer = new BigQueryItemWriterBuilder() .bigQuery(mockedBigQuery) - .rowMapper(dto -> convertDtoToCsvByteArray(csvMapper, dto)) .writeChannelConfig(writeConfiguration) - .datasetInfo(DatasetInfo.newBuilder(DATASET_NAME).setLocation("europe-west-2").build()) .build(); writer.afterPropertiesSet(); @@ -95,16 +116,14 @@ class BigQueryItemWriterBuilderTests { Assertions.assertNotNull(writer); } - /** - * Example how Apache Avro writer is expected to be built without {@link org.springframework.context.annotation.Bean} annotation. - */ @Test - void testAvroWriter() { + void testCsvWriterWithJsonMapper() { BigQuery mockedBigQuery = prepareMockedBigQuery(); WriteChannelConfiguration writeConfiguration = WriteChannelConfiguration - .newBuilder(TableId.of(DATASET_NAME, "avro_table")) - .setFormatOptions(FormatOptions.avro()) + .newBuilder(TableId.of(DATASET_NAME, "json_table")) + .setAutodetect(true) + .setFormatOptions(FormatOptions.json()) .build(); BigQueryItemWriter writer = new BigQueryItemWriterBuilder() @@ -149,12 +168,12 @@ class BigQueryItemWriterBuilderTests { } - static class PersonDto { + class PersonDto { - private String name; + private final String name; - public String getName() { - return name; + public PersonDto(String name) { + this.name = name; } }