[bq] refactoring & dependency upgrade

This commit is contained in:
Volodymyr
2024-08-18 18:36:21 +03:00
committed by GitHub
parent 50f93d3cab
commit 6f3277a2a3
29 changed files with 163 additions and 176 deletions

View File

@@ -11,13 +11,13 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout source code
uses: actions/checkout@v3
uses: actions/checkout@v4
- name: Set up JDK 17
uses: actions/setup-java@v3
uses: actions/setup-java@v4
with:
distribution: 'temurin'
java-version: 17
cache: 'maven'
- name: Build with Maven
run: mvn -B package --file pom.xml
working-directory: spring-batch-bigquery
working-directory: spring-batch-bigquery

View File

@@ -1,12 +1,14 @@
# spring-batch-bigquery
= spring-batch-bigquery
Spring Batch extension which contains an `ItemWriter` implementation for https://cloud.google.com/bigquery[BigQuery] based on https://github.com/googleapis/java-bigquery[Java BigQuery]. It supports writing https://en.wikipedia.org/wiki/Comma-separated_values[CSV], https://en.wikipedia.org/wiki/JSON[JSON] using https://cloud.google.com/bigquery/docs/batch-loading-data[load jobs].
Spring Batch extension which contains an `ItemWriter` implementation for https://cloud.google.com/bigquery[BigQuery] based on https://github.com/googleapis/java-bigquery[Java BigQuery].
It supports writing https://en.wikipedia.org/wiki/Comma-separated_values[CSV], https://en.wikipedia.org/wiki/JSON[JSON] using https://cloud.google.com/bigquery/docs/batch-loading-data[load jobs].
## Configuration of `BigQueryCsvItemWriter`
== Configuration of `BigQueryCsvItemWriter`
Next to the https://docs.spring.io/spring-batch/reference/html/configureJob.html[configuration of Spring Batch] one needs to configure the `BigQueryCsvItemWriter`.
```javaBigQueryCsv
[source,java]
----
@Bean
BigQueryCsvItemWriter<MyDto> bigQueryCsvWriter() {
WriteChannelConfiguration writeConfiguration = WriteChannelConfiguration
@@ -20,19 +22,19 @@ BigQueryCsvItemWriter<MyDto> bigQueryCsvWriter() {
.writeChannelConfig(writeConfiguration)
.build();
}
```
----
Additional examples could be found in https://github.com/spring-projects/spring-batch-extensions/blob/main/spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/writer/builder/[here].
## Configuration properties
== Configuration properties
[cols="1,1,4"]
.Properties for item writer
.Properties for an item writer
|===
| Property | Required | Description
| `bigQuery` | yes | BigQuery object that provided by BigQuery Java Library. Responsible for connection with BigQuery.
| `writeChannelConfig` | yes | BigQuery write channel config provided by BigQuery Java Library. Responsible for configuring data type, data channel, jobs that will be sent to BigQuery.
| `rowMapper` | no | Your own converter that specifies how to convert input CSV / JSON to byte array.
| `datasetInfo` | no | Your way to customize to how to create BigQuery dataset.
| `rowMapper` | no | Your own converter that specifies how to convert input CSV / JSON to a byte array.
| `datasetInfo` | no | Your way to customize how to create BigQuery dataset.
| `jobConsumer` | no | Your custom handler for BigQuery Job provided by BigQuery Java Library.
|===
|===

View File

