[BEAM-876] Support schemaUpdateOption in BigQueryIO (#9524)
* [BEAM-876] Support schemaUpdateOption in BigQueryIO

* Move schemaUpdateOptions into WriteTables constructor

* Add integration test for SchemaUpdateOption in BigQuery

* Fix for funkiness from rebase to master

* Spotless apply
ziel authored and pabloem committed Nov 25, 2019
1 parent e3e52ad commit 5fd93af
Showing 6 changed files with 368 additions and 6 deletions.
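Before the per-file diffs, a minimal sketch of how the new option is meant to be used from pipeline code. This is an illustration, not code from the commit: rows is assumed to be an existing PCollection<TableRow>, and the table spec and updatedSchema are placeholders.

import java.util.EnumSet;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.SchemaUpdateOption;

// Append rows while allowing the load job to add new nullable fields to the
// destination table's schema. Table spec and schema are hypothetical.
rows.apply(
    BigQueryIO.writeTableRows()
        .to("my-project:my_dataset.my_table")
        .withSchema(updatedSchema) // prior schema plus the new nullable field
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        .withSchemaUpdateOptions(EnumSet.of(SchemaUpdateOption.ALLOW_FIELD_ADDITION)));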
org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -22,7 +22,9 @@
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;

import com.google.api.services.bigquery.model.TableRow;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
@@ -36,6 +38,7 @@
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.SchemaUpdateOption;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -115,6 +118,7 @@ class BatchLoads<DestinationT, ElementT>
private BigQueryServices bigQueryServices;
private final WriteDisposition writeDisposition;
private final CreateDisposition createDisposition;
private Set<SchemaUpdateOption> schemaUpdateOptions;
private final boolean ignoreUnknownValues;
// Indicates that we are writing to a constant single table. If this is the case, we will create
// the table, even if there is no data in it.
@@ -166,6 +170,11 @@ class BatchLoads<DestinationT, ElementT>
this.elementCoder = elementCoder;
this.kmsKey = kmsKey;
this.rowWriterFactory = rowWriterFactory;
schemaUpdateOptions = Collections.emptySet();
}

void setSchemaUpdateOptions(Set<SchemaUpdateOption> schemaUpdateOptions) {
this.schemaUpdateOptions = schemaUpdateOptions;
}

void setTestServices(BigQueryServices bigQueryServices) {
@@ -587,7 +596,8 @@ private PCollection<KV<TableDestination, String>> writeTempTables(
maxRetryJobs,
ignoreUnknownValues,
kmsKey,
rowWriterFactory.getSourceFormat()));
rowWriterFactory.getSourceFormat(),
schemaUpdateOptions));
}

// In the case where the files fit into a single load job, there's no need to write temporary
@@ -621,7 +631,8 @@ void writeSinglePartition(
maxRetryJobs,
ignoreUnknownValues,
kmsKey,
rowWriterFactory.getSourceFormat()));
rowWriterFactory.getSourceFormat(),
schemaUpdateOptions));
}

private WriteResult writeResult(Pipeline p) {
org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -43,8 +43,10 @@
import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadSession;
import com.google.cloud.bigquery.storage.v1beta1.Storage.Stream;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
@@ -1635,6 +1637,7 @@ public static <T> Write<T> write() {
.setBigQueryServices(new BigQueryServicesImpl())
.setCreateDisposition(Write.CreateDisposition.CREATE_IF_NEEDED)
.setWriteDisposition(Write.WriteDisposition.WRITE_EMPTY)
.setSchemaUpdateOptions(Collections.emptySet())
.setNumFileShards(0)
.setMethod(Write.Method.DEFAULT)
.setExtendedErrorInfo(false)
@@ -1729,6 +1732,8 @@ public enum Method {
abstract CreateDisposition getCreateDisposition();

abstract WriteDisposition getWriteDisposition();

abstract Set<SchemaUpdateOption> getSchemaUpdateOptions();
/** Table description. Default is empty. */
@Nullable
abstract String getTableDescription();
@@ -1807,6 +1812,8 @@ abstract Builder<T> setAvroSchemaFactory(

abstract Builder<T> setWriteDisposition(WriteDisposition writeDisposition);

abstract Builder<T> setSchemaUpdateOptions(Set<SchemaUpdateOption> schemaUpdateOptions);

abstract Builder<T> setTableDescription(String tableDescription);

abstract Builder<T> setValidate(boolean validate);
@@ -1910,6 +1917,25 @@ public enum WriteDisposition {
WRITE_EMPTY
}

/**
* An enumeration type for the BigQuery schema update options strings.
*
* <p>Note from the BigQuery API doc: schema update options are supported in two cases -- when
* writeDisposition is WRITE_APPEND, or when writeDisposition is WRITE_TRUNCATE and the
* destination table is a partition of a table, specified by partition decorators.
*
* @see <a
* href="https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationquery">
* <code>configuration.query.schemaUpdateOptions</code> in the BigQuery Jobs API</a>
*/
public enum SchemaUpdateOption {
/** Allow adding a nullable field to the schema. */
ALLOW_FIELD_ADDITION,

/** Allow relaxing a required field in the original schema to nullable. */
ALLOW_FIELD_RELAXATION
}

/**
* Writes to the given table, specified in the format described in {@link
* BigQueryHelpers#parseTableSpec}.
@@ -2098,6 +2124,12 @@ public Write<T> withWriteDisposition(WriteDisposition writeDisposition) {
return toBuilder().setWriteDisposition(writeDisposition).build();
}

/** Allows the schema of the destination table to be updated as a side effect of the write. */
public Write<T> withSchemaUpdateOptions(Set<SchemaUpdateOption> schemaUpdateOptions) {
checkArgument(schemaUpdateOptions != null, "schemaUpdateOptions can not be null");
return toBuilder().setSchemaUpdateOptions(schemaUpdateOptions).build();
}

/** Specifies the table description. */
public Write<T> withTableDescription(String tableDescription) {
checkArgument(tableDescription != null, "tableDescription can not be null");
@@ -2589,6 +2621,9 @@ private <DestinationT, ElementT> WriteResult continueExpandTyped(
rowWriterFactory,
getKmsKey());
batchLoads.setTestServices(getBigQueryServices());
if (getSchemaUpdateOptions() != null) {
batchLoads.setSchemaUpdateOptions(getSchemaUpdateOptions());
}
if (getMaxFilesPerBundle() != null) {
batchLoads.setMaxNumWritersPerBundle(getMaxFilesPerBundle());
}
@@ -2634,6 +2669,9 @@ public void populateDisplayData(DisplayData.Builder builder) {
.add(
DisplayData.item("writeDisposition", getWriteDisposition().toString())
.withLabel("Table WriteDisposition"))
.add(
DisplayData.item("schemaUpdateOptions", getSchemaUpdateOptions().toString())
.withLabel("Table SchemaUpdateOptions"))
.addIfNotDefault(
DisplayData.item("validation", getValidate()).withLabel("Validation Enabled"), true)
.addIfNotNull(
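The javadoc for SchemaUpdateOption above also allows schema updates under WRITE_TRUNCATE, but only when the destination is a single partition addressed via a partition decorator. A hedged sketch of that second case, reusing the assumptions from the earlier snippet (rows exists; relaxedSchema is the old schema with one REQUIRED field loosened to NULLABLE; the decorator date is arbitrary):

// Truncate a single partition while relaxing a required field. The $YYYYMMDD
// suffix is a partition decorator limiting the truncation to that partition.
rows.apply(
    BigQueryIO.writeTableRows()
        .to("my-project:my_dataset.my_table$20191125")
        .withSchema(relaxedSchema)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
        .withSchemaUpdateOptions(
            EnumSet.of(BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_RELAXATION)));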
org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -29,6 +29,8 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -38,6 +40,7 @@
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.PendingJob;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.PendingJobManager;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.SchemaUpdateOption;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
@@ -90,6 +93,7 @@ class WriteTables<DestinationT>
private final PCollectionView<String> loadJobIdPrefixView;
private final WriteDisposition firstPaneWriteDisposition;
private final CreateDisposition firstPaneCreateDisposition;
private final Set<SchemaUpdateOption> schemaUpdateOptions;
private final DynamicDestinations<?, DestinationT> dynamicDestinations;
private final List<PCollectionView<?>> sideInputs;
private final TupleTag<KV<TableDestination, String>> mainOutputTag;
@@ -219,7 +223,8 @@ public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
tableSchema,
partitionFiles,
writeDisposition,
createDisposition);
createDisposition,
schemaUpdateOptions);
pendingJobs.add(
new PendingJobData(window, retryJob, partitionFiles, tableDestination, tableReference));
}
@@ -288,7 +293,9 @@ public WriteTables(
int maxRetryJobs,
boolean ignoreUnknownValues,
String kmsKey,
String sourceFormat) {
String sourceFormat,
Set<SchemaUpdateOption> schemaUpdateOptions) {

this.tempTable = tempTable;
this.bqServices = bqServices;
this.loadJobIdPrefixView = loadJobIdPrefixView;
@@ -303,6 +310,7 @@ public WriteTables(
this.ignoreUnknownValues = ignoreUnknownValues;
this.kmsKey = kmsKey;
this.sourceFormat = sourceFormat;
this.schemaUpdateOptions = schemaUpdateOptions;
}

@Override
@@ -346,7 +354,8 @@ private PendingJob startLoad(
@Nullable TableSchema schema,
List<String> gcsUris,
WriteDisposition writeDisposition,
CreateDisposition createDisposition) {
CreateDisposition createDisposition,
Set<SchemaUpdateOption> schemaUpdateOptions) {
JobConfigurationLoad loadConfig =
new JobConfigurationLoad()
.setDestinationTable(ref)
@@ -356,6 +365,11 @@ private PendingJob startLoad(
.setCreateDisposition(createDisposition.name())
.setSourceFormat(sourceFormat)
.setIgnoreUnknownValues(ignoreUnknownValues);
if (schemaUpdateOptions != null) {
List<String> options =
schemaUpdateOptions.stream().map(Enum::name).collect(Collectors.toList());
loadConfig.setSchemaUpdateOptions(options);
}
if (timePartitioning != null) {
loadConfig.setTimePartitioning(timePartitioning);
// only set clustering if timePartitioning is set
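At the wire level, startLoad above passes the options through by name. For the WRITE_APPEND example sketched earlier, the load configuration handed to the BigQuery Jobs API would carry plain strings, roughly as below (a reconstruction for illustration, not code from this commit):

import java.util.Collections;
import com.google.api.services.bigquery.model.JobConfigurationLoad;

// SchemaUpdateOption constants are serialized via Enum::name, so the API sees
// a list of strings such as "ALLOW_FIELD_ADDITION".
JobConfigurationLoad loadConfig =
    new JobConfigurationLoad()
        .setWriteDisposition("WRITE_APPEND")
        .setSchemaUpdateOptions(Collections.singletonList("ALLOW_FIELD_ADDITION"));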
org/apache/beam/sdk/io/gcp/testing/FakeJobService.java
@@ -51,10 +51,12 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
@@ -264,6 +266,12 @@ public JobStatistics dryRunQuery(String projectId, JobConfigurationQuery query,
throw new UnsupportedOperationException();
}

public Collection<Job> getAllJobs() {
synchronized (allJobs) {
return allJobs.values().stream().map(j -> j.job).collect(Collectors.toList());
}
}

@Override
public Job getJob(JobReference jobRef) {
try {
org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -36,6 +36,8 @@

import com.google.api.services.bigquery.model.Clustering;
import com.google.api.services.bigquery.model.ErrorProto;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfigurationLoad;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
import com.google.api.services.bigquery.model.TableFieldSchema;
@@ -54,11 +56,14 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.AtomicCoder;
@@ -68,6 +73,7 @@
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.SchemaUpdateOption;
import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices;
import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService;
import org.apache.beam.sdk.io.gcp.testing.FakeJobService;
@@ -1179,6 +1185,8 @@ public void testBuildWriteDisplayData() {
.withSchema(schema)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withSchemaUpdateOptions(
EnumSet.of(BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_ADDITION))
.withTableDescription(tblDescription)
.withoutValidation();

@@ -1194,6 +1202,11 @@ public void testBuildWriteDisplayData() {
displayData,
hasDisplayItem(
"writeDisposition", BigQueryIO.Write.WriteDisposition.WRITE_APPEND.toString()));
assertThat(
displayData,
hasDisplayItem(
"schemaUpdateOptions",
EnumSet.of(BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_ADDITION).toString()));
assertThat(displayData, hasDisplayItem("tableDescription", tblDescription));
assertThat(displayData, hasDisplayItem("validation", false));
}
@@ -1571,7 +1584,8 @@ public void testWriteTables() throws Exception {
4,
false,
null,
"NEWLINE_DELIMITED_JSON");
"NEWLINE_DELIMITED_JSON",
Collections.emptySet());

PCollection<KV<TableDestination, String>> writeTablesOutput =
writeTablesInput.apply(writeTables);
@@ -1854,4 +1868,59 @@ public void testWrongErrorConfigs() {
+ "uses extended errors information. Use getFailedInsertsWithErr instead"));
}
}

void schemaUpdateOptionsTest(
BigQueryIO.Write.Method insertMethod, Set<SchemaUpdateOption> schemaUpdateOptions)
throws Exception {
TableRow row = new TableRow().set("date", "2019-01-01").set("number", "1");

TableSchema schema =
new TableSchema()
.setFields(
ImmutableList.of(
new TableFieldSchema().setName("date").setType("DATE"),
new TableFieldSchema().setName("number").setType("INTEGER")));

Write<TableRow> writeTransform =
BigQueryIO.writeTableRows()
.to("project-id:dataset-id.table-id")
.withTestServices(fakeBqServices)
.withMethod(insertMethod)
.withSchema(schema)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withSchemaUpdateOptions(schemaUpdateOptions);

p.apply(Create.<TableRow>of(row)).apply(writeTransform);
p.run();

List<String> expectedOptions =
schemaUpdateOptions.stream().map(Enum::name).collect(Collectors.toList());

for (Job job : fakeJobService.getAllJobs()) {
JobConfigurationLoad configuration = job.getConfiguration().getLoad();
assertEquals(expectedOptions, configuration.getSchemaUpdateOptions());
}
}

@Test
public void testWriteFileSchemaUpdateOptionAllowFieldAddition() throws Exception {
Set<SchemaUpdateOption> options = EnumSet.of(SchemaUpdateOption.ALLOW_FIELD_ADDITION);
schemaUpdateOptionsTest(BigQueryIO.Write.Method.FILE_LOADS, options);
}

@Test
public void testWriteFileSchemaUpdateOptionAllowFieldRelaxation() throws Exception {
Set<SchemaUpdateOption> options = EnumSet.of(SchemaUpdateOption.ALLOW_FIELD_RELAXATION);
schemaUpdateOptionsTest(BigQueryIO.Write.Method.FILE_LOADS, options);
}

@Test
public void testWriteFileSchemaUpdateOptionAll() throws Exception {
Set<SchemaUpdateOption> options = EnumSet.allOf(SchemaUpdateOption.class);
schemaUpdateOptionsTest(BigQueryIO.Write.Method.FILE_LOADS, options);
}
}
[Diff for the sixth changed file (presumably the new SchemaUpdateOption integration test) not loaded.]