@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2002-2023 the original author or authors.
~ Copyright 2002-2024 the original author or authors.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
@@ -13,7 +13,9 @@
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
--><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
--><project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
@@ -51,36 +53,31 @@
<!-- Dependent on Spring Batch core -->
<java.version>17</java.version>
<logback.version>1.4.14</logback.version>
<logback.version>1.5.7</logback.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
<version>5.1.0</version>
<version>5.1.2</version>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigquery</artifactId>
<version>2.35.0</version>
<version>2.42.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-csv</artifactId>
<version>2.16.0</version>
<version>2.17.2</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.14.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.4</version>
<version>3.16.0</version>
</dependency>
@@ -88,13 +85,13 @@
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.10.1</version>
<version>5.11.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>5.8.0</version>
<version>5.12.0</version>
<scope>test</scope>
</dependency>
<dependency>
@@ -112,7 +109,7 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.9</version>
<version>2.0.16</version>
<scope>test</scope>
</dependency>
@@ -125,7 +122,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<version>3.13.0</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
@@ -136,7 +133,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.2.2</version>
<version>3.3.1</version>
<configuration>
<includes>
<!-- Integration tests are omitted because they are designed to be run locally -->
@@ -149,7 +146,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>3.6.3</version>
<version>3.8.0</version>
<executions>
<execution>
<id>attach-javadocs</id>
@@ -162,7 +159,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>3.3.0</version>
<version>3.3.1</version>
<executions>
<execution>
<id>attach-sources</id>
@@ -175,4 +172,4 @@
</plugins>
</build>
</project>
</project>

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -27,7 +27,6 @@ import org.springframework.core.convert.converter.Converter;
import org.springframework.util.Assert;
import java.util.Iterator;
import java.util.Objects;
/**
* BigQuery {@link ItemReader} that accepts simple query as the input.
@@ -37,7 +36,7 @@ import java.util.Objects;
* <p>
* Also, worth mentioning that you should take into account concurrency limits.
* <p>
* Results of this query by default are stored in a shape of temporary table.
* Results of this query by default are stored in the shape of a temporary table.
*
* @param <T> your DTO type
* @author Volodymyr Perebykivskyi
@@ -84,7 +83,7 @@ public class BigQueryQueryItemReader<T> implements ItemReader<T>, InitializingBe
@Override
public T read() throws Exception {
if (Objects.isNull(iterator)) {
if (iterator == null) {
doOpen();
}
@@ -109,4 +108,4 @@ public class BigQueryQueryItemReader<T> implements ItemReader<T>, InitializingBe
Assert.notNull(this.jobConfiguration, "Job configuration must be provided");
}
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,8 +24,6 @@ import org.springframework.batch.extensions.bigquery.reader.BigQueryQueryItemRea
import org.springframework.core.convert.converter.Converter;
import org.springframework.util.Assert;
import java.util.Objects;
/**
* A builder for {@link BigQueryQueryItemReader}.
*
@@ -70,7 +68,7 @@ public class BigQueryQueryItemReaderBuilder<T> {
}
/**
* Row mapper which transforms single BigQuery row into desired type.
* Row mapper which transforms single BigQuery row into a desired type.
*
* @param rowMapper your row mapper
* @return {@link BigQueryQueryItemReaderBuilder}
@@ -94,7 +92,7 @@ public class BigQueryQueryItemReaderBuilder<T> {
}
/**
* Please do not forget about {@link BigQueryQueryItemReader#afterPropertiesSet()}.
* Please remember about {@link BigQueryQueryItemReader#afterPropertiesSet()}.
*
* @return {@link BigQueryQueryItemReader}
*/
@@ -104,14 +102,14 @@ public class BigQueryQueryItemReaderBuilder<T> {
reader.setBigQuery(this.bigQuery);
reader.setRowMapper(this.rowMapper);
if (Objects.nonNull(this.jobConfiguration)) {
reader.setJobConfiguration(this.jobConfiguration);
} else {
if (this.jobConfiguration == null) {
Assert.isTrue(StringUtils.isNotBlank(this.query), "No query provided");
reader.setJobConfiguration(QueryJobConfiguration.newBuilder(this.query).build());
} else {
reader.setJobConfiguration(this.jobConfiguration);
}
return reader;
}
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -54,6 +54,7 @@ public abstract class BigQueryBaseItemWriter<T> implements ItemWriter<T> {
/** Logger that can be reused */
protected final Log logger = LogFactory.getLog(getClass());
private final AtomicLong bigQueryWriteCounter = new AtomicLong();
/**
@@ -77,7 +78,7 @@ public abstract class BigQueryBaseItemWriter<T> implements ItemWriter<T> {
/**
* Fetches table from provided configuration.
* Fetches table from the provided configuration.
*
* @return {@link Table} that is described in {@link BigQueryBaseItemWriter#writeChannelConfig}
*/
@@ -123,7 +124,7 @@ public abstract class BigQueryBaseItemWriter<T> implements ItemWriter<T> {
@Override
public void write(Chunk<? extends T> chunk) throws Exception {
if (BooleanUtils.isFalse(chunk.isEmpty())) {
if (!chunk.isEmpty()) {
List<? extends T> items = chunk.getItems();
doInitializeProperties(items);
@@ -147,8 +148,8 @@ public abstract class BigQueryBaseItemWriter<T> implements ItemWriter<T> {
}
/*
* It is extremely important to create larger ByteBuffer,
* if you call TableDataWriteChannel too many times, it leads to BigQuery exceptions.
* It is extremely important to create larger ByteBuffer.
* If you call TableDataWriteChannel too many times, it leads to BigQuery exceptions.
*/
byteBuffer = ByteBuffer.wrap(outputStream.toByteArray());
}
@@ -170,9 +171,9 @@ public abstract class BigQueryBaseItemWriter<T> implements ItemWriter<T> {
finally {
String logMessage = "Write operation submitted: " + bigQueryWriteCounter.incrementAndGet();
if (Objects.nonNull(writeChannel)) {
if (writeChannel != null) {
logMessage += " -- Job ID: " + writeChannel.getJob().getJobId().getJob();
if (Objects.nonNull(this.jobConsumer)) {
if (this.jobConsumer != null) {
this.jobConsumer.accept(writeChannel.getJob());
}
}
@@ -212,7 +213,7 @@ public abstract class BigQueryBaseItemWriter<T> implements ItemWriter<T> {
Assert.notNull(this.writeChannelConfig.getFormat(), "Data format must be provided");
String dataset = this.writeChannelConfig.getDestinationTable().getDataset();
if (Objects.isNull(this.datasetInfo)) {
if (this.datasetInfo == null) {
this.datasetInfo = DatasetInfo.newBuilder(dataset).build();
}
@@ -228,13 +229,11 @@ public abstract class BigQueryBaseItemWriter<T> implements ItemWriter<T> {
TableId tableId = this.writeChannelConfig.getDestinationTable();
String datasetToCheck = tableId.getDataset();
if (Objects.nonNull(datasetToCheck)) {
if (datasetToCheck != null) {
Dataset foundDataset = this.bigQuery.getDataset(datasetToCheck);
if (Objects.isNull(foundDataset)) {
if (Objects.nonNull(this.datasetInfo)) {
this.bigQuery.create(this.datasetInfo);
}
if (foundDataset == null && this.datasetInfo != null) {
this.bigQuery.create(this.datasetInfo);
}
}
}
@@ -264,7 +263,7 @@ public abstract class BigQueryBaseItemWriter<T> implements ItemWriter<T> {
}
/**
* Schema can be computed on BigQuery side during upload,
* Schema can be computed on the BigQuery side during upload,
* so it is good to know when schema is supplied by user manually.
*
* @param table BigQuery table
@@ -287,7 +286,7 @@ public abstract class BigQueryBaseItemWriter<T> implements ItemWriter<T> {
protected abstract void doInitializeProperties(List<? extends T> items);
/**
* Converts chunk into byte array.
* Converts chunk into a byte array.
* Each data type should be converted with respect to its specification.
*
* @param items current chunk
@@ -295,4 +294,4 @@ public abstract class BigQueryBaseItemWriter<T> implements ItemWriter<T> {
*/
protected abstract List<byte[]> convertObjectsToByteArrays(List<? extends T> items);
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.google.cloud.bigquery.Table;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.convert.converter.Converter;
import org.springframework.util.Assert;
@@ -31,7 +30,6 @@ import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
import java.util.function.Predicate;
import java.util.stream.Collectors;
/**
* CSV writer for BigQuery.
@@ -45,18 +43,21 @@ public class BigQueryCsvItemWriter<T> extends BigQueryBaseItemWriter<T> implemen
private Converter<T, byte[]> rowMapper;
private ObjectWriter objectWriter;
private Class itemClass;
private Class<?> itemClass;
/**
* Actual type of incoming data can be obtained only in runtime
*/
@Override
protected synchronized void doInitializeProperties(List<? extends T> items) {
if (Objects.isNull(this.itemClass)) {
T firstItem = items.stream().findFirst().orElseThrow(RuntimeException::new);
if (this.itemClass == null) {
T firstItem = items.stream().findFirst().orElseThrow(() -> {
logger.warn("Class type was not found");
return new IllegalStateException("Class type was not found");
});
this.itemClass = firstItem.getClass();
if (Objects.isNull(this.rowMapper)) {
if (this.rowMapper == null) {
this.objectWriter = new CsvMapper().writerWithTypedSchemaFor(this.itemClass);
}
@@ -65,7 +66,7 @@ public class BigQueryCsvItemWriter<T> extends BigQueryBaseItemWriter<T> implemen
}
/**
* Row mapper which transforms single BigQuery row into desired type.
* Row mapper which transforms single BigQuery row into a desired type.
*
* @param rowMapper your row mapper
*/
@@ -83,7 +84,7 @@ public class BigQueryCsvItemWriter<T> extends BigQueryBaseItemWriter<T> implemen
.map(String::new)
.filter(Predicate.not(ObjectUtils::isEmpty))
.map(row -> row.getBytes(StandardCharsets.UTF_8))
.collect(Collectors.toList());
.toList();
}
@Override
@@ -91,16 +92,16 @@ public class BigQueryCsvItemWriter<T> extends BigQueryBaseItemWriter<T> implemen
super.baseAfterPropertiesSet(() -> {
Table table = getTable();
if (BooleanUtils.toBoolean(super.writeChannelConfig.getAutodetect())) {
if ((tableHasDefinedSchema(table) && super.logger.isWarnEnabled())) {
super.logger.warn("Mixing autodetect mode with already defined schema may lead to errors on BigQuery side");
if (Boolean.TRUE.equals(super.writeChannelConfig.getAutodetect())) {
if (tableHasDefinedSchema(table) && super.logger.isWarnEnabled()) {
logger.warn("Mixing autodetect mode with already defined schema may lead to errors on BigQuery side");
}
} else {
Assert.notNull(super.writeChannelConfig.getSchema(), "Schema must be provided");
if (tableHasDefinedSchema(table)) {
Assert.isTrue(
table.getDefinition().getSchema().equals(super.writeChannelConfig.getSchema()),
Objects.equals(table.getDefinition().getSchema(), super.writeChannelConfig.getSchema()),
"Schema should be the same"
);
}
@@ -112,7 +113,7 @@ public class BigQueryCsvItemWriter<T> extends BigQueryBaseItemWriter<T> implemen
private byte[] mapItemToCsv(T t) {
try {
return Objects.isNull(rowMapper) ? objectWriter.writeValueAsBytes(t) : rowMapper.convert(t);
return rowMapper == null ? objectWriter.writeValueAsBytes(t) : rowMapper.convert(t);
}
catch (JsonProcessingException e) {
logger.error("Error during processing of the line: ", e);
@@ -120,4 +121,4 @@ public class BigQueryCsvItemWriter<T> extends BigQueryBaseItemWriter<T> implemen
}
}
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.cloud.bigquery.Table;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.convert.converter.Converter;
import org.springframework.util.Assert;
@@ -31,7 +30,6 @@ import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
import java.util.function.Predicate;
import java.util.stream.Collectors;
/**
* JSON writer for BigQuery.
@@ -45,24 +43,27 @@ public class BigQueryJsonItemWriter<T> extends BigQueryBaseItemWriter<T> impleme
private Converter<T, byte[]> rowMapper;
private ObjectWriter objectWriter;
private Class itemClass;
private Class<?> itemClass;
@Override
protected void doInitializeProperties(List<? extends T> items) {
if (Objects.isNull(this.itemClass)) {
T firstItem = items.stream().findFirst().orElseThrow(RuntimeException::new);
if (this.itemClass == null) {
T firstItem = items.stream().findFirst().orElseThrow(() -> {
logger.warn("Class type was not found");
return new IllegalStateException("Class type was not found");
});
this.itemClass = firstItem.getClass();
if (Objects.isNull(this.rowMapper)) {
if (this.rowMapper == null) {
this.objectWriter = new ObjectMapper().writerFor(this.itemClass);
}
super.logger.debug("Writer setup is completed");
logger.debug("Writer setup is completed");
}
}
/**
* Converter that transforms a single row into byte array.
* Converter that transforms a single row into a byte array.
*
* @param rowMapper your JSON row mapper
*/
@@ -80,7 +81,7 @@ public class BigQueryJsonItemWriter<T> extends BigQueryBaseItemWriter<T> impleme
.map(this::convertToNdJson)
.filter(Predicate.not(ObjectUtils::isEmpty))
.map(row -> row.getBytes(StandardCharsets.UTF_8))
.collect(Collectors.toList());
.toList();
}
@Override
@@ -88,16 +89,16 @@ public class BigQueryJsonItemWriter<T> extends BigQueryBaseItemWriter<T> impleme
super.baseAfterPropertiesSet(() -> {
Table table = getTable();
if (BooleanUtils.toBoolean(super.writeChannelConfig.getAutodetect())) {
if ((tableHasDefinedSchema(table) && super.logger.isWarnEnabled())) {
super.logger.warn("Mixing autodetect mode with already defined schema may lead to errors on BigQuery side");
if (Boolean.TRUE.equals(writeChannelConfig.getAutodetect())) {
if (tableHasDefinedSchema(table) && super.logger.isWarnEnabled()) {
logger.warn("Mixing autodetect mode with already defined schema may lead to errors on BigQuery side");
}
} else {
Assert.notNull(super.writeChannelConfig.getSchema(), "Schema must be provided");
Assert.notNull(writeChannelConfig.getSchema(), "Schema must be provided");
if (tableHasDefinedSchema(table)) {
Assert.isTrue(
table.getDefinition().getSchema().equals(super.writeChannelConfig.getSchema()),
Objects.equals(table.getDefinition().getSchema(), writeChannelConfig.getSchema()),
"Schema should be the same"
);
}
@@ -126,4 +127,4 @@ public class BigQueryJsonItemWriter<T> extends BigQueryBaseItemWriter<T> impleme
return json.concat(org.apache.commons.lang3.StringUtils.LF);
}
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -103,7 +103,7 @@ public class BigQueryCsvItemWriterBuilder<T> {
}
/**
* Please do not forget about {@link BigQueryCsvItemWriter#afterPropertiesSet()}.
* Please remember about {@link BigQueryCsvItemWriter#afterPropertiesSet()}.
*
* @return {@link BigQueryCsvItemWriter}
*/
@@ -119,4 +119,4 @@ public class BigQueryCsvItemWriterBuilder<T> {
return writer;
}
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -43,7 +43,7 @@ public class BigQueryJsonItemWriterBuilder<T> {
private BigQuery bigQuery;
/**
* Converts your DTO into byte array.
* Converts your DTO into a byte array.
*
* @param rowMapper your mapping
* @return {@link BigQueryJsonItemWriter}
@@ -103,7 +103,7 @@ public class BigQueryJsonItemWriterBuilder<T> {
}
/**
* Please do not forget about {@link BigQueryJsonItemWriter#afterPropertiesSet()}.
* Please remember about {@link BigQueryJsonItemWriter#afterPropertiesSet()}.
*
* @return {@link BigQueryJsonItemWriter}
*/
@@ -119,4 +119,4 @@ public class BigQueryJsonItemWriterBuilder<T> {
return writer;
}
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,7 +17,7 @@
/**
* Google BigQuery related functionality.
* <p>
* These writers use java client from Google, so we cannot control this flow fully.
* These writers use a Java client from Google, so we cannot control this flow fully.
* Take into account that this writer produces {@link com.google.cloud.bigquery.JobConfiguration.Type#LOAD} {@link com.google.cloud.bigquery.Job}.
*
* <p>Supported formats:
@@ -43,4 +43,4 @@
@NonNullApi
package org.springframework.batch.extensions.bigquery.writer;
import org.springframework.lang.NonNullApi;
import org.springframework.lang.NonNullApi;

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -90,4 +90,4 @@ public class BigQueryDataLoader {
job.get().waitFor();
}
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -30,4 +30,4 @@ public record PersonDto(String name, Integer age) {
return Schema.of(nameField, ageField);
}
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -31,4 +31,4 @@ public final class TestConstants {
res.get(NAME).getStringValue(), Long.valueOf(res.get(AGE).getLongValue()).intValue()
);
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -31,8 +31,8 @@ public abstract class BaseBigQueryIntegrationTest {
protected String getTableName(TestInfo testInfo) {
return String.format(
TABLE_PATTERN,
testInfo.getTags().stream().findFirst().orElseThrow(),
testInfo.getTags().iterator().next(),
testInfo.getTestMethod().map(Method::getName).orElseThrow()
);
}
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -15,13 +15,13 @@
*/
/**
* In order to launch these tests you should provide a way how to authorize to Google BigQuery.
* A simple way is to create service account, store credentials as JSON file and provide environment variable.
* Example: GOOGLE_APPLICATION_CREDENTIALS=/home/dgray/Downloads/bq-key.json
* To launch these tests, you should provide a way how to authorize to Google BigQuery.
* A simple way is to create a service account, store credentials as JSON file and provide environment variable.
* Example: GOOGLE_APPLICATION_CREDENTIALS=/home/user/Downloads/bq-key.json
* <p>
* Test names should follow this pattern: test1, test2, testN.
* So later in BigQuery you will see generated table name: csv_test1, csv_test2, csv_testN.
* This way it will be easier to trace errors in BigQuery.
* This way, it will be easier to trace errors in BigQuery.
*
* @see <a href="https://cloud.google.com/bigquery/docs/quickstarts/quickstart-client-libraries#before-you-begin">Authentication</a>
*/

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -28,19 +28,17 @@ import org.springframework.batch.extensions.bigquery.common.PersonDto;
import org.springframework.batch.extensions.bigquery.common.TestConstants;
import org.springframework.batch.extensions.bigquery.integration.base.BaseBigQueryIntegrationTest;
import java.util.Objects;
public abstract class BaseCsvJsonInteractiveQueryItemReaderTest extends BaseBigQueryIntegrationTest {
@BeforeEach
void prepareTest(TestInfo testInfo) {
if (Objects.isNull(bigQuery.getDataset(TestConstants.DATASET))) {
if (bigQuery.getDataset(TestConstants.DATASET) == null) {
bigQuery.create(DatasetInfo.of(TestConstants.DATASET));
}
String tableName = getTableName(testInfo);
if (Objects.isNull(bigQuery.getTable(TestConstants.DATASET, tableName))) {
if (bigQuery.getTable(TestConstants.DATASET, tableName) == null) {
TableDefinition tableDefinition = StandardTableDefinition.of(PersonDto.getBigQuerySchema());
bigQuery.create(TableInfo.of(TableId.of(TestConstants.DATASET, tableName), tableDefinition));
}
@@ -51,4 +49,4 @@ public abstract class BaseCsvJsonInteractiveQueryItemReaderTest extends BaseBigQ
bigQuery.delete(TableId.of(TestConstants.DATASET, getTableName(testInfo)));
}
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,7 +18,6 @@ package org.springframework.batch.extensions.bigquery.integration.reader.batch;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableId;
import org.apache.commons.lang3.math.NumberUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
@@ -32,7 +31,7 @@ import org.springframework.batch.extensions.bigquery.reader.builder.BigQueryQuer
import org.springframework.batch.item.Chunk;
@Tag("csv")
public class BigQueryBatchQueryCsvItemReaderTest extends BaseCsvJsonInteractiveQueryItemReaderTest {
class BigQueryBatchQueryCsvItemReaderTest extends BaseCsvJsonInteractiveQueryItemReaderTest {
@Test
void batchQueryTest1(TestInfo testInfo) throws Exception {
@@ -64,13 +63,13 @@ public class BigQueryBatchQueryCsvItemReaderTest extends BaseCsvJsonInteractiveQ
Assertions.assertNotNull(actualFirstPerson);
Assertions.assertEquals(expectedFirstPerson.name(), actualFirstPerson.name());
Assertions.assertEquals(expectedFirstPerson.age().compareTo(actualFirstPerson.age()), NumberUtils.INTEGER_ZERO);
Assertions.assertEquals(0, expectedFirstPerson.age().compareTo(actualFirstPerson.age()));
Assertions.assertNotNull(actualSecondPerson);
Assertions.assertEquals(expectedSecondPerson.name(), actualSecondPerson.name());
Assertions.assertEquals(expectedSecondPerson.age().compareTo(actualSecondPerson.age()), NumberUtils.INTEGER_ZERO);
Assertions.assertEquals(0, expectedSecondPerson.age().compareTo(actualSecondPerson.age()));
Assertions.assertNull(actualThirdPerson);
}
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,7 +18,6 @@ package org.springframework.batch.extensions.bigquery.integration.reader.batch;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableId;
import org.apache.commons.lang3.math.NumberUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
@@ -32,7 +31,7 @@ import org.springframework.batch.extensions.bigquery.reader.builder.BigQueryQuer
import org.springframework.batch.item.Chunk;
@Tag("json")
public class BigQueryBatchQueryJsonItemReaderTest extends BaseCsvJsonInteractiveQueryItemReaderTest {
class BigQueryBatchQueryJsonItemReaderTest extends BaseCsvJsonInteractiveQueryItemReaderTest {
@Test
void batchQueryTest1(TestInfo testInfo) throws Exception {
@@ -64,13 +63,13 @@ public class BigQueryBatchQueryJsonItemReaderTest extends BaseCsvJsonInteractive
Assertions.assertNotNull(actualFirstPerson);
Assertions.assertEquals(expectedFirstPerson.name(), actualFirstPerson.name());
Assertions.assertEquals(expectedFirstPerson.age().compareTo(actualFirstPerson.age()), NumberUtils.INTEGER_ZERO);
Assertions.assertEquals(0, expectedFirstPerson.age().compareTo(actualFirstPerson.age()));
Assertions.assertNotNull(actualSecondPerson);
Assertions.assertEquals(expectedSecondPerson.name(), actualSecondPerson.name());
Assertions.assertEquals(expectedSecondPerson.age().compareTo(actualSecondPerson.age()), NumberUtils.INTEGER_ZERO);
Assertions.assertEquals(0, expectedSecondPerson.age().compareTo(actualSecondPerson.age()));
Assertions.assertNull(actualThirdPerson);
}
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,7 +16,6 @@
package org.springframework.batch.extensions.bigquery.integration.reader.interactive;
import org.apache.commons.lang3.math.NumberUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
@@ -30,7 +29,7 @@ import org.springframework.batch.extensions.bigquery.reader.builder.BigQueryQuer
import org.springframework.batch.item.Chunk;
@Tag("csv")
public class BigQueryInteractiveQueryCsvItemReaderTest extends BaseCsvJsonInteractiveQueryItemReaderTest {
class BigQueryInteractiveQueryCsvItemReaderTest extends BaseCsvJsonInteractiveQueryItemReaderTest {
@Test
void interactiveQueryTest1(TestInfo testInfo) throws Exception {
@@ -56,13 +55,13 @@ public class BigQueryInteractiveQueryCsvItemReaderTest extends BaseCsvJsonIntera
Assertions.assertNotNull(actualFirstPerson);
Assertions.assertEquals(expectedFirstPerson.name(), actualFirstPerson.name());
Assertions.assertEquals(expectedFirstPerson.age().compareTo(actualFirstPerson.age()), NumberUtils.INTEGER_ZERO);
Assertions.assertEquals(0, expectedFirstPerson.age().compareTo(actualFirstPerson.age()));
Assertions.assertNotNull(actualSecondPerson);
Assertions.assertEquals(expectedSecondPerson.name(), actualSecondPerson.name());
Assertions.assertEquals(expectedSecondPerson.age().compareTo(actualSecondPerson.age()), NumberUtils.INTEGER_ZERO);
Assertions.assertEquals(0, expectedSecondPerson.age().compareTo(actualSecondPerson.age()));
Assertions.assertNull(actualThirdPerson);
}
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,7 +16,6 @@
package org.springframework.batch.extensions.bigquery.integration.reader.interactive;
import org.apache.commons.lang3.math.NumberUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
@@ -30,7 +29,7 @@ import org.springframework.batch.extensions.bigquery.reader.builder.BigQueryQuer
import org.springframework.batch.item.Chunk;
@Tag("json")
public class BigQueryInteractiveQueryJsonItemReaderTest extends BaseCsvJsonInteractiveQueryItemReaderTest {
class BigQueryInteractiveQueryJsonItemReaderTest extends BaseCsvJsonInteractiveQueryItemReaderTest {
@Test
void interactiveQueryTest1(TestInfo testInfo) throws Exception {
@@ -56,13 +55,13 @@ public class BigQueryInteractiveQueryJsonItemReaderTest extends BaseCsvJsonInter
Assertions.assertNotNull(actualFirstPerson);
Assertions.assertEquals(expectedFirstPerson.name(), actualFirstPerson.name());
Assertions.assertEquals(expectedFirstPerson.age().compareTo(actualFirstPerson.age()), NumberUtils.INTEGER_ZERO);
Assertions.assertEquals(0, expectedFirstPerson.age().compareTo(actualFirstPerson.age()));
Assertions.assertNotNull(actualSecondPerson);
Assertions.assertEquals(expectedSecondPerson.name(), actualSecondPerson.name());
Assertions.assertEquals(expectedSecondPerson.age().compareTo(actualSecondPerson.age()), NumberUtils.INTEGER_ZERO);
Assertions.assertEquals(0, expectedSecondPerson.age().compareTo(actualSecondPerson.age()));
Assertions.assertNull(actualThirdPerson);
}
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,7 +21,6 @@ import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableResult;
import org.apache.commons.lang3.math.NumberUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
@@ -33,7 +32,7 @@ import org.springframework.batch.extensions.bigquery.integration.writer.base.Bas
import org.springframework.batch.item.Chunk;
@Tag("csv")
public class BigQueryCsvItemWriterTest extends BaseBigQueryItemWriterTest {
class BigQueryCsvItemWriterTest extends BaseBigQueryItemWriterTest {
@Test
void test1(TestInfo testInfo) throws Exception {
@@ -54,7 +53,7 @@ public class BigQueryCsvItemWriterTest extends BaseBigQueryItemWriterTest {
.getValues()
.forEach(field -> {
Assertions.assertTrue(
chunk.getItems().stream().map(PersonDto::name).anyMatch(name -> field.get(NumberUtils.INTEGER_ZERO).getStringValue().equals(name))
chunk.getItems().stream().map(PersonDto::name).anyMatch(name -> field.get(0).getStringValue().equals(name))
);
boolean ageCondition = chunk
@@ -62,10 +61,10 @@ public class BigQueryCsvItemWriterTest extends BaseBigQueryItemWriterTest {
.stream()
.map(PersonDto::age)
.map(Long::valueOf)
.anyMatch(age -> age.compareTo(field.get(NumberUtils.INTEGER_ONE).getLongValue()) == NumberUtils.INTEGER_ZERO);
.anyMatch(age -> age.compareTo(field.get(1).getLongValue()) == 0);
Assertions.assertTrue(ageCondition);
});
}
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,7 +21,6 @@ import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableResult;
import org.apache.commons.lang3.math.NumberUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
@@ -33,7 +32,7 @@ import org.springframework.batch.extensions.bigquery.integration.writer.base.Bas
import org.springframework.batch.item.Chunk;
@Tag("json")
public class BigQueryJsonItemWriterTest extends BaseBigQueryItemWriterTest {
class BigQueryJsonItemWriterTest extends BaseBigQueryItemWriterTest {
@Test
void test1(TestInfo testInfo) throws Exception {
@@ -54,7 +53,7 @@ public class BigQueryJsonItemWriterTest extends BaseBigQueryItemWriterTest {
.getValues()
.forEach(field -> {
Assertions.assertTrue(
chunk.getItems().stream().map(PersonDto::name).anyMatch(name -> field.get(NumberUtils.INTEGER_ZERO).getStringValue().equals(name))
chunk.getItems().stream().map(PersonDto::name).anyMatch(name -> field.get(0).getStringValue().equals(name))
);
boolean ageCondition = chunk
@@ -62,10 +61,10 @@ public class BigQueryJsonItemWriterTest extends BaseBigQueryItemWriterTest {
.stream()
.map(PersonDto::age)
.map(Long::valueOf)
.anyMatch(age -> age.compareTo(field.get(NumberUtils.INTEGER_ONE).getLongValue()) == NumberUtils.INTEGER_ZERO);
.anyMatch(age -> age.compareTo(field.get(1).getLongValue()) == 0);
Assertions.assertTrue(ageCondition);
});
}
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -28,17 +28,15 @@ import org.springframework.batch.extensions.bigquery.common.PersonDto;
import org.springframework.batch.extensions.bigquery.common.TestConstants;
import org.springframework.batch.extensions.bigquery.integration.base.BaseBigQueryIntegrationTest;
import java.util.Objects;
public abstract class BaseBigQueryItemWriterTest extends BaseBigQueryIntegrationTest {
@BeforeEach
void prepareTest(TestInfo testInfo) {
if (Objects.isNull(bigQuery.getDataset(TestConstants.DATASET))) {
if (bigQuery.getDataset(TestConstants.DATASET) == null) {
bigQuery.create(DatasetInfo.of(TestConstants.DATASET));
}
if (Objects.isNull(bigQuery.getTable(TestConstants.DATASET, getTableName(testInfo)))) {
if (bigQuery.getTable(TestConstants.DATASET, getTableName(testInfo)) == null) {
TableDefinition tableDefinition = StandardTableDefinition.of(PersonDto.getBigQuerySchema());
bigQuery.create(TableInfo.of(TableId.of(TestConstants.DATASET, getTableName(testInfo)), tableDefinition));
}
@@ -49,4 +47,4 @@ public abstract class BaseBigQueryItemWriterTest extends BaseBigQueryIntegration
bigQuery.delete(TableId.of(TestConstants.DATASET, getTableName(testInfo)));
}
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -35,4 +35,4 @@ public abstract class AbstractBigQueryTest {
return mockedBigQuery;
}
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -50,4 +50,4 @@ class BigQueryBatchQueryItemReaderBuilderTests extends AbstractBigQueryTest {
Assertions.assertNotNull(reader);
}
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -64,4 +64,4 @@ class BigQueryInteractiveQueryItemReaderBuilderTests extends AbstractBigQueryTes
Assertions.assertNotNull(reader);
}
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -96,4 +96,4 @@ class BigQueryCsvItemWriterBuilderTests extends AbstractBigQueryTest {
}
}
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -97,4 +97,4 @@ class BigQueryJsonItemWriterBuilderTests extends AbstractBigQueryTest {
}
}
}
}